scholarmind-architecture / docs /LATENCY_OPTIMIZATION.md
heyingyue's picture
Add 8-strategy latency optimization guide
cc0461e verified

ScholarMind Agent 延迟优化全攻略

目标:将 QA 端到端延迟从 ~3s(复合问题)/ ~1.5s(简单问题)压缩到 <800ms


当前延迟瓶颈分析

当前请求时间线 (复合问题, ~3000ms):

0ms        200ms       500ms       1200ms      2000ms      2800ms     3000ms
│──────────│───────────│───────────│───────────│───────────│──────────│
│复杂度判断 │  查询分解  │  子问题检索  │  子问题生成  │  聚合重排   │ 最终生成  │
│ LLM调用  │  LLM调用  │ 向量+图谱   │  LLM×N    │ Reranker  │ LLM调用  │
│ ~150ms   │  ~300ms   │  ~300ms    │  ~700ms   │  ~200ms   │ ~500ms  │
│          │           │            │           │           │         │
│ 串行❌    │  串行❌   │  可并行✅   │  可并行✅  │  串行❌   │ 可流式✅ │

优化目标:
- 消除不必要的串行等待
- 合并/并行化 LLM 调用
- 投机执行 (speculation)
- 流式输出 (用户感知延迟↓)

8 大优化策略

┌──────────────────────────────────────────────────────────────────────────────┐
│              ScholarMind Agent 延迟优化 — 8大策略                              │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  策略1 ─── 投机执行 (Speculative Execution)                                   │
│            不等分解结果, 乐观地用原始query先启动检索                             │
│            节省: ~300ms (分解与检索并行)                                       │
│                                                                              │
│  策略2 ─── LLM 调用合并 (Batch & Merge)                                      │
│            复杂度判断+分解 合为1次调用; 子问题答案并行生成                       │
│            节省: ~300ms (减少1次LLM往返)                                      │
│                                                                              │
│  策略3 ─── 检索并行化 + 预取 (Parallel Retrieval + Prefetch)                  │
│            向量/图谱/RAPTOR 三路同时发起; 热门实体子图预加载                     │
│            节省: ~200ms (三路串行→并行)                                       │
│                                                                              │
│  策略4 ─── 流式生成 + 增量输出 (Streaming)                                    │
│            首token即开始返回, 用户感知延迟从3s→~500ms                          │
│            感知节省: ~2500ms                                                  │
│                                                                              │
│  策略5 ─── 轻量级路由器 (规则+小模型替代LLM)                                   │
│            复杂度判断用正则+启发式, 不调LLM                                    │
│            节省: ~150ms (消除1次LLM调用)                                      │
│                                                                              │
│  策略6 ─── 分解结果缓存 (Decomposition Cache)                                 │
│            相似复合问题复用历史分解结构                                         │
│            节省: ~300ms (命中时跳过分解LLM)                                   │
│                                                                              │
│  策略7 ─── 异步管道化 (Pipeline Parallelism)                                  │
│            前一步的部分结果立即喂给下一步, 不等全部完成                          │
│            节省: ~300ms (消除步间等待)                                        │
│                                                                              │
│  策略8 ─── 模型级加速 (Smaller Models + Speculative Decoding)                 │
│            路由/分解用3B模型; 生成用8B+投机解码                                │
│            节省: ~40% 生成延迟                                                │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

策略1: 投机执行 (Speculative Execution)

原理

不等 LLM 分解完成,乐观假设用户的原始查询就是一个有效检索 query,立即启动检索。分解完成后,补充缺失的子查询检索。

优化前 (串行):
[分解 300ms] ──等待──▶ [检索 300ms] ──等待──▶ [生成]
总计: 600ms 后才开始生成

优化后 (投机并行):
[分解 300ms] ─────────────────────────▶ 分解完成, 补充检索
[原始query检索 300ms] ──▶ 结果就绪!    (只检索分解新增的子查询)
                                        ↓
                              两组结果合并 → 生成 (几乎零等待)
总计: ~300ms 后即可开始生成 (节省300ms)

实现

import asyncio

