File size: 11,178 Bytes
c8d30bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 | # ThreatHunter Pipeline 執行手冊
> **版本**:v1.0
> **日期**:2026-04-06
> **適用對象**:開發者、維運人員
---
## 一、Pipeline 架構
### 1.1 整體流程
```
使用者輸入
│
▼
┌─────────────────────────────────────────────────────┐
│ main.py: run_pipeline(tech_stack) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Scout │─▶│ Analyst │─▶│ Critic │──┐ │
│ │(事實收集) │ │(推理判斷) │ │(可插拔) │ │ │
│ └──────────┘ └──────────┘ └──────────┘ │ │
│ ┌──────────────▼──┐ │
│ │ Advisor │ │
│ │ (最終裁決報告) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────┘
│
▼
最終 JSON 報告 + pipeline_meta
```
### 1.2 設計哲學
**為什麼不用純 CrewAI Sequential?**
根據 CrewAI 官方文檔:*"For any production-ready application, start with a Flow."*
本專案採用 **Pipeline + Crew 內部化** 架構:
- 每個 Stage 內部使用 CrewAI Crew(Agent 自主推理)
- Stage 之間由 Python 程式碼串接(精確控制、錯誤隔離)
- 保留 17 層 Harness 保障層(程式碼層驗證)
詳細分析請參考 `.opencode/plans/integration_plan.md`。
---
## 二、各 Stage 詳細說明
### Stage 1:Scout Agent(事實收集)
**職責**:從公開漏洞資料庫收集指定技術堆疊的已知漏洞。
| 項目 | 說明 |
|---|---|
| 輸入 | `tech_stack`(字串,如 "Django 4.2, Redis 7.0") |
| 輸出 | Scout JSON(scan_id, vulnerabilities[], summary) |
| Tools | search_nvd, search_otx, read_memory, write_memory, history_search |
| Skill | `skills/threat_intel.md` |
| Harness | 5 層(強制 write_memory、Schema 驗證、CVE 真實性、package 補全、is_new 校正) |
**執行流程**:
1. 讀取歷史記憶(read_memory)
2. 對每個套件查詢 NVD(search_nvd)
3. 對高危 CVE 查詢 OTX 威脅情報(search_otx)
4. 比對歷史標記 is_new
5. 寫入記憶(write_memory)
6. 程式碼層 CVE 真實性驗證(反查 NVD API)
7. 校正 is_new 標記
8. 輸出 JSON
**Graceful Degradation**:
- NVD API 失敗 → 使用離線快取(data/nvd_cache/)
- 全部失敗 → 回傳空 vulnerabilities 清單,管線繼續
---
### Stage 2:Analyst Agent(推理判斷)
**職責**:驗證 KEV 和 Exploit 狀態,分析漏洞連鎖攻擊路徑。
| 項目 | 說明 |
|---|---|
| 輸入 | Scout 輸出(dict) |
| 輸出 | Analyst JSON(scan_id, risk_score, risk_trend, analysis[]) |
| Tools | check_cisa_kev, search_exploits, read_memory, write_memory, history_search |
| Skill | `skills/chain_analysis.md` |
| Harness | 4 層(強制 write_memory、Schema 驗證、chain_risk 邏輯、風險禁止降級) |
**執行流程**(3-Task 拆分架構):
1. **Collector**:讀取歷史 + 解析 Scout JSON
2. **Verifier**:KEV 驗證 + Exploit 搜尋 + Chain 分析
3. **Scorer**:風險計算 + 寫入記憶 + 輸出 JSON
**風險計算公式**:
```
risk_score = min(100, Σ(cvss_score × weight))
weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
```
**風險禁止降級規則**:
- adjusted_risk 只能升級,不能降級
- 如果 Agent 試圖降級,Harness Layer 4 強制修正
**Graceful Degradation**:
- LLM 輸出無法解析 → 使用 Fallback 輸出
- chain_risk 欄位缺失 → 自動補充預設值
---
### Stage 3:Critic Agent(質疑挑戰,可插拔)
**職責**:對抗式辯論,挑戰 Analyst 的主觀判斷。
| 項目 | 說明 |
|---|---|
| 輸入 | Analyst 輸出(dict) |
| 輸出 | Critic JSON(debate_rounds, challenges[], scorecard, verdict) |
| 開關 | `ENABLE_CRITIC` 環境變數(預設 false) |
| Tools | check_cisa_kev, search_exploits, read_memory |
| Skill | `skills/debate_sop.md` |
| Harness | 3 層(SKIPPED/fallback、Schema+scorecard 修復、verdict 枚舉+型別安全) |
**辯論流程**(參考李宏毅論文 LLM Discussion Framework):
1. 挑戰 Analyst 的連鎖推理前提
2. 使用 Tool 驗證前提條件
3. 計算 5 維評分卡(證據 30%、路徑完整性 25%、反駁品質 20%、回應品質 15%、信心校準 10%)
4. 裁決:MAINTAIN(≥50 分)或 DOWNGRADE(<50 分)
**裁決規則**:
- weighted_score ≥ 50 → MAINTAIN
- weighted_score < 50 → DOWNGRADE
- 有 CVE 在 CISA KEV 中 → 禁止 DOWNGRADE
**為什麼可插拔?**
- 辯論是「增強項」,不是「核心依賴」
- Demo 時可展示完整辯論,平時關閉節省 Token
---
### Stage 4:Advisor Agent(最終裁決)
**職責**:產出可執行的資安行動報告。
| 項目 | 說明 |
|---|---|
| 輸入 | Analyst 輸出 + Critic 裁決(dict) |
| 輸出 | Advisor JSON(executive_summary, actions{}, risk_score, risk_trend) |
| Tools | read_memory, write_memory, history_search |
| Skill | `skills/action_report.md` |
| Harness | 5 層(強制 write_memory、Schema 驗證、SOP 邏輯、risk_score 範圍、command 確保、歷史比對) |
**分級規則**:
- **URGENT**:CVSS ≥ 9.0 或在 CISA KEV 中或有公開 PoC
- **IMPORTANT**:CVSS ≥ 7.0 或有攻擊鏈風險
- **RESOLVED**:已修補的歷史漏洞
**每個 URGENT 項目必須附帶**:
- `command`:具體修補指令(如 `pip install --upgrade django`)
- `reason`:為何標記為 URGENT
- `is_repeated`:是否為重複未修補項目
**歷史比對(Harness Layer 5)**:
- 讀取 advisor_memory.json
- 如果 CVE 曾在歷史中出現且未 resolved → is_repeated=True
- 語氣遞升:"[REPEATED — STILL NOT PATCHED] ..."
---
## 三、錯誤處理機制
### 3.1 每個 Stage 的獨立 try-except
```python
try:
result = run_*_pipeline(input)
return result, sl # 成功
except Exception as e:
degradation_status.degrade("StageName", str(e))
return fallback_output, sl # 降級
```
### 3.2 降級狀態追蹤
```python
degradation_status.to_dict()
# {
# "level": 1, # 1=全速, 2=LLM降級, 3=API降級, 4=Agent降級, 5=最低生存
# "label": "⚡ 全速運行",
# "degraded_components": ["Scout: NVD API timeout"],
# "timestamp": "2026-04-06T..."
# }
```
### 3.3 五層降級瀑布
| 層級 | 狀態 | 觸發條件 |
|---|---|---|
| 1 | ⚡ 全速運行 | 所有元件正常 |
| 2 | ⚠️ LLM 降級 | vLLM → OpenRouter → OpenAI |
| 3 | ⚠️ API 降級 | NVD/OTX → 離線快取 |
| 4 | 🔶 Agent 降級 | Analyst/Critic 跳過 |
| 5 | 🔶 最低生存模式 | 使用上次掃描結果 |
---
## 四、Observability 日誌
### 4.1 StepLogger 原子步驟
每個 Stage 的執行都會記錄:
```json
{
"step": "COMPLETE",
"agent": "scout",
"timestamp": "2026-04-06T10:00:00Z",
"status": "SUCCESS",
"detail": "found 9 vulnerabilities",
"duration_ms": 1200
}
```
### 4.2 pipeline_meta 完整資訊
最終輸出的 `pipeline_meta` 包含:
```json
{
"pipeline_version": "3.0",
"tech_stack": "Django 4.2, Redis 7.0",
"stages_completed": 4,
"stages_detail": {
"scout": {"status": "SUCCESS", "vuln_count": 9, "duration_ms": 1200},
"analyst": {"status": "SUCCESS", "risk_score": 85, "duration_ms": 800},
"critic": {"status": "SUCCESS", "verdict": "MAINTAIN", "score": 80.5, "duration_ms": 600},
"advisor": {"status": "SUCCESS", "urgent_count": 2, "duration_ms": 500}
},
"enable_critic": false,
"critic_verdict": "SKIPPED",
"critic_score": 0,
"duration_seconds": 3.1,
"degradation": {"level": 1, "label": "⚡ 全速運行"},
"generated_at": "2026-04-06T..."
}
```
---
## 五、資料契約
### 5.1 Scout → Analyst
```json
{
"scan_id": "scan_20260406_001",
"timestamp": "ISO 8601",
"tech_stack": ["django 4.2", "redis 7.0"],
"vulnerabilities": [{"cve_id", "package", "cvss_score", "severity", "description", "is_new"}],
"summary": {"total", "new_since_last_scan", "critical", "high", "medium", "low"}
}
```
### 5.2 Analyst → Advisor
```json
{
"scan_id": "scan_20260406_001",
"risk_score": 85,
"risk_trend": "+10",
"analysis": [{"cve_id", "original_cvss", "adjusted_risk", "in_cisa_kev", "exploit_available", "chain_risk", "reasoning"}]
}
```
### 5.3 Advisor → UI
```json
{
"executive_summary": "...",
"actions": {
"urgent": [{"cve_id", "package", "severity", "action", "command", "reason", "is_repeated"}],
"important": [{"cve_id", "package", "severity", "action", "reason"}],
"resolved": []
},
"risk_score": 85,
"risk_trend": "+10",
"scan_count": 1,
"generated_at": "ISO 8601"
}
```
---
## 六、記憶系統
### 6.1 雙層記憶架構
| 層級 | 儲存方式 | 用途 | 冷啟動 |
|---|---|---|---|
| Layer 1 | JSON 檔案 | 精確取得上次結果 | 回傳 {},不崩潰 |
| Layer 2 | LlamaIndex 向量索引 | 語義搜尋歷史案例 | 0 文件時跳過 |
### 6.2 記憶檔案位置
```
memory/
├── scout_memory.json # Scout 的掃描歷史
├── analyst_memory.json # Analyst 的分析歷史
├── advisor_memory.json # Advisor 的建議歷史
└── vector_store/ # LlamaIndex 向量索引
```
### 6.3 寫入時機
每個 Agent 在給出 Final Answer 之前,必須先呼叫 `write_memory`。
如果 Agent 忘記呼叫,Harness Layer 1 會強制代為執行。
---
*本文件與 `docs/quickstart.md` 搭配使用。*
## 7. 向量資料庫使用邊界
本專案目前不把向量資料庫作為漏洞判斷的主要來源。比賽展示與修復建議必須能回溯到本次掃描的 JSON、Checkpoint、工具輸出與程式碼行號;這些資料是 deterministic evidence,可直接驗證與重跑。
LlamaIndex/vector_store 只保留為歷史案例語義搜尋與輔助回憶用途,不可覆蓋本次掃描結果,也不可在缺少掃描證據時自行補出 CWE/CVE 結論。原因如下:
- 可審計性:JSON 與 checkpoint 能指出哪個 Agent、哪個工具、哪一行 code 產生結論。
- 比賽可重現性:同一份輸入重跑時,主結論應來自目前 pipeline,而不是相似歷史案例。
- 避免污染:向量檢索可能召回舊專案或相似但不相同的漏洞,若直接作為主證據會造成誤報。
- 成本控制:目前資料量小,JSON + checkpoint 已足以支撐 UI、Thinking Path 與修復建議。
未來若要啟用向量資料庫作為 RAG,需要新增 evidence provenance 欄位,並在 UI 標記「retrieved historical context」,不得標記成 verified finding。
---
|