# Source: Threat_Hunter/agents/intel_fusion.py
# Uploaded by: EricChen2005
# Commit c8d30bc: "Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B"
# agents/intel_fusion.py
# 功能:Intel Fusion Agent — 六維情報融合師
# 架構依據:MacNet DAG 並行節點 + 六維複合評分公式
# Harness 支柱:Constraints(憲法注入)+ Observability(維度追蹤)+ Graceful Degradation
#
# 使用方式:
# from agents.intel_fusion import build_intel_fusion_agent, run_intel_fusion
#
# 六維情報來源(來自 skills/intel_fusion.md):
# NVD(CVSS)=0.20, EPSS=0.30, KEV=0.25, GHSA=0.10, ATT&CK=0.10, OTX=0.05
#
# 自主決策(Agent 根據漏洞特徵動態調整):
# cve_year < 2020 → EPSS 降至 0.10
# in_kev == True → EPSS 降至 0(KEV 已是最高事實)
# otx_fail_rate > 0.5 → OTX 降至 0.01
import json
import logging
import re
import time
from typing import Any, Callable
from crewai import Agent, Task
from config import SKILLS_DIR, SYSTEM_CONSTITUTION, degradation_status, get_llm
from tools.kev_tool import check_cisa_kev
from tools.memory_tool import read_memory, write_memory
from tools.nvd_tool import search_nvd
from tools.otx_tool import search_otx
logger = logging.getLogger("ThreatHunter.intel_fusion")
# ══════════════════════════════════════════════════════════════
# 六維預設權重(skills/intel_fusion.md Step 2)
# ══════════════════════════════════════════════════════════════
# Default per-dimension weights for the composite score; the six values sum to 1.0.
DEFAULT_WEIGHTS = {
    "cvss": 0.20,  # NVD CVSS — theoretical severity
    "epss": 0.30,  # FIRST.org EPSS — real-world exploitation probability (most important)
    "kev": 0.25,  # CISA KEV — confirmed in-the-wild exploitation (binary)
    "ghsa": 0.10,  # GitHub Advisory — ecosystem-specific
    "attck": 0.10,  # MITRE ATT&CK — attack tactic type
    "otx": 0.05,  # AlienVault OTX — IoC intelligence (lower reliability)
}
# On-disk location of the Intel Fusion SOP document.
SKILL_PATH = SKILLS_DIR / "intel_fusion.md"
# Minimum composite score once KEV is confirmed (quality red line: a KEV hit must not be underrated)
KEV_MIN_COMPOSITE_SCORE = 8.0
# Confidence thresholds (number of dimensions that carry data)
CONFIDENCE_HIGH_DIMS = 4  # >= 4 dimensions with data → HIGH
CONFIDENCE_MEDIUM_DIMS = 2  # >= 2 dimensions with data → MEDIUM
# ══════════════════════════════════════════════════════════════
# 動態加權計算引擎(確定性程式碼)
# ══════════════════════════════════════════════════════════════
def calculate_composite_score(
    cvss: float,
    epss: float,
    in_kev: bool,
    ghsa_hits: int,
    attack_techniques: int,
    otx_count: int,
    cve_year: int,
    otx_fail_rate: float = 0.0,
) -> tuple[float, dict, str]:
    """
    Compute the six-dimension, dynamically weighted composite risk score.

    This is deterministic code and does not depend on the LLM, so the result
    stays correct even when the model's own arithmetic is wrong.

    Dynamic weight rules (applied to a copy of DEFAULT_WEIGHTS):
        in_kev is True      -> epss weight becomes 0.0, surplus moves to kev
        cve_year < 2020     -> epss weight becomes 0.10, surplus moves to cvss
                               (only evaluated when in_kev is False)
        otx_fail_rate > 0.5 -> otx weight becomes 0.01, surplus moves to cvss

    Args:
        cvss: CVSS score (0.0-10.0).
        epss: EPSS score (0.0-1.0).
        in_kev: Whether the CVE is in the CISA KEV catalog.
        ghsa_hits: Number of GHSA advisory hits (5 or more saturates).
        attack_techniques: Number of matched ATT&CK techniques (3 or more saturates).
        otx_count: Number of OTX threat-intel hits (10 or more saturates).
        cve_year: CVE publication year (e.g. 2024).
        otx_fail_rate: OTX API failure rate.

    Returns:
        (composite_score, weights_used, confidence) where composite_score is on
        a 0-10 scale and confidence is "HIGH", "MEDIUM" or "NEEDS_VERIFICATION".
    """

    def _unit(value: float, scale: float = 1.0) -> float:
        # Clamp value/scale into [0, 1]. Out-of-range inputs (negative counts,
        # negative CVSS, NaN) must never produce a negative or NaN contribution.
        ratio = float(value) / scale
        if ratio != ratio:  # NaN is the only value that differs from itself
            return 0.0
        return min(max(ratio, 0.0), 1.0)

    # -- Step 1: adjust weights dynamically --------------------------------
    weights = dict(DEFAULT_WEIGHTS)
    if in_kev:
        # KEV confirmed: EPSS's probability forecast adds nothing further.
        surplus = weights["epss"]
        weights["epss"] = 0.0
        weights["kev"] += surplus
        logger.info("[INTEL] Weight adjusted: in_kev=True → epss=0.0, kev+=%.2f", surplus)
    elif cve_year < 2020:
        # Old CVE: EPSS data is sparse, so shift weight to CVSS.
        surplus = weights["epss"] - 0.10
        weights["epss"] = 0.10
        weights["cvss"] += surplus
        logger.info("[INTEL] Weight adjusted: cve_year=%d < 2020 → epss=0.10, cvss+=%.2f", cve_year, surplus)
    if otx_fail_rate > 0.5:
        # OTX is unstable: reduce its influence.
        surplus = weights["otx"] - 0.01
        weights["otx"] = 0.01
        weights["cvss"] += surplus
        logger.info("[INTEL] Weight adjusted: otx_fail_rate=%.2f → otx=0.01", otx_fail_rate)

    # Keep the weights summing to 1.0 (floating-point drift correction).
    total = sum(weights.values())
    if abs(total - 1.0) > 0.001:
        weights["cvss"] += 1.0 - total

    # -- Step 2: normalise every dimension to 0.0-1.0 ----------------------
    cvss_norm = _unit(cvss, 10.0)
    epss_norm = _unit(epss)
    kev_norm = 1.0 if in_kev else 0.0
    ghsa_norm = _unit(ghsa_hits, 5.0)
    attck_norm = _unit(attack_techniques, 3.0)
    otx_norm = _unit(otx_count, 10.0)

    # -- Step 3: weighted sum, rescaled to 0-10 ----------------------------
    composite_raw = (
        cvss_norm * weights["cvss"]
        + epss_norm * weights["epss"]
        + kev_norm * weights["kev"]
        + ghsa_norm * weights["ghsa"]
        + attck_norm * weights["attck"]
        + otx_norm * weights["otx"]
    )
    composite_score = round(composite_raw * 10.0, 4)

    # -- Step 4: quality red line (a KEV hit must not be underrated) -------
    if in_kev and composite_score < KEV_MIN_COMPOSITE_SCORE:
        logger.warning(
            "[INTEL] KEV hit but composite_score=%.2f < %.2f, applying floor",
            composite_score, KEV_MIN_COMPOSITE_SCORE,
        )
        composite_score = KEV_MIN_COMPOSITE_SCORE

    # -- Step 5: confidence = number of dimensions that carry data ---------
    dims_with_data = sum([
        bool(cvss > 0),
        bool(epss > 0),
        True,  # KEV counts as queried even when in_kev is False
        bool(ghsa_hits > 0),
        bool(attack_techniques > 0),
        bool(otx_count > 0),
    ])
    if dims_with_data >= CONFIDENCE_HIGH_DIMS:
        confidence = "HIGH"
    elif dims_with_data >= CONFIDENCE_MEDIUM_DIMS:
        confidence = "MEDIUM"
    else:
        confidence = "NEEDS_VERIFICATION"
    return composite_score, weights, confidence
