ScholarMind Agent 延迟优化全攻略
目标:将 QA 端到端延迟从 ~3s(复合问题)/ ~1.5s(简单问题)压缩到 <800ms
当前延迟瓶颈分析
当前请求时间线 (复合问题, ~3000ms):
0ms 200ms 500ms 1200ms 2000ms 2800ms 3000ms
│──────────│───────────│───────────│───────────│───────────│──────────│
│复杂度判断 │ 查询分解 │ 子问题检索 │ 子问题生成 │ 聚合重排 │ 最终生成 │
│ LLM调用 │ LLM调用 │ 向量+图谱 │ LLM×N │ Reranker │ LLM调用 │
│ ~150ms │ ~300ms │ ~300ms │ ~700ms │ ~200ms │ ~500ms │
│ │ │ │ │ │ │
│ 串行❌ │ 串行❌ │ 可并行✅ │ 可并行✅ │ 串行❌ │ 可流式✅ │
优化目标:
- 消除不必要的串行等待
- 合并/并行化 LLM 调用
- 投机执行 (speculation)
- 流式输出 (用户感知延迟↓)
8 大优化策略
┌──────────────────────────────────────────────────────────────────────────────┐
│ ScholarMind Agent 延迟优化 — 8大策略 │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 策略1 ─── 投机执行 (Speculative Execution) │
│ 不等分解结果, 乐观地用原始query先启动检索 │
│ 节省: ~300ms (分解与检索并行) │
│ │
│ 策略2 ─── LLM 调用合并 (Batch & Merge) │
│ 复杂度判断+分解 合为1次调用; 子问题答案并行生成 │
│ 节省: ~300ms (减少1次LLM往返) │
│ │
│ 策略3 ─── 检索并行化 + 预取 (Parallel Retrieval + Prefetch) │
│ 向量/图谱/RAPTOR 三路同时发起; 热门实体子图预加载 │
│ 节省: ~200ms (三路串行→并行) │
│ │
│ 策略4 ─── 流式生成 + 增量输出 (Streaming) │
│ 首token即开始返回, 用户感知延迟从3s→~500ms │
│ 感知节省: ~2500ms │
│ │
│ 策略5 ─── 轻量级路由器 (规则+小模型替代LLM) │
│ 复杂度判断用正则+启发式, 不调LLM │
│ 节省: ~150ms (消除1次LLM调用) │
│ │
│ 策略6 ─── 分解结果缓存 (Decomposition Cache) │
│ 相似复合问题复用历史分解结构 │
│ 节省: ~300ms (命中时跳过分解LLM) │
│ │
│ 策略7 ─── 异步管道化 (Pipeline Parallelism) │
│ 前一步的部分结果立即喂给下一步, 不等全部完成 │
│ 节省: ~300ms (消除步间等待) │
│ │
│ 策略8 ─── 模型级加速 (Smaller Models + Speculative Decoding) │
│ 路由/分解用3B模型; 生成用8B+投机解码 │
│ 节省: ~40% 生成延迟 │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
策略1: 投机执行 (Speculative Execution)
原理
不等 LLM 分解完成,乐观假设用户的原始查询就是一个有效检索 query,立即启动检索。分解完成后,补充缺失的子查询检索。
优化前 (串行):
[分解 300ms] ──等待──▶ [检索 300ms] ──等待──▶ [生成]
总计: 600ms 后才开始生成
优化后 (投机并行):
[分解 300ms] ─────────────────────────▶ 分解完成, 补充检索
[原始query检索 300ms] ──▶ 结果就绪! (只检索分解新增的子查询)
↓
两组结果合并 → 生成 (几乎零等待)
总计: ~300ms 后即可开始生成 (节省300ms)
实现
import asyncio
async def speculative_retrieval(query: str) -> dict:
"""投机执行: 分解与检索并行"""
# 同时启动: (1) 查询分解 (2) 原始query直接检索
decompose_task = asyncio.create_task(decompose_query(query))
speculative_task = asyncio.create_task(
hybrid_retriever.retrieve(query, mode="hybrid", top_k=10)
)
# 等待两者完成 (实际上谁快谁先完成)
decomposition, speculative_docs = await asyncio.gather(
decompose_task, speculative_task
)
# 分解结果出来后, 只检索"投机检索未覆盖"的子查询
covered_by_speculation = estimate_coverage(speculative_docs, decomposition)
additional_tasks = []
for sq in decomposition.sub_questions:
if sq.id not in covered_by_speculation and not sq.depends_on:
additional_tasks.append(
hybrid_retriever.retrieve(sq.question, mode=sq.type, top_k=5)
)
# 并行补充检索
if additional_tasks:
additional_docs = await asyncio.gather(*additional_tasks)
all_docs = speculative_docs + [d for docs in additional_docs for d in docs]
else:
all_docs = speculative_docs # 投机检索已足够!
return {"docs": all_docs, "decomposition": decomposition}
策略2: LLM 调用合并
原理
将多次串行 LLM 调用合并为更少次调用:
优化前 (4次LLM调用):
1. 复杂度判断 (~150ms)
2. 查询分解 (~300ms)
3. 子问题答案生成 ×N (~700ms, 但可并行)
4. 最终综合生成 (~500ms)
总计: 至少 1650ms 纯LLM时间 (假设3/4串行)
优化后 (2-3次):
1. 复杂度+分解 合并为1次 (~350ms) ← 节省1次往返
2. 子问题答案并行生成 (~700ms并行→等最慢的那个)
3. 最终综合(流式) (~500ms, 但首token ~50ms)
总计: ~1050ms, 且用户在 ~400ms 后就开始看到输出
实现
# 合并: 复杂度判断 + 分解 → 一次调用完成
UNIFIED_ROUTING_PROMPT = """Analyze this query in ONE step:
1. Is it SIMPLE (single focused question) or COMPOSITE (multiple sub-questions)?
2. If COMPOSITE, decompose into sub-questions with types and dependencies.
3. If SIMPLE, classify as: factual | reasoning | global
Query: {query}
Return JSON:
- If simple: {{"complexity": "simple", "type": "factual|reasoning|global"}}
- If composite: {{"complexity": "composite", "sub_questions": [...], ...}}
"""
async def unified_route_and_decompose(query: str) -> dict:
"""单次LLM调用完成: 复杂度判断 + 路由/分解"""
result = await llm.complete(
UNIFIED_ROUTING_PROMPT.format(query=query),
task="routing", # 本地模型, 低延迟
)
return result # 一次调用搞定两步
# 子问题答案: 并行生成 (不是串行!)
async def parallel_sub_answers(sub_questions: list, retrieved_docs: dict) -> list:
"""并行为所有无依赖子问题生成答案"""
tasks = []
for sq in sub_questions:
if not sq.depends_on: # 无依赖 → 可并行
tasks.append(generate_sub_answer(sq, retrieved_docs[sq.id]))
# asyncio.gather 并行执行所有LLM调用
results = await asyncio.gather(*tasks)
return results
策略3: 检索并行化 + 预取
原理
三路检索(向量 + 图谱 + RAPTOR)不应串行执行:
优化前:
[向量检索 50ms] → [图谱查询 200ms] → [RAPTOR 100ms] → [重排 100ms]
总计: ~450ms
优化后:
[向量检索 50ms ]─┐
[图谱查询 200ms ]─┼── 等最慢的 (200ms) ──▶ [重排 100ms]
[RAPTOR 100ms ]─┘
总计: ~300ms (节省150ms)
实现
async def parallel_hybrid_retrieve(query: str, mode: str = "hybrid") -> list:
"""三路检索完全并行"""
tasks = {}
if mode in ("factual", "hybrid"):
tasks["vector"] = asyncio.create_task(
qdrant_search(query, top_k=10)
)
tasks["sparse"] = asyncio.create_task(
qdrant_sparse_search(query, top_k=10)
)
if mode in ("reasoning", "hybrid"):
tasks["graph"] = asyncio.create_task(
neo4j_graph_search(query, limit=10)
)
if mode in ("global", "hybrid"):
tasks["raptor"] = asyncio.create_task(
raptor_tree_search(query, limit=5)
)
# 等待所有并行任务完成
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
# 合并 + RRF融合
all_docs = []
for result in results:
if not isinstance(result, Exception):
all_docs.extend(result)
return rrf_fusion(all_docs)
热门实体预取
class GraphPrefetcher:
"""后台预取热门实体的2跳子图, 查询时零延迟"""
def __init__(self, neo4j_driver, top_k: int = 500):
self.cache = {} # entity_name → subgraph
self._prefetch_top_entities(top_k)
async def _prefetch_top_entities(self, top_k: int):
"""启动时预加载高频实体"""
# 按被引用次数排序的top实体
cypher = """
MATCH (n)-[r]-()
WITH n, count(r) as degree
ORDER BY degree DESC LIMIT $k
RETURN n.name as name
"""
top_entities = await self.driver.run(cypher, k=top_k)
# 并行预取每个实体的2跳子图
for entity in top_entities:
subgraph = await self._fetch_subgraph(entity["name"], depth=2)
self.cache[entity["name"]] = subgraph
def get_instant(self, entity_name: str) -> dict:
"""查询时: 缓存命中 → 0ms, 未命中 → 走正常路径"""
return self.cache.get(entity_name)
策略4: 流式输出
原理
用户感知延迟 = **首 token 到达时间 (TTFT)**,不是完整响应时间。
优化前 (非流式):
用户发问 ──── [3000ms 完全空白等待] ──── 完整答案一次性出现
用户感知: "卡了3秒"
优化后 (流式):
用户发问 ── [500ms] ── 首token到达, 开始逐字显示 ── [2500ms 持续输出]
用户感知: "0.5秒就开始回答了"
实现
from fastapi.responses import StreamingResponse
import json
async def stream_answer(query: str):
"""流式输出: 检索完成后立即开始生成"""
# Phase 1: 快速检索 (非流式, 但快)
docs = await parallel_hybrid_retrieve(query)
# Phase 2: 先输出 "正在思考..." + 检索来源
yield json.dumps({"type": "sources", "data": format_sources(docs[:3])}) + "\n"
# Phase 3: 流式生成答案
prompt = build_prompt(query, docs)
async for chunk in llm.stream(prompt, task="generation"):
yield json.dumps({"type": "token", "data": chunk}) + "\n"
# Phase 4: 追加引用信息
yield json.dumps({"type": "citations", "data": extract_citations(docs)}) + "\n"
@app.post("/api/v1/query/stream")
async def query_stream(req: QueryRequest):
return StreamingResponse(
stream_answer(req.query),
media_type="application/x-ndjson"
)
增量管道流式 (进阶)
async def incremental_stream(query: str):
"""增量管道: 每个子问题答案生成后立即推送, 不等全部完成"""
# 分解
decomp = await unified_route_and_decompose(query)
yield {"type": "plan", "data": [sq.question for sq in decomp.sub_questions]}
# 并行子问题, 每个完成后立即推送
tasks = {
sq.id: asyncio.create_task(process_sub_question(sq))
for sq in decomp.sub_questions if not sq.depends_on
}
for coro in asyncio.as_completed(tasks.values()):
result = await coro
yield {"type": "sub_answer", "data": result}
# 用户立即看到部分答案!
# 最终综合 (流式)
async for token in synthesize_stream(query, all_results):
yield {"type": "token", "data": token}
策略5: 轻量级路由器 (消除LLM调用)
原理
复杂度判断不需要 LLM——用规则+特征即可在 <1ms 内完成:
import re
def fast_complexity_check(query: str) -> str:
"""规则引擎: <1ms, 无LLM调用"""
# 特征提取
question_marks = query.count("?") + query.count("?")
has_conjunction = bool(re.search(
r'(and also|以及|另外|同时|besides|moreover|还有|并且|versus|compared to|对比)',
query, re.IGNORECASE
))
has_multiple_verbs = len(re.findall(
r'(explain|compare|list|describe|who|what|how|why|解释|对比|列举|描述|是谁|是什么)',
query, re.IGNORECASE
)) >= 2
# 实体计数 (用已缓存的GLiNER或简单NER)
entity_count = count_entities_fast(query)
# 决策规则
composite_score = (
(question_marks >= 2) * 3 +
has_conjunction * 2 +
has_multiple_verbs * 2 +
(entity_count >= 3) * 1 +
(len(query) > 100) * 1
)
return "composite" if composite_score >= 3 else "simple"
def fast_query_type(query: str) -> str:
"""规则路由: <1ms"""
query_lower = query.lower()
if any(w in query_lower for w in ["趋势", "overview", "综述", "发展", "trend", "survey"]):
return "global"
elif any(w in query_lower for w in ["为什么", "how does", "原理", "机制", "compare", "对比", "区别"]):
return "reasoning"
else:
return "factual"
效果: 消除 1 次 LLM 调用 (150ms → 0ms),准确率 ~90%(边界情况走 LLM fallback)
策略6: 分解缓存
class DecompositionCache:
"""缓存历史分解结构 — 相似复合问题复用"""
def __init__(self, threshold: float = 0.88):
self.cache = {} # embedding → decomposition
self.threshold = threshold
async def get_or_decompose(self, query: str) -> DecomposedQuery:
# 检查缓存: 语义相似的历史分解
query_vec = embed(query)
best_match = find_nearest(self.cache, query_vec)
if best_match and best_match.score > self.threshold:
# 命中! 复用分解结构, 但替换实体名
cached_decomp = best_match.value
adapted = adapt_entities(cached_decomp, query) # 替换实体名
return adapted
# 未命中: LLM分解 → 存入缓存
decomp = await decompose_query(query)
self.cache[query_vec] = decomp
return decomp
示例:
- "Compare BERT and GPT-2 on GLUE" → 分解结构被缓存
- "Compare T5 and BART on SuperGLUE" → 命中! 复用结构,替换实体名
策略7: 异步管道化
原理
不要等一个步骤完全完成再开始下一步。部分结果就足以触发下一步。
优化前 (步间全等待):
[检索10个doc全部返回] ──等待──▶ [用10个doc生成答案]
总计: 300ms + 500ms = 800ms
优化后 (管道化):
[检索: doc1 50ms] → 立即送给生成器开始prefill
[检索: doc2 80ms] → 追加到生成器context
[检索: doc3-10 ...] → 继续追加
↓
[生成: 首token ~200ms后开始] → 流式输出
总计: 用户在 ~250ms 后开始看到输出!
实现
import asyncio
from asyncio import Queue
async def pipeline_retrieve_and_generate(query: str):
"""管道化: 检索结果流式喂给生成器"""
doc_queue = Queue()
async def retriever_producer():
"""检索器: 每找到一批文档就放入队列"""
# 向量检索最快, 先返回
vec_docs = await qdrant_search(query, top_k=5)
await doc_queue.put(("vector", vec_docs))
# 图谱稍慢
graph_docs = await neo4j_search(query, limit=5)
await doc_queue.put(("graph", graph_docs))
# RAPTOR 最慢
raptor_docs = await raptor_search(query, limit=3)
await doc_queue.put(("raptor", raptor_docs))
await doc_queue.put(None) # 信号: 检索完毕
async def generator_consumer():
"""生成器: 一旦有足够文档就开始生成"""
all_docs = []
min_docs_to_start = 3 # 有3个文档就可以开始
while True:
item = await doc_queue.get()
if item is None:
break
all_docs.extend(item[1])
if len(all_docs) >= min_docs_to_start:
# 开始流式生成 (后续文档可以通过context扩展追加)
break
# 用已有文档开始生成
prompt = build_prompt(query, all_docs)
async for token in llm.stream(prompt):
yield token
# 并行启动生产者和消费者
asyncio.create_task(retriever_producer())
async for token in generator_consumer():
yield token
策略8: 模型级加速
8a. 分级模型选择
MODEL_LATENCY_MAP = {
# 任务 → 模型 → 预估延迟
"routing": ("Qwen2.5-3B-Instruct", "~50ms"), # 最小模型, 最快
"decompose": ("Qwen2.5-7B-Instruct", "~150ms"), # 中等模型
"sub_answer": ("Qwen2.5-14B-Instruct", "~300ms"), # 质量要求高
"synthesis": ("GPT-4o-mini", "~500ms"), # 最终综合用API
}
# 对比: 如果所有步骤都用14B
# routing(14B): ~150ms, decompose(14B): ~300ms → 多花200ms却不提升质量
8b. 投机解码 (Speculative Decoding)
# vLLM 投机解码: 小模型草稿 + 大模型验证
vllm serve meta-llama/Llama-3.1-8B-Instruct \
--speculative-model meta-llama/Llama-3.1-1B-Instruct \
--num-speculative-tokens 5 \
--speculative-disable-mqa-scorer
# 效果: 生成速度提升 1.5-2.5×, 输出质量不变
8c. 结构化输出加速
# 用 Outlines / vLLM structured output 约束生成
# 减少无效token生成, 直接输出JSON结构
from vllm import LLM, SamplingParams
from vllm.sampling_params import GuidedDecodingParams
llm = LLM(model="Qwen/Qwen2.5-7B-Instruct")
# JSON schema 约束 → 生成更快 (减少不必要的思考token)
guided_params = GuidedDecodingParams(json_schema=DecomposedQuery.model_json_schema())
params = SamplingParams(max_tokens=512, guided_decoding=guided_params)
优化后的完整时间线
=== 简单问题 (优化后, ~600ms 首token) ===
0ms 1ms 50ms 250ms 600ms 1200ms
│──────│───────│───────────│───────────│───────────│
│规则路由│HyDE改写│ 三路并行检索 │ 重排(100ms)│ 流式生成开始 │ ...输出完成
│<1ms │(并行) │(最慢200ms) │ │ ← 用户看到首token
=== 复合问题 (优化后, ~800ms 首token) ===
0ms 1ms 350ms 500ms 800ms 1500ms 2000ms
│──────│───────│───────────│───────────│───────────│───────────│
│规则判断│ 分解+投机检索 │ 补充检索 │ 子答案并行 │ 流式综合开始│ ...完成
│<1ms │(并行300ms) │(并行150ms)│ (并行500ms)│ ← 首token │
│ │ │ │ │ │
│ │ 投机: 原始query检索 与 LLM分解 同时进行 │
│ │ 节省: 不用等分解完才开始检索 │
延迟对比汇总
┌────────────────────────────────────────────────────────────────────┐
│ 优化前后延迟对比 │
├──────────────────┬────────────┬────────────┬──────────────────────┤
│ 阶段 │ 优化前 │ 优化后 │ 优化手段 │
├──────────────────┼────────────┼────────────┼──────────────────────┤
│ 复杂度判断 │ 150ms │ <1ms │ 规则引擎替代LLM │
│ 查询分解 │ 300ms(串行) │ 0ms(并行) │ 投机执行+合并调用 │
│ 检索 │ 450ms(串行) │ 200ms(并行)│ asyncio.gather │
│ 子问题生成 │ 700ms(串行) │ 350ms(并行)│ 并行LLM+小模型 │
│ 重排 │ 200ms │ 100ms │ 量化reranker │
│ 最终生成(首token)│ 500ms(等) │ 50ms(流式) │ 流式+管道化 │
├──────────────────┼────────────┼────────────┼──────────────────────┤
│ 总延迟(简单) │ ~1500ms │ ~600ms │ 60% 降低 │
│ 总延迟(复合) │ ~3000ms │ ~800ms │ 73% 降低 │
│ 用户感知(首token)│ ~1500ms │ ~400ms │ 73% 降低 │
│ 缓存命中时 │ — │ ~5ms │ L1语义缓存 │
└──────────────────┴────────────┴────────────┴──────────────────────┘
实施优先级
| 优先级 | 策略 | 延迟收益 | 工作量 | 风险 |
|---|---|---|---|---|
| P0 | 策略4: 流式输出 | 感知 -73% | 半天 | 无 |
| P0 | 策略3: 检索并行化 | -150ms | 半天 | 无 |
| P0 | 策略5: 规则路由器 | -150ms | 2小时 | 边界case |
| P1 | 策略1: 投机执行 | -300ms | 1天 | 无用检索浪费 |
| P1 | 策略2: LLM调用合并 | -300ms | 1天 | Prompt工程 |
| P2 | 策略7: 管道化 | -200ms | 2天 | 复杂度高 |
| P2 | 策略8: 模型加速 | -40%生成 | 2天 | 配置调优 |
| P3 | 策略6: 分解缓存 | 命中-300ms | 1天 | 缓存失效 |
终极架构: 低延迟 Agent 流水线
async def ultra_fast_query(query: str, session_id: str):
"""
终极低延迟 Agent:
- 简单问题: 400ms 首token
- 复合问题: 800ms 首token
- 缓存命中: 5ms
"""
# ===== L1: 语义缓存 (5ms) =====
cached = await semantic_cache.get(query)
if cached:
yield {"type": "cached", "data": cached}
return
# ===== 规则路由 (<1ms) =====
complexity = fast_complexity_check(query)
if complexity == "simple":
query_type = fast_query_type(query)
# 检索 + HyDE 并行
retrieve_task = asyncio.create_task(
parallel_hybrid_retrieve(query, mode=query_type)
)
hyde_task = asyncio.create_task(hyde_expand(query))
docs, hyde_docs = await asyncio.gather(retrieve_task, hyde_task)
all_docs = rrf_fusion(docs + hyde_docs)
reranked = await fast_rerank(query, all_docs, top_k=5)
# 流式生成
async for token in llm.stream(build_prompt(query, reranked)):
yield {"type": "token", "data": token}
else: # composite
# 投机执行: 分解 与 原始query检索 并行
decompose_task = asyncio.create_task(
unified_route_and_decompose(query)
)
speculative_task = asyncio.create_task(
parallel_hybrid_retrieve(query, mode="hybrid", top_k=10)
)
# 先推送 "思考中" + 搜索来源
spec_docs = await speculative_task
yield {"type": "sources", "data": format_sources(spec_docs[:3])}
# 分解完成
decomp = await decompose_task
yield {"type": "plan", "data": [sq.question for sq in decomp.sub_questions]}
# 补充子查询检索 (并行)
additional = await parallel_sub_retrieval(decomp, already_have=spec_docs)
all_docs = deduplicate(spec_docs + additional)
reranked = await fast_rerank(query, all_docs, top_k=10)
# 并行子答案 + 流式综合
sub_answers = await parallel_sub_answers(decomp, reranked)
async for token in synthesize_stream(query, sub_answers, reranked):
yield {"type": "token", "data": token}
# 写入缓存
full_answer = collect_tokens()
await semantic_cache.set(query, full_answer)
参考论文
| 论文 | 技术 | 应用 |
|---|---|---|
| Speculative Decoding (2302.01318) | 小模型草稿+大模型验证 | 策略8b |
| Medusa (2401.10774) | 多头投机解码 | 策略8b |
| ChunkAttention (2402.15220) | 共享前缀KV管理 | 策略7 |
| Skeleton-of-Thought (2307.15337) | 并行内容生成 | 策略7 |
| SGLang (2312.07104) | RadixAttention + 前端调度 | 策略4/7 |