File size: 43,752 Bytes
c8d30bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 | # agents/intel_fusion.py
# 功能:Intel Fusion Agent — 六維情報融合師
# 架構依據:MacNet DAG 並行節點 + 六維複合評分公式
# Harness 支柱:Constraints(憲法注入)+ Observability(維度追蹤)+ Graceful Degradation
#
# 使用方式:
# from agents.intel_fusion import build_intel_fusion_agent, run_intel_fusion
#
# 六維情報來源(來自 skills/intel_fusion.md):
# NVD(CVSS)=0.20, EPSS=0.30, KEV=0.25, GHSA=0.10, ATT&CK=0.10, OTX=0.05
#
# 自主決策(Agent 根據漏洞特徵動態調整):
# cve_year < 2020 → EPSS 降至 0.10
# in_kev == True → EPSS 降至 0(KEV 已是最高事實)
# otx_fail_rate > 0.5 → OTX 降至 0.01
import json
import logging
import re
import time
from typing import Any, Callable
from crewai import Agent, Task
from config import SKILLS_DIR, SYSTEM_CONSTITUTION, degradation_status, get_llm
from tools.kev_tool import check_cisa_kev
from tools.memory_tool import read_memory, write_memory
from tools.nvd_tool import search_nvd
from tools.otx_tool import search_otx
logger = logging.getLogger("ThreatHunter.intel_fusion")
# ══════════════════════════════════════════════════════════════
# Default six-dimension weights (skills/intel_fusion.md Step 2).
# They sum to 1.0; calculate_composite_score() copies them and may shift
# weight between dimensions for a given CVE.
# ══════════════════════════════════════════════════════════════
DEFAULT_WEIGHTS = dict(
    cvss=0.20,   # NVD CVSS — theoretical severity
    epss=0.30,   # FIRST.org EPSS — real-world exploitation probability (heaviest)
    kev=0.25,    # CISA KEV — confirmed in-the-wild exploitation (binary)
    ghsa=0.10,   # GitHub Advisory — ecosystem-specific advisories
    attck=0.10,  # MITRE ATT&CK — attack technique type
    otx=0.05,    # AlienVault OTX — IoC intelligence (lower trust)
)
# On-disk location of the Intel Fusion SOP (read by _load_skill()'s disk fallback).
SKILL_PATH = SKILLS_DIR / "intel_fusion.md"

# Quality red line: a CISA KEV-confirmed CVE is never scored below this value.
KEV_MIN_COMPOSITE_SCORE = 8.0

# Confidence tiers by the number of dimensions that returned data:
#   >= CONFIDENCE_HIGH_DIMS   -> "HIGH"
#   >= CONFIDENCE_MEDIUM_DIMS -> "MEDIUM"
#   otherwise                 -> "NEEDS_VERIFICATION"
CONFIDENCE_HIGH_DIMS, CONFIDENCE_MEDIUM_DIMS = 4, 2
# ══════════════════════════════════════════════════════════════
# 動態加權計算引擎(確定性程式碼)
# ══════════════════════════════════════════════════════════════
def calculate_composite_score(
    cvss: float,
    epss: float,
    in_kev: bool,
    ghsa_hits: int,
    attack_techniques: int,
    otx_count: int,
    cve_year: int,
    otx_fail_rate: float = 0.0,
) -> tuple[float, dict, str]:
    """
    Compute the six-dimension, dynamically weighted composite score
    (skills/intel_fusion.md Step 4).

    This is deterministic code with no LLM involvement, so LLM reasoning
    errors cannot affect it.

    Dynamic weight rules (SOP Step 2):
      in_kev == True       -> epss weight 0; its share moves to kev
                              (KEV already confirms in-the-wild exploitation)
      cve_year < 2020      -> epss weight 0.10; the surplus moves to cvss
                              (only when not in KEV; old CVEs have sparse EPSS data)
      otx_fail_rate > 0.5  -> otx weight 0.01; the surplus moves to cvss

    Every dimension is normalized into [0.0, 1.0] before weighting. Values
    below zero (e.g. "-1 = unknown" sentinels) therefore contribute nothing
    instead of pulling the composite score down.

    Args:
        cvss: CVSS score (expected 0.0-10.0).
        epss: EPSS probability (expected 0.0-1.0).
        in_kev: Whether the CVE is in the CISA KEV catalog.
        ghsa_hits: Number of GHSA advisory hits (5+ saturates).
        attack_techniques: Number of matched ATT&CK techniques (3+ saturates).
        otx_count: Number of OTX threat-intel hits (10+ saturates).
        cve_year: CVE publication year (e.g. 2024).
        otx_fail_rate: Observed OTX API failure rate.

    Returns:
        (composite_score, weights_used, confidence). composite_score is on a
        0-10 scale rounded to 4 decimals and floored at
        KEV_MIN_COMPOSITE_SCORE for KEV hits. confidence is "HIGH", "MEDIUM"
        or "NEEDS_VERIFICATION".
    """
    # ── Step 1: adjust weights dynamically ──────────────────────────
    weights = dict(DEFAULT_WEIGHTS)
    if in_kev:
        # KEV confirmed -> EPSS's probabilistic prediction adds nothing.
        surplus = weights["epss"]
        weights["epss"] = 0.0
        weights["kev"] += surplus
        logger.info("[INTEL] Weight adjusted: in_kev=True → epss=0.0, kev+=%.2f", surplus)
    elif cve_year < 2020:
        # Old CVE -> EPSS data is sparse; shift weight to the more reliable CVSS.
        surplus = weights["epss"] - 0.10
        weights["epss"] = 0.10
        weights["cvss"] += surplus
        logger.info("[INTEL] Weight adjusted: cve_year=%d < 2020 → epss=0.10, cvss+=%.2f", cve_year, surplus)
    if otx_fail_rate > 0.5:
        # Unstable OTX -> keep it only as a token signal.
        surplus = weights["otx"] - 0.01
        weights["otx"] = 0.01
        weights["cvss"] += surplus
        logger.info("[INTEL] Weight adjusted: otx_fail_rate=%.2f → otx=0.01", otx_fail_rate)
    # Keep the weights summing to 1.0 (floating-point drift correction).
    total = sum(weights.values())
    if abs(total - 1.0) > 0.001:
        weights["cvss"] += 1.0 - total

    # ── Step 2: normalize every dimension into [0.0, 1.0] ─────────
    def _unit(value: float) -> float:
        return min(max(float(value), 0.0), 1.0)

    normalized = {
        "cvss": _unit(cvss / 10.0),              # CVSS 0-10 -> 0-1
        "epss": _unit(epss),                     # already 0-1
        "kev": 1.0 if in_kev else 0.0,           # binary
        "ghsa": _unit(ghsa_hits / 5.0),          # 5+ advisories -> full marks
        "attck": _unit(attack_techniques / 3.0), # 3+ techniques -> full marks
        "otx": _unit(otx_count / 10.0),          # 10+ IoCs -> full marks
    }

    # ── Step 3: weighted sum, scaled to 0-10 ───────────────────────
    # Summed in the same dimension order as DEFAULT_WEIGHTS.
    composite_raw = sum(normalized[dim] * weights[dim] for dim in normalized)
    composite_score = round(composite_raw * 10.0, 4)

    # ── Step 4: quality red line (a KEV hit must not be under-scored) ─
    if in_kev and composite_score < KEV_MIN_COMPOSITE_SCORE:
        logger.warning(
            "[INTEL] KEV hit but composite_score=%.2f < %.2f, applying floor",
            composite_score, KEV_MIN_COMPOSITE_SCORE,
        )
        composite_score = KEV_MIN_COMPOSITE_SCORE

    # ── Step 5: confidence from how many dimensions returned data ─
    dims_with_data = sum([
        bool(cvss > 0),
        bool(epss > 0),
        True,  # KEV was queried (counts even when in_kev is False)
        bool(ghsa_hits > 0),
        bool(attack_techniques > 0),
        bool(otx_count > 0),
    ])
    if dims_with_data >= CONFIDENCE_HIGH_DIMS:
        confidence = "HIGH"
    elif dims_with_data >= CONFIDENCE_MEDIUM_DIMS:
        confidence = "MEDIUM"
    else:
        confidence = "NEEDS_VERIFICATION"
    return composite_score, weights, confidence