# ══════════════════════════════════════════════════════════════
# Skill SOP 載入
# ══════════════════════════════════════════════════════════════
# Phase 4D: optional SkillLoader hot-loading system.
# When the import fails, _skill_loader is None and _SKILL_LOADER_AVAILABLE is
# False, so _load_skill() goes straight to reading the skill file from disk.
try:
    from skills.skill_loader import skill_loader as _skill_loader
    _SKILL_LOADER_AVAILABLE = True
    logger.info("[IntelFusion] Phase 4D: SkillLoader 啟用 ✓")
except ImportError:
    _skill_loader = None
    _SKILL_LOADER_AVAILABLE = False
def _load_skill() -> str:
    """
    Load the Intel Fusion SOP text.

    Resolution order:
        1. SkillLoader hot-loading, when the loader was imported successfully.
        2. Direct read of SKILL_PATH from disk.
        3. The built-in _FALLBACK_SKILL.

    Returns:
        The SOP text; never raises for a missing or unreadable skill file.
    """
    if _SKILL_LOADER_AVAILABLE and _skill_loader is not None:
        try:
            return _skill_loader.load_skill("intel_fusion.md")
        except Exception as e:
            logger.warning("[IntelFusion] SkillLoader 失敗,回退磁碟讀取: %s", e)

    # Fallback: read straight from disk.
    # "utf-8-sig" must come first: it decodes plain UTF-8 as well, and it is the
    # only codec here that strips a leading BOM. Plain "utf-8" would decode a BOM
    # file successfully and leave U+FEFF at the start of the prompt text.
    try:
        skill_file_present = SKILL_PATH.exists()
    except OSError:
        skill_file_present = False
    if skill_file_present:
        for encoding in ("utf-8-sig", "latin-1"):
            try:
                content = SKILL_PATH.read_text(encoding=encoding).strip()
            except (OSError, UnicodeDecodeError):
                continue
            if content:
                logger.info("[OK] Intel Fusion Skill loaded: %d chars", len(content))
                return content
            break  # the file decoded but is empty; another codec will not help
    logger.warning("[WARN] Intel Fusion Skill file not found, using fallback")
    return _FALLBACK_SKILL