async def speculative_retrieval(query: str) -> dict:
    """投机执行: 分解与检索并行"""
    
    # 同时启动: (1) 查询分解 (2) 原始query直接检索
    decompose_task = asyncio.create_task(decompose_query(query))
    speculative_task = asyncio.create_task(
        hybrid_retriever.retrieve(query, mode="hybrid", top_k=10)
    )
    
    # 等待两者完成 (实际上谁快谁先完成)
    decomposition, speculative_docs = await asyncio.gather(
        decompose_task, speculative_task
    )
    
    # 分解结果出来后, 只检索"投机检索未覆盖"的子查询
    covered_by_speculation = estimate_coverage(speculative_docs, decomposition)
    
    additional_tasks = []
    for sq in decomposition.sub_questions:
        if sq.id not in covered_by_speculation and not sq.depends_on:
            additional_tasks.append(
                hybrid_retriever.retrieve(sq.question, mode=sq.type, top_k=5)
            )
    
    # 并行补充检索
    if additional_tasks:
        additional_docs = await asyncio.gather(*additional_tasks)
        all_docs = speculative_docs + [d for docs in additional_docs for d in docs]
    else:
        all_docs = speculative_docs  # 投机检索已足够!
    
    return {"docs": all_docs, "decomposition": decomposition}

策略2: LLM 调用合并

原理

将多次串行 LLM 调用合并为更少次调用:

优化前 (4次LLM调用):
1. 复杂度判断 (~150ms)
2. 查询分解 (~300ms)
3. 子问题答案生成 ×N (~700ms, 但可并行)
4. 最终综合生成 (~500ms)
总计: 至少 1650ms 纯LLM时间 (假设3/4串行)

优化后 (2-3次):
1. 复杂度+分解 合并为1次 (~350ms)  ← 节省1次往返
2. 子问题答案并行生成 (~700ms并行→等最慢的那个)
3. 最终综合(流式) (~500ms, 但首token ~50ms)
总计: ~1050ms, 且用户在 ~400ms 后就开始看到输出

实现

# 合并: 复杂度判断 + 分解 → 一次调用完成
UNIFIED_ROUTING_PROMPT = """Analyze this query in ONE step:

1. Is it SIMPLE (single focused question) or COMPOSITE (multiple sub-questions)?
2. If COMPOSITE, decompose into sub-questions with types and dependencies.
3. If SIMPLE, classify as: factual | reasoning | global

Query: {query}

Return JSON:
- If simple: {{"complexity": "simple", "type": "factual|reasoning|global"}}
- If composite: {{"complexity": "composite", "sub_questions": [...], ...}}
"""

async def unified_route_and_decompose(query: str) -> dict:
    """单次LLM调用完成: 复杂度判断 + 路由/分解"""
    result = await llm.complete(
        UNIFIED_ROUTING_PROMPT.format(query=query),
        task="routing",  # 本地模型, 低延迟
    )
    return result  # 一次调用搞定两步
# 子问题答案: 并行生成 (不是串行!)
async def parallel_sub_answers(sub_questions: list, retrieved_docs: dict) -> list:
    """并行为所有无依赖子问题生成答案"""
    tasks = []
    for sq in sub_questions:
        if not sq.depends_on:  # 无依赖 → 可并行
            tasks.append(generate_sub_answer(sq, retrieved_docs[sq.id]))
    
    # asyncio.gather 并行执行所有LLM调用
    results = await asyncio.gather(*tasks)
    return results

策略3: 检索并行化 + 预取

原理

三路检索(向量 + 图谱 + RAPTOR)不应串行执行:

优化前:
[向量检索 50ms] → [图谱查询 200ms] → [RAPTOR 100ms] → [重排 100ms]
总计: ~450ms

优化后:
[向量检索 50ms  ]─┐
[图谱查询 200ms ]─┼── 等最慢的 (200ms) ──▶ [重排 100ms]
[RAPTOR   100ms ]─┘
总计: ~300ms (节省150ms)

实现