# ══════════════════════════════════════════════════════════════
# Skill SOP 載入
# ══════════════════════════════════════════════════════════════
# Phase 4D: prefer the SkillLoader hot-reload system when it can be imported;
# otherwise _load_skill() falls back to reading the skill file from disk.
try:
    from skills.skill_loader import skill_loader as _skill_loader
except ImportError:
    _skill_loader = None
    _SKILL_LOADER_AVAILABLE = False
else:
    _SKILL_LOADER_AVAILABLE = True
    logger.info("[IntelFusion] Phase 4D: SkillLoader 啟用 ✓")
def _load_skill() -> str:
    """
    Load the Intel Fusion SOP text embedded in the agent backstory.

    Resolution order (Phase 4D hot-reload with graceful degradation):
      1. ``_skill_loader.load_skill("intel_fusion.md")`` when SkillLoader imported.
      2. ``SKILL_PATH`` read from disk (UTF-8 with optional BOM, then latin-1).
      3. The built-in ``_FALLBACK_SKILL``.

    Returns:
        The SOP text.
    """
    # Phase 4D: SkillLoader hot-reload path.
    if _SKILL_LOADER_AVAILABLE and _skill_loader is not None:
        try:
            return _skill_loader.load_skill("intel_fusion.md")
        except Exception as e:
            logger.warning("[IntelFusion] SkillLoader 失敗,回退磁碟讀取: %s", e)

    # Fallback: read the skill file directly from disk.
    try:
        skill_file_exists = SKILL_PATH.exists()
    except OSError:
        skill_file_exists = False
    if skill_file_exists:
        # "utf-8-sig" decodes plain UTF-8 identically *and* drops a leading BOM.
        # Plain "utf-8" would keep the BOM as U+FEFF, which str.strip() does not
        # remove. latin-1 accepts any byte sequence, so it is the last resort.
        for encoding in ("utf-8-sig", "latin-1"):
            try:
                content = SKILL_PATH.read_text(encoding=encoding).strip()
            except UnicodeDecodeError:
                continue
            except OSError as exc:
                logger.warning("[WARN] Intel Fusion Skill file unreadable: %s", exc)
                break
            if content:
                logger.info("[OK] Intel Fusion Skill loaded: %d chars", len(content))
                return content
            break  # an empty file stays empty under every encoding

    logger.warning("[WARN] Intel Fusion Skill file not found, using fallback")
    return _FALLBACK_SKILL