# Minimal built-in SOP returned by _load_skill() when neither the SkillLoader
# nor the on-disk skill file yields any content.
_FALLBACK_SKILL = """
# Intel Fusion Agent - Six-Dimension Intelligence Fusion SOP
## Core Work
1. Read API health state with read_memory(intel_fusion).
2. Decide which intelligence dimensions to query for each CVE.
3. Call available tools: search_nvd / check_cisa_kev / search_otx.
4. Use EPSS and GHSA tools when they are available.
5. When KEV is positive, output shortcut_kev: true to notify the Orchestrator.
6. Output six-dimension scoring results as pure JSON.
""".strip()
# ══════════════════════════════════════════════════════════════
# Agent 工廠
# ══════════════════════════════════════════════════════════════
def build_intel_fusion_agent(excluded_models: list[str] | None = None) -> Agent:
    """
    Build the Intel Fusion Agent (six-dimension intelligence fusion specialist).

    Core tools (always attached): search_nvd, check_cisa_kev, search_otx,
    read_memory, write_memory.
    Optional tools (attached only when their module imports cleanly):
    fetch_epss_score, query_ghsa, search_osv.

    Args:
        excluded_models: Model names to exclude (passed in on a 429 retry).

    Returns:
        A CrewAI Agent instance.
    """
    skill_content = _load_skill()

    # Optional tools degrade gracefully: a failed import only removes that tool.
    optional_tools: list = []
    epss_loaded = False
    ghsa_loaded = False
    try:
        from tools.epss_tool import fetch_epss_score
        optional_tools.append(fetch_epss_score)
        epss_loaded = True
        logger.info("[OK] EPSS Tool loaded for Intel Fusion")
    except Exception as e:
        logger.warning("[WARN] EPSS Tool not available: %s", e)
    try:
        from tools.ghsa_tool import query_ghsa
        optional_tools.append(query_ghsa)
        ghsa_loaded = True
        logger.info("[OK] GHSA Tool loaded for Intel Fusion")
    except Exception as e:
        logger.warning("[WARN] GHSA Tool not available: %s", e)
    # Phase 7.5: search_osv is ecosystem-aware and does not return CVE-1999-era noise.
    try:
        from tools.osv_tool import search_osv as _search_osv_tool
        optional_tools.append(_search_osv_tool)
    except Exception as _osv_ex:
        logger.warning("[WARN] OSV Tool not available for Intel Fusion: %s", _osv_ex)

    core_tools = [search_nvd, check_cisa_kev, search_otx, read_memory, write_memory]
    all_tools = core_tools + optional_tools

    # The hint lines are gated on whether the import above succeeded, not on a
    # hard-coded registered tool name: a name mismatch would silently hide the
    # hint, and a tool object without `.name` would raise AttributeError.
    epss_hint = (
        "- fetch_epss_score: query EPSS exploit probability when the item is not in KEV.\n"
        if epss_loaded else ""
    )
    ghsa_hint = (
        "- query_ghsa: query GitHub Advisory Database for Python/npm package advisories.\n"
        if ghsa_loaded else ""
    )

    backstory = f"""You are ThreatHunter's Intelligence Fusion Agent.
Your task is to decide which intelligence dimensions to query, fuse six-dimensional evidence, and output composite risk scores.
=== System Constitution ===
{SYSTEM_CONSTITUTION}
=== Six-Dimension Fusion SOP ===
{skill_content}
=== Available Tools ===
- search_osv: query OSV.dev advisories first; it is ecosystem-aware and avoids ancient unrelated CVEs.
- search_nvd: fallback only when search_osv returns count=0.
- check_cisa_kev: query the CISA KEV catalog; batch CVEs with comma-separated input.
- search_otx: query OTX threat intelligence when CVSS >= 7.0.
{epss_hint}
{ghsa_hint}
- read_memory / write_memory: read and write API health state.
=== Autonomous Decision Rules (must follow) ===
- If in_kev == true, skip EPSS because KEV is already the strongest exploitation signal; output shortcut_kev: true.
- If cve_year < 2020, EPSS data may be sparse; EPSS can be skipped.
- If OTX fails repeatedly, record it in api_health and lower OTX priority.
- Query at least two dimensions; otherwise set confidence = NEEDS_VERIFICATION.
=== Output Format (pure JSON only) ===
{{
"fusion_results": [
{{
"cve_id": "CVE-2024-XXXX",
"composite_score": 8.7,
"dimension_scores": {{
"cvss": 9.8, "epss": 0.97, "kev": true, "ghsa_severity": "CRITICAL",
"attck_technique": "T1190", "otx_threat": "active"
}},
"weights_used": {{"cvss": 0.20, "epss": 0.30, "kev": 0.25, "ghsa": 0.10, "attck": 0.10, "otx": 0.05}},
"confidence": "HIGH",
"dimensions_used": ["nvd", "epss", "kev"],
"shortcut_kev": false
}}
],
"strategy_applied": "standard_2024",
"api_health_summary": {{"nvd": "ok", "epss": "ok", "kev": "ok"}}
}}
"""
    llm = get_llm(exclude_models=excluded_models or [])
    agent = Agent(
        role="Intelligence Fusion Specialist",
        goal=(
            "Autonomously choose six-dimension intelligence queries, fuse NVD/EPSS/KEV/GHSA/ATT&CK/OTX evidence, "
            "output composite risk scores with dimension contributions, and trigger the Small-World shortcut on KEV hits."
        ),
        backstory=backstory,
        tools=all_tools,
        llm=llm,
        verbose=True,  # Harness: observability
        max_iter=5,  # five tool-call iterations suffice for the NVD/KEV lookups
        allow_delegation=False,  # Intel Fusion finishes its own work; no delegation
    )
    logger.info(
        "[OK] Intel Fusion Agent created | tools=%s | max_iter=%d",
        [t.name for t in agent.tools], agent.max_iter,
    )
    return agent
# ══════════════════════════════════════════════════════════════
# Pipeline 執行器
# ══════════════════════════════════════════════════════════════
def _emit_progress(on_progress: Callable | None, status: str, payload: dict) -> None:
    """Invoke the optional progress callback; a failing callback never aborts the pipeline."""
    if not on_progress:
        return
    try:
        on_progress("intel_fusion", status, payload)
    except Exception as exc:
        logger.debug("[INTEL] progress callback ignored: %s", exc)


