Threat_Hunter / docs /pipeline_guide.md
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc
# ThreatHunter Pipeline 執行手冊
> **版本**:v1.0
> **日期**:2026-04-06
> **適用對象**:開發者、維運人員
---
## 一、Pipeline 架構
### 1.1 整體流程
```
使用者輸入
┌─────────────────────────────────────────────────────┐
│ main.py: run_pipeline(tech_stack) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Scout │─▶│ Analyst │─▶│ Critic │──┐ │
│ │(事實收集) │ │(推理判斷) │ │(可插拔) │ │ │
│ └──────────┘ └──────────┘ └──────────┘ │ │
│ ┌──────────────▼──┐ │
│ │ Advisor │ │
│ │ (最終裁決報告) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────┘
最終 JSON 報告 + pipeline_meta
```
### 1.2 設計哲學
**為什麼不用純 CrewAI Sequential?**
根據 CrewAI 官方文檔:*"For any production-ready application, start with a Flow."*
本專案採用 **Pipeline + Crew 內部化** 架構:
- 每個 Stage 內部使用 CrewAI Crew(Agent 自主推理)
- Stage 之間由 Python 程式碼串接(精確控制、錯誤隔離)
- 保留 17 層 Harness 保障層(程式碼層驗證)
詳細分析請參考 `.opencode/plans/integration_plan.md`
---
## 二、各 Stage 詳細說明
### Stage 1:Scout Agent(事實收集)
**職責**:從公開漏洞資料庫收集指定技術堆疊的已知漏洞。
| 項目 | 說明 |
|---|---|
| 輸入 | `tech_stack`(字串,如 "Django 4.2, Redis 7.0") |
| 輸出 | Scout JSON(scan_id, vulnerabilities[], summary) |
| Tools | search_nvd, search_otx, read_memory, write_memory, history_search |
| Skill | `skills/threat_intel.md` |
| Harness | 5 層(強制 write_memory、Schema 驗證、CVE 真實性、package 補全、is_new 校正) |
**執行流程**
1. 讀取歷史記憶(read_memory)
2. 對每個套件查詢 NVD(search_nvd)
3. 對高危 CVE 查詢 OTX 威脅情報(search_otx)
4. 比對歷史標記 is_new
5. 寫入記憶(write_memory)
6. 程式碼層 CVE 真實性驗證(反查 NVD API)
7. 校正 is_new 標記
8. 輸出 JSON
**Graceful Degradation**
- NVD API 失敗 → 使用離線快取(data/nvd_cache/)
- 全部失敗 → 回傳空 vulnerabilities 清單,管線繼續
---
### Stage 2:Analyst Agent(推理判斷)
**職責**:驗證 KEV 和 Exploit 狀態,分析漏洞連鎖攻擊路徑。
| 項目 | 說明 |
|---|---|
| 輸入 | Scout 輸出(dict) |
| 輸出 | Analyst JSON(scan_id, risk_score, risk_trend, analysis[]) |
| Tools | check_cisa_kev, search_exploits, read_memory, write_memory, history_search |
| Skill | `skills/chain_analysis.md` |
| Harness | 4 層(強制 write_memory、Schema 驗證、chain_risk 邏輯、風險禁止降級) |
**執行流程**(3-Task 拆分架構):
1. **Collector**:讀取歷史 + 解析 Scout JSON
2. **Verifier**:KEV 驗證 + Exploit 搜尋 + Chain 分析
3. **Scorer**:風險計算 + 寫入記憶 + 輸出 JSON
**風險計算公式**
```
risk_score = min(100, Σ(cvss_score × weight))
weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
```
**風險禁止降級規則**
- adjusted_risk 只能升級,不能降級
- 如果 Agent 試圖降級,Harness Layer 4 強制修正
**Graceful Degradation**
- LLM 輸出無法解析 → 使用 Fallback 輸出
- chain_risk 欄位缺失 → 自動補充預設值
---
### Stage 3:Critic Agent(質疑挑戰,可插拔)
**職責**:對抗式辯論,挑戰 Analyst 的主觀判斷。
| 項目 | 說明 |
|---|---|
| 輸入 | Analyst 輸出(dict) |
| 輸出 | Critic JSON(debate_rounds, challenges[], scorecard, verdict) |
| 開關 | `ENABLE_CRITIC` 環境變數(預設 false) |
| Tools | check_cisa_kev, search_exploits, read_memory |
| Skill | `skills/debate_sop.md` |
| Harness | 3 層(SKIPPED/fallback、Schema+scorecard 修復、verdict 枚舉+型別安全) |
**辯論流程**(參考李宏毅論文 LLM Discussion Framework):
1. 挑戰 Analyst 的連鎖推理前提
2. 使用 Tool 驗證前提條件
3. 計算 5 維評分卡(證據 30%、路徑完整性 25%、反駁品質 20%、回應品質 15%、信心校準 10%)
4. 裁決:MAINTAIN(≥50 分)或 DOWNGRADE(<50 分)
**裁決規則**
- weighted_score ≥ 50 → MAINTAIN
- weighted_score < 50 → DOWNGRADE
- 有 CVE 在 CISA KEV 中 → 禁止 DOWNGRADE
**為什麼可插拔?**
- 辯論是「增強項」,不是「核心依賴」
- Demo 時可展示完整辯論,平時關閉節省 Token
---
### Stage 4:Advisor Agent(最終裁決)
**職責**:產出可執行的資安行動報告。
| 項目 | 說明 |
|---|---|
| 輸入 | Analyst 輸出 + Critic 裁決(dict) |
| 輸出 | Advisor JSON(executive_summary, actions{}, risk_score, risk_trend) |
| Tools | read_memory, write_memory, history_search |
| Skill | `skills/action_report.md` |
| Harness | 5 層(強制 write_memory、Schema 驗證、SOP 邏輯、risk_score 範圍、command 確保、歷史比對) |
**分級規則**
- **URGENT**:CVSS ≥ 9.0 或在 CISA KEV 中或有公開 PoC
- **IMPORTANT**:CVSS ≥ 7.0 或有攻擊鏈風險
- **RESOLVED**:已修補的歷史漏洞
**每個 URGENT 項目必須附帶**
- `command`:具體修補指令(如 `pip install --upgrade django`
- `reason`:為何標記為 URGENT
- `is_repeated`:是否為重複未修補項目
**歷史比對(Harness Layer 5)**
- 讀取 advisor_memory.json
- 如果 CVE 曾在歷史中出現且未 resolved → is_repeated=True
- 語氣遞升:"[REPEATED — STILL NOT PATCHED] ..."
---
## 三、錯誤處理機制
### 3.1 每個 Stage 的獨立 try-except
```python
try:
result = run_*_pipeline(input)
return result, sl # 成功
except Exception as e:
degradation_status.degrade("StageName", str(e))
return fallback_output, sl # 降級
```
### 3.2 降級狀態追蹤
```python
degradation_status.to_dict()
# {
# "level": 1, # 1=全速, 2=LLM降級, 3=API降級, 4=Agent降級, 5=最低生存
# "label": "⚡ 全速運行",
# "degraded_components": ["Scout: NVD API timeout"],
# "timestamp": "2026-04-06T..."
# }
```
### 3.3 五層降級瀑布
| 層級 | 狀態 | 觸發條件 |
|---|---|---|
| 1 | ⚡ 全速運行 | 所有元件正常 |
| 2 | ⚠️ LLM 降級 | vLLM → OpenRouter → OpenAI |
| 3 | ⚠️ API 降級 | NVD/OTX → 離線快取 |
| 4 | 🔶 Agent 降級 | Analyst/Critic 跳過 |
| 5 | 🔶 最低生存模式 | 使用上次掃描結果 |
---
## 四、Observability 日誌
### 4.1 StepLogger 原子步驟
每個 Stage 的執行都會記錄:
```json
{
"step": "COMPLETE",
"agent": "scout",
"timestamp": "2026-04-06T10:00:00Z",
"status": "SUCCESS",
"detail": "found 9 vulnerabilities",
"duration_ms": 1200
}
```
### 4.2 pipeline_meta 完整資訊
最終輸出的 `pipeline_meta` 包含:
```json
{
"pipeline_version": "3.0",
"tech_stack": "Django 4.2, Redis 7.0",
"stages_completed": 4,
"stages_detail": {
"scout": {"status": "SUCCESS", "vuln_count": 9, "duration_ms": 1200},
"analyst": {"status": "SUCCESS", "risk_score": 85, "duration_ms": 800},
"critic": {"status": "SUCCESS", "verdict": "MAINTAIN", "score": 80.5, "duration_ms": 600},
"advisor": {"status": "SUCCESS", "urgent_count": 2, "duration_ms": 500}
},
"enable_critic": false,
"critic_verdict": "SKIPPED",
"critic_score": 0,
"duration_seconds": 3.1,
"degradation": {"level": 1, "label": "⚡ 全速運行"},
"generated_at": "2026-04-06T..."
}
```
---
## 五、資料契約
### 5.1 Scout → Analyst
```json
{
"scan_id": "scan_20260406_001",
"timestamp": "ISO 8601",
"tech_stack": ["django 4.2", "redis 7.0"],
"vulnerabilities": [{"cve_id", "package", "cvss_score", "severity", "description", "is_new"}],
"summary": {"total", "new_since_last_scan", "critical", "high", "medium", "low"}
}
```
### 5.2 Analyst → Advisor
```json
{
"scan_id": "scan_20260406_001",
"risk_score": 85,
"risk_trend": "+10",
"analysis": [{"cve_id", "original_cvss", "adjusted_risk", "in_cisa_kev", "exploit_available", "chain_risk", "reasoning"}]
}
```
### 5.3 Advisor → UI
```json
{
"executive_summary": "...",
"actions": {
"urgent": [{"cve_id", "package", "severity", "action", "command", "reason", "is_repeated"}],
"important": [{"cve_id", "package", "severity", "action", "reason"}],
"resolved": []
},
"risk_score": 85,
"risk_trend": "+10",
"scan_count": 1,
"generated_at": "ISO 8601"
}
```
---
## 六、記憶系統
### 6.1 雙層記憶架構
| 層級 | 儲存方式 | 用途 | 冷啟動 |
|---|---|---|---|
| Layer 1 | JSON 檔案 | 精確取得上次結果 | 回傳 {},不崩潰 |
| Layer 2 | LlamaIndex 向量索引 | 語義搜尋歷史案例 | 0 文件時跳過 |
### 6.2 記憶檔案位置
```
memory/
├── scout_memory.json # Scout 的掃描歷史
├── analyst_memory.json # Analyst 的分析歷史
├── advisor_memory.json # Advisor 的建議歷史
└── vector_store/ # LlamaIndex 向量索引
```
### 6.3 寫入時機
每個 Agent 在給出 Final Answer 之前,必須先呼叫 `write_memory`
如果 Agent 忘記呼叫,Harness Layer 1 會強制代為執行。
---
*本文件與 `docs/quickstart.md` 搭配使用。*
## 7. 向量資料庫使用邊界
本專案目前不把向量資料庫作為漏洞判斷的主要來源。比賽展示與修復建議必須能回溯到本次掃描的 JSON、Checkpoint、工具輸出與程式碼行號;這些資料是 deterministic evidence,可直接驗證與重跑。
LlamaIndex/vector_store 只保留為歷史案例語義搜尋與輔助回憶用途,不可覆蓋本次掃描結果,也不可在缺少掃描證據時自行補出 CWE/CVE 結論。原因如下:
- 可審計性:JSON 與 checkpoint 能指出哪個 Agent、哪個工具、哪一行 code 產生結論。
- 比賽可重現性:同一份輸入重跑時,主結論應來自目前 pipeline,而不是相似歷史案例。
- 避免污染:向量檢索可能召回舊專案或相似但不相同的漏洞,若直接作為主證據會造成誤報。
- 成本控制:目前資料量小,JSON + checkpoint 已足以支撐 UI、Thinking Path 與修復建議。
未來若要啟用向量資料庫作為 RAG,需要新增 evidence provenance 欄位,並在 UI 標記「retrieved historical context」,不得標記成 verified finding。
---