# Built-in minimal SOP returned by _load_skill() when neither SkillLoader nor
# the on-disk skill file yields any content.
_FALLBACK_SKILL = "\n".join((
    "# Intel Fusion Agent - Six-Dimension Intelligence Fusion SOP",
    "## Core Work",
    "1. Read API health state with read_memory(intel_fusion).",
    "2. Decide which intelligence dimensions to query for each CVE.",
    "3. Call available tools: search_nvd / check_cisa_kev / search_otx.",
    "4. Use EPSS and GHSA tools when they are available.",
    "5. When KEV is positive, output shortcut_kev: true to notify the Orchestrator.",
    "6. Output six-dimension scoring results as pure JSON.",
))
# ══════════════════════════════════════════════════════════════
# Agent 工廠
# ══════════════════════════════════════════════════════════════
def build_intel_fusion_agent(excluded_models: list[str] | None = None) -> Agent:
    """
    Build the Intel Fusion Agent (six-dimension intelligence fusion specialist).

    Tools wired in:
      - Always: search_nvd, check_cisa_kev, search_otx, read_memory, write_memory.
      - Optional, each skipped with a warning if its import fails:
        fetch_epss_score (tools.epss_tool), query_ghsa (tools.ghsa_tool),
        search_osv (tools.osv_tool).

    The backstory embeds SYSTEM_CONSTITUTION, the SOP returned by
    _load_skill(), a tool list, decision rules and the required JSON output
    shape.

    Args:
        excluded_models: Model names passed to get_llm(exclude_models=...),
            used on retry after an HTTP 429.

    Returns:
        A CrewAI Agent instance (max_iter=5, no delegation).
    """
    skill_content = _load_skill()
    # Optional EPSS / GHSA tools: an import failure only degrades the tool set.
    optional_tools: list = []
    try:
        from tools.epss_tool import fetch_epss_score
        optional_tools.append(fetch_epss_score)
        logger.info("[OK] EPSS Tool loaded for Intel Fusion")
    except Exception as e:
        logger.warning("[WARN] EPSS Tool not available: %s", e)
    try:
        from tools.ghsa_tool import query_ghsa
        optional_tools.append(query_ghsa)
        logger.info("[OK] GHSA Tool loaded for Intel Fusion")
    except Exception as e:
        logger.warning("[WARN] GHSA Tool not available: %s", e)
    # Phase 7.5: add search_osv (ecosystem-aware; avoids returning ancient
    # CVE-1999-era advisories).
    try:
        from tools.osv_tool import search_osv as _search_osv_tool
        optional_tools.append(_search_osv_tool)
    except Exception as _osv_ex:
        logger.warning("[WARN] OSV Tool not available for Intel Fusion: %s", _osv_ex)
    core_tools = [search_nvd, check_cisa_kev, search_otx, read_memory, write_memory]
    all_tools = core_tools + optional_tools
    # NOTE(review): the two conditional tool lines in the backstory check
    # t.name == 'search_epss' / 'search_ghsa', while the lines themselves
    # advertise fetch_epss_score / query_ghsa. Whether those lines appear
    # depends on the names the tool decorators register. Confirm against
    # tools/epss_tool.py and tools/ghsa_tool.py. Also note that search_osv is
    # listed unconditionally, even when its import above failed.
    # The backstory text is runtime prompt content; keep it byte-exact.
    backstory = f"""You are ThreatHunter's Intelligence Fusion Agent.
Your task is to decide which intelligence dimensions to query, fuse six-dimensional evidence, and output composite risk scores.
=== System Constitution ===
{SYSTEM_CONSTITUTION}
=== Six-Dimension Fusion SOP ===
{skill_content}
=== Available Tools ===
- search_osv: query OSV.dev advisories first; it is ecosystem-aware and avoids ancient unrelated CVEs.
- search_nvd: fallback only when search_osv returns count=0.
- check_cisa_kev: query the CISA KEV catalog; batch CVEs with comma-separated input.
- search_otx: query OTX threat intelligence when CVSS >= 7.0.
{('- fetch_epss_score: query EPSS exploit probability when the item is not in KEV.' + chr(10)) if any(t.name == 'search_epss' for t in optional_tools) else ''}
{('- query_ghsa: query GitHub Advisory Database for Python/npm package advisories.' + chr(10)) if any(t.name == 'search_ghsa' for t in optional_tools) else ''}
- read_memory / write_memory: read and write API health state.
=== Autonomous Decision Rules (must follow) ===
- If in_kev == true, skip EPSS because KEV is already the strongest exploitation signal; output shortcut_kev: true.
- If cve_year < 2020, EPSS data may be sparse; EPSS can be skipped.
- If OTX fails repeatedly, record it in api_health and lower OTX priority.
- Query at least two dimensions; otherwise set confidence = NEEDS_VERIFICATION.
=== Output Format (pure JSON only) ===
{{
"fusion_results": [
{{
"cve_id": "CVE-2024-XXXX",
"composite_score": 8.7,
"dimension_scores": {{
"cvss": 9.8, "epss": 0.97, "kev": true, "ghsa_severity": "CRITICAL",
"attck_technique": "T1190", "otx_threat": "active"
}},
"weights_used": {{"cvss": 0.20, "epss": 0.30, "kev": 0.25, "ghsa": 0.10, "attck": 0.10, "otx": 0.05}},
"confidence": "HIGH",
"dimensions_used": ["nvd", "epss", "kev"],
"shortcut_kev": false
}}
],
"strategy_applied": "standard_2024",
"api_health_summary": {{"nvd": "ok", "epss": "ok", "kev": "ok"}}
}}
"""
    llm = get_llm(exclude_models=excluded_models or [])
    agent = Agent(
        role="Intelligence Fusion Specialist",
        goal=(
            "Autonomously choose six-dimension intelligence queries, fuse NVD/EPSS/KEV/GHSA/ATT&CK/OTX evidence, "
            "output composite risk scores with dimension contributions, and trigger the Small-World shortcut on KEV hits."
        ),
        backstory=backstory,
        tools=all_tools,
        llm=llm,
        verbose=True,  # Harness: Observability
        max_iter=5,  # v3.5: Gemini-3-Flash ~4s/call; 5 NVD/KEV queries are enough
        allow_delegation=False,  # Intel Fusion finishes the work itself, no delegation
    )
    logger.info(
        "[OK] Intel Fusion Agent created | tools=%s | max_iter=%d",
        [t.name for t in agent.tools], agent.max_iter,
    )
    return agent
# ══════════════════════════════════════════════════════════════
# Pipeline 執行器
# ══════════════════════════════════════════════════════════════
def _notify_progress(on_progress: Callable | None, status: str, payload: dict) -> None:
    """Send an ``intel_fusion`` progress event; callback errors are logged, never raised."""
    if on_progress is None:
        return
    try:
        on_progress("intel_fusion", status, payload)
    except Exception as exc:
        logger.debug("[INTEL] progress callback ignored: %s", exc)


