| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import json |
| import logging |
| import re |
| import time |
| from typing import Any, Callable |
|
|
| from crewai import Agent, Task |
|
|
| from config import SKILLS_DIR, SYSTEM_CONSTITUTION, degradation_status, get_llm |
| from tools.kev_tool import check_cisa_kev |
| from tools.memory_tool import read_memory, write_memory |
| from tools.nvd_tool import search_nvd |
| from tools.otx_tool import search_otx |
|
|
# Module-level logger, namespaced under the ThreatHunter root logger.
logger = logging.getLogger("ThreatHunter.intel_fusion")
|
|
| |
| |
| |
|
|
# Baseline six-dimension fusion weights; they sum to 1.0.
# calculate_composite_score() adjusts them dynamically (e.g. EPSS weight is
# zeroed and moved onto KEV when a CVE is already in the CISA KEV catalog).
DEFAULT_WEIGHTS = {
    "cvss": 0.20,
    "epss": 0.30,
    "kev": 0.25,
    "ghsa": 0.10,
    "attck": 0.10,
    "otx": 0.05,
}

# On-disk SOP document for this agent (loaded by _load_skill(); hot-reloadable
# through SkillLoader when available).
SKILL_PATH = SKILLS_DIR / "intel_fusion.md"

# Floor applied to the composite score whenever the CVE is in CISA KEV:
# a known-exploited vulnerability must never score below this value.
KEV_MIN_COMPOSITE_SCORE = 8.0

# Confidence thresholds: minimum number of dimensions that returned data.
CONFIDENCE_HIGH_DIMS = 4
CONFIDENCE_MEDIUM_DIMS = 2
|
|
|
| |
| |
| |
|
|
def calculate_composite_score(
    cvss: float,
    epss: float,
    in_kev: bool,
    ghsa_hits: int,
    attack_techniques: int,
    otx_count: int,
    cve_year: int,
    otx_fail_rate: float = 0.0,
) -> tuple[float, dict, str]:
    """
    Deterministic six-dimension weighted composite score (skills/intel_fusion.md Step 4).

    This function is pure code and does not depend on the LLM, so a faulty
    LLM inference cannot corrupt the final score.

    Dynamic weight adjustments (SOP Step 2):
        in_kev == True     → epss weight becomes 0 (KEV is the strongest fact; surplus goes to kev)
        cve_year < 2020    → epss weight drops to 0.10 (old CVEs have sparse EPSS data; surplus to cvss)
        otx_fail_rate > .5 → otx weight drops to 0.01 (OTX becomes optional; surplus to cvss)

    Args:
        cvss: CVSS score (0.0-10.0)
        epss: EPSS score (0.0-1.0)
        in_kev: whether the CVE is listed in CISA KEV
        ghsa_hits: number of GHSA advisory hits
        attack_techniques: number of matched ATT&CK techniques (estimated 0-3)
        otx_count: number of OTX threat-intel hits
        cve_year: CVE publication year (e.g. 2024)
        otx_fail_rate: module-tracked OTX API failure rate

    Returns:
        (composite_score, weights_used, confidence)
    """
    weights = dict(DEFAULT_WEIGHTS)

    # --- Dynamic weight redistribution (each move conserves the total) ---
    if in_kev:
        moved = weights["epss"]
        weights["epss"] = 0.0
        weights["kev"] += moved
        logger.info("[INTEL] Weight adjusted: in_kev=True → epss=0.0, kev+=%.2f", moved)
    elif cve_year < 2020:
        moved = weights["epss"] - 0.10
        weights["epss"] = 0.10
        weights["cvss"] += moved
        logger.info("[INTEL] Weight adjusted: cve_year=%d < 2020 → epss=0.10, cvss+=%.2f", cve_year, moved)

    if otx_fail_rate > 0.5:
        moved = weights["otx"] - 0.01
        weights["otx"] = 0.01
        weights["cvss"] += moved
        logger.info("[INTEL] Weight adjusted: otx_fail_rate=%.2f → otx=0.01", otx_fail_rate)

    # Safety net: fold any float drift back into the cvss weight.
    drift = sum(weights.values()) - 1.0
    if abs(drift) > 0.001:
        weights["cvss"] -= drift

    # --- Normalize every dimension to [0, 1] (same insertion order as DEFAULT_WEIGHTS) ---
    normalized = {
        "cvss": min(cvss / 10.0, 1.0),
        "epss": min(max(float(epss), 0.0), 1.0),
        "kev": 1.0 if in_kev else 0.0,
        "ghsa": min(ghsa_hits / 5.0, 1.0),
        "attck": min(attack_techniques / 3.0, 1.0),
        "otx": min(otx_count / 10.0, 1.0),
    }

    # Weighted sum, scaled back to the familiar 0-10 range.
    composite_score = round(
        sum(normalized[dim] * weights[dim] for dim in normalized) * 10.0, 4
    )

    # KEV hits must never score below the floor, regardless of other dimensions.
    if in_kev and composite_score < KEV_MIN_COMPOSITE_SCORE:
        logger.warning(
            "[INTEL] KEV hit but composite_score=%.2f < %.2f, applying floor",
            composite_score, KEV_MIN_COMPOSITE_SCORE,
        )
        composite_score = KEV_MIN_COMPOSITE_SCORE

    # Confidence = how many dimensions actually carried data.
    # The KEV dimension always counts: a KEV lookup yields a definite yes/no.
    populated_flags = [
        cvss > 0,
        epss > 0,
        True,
        ghsa_hits > 0,
        attack_techniques > 0,
        otx_count > 0,
    ]
    dims_with_data = sum(bool(flag) for flag in populated_flags)
    if dims_with_data >= CONFIDENCE_HIGH_DIMS:
        confidence = "HIGH"
    elif dims_with_data >= CONFIDENCE_MEDIUM_DIMS:
        confidence = "MEDIUM"
    else:
        confidence = "NEEDS_VERIFICATION"

    return composite_score, weights, confidence
