| """Server-Sent Events (SSE) 工具模块""" |
| import json |
| import queue |
| import time |
| from typing import Callable, Generator, Optional, Tuple |
| from flask import Response |
|
|
|
|
| class SSEProgressReporter: |
| """SSE进度报告器""" |
| |
| def __init__(self, generator_func: Callable): |
| """ |
| 初始化SSE进度报告器 |
| |
| Args: |
| generator_func: 生成器函数,用于生成SSE事件 |
| """ |
| self.generator_func = generator_func |
| |
| def generate(self): |
| """生成SSE事件流""" |
| try: |
| for event in self.generator_func(): |
| yield event |
| except Exception as e: |
| |
| error_data = { |
| 'type': 'error', |
| 'message': str(e) |
| } |
| yield f"data: {json.dumps(error_data)}\n\n" |
| |
| def create_response(self) -> Response: |
| """创建SSE响应""" |
| return Response( |
| self.generate(), |
| mimetype='text/event-stream', |
| headers={ |
| 'Cache-Control': 'no-cache', |
| 'X-Accel-Buffering': 'no', |
| 'Connection': 'keep-alive' |
| } |
| ) |
|
|
|
|
| def send_progress_event(step: int, total_steps: int, stage: str, percentage: Optional[int] = None, message: Optional[str] = None) -> str: |
| """ |
| 生成SSE进度事件 |
| |
| Args: |
| step: 当前步骤 (1-based) |
| total_steps: 总步骤数 |
| stage: 阶段名称 (encoding, inference, processing) |
| percentage: 可选的进度百分比 (0-100),仅在需要显示百分比的阶段提供 |
| message: 可选的进度消息 |
| |
| Returns: |
| SSE格式的事件字符串 |
| """ |
| data = { |
| 'type': 'progress', |
| 'step': step, |
| 'total_steps': total_steps, |
| 'stage': stage |
| } |
| if percentage is not None: |
| data['percentage'] = percentage |
| if message: |
| data['message'] = message |
| return f"data: {json.dumps(data)}\n\n" |
|
|
|
|
| def send_result_event(result: dict) -> str: |
| """ |
| 生成SSE结果事件 |
| |
| Args: |
| result: 分析结果字典 |
| |
| Returns: |
| SSE格式的事件字符串 |
| """ |
| data = { |
| 'type': 'result', |
| 'data': result |
| } |
| return f"data: {json.dumps(data)}\n\n" |
|
|
|
|
| def send_completion_delta_event(text: str, stream_end: bool) -> str: |
| """续写流式:与 analyze 的 progress/result 并列,type=delta。""" |
| data = { |
| "type": "delta", |
| "text": text, |
| } |
| if stream_end: |
| data["stream_end"] = True |
| return f"data: {json.dumps(data)}\n\n" |
|
|
|
|
| def send_prompt_used_event(prompt_used: str) -> str: |
| """续写流式:在首条 delta 之前下发实际送入模型的 prompt 原文。""" |
| data = { |
| "type": "prompt_used", |
| "prompt_used": prompt_used, |
| } |
| return f"data: {json.dumps(data)}\n\n" |
|
|
|
|
| def send_error_event(message: str, status_code: Optional[int] = None) -> str: |
| """ |
| 生成SSE错误事件 |
| |
| Args: |
| message: 错误消息 |
| status_code: 可选 HTTP 状态码,供非流式封装解析 |
| |
| Returns: |
| SSE格式的事件字符串 |
| """ |
| data = {'type': 'error', 'message': message} |
| if status_code is not None: |
| data['status_code'] = status_code |
| return f"data: {json.dumps(data)}\n\n" |
|
|
|
|
| def consume_progress_queue( |
| progress_queue: queue.Queue, |
| analysis_done, |
| start_time: float, |
| timeout_seconds: float, |
| timeout_label: str = "分析", |
| ) -> Generator[Tuple[str, str], None, None]: |
| """ |
| 消费进度队列,yield (kind, event_str)。 |
| kind: 'progress' | 'timeout' | 'done' |
| event_str: SSE 格式字符串(timeout 时含错误信息,done 时为空) |
| """ |
| done_received = False |
| last_progress_info = None |
|
|
| while True: |
| elapsed = time.perf_counter() - start_time |
| if elapsed >= timeout_seconds: |
| progress_str = f" | {last_progress_info}" if last_progress_info else "" |
| print(f"⏱️ {timeout_label}超时: 处理时长 {elapsed:.2f}s 超过限制 {timeout_seconds}s,已放弃{progress_str}") |
| yield ('timeout', send_error_event(f"分析超时:处理时长超过 {timeout_seconds} 秒限制,已放弃")) |
| return |
|
|
| try: |
| event_data = progress_queue.get(timeout=0.1) |
| event_type = event_data[0] |
| if event_type == 'progress': |
| _, step, total_steps, stage, percentage = event_data |
| if total_steps > 0: |
| last_progress_info = f"step={step}/{total_steps}" |
| else: |
| last_progress_info = f"step={step}" |
| if stage: |
| last_progress_info += f" stage={stage}" |
| if percentage is not None: |
| last_progress_info += f" {percentage}%" |
| yield ('progress', send_progress_event(step, total_steps, stage, percentage)) |
| elif event_type == 'done': |
| done_received = True |
| while not progress_queue.empty(): |
| try: |
| remaining = progress_queue.get_nowait() |
| if remaining[0] == 'progress': |
| _, step, total_steps, stage, percentage = remaining |
| yield ('progress', send_progress_event(step, total_steps, stage, percentage)) |
| except queue.Empty: |
| break |
| yield ('done', '') |
| return |
| except queue.Empty: |
| if analysis_done.is_set() and done_received: |
| yield ('done', '') |
| return |
|
|
|
|