def _is_truthy_flag(value: Any) -> bool:
    """Interpret an LLM-emitted flag; strings such as "false"/"no"/"0" count as False."""
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "y", "1")
    return bool(value)


def _build_task_description(
    cve_list: list[str],
    cwe_list: list[str],
    package_list: list[str],
    input_type: str,
    input_str: str,
) -> str:
    """Build the task prompt for the input mode: CVE/GHSA IDs, CWE IDs, package names, or free text."""
    if cve_list:
        # v5.3: post-discovery mode — only rank/enrich IDs that were already found.
        cve_lines = "\n".join(f" - {cve}" for cve in cve_list[:30])
        cwe_context = ", ".join(cwe_list[:12]) or "none"
        package_context = ", ".join(package_list[:12]) or "none"
        return (
            "Scout and Security Guard already completed discovery. "
            "Intel Fusion must only rank and enrich the discovered vulnerabilities.\n\n"
            f"Discovered CVE/GHSA IDs:\n{cve_lines}\n\n"
            f"Security Guard CWE context: {cwe_context}\n"
            f"Package context: {package_context}\n\n"
            "For each CVE/GHSA ID:\n"
            "1. Call search_nvd(cve_id) for CVSS and description when the ID is a CVE.\n"
            "2. Call check_cisa_kev(cve_id) for KEV status.\n"
            "3. Call fetch_epss_score(cve_id) if the EPSS tool is available and the item is not in KEV.\n"
            "4. Call search_otx(cve_id) for active threat signals when relevant.\n"
            "5. Optionally call query_ghsa for GHSA/package advisory context.\n"
            "6. Output pure JSON fusion_results for the discovered IDs only.\n\n"
            "Hard constraints:\n"
            "- Do not invent new CVE IDs.\n"
            "- Do not add IDs that were not provided above unless a tool result explicitly aliases them.\n"
            "- Use CWE context only as supporting code weakness context, not as fake CVE evidence.\n"
            "- Output valid JSON only."
        )
    if package_list:
        if all(str(p).upper().startswith("CWE-") for p in package_list):
            # CWE mode (Security Guard targets): use search_nvd(cwe_id) to find
            # real CVEs as supporting evidence.
            cwe_lines = "\n".join(f" - {cwe}" for cwe in package_list)
            return (
                "Security Guard detected the following code weakness categories (CWE IDs) in source code:\n\n"
                f"CWE categories to investigate:\n{cwe_lines}\n\n"
                "Your task: for each CWE, query NVD for the most relevant real CVEs from recent years "
                "(2018-2024) and retrieve CVSS scores as supporting evidence:\n"
                "1. First call read_memory(intel_fusion) to get API health state.\n"
                "2. For each CWE, call search_nvd(keyword=cwe_id) to query related CVEs.\n"
                " Example: search_nvd('CWE-89') for SQL Injection related CVEs.\n"
                " Example: search_nvd('CWE-502') for Insecure Deserialization related CVEs.\n"
                "3. Select the most representative CVEs, prioritizing highest CVSS and recent year.\n"
                "4. For selected CVEs, call check_cisa_kev to verify KEV status.\n"
                "5. Call write_memory to store API health state.\n"
                "6. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
                "Important notes:\n"
                "- The input is CWE IDs (weakness categories), not package names.\n"
                "- The purpose is to find real CVE/CVSS evidence for Security Guard detections.\n"
                "- Report at least one related CVE per CWE when NVD has data.\n"
                "Absolute prohibitions:\n"
                "- Do not fabricate CVE IDs or EPSS scores.\n"
                "- Do not skip tool calls.\n"
                "- Output pure JSON only."
            )
        # Package mode: search_osv first, search_nvd as fallback.
        pkg_lines = "\n".join(f" - {pkg}" for pkg in package_list)
        return (
            "Analyze security intelligence for the following third-party packages extracted from source code:\n\n"
            f"Package list to investigate:\n{pkg_lines}\n\n"
            f"Input type: {input_type} (package-name list)\n\n"
            "You must query every package one by one. Do not skip any package:\n"
            "1. First call read_memory(intel_fusion) to get API health state.\n"
            "2. For each package, call search_osv first for ecosystem-aware CVEs without CVE-1999 noise.\n"
            " If search_osv returns count=0, use search_nvd as fallback.\n"
            "3. Batch call check_cisa_kev to query KEV status.\n"
            "4. If NOT in_kev, call search_otx.\n"
            "5. Call write_memory to store API health state.\n"
            "6. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
            "Important notes:\n"
            "- The items above are package names such as requests or flask, not source code.\n"
            "- Call search_osv for every package, for example search_osv('requests').\n"
            "- Each package may have zero or more CVEs; report the tool results honestly.\n"
            "Absolute prohibitions:\n"
            "- Do not fabricate CVE IDs or EPSS scores.\n"
            "- Do not skip tool calls.\n"
            "- Output pure JSON only."
        )
    # Free-text tech stack / CVE list.
    return (
        f"Analyze intelligence for the following technology stack or CVE list:\n{input_str[:2000]}\n\n"
        f"Input type: {input_type}\n\n"
        "You need to:\n"
        "1. First call read_memory(intel_fusion) to get API health state.\n"
        "2. For each package, call search_osv for ecosystem-aware CVEs; use search_nvd only as fallback when empty.\n"
        "3. Batch call check_cisa_kev to query KEV status.\n"
        "4. If NOT in_kev, call search_epss or search_otx.\n"
        "5. For Python packages, call search_ghsa.\n"
        "6. Call write_memory to store API health state.\n"
        "7. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
        "Absolute prohibitions:\n"
        "- Do not fabricate CVE IDs or EPSS scores.\n"
        "- Do not skip tool calls.\n"
        "- Output pure JSON only."
    )


