Threat_Hunter / docs /pipeline_guide.md
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc

ThreatHunter Pipeline 執行手冊

版本:v1.0
日期:2026-04-06
適用對象:開發者、維運人員


一、Pipeline 架構

1.1 整體流程

使用者輸入
    │
    ▼
┌─────────────────────────────────────────────────────┐
│  main.py: run_pipeline(tech_stack)                   │
│                                                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐          │
│  │  Scout   │─▶│ Analyst  │─▶│  Critic  │──┐       │
│  │(事實收集) │  │(推理判斷) │  │(可插拔)  │  │       │
│  └──────────┘  └──────────┘  └──────────┘  │       │
│                              ┌──────────────▼──┐    │
│                              │    Advisor      │    │
│                              │  (最終裁決報告)  │    │
│                              └─────────────────┘    │
└─────────────────────────────────────────────────────┘
    │
    ▼
最終 JSON 報告 + pipeline_meta

1.2 設計哲學

為什麼不用純 CrewAI Sequential?

根據 CrewAI 官方文檔:*"For any production-ready application, start with a Flow."*

本專案採用 Pipeline + Crew 內部化 架構:

  • 每個 Stage 內部使用 CrewAI Crew(Agent 自主推理)
  • Stage 之間由 Python 程式碼串接(精確控制、錯誤隔離)
  • 保留 17 層 Harness 保障層(程式碼層驗證)

詳細分析請參考 .opencode/plans/integration_plan.md


二、各 Stage 詳細說明

Stage 1:Scout Agent(事實收集)

職責:從公開漏洞資料庫收集指定技術堆疊的已知漏洞。

項目 說明
輸入 tech_stack(字串,如 "Django 4.2, Redis 7.0")
輸出 Scout JSON(scan_id, vulnerabilities[], summary)
Tools search_nvd, search_otx, read_memory, write_memory, history_search
Skill skills/threat_intel.md
Harness 5 層(強制 write_memory、Schema 驗證、CVE 真實性、package 補全、is_new 校正)

執行流程

  1. 讀取歷史記憶(read_memory)
  2. 對每個套件查詢 NVD(search_nvd)
  3. 對高危 CVE 查詢 OTX 威脅情報(search_otx)
  4. 比對歷史標記 is_new
  5. 寫入記憶(write_memory)
  6. 程式碼層 CVE 真實性驗證(反查 NVD API)
  7. 校正 is_new 標記
  8. 輸出 JSON

Graceful Degradation

  • NVD API 失敗 → 使用離線快取(data/nvd_cache/)
  • 全部失敗 → 回傳空 vulnerabilities 清單,管線繼續

Stage 2:Analyst Agent(推理判斷)

職責:驗證 KEV 和 Exploit 狀態,分析漏洞連鎖攻擊路徑。

項目 說明
輸入 Scout 輸出(dict)
輸出 Analyst JSON(scan_id, risk_score, risk_trend, analysis[])
Tools check_cisa_kev, search_exploits, read_memory, write_memory, history_search
Skill skills/chain_analysis.md
Harness 4 層(強制 write_memory、Schema 驗證、chain_risk 邏輯、風險禁止降級)

執行流程(3-Task 拆分架構):

  1. Collector:讀取歷史 + 解析 Scout JSON
  2. Verifier:KEV 驗證 + Exploit 搜尋 + Chain 分析
  3. Scorer:風險計算 + 寫入記憶 + 輸出 JSON

風險計算公式

risk_score = min(100, Σ(cvss_score × weight))
weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5

風險禁止降級規則

  • adjusted_risk 只能升級,不能降級
  • 如果 Agent 試圖降級,Harness Layer 4 強制修正

Graceful Degradation

  • LLM 輸出無法解析 → 使用 Fallback 輸出
  • chain_risk 欄位缺失 → 自動補充預設值

Stage 3:Critic Agent(質疑挑戰,可插拔)