|
|
|
|
| |
| |
| |
|
|
| |
# Phase 4D: prefer the hot-reloading SkillLoader when the optional package
# is installed; otherwise fall back to plain disk reads in _load_skill().
try:
    from skills.skill_loader import skill_loader as _skill_loader
except ImportError:
    _skill_loader = None
    _SKILL_LOADER_AVAILABLE = False
else:
    _SKILL_LOADER_AVAILABLE = True
    logger.info("[IntelFusion] Phase 4D: SkillLoader 啟用 ✓")
|
|
|
|
def _load_skill() -> str:
    """
    Load the Intel Fusion SOP.

    Resolution order (Phase 4D, graceful degradation):
        1. SkillLoader hot-reload, when available.
        2. Direct disk read of SKILL_PATH, trying several encodings.
        3. Built-in _FALLBACK_SKILL text.
    """
    # 1) Hot-reload path.
    if _SKILL_LOADER_AVAILABLE and _skill_loader is not None:
        try:
            return _skill_loader.load_skill("intel_fusion.md")
        except Exception as e:
            logger.warning("[IntelFusion] SkillLoader 失敗,回退磁碟讀取: %s", e)

    # 2) Disk fallback — try encodings from strictest to most permissive.
    for enc in ("utf-8", "utf-8-sig", "latin-1"):
        try:
            if not SKILL_PATH.exists():
                continue
            text = SKILL_PATH.read_text(encoding=enc).strip()
        except (IOError, UnicodeDecodeError):
            continue
        if text:
            logger.info("[OK] Intel Fusion Skill loaded: %d chars", len(text))
            return text

    # 3) Last resort: embedded minimal SOP.
    logger.warning("[WARN] Intel Fusion Skill file not found, using fallback")
    return _FALLBACK_SKILL