def _find_json_object(text: str) -> tuple[dict, int] | None:
    """Return the best top-level JSON object embedded in *text* plus its span length.

    Scans left to right with ``JSONDecoder.raw_decode``, so nested braces are
    handled correctly. A regex like ``\\{...?\\}`` stops at the first ``}`` and
    tends to yield an inner dict such as api_health_summary. After an object
    decodes, its inner braces are skipped. The last object containing
    ``fusion_results`` wins; otherwise the last object found.
    """
    decoder = json.JSONDecoder()
    found: list[tuple[dict, int]] = []
    start = text.find("{")
    while start != -1:
        try:
            obj, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
            continue
        if isinstance(obj, dict):
            found.append((obj, end - start))
        start = text.find("{", end)
    if not found:
        return None
    for obj, span in reversed(found):
        if "fusion_results" in obj:
            return obj, span
    return found[-1]


def _extract_json_dict(result_str: str) -> dict:
    """Parse the agent's final answer into a dict; raise ValueError if no JSON object is recoverable."""
    # v5.1: very long outputs (> 50k chars) usually mean CrewAI forced a final
    # answer. The JSON is normally at the end, so parse the last 10k chars first.
    max_result_len = 50_000
    is_long = len(result_str) > max_result_len
    if is_long:
        logger.warning(
            "[INTEL] LLM output too long (%d chars), truncating and extracting JSON",
            len(result_str),
        )
        text = result_str[-10_000:]
    else:
        text = result_str
    # Strip a Markdown code fence if present.
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    elif "```" in text:
        parts = text.split("```")
        if len(parts) >= 3:
            text = parts[1].strip()
    # No JSON at all in the tail: fall back to the full output.
    if is_long and "{" not in text:
        text = result_str

    # Layer 1: the whole text is JSON. Only an object is usable downstream,
    # because every later stage calls result.get(...).
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed
    if isinstance(parsed, list) and parsed and all(isinstance(item, dict) for item in parsed):
        # A bare array of fusion entries: wrap it in the expected envelope.
        return {"fusion_results": parsed}

    # Layer 2: locate an embedded top-level JSON object.
    found = _find_json_object(text)
    if found is None and is_long:
        found = _find_json_object(result_str)
    if found is not None:
        obj, span = found
        logger.info("[INTEL] JSON extracted from long output (candidate len=%d)", span)
        return obj

    # Layer 3: unparseable; the caller degrades gracefully.
    raise ValueError(
        f"LLM output is not JSON (len={len(result_str)}): {result_str[:120]}"
    )


def _kickoff_with_checkpoint(crew: Any, agent: Any, attempt: int) -> str:
    """Run ``crew.kickoff()`` and return its stripped text, recording checkpoints when available."""
    recorder = None
    model_name = "unknown"
    try:
        from checkpoint import recorder
        from config import get_current_model_name

        model_name = get_current_model_name(agent.llm)
        recorder.llm_call("intel_fusion", model_name, "openrouter", f"attempt={attempt+1}")
    except Exception:
        model_name = "unknown"
    started = time.time()
    result_str = str(crew.kickoff()).strip()
    if recorder is not None:
        try:
            recorder.llm_result("intel_fusion", model_name, "SUCCESS",
                                len(result_str), int((time.time() - started) * 1000),
                                thinking=result_str[:1000])
        except Exception as exc:
            logger.debug("[INTEL] checkpoint llm_result failed: %s", exc)
    return result_str