職責:對抗式辯論,挑戰 Analyst 的主觀判斷。

項目 說明
輸入 Analyst 輸出(dict)
輸出 Critic JSON(debate_rounds, challenges[], scorecard, verdict)
開關 ENABLE_CRITIC 環境變數(預設 false)
Tools check_cisa_kev, search_exploits, read_memory
Skill skills/debate_sop.md
Harness 3 層(SKIPPED/fallback、Schema+scorecard 修復、verdict 枚舉+型別安全)

辯論流程(參考李宏毅論文 LLM Discussion Framework):

  1. 挑戰 Analyst 的連鎖推理前提
  2. 使用 Tool 驗證前提條件
  3. 計算 5 維評分卡(證據 30%、路徑完整性 25%、反駁品質 20%、回應品質 15%、信心校準 10%)
  4. 裁決:MAINTAIN(≥50 分)或 DOWNGRADE(<50 分)

裁決規則

  • weighted_score ≥ 50 → MAINTAIN
  • weighted_score < 50 → DOWNGRADE
  • 有 CVE 在 CISA KEV 中 → 禁止 DOWNGRADE

為什麼可插拔?

  • 辯論是「增強項」,不是「核心依賴」
  • Demo 時可展示完整辯論,平時關閉節省 Token

Stage 4:Advisor Agent(最終裁決)

職責:產出可執行的資安行動報告。

項目 說明
輸入 Analyst 輸出 + Critic 裁決(dict)
輸出 Advisor JSON(executive_summary, actions{}, risk_score, risk_trend)
Tools read_memory, write_memory, history_search
Skill skills/action_report.md
Harness 5 層(強制 write_memory、Schema 驗證、SOP 邏輯、risk_score 範圍、command 確保、歷史比對)

分級規則

  • URGENT:CVSS ≥ 9.0 或在 CISA KEV 中或有公開 PoC
  • IMPORTANT:CVSS ≥ 7.0 或有攻擊鏈風險
  • RESOLVED:已修補的歷史漏洞

