File size: 5,730 Bytes
494c9e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""Server-Sent Events (SSE) 工具模块"""
import json
import queue
import time
from typing import Callable, Generator, Optional, Tuple
from flask import Response


class SSEProgressReporter:
    """SSE进度报告器"""
    
    def __init__(self, generator_func: Callable):
        """
        初始化SSE进度报告器
        
        Args:
            generator_func: 生成器函数,用于生成SSE事件
        """
        self.generator_func = generator_func
    
    def generate(self):
        """生成SSE事件流"""
        try:
            for event in self.generator_func():
                yield event
        except Exception as e:
            # 发送错误事件
            error_data = {
                'type': 'error',
                'message': str(e)
            }
            yield f"data: {json.dumps(error_data)}\n\n"
    
    def create_response(self) -> Response:
        """创建SSE响应"""
        return Response(
            self.generate(),
            mimetype='text/event-stream',
            headers={
                'Cache-Control': 'no-cache',
                'X-Accel-Buffering': 'no',  # 禁用nginx缓冲
                'Connection': 'keep-alive'
            }
        )


def send_progress_event(step: int, total_steps: int, stage: str, percentage: Optional[int] = None, message: Optional[str] = None) -> str:
    """
    生成SSE进度事件
    
    Args:
        step: 当前步骤 (1-based)
        total_steps: 总步骤数
        stage: 阶段名称 (encoding, inference, processing)
        percentage: 可选的进度百分比 (0-100),仅在需要显示百分比的阶段提供
        message: 可选的进度消息
    
    Returns:
        SSE格式的事件字符串
    """
    data = {
        'type': 'progress',
        'step': step,
        'total_steps': total_steps,
        'stage': stage
    }
    if percentage is not None:
        data['percentage'] = percentage
    if message:
        data['message'] = message
    return f"data: {json.dumps(data)}\n\n"


def send_result_event(result: dict) -> str:
    """
    生成SSE结果事件
    
    Args:
        result: 分析结果字典
    
    Returns:
        SSE格式的事件字符串
    """
    data = {
        'type': 'result',
        'data': result
    }
    return f"data: {json.dumps(data)}\n\n"


def send_completion_delta_event(text: str, stream_end: bool) -> str:
    """续写流式:与 analyze 的 progress/result 并列,type=delta。"""
    data = {
        "type": "delta",
        "text": text,
    }
    if stream_end:
        data["stream_end"] = True
    return f"data: {json.dumps(data)}\n\n"


def send_prompt_used_event(prompt_used: str) -> str:
    """续写流式:在首条 delta 之前下发实际送入模型的 prompt 原文。"""
    data = {
        "type": "prompt_used",
        "prompt_used": prompt_used,
    }
    return f"data: {json.dumps(data)}\n\n"


def send_error_event(message: str, status_code: Optional[int] = None) -> str:
    """
    生成SSE错误事件

    Args:
        message: 错误消息
        status_code: 可选 HTTP 状态码,供非流式封装解析

    Returns:
        SSE格式的事件字符串
    """
    data = {'type': 'error', 'message': message}
    if status_code is not None:
        data['status_code'] = status_code
    return f"data: {json.dumps(data)}\n\n"


def consume_progress_queue(
    progress_queue: queue.Queue,
    analysis_done,
    start_time: float,
    timeout_seconds: float,
    timeout_label: str = "分析",
) -> Generator[Tuple[str, str], None, None]:
    """
    消费进度队列,yield (kind, event_str)。
    kind: 'progress' | 'timeout' | 'done'
    event_str: SSE 格式字符串(timeout 时含错误信息,done 时为空)
    """
    done_received = False
    last_progress_info = None

    while True:
        elapsed = time.perf_counter() - start_time
        if elapsed >= timeout_seconds:
            progress_str = f" | {last_progress_info}" if last_progress_info else ""
            print(f"⏱️ {timeout_label}超时: 处理时长 {elapsed:.2f}s 超过限制 {timeout_seconds}s,已放弃{progress_str}")
            yield ('timeout', send_error_event(f"分析超时:处理时长超过 {timeout_seconds} 秒限制,已放弃"))
            return

        try:
            event_data = progress_queue.get(timeout=0.1)
            event_type = event_data[0]
            if event_type == 'progress':
                _, step, total_steps, stage, percentage = event_data
                if total_steps > 0:
                    last_progress_info = f"step={step}/{total_steps}"
                else:
                    last_progress_info = f"step={step}"
                if stage:
                    last_progress_info += f" stage={stage}"
                if percentage is not None:
                    last_progress_info += f" {percentage}%"
                yield ('progress', send_progress_event(step, total_steps, stage, percentage))
            elif event_type == 'done':
                done_received = True
                while not progress_queue.empty():
                    try:
                        remaining = progress_queue.get_nowait()
                        if remaining[0] == 'progress':
                            _, step, total_steps, stage, percentage = remaining
                            yield ('progress', send_progress_event(step, total_steps, stage, percentage))
                    except queue.Empty:
                        break
                yield ('done', '')
                return
        except queue.Empty:
            if analysis_done.is_set() and done_received:
                yield ('done', '')
                return