|
|
|
|
# Minimal embedded SOP used when neither SkillLoader nor the on-disk
# skills/intel_fusion.md document is available (see _load_skill()).
_FALLBACK_SKILL = """
# Intel Fusion Agent - Six-Dimension Intelligence Fusion SOP

## Core Work
1. Read API health state with read_memory(intel_fusion).
2. Decide which intelligence dimensions to query for each CVE.
3. Call available tools: search_nvd / check_cisa_kev / search_otx.
4. Use EPSS and GHSA tools when they are available.
5. When KEV is positive, output shortcut_kev: true to notify the Orchestrator.
6. Output six-dimension scoring results as pure JSON.
""".strip()
|
|
|
|
| |
| |
| |
|
|
def build_intel_fusion_agent(excluded_models: list[str] | None = None) -> Agent:
    """
    Build the Intel Fusion Agent (six-dimension intelligence fusion specialist).

    Available tools:
        - search_nvd (NVD CVSS)
        - check_cisa_kev (KEV catalog)
        - search_otx (OTX threat intel)
        - fetch_epss_score (EPSS, optional)
        - query_ghsa (GHSA, optional)
        - search_osv (OSV.dev, optional)
        - read_memory / write_memory (API health state)

    Args:
        excluded_models: model names to exclude (passed in on 429 retries)

    Returns:
        CrewAI Agent instance
    """
    skill_content = _load_skill()

    # Optional tools: each import is best-effort; a missing tool degrades
    # gracefully instead of breaking agent construction.
    optional_tools: list = []
    epss_available = False
    ghsa_available = False

    try:
        from tools.epss_tool import fetch_epss_score
        optional_tools.append(fetch_epss_score)
        epss_available = True
        logger.info("[OK] EPSS Tool loaded for Intel Fusion")
    except Exception as e:
        logger.warning("[WARN] EPSS Tool not available: %s", e)

    try:
        from tools.ghsa_tool import query_ghsa
        optional_tools.append(query_ghsa)
        ghsa_available = True
        logger.info("[OK] GHSA Tool loaded for Intel Fusion")
    except Exception as e:
        logger.warning("[WARN] GHSA Tool not available: %s", e)

    try:
        from tools.osv_tool import search_osv as _search_osv_tool
        optional_tools.append(_search_osv_tool)
    except Exception as _osv_ex:
        logger.warning("[WARN] OSV Tool not available for Intel Fusion: %s", _osv_ex)

    core_tools = [search_nvd, check_cisa_kev, search_otx, read_memory, write_memory]
    all_tools = core_tools + optional_tools

    # BUG FIX: these doc lines previously tested `t.name == 'search_epss'` /
    # `t.name == 'search_ghsa'`, but the tools loaded above are named
    # fetch_epss_score / query_ghsa, so the lines never rendered even when the
    # tools were available. Use the availability flags captured at import time.
    epss_doc = (
        "- fetch_epss_score: query EPSS exploit probability when the item is not in KEV.\n"
        if epss_available else ""
    )
    ghsa_doc = (
        "- query_ghsa: query GitHub Advisory Database for Python/npm package advisories.\n"
        if ghsa_available else ""
    )

    backstory = f"""You are ThreatHunter's Intelligence Fusion Agent.
Your task is to decide which intelligence dimensions to query, fuse six-dimensional evidence, and output composite risk scores.

=== System Constitution ===
{SYSTEM_CONSTITUTION}

=== Six-Dimension Fusion SOP ===
{skill_content}

=== Available Tools ===
- search_osv: query OSV.dev advisories first; it is ecosystem-aware and avoids ancient unrelated CVEs.
- search_nvd: fallback only when search_osv returns count=0.
- check_cisa_kev: query the CISA KEV catalog; batch CVEs with comma-separated input.
- search_otx: query OTX threat intelligence when CVSS >= 7.0.
{epss_doc}{ghsa_doc}- read_memory / write_memory: read and write API health state.

=== Autonomous Decision Rules (must follow) ===
- If in_kev == true, skip EPSS because KEV is already the strongest exploitation signal; output shortcut_kev: true.
- If cve_year < 2020, EPSS data may be sparse; EPSS can be skipped.
- If OTX fails repeatedly, record it in api_health and lower OTX priority.
- Query at least two dimensions; otherwise set confidence = NEEDS_VERIFICATION.

=== Output Format (pure JSON only) ===
{{
  "fusion_results": [
    {{
      "cve_id": "CVE-2024-XXXX",
      "composite_score": 8.7,
      "dimension_scores": {{
        "cvss": 9.8, "epss": 0.97, "kev": true, "ghsa_severity": "CRITICAL",
        "attck_technique": "T1190", "otx_threat": "active"
      }},
      "weights_used": {{"cvss": 0.20, "epss": 0.30, "kev": 0.25, "ghsa": 0.10, "attck": 0.10, "otx": 0.05}},
      "confidence": "HIGH",
      "dimensions_used": ["nvd", "epss", "kev"],
      "shortcut_kev": false
    }}
  ],
  "strategy_applied": "standard_2024",
  "api_health_summary": {{"nvd": "ok", "epss": "ok", "kev": "ok"}}
}}
"""

    llm = get_llm(exclude_models=excluded_models or [])
    agent = Agent(
        role="Intelligence Fusion Specialist",
        goal=(
            "Autonomously choose six-dimension intelligence queries, fuse NVD/EPSS/KEV/GHSA/ATT&CK/OTX evidence, "
            "output composite risk scores with dimension contributions, and trigger the Small-World shortcut on KEV hits."
        ),
        backstory=backstory,
        tools=all_tools,
        llm=llm,
        verbose=True,
        max_iter=5,
        allow_delegation=False,
    )

    logger.info(
        "[OK] Intel Fusion Agent created | tools=%s | max_iter=%d",
        [t.name for t in agent.tools], agent.max_iter,
    )
    return agent