async def parallel_hybrid_retrieve(query: str, mode: str = "hybrid") -> list:
    """三路检索完全并行"""
    
    tasks = {}
    
    if mode in ("factual", "hybrid"):
        tasks["vector"] = asyncio.create_task(
            qdrant_search(query, top_k=10)
        )
        tasks["sparse"] = asyncio.create_task(
            qdrant_sparse_search(query, top_k=10)
        )
    
    if mode in ("reasoning", "hybrid"):
        tasks["graph"] = asyncio.create_task(
            neo4j_graph_search(query, limit=10)
        )
    
    if mode in ("global", "hybrid"):
        tasks["raptor"] = asyncio.create_task(
            raptor_tree_search(query, limit=5)
        )
    
    # 等待所有并行任务完成
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    
    # 合并 + RRF融合
    all_docs = []
    for result in results:
        if not isinstance(result, Exception):
            all_docs.extend(result)
    
    return rrf_fusion(all_docs)

热门实体预取

class GraphPrefetcher:
    """后台预取热门实体的2跳子图, 查询时零延迟"""
    
    def __init__(self, neo4j_driver, top_k: int = 500):
        self.cache = {}  # entity_name → subgraph
        self._prefetch_top_entities(top_k)
    
    async def _prefetch_top_entities(self, top_k: int):
        """启动时预加载高频实体"""
        # 按被引用次数排序的top实体
        cypher = """
        MATCH (n)-[r]-() 
        WITH n, count(r) as degree 
        ORDER BY degree DESC LIMIT $k
        RETURN n.name as name
        """
        top_entities = await self.driver.run(cypher, k=top_k)
        
        # 并行预取每个实体的2跳子图
        for entity in top_entities:
            subgraph = await self._fetch_subgraph(entity["name"], depth=2)
            self.cache[entity["name"]] = subgraph
    
    def get_instant(self, entity_name: str) -> dict:
        """查询时: 缓存命中 → 0ms, 未命中 → 走正常路径"""
        return self.cache.get(entity_name)

策略4: 流式输出

原理

用户感知延迟 = **首 token 到达时间 (TTFT)**,不是完整响应时间。

优化前 (非流式):
用户发问 ──── [3000ms 完全空白等待] ──── 完整答案一次性出现
用户感知: "卡了3秒"

优化后 (流式):
用户发问 ── [500ms] ── 首token到达, 开始逐字显示 ── [2500ms 持续输出]
用户感知: "0.5秒就开始回答了"

实现

from fastapi.responses import StreamingResponse
import json

async def stream_answer(query: str):
    """流式输出: 检索完成后立即开始生成"""
    
    # Phase 1: 快速检索 (非流式, 但快)
    docs = await parallel_hybrid_retrieve(query)
    
    # Phase 2: 先输出 "正在思考..." + 检索来源
    yield json.dumps({"type": "sources", "data": format_sources(docs[:3])}) + "\n"
    
    # Phase 3: 流式生成答案
    prompt = build_prompt(query, docs)
    async for chunk in llm.stream(prompt, task="generation"):
        yield json.dumps({"type": "token", "data": chunk}) + "\n"
    
    # Phase 4: 追加引用信息
    yield json.dumps({"type": "citations", "data": extract_citations(docs)}) + "\n"

@app.post("/api/v1/query/stream")
async def query_stream(req: QueryRequest):
    return StreamingResponse(
        stream_answer(req.query),
        media_type="application/x-ndjson"
    )

增量管道流式 (进阶)

async def incremental_stream(query: str):
    """增量管道: 每个子问题答案生成后立即推送, 不等全部完成"""
    
    # 分解
    decomp = await unified_route_and_decompose(query)
    yield {"type": "plan", "data": [sq.question for sq in decomp.sub_questions]}
    
    # 并行子问题, 每个完成后立即推送
    tasks = {
        sq.id: asyncio.create_task(process_sub_question(sq))
        for sq in decomp.sub_questions if not sq.depends_on
    }
    
    for coro in asyncio.as_completed(tasks.values()):
        result = await coro
        yield {"type": "sub_answer", "data": result}
        # 用户立即看到部分答案!
    
    # 最终综合 (流式)
    async for token in synthesize_stream(query, all_results):
        yield {"type": "token", "data": token}

策略5: 轻量级路由器 (消除LLM调用)