def _scan_json_objects(text: str) -> list[tuple[dict, int]]:
    """Return every top-level JSON object embedded in text as (object, source_length) pairs."""
    decoder = json.JSONDecoder()
    found: list[tuple[dict, int]] = []
    pos = text.find("{")
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            # Not a complete object starting here; try the next opening brace.
            pos = text.find("{", pos + 1)
            continue
        if isinstance(obj, dict):
            found.append((obj, end - pos))
        # Resume after this object so its nested objects are not reported again.
        pos = text.find("{", end)
    return found


def _parse_fusion_output(result_str: str) -> dict:
    """
    Extract the fusion JSON object from the raw LLM output.

    Raises:
        ValueError: when no JSON object can be recovered from the output.
    """
    # Oversized output (> 50k chars): look at the tail first.
    MAX_RESULT_LEN = 50_000
    oversized = len(result_str) > MAX_RESULT_LEN
    if oversized:
        logger.warning(
            "[INTEL] LLM output too long (%d chars), truncating and extracting JSON",
            len(result_str)
        )
        text = result_str[-10_000:]
    else:
        text = result_str

    # Strip a Markdown code fence when present.
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    elif "```" in text:
        parts = text.split("```")
        if len(parts) >= 3:
            text = parts[1].strip()
    # Nothing JSON-like in the tail: go back to the complete output.
    if oversized and "{" not in text:
        text = result_str

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Layer 2: scan for complete embedded objects. A brace-to-first-brace regex
    # cannot be used here because nested JSON would be cut into fragments, and a
    # small inner dict could then be mistaken for the whole result.
    candidates = _scan_json_objects(text)
    if not candidates and oversized and text is not result_str:
        candidates = _scan_json_objects(result_str)
    if candidates:
        with_results = [item for item in candidates if "fusion_results" in item[0]]
        # Prefer the last object that carries fusion_results; otherwise fall back
        # to the last object found.
        chosen, chosen_len = (with_results or candidates)[-1]
        logger.info("[INTEL] JSON extracted from long output (candidate len=%d)", chosen_len)
        return chosen

    # Layer 3: unparseable; the caller degrades gracefully.
    raise ValueError(
        f"LLM output is not JSON (len={len(result_str)}): {result_str[:120]}"
    )


def _build_intel_task_description(
    cve_list_for_task: list[str],
    cwe_list_for_task: list[str],
    package_list_for_task: list[str],
    input_type: str,
    input_str: str,
) -> str:
    """Build the task prompt matching the input mode (CVE list, CWE targets, packages, free text)."""
    if cve_list_for_task:
        cve_lines = "\n".join(f" - {cve}" for cve in cve_list_for_task[:30])
        cwe_context = ", ".join(cwe_list_for_task[:12]) or "none"
        package_context = ", ".join(package_list_for_task[:12]) or "none"
        return (
            "Scout and Security Guard already completed discovery. "
            "Intel Fusion must only rank and enrich the discovered vulnerabilities.\n\n"
            f"Discovered CVE/GHSA IDs:\n{cve_lines}\n\n"
            f"Security Guard CWE context: {cwe_context}\n"
            f"Package context: {package_context}\n\n"
            "For each CVE/GHSA ID:\n"
            "1. Call search_nvd(cve_id) for CVSS and description when the ID is a CVE.\n"
            "2. Call check_cisa_kev(cve_id) for KEV status.\n"
            "3. Call fetch_epss_score(cve_id) if the EPSS tool is available and the item is not in KEV.\n"
            "4. Call search_otx(cve_id) for active threat signals when relevant.\n"
            "5. Optionally call query_ghsa for GHSA/package advisory context.\n"
            "6. Output pure JSON fusion_results for the discovered IDs only.\n\n"
            "Hard constraints:\n"
            "- Do not invent new CVE IDs.\n"
            "- Do not add IDs that were not provided above unless a tool result explicitly aliases them.\n"
            "- Use CWE context only as supporting code weakness context, not as fake CVE evidence.\n"
            "- Output valid JSON only."
        )
    if package_list_for_task:
        # The list holds either package names or CWE targets passed on by Security Guard.
        is_cwe_mode = all(str(p).upper().startswith("CWE-") for p in package_list_for_task)
        if is_cwe_mode:
            # CWE mode: look up real CVEs per CWE as supporting evidence.
            cwe_lines = "\n".join(f" - {cwe}" for cwe in package_list_for_task)
            return (
                f"Security Guard detected the following code weakness categories (CWE IDs) in source code:\n\n"
                f"CWE categories to investigate:\n{cwe_lines}\n\n"
                f"Your task: for each CWE, query NVD for the most relevant real CVEs from recent years "
                f"(2018-2024) and retrieve CVSS scores as supporting evidence:\n"
                f"1. First call read_memory(intel_fusion) to get API health state.\n"
                f"2. For each CWE, call search_nvd(keyword=cwe_id) to query related CVEs.\n"
                f" Example: search_nvd('CWE-89') for SQL Injection related CVEs.\n"
                f" Example: search_nvd('CWE-502') for Insecure Deserialization related CVEs.\n"
                f"3. Select the most representative CVEs, prioritizing highest CVSS and recent year.\n"
                f"4. For selected CVEs, call check_cisa_kev to verify KEV status.\n"
                f"5. Call write_memory to store API health state.\n"
                f"6. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
                f"Important notes:\n"
                f"- The input is CWE IDs (weakness categories), not package names.\n"
                f"- The purpose is to find real CVE/CVSS evidence for Security Guard detections.\n"
                f"- Report at least one related CVE per CWE when NVD has data.\n"
                f"Absolute prohibitions:\n"
                f"- Do not fabricate CVE IDs or EPSS scores.\n"
                f"- Do not skip tool calls.\n"
                f"- Output pure JSON only."
            )
        # Package mode: search_osv first, search_nvd as fallback.
        pkg_lines = "\n".join(f" - {pkg}" for pkg in package_list_for_task)
        return (
            f"Analyze security intelligence for the following third-party packages extracted from source code:\n\n"
            f"Package list to investigate:\n{pkg_lines}\n\n"
            f"Input type: {input_type} (package-name list)\n\n"
            f"You must query every package one by one. Do not skip any package:\n"
            f"1. First call read_memory(intel_fusion) to get API health state.\n"
            f"2. For each package, call search_osv first for ecosystem-aware CVEs without CVE-1999 noise.\n"
            f" If search_osv returns count=0, use search_nvd as fallback.\n"
            f"3. Batch call check_cisa_kev to query KEV status.\n"
            f"4. If NOT in_kev, call search_otx.\n"
            f"5. Call write_memory to store API health state.\n"
            f"6. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
            f"Important notes:\n"
            f"- The items above are package names such as requests or flask, not source code.\n"
            f"- Call search_osv for every package, for example search_osv('requests').\n"
            f"- Each package may have zero or more CVEs; report the tool results honestly.\n"
            f"Absolute prohibitions:\n"
            f"- Do not fabricate CVE IDs or EPSS scores.\n"
            f"- Do not skip tool calls.\n"
            f"- Output pure JSON only."
        )
    return (
        f"Analyze intelligence for the following technology stack or CVE list:\n{input_str[:2000]}\n\n"
        f"Input type: {input_type}\n\n"
        f"You need to:\n"
        f"1. First call read_memory(intel_fusion) to get API health state.\n"
        f"2. For each package, call search_osv for ecosystem-aware CVEs; use search_nvd only as fallback when empty.\n"
        f"3. Batch call check_cisa_kev to query KEV status.\n"
        f"4. If NOT in_kev, call search_epss or search_otx.\n"
        f"5. For Python packages, call search_ghsa.\n"
        f"6. Call write_memory to store API health state.\n"
        f"7. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
        f"Absolute prohibitions:\n"
        f"- Do not fabricate CVE IDs or EPSS scores.\n"
        f"- Do not skip tool calls.\n"
        f"- Output pure JSON only."
    )