|
|
|
|
| |
| |
| |
|
|
def run_intel_fusion(
    tech_stack_or_cves: str | list,
    on_progress: Callable | None = None,
    orchestration_ctx: Any = None,
) -> dict:
    """
    Run the complete Intel Fusion pipeline.

    Harness Engineering multi-layer safeguards:
        Layer 1 (Agent):  the LLM autonomously selects query dimensions and executes tool calls
        Layer 2 (code):   deterministic calculate_composite_score() re-computation
                          (protects against LLM arithmetic errors or invented numbers)
        Layer 3 (schema): output-format validation + KEV-hit notification to the Orchestrator

    Args:
        tech_stack_or_cves: tech-stack string or CVE ID list (Feedback Loop).
            NOTE(review): a dict "discovery context" with cve_ids/cwe_ids/packages
            keys is also accepted here despite the annotation — confirm callers.
        on_progress: progress callback (used by SSE)
        orchestration_ctx: OrchestrationContext (used to record KEV shortcuts)

    Returns:
        fusion_results dict (format follows the Scout → Analyst input of FINAL_PLAN.md §6)
    """
    t0 = time.time()
    logger.info("[INTEL] Starting Intel Fusion Pipeline...")

    # Progress callbacks are best-effort; SSE plumbing must never break the run.
    if on_progress:
        try:
            on_progress("intel_fusion", "RUNNING", {"step": "initializing"})
        except Exception:
            pass

    # ---- Input normalization -------------------------------------------------
    # Three accepted input shapes:
    #   dict -> post-discovery context from Scout / Security Guard
    #   list -> plain package-name list
    #   str  -> free-form tech-stack description (legacy path)
    discovery_context = tech_stack_or_cves if isinstance(tech_stack_or_cves, dict) else {}
    cve_list_for_task: list[str] = []
    cwe_list_for_task: list[str] = []
    package_list_for_task: list[str] = []

    if discovery_context:
        # Keep only well-formed IDs so malformed entries cannot pollute the prompt.
        cve_list_for_task = [
            str(item).strip() for item in discovery_context.get("cve_ids", [])
            if str(item).strip().startswith(("CVE-", "GHSA-"))
        ]
        cwe_list_for_task = [
            str(item).strip().upper() for item in discovery_context.get("cwe_ids", [])
            if str(item).strip().upper().startswith("CWE-")
        ]
        package_list_for_task = [
            str(item).strip() for item in discovery_context.get("packages", [])
            if str(item).strip()
        ]
        input_type = "post_discovery"
        input_str = json.dumps({
            "cve_ids": cve_list_for_task,
            "cwe_ids": cwe_list_for_task,
            "packages": package_list_for_task,
        }, ensure_ascii=False)
        logger.info(
            "[INTEL] Input: post_discovery mode | cves=%d cwes=%d packages=%d",
            len(cve_list_for_task), len(cwe_list_for_task), len(package_list_for_task),
        )
        # Nothing to enrich: return a structured empty result without invoking the Agent.
        if not cve_list_for_task and not cwe_list_for_task and not package_list_for_task:
            logger.warning("[INTEL] Empty discovery context received")
            if on_progress:
                try:
                    on_progress("intel_fusion", "COMPLETE", {
                        "status": "NO_FINDINGS",
                        "cves_scored": 0,
                        "message": "No Scout CVEs or Security Guard CWE targets",
                        "duration_ms": 0,
                    })
                except Exception as exc:
                    logger.debug("[INTEL] progress callback ignored: %s", exc)
            return {
                "fusion_results": [],
                "strategy_applied": "no_findings",
                "api_health_summary": {},
                "_no_findings": True,
                "_message": "No Scout CVEs or Security Guard CWE targets",
                "_duration_ms": 0,
            }
        # CWE-only context: reuse the package slot so the CWE-mode prompt below fires.
        if cwe_list_for_task and not cve_list_for_task and not package_list_for_task:
            package_list_for_task = cwe_list_for_task
    elif isinstance(tech_stack_or_cves, list):
        if not tech_stack_or_cves:
            # An empty package list is a legitimate outcome (stdlib-only code),
            # not an LLM failure — return a structured empty result.
            logger.warning(
                "[INTEL] Empty package list received — no 3rd-party packages identified. "
                "Returning structured empty result (not a LLM failure)."
            )
            if on_progress:
                try:
                    on_progress("intel_fusion", "COMPLETE", {
                        "status": "NO_PACKAGES",
                        "cves_scored": 0,
                        "message": "No third-party packages identified in source code",
                        "duration_ms": 0,
                    })
                except Exception:
                    pass
            return {
                "fusion_results": [],
                "strategy_applied": "no_packages",
                "api_health_summary": {},
                "_no_packages": True,
                "_message": "No third-party packages identified — only stdlib imports detected",
                "_duration_ms": 0,
            }

        input_str = ", ".join(tech_stack_or_cves)
        input_type = "package_list"
        package_list_for_task = tech_stack_or_cves
        logger.info("[INTEL] Input: package_list mode with %d packages: %s", len(tech_stack_or_cves), tech_stack_or_cves)
    else:
        input_str = str(tech_stack_or_cves)
        input_type = "tech_stack"
        package_list_for_task = []
        # A very long string usually means raw source code leaked in instead of package names.
        if len(input_str) > 500:
            logger.warning(
                "[INTEL] WARNING: input_str length=%d (may be raw source code). "
                "Expected package names. Use PackageExtractor in main.py.",
                len(input_str)
            )

    # ---- Agent execution with 429-aware model-waterfall retry ----------------
    MAX_RETRIES = 2
    excluded_models: list[str] = []
    result: dict = {}

    for attempt in range(MAX_RETRIES + 1):
        try:
            from config import get_current_model_name, mark_model_failed
            from crewai import Crew, Process

            # Rebuild the agent each attempt so excluded (rate-limited) models are skipped.
            agent = build_intel_fusion_agent(excluded_models=excluded_models)

            # ---- Task prompt selection, by input mode ----
            if cve_list_for_task:
                # Post-discovery mode: enrich/rank already-discovered IDs only.
                cve_lines = "\n".join(f"  - {cve}" for cve in cve_list_for_task[:30])
                cwe_context = ", ".join(cwe_list_for_task[:12]) or "none"
                package_context = ", ".join(package_list_for_task[:12]) or "none"
                task_desc = (
                    "Scout and Security Guard already completed discovery. "
                    "Intel Fusion must only rank and enrich the discovered vulnerabilities.\n\n"
                    f"Discovered CVE/GHSA IDs:\n{cve_lines}\n\n"
                    f"Security Guard CWE context: {cwe_context}\n"
                    f"Package context: {package_context}\n\n"
                    "For each CVE/GHSA ID:\n"
                    "1. Call search_nvd(cve_id) for CVSS and description when the ID is a CVE.\n"
                    "2. Call check_cisa_kev(cve_id) for KEV status.\n"
                    "3. Call fetch_epss_score(cve_id) if the EPSS tool is available and the item is not in KEV.\n"
                    "4. Call search_otx(cve_id) for active threat signals when relevant.\n"
                    "5. Optionally call query_ghsa for GHSA/package advisory context.\n"
                    "6. Output pure JSON fusion_results for the discovered IDs only.\n\n"
                    "Hard constraints:\n"
                    "- Do not invent new CVE IDs.\n"
                    "- Do not add IDs that were not provided above unless a tool result explicitly aliases them.\n"
                    "- Use CWE context only as supporting code weakness context, not as fake CVE evidence.\n"
                    "- Output valid JSON only."
                )
            elif package_list_for_task:
                # The package slot may actually carry CWE IDs (see CWE-only branch above).
                is_cwe_mode = all(str(p).upper().startswith("CWE-") for p in package_list_for_task)

                if is_cwe_mode:
                    # CWE mode: find real representative CVEs as supporting evidence.
                    cwe_lines = "\n".join(f"  - {cwe}" for cwe in package_list_for_task)
                    task_desc = (
                        f"Security Guard detected the following code weakness categories (CWE IDs) in source code:\n\n"
                        f"CWE categories to investigate:\n{cwe_lines}\n\n"
                        f"Your task: for each CWE, query NVD for the most relevant real CVEs from recent years "
                        f"(2018-2024) and retrieve CVSS scores as supporting evidence:\n"
                        f"1. First call read_memory(intel_fusion) to get API health state.\n"
                        f"2. For each CWE, call search_nvd(keyword=cwe_id) to query related CVEs.\n"
                        f"   Example: search_nvd('CWE-89') for SQL Injection related CVEs.\n"
                        f"   Example: search_nvd('CWE-502') for Insecure Deserialization related CVEs.\n"
                        f"3. Select the most representative CVEs, prioritizing highest CVSS and recent year.\n"
                        f"4. For selected CVEs, call check_cisa_kev to verify KEV status.\n"
                        f"5. Call write_memory to store API health state.\n"
                        f"6. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
                        f"Important notes:\n"
                        f"- The input is CWE IDs (weakness categories), not package names.\n"
                        f"- The purpose is to find real CVE/CVSS evidence for Security Guard detections.\n"
                        f"- Report at least one related CVE per CWE when NVD has data.\n"
                        f"Absolute prohibitions:\n"
                        f"- Do not fabricate CVE IDs or EPSS scores.\n"
                        f"- Do not skip tool calls.\n"
                        f"- Output pure JSON only."
                    )
                else:
                    # Package mode: query every package, OSV first then NVD fallback.
                    pkg_lines = "\n".join(f"  - {pkg}" for pkg in package_list_for_task)
                    task_desc = (
                        f"Analyze security intelligence for the following third-party packages extracted from source code:\n\n"
                        f"Package list to investigate:\n{pkg_lines}\n\n"
                        f"Input type: {input_type} (package-name list)\n\n"
                        f"You must query every package one by one. Do not skip any package:\n"
                        f"1. First call read_memory(intel_fusion) to get API health state.\n"
                        f"2. For each package, call search_osv first for ecosystem-aware CVEs without CVE-1999 noise.\n"
                        f"   If search_osv returns count=0, use search_nvd as fallback.\n"
                        f"3. Batch call check_cisa_kev to query KEV status.\n"
                        f"4. If NOT in_kev, call search_otx.\n"
                        f"5. Call write_memory to store API health state.\n"
                        f"6. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
                        f"Important notes:\n"
                        f"- The items above are package names such as requests or flask, not source code.\n"
                        f"- Call search_osv for every package, for example search_osv('requests').\n"
                        f"- Each package may have zero or more CVEs; report the tool results honestly.\n"
                        f"Absolute prohibitions:\n"
                        f"- Do not fabricate CVE IDs or EPSS scores.\n"
                        f"- Do not skip tool calls.\n"
                        f"- Output pure JSON only."
                    )
            else:
                # Legacy free-form tech-stack mode.
                task_desc = (
                    f"Analyze intelligence for the following technology stack or CVE list:\n{input_str[:2000]}\n\n"
                    f"Input type: {input_type}\n\n"
                    f"You need to:\n"
                    f"1. First call read_memory(intel_fusion) to get API health state.\n"
                    f"2. For each package, call search_osv for ecosystem-aware CVEs; use search_nvd only as fallback when empty.\n"
                    f"3. Batch call check_cisa_kev to query KEV status.\n"
                    f"4. If NOT in_kev, call search_epss or search_otx.\n"
                    f"5. For Python packages, call search_ghsa.\n"
                    f"6. Call write_memory to store API health state.\n"
                    f"7. Output pure JSON fusion_results using the SOP Step 7 format.\n\n"
                    f"Absolute prohibitions:\n"
                    f"- Do not fabricate CVE IDs or EPSS scores.\n"
                    f"- Do not skip tool calls.\n"
                    f"- Output pure JSON only."
                )

            task = Task(
                description=task_desc,
                expected_output=(
                    "Pure JSON six-dimension intelligence fusion result, "
                    "including the fusion_results array and api_health_summary."
                ),
                agent=agent,
            )

            crew = Crew(
                agents=[agent],
                tasks=[task],
                process=Process.sequential,
                verbose=True,
            )
            # Best-effort checkpoint recording. NOTE(review): `_cp` is only
            # bound when this import succeeds; later `_cp.*` uses are wrapped
            # in try/except so a NameError is swallowed — confirm intended.
            try:
                from checkpoint import recorder as _cp
                from config import get_current_model_name as _gcmn
                _if_model = _gcmn(agent.llm)
                _cp.llm_call("intel_fusion", _if_model, "openrouter", f"attempt={attempt+1}")
            except Exception:
                _if_model = "unknown"
            _t_if = time.time()

            crew_result = crew.kickoff()
            result_str = str(crew_result).strip()

            try:
                _cp.llm_result("intel_fusion", _if_model, "SUCCESS",
                               len(result_str), int((time.time() - _t_if) * 1000),
                               thinking=result_str[:1000])
            except Exception:
                pass

            # ---- Output sanity: too-short plain-text reply (no JSON at all) ----
            MIN_JSON_LEN = 500
            if len(result_str) < MIN_JSON_LEN and "{" not in result_str:
                logger.warning(
                    "[INTEL] LLM output too short for JSON (%d chars, no '{'), "
                    "likely CrewAI forceRun plain-text reply. Returning empty fusion.",
                    len(result_str)
                )
                result = {
                    "fusion_results": [],
                    "packages_queried": [],
                    "_degraded": True,
                    "_reason": f"Intel Fusion plain-text response ({len(result_str)} chars): {result_str[:100]}",
                }
                break

            # ---- Overlong output: parse only the tail, where the JSON usually is ----
            MAX_RESULT_LEN = 50_000
            if len(result_str) > MAX_RESULT_LEN:
                logger.warning(
                    "[INTEL] LLM output too long (%d chars), truncating and extracting JSON",
                    len(result_str)
                )
                tail = result_str[-10_000:]
                result_str_for_parse = tail
            else:
                result_str_for_parse = result_str

            # Strip markdown code fences if the model wrapped its JSON.
            if "```json" in result_str_for_parse:
                result_str_for_parse = result_str_for_parse.split("```json")[1].split("```")[0].strip()
            elif "```" in result_str_for_parse:
                parts = result_str_for_parse.split("```")
                if len(parts) >= 3:
                    result_str_for_parse = parts[1].strip()

            # If the tail held no JSON at all, fall back to the full output.
            if len(result_str) > MAX_RESULT_LEN and "{" not in result_str_for_parse:
                result_str_for_parse = result_str

            # ---- JSON parse with brace-span regex fallback ----
            result = None
            try:
                result = json.loads(result_str_for_parse)
            except json.JSONDecodeError:
                # Scan brace-delimited candidates, newest first, and keep the
                # first one that parses into a dict.
                _candidates = re.findall(r'\{[\s\S]+?\}', result_str_for_parse)
                if not _candidates and len(result_str) > MAX_RESULT_LEN:
                    _candidates = re.findall(r'\{[\s\S]+?\}', result_str)
                for _candidate in reversed(_candidates):
                    try:
                        result = json.loads(_candidate)
                        if isinstance(result, dict):
                            logger.info("[INTEL] JSON extracted from long output (candidate len=%d)", len(_candidate))
                            break
                    except json.JSONDecodeError:
                        continue
                if result is None:
                    # Re-raised into the outer except, which degrades gracefully.
                    raise ValueError(
                        f"LLM output is not JSON (len={len(result_str)}): {result_str[:120]}"
                    )
            break

        except Exception as e:
            error_str = str(e)
            # 429 rate limit: mark the model failed, exclude it, and retry on the
            # next model in the waterfall.
            if "429" in error_str and attempt < MAX_RETRIES:
                from config import get_current_model_name, mark_model_failed
                try:
                    current_model = get_current_model_name(agent.llm)
                    mark_model_failed(current_model)
                    excluded_models.append(current_model)
                    import re as _re
                    _m = _re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, _re.IGNORECASE)
                    retry_after = float(_m.group(1)) if _m else 0.0
                    logger.warning("[INTEL] 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                                   current_model, attempt + 1, MAX_RETRIES, retry_after)
                    try:
                        from checkpoint import recorder as _cp2
                        _cp2.llm_retry("intel_fusion", current_model, error_str[:200],
                                       attempt + 1, "next_in_waterfall")
                    except Exception:
                        pass
                    from config import rate_limiter as _rl
                    _rl.on_429(retry_after=retry_after, caller="intel_fusion")
                    continue
                except Exception:
                    pass

            # Non-retryable failure (or retries exhausted): degrade gracefully.
            logger.error("[INTEL] Agent failed: %s", e)
            degradation_status.degrade("Intel Fusion Agent", str(e))
            result = _build_degraded_result(input_str, str(e))
            break

    # ---- Layer 2: deterministic re-scoring + evidence-boundary contract ----
    result = _verify_and_recalculate(result)
    result = _apply_evidence_type_contract(
        result,
        direct_cve_ids=set(cve_list_for_task),
        cwe_targets=set(cwe_list_for_task),
        package_targets=set(package_list_for_task),
    )
    try:
        _recalc_count = sum(1 for f in result.get("fusion_results", []) if f.get("_score_recalculated"))
        _cp.harness_check("intel_fusion", "L2", "score_recalculation",
                          "CORRECTED" if _recalc_count > 0 else "PASS",
                          details={"recalculated_count": _recalc_count,
                                   "total_fusions": len(result.get("fusion_results", []))})
    except Exception:
        pass

    # ---- Layer 3: KEV shortcut — tell the Orchestrator about KEV hits ----
    if orchestration_ctx is not None:
        for fusion in result.get("fusion_results", []):
            if fusion.get("shortcut_kev") or fusion.get("dimension_scores", {}).get("kev"):
                cve_id = fusion.get("cve_id", "")
                if cve_id:
                    try:
                        orchestration_ctx.record_kev_hit(cve_id)
                        logger.warning("[INTEL] KEV shortcut registered for %s", cve_id)
                    except Exception:
                        pass

    duration_ms = int((time.time() - t0) * 1000)
    result["_duration_ms"] = duration_ms

    if on_progress:
        try:
            fusion_count = len(result.get("fusion_results", []))
            kev_hits = sum(1 for f in result.get("fusion_results", []) if f.get("shortcut_kev"))
            is_degraded = result.get("_degraded", False)
            on_progress("intel_fusion", "COMPLETE", {
                "status": "DEGRADED" if is_degraded else "SUCCESS",
                "fusion_count": fusion_count,
                "kev_hits": kev_hits,
                "duration_ms": duration_ms,
                "_degraded": is_degraded,
                "_error": result.get("_error", "") if is_degraded else "",
            })
        except Exception:
            pass

    logger.info(
        "[INTEL] Pipeline complete in %dms | fusions=%d",
        duration_ms, len(result.get("fusion_results", [])),
    )
    return result
|
|
|
|
| def _apply_evidence_type_contract( |
| result: dict, |
| direct_cve_ids: set[str] | None = None, |
| cwe_targets: set[str] | None = None, |
| package_targets: set[str] | None = None, |
| ) -> dict: |
| """ |
| Intel Fusion evidence boundary. |
| |
| Direct CVE/GHSA IDs from Scout can enrich package actions. CVEs discovered only |
| while supporting Security Guard CWE findings are representative evidence and |
| must not enter package action lists. |
| """ |
| direct_ids = {str(item).strip() for item in (direct_cve_ids or set()) if str(item).strip()} |
| cwes = {str(item).strip().upper() for item in (cwe_targets or set()) if str(item).strip().upper().startswith("CWE-")} |
| packages = {str(item).strip() for item in (package_targets or set()) if str(item).strip()} |
|
|
| if not result.get("fusion_results"): |
| result.setdefault("evidence_contract", { |
| "direct_cve_count": len(direct_ids), |
| "cwe_support_count": len(cwes), |
| "package_target_count": len(packages), |
| "representative_cve_count": 0, |
| }) |
| return result |
|
|
| representative_count = 0 |
| direct_count = 0 |
| for fusion in result.get("fusion_results", []): |
| cve_id = str(fusion.get("cve_id", "")).strip() |
| current_type = str(fusion.get("evidence_type", "")).strip() |
|
|
| if current_type == "representative_cve" or fusion.get("must_not_enter_package_actions"): |
| evidence_type = "representative_cve" |
| elif cve_id in direct_ids: |
| evidence_type = "direct_cve" |
| elif cwes and cve_id not in direct_ids: |
| evidence_type = "representative_cve" |
| else: |
| evidence_type = "package_cve" |
|
|
| fusion["evidence_type"] = evidence_type |
| if evidence_type == "representative_cve": |
| representative_count += 1 |
| fusion["not_directly_observed"] = True |
| fusion["must_not_enter_package_actions"] = True |
| fusion.setdefault("finding_source", "cwe_support") |
| if cwes and not fusion.get("supports_cwe"): |
| fusion["supports_cwe"] = sorted(cwes) |
| fusion.setdefault( |
| "evidence_note", |
| "Representative CVE for a Security Guard CWE finding; not a directly observed package CVE.", |
| ) |
| else: |
| direct_count += 1 |
| fusion["not_directly_observed"] = False |
| fusion["must_not_enter_package_actions"] = False |
| fusion.setdefault("finding_source", "package_scan") |
| if packages and not fusion.get("source_package_evidence"): |
| fusion["source_package_evidence"] = sorted(packages) |
|
|
| result["evidence_contract"] = { |
| "direct_cve_count": direct_count, |
| "cwe_support_count": len(cwes), |
| "package_target_count": len(packages), |
| "representative_cve_count": representative_count, |
| } |
| return result |
|
|
|
|
def _verify_and_recalculate(result: dict) -> dict:
    """
    Harness Layer 2: recompute every composite score with deterministic code.

    Guards against the LLM mis-calculating or fabricating numbers, then
    filters out ancient CVEs (pre-2005) that are almost certainly noise.
    """
    entries = result.get("fusion_results", [])
    if not entries:
        return result

    severity_to_hits = {"CRITICAL": 3, "HIGH": 2, "MODERATE": 1, "LOW": 1}
    verified = []
    for entry in entries:
        try:
            scores = entry.get("dimension_scores", {})
            cvss_val = float(scores.get("cvss", 0.0))
            epss_raw = scores.get("epss")
            epss_val = float(scores.get("epss", 0.0)) if epss_raw is not None else 0.0
            kev_flag = bool(scores.get("kev", False))
            ghsa_count = severity_to_hits.get(scores.get("ghsa_severity", "UNKNOWN"), 0)
            attck_count = 1 if scores.get("attck_technique") else 0
            otx_hits = 1 if scores.get("otx_threat") == "active" else 0

            # Derive the year from the CVE ID; default to 2024 when unparsable
            # (e.g. GHSA IDs or malformed strings).
            entry_id = entry.get("cve_id", "CVE-2024-0000")
            try:
                year = int(entry_id.split("-")[1])
            except (IndexError, ValueError):
                year = 2024

            code_score, code_weights, code_confidence = calculate_composite_score(
                cvss=cvss_val,
                epss=epss_val,
                in_kev=kev_flag,
                ghsa_hits=ghsa_count,
                attack_techniques=attck_count,
                otx_count=otx_hits,
                cve_year=year,
            )

            # Only override the LLM's score when it drifts well beyond tolerance.
            llm_score = float(entry.get("composite_score", code_score))
            if abs(llm_score - code_score) > 1.5:
                logger.warning(
                    "[INTEL][VERIFY] Score discrepancy for %s: LLM=%.2f, Code=%.2f → using Code",
                    entry_id, llm_score, code_score,
                )
                entry["composite_score"] = code_score
                entry["confidence"] = code_confidence
                entry["weights_used"] = code_weights
                entry["_score_recalculated"] = True
            else:
                entry["confidence"] = code_confidence

            verified.append(entry)
        except Exception as e:
            # Keep the entry untouched rather than dropping data on failure.
            logger.warning("[INTEL][VERIFY] Failed to recalculate for %s: %s", entry.get("cve_id"), e)
            verified.append(entry)

    result["fusion_results"] = verified

    # Harness 2.5: drop ancient CVEs (pre-2005) which are almost never relevant
    # to a modern dependency scan. Non-CVE IDs (e.g. GHSA-*) pass through.
    CVE_YEAR_MIN = 2005
    kept = []
    dropped_ids = []
    for entry in result["fusion_results"]:
        entry_id = entry.get("cve_id", "")
        if not entry_id.startswith("CVE-"):
            kept.append(entry)
            continue
        try:
            year = int(entry_id.split("-")[1])
        except (IndexError, ValueError):
            kept.append(entry)
            continue
        if year < CVE_YEAR_MIN:
            dropped_ids.append(entry_id)
            logger.warning(
                "[INTEL HARNESS 2.5] Ancient CVE filtered (year=%d < %d): %s",
                year, CVE_YEAR_MIN, entry_id
            )
        else:
            kept.append(entry)

    if dropped_ids:
        result["fusion_results"] = kept
        result["ancient_cves_filtered"] = dropped_ids
        logger.warning(
            "[INTEL] Removed %d ancient CVEs from fusion_results: %s",
            len(dropped_ids), dropped_ids
        )

    return result
|
|
|
|
| def _build_degraded_result(input_str: str, error: str) -> dict: |
| """ |
| Graceful Degradation:Agent 失敗時的最小生存輸出。 |
| 讓 Scout 知道 Intel Fusion 已降級,但不中斷管線。 |
| """ |
| return { |
| "fusion_results": [], |
| "strategy_applied": "degraded", |
| "api_health_summary": {"nvd": "unknown", "epss": "unknown", "kev": "unknown"}, |
| "_degraded": True, |
| "_error": error[:200], |
| "_input": input_str[:100], |
| } |
|
|
|
|
|
|