每個 URGENT 項目必須附帶

  • command:具體修補指令(如 pip install --upgrade django
  • reason:為何標記為 URGENT
  • is_repeated:是否為重複未修補項目

歷史比對(Harness Layer 5)

  • 讀取 advisor_memory.json
  • 如果 CVE 曾在歷史中出現且未 resolved → is_repeated=True
  • 語氣遞升:"[REPEATED — STILL NOT PATCHED] ..."

三、錯誤處理機制

3.1 每個 Stage 的獨立 try-except

try:
    result = run_*_pipeline(input)
    return result, sl  # 成功
except Exception as e:
    degradation_status.degrade("StageName", str(e))
    return fallback_output, sl  # 降級

3.2 降級狀態追蹤

degradation_status.to_dict()
# {
#   "level": 1,           # 1=全速, 2=LLM降級, 3=API降級, 4=Agent降級, 5=最低生存
#   "label": "⚡ 全速運行",
#   "degraded_components": ["Scout: NVD API timeout"],
#   "timestamp": "2026-04-06T..."
# }

3.3 五層降級瀑布

層級 狀態 觸發條件
1 ⚡ 全速運行 所有元件正常
2 ⚠️ LLM 降級 vLLM → OpenRouter → OpenAI
3 ⚠️ API 降級 NVD/OTX → 離線快取
4 🔶 Agent 降級 Analyst/Critic 跳過
5 🔶 最低生存模式 使用上次掃描結果

四、Observability 日誌

4.1 StepLogger 原子步驟

每個 Stage 的執行都會記錄:

{
  "step": "COMPLETE",
  "agent": "scout",
  "timestamp": "2026-04-06T10:00:00Z",
  "status": "SUCCESS",
  "detail": "found 9 vulnerabilities",
  "duration_ms": 1200
}

4.2 pipeline_meta 完整資訊

最終輸出的 pipeline_meta 包含:

{
  "pipeline_version": "3.0",
  "tech_stack": "Django 4.2, Redis 7.0",
  "stages_completed": 4,
  "stages_detail": {
    "scout": {"status": "SUCCESS", "vuln_count": 9, "duration_ms": 1200},
    "analyst": {"status": "SUCCESS", "risk_score": 85, "duration_ms": 800},
    "critic": {"status": "SUCCESS", "verdict": "MAINTAIN", "score": 80.5, "duration_ms": 600},
    "advisor": {"status": "SUCCESS", "urgent_count": 2, "duration_ms": 500}
  },
  "enable_critic": false,
  "critic_verdict": "SKIPPED",
  "critic_score": 0,
  "duration_seconds": 3.1,
  "degradation": {"level": 1, "label": "⚡ 全速運行"},
  "generated_at": "2026-04-06T..."
}

五、資料契約

5.1 Scout → Analyst

{
  "scan_id": "scan_20260406_001",
  "timestamp": "ISO 8601",
  "tech_stack": ["django 4.2", "redis 7.0"],
  "vulnerabilities": [{"cve_id", "package", "cvss_score", "severity", "description", "is_new"}],
  "summary": {"total", "new_since_last_scan", "critical", "high", "medium", "low"}
}

5.2 Analyst → Advisor

{
  "scan_id": "scan_20260406_001",
  "risk_score": 85,
  "risk_trend": "+10",
  "analysis": [{"cve_id", "original_cvss", "adjusted_risk", "in_cisa_kev", "exploit_available", "chain_risk", "reasoning"}]
}

5.3 Advisor → UI

{
  "executive_summary": "...",
  "actions": {
    "urgent": [{"cve_id", "package", "severity", "action", "command", "reason", "is_repeated"}],
    "important": [{"cve_id", "package", "severity", "action", "reason"}],
    "resolved": []
  },
  "risk_score": 85,
  "risk_trend": "+10",
  "scan_count": 1,
  "generated_at": "ISO 8601"
}

六、記憶系統

6.1 雙層記憶架構

層級 儲存方式 用途 冷啟動
Layer 1 JSON 檔案 精確取得上次結果 回傳 {},不崩潰
Layer 2 LlamaIndex 向量索引 語義搜尋歷史案例 0 文件時跳過

6.2 記憶檔案位置

memory/
├── scout_memory.json      # Scout 的掃描歷史
├── analyst_memory.json    # Analyst 的分析歷史
├── advisor_memory.json    # Advisor 的建議歷史
└── vector_store/          # LlamaIndex 向量索引

6.3 寫入時機

每個 Agent 在給出 Final Answer 之前,必須先呼叫 write_memory。 如果 Agent 忘記呼叫,Harness Layer 1 會強制代為執行。


本文件與 docs/quickstart.md 搭配使用。

7. 向量資料庫使用邊界

本專案目前不把向量資料庫作為漏洞判斷的主要來源。比賽展示與修復建議必須能回溯到本次掃描的 JSON、Checkpoint、工具輸出與程式碼行號;這些資料是 deterministic evidence,可直接驗證與重跑。

LlamaIndex/vector_store 只保留為歷史案例語義搜尋與輔助回憶用途,不可覆蓋本次掃描結果,也不可在缺少掃描證據時自行補出 CWE/CVE 結論。原因如下:

  • 可審計性:JSON 與 checkpoint 能指出哪個 Agent、哪個工具、哪一行 code 產生結論。
  • 比賽可重現性:同一份輸入重跑時,主結論應來自目前 pipeline,而不是相似歷史案例。
  • 避免污染:向量檢索可能召回舊專案或相似但不相同的漏洞,若直接作為主證據會造成誤報。
  • 成本控制:目前資料量小,JSON + checkpoint 已足以支撐 UI、Thinking Path 與修復建議。

未來若要啟用向量資料庫作為 RAG,需要新增 evidence provenance 欄位,並在 UI 標記「retrieved historical context」,不得標記成 verified finding。