原理

复杂度判断不需要 LLM——用规则+特征即可在 <1ms 内完成:

import re

def fast_complexity_check(query: str) -> str:
    """规则引擎: <1ms, 无LLM调用"""
    
    # 特征提取
    question_marks = query.count("?") + query.count("?")
    has_conjunction = bool(re.search(
        r'(and also|以及|另外|同时|besides|moreover|还有|并且|versus|compared to|对比)', 
        query, re.IGNORECASE
    ))
    has_multiple_verbs = len(re.findall(
        r'(explain|compare|list|describe|who|what|how|why|解释|对比|列举|描述|是谁|是什么)', 
        query, re.IGNORECASE
    )) >= 2
    
    # 实体计数 (用已缓存的GLiNER或简单NER)
    entity_count = count_entities_fast(query)
    
    # 决策规则
    composite_score = (
        (question_marks >= 2) * 3 +
        has_conjunction * 2 +
        has_multiple_verbs * 2 +
        (entity_count >= 3) * 1 +
        (len(query) > 100) * 1
    )
    
    return "composite" if composite_score >= 3 else "simple"

def fast_query_type(query: str) -> str:
    """规则路由: <1ms"""
    query_lower = query.lower()
    
    if any(w in query_lower for w in ["趋势", "overview", "综述", "发展", "trend", "survey"]):
        return "global"
    elif any(w in query_lower for w in ["为什么", "how does", "原理", "机制", "compare", "对比", "区别"]):
        return "reasoning"
    else:
        return "factual"

效果: 消除 1 次 LLM 调用 (150ms → 0ms),准确率 ~90%(边界情况走 LLM fallback)


策略6: 分解缓存

class DecompositionCache:
    """缓存历史分解结构 — 相似复合问题复用"""
    
    def __init__(self, threshold: float = 0.88):
        self.cache = {}  # embedding → decomposition
        self.threshold = threshold
    
    async def get_or_decompose(self, query: str) -> DecomposedQuery:
        # 检查缓存: 语义相似的历史分解
        query_vec = embed(query)
        best_match = find_nearest(self.cache, query_vec)
        
        if best_match and best_match.score > self.threshold:
            # 命中! 复用分解结构, 但替换实体名
            cached_decomp = best_match.value
            adapted = adapt_entities(cached_decomp, query)  # 替换实体名
            return adapted
        
        # 未命中: LLM分解 → 存入缓存
        decomp = await decompose_query(query)
        self.cache[query_vec] = decomp
        return decomp

示例:

  • "Compare BERT and GPT-2 on GLUE" → 分解结构被缓存
  • "Compare T5 and BART on SuperGLUE" → 命中! 复用结构,替换实体名

策略7: 异步管道化

原理

不要等一个步骤完全完成再开始下一步。部分结果就足以触发下一步

优化前 (步间全等待):
[检索10个doc全部返回] ──等待──▶ [用10个doc生成答案]
总计: 300ms + 500ms = 800ms

优化后 (管道化):
[检索: doc1 50ms] → 立即送给生成器开始prefill
[检索: doc2 80ms] → 追加到生成器context
[检索: doc3-10 ...] → 继续追加
          ↓
[生成: 首token ~200ms后开始] → 流式输出
总计: 用户在 ~250ms 后开始看到输出!

实现

import asyncio
from asyncio import Queue

async def pipeline_retrieve_and_generate(query: str):
    """管道化: 检索结果流式喂给生成器"""
    
    doc_queue = Queue()
    
    async def retriever_producer():
        """检索器: 每找到一批文档就放入队列"""
        # 向量检索最快, 先返回
        vec_docs = await qdrant_search(query, top_k=5)
        await doc_queue.put(("vector", vec_docs))
        
        # 图谱稍慢
        graph_docs = await neo4j_search(query, limit=5)
        await doc_queue.put(("graph", graph_docs))
        
        # RAPTOR 最慢
        raptor_docs = await raptor_search(query, limit=3)
        await doc_queue.put(("raptor", raptor_docs))
        
        await doc_queue.put(None)  # 信号: 检索完毕
    
    async def generator_consumer():
        """生成器: 一旦有足够文档就开始生成"""
        all_docs = []
        min_docs_to_start = 3  # 有3个文档就可以开始
        
        while True:
            item = await doc_queue.get()
            if item is None:
                break
            all_docs.extend(item[1])
            
            if len(all_docs) >= min_docs_to_start:
                # 开始流式生成 (后续文档可以通过context扩展追加)
                break
        
        # 用已有文档开始生成
        prompt = build_prompt(query, all_docs)
        async for token in llm.stream(prompt):
            yield token
    
    # 并行启动生产者和消费者
    asyncio.create_task(retriever_producer())
    async for token in generator_consumer():
        yield token