def run_intel_fusion(
    tech_stack_or_cves: str | list | dict,
    on_progress: Callable | None = None,
    orchestration_ctx: Any = None,
) -> dict:
    """
    Run the complete Intel Fusion pipeline.

    Layered safeguards:
        Layer 1 (agent):  the LLM chooses the dimensions to query and calls the tools.
        Layer 2 (code):   _verify_and_recalculate() recomputes scores deterministically.
        Layer 3 (schema): KEV hits are reported to the orchestration context.

    Args:
        tech_stack_or_cves: One of
            - dict with optional "cve_ids" / "cwe_ids" / "packages" lists
              (post-discovery mode),
            - list of package names (package-list mode),
            - any other value, stringified as a tech stack / CVE list.
        on_progress: Optional callback invoked as
            on_progress("intel_fusion", status, payload); its failures are ignored.
        orchestration_ctx: Optional object whose record_kev_hit(cve_id) is
            called for every KEV hit.

    Returns:
        A dict containing at least "fusion_results" and "_duration_ms". When the
        agent fails, a degraded result (see _build_degraded_result) is returned
        instead of raising.
    """
    t0 = time.time()
    logger.info("[INTEL] Starting Intel Fusion Pipeline...")
    _emit_progress(on_progress, "RUNNING", {"step": "initializing"})

    # -- Prepare the input (input-type aware) ------------------------------
    cve_list_for_task: list[str] = []
    cwe_list_for_task: list[str] = []
    package_list_for_task: list[str] = []
    if isinstance(tech_stack_or_cves, dict):
        # Any dict, including an empty one, is a discovery context; an empty
        # context must short-circuit instead of spending an LLM call on "{}".
        discovery_context = tech_stack_or_cves
        cve_list_for_task = [
            str(item).strip() for item in (discovery_context.get("cve_ids") or [])
            if str(item).strip().startswith(("CVE-", "GHSA-"))
        ]
        cwe_list_for_task = [
            str(item).strip().upper() for item in (discovery_context.get("cwe_ids") or [])
            if str(item).strip().upper().startswith("CWE-")
        ]
        package_list_for_task = [
            str(item).strip() for item in (discovery_context.get("packages") or [])
            if str(item).strip()
        ]
        input_type = "post_discovery"
        input_str = json.dumps({
            "cve_ids": cve_list_for_task,
            "cwe_ids": cwe_list_for_task,
            "packages": package_list_for_task,
        }, ensure_ascii=False)
        logger.info(
            "[INTEL] Input: post_discovery mode | cves=%d cwes=%d packages=%d",
            len(cve_list_for_task), len(cwe_list_for_task), len(package_list_for_task),
        )
        if not cve_list_for_task and not cwe_list_for_task and not package_list_for_task:
            logger.warning("[INTEL] Empty discovery context received")
            _emit_progress(on_progress, "COMPLETE", {
                "status": "NO_FINDINGS",
                "cves_scored": 0,
                "message": "No Scout CVEs or Security Guard CWE targets",
                "duration_ms": 0,
            })
            return {
                "fusion_results": [],
                "strategy_applied": "no_findings",
                "api_health_summary": {},
                "_no_findings": True,
                "_message": "No Scout CVEs or Security Guard CWE targets",
                "_duration_ms": 0,
            }
        if cwe_list_for_task and not cve_list_for_task and not package_list_for_task:
            # Only CWE targets: route them through the CWE query mode.
            package_list_for_task = cwe_list_for_task
    elif isinstance(tech_stack_or_cves, list):
        if not tech_stack_or_cves:
            # Harness Layer 0: an empty package list is a structural degradation,
            # not an LLM failure, so no LLM call is made.
            logger.warning(
                "[INTEL] Empty package list received — no 3rd-party packages identified. "
                "Returning structured empty result (not a LLM failure)."
            )
            _emit_progress(on_progress, "COMPLETE", {
                "status": "NO_PACKAGES",
                "cves_scored": 0,
                "message": "No third-party packages identified in source code",
                "duration_ms": 0,
            })
            return {
                "fusion_results": [],
                "strategy_applied": "no_packages",
                "api_health_summary": {},
                "_no_packages": True,
                "_message": "No third-party packages identified — only stdlib imports detected",
                "_duration_ms": 0,
            }
        # Stringify defensively: a non-string item must not crash join() below.
        package_list_for_task = [str(item) for item in tech_stack_or_cves]
        input_str = ", ".join(package_list_for_task)
        input_type = "package_list"
        logger.info(
            "[INTEL] Input: package_list mode with %d packages: %s",
            len(package_list_for_task), package_list_for_task,
        )
    else:
        input_str = str(tech_stack_or_cves)
        input_type = "tech_stack"
        package_list_for_task = []
        if len(input_str) > 500:
            logger.warning(
                "[INTEL] WARNING: input_str length=%d (may be raw source code). "
                "Expected package names. Use PackageExtractor in main.py.",
                len(input_str)
            )

    # -- Run the agent (with 429 retry) ------------------------------------
    MAX_RETRIES = 2
    # Output shorter than this with no "{" is treated as a plain-text reply.
    MIN_JSON_LEN = 500
    excluded_models: list[str] = []
    result: dict = {}
    agent = None  # bound up front so the 429 handler never hits an unbound name
    _cp = None  # checkpoint recorder; stays None when the module is unavailable
    for attempt in range(MAX_RETRIES + 1):
        try:
            from crewai import Crew, Process
            agent = build_intel_fusion_agent(excluded_models=excluded_models)
            task_desc = _build_intel_task_description(
                cve_list_for_task, cwe_list_for_task, package_list_for_task,
                input_type, input_str,
            )
            task = Task(
                description=task_desc,
                expected_output=(
                    "Pure JSON six-dimension intelligence fusion result, "
                    "including the fusion_results array and api_health_summary."
                ),
                agent=agent,
            )
            crew = Crew(
                agents=[agent],
                tasks=[task],
                process=Process.sequential,
                verbose=True,
            )
            try:
                from checkpoint import recorder as _cp
                from config import get_current_model_name as _gcmn
                _if_model = _gcmn(agent.llm)
                _cp.llm_call("intel_fusion", _if_model, "openrouter", f"attempt={attempt+1}")
            except Exception:
                _if_model = "unknown"
            _t_if = time.time()
            crew_result = crew.kickoff()
            result_str = str(crew_result).strip()
            try:
                _cp.llm_result("intel_fusion", _if_model, "SUCCESS",
                               len(result_str), int((time.time() - _t_if) * 1000),
                               thinking=result_str[:1000])
            except Exception as cp_exc:
                logger.debug("[INTEL] checkpoint llm_result skipped: %s", cp_exc)

            # -- Parse the JSON output --
            # Very short output without any "{" is a plain-text reply, not JSON.
            if len(result_str) < MIN_JSON_LEN and "{" not in result_str:
                logger.warning(
                    "[INTEL] LLM output too short for JSON (%d chars, no '{'), "
                    "likely CrewAI forceRun plain-text reply. Returning empty fusion.",
                    len(result_str)
                )
                result = {
                    "fusion_results": [],
                    "packages_queried": [],
                    "_degraded": True,
                    "_reason": f"Intel Fusion plain-text response ({len(result_str)} chars): {result_str[:100]}",
                }
                break
            result = _parse_fusion_output(result_str)
            break  # success
        except Exception as e:
            error_str = str(e)
            if "429" in error_str and attempt < MAX_RETRIES:
                try:
                    from config import get_current_model_name, mark_model_failed
                    current_model = get_current_model_name(agent.llm)
                    mark_model_failed(current_model)
                    excluded_models.append(current_model)
                    _m = re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, re.IGNORECASE)
                    retry_after = float(_m.group(1)) if _m else 0.0
                    logger.warning("[INTEL] 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                                   current_model, attempt + 1, MAX_RETRIES, retry_after)
                    try:
                        from checkpoint import recorder as _cp2
                        _cp2.llm_retry("intel_fusion", current_model, error_str[:200],
                                       attempt + 1, "next_in_waterfall")
                    except Exception as cp_exc:
                        logger.debug("[INTEL] checkpoint llm_retry skipped: %s", cp_exc)
                    from config import rate_limiter as _rl
                    _rl.on_429(retry_after=retry_after, caller="intel_fusion")
                    continue
                except Exception as retry_exc:
                    # Retry bookkeeping failed; fall through to graceful degradation.
                    logger.debug("[INTEL] 429 retry handling failed: %s", retry_exc)
            # Not a 429, or retries exhausted: graceful degradation.
            logger.error("[INTEL] Agent failed: %s", e)
            degradation_status.degrade("Intel Fusion Agent", str(e))
            result = _build_degraded_result(input_str, str(e))
            break

    # Downstream code iterates fusion_results; normalise a missing or non-list value.
    if not isinstance(result.get("fusion_results"), list):
        result["fusion_results"] = []

    # -- Harness Layer 2: recompute composite scores in code ---------------
    result = _verify_and_recalculate(result)
    result = _apply_evidence_type_contract(
        result,
        direct_cve_ids=set(cve_list_for_task),
        cwe_targets=set(cwe_list_for_task),
        package_targets=set(package_list_for_task),
    )
    fusions = [f for f in result.get("fusion_results", []) if isinstance(f, dict)]
    try:
        _recalc_count = sum(1 for f in fusions if f.get("_score_recalculated"))
        _cp.harness_check("intel_fusion", "L2", "score_recalculation",
                          "CORRECTED" if _recalc_count > 0 else "PASS",
                          details={"recalculated_count": _recalc_count,
                                   "total_fusions": len(result.get("fusion_results", []))})
    except Exception as cp_exc:
        logger.debug("[INTEL] checkpoint harness_check skipped: %s", cp_exc)

    # -- Harness Layer 3: KEV shortcut notification ------------------------
    if orchestration_ctx is not None:
        for fusion in fusions:
            dims = fusion.get("dimension_scores")
            kev_in_dims = dims.get("kev") if isinstance(dims, dict) else False
            if not (fusion.get("shortcut_kev") or kev_in_dims):
                continue
            cve_id = fusion.get("cve_id", "")
            if not cve_id:
                continue
            try:
                orchestration_ctx.record_kev_hit(cve_id)
                logger.warning("[INTEL] KEV shortcut registered for %s", cve_id)
            except Exception as ctx_exc:
                logger.debug("[INTEL] record_kev_hit failed for %s: %s", cve_id, ctx_exc)

    duration_ms = int((time.time() - t0) * 1000)
    result["_duration_ms"] = duration_ms
    is_degraded = result.get("_degraded", False)
    _emit_progress(on_progress, "COMPLETE", {
        "status": "DEGRADED" if is_degraded else "SUCCESS",
        "fusion_count": len(result.get("fusion_results", [])),
        "kev_hits": sum(1 for f in fusions if f.get("shortcut_kev")),
        "duration_ms": duration_ms,
        # When degraded, carry the error message along in the payload.
        "_degraded": is_degraded,
        "_error": result.get("_error", "") if is_degraded else "",
    })
    logger.info(
        "[INTEL] Pipeline complete in %dms | fusions=%d",
        duration_ms, len(result.get("fusion_results", [])),
    )
    return result
def _apply_evidence_type_contract(
result: dict,
direct_cve_ids: set[str] | None = None,
cwe_targets: set[str] | None = None,
package_targets: set[str] | None = None,
) -> dict:
"""
Intel Fusion evidence boundary.
Direct CVE/GHSA IDs from Scout can enrich package actions. CVEs discovered only
while supporting Security Guard CWE findings are representative evidence and
must not enter package action lists.
"""
direct_ids = {str(item).strip() for item in (direct_cve_ids or set()) if str(item).strip()}
cwes = {str(item).strip().upper() for item in (cwe_targets or set()) if str(item).strip().upper().startswith("CWE-")}
packages = {str(item).strip() for item in (package_targets or set()) if str(item).strip()}
if not result.get("fusion_results"):
result.setdefault("evidence_contract", {
"direct_cve_count": len(direct_ids),
"cwe_support_count": len(cwes),
"package_target_count": len(packages),
"representative_cve_count": 0,
})
return result
representative_count = 0
direct_count = 0
for fusion in result.get("fusion_results", []):
cve_id = str(fusion.get("cve_id", "")).strip()
current_type = str(fusion.get("evidence_type", "")).strip()
if current_type == "representative_cve" or fusion.get("must_not_enter_package_actions"):
evidence_type = "representative_cve"
elif cve_id in direct_ids:
evidence_type = "direct_cve"
elif cwes and cve_id not in direct_ids:
evidence_type = "representative_cve"
else:
evidence_type = "package_cve"
fusion["evidence_type"] = evidence_type
if evidence_type == "representative_cve":
representative_count += 1
fusion["not_directly_observed"] = True
fusion["must_not_enter_package_actions"] = True
fusion.setdefault("finding_source", "cwe_support")
if cwes and not fusion.get("supports_cwe"):
fusion["supports_cwe"] = sorted(cwes)
fusion.setdefault(
"evidence_note",
"Representative CVE for a Security Guard CWE finding; not a directly observed package CVE.",
)
else:
direct_count += 1
fusion["not_directly_observed"] = False
fusion["must_not_enter_package_actions"] = False
fusion.setdefault("finding_source", "package_scan")
if packages and not fusion.get("source_package_evidence"):
fusion["source_package_evidence"] = sorted(packages)
result["evidence_contract"] = {
"direct_cve_count": direct_count,
"cwe_support_count": len(cwes),
"package_target_count": len(packages),
"representative_cve_count": representative_count,
}
return result
def _verify_and_recalculate(result: dict) -> dict:
    """
    Harness Layer 2: recompute composite scores with deterministic code.

    Guards against LLM arithmetic errors and invented numbers. For every
    fusion entry the score is recomputed with calculate_composite_score();
    when the LLM score is missing, not numeric, or differs by more than 1.5,
    the code-computed score, weights and confidence replace it and the entry
    is flagged with "_score_recalculated". Confidence is always taken from
    the code. Afterwards (Layer 2.5) CVEs with a year before 2005 are dropped.

    Args:
        result: Fusion result dict; mutated in place and returned.

    Returns:
        The same dict with verified "fusion_results" and, when anything was
        filtered, an "ancient_cves_filtered" list.
    """
    fusion_results = result.get("fusion_results", [])
    if not fusion_results:
        return result
    if not isinstance(fusion_results, list):
        logger.warning(
            "[INTEL][VERIFY] fusion_results is %s, expected list; resetting to empty",
            type(fusion_results).__name__,
        )
        result["fusion_results"] = []
        return result

    def _to_float(value, default):
        # LLM JSON may carry null, text or NaN where a number is expected.
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        return default if number != number else number

    def _to_flag(value) -> bool:
        # bool("false") is True, so textual negatives must be handled explicitly.
        if isinstance(value, str):
            return value.strip().lower() not in {"", "false", "no", "0", "none", "null"}
        return bool(value)

    ghsa_hit_table = {"CRITICAL": 3, "HIGH": 2, "MODERATE": 1, "LOW": 1}
    recalculated = []
    for fusion in fusion_results:
        if not isinstance(fusion, dict):
            # A non-dict entry has no fields to verify and would crash every
            # consumer that calls .get() on it.
            logger.warning(
                "[INTEL][VERIFY] Dropping malformed fusion entry of type %s",
                type(fusion).__name__,
            )
            continue
        try:
            dims = fusion.get("dimension_scores")
            if not isinstance(dims, dict):
                dims = {}
            cvss = _to_float(dims.get("cvss"), 0.0)
            epss = _to_float(dims.get("epss"), 0.0)
            in_kev = _to_flag(dims.get("kev", False))
            ghsa_sev = str(dims.get("ghsa_severity", "UNKNOWN")).strip().upper()
            ghsa_hits = ghsa_hit_table.get(ghsa_sev, 0)
            attck_tech = 1 if dims.get("attck_technique") else 0
            otx_threat = 1 if dims.get("otx_threat") == "active" else 0

            # The year is read from the ID; anything unparseable counts as 2024.
            cve_id = str(fusion.get("cve_id") or "CVE-2024-0000")
            try:
                cve_year = int(cve_id.split("-")[1])
            except (IndexError, ValueError):
                cve_year = 2024

            recalculated_score, weights, confidence = calculate_composite_score(
                cvss=cvss,
                epss=epss,
                in_kev=in_kev,
                ghsa_hits=ghsa_hits,
                attack_techniques=attck_tech,
                otx_count=otx_threat,
                cve_year=cve_year,
            )

            original_score = _to_float(fusion.get("composite_score"), None)
            if original_score is None:
                # Missing or non-numeric LLM score: the code value is the only
                # trustworthy one.
                logger.warning(
                    "[INTEL][VERIFY] No usable LLM score for %s, Code=%.2f → using Code",
                    cve_id, recalculated_score,
                )
                use_code_score = True
            elif abs(original_score - recalculated_score) > 1.5:
                logger.warning(
                    "[INTEL][VERIFY] Score discrepancy for %s: LLM=%.2f, Code=%.2f → using Code",
                    cve_id, original_score, recalculated_score,
                )
                use_code_score = True
            else:
                use_code_score = False

            # Confidence always comes from the code, whether or not the score is replaced.
            fusion["confidence"] = confidence
            if use_code_score:
                fusion["composite_score"] = recalculated_score
                fusion["weights_used"] = weights
                fusion["_score_recalculated"] = True
            recalculated.append(fusion)
        except Exception as e:
            logger.warning("[INTEL][VERIFY] Failed to recalculate for %s: %s", fusion.get("cve_id"), e)
            recalculated.append(fusion)  # keep the original values
    result["fusion_results"] = recalculated

    # Harness Layer 2.5: CVE year filter (last line of defence).
    # Any CVE older than CVE_YEAR_MIN that reached Intel Fusion is removed here.
    CVE_YEAR_MIN = 2005
    fresh_fusions = []
    ancient_removed = []
    for fusion in result["fusion_results"]:
        # cve_id may be null or non-string in LLM output; normalise before testing.
        cve_id = str(fusion.get("cve_id") or "")
        if not cve_id.upper().startswith("CVE-"):
            # GHSA IDs and anything else without a CVE year are kept as-is.
            fresh_fusions.append(fusion)
            continue
        try:
            yr = int(cve_id.split("-")[1])
        except (IndexError, ValueError):
            fresh_fusions.append(fusion)
            continue
        if yr < CVE_YEAR_MIN:
            ancient_removed.append(cve_id)
            logger.warning(
                "[INTEL HARNESS 2.5] Ancient CVE filtered (year=%d < %d): %s",
                yr, CVE_YEAR_MIN, cve_id
            )
        else:
            fresh_fusions.append(fusion)
    if ancient_removed:
        result["fusion_results"] = fresh_fusions
        result["ancient_cves_filtered"] = ancient_removed
        logger.warning(
            "[INTEL] Removed %d ancient CVEs from fusion_results: %s",
            len(ancient_removed), ancient_removed
        )
    return result
def _build_degraded_result(input_str: str, error: str) -> dict:
"""
Graceful Degradation:Agent 失敗時的最小生存輸出。
讓 Scout 知道 Intel Fusion 已降級,但不中斷管線。
"""
return {
"fusion_results": [],
"strategy_applied": "degraded",
"api_health_summary": {"nvd": "unknown", "epss": "unknown", "kev": "unknown"},
"_degraded": True,
"_error": error[:200],
"_input": input_str[:100],
}