def _handle_rate_limit(
    agent: Any,
    error_str: str,
    attempt: int,
    max_retries: int,
    excluded_models: list[str],
) -> bool:
    """Exclude the rate-limited model and back off; return True when the caller should retry."""
    try:
        from config import get_current_model_name, mark_model_failed

        current_model = get_current_model_name(agent.llm)
        mark_model_failed(current_model)
        excluded_models.append(current_model)
        # Lazy `.{1,10}?` keeps every digit in the capture. A greedy quantifier
        # would make "retry in 30s" capture only "0".
        match = re.search(r"retry.{1,10}?(\d+\.?\d*)s", error_str, re.IGNORECASE)
        retry_after = float(match.group(1)) if match else 0.0
        logger.warning("[INTEL] 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                       current_model, attempt + 1, max_retries, retry_after)
        try:
            from checkpoint import recorder as _cp
            _cp.llm_retry("intel_fusion", current_model, error_str[:200],
                          attempt + 1, "next_in_waterfall")
        except Exception as exc:
            logger.debug("[INTEL] checkpoint llm_retry failed: %s", exc)
        from config import rate_limiter as _rl
        _rl.on_429(retry_after=retry_after, caller="intel_fusion")  # at least 30s (per original note)
        return True
    except Exception as exc:
        logger.debug("[INTEL] 429 handling failed, degrading instead: %s", exc)
        return False


def run_intel_fusion(
    tech_stack_or_cves: str | list | dict,
    on_progress: Callable | None = None,
    orchestration_ctx: Any = None,
) -> dict:
    """
    Run the complete Intel Fusion pipeline.

    Harness Engineering layers:
      Layer 1 (Agent): the LLM chooses which dimensions to query and calls tools.
      Layer 2 (code): _verify_and_recalculate() re-scores with the deterministic
        calculate_composite_score(), so LLM arithmetic mistakes are corrected.
      Layer 3 (schema): evidence-type contract plus KEV-shortcut notification
        to the Orchestrator.

    Args:
        tech_stack_or_cves: One of
            - dict: post-discovery context with optional "cve_ids", "cwe_ids"
              and "packages" lists (Scout / Security Guard output). An empty
              dict yields a NO_FINDINGS result without an LLM call.
            - list: clean package names (e.g. from PackageExtractor).
            - str: raw tech-stack description or CVE list.
        on_progress: Optional progress callback ``(stage, status, payload)``
            used for SSE. Errors raised by it are logged and ignored.
        orchestration_ctx: Optional OrchestrationContext.
            ``record_kev_hit(cve_id)`` is called for every KEV-positive entry.

    Returns:
        fusion_results dict (Scout -> Analyst input format, FINAL_PLAN.md §6).
        Agent/LLM failures inside the retry loop produce a degraded result
        with ``_degraded=True`` instead of raising.
    """
    t0 = time.time()
    logger.info("[INTEL] Starting Intel Fusion Pipeline...")
    _notify_progress(on_progress, "RUNNING", {"step": "initializing"})

    # ── v3.4 input preparation (input-type aware) ───────────────────
    cve_list_for_task: list[str] = []
    cwe_list_for_task: list[str] = []
    package_list_for_task: list[str] = []
    if isinstance(tech_stack_or_cves, dict):
        # `or []` tolerates keys that are present but null.
        cve_list_for_task = [
            str(item).strip() for item in (tech_stack_or_cves.get("cve_ids") or [])
            if str(item).strip().startswith(("CVE-", "GHSA-"))
        ]
        cwe_list_for_task = [
            str(item).strip().upper() for item in (tech_stack_or_cves.get("cwe_ids") or [])
            if str(item).strip().upper().startswith("CWE-")
        ]
        package_list_for_task = [
            str(item).strip() for item in (tech_stack_or_cves.get("packages") or [])
            if str(item).strip()
        ]
        input_type = "post_discovery"
        input_str = json.dumps({
            "cve_ids": cve_list_for_task,
            "cwe_ids": cwe_list_for_task,
            "packages": package_list_for_task,
        }, ensure_ascii=False)
        logger.info(
            "[INTEL] Input: post_discovery mode | cves=%d cwes=%d packages=%d",
            len(cve_list_for_task), len(cwe_list_for_task), len(package_list_for_task),
        )
        if not (cve_list_for_task or cwe_list_for_task or package_list_for_task):
            logger.warning("[INTEL] Empty discovery context received")
            _notify_progress(on_progress, "COMPLETE", {
                "status": "NO_FINDINGS",
                "cves_scored": 0,
                "message": "No Scout CVEs or Security Guard CWE targets",
                "duration_ms": 0,
            })
            return {
                "fusion_results": [],
                "strategy_applied": "no_findings",
                "api_health_summary": {},
                "_no_findings": True,
                "_message": "No Scout CVEs or Security Guard CWE targets",
                "_duration_ms": 0,
            }
        if cwe_list_for_task and not cve_list_for_task and not package_list_for_task:
            # CWE-only context: route the CWE IDs through the CWE task mode.
            package_list_for_task = list(cwe_list_for_task)
    elif isinstance(tech_stack_or_cves, list):
        if not tech_stack_or_cves:
            # Harness Layer 0: empty package list -> structural degradation.
            # No LLM call is spent.
            logger.warning(
                "[INTEL] Empty package list received — no 3rd-party packages identified. "
                "Returning structured empty result (not a LLM failure)."
            )
            _notify_progress(on_progress, "COMPLETE", {
                "status": "NO_PACKAGES",
                "cves_scored": 0,
                "message": "No third-party packages identified in source code",
                "duration_ms": 0,
            })
            return {
                "fusion_results": [],
                "strategy_applied": "no_packages",
                "api_health_summary": {},
                "_no_packages": True,
                "_message": "No third-party packages identified — only stdlib imports detected",
                "_duration_ms": 0,
            }
        # str() so non-string items cannot break join()/set() below.
        package_list_for_task = [str(pkg) for pkg in tech_stack_or_cves]
        input_str = ", ".join(package_list_for_task)
        input_type = "package_list"
        logger.info("[INTEL] Input: package_list mode with %d packages: %s",
                    len(tech_stack_or_cves), tech_stack_or_cves)
    else:
        input_str = str(tech_stack_or_cves)
        input_type = "tech_stack"

    # The post-discovery payload is structured JSON of IDs, so its length says
    # nothing about raw source code being passed in.
    if input_type != "post_discovery" and len(input_str) > 500:
        logger.warning(
            "[INTEL] WARNING: input_str length=%d (may be raw source code). "
            "Expected package names. Use PackageExtractor in main.py.",
            len(input_str)
        )

    # ── Run the agent (with 429 retries) ───────────────────────────
    max_retries = 2
    excluded_models: list[str] = []
    result: dict = {}
    for attempt in range(max_retries + 1):
        agent = None
        try:
            from crewai import Crew, Process

            agent = build_intel_fusion_agent(excluded_models=excluded_models)
            task = Task(
                description=_build_task_description(
                    cve_list_for_task, cwe_list_for_task, package_list_for_task,
                    input_type, input_str,
                ),
                expected_output=(
                    "Pure JSON six-dimension intelligence fusion result, "
                    "including the fusion_results array and api_health_summary."
                ),
                agent=agent,
            )
            crew = Crew(
                agents=[agent],
                tasks=[task],
                process=Process.sequential,
                verbose=True,
            )
            result_str = _kickoff_with_checkpoint(crew, agent, attempt)

            # v5.2: a very short reply with no '{' is a CrewAI forceRun plain-text
            # answer (e.g. "In the Final Answer, do not use JSON...").
            if len(result_str) < 500 and "{" not in result_str:
                logger.warning(
                    "[INTEL] LLM output too short for JSON (%d chars, no '{'), "
                    "likely CrewAI forceRun plain-text reply. Returning empty fusion.",
                    len(result_str)
                )
                reason = f"Intel Fusion plain-text response ({len(result_str)} chars): {result_str[:100]}"
                result = {
                    "fusion_results": [],
                    "packages_queried": [],
                    "_degraded": True,
                    "_reason": reason,
                    # Progress reporting reads "_error" for degraded results.
                    "_error": reason[:200],
                }
                break
            result = _extract_json_dict(result_str)
            break  # success
        except Exception as e:
            error_str = str(e)
            if (
                "429" in error_str
                and attempt < max_retries
                and _handle_rate_limit(agent, error_str, attempt, max_retries, excluded_models)
            ):
                continue
            # Non-429, retries exhausted, or 429 bookkeeping failed -> Graceful Degradation.
            logger.error("[INTEL] Agent failed: %s", e)
            degradation_status.degrade("Intel Fusion Agent", str(e))
            result = _build_degraded_result(input_str, str(e))
            break

    # ── Harness Layer 2: recompute composite scores in code ─────────
    result = _verify_and_recalculate(result)
    result = _apply_evidence_type_contract(
        result,
        direct_cve_ids=set(cve_list_for_task),
        cwe_targets=set(cwe_list_for_task),
        package_targets=set(package_list_for_task),
    )
    fusion_results = result.get("fusion_results") or []
    try:
        from checkpoint import recorder as _cp

        recalc_count = sum(
            1 for f in fusion_results if isinstance(f, dict) and f.get("_score_recalculated")
        )
        _cp.harness_check("intel_fusion", "L2", "score_recalculation",
                          "CORRECTED" if recalc_count > 0 else "PASS",
                          details={"recalculated_count": recalc_count,
                                   "total_fusions": len(fusion_results)})
    except Exception as exc:
        logger.debug("[INTEL] checkpoint harness_check skipped: %s", exc)

    # ── Harness Layer 3: KEV shortcut notification ─────────────────
    if orchestration_ctx is not None:
        for fusion in fusion_results:
            if not isinstance(fusion, dict):
                continue
            dims = fusion.get("dimension_scores")
            kev_flag = dims.get("kev") if isinstance(dims, dict) else False
            if not (_is_truthy_flag(fusion.get("shortcut_kev")) or _is_truthy_flag(kev_flag)):
                continue
            cve_id = fusion.get("cve_id", "")
            if cve_id:
                try:
                    orchestration_ctx.record_kev_hit(cve_id)
                    logger.warning("[INTEL] KEV shortcut registered for %s", cve_id)
                except Exception as exc:
                    logger.debug("[INTEL] record_kev_hit failed for %s: %s", cve_id, exc)

    duration_ms = int((time.time() - t0) * 1000)
    result["_duration_ms"] = duration_ms
    is_degraded = result.get("_degraded", False)
    _notify_progress(on_progress, "COMPLETE", {
        "status": "DEGRADED" if is_degraded else "SUCCESS",
        "fusion_count": len(fusion_results),
        "kev_hits": sum(1 for f in fusion_results if isinstance(f, dict) and f.get("shortcut_kev")),
        "duration_ms": duration_ms,
        # When DEGRADED, carry the error text so server.py's on_progress can extract it.
        "_degraded": is_degraded,
        "_error": result.get("_error", "") if is_degraded else "",
    })
    logger.info(
        "[INTEL] Pipeline complete in %dms | fusions=%d",
        duration_ms, len(fusion_results),
    )
    return result
def _apply_evidence_type_contract(
result: dict,
direct_cve_ids: set[str] | None = None,
cwe_targets: set[str] | None = None,
package_targets: set[str] | None = None,
) -> dict:
"""
Intel Fusion evidence boundary.
Direct CVE/GHSA IDs from Scout can enrich package actions. CVEs discovered only
while supporting Security Guard CWE findings are representative evidence and
must not enter package action lists.
"""
direct_ids = {str(item).strip() for item in (direct_cve_ids or set()) if str(item).strip()}
cwes = {str(item).strip().upper() for item in (cwe_targets or set()) if str(item).strip().upper().startswith("CWE-")}
packages = {str(item).strip() for item in (package_targets or set()) if str(item).strip()}
if not result.get("fusion_results"):
result.setdefault("evidence_contract", {
"direct_cve_count": len(direct_ids),
"cwe_support_count": len(cwes),
"package_target_count": len(packages),
"representative_cve_count": 0,
})
return result
representative_count = 0
direct_count = 0
for fusion in result.get("fusion_results", []):
cve_id = str(fusion.get("cve_id", "")).strip()
current_type = str(fusion.get("evidence_type", "")).strip()
if current_type == "representative_cve" or fusion.get("must_not_enter_package_actions"):
evidence_type = "representative_cve"
elif cve_id in direct_ids:
evidence_type = "direct_cve"
elif cwes and cve_id not in direct_ids:
evidence_type = "representative_cve"
else:
evidence_type = "package_cve"
fusion["evidence_type"] = evidence_type
if evidence_type == "representative_cve":
representative_count += 1
fusion["not_directly_observed"] = True
fusion["must_not_enter_package_actions"] = True
fusion.setdefault("finding_source", "cwe_support")
if cwes and not fusion.get("supports_cwe"):
fusion["supports_cwe"] = sorted(cwes)
fusion.setdefault(
"evidence_note",
"Representative CVE for a Security Guard CWE finding; not a directly observed package CVE.",
)
else:
direct_count += 1
fusion["not_directly_observed"] = False
fusion["must_not_enter_package_actions"] = False
fusion.setdefault("finding_source", "package_scan")
if packages and not fusion.get("source_package_evidence"):
fusion["source_package_evidence"] = sorted(packages)
result["evidence_contract"] = {
"direct_cve_count": direct_count,
"cwe_support_count": len(cwes),
"package_target_count": len(packages),
"representative_cve_count": representative_count,
}
return result
def _coerce_bool(value: Any) -> bool:
    """Interpret an LLM-emitted flag; strings such as "false"/"no"/"0" count as False."""
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "y", "1")
    return bool(value)