策略8: 模型级加速

8a. 分级模型选择

MODEL_LATENCY_MAP = {
    # 任务 → 模型 → 预估延迟
    "routing":     ("Qwen2.5-3B-Instruct", "~50ms"),    # 最小模型, 最快
    "decompose":   ("Qwen2.5-7B-Instruct", "~150ms"),   # 中等模型
    "sub_answer":  ("Qwen2.5-14B-Instruct", "~300ms"),  # 质量要求高
    "synthesis":   ("GPT-4o-mini", "~500ms"),            # 最终综合用API
}

# 对比: 如果所有步骤都用14B
# routing(14B): ~150ms, decompose(14B): ~300ms → 多花200ms却不提升质量

8b. 投机解码 (Speculative Decoding)

# vLLM 投机解码: 小模型草稿 + 大模型验证
vllm serve meta-llama/Llama-3.1-8B-Instruct \
    --speculative-model meta-llama/Llama-3.1-1B-Instruct \
    --num-speculative-tokens 5 \
    --speculative-disable-mqa-scorer
    
# 效果: 生成速度提升 1.5-2.5×, 输出质量不变

8c. 结构化输出加速

# 用 Outlines / vLLM structured output 约束生成
# 减少无效token生成, 直接输出JSON结构

from vllm import LLM, SamplingParams
from vllm.sampling_params import GuidedDecodingParams

llm = LLM(model="Qwen/Qwen2.5-7B-Instruct")

# JSON schema 约束 → 生成更快 (减少不必要的思考token)
guided_params = GuidedDecodingParams(json_schema=DecomposedQuery.model_json_schema())
params = SamplingParams(max_tokens=512, guided_decoding=guided_params)

优化后的完整时间线

=== 简单问题 (优化后, ~600ms 首token) ===

0ms    1ms     50ms        250ms       600ms       1200ms
│──────│───────│───────────│───────────│───────────│
│规则路由│HyDE改写│ 三路并行检索 │ 重排(100ms)│ 流式生成开始 │ ...输出完成
│<1ms  │(并行)  │(最慢200ms) │           │ ← 用户看到首token

=== 复合问题 (优化后, ~800ms 首token) ===

0ms    1ms     350ms       500ms       800ms       1500ms      2000ms
│──────│───────│───────────│───────────│───────────│───────────│
│规则判断│ 分解+投机检索 │ 补充检索    │ 子答案并行  │ 流式综合开始│ ...完成
│<1ms  │(并行300ms) │(并行150ms)│ (并行500ms)│ ← 首token  │
│      │            │           │            │            │
│      │ 投机: 原始query检索 与 LLM分解 同时进行          │
│      │ 节省: 不用等分解完才开始检索                       │

延迟对比汇总

