# ScholarMind Agent 延迟优化全攻略 > 目标:将 QA 端到端延迟从 ~3s(复合问题)/ ~1.5s(简单问题)压缩到 **<800ms** --- ## 当前延迟瓶颈分析 ``` 当前请求时间线 (复合问题, ~3000ms): 0ms 200ms 500ms 1200ms 2000ms 2800ms 3000ms │──────────│───────────│───────────│───────────│───────────│──────────│ │复杂度判断 │ 查询分解 │ 子问题检索 │ 子问题生成 │ 聚合重排 │ 最终生成 │ │ LLM调用 │ LLM调用 │ 向量+图谱 │ LLM×N │ Reranker │ LLM调用 │ │ ~150ms │ ~300ms │ ~300ms │ ~700ms │ ~200ms │ ~500ms │ │ │ │ │ │ │ │ │ 串行❌ │ 串行❌ │ 可并行✅ │ 可并行✅ │ 串行❌ │ 可流式✅ │ 优化目标: - 消除不必要的串行等待 - 合并/并行化 LLM 调用 - 投机执行 (speculation) - 流式输出 (用户感知延迟↓) ``` --- ## 8 大优化策略 ``` ┌──────────────────────────────────────────────────────────────────────────────┐ │ ScholarMind Agent 延迟优化 — 8大策略 │ ├──────────────────────────────────────────────────────────────────────────────┤ │ │ │ 策略1 ─── 投机执行 (Speculative Execution) │ │ 不等分解结果, 乐观地用原始query先启动检索 │ │ 节省: ~300ms (分解与检索并行) │ │ │ │ 策略2 ─── LLM 调用合并 (Batch & Merge) │ │ 复杂度判断+分解 合为1次调用; 子问题答案并行生成 │ │ 节省: ~300ms (减少1次LLM往返) │ │ │ │ 策略3 ─── 检索并行化 + 预取 (Parallel Retrieval + Prefetch) │ │ 向量/图谱/RAPTOR 三路同时发起; 热门实体子图预加载 │ │ 节省: ~200ms (三路串行→并行) │ │ │ │ 策略4 ─── 流式生成 + 增量输出 (Streaming) │ │ 首token即开始返回, 用户感知延迟从3s→~500ms │ │ 感知节省: ~2500ms │ │ │ │ 策略5 ─── 轻量级路由器 (规则+小模型替代LLM) │ │ 复杂度判断用正则+启发式, 不调LLM │ │ 节省: ~150ms (消除1次LLM调用) │ │ │ │ 策略6 ─── 分解结果缓存 (Decomposition Cache) │ │ 相似复合问题复用历史分解结构 │ │ 节省: ~300ms (命中时跳过分解LLM) │ │ │ │ 策略7 ─── 异步管道化 (Pipeline Parallelism) │ │ 前一步的部分结果立即喂给下一步, 不等全部完成 │ │ 节省: ~300ms (消除步间等待) │ │ │ │ 策略8 ─── 模型级加速 (Smaller Models + Speculative Decoding) │ │ 路由/分解用3B模型; 生成用8B+投机解码 │ │ 节省: ~40% 生成延迟 │ │ │ └──────────────────────────────────────────────────────────────────────────────┘ ``` --- ## 策略1: 投机执行 (Speculative Execution) ### 原理 不等 LLM 分解完成,**乐观假设**用户的原始查询就是一个有效检索 query,立即启动检索。分解完成后,补充缺失的子查询检索。 ``` 优化前 (串行): [分解 300ms] ──等待──▶ [检索 300ms] ──等待──▶ [生成] 总计: 600ms 后才开始生成 优化后 (投机并行): [分解 300ms] ─────────────────────────▶ 分解完成, 补充检索 [原始query检索 300ms] ──▶ 结果就绪! (只检索分解新增的子查询) ↓ 两组结果合并 → 生成 (几乎零等待) 总计: ~300ms 后即可开始生成 (节省300ms) ``` ### 实现 ```python import asyncio async def speculative_retrieval(query: str) -> dict: """投机执行: 分解与检索并行""" # 同时启动: (1) 查询分解 (2) 原始query直接检索 decompose_task = asyncio.create_task(decompose_query(query)) speculative_task = asyncio.create_task( hybrid_retriever.retrieve(query, mode="hybrid", top_k=10) ) # 等待两者完成 (实际上谁快谁先完成) decomposition, speculative_docs = await asyncio.gather( decompose_task, speculative_task ) # 分解结果出来后, 只检索"投机检索未覆盖"的子查询 covered_by_speculation = estimate_coverage(speculative_docs, decomposition) additional_tasks = [] for sq in decomposition.sub_questions: if sq.id not in covered_by_speculation and not sq.depends_on: additional_tasks.append( hybrid_retriever.retrieve(sq.question, mode=sq.type, top_k=5) ) # 并行补充检索 if additional_tasks: additional_docs = await asyncio.gather(*additional_tasks) all_docs = speculative_docs + [d for docs in additional_docs for d in docs] else: all_docs = speculative_docs # 投机检索已足够! return {"docs": all_docs, "decomposition": decomposition} ``` --- ## 策略2: LLM 调用合并 ### 原理 将多次串行 LLM 调用合并为更少次调用: ``` 优化前 (4次LLM调用): 1. 复杂度判断 (~150ms) 2. 查询分解 (~300ms) 3. 子问题答案生成 ×N (~700ms, 但可并行) 4. 最终综合生成 (~500ms) 总计: 至少 1650ms 纯LLM时间 (假设3/4串行) 优化后 (2-3次): 1. 复杂度+分解 合并为1次 (~350ms) ← 节省1次往返 2. 子问题答案并行生成 (~700ms并行→等最慢的那个) 3. 最终综合(流式) (~500ms, 但首token ~50ms) 总计: ~1050ms, 且用户在 ~400ms 后就开始看到输出 ``` ### 实现 ```python # 合并: 复杂度判断 + 分解 → 一次调用完成 UNIFIED_ROUTING_PROMPT = """Analyze this query in ONE step: 1. Is it SIMPLE (single focused question) or COMPOSITE (multiple sub-questions)? 2. If COMPOSITE, decompose into sub-questions with types and dependencies. 3. If SIMPLE, classify as: factual | reasoning | global Query: {query} Return JSON: - If simple: {{"complexity": "simple", "type": "factual|reasoning|global"}} - If composite: {{"complexity": "composite", "sub_questions": [...], ...}} """ async def unified_route_and_decompose(query: str) -> dict: """单次LLM调用完成: 复杂度判断 + 路由/分解""" result = await llm.complete( UNIFIED_ROUTING_PROMPT.format(query=query), task="routing", # 本地模型, 低延迟 ) return result # 一次调用搞定两步 ``` ```python # 子问题答案: 并行生成 (不是串行!) async def parallel_sub_answers(sub_questions: list, retrieved_docs: dict) -> list: """并行为所有无依赖子问题生成答案""" tasks = [] for sq in sub_questions: if not sq.depends_on: # 无依赖 → 可并行 tasks.append(generate_sub_answer(sq, retrieved_docs[sq.id])) # asyncio.gather 并行执行所有LLM调用 results = await asyncio.gather(*tasks) return results ``` --- ## 策略3: 检索并行化 + 预取 ### 原理 三路检索(向量 + 图谱 + RAPTOR)不应串行执行: ``` 优化前: [向量检索 50ms] → [图谱查询 200ms] → [RAPTOR 100ms] → [重排 100ms] 总计: ~450ms 优化后: [向量检索 50ms ]─┐ [图谱查询 200ms ]─┼── 等最慢的 (200ms) ──▶ [重排 100ms] [RAPTOR 100ms ]─┘ 总计: ~300ms (节省150ms) ``` ### 实现 ```python async def parallel_hybrid_retrieve(query: str, mode: str = "hybrid") -> list: """三路检索完全并行""" tasks = {} if mode in ("factual", "hybrid"): tasks["vector"] = asyncio.create_task( qdrant_search(query, top_k=10) ) tasks["sparse"] = asyncio.create_task( qdrant_sparse_search(query, top_k=10) ) if mode in ("reasoning", "hybrid"): tasks["graph"] = asyncio.create_task( neo4j_graph_search(query, limit=10) ) if mode in ("global", "hybrid"): tasks["raptor"] = asyncio.create_task( raptor_tree_search(query, limit=5) ) # 等待所有并行任务完成 results = await asyncio.gather(*tasks.values(), return_exceptions=True) # 合并 + RRF融合 all_docs = [] for result in results: if not isinstance(result, Exception): all_docs.extend(result) return rrf_fusion(all_docs) ``` ### 热门实体预取 ```python class GraphPrefetcher: """后台预取热门实体的2跳子图, 查询时零延迟""" def __init__(self, neo4j_driver, top_k: int = 500): self.cache = {} # entity_name → subgraph self._prefetch_top_entities(top_k) async def _prefetch_top_entities(self, top_k: int): """启动时预加载高频实体""" # 按被引用次数排序的top实体 cypher = """ MATCH (n)-[r]-() WITH n, count(r) as degree ORDER BY degree DESC LIMIT $k RETURN n.name as name """ top_entities = await self.driver.run(cypher, k=top_k) # 并行预取每个实体的2跳子图 for entity in top_entities: subgraph = await self._fetch_subgraph(entity["name"], depth=2) self.cache[entity["name"]] = subgraph def get_instant(self, entity_name: str) -> dict: """查询时: 缓存命中 → 0ms, 未命中 → 走正常路径""" return self.cache.get(entity_name) ``` --- ## 策略4: 流式输出 ### 原理 用户感知延迟 = **首 token 到达时间 (TTFT)**,不是完整响应时间。 ``` 优化前 (非流式): 用户发问 ──── [3000ms 完全空白等待] ──── 完整答案一次性出现 用户感知: "卡了3秒" 优化后 (流式): 用户发问 ── [500ms] ── 首token到达, 开始逐字显示 ── [2500ms 持续输出] 用户感知: "0.5秒就开始回答了" ``` ### 实现 ```python from fastapi.responses import StreamingResponse import json async def stream_answer(query: str): """流式输出: 检索完成后立即开始生成""" # Phase 1: 快速检索 (非流式, 但快) docs = await parallel_hybrid_retrieve(query) # Phase 2: 先输出 "正在思考..." + 检索来源 yield json.dumps({"type": "sources", "data": format_sources(docs[:3])}) + "\n" # Phase 3: 流式生成答案 prompt = build_prompt(query, docs) async for chunk in llm.stream(prompt, task="generation"): yield json.dumps({"type": "token", "data": chunk}) + "\n" # Phase 4: 追加引用信息 yield json.dumps({"type": "citations", "data": extract_citations(docs)}) + "\n" @app.post("/api/v1/query/stream") async def query_stream(req: QueryRequest): return StreamingResponse( stream_answer(req.query), media_type="application/x-ndjson" ) ``` ### 增量管道流式 (进阶) ```python async def incremental_stream(query: str): """增量管道: 每个子问题答案生成后立即推送, 不等全部完成""" # 分解 decomp = await unified_route_and_decompose(query) yield {"type": "plan", "data": [sq.question for sq in decomp.sub_questions]} # 并行子问题, 每个完成后立即推送 tasks = { sq.id: asyncio.create_task(process_sub_question(sq)) for sq in decomp.sub_questions if not sq.depends_on } for coro in asyncio.as_completed(tasks.values()): result = await coro yield {"type": "sub_answer", "data": result} # 用户立即看到部分答案! # 最终综合 (流式) async for token in synthesize_stream(query, all_results): yield {"type": "token", "data": token} ``` --- ## 策略5: 轻量级路由器 (消除LLM调用) ### 原理 复杂度判断不需要 LLM——用**规则+特征**即可在 <1ms 内完成: ```python import re def fast_complexity_check(query: str) -> str: """规则引擎: <1ms, 无LLM调用""" # 特征提取 question_marks = query.count("?") + query.count("?") has_conjunction = bool(re.search( r'(and also|以及|另外|同时|besides|moreover|还有|并且|versus|compared to|对比)', query, re.IGNORECASE )) has_multiple_verbs = len(re.findall( r'(explain|compare|list|describe|who|what|how|why|解释|对比|列举|描述|是谁|是什么)', query, re.IGNORECASE )) >= 2 # 实体计数 (用已缓存的GLiNER或简单NER) entity_count = count_entities_fast(query) # 决策规则 composite_score = ( (question_marks >= 2) * 3 + has_conjunction * 2 + has_multiple_verbs * 2 + (entity_count >= 3) * 1 + (len(query) > 100) * 1 ) return "composite" if composite_score >= 3 else "simple" def fast_query_type(query: str) -> str: """规则路由: <1ms""" query_lower = query.lower() if any(w in query_lower for w in ["趋势", "overview", "综述", "发展", "trend", "survey"]): return "global" elif any(w in query_lower for w in ["为什么", "how does", "原理", "机制", "compare", "对比", "区别"]): return "reasoning" else: return "factual" ``` **效果**: 消除 1 次 LLM 调用 (150ms → 0ms),准确率 ~90%(边界情况走 LLM fallback) --- ## 策略6: 分解缓存 ```python class DecompositionCache: """缓存历史分解结构 — 相似复合问题复用""" def __init__(self, threshold: float = 0.88): self.cache = {} # embedding → decomposition self.threshold = threshold async def get_or_decompose(self, query: str) -> DecomposedQuery: # 检查缓存: 语义相似的历史分解 query_vec = embed(query) best_match = find_nearest(self.cache, query_vec) if best_match and best_match.score > self.threshold: # 命中! 复用分解结构, 但替换实体名 cached_decomp = best_match.value adapted = adapt_entities(cached_decomp, query) # 替换实体名 return adapted # 未命中: LLM分解 → 存入缓存 decomp = await decompose_query(query) self.cache[query_vec] = decomp return decomp ``` **示例**: - "Compare BERT and GPT-2 on GLUE" → 分解结构被缓存 - "Compare T5 and BART on SuperGLUE" → 命中! 复用结构,替换实体名 --- ## 策略7: 异步管道化 ### 原理 不要等一个步骤完全完成再开始下一步。**部分结果就足以触发下一步**。 ``` 优化前 (步间全等待): [检索10个doc全部返回] ──等待──▶ [用10个doc生成答案] 总计: 300ms + 500ms = 800ms 优化后 (管道化): [检索: doc1 50ms] → 立即送给生成器开始prefill [检索: doc2 80ms] → 追加到生成器context [检索: doc3-10 ...] → 继续追加 ↓ [生成: 首token ~200ms后开始] → 流式输出 总计: 用户在 ~250ms 后开始看到输出! ``` ### 实现 ```python import asyncio from asyncio import Queue async def pipeline_retrieve_and_generate(query: str): """管道化: 检索结果流式喂给生成器""" doc_queue = Queue() async def retriever_producer(): """检索器: 每找到一批文档就放入队列""" # 向量检索最快, 先返回 vec_docs = await qdrant_search(query, top_k=5) await doc_queue.put(("vector", vec_docs)) # 图谱稍慢 graph_docs = await neo4j_search(query, limit=5) await doc_queue.put(("graph", graph_docs)) # RAPTOR 最慢 raptor_docs = await raptor_search(query, limit=3) await doc_queue.put(("raptor", raptor_docs)) await doc_queue.put(None) # 信号: 检索完毕 async def generator_consumer(): """生成器: 一旦有足够文档就开始生成""" all_docs = [] min_docs_to_start = 3 # 有3个文档就可以开始 while True: item = await doc_queue.get() if item is None: break all_docs.extend(item[1]) if len(all_docs) >= min_docs_to_start: # 开始流式生成 (后续文档可以通过context扩展追加) break # 用已有文档开始生成 prompt = build_prompt(query, all_docs) async for token in llm.stream(prompt): yield token # 并行启动生产者和消费者 asyncio.create_task(retriever_producer()) async for token in generator_consumer(): yield token ``` --- ## 策略8: 模型级加速 ### 8a. 分级模型选择 ```python MODEL_LATENCY_MAP = { # 任务 → 模型 → 预估延迟 "routing": ("Qwen2.5-3B-Instruct", "~50ms"), # 最小模型, 最快 "decompose": ("Qwen2.5-7B-Instruct", "~150ms"), # 中等模型 "sub_answer": ("Qwen2.5-14B-Instruct", "~300ms"), # 质量要求高 "synthesis": ("GPT-4o-mini", "~500ms"), # 最终综合用API } # 对比: 如果所有步骤都用14B # routing(14B): ~150ms, decompose(14B): ~300ms → 多花200ms却不提升质量 ``` ### 8b. 投机解码 (Speculative Decoding) ```bash # vLLM 投机解码: 小模型草稿 + 大模型验证 vllm serve meta-llama/Llama-3.1-8B-Instruct \ --speculative-model meta-llama/Llama-3.1-1B-Instruct \ --num-speculative-tokens 5 \ --speculative-disable-mqa-scorer # 效果: 生成速度提升 1.5-2.5×, 输出质量不变 ``` ### 8c. 结构化输出加速 ```python # 用 Outlines / vLLM structured output 约束生成 # 减少无效token生成, 直接输出JSON结构 from vllm import LLM, SamplingParams from vllm.sampling_params import GuidedDecodingParams llm = LLM(model="Qwen/Qwen2.5-7B-Instruct") # JSON schema 约束 → 生成更快 (减少不必要的思考token) guided_params = GuidedDecodingParams(json_schema=DecomposedQuery.model_json_schema()) params = SamplingParams(max_tokens=512, guided_decoding=guided_params) ``` --- ## 优化后的完整时间线 ``` === 简单问题 (优化后, ~600ms 首token) === 0ms 1ms 50ms 250ms 600ms 1200ms │──────│───────│───────────│───────────│───────────│ │规则路由│HyDE改写│ 三路并行检索 │ 重排(100ms)│ 流式生成开始 │ ...输出完成 │<1ms │(并行) │(最慢200ms) │ │ ← 用户看到首token === 复合问题 (优化后, ~800ms 首token) === 0ms 1ms 350ms 500ms 800ms 1500ms 2000ms │──────│───────│───────────│───────────│───────────│───────────│ │规则判断│ 分解+投机检索 │ 补充检索 │ 子答案并行 │ 流式综合开始│ ...完成 │<1ms │(并行300ms) │(并行150ms)│ (并行500ms)│ ← 首token │ │ │ │ │ │ │ │ │ 投机: 原始query检索 与 LLM分解 同时进行 │ │ │ 节省: 不用等分解完才开始检索 │ ``` --- ## 延迟对比汇总 ``` ┌────────────────────────────────────────────────────────────────────┐ │ 优化前后延迟对比 │ ├──────────────────┬────────────┬────────────┬──────────────────────┤ │ 阶段 │ 优化前 │ 优化后 │ 优化手段 │ ├──────────────────┼────────────┼────────────┼──────────────────────┤ │ 复杂度判断 │ 150ms │ <1ms │ 规则引擎替代LLM │ │ 查询分解 │ 300ms(串行) │ 0ms(并行) │ 投机执行+合并调用 │ │ 检索 │ 450ms(串行) │ 200ms(并行)│ asyncio.gather │ │ 子问题生成 │ 700ms(串行) │ 350ms(并行)│ 并行LLM+小模型 │ │ 重排 │ 200ms │ 100ms │ 量化reranker │ │ 最终生成(首token)│ 500ms(等) │ 50ms(流式) │ 流式+管道化 │ ├──────────────────┼────────────┼────────────┼──────────────────────┤ │ 总延迟(简单) │ ~1500ms │ ~600ms │ 60% 降低 │ │ 总延迟(复合) │ ~3000ms │ ~800ms │ 73% 降低 │ │ 用户感知(首token)│ ~1500ms │ ~400ms │ 73% 降低 │ │ 缓存命中时 │ — │ ~5ms │ L1语义缓存 │ └──────────────────┴────────────┴────────────┴──────────────────────┘ ``` --- ## 实施优先级 | 优先级 | 策略 | 延迟收益 | 工作量 | 风险 | |--------|------|---------|--------|------| | **P0** | 策略4: 流式输出 | 感知 -73% | 半天 | 无 | | **P0** | 策略3: 检索并行化 | -150ms | 半天 | 无 | | **P0** | 策略5: 规则路由器 | -150ms | 2小时 | 边界case | | **P1** | 策略1: 投机执行 | -300ms | 1天 | 无用检索浪费 | | **P1** | 策略2: LLM调用合并 | -300ms | 1天 | Prompt工程 | | **P2** | 策略7: 管道化 | -200ms | 2天 | 复杂度高 | | **P2** | 策略8: 模型加速 | -40%生成 | 2天 | 配置调优 | | **P3** | 策略6: 分解缓存 | 命中-300ms | 1天 | 缓存失效 | --- ## 终极架构: 低延迟 Agent 流水线 ```python async def ultra_fast_query(query: str, session_id: str): """ 终极低延迟 Agent: - 简单问题: 400ms 首token - 复合问题: 800ms 首token - 缓存命中: 5ms """ # ===== L1: 语义缓存 (5ms) ===== cached = await semantic_cache.get(query) if cached: yield {"type": "cached", "data": cached} return # ===== 规则路由 (<1ms) ===== complexity = fast_complexity_check(query) if complexity == "simple": query_type = fast_query_type(query) # 检索 + HyDE 并行 retrieve_task = asyncio.create_task( parallel_hybrid_retrieve(query, mode=query_type) ) hyde_task = asyncio.create_task(hyde_expand(query)) docs, hyde_docs = await asyncio.gather(retrieve_task, hyde_task) all_docs = rrf_fusion(docs + hyde_docs) reranked = await fast_rerank(query, all_docs, top_k=5) # 流式生成 async for token in llm.stream(build_prompt(query, reranked)): yield {"type": "token", "data": token} else: # composite # 投机执行: 分解 与 原始query检索 并行 decompose_task = asyncio.create_task( unified_route_and_decompose(query) ) speculative_task = asyncio.create_task( parallel_hybrid_retrieve(query, mode="hybrid", top_k=10) ) # 先推送 "思考中" + 搜索来源 spec_docs = await speculative_task yield {"type": "sources", "data": format_sources(spec_docs[:3])} # 分解完成 decomp = await decompose_task yield {"type": "plan", "data": [sq.question for sq in decomp.sub_questions]} # 补充子查询检索 (并行) additional = await parallel_sub_retrieval(decomp, already_have=spec_docs) all_docs = deduplicate(spec_docs + additional) reranked = await fast_rerank(query, all_docs, top_k=10) # 并行子答案 + 流式综合 sub_answers = await parallel_sub_answers(decomp, reranked) async for token in synthesize_stream(query, sub_answers, reranked): yield {"type": "token", "data": token} # 写入缓存 full_answer = collect_tokens() await semantic_cache.set(query, full_answer) ``` --- ## 参考论文 | 论文 | 技术 | 应用 | |------|------|------| | Speculative Decoding (2302.01318) | 小模型草稿+大模型验证 | 策略8b | | Medusa (2401.10774) | 多头投机解码 | 策略8b | | ChunkAttention (2402.15220) | 共享前缀KV管理 | 策略7 | | Skeleton-of-Thought (2307.15337) | 并行内容生成 | 策略7 | | SGLang (2312.07104) | RadixAttention + 前端调度 | 策略4/7 |