def _coerce_float(value: Any) -> float:
    """Convert an LLM-emitted number to float; None or unparsable values become 0.0."""
    if value is None:
        return 0.0
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _cve_year(cve_id: str) -> int | None:
    """Return the year segment of a ``CVE-YYYY-NNNN`` ID, or None for other/malformed IDs."""
    parts = cve_id.strip().split("-")
    if len(parts) < 2 or parts[0].upper() != "CVE":
        return None
    try:
        return int(parts[1])
    except ValueError:
        return None


def _verify_and_recalculate(result: dict) -> dict:
    """
    Harness Layer 2: re-score every fusion entry with deterministic code.

    Guards against LLM arithmetic mistakes and invented numbers:
      - non-dict entries are dropped, because later stages call ``.get`` on
        each entry;
      - ``confidence`` is always replaced by the code-computed value;
      - the code score replaces the LLM ``composite_score`` in three cases:
        the LLM score is missing or unparsable; it differs from the code score
        by more than 1.5; or it is below KEV_MIN_COMPOSITE_SCORE for a
        KEV-listed CVE (the KEV red line). Such entries also get
        ``weights_used`` and ``_score_recalculated=True``.

    Harness Layer 2.5 then drops CVEs whose ID year is before 2005 and lists
    them in ``ancient_cves_filtered``. GHSA and other non-CVE IDs are never
    filtered. If re-scoring one entry raises, that entry is kept unchanged.
    """
    fusion_results = result.get("fusion_results", [])
    if not fusion_results:
        return result

    max_discrepancy = 1.5  # LLM-vs-code gap beyond which the code score wins
    recalculated = []
    for fusion in fusion_results:
        if not isinstance(fusion, dict):
            logger.warning("[INTEL][VERIFY] Dropping non-object fusion entry: %r", fusion)
            continue
        cve_id = str(fusion.get("cve_id") or "")
        try:
            # A null dimension_scores is treated like a missing one.
            dims = fusion.get("dimension_scores") or {}
            if not isinstance(dims, dict):
                raise TypeError(f"dimension_scores is {type(dims).__name__}, expected an object")
            cvss = _coerce_float(dims.get("cvss"))
            epss = _coerce_float(dims.get("epss"))
            in_kev = _coerce_bool(dims.get("kev", False))
            ghsa_sev = str(dims.get("ghsa_severity") or "UNKNOWN").strip().upper()
            ghsa_hits = {"CRITICAL": 3, "HIGH": 2, "MODERATE": 1, "LOW": 1}.get(ghsa_sev, 0)
            attck_tech = 1 if dims.get("attck_technique") else 0
            otx_threat = 1 if str(dims.get("otx_threat") or "").strip().lower() == "active" else 0
            # Non-CVE IDs (GHSA-..., missing) default to 2024 -> standard weights.
            parsed_year = _cve_year(cve_id)
            cve_year = parsed_year if parsed_year is not None else 2024

            recalculated_score, weights, confidence = calculate_composite_score(
                cvss=cvss,
                epss=epss,
                in_kev=in_kev,
                ghsa_hits=ghsa_hits,
                attack_techniques=attck_tech,
                otx_count=otx_threat,
                cve_year=cve_year,
            )

            try:
                original_score = float(fusion["composite_score"])
            except (KeyError, TypeError, ValueError):
                original_score = None

            use_code_score = True
            if original_score is None:
                logger.warning(
                    "[INTEL][VERIFY] Missing/invalid LLM score for %s → using Code=%.2f",
                    cve_id, recalculated_score,
                )
            elif in_kev and original_score < KEV_MIN_COMPOSITE_SCORE:
                logger.warning(
                    "[INTEL][VERIFY] KEV floor violated for %s: LLM=%.2f < %.2f → using Code=%.2f",
                    cve_id, original_score, KEV_MIN_COMPOSITE_SCORE, recalculated_score,
                )
            elif abs(original_score - recalculated_score) > max_discrepancy:
                logger.warning(
                    "[INTEL][VERIFY] Score discrepancy for %s: LLM=%.2f, Code=%.2f → using Code",
                    cve_id, original_score, recalculated_score,
                )
            else:
                use_code_score = False

            fusion["confidence"] = confidence  # confidence always comes from code
            if use_code_score:
                fusion["composite_score"] = recalculated_score
                fusion["weights_used"] = weights
                fusion["_score_recalculated"] = True
        except Exception as e:
            logger.warning("[INTEL][VERIFY] Failed to recalculate for %s: %s", fusion.get("cve_id"), e)
        recalculated.append(fusion)  # re-scored, or original values kept on failure

    # Harness Layer 2.5: CVE year filter (last line of defence against
    # ancient CVEs, i.e. year < 2005, reaching the report).
    cve_year_min = 2005
    fresh_fusions = []
    ancient_removed = []
    for fusion in recalculated:
        cve_id = str(fusion.get("cve_id") or "")
        year = _cve_year(cve_id)
        if year is not None and year < cve_year_min:
            ancient_removed.append(cve_id)
            logger.warning(
                "[INTEL HARNESS 2.5] Ancient CVE filtered (year=%d < %d): %s",
                year, cve_year_min, cve_id
            )
        else:
            fresh_fusions.append(fusion)
    result["fusion_results"] = fresh_fusions
    if ancient_removed:
        result["ancient_cves_filtered"] = ancient_removed
        logger.warning(
            "[INTEL] Removed %d ancient CVEs from fusion_results: %s",
            len(ancient_removed), ancient_removed
        )
    return result
def _build_degraded_result(input_str: str, error: str) -> dict:
"""
Graceful Degradation:Agent 失敗時的最小生存輸出。
讓 Scout 知道 Intel Fusion 已降級,但不中斷管線。
"""
return {
"fusion_results": [],
"strategy_applied": "degraded",
"api_health_summary": {"nvd": "unknown", "epss": "unknown", "kev": "unknown"},
"_degraded": True,
"_error": error[:200],
"_input": input_str[:100],
}
|