┌────────────────────────────────────────────────────────────────────┐
│                    优化前后延迟对比                                   │
├──────────────────┬────────────┬────────────┬──────────────────────┤
│ 阶段             │ 优化前      │ 优化后      │ 优化手段              │
├──────────────────┼────────────┼────────────┼──────────────────────┤
│ 复杂度判断       │ 150ms      │ <1ms       │ 规则引擎替代LLM       │
│ 查询分解         │ 300ms(串行) │ 0ms(并行)  │ 投机执行+合并调用      │
│ 检索             │ 450ms(串行) │ 200ms(并行)│ asyncio.gather       │
│ 子问题生成       │ 700ms(串行) │ 350ms(并行)│ 并行LLM+小模型        │
│ 重排             │ 200ms      │ 100ms      │ 量化reranker          │
│ 最终生成(首token)│ 500ms(等)  │ 50ms(流式) │ 流式+管道化           │
├──────────────────┼────────────┼────────────┼──────────────────────┤
│ 总延迟(简单)     │ ~1500ms    │ ~600ms     │ 60% 降低             │
│ 总延迟(复合)     │ ~3000ms    │ ~800ms     │ 73% 降低             │
│ 用户感知(首token)│ ~1500ms    │ ~400ms     │ 73% 降低             │
│ 缓存命中时       │ —          │ ~5ms       │ L1语义缓存            │
└──────────────────┴────────────┴────────────┴──────────────────────┘

实施优先级

优先级 策略 延迟收益 工作量 风险
P0 策略4: 流式输出 感知 -73% 半天
P0 策略3: 检索并行化 -150ms 半天
P0 策略5: 规则路由器 -150ms 2小时 边界case
P1 策略1: 投机执行 -300ms 1天 无用检索浪费
P1 策略2: LLM调用合并 -300ms 1天 Prompt工程
P2 策略7: 管道化 -200ms 2天 复杂度高
P2 策略8: 模型加速 -40%生成 2天 配置调优
P3 策略6: 分解缓存 命中-300ms 1天 缓存失效

终极架构: 低延迟 Agent 流水线

async def ultra_fast_query(query: str, session_id: str):
    """
    终极低延迟 Agent:
    - 简单问题: 400ms 首token
    - 复合问题: 800ms 首token  
    - 缓存命中: 5ms
    """
    
    # ===== L1: 语义缓存 (5ms) =====
    cached = await semantic_cache.get(query)
    if cached:
        yield {"type": "cached", "data": cached}
        return
    
    # ===== 规则路由 (<1ms) =====
    complexity = fast_complexity_check(query)
    
    if complexity == "simple":
        query_type = fast_query_type(query)
        
        # 检索 + HyDE 并行
        retrieve_task = asyncio.create_task(
            parallel_hybrid_retrieve(query, mode=query_type)
        )
        hyde_task = asyncio.create_task(hyde_expand(query))
        
        docs, hyde_docs = await asyncio.gather(retrieve_task, hyde_task)
        all_docs = rrf_fusion(docs + hyde_docs)
        reranked = await fast_rerank(query, all_docs, top_k=5)
        
        # 流式生成
        async for token in llm.stream(build_prompt(query, reranked)):
            yield {"type": "token", "data": token}
    
    else:  # composite
        # 投机执行: 分解 与 原始query检索 并行
        decompose_task = asyncio.create_task(
            unified_route_and_decompose(query)
        )
        speculative_task = asyncio.create_task(
            parallel_hybrid_retrieve(query, mode="hybrid", top_k=10)
        )
        
        # 先推送 "思考中" + 搜索来源
        spec_docs = await speculative_task
        yield {"type": "sources", "data": format_sources(spec_docs[:3])}
        
        # 分解完成
        decomp = await decompose_task
        yield {"type": "plan", "data": [sq.question for sq in decomp.sub_questions]}
        
        # 补充子查询检索 (并行)
        additional = await parallel_sub_retrieval(decomp, already_have=spec_docs)
        all_docs = deduplicate(spec_docs + additional)
        reranked = await fast_rerank(query, all_docs, top_k=10)
        
        # 并行子答案 + 流式综合
        sub_answers = await parallel_sub_answers(decomp, reranked)
        async for token in synthesize_stream(query, sub_answers, reranked):
            yield {"type": "token", "data": token}
    
    # 写入缓存
    full_answer = collect_tokens()
    await semantic_cache.set(query, full_answer)

参考论文

论文 技术 应用
Speculative Decoding (2302.01318) 小模型草稿+大模型验证 策略8b
Medusa (2401.10774) 多头投机解码 策略8b
ChunkAttention (2402.15220) 共享前缀KV管理 策略7
Skeleton-of-Thought (2307.15337) 并行内容生成 策略7
SGLang (2312.07104) RadixAttention + 前端调度 策略4/7