# agents/scout.py # 功能:Scout Agent 定義 — 威脅情報偵察員 # Harness 支柱:Constraints(系統憲法 + Skill SOP)+ Observability(verbose=True) # 擁有者:成員 B(Scout Agent Pipeline) # # 使用方式: # from agents.scout import create_scout_agent # # 架構定位: # Pipeline 的第一環 — 收集情報 → 輸出 JSON → Analyst 接收 # Agent = Tool(手)+ Skill(腦)+ Constitution(法) import os import logging import time from typing import Any, TYPE_CHECKING import requests from config import get_llm # LLM 與 Tool 皆延遲初始化,避免純 helper / 測試 import 路徑觸發 CrewAI 副作用。 logger = logging.getLogger("ThreatHunter") if TYPE_CHECKING: from crewai import Agent # ══════════════════════════════════════════════════════════════ # Skill 載入(Phase 4D:使用 SkillLoader 熱載入系統) # ══════════════════════════════════════════════════════════════ # ====================================================================== # v3.7: Path-Aware Skill Map # 每種 input_type 對應一份 Skill SOP # ====================================================================== SKILL_MAP: dict[str, str] = { "pkg": "threat_intel.md", # Path A: package CVE scan "code": "source_code_audit.md", # Path B-code: source code review "injection": "ai_security_audit.md", # Path B-inject: AI security "config": "config_audit.md", # Path C: config file audit } # 專案根目錄(agents/ 的上一層) PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) SKILL_PATH = os.path.join(PROJECT_ROOT, "skills", "threat_intel.md") # default fallback # Phase 4D: 使用 SkillLoader 熱載入系統 try: from skills.skill_loader import skill_loader as _skill_loader _SKILL_LOADER_AVAILABLE = True logger.info("[Scout] Phase 4D: SkillLoader 啟用 ✓") except ImportError: _skill_loader = None _SKILL_LOADER_AVAILABLE = False logger.warning("[Scout] Phase 4D: SkillLoader 不可用,使用內建 _load_skill") # ── GHSA Severity 解析輔助(Phase 7.5)────────────────────── # OSV vuln_dict 中的 database_specific.severity 存有 GitHub Advisory 嚴重度 # 直接解析此欄位填補 Intel Fusion 的 GHSA 維度(10% 權重) # 來源:https://docs.github.com/en/graphql/reference/enums#securityadvisoryidentifiertype def _extract_ghsa_severity_from_osv(vuln_dict: dict) -> str: """ 從 OSV vuln_dict 解析 GHSA severity。 OSV 的 database_specific 欄位: { "severity": "HIGH", ← GitHub Advisory severity "cvss": {...} } また、vuln["osv_id"].startswith("GHSA-") 也是 GHSA 直接來源。 Returns: "CRITICAL" | "HIGH" | "MODERATE" | "MEDIUM" | "LOW" | "UNKNOWN" """ # 優先從已解析的欄位取(如果 _parse_osv_vuln 已填入) if vuln_dict.get("ghsa_severity"): return vuln_dict["ghsa_severity"] # 從原始 severity 欄位取 sev = vuln_dict.get("severity", "") if sev in ("CRITICAL", "HIGH", "MODERATE", "MEDIUM", "LOW"): return sev return "UNKNOWN" # ───────────────────────────────────────────────────────────── def _severity_from_cvss(cvss_score: float) -> str: """將 CVSS 分數轉成標準 severity 字串。""" if cvss_score >= 9.0: return "CRITICAL" if cvss_score >= 7.0: return "HIGH" if cvss_score >= 4.0: return "MEDIUM" return "LOW" def _summarize_intel_fusion_for_task(intel_fusion_result: dict | None) -> str: """將 Intel Fusion 結果壓縮成給 Scout 任務描述使用的摘要。""" if not intel_fusion_result: return "" fusion_results = intel_fusion_result.get("fusion_results", []) if not fusion_results: return "" lines: list[str] = [ "Layer 1 Intel Fusion evidence is available.", "Reuse this enrichment instead of re-querying EPSS or OTX.", "Use OSV/NVD only for CVE discovery, verification, or missing-package fallback.", "Intel Fusion evidence:", ] for fusion in fusion_results[:8]: dims = fusion.get("dimension_scores", {}) cve_id = fusion.get("cve_id", "UNKNOWN") score = fusion.get("composite_score", "n/a") kev = bool(dims.get("kev", False)) epss = dims.get("epss", "n/a") ghsa = dims.get("ghsa_severity", "UNKNOWN") otx = dims.get("otx_threat", "unknown") lines.append( f"- {cve_id}: composite={score}, kev={kev}, epss={epss}, ghsa={ghsa}, otx={otx}" ) return "\n".join(lines) def _merge_intel_fusion_evidence(output: dict[str, Any], intel_fusion_result: dict | None) -> dict[str, Any]: """把 Intel Fusion 的富化證據併入 Scout 最終漏洞清單。""" if not intel_fusion_result: return output fusion_results = intel_fusion_result.get("fusion_results", []) if not fusion_results: return output representative_fusions: list[dict[str, Any]] = [] fusion_by_cve: dict[str, dict[str, Any]] = {} for fusion in fusion_results: cve_id = str(fusion.get("cve_id", "")).strip() is_representative = ( fusion.get("evidence_type") == "representative_cve" or bool(fusion.get("must_not_enter_package_actions")) or bool(fusion.get("not_directly_observed")) ) if is_representative: representative_fusions.append(fusion) continue if cve_id: fusion_by_cve[cve_id] = fusion if not fusion_by_cve: if representative_fusions: output["_intel_fusion_applied"] = { "merged_existing": 0, "injected_missing": 0, "fusion_count": 0, "representative_cves_skipped": len(representative_fusions), } output["representative_cve_evidence"] = representative_fusions return output vulnerabilities = output.setdefault("vulnerabilities", []) seen_cves = {str(v.get("cve_id", "")).strip() for v in vulnerabilities} merged_count = 0 injected_count = 0 for vuln in vulnerabilities: cve_id = str(vuln.get("cve_id", "")).strip() fusion = fusion_by_cve.get(cve_id) if not fusion: continue dims = fusion.get("dimension_scores", {}) vuln["composite_score"] = fusion.get("composite_score", vuln.get("composite_score")) vuln["intel_confidence"] = fusion.get("confidence", vuln.get("intel_confidence", "")) vuln["dimensions_used"] = fusion.get("dimensions_used", vuln.get("dimensions_used", [])) vuln["weights_used"] = fusion.get("weights_used", vuln.get("weights_used", {})) vuln["evidence_type"] = fusion.get("evidence_type", vuln.get("evidence_type", "direct_cve")) vuln["not_directly_observed"] = bool(fusion.get("not_directly_observed", False)) vuln["must_not_enter_package_actions"] = bool(fusion.get("must_not_enter_package_actions", False)) if dims.get("epss") is not None: vuln["epss_score"] = dims.get("epss") if "kev" in dims: vuln["in_cisa_kev"] = bool(dims.get("kev")) if dims.get("ghsa_severity"): vuln["ghsa_severity"] = dims.get("ghsa_severity") if dims.get("otx_threat"): vuln["otx_threat"] = dims.get("otx_threat") merged_count += 1 for cve_id, fusion in fusion_by_cve.items(): if cve_id in seen_cves: continue dims = fusion.get("dimension_scores", {}) cvss_score = float(dims.get("cvss", 0.0) or 0.0) vulnerabilities.append({ "cve_id": cve_id, "package": fusion.get("package", "unknown"), "cvss_score": cvss_score, "severity": _severity_from_cvss(cvss_score), "description": fusion.get("description", ""), "is_new": True, "source": "INTEL_FUSION", "evidence_type": fusion.get("evidence_type", "package_cve"), "not_directly_observed": bool(fusion.get("not_directly_observed", False)), "must_not_enter_package_actions": bool(fusion.get("must_not_enter_package_actions", False)), "composite_score": fusion.get("composite_score", 0.0), "intel_confidence": fusion.get("confidence", ""), "dimensions_used": fusion.get("dimensions_used", []), "weights_used": fusion.get("weights_used", {}), "epss_score": dims.get("epss"), "in_cisa_kev": bool(dims.get("kev", False)), "ghsa_severity": dims.get("ghsa_severity", "UNKNOWN"), "otx_threat": dims.get("otx_threat", "unknown"), }) injected_count += 1 output["_intel_fusion_applied"] = { "merged_existing": merged_count, "injected_missing": injected_count, "fusion_count": len(fusion_by_cve), "representative_cves_skipped": len(representative_fusions), } if representative_fusions: output["representative_cve_evidence"] = representative_fusions return output def merge_intel_fusion_evidence(output: dict[str, Any], intel_fusion_result: dict | None) -> dict[str, Any]: """Public wrapper for post-discovery Intel Fusion enrichment.""" return _merge_intel_fusion_evidence(output, intel_fusion_result) def _reconcile_is_new_flags(output: dict[str, Any], historical_cves: set[str]) -> dict[str, Any]: """依照歷史記憶重新校正所有漏洞的 is_new 旗標。""" corrected = 0 for vuln in output.get("vulnerabilities", []): cve_id = vuln.get("cve_id", "") expected_is_new = cve_id not in historical_cves if vuln.get("is_new") != expected_is_new: vuln["is_new"] = expected_is_new corrected += 1 summary = output.setdefault("summary", {}) summary["new_since_last_scan"] = sum( 1 for vuln in output.get("vulnerabilities", []) if vuln.get("is_new") ) output["_is_new_corrected"] = corrected return output def _load_skill(skill_filename: str = "threat_intel.md") -> str: """ Load Skill SOP file by filename (v3.7 path-aware + Phase 4D 熱載入). Phase 4D: 優先使用 SkillLoader 單例(支援熱載入、mtime 驗證)。 Fallback: 直接從磁碟讀取(原有實作,確保向後相容)。 """ # Phase 4D: SkillLoader 熱載入路徑 if _SKILL_LOADER_AVAILABLE and _skill_loader is not None: try: return _skill_loader.load_skill(skill_filename) except Exception as e: logger.warning("[Scout] SkillLoader 失敗,回退直接讀取: %s", e) # Fallback: 直接從磁碟讀取(原有實作) skill_path = os.path.join(PROJECT_ROOT, "skills", skill_filename) for encoding in ("utf-8", "utf-8-sig", "latin-1"): try: if os.path.exists(skill_path): with open(skill_path, "r", encoding=encoding) as f: content = f.read().strip() if content: logger.info("[OK] Skill loaded: %s (%d chars)", skill_path, len(content)) return content except (IOError, UnicodeDecodeError): continue logger.warning("[WARN] Skill file not found, using fallback: %s", skill_path) return _FALLBACK_SKILL # 內嵌精簡版 Skill(Graceful Degradation — Skill 檔案遺失時的保底) _FALLBACK_SKILL = """ # Skill: Threat Intelligence Collection (minimal fallback) ## SOP 1. First call read_memory(agent_name="scout") to read history. 2. Query vulnerabilities for each technology package with search_nvd. 3. For CVEs with CVSS >= 7.0, call search_otx for threat intelligence. 4. Compare with history and mark is_new. 5. Write the result with write_memory. 6. Output pure JSON with no extra text. ## Quality Red Lines - CVE IDs must come from search_nvd. Do not fabricate them. - CVSS scores must come from the NVD API. - Output pure JSON only. """.strip() # ══════════════════════════════════════════════════════════════ # 系統憲法(Constraints 支柱 — 層級 A) # ══════════════════════════════════════════════════════════════ CONSTITUTION = """ ## System Constitution - Non-Negotiable Rules 1. **CVE source constraint**: every CVE ID must come from search_nvd tool results. Never fabricate, infer, or recall CVE IDs from memory. Violating this rule creates hallucinated output and causes the Sentinel fact-check to fail the pipeline. 2. **CVSS source constraint**: every CVSS score must come from the NVD API result. Do not estimate, adjust, or round CVSS values yourself. 3. **Output format constraint**: your Final Answer must be JSON and only JSON. Do not add explanations, headings, markdown, or natural-language text before or after the JSON. 4. **Tool-use constraint**: you must query vulnerabilities through the search_nvd tool. Do not skip tool calls and answer from training data; training data may be outdated. 5. **Honesty constraint**: if a package has no results, report count: 0 honestly. Do not invent vulnerabilities to make the report look useful. 6. **Memory-read constraint**: the first step after startup must be read_memory. Sentinel Behavior Monitor checks this behavior. 7. **Loop constraint**: run at most 15 ReAct iterations. If all packages are queried before 15 iterations, output the result immediately and do no extra work. 8. **Memory-write constraint (most important)**: before giving the Final Answer, you must call write_memory to store the complete report. Order: query all packages -> assemble JSON -> call write_memory -> confirm success -> then provide Final Answer. If you are about to answer before calling write_memory, stop and call write_memory first. """.strip() # ══════════════════════════════════════════════════════════════ # Agent 工廠函式 # ══════════════════════════════════════════════════════════════ def create_scout_agent( excluded_models: list[str] | None = None, input_type: str = "pkg", ) -> "Agent": """ Build Scout Agent with Path-Aware Skill SOP (v3.7). input_type selects which Skill file to embed in backstory: pkg -> threat_intel.md (NVD CVE scan for packages) code -> source_code_audit.md (OWASP Top10 + CWE for source code) injection -> ai_security_audit.md (OWASP LLM Top10 + MITRE ATLAS) config -> config_audit.md (CIS Benchmark for config files) Args: excluded_models: Models to skip (429-rate-limited) input_type: Path type from frontend detector Returns: CrewAI Agent instance ready for Task and Crew """ skill_filename = SKILL_MAP.get(input_type, "threat_intel.md") skill_content = _load_skill(skill_filename) logger.info("[Scout] Path=%s -> Skill=%s", input_type, skill_filename) # Goal adapts to the input path _GOAL_MAP = { "pkg": "Collect known CVEs for the given package list from OSV/NVD, merge Intel Fusion evidence when available, compare with history, and output structured JSON.", "code": "Audit source code for OWASP Top10 / CWE vulnerabilities; extract package imports and scan NVD; output structured JSON.", "injection": "Classify and assess AI security threats (OWASP LLM Top10 / MITRE ATLAS) in the given input; output structured JSON with no CVE hallucination.", "config": "Audit the given configuration file against CIS Benchmarks for misconfigurations and hardcoded secrets; output structured JSON.", } agent_goal = _GOAL_MAP.get(input_type, _GOAL_MAP["pkg"]) backstory = f"""You are an expert security analyst specialized in identifying software and AI system vulnerabilities. You are rigorous, precise, and never fabricate data. {CONSTITUTION} --- ## Analysis Methodology (Skill SOP) You MUST follow this Standard Operating Procedure for the current scan path ({input_type}): {skill_content} """ # 延遲匯入 CrewAI,避免純 helper / 測試路徑在 import 階段觸發本機儲存副作用。 from crewai import Agent from tools.nvd_tool import search_nvd from tools.osv_tool import search_osv from tools.memory_tool import read_memory, write_memory, history_search llm = get_llm(exclude_models=excluded_models) scout = Agent( role="Threat Intelligence Scout", goal=agent_goal, backstory=backstory, tools=[ search_osv, # 主力:OSV.dev ecosystem-aware 精確查詢(不會返回無關 1999 CVE) search_nvd, # 備用:NVD CPE 查詢(OSV 無結果時使用) read_memory, write_memory, history_search, ], llm=llm, verbose=True, max_iter=15, # SOP: 最多 15 輪 ReAct(包含 6 步驟程:read_memory+search_nvd+OTX+write_memory) allow_delegation=False, ) logger.info( "[OK] Scout Agent ready | input_type=%s | skill=%s | llm=%s", input_type, skill_filename, llm.model if hasattr(llm, 'model') else 'unknown', ) return scout # ══════════════════════════════════════════════════════════════ # CrewAI Task 工廠函式(便利函式,供 main.py 使用) # ══════════════════════════════════════════════════════════════ def create_scout_task( agent, tech_stack: str, intel_fusion_result: dict | None = None, ): """ v3.4: Scout Task - package-aware mode. When tech_stack is a short comma-separated package list (from PackageExtractor), explicitly enumerate each package for the LLM to query via search_nvd. """ from crewai import Task intel_summary = _summarize_intel_fusion_for_task(intel_fusion_result) # Detect if input is a clean package list or raw code/long text is_package_list = ( len(tech_stack) < 300 and "\n" not in tech_stack and "def " not in tech_stack and "import " not in tech_stack ) if is_package_list: packages = [p.strip() for p in tech_stack.split(",") if p.strip()] packages_display = "\n".join(f" {i+1}. {pkg}" for i, pkg in enumerate(packages)) _osv_cmd_lines = "\n".join(f" - search_osv('{pkg}')" for pkg in packages[:8]) task_desc = ( f"You are analyzing security vulnerabilities for packages extracted from source code.\n\n" f"Package list to scan:\n{packages_display}\n\n" f"{intel_summary + chr(10) + chr(10) if intel_summary else ''}" f"Steps to follow (MUST call tools in order):\n\n" f"Step 1: Call read_memory\n" f" Action: read_memory\n" f" Action Input: scout\n\n" f"Step 2: For EACH package, call search_osv first (more precise), search_nvd as fallback:\n" f"{_osv_cmd_lines}\n" f" If search_osv returns count=0, then try: search_nvd('')\n\n" f"Step 3: Reuse Intel Fusion evidence when available.\n" f" - Do NOT re-query EPSS or OTX from Scout.\n" f" - Prefer Intel Fusion values for composite_score, KEV, EPSS, GHSA, and OTX fields.\n" f" - If Intel Fusion has no matching CVE, continue with OSV/NVD-only evidence.\n\n" f"Step 4: Assemble JSON report from REAL tool results only\n" f" - CVE IDs MUST come from search_osv or search_nvd output\n" f" - Compare with read_memory history, mark is_new\n\n" f"Step 5: Call write_memory to save results\n" f" Action: write_memory\n" f" Action Input: scout|{{JSON report}}\n\n" f"Step 6: Output JSON report as Final Answer\n\n" f"FORBIDDEN:\n" f"- Do NOT skip tool calls\n" f"- Do NOT fabricate CVE IDs\n" f"- Do NOT use backstory examples (they are fake)\n" f"- write_memory MUST be called before Final Answer" ) else: task_desc = ( f"You are analyzing security vulnerabilities in: {tech_stack[:800]}\n\n" f"{intel_summary + chr(10) + chr(10) if intel_summary else ''}" f"Steps to follow (MUST call tools in order):\n\n" f"Step 1: Call read_memory\n" f" Action: read_memory\n" f" Action Input: scout\n\n" f"Step 2: Extract PACKAGE NAMES from the code, then call search_osv first:\n" f" RULE: Package names come from require() or import statements ONLY.\n" f" Example: require('express') -> search_osv('express')\n" f" Example: require('lodash') -> search_osv('lodash')\n" f" If search_osv returns count=0 for a package, fallback: search_nvd('')\n" f" FORBIDDEN search terms (these are syntax, NOT packages):\n" f" - eval, exec, Function, innerHTML, script, html, document\n" f" - const, let, var, function, class, async, await\n" f" - req, res, app, user, input (these are variable names)\n" f" If no require()/import found, output empty vulnerabilities list.\n\n" f"Step 3: Reuse Intel Fusion evidence when available.\n" f" - Do NOT re-query EPSS or OTX from Scout.\n" f" - Keep Scout focused on package extraction, OSV/NVD discovery, and schema output.\n" f" - If Intel Fusion has no matching CVE, continue with OSV/NVD-only evidence.\n\n" f"Step 4: Assemble JSON report from REAL tool results only\n\n" f"Step 5: Call write_memory\n" f" Action: write_memory\n" f" Action Input: scout|{{JSON report}}\n\n" f"Step 6: Output JSON report as Final Answer\n\n" f"FORBIDDEN:\n" f"- Do NOT search NVD with: eval, html, innerHTML, script, const, function\n" f"- Do NOT skip tool calls\n" f"- Do NOT fabricate CVE IDs\n" f"- write_memory MUST be called before Final Answer" ) return Task( description=task_desc, expected_output="Structured JSON threat intel report with CVEs from search_osv (primary) or search_nvd (fallback), ready for deterministic Intel Fusion evidence merge.", agent=agent, ) def run_scout_pipeline( tech_stack: str, input_type: str = "pkg", intel_fusion_result: dict | None = None, ) -> dict: """ Execute full Scout Pipeline with Harness code-level guarantees. v5.0 (Phase 7.5) 新增: - OSV Batch 預熱:LLM 啟動前批量查詢所有套件,結果預存快取 → LLM 呼叫 search_osv() 時直接命中快取,無需等待 API - Harness 2.5:改用 OSV 資料做 LLM 遺漏補充(取代 NVD cache inject) - GHSA severity 維度:從 OSV database_specific.severity 直接解析 v3.7: input_type selects the correct Skill SOP for path-aware analysis. Args: tech_stack: User input (e.g. "Django 4.2, Redis 7.0" or source code) input_type: Path type (pkg/code/injection/config) intel_fusion_result: Optional Layer 1 enrichment to merge into Scout output Returns: dict: Parsed Scout JSON report """ import json from crewai import Crew, Process from config import mark_model_failed, get_current_model_name, rate_limiter from tools.memory_tool import write_memory # 新版 memory_tool 無 _write_memory_impl,使用公開 Tool 介面 # ── Harness 0:OSV Batch 預熱快取(在 LLM 之前執行)─────────── # 理由:LLM 呼叫 search_osv() 是一個一個套件查,無法自行批量。 # 在 Code-level Harness 層先做 Batch 查詢,結果存入快取, # Agent 呼叫 search_osv() 時直接命中快取,延遲從 N×API_RTT → 1×API_RTT。 _osv_batch_cache: dict[str, list] = {} # pkg → [vuln_dict, ...] if input_type == "pkg": # 從 tech_stack 提取套件名稱(去版本號) _pkg_list = [item.strip().split()[0] for item in tech_stack.split(",") if item.strip()] if _pkg_list: try: from tools.osv_tool import search_osv_batch logger.info("[HARNESS 0] OSV Batch warmup: %s", _pkg_list) _osv_batch_cache = search_osv_batch(_pkg_list) logger.info("[HARNESS 0] OSV Batch warmup done: %d packages cached", len(_osv_batch_cache)) except Exception as _e: logger.warning("[HARNESS 0] OSV Batch warmup failed (non-fatal): %s", _e) # ──────────────────────────────────────────────────────────── # 429 自動輪替:最多重試 MAX_LLM_RETRIES 次(每次切換模型) MAX_LLM_RETRIES = 2 excluded_models: list[str] = [] for attempt in range(MAX_LLM_RETRIES + 1): # v3.7: pass input_type so agent loads the correct Skill SOP agent = create_scout_agent(excluded_models, input_type=input_type) task = create_scout_task(agent, tech_stack, intel_fusion_result=intel_fusion_result) crew = Crew(agents=[agent], tasks=[task], process=Process.sequential, verbose=True) # 執行 Agent logger.info("[START] Scout Pipeline: %s (attempt %d/%d)", tech_stack, attempt + 1, MAX_LLM_RETRIES + 1) try: from checkpoint import recorder as _cp _current_model = get_current_model_name(agent.llm) _cp.llm_call("scout", _current_model, "openrouter", f"attempt={attempt+1}") except Exception: _current_model = "unknown" _t_llm = time.time() try: result = crew.kickoff() try: _cp.llm_result("scout", _current_model, "SUCCESS", len(str(result)), int((time.time() - _t_llm) * 1000), thinking=str(result)[:1000]) except Exception: pass break # 成功則跳出重試迴圈 except Exception as e: error_str = str(e) if "429" in error_str and attempt < MAX_LLM_RETRIES: # 標記當前模型為冷卻中,下次迴圈會選擇其他模型 current_model = get_current_model_name(agent.llm) mark_model_failed(current_model) excluded_models.append(current_model) # 解析 API 回傳的 retry_after 秒數 import re as _re _m = _re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, _re.IGNORECASE) retry_after = float(_m.group(1)) if _m else 0.0 logger.warning("[RETRY] Scout 429 on %s (attempt %d/%d), api_retry_after=%.0fs", current_model, attempt + 1, MAX_LLM_RETRIES, retry_after) try: _cp.llm_retry("scout", current_model, error_str[:200], attempt + 1, "next_in_waterfall") except Exception: pass rate_limiter.on_429(retry_after=retry_after, caller="scout") # 最少 30s continue try: _cp.llm_error("scout", _current_model, error_str[:300]) except Exception: pass raise # 非 429 或已超過重試次數,直接拋出 result_str = str(result).strip() # 解析 JSON(處理可能的 markdown 包裝) json_str = result_str if "```json" in json_str: json_str = json_str.split("```json")[1].split("```")[0].strip() elif "```" in json_str: parts = json_str.split("```") if len(parts) >= 3: json_str = parts[1].strip() try: output = json.loads(json_str) except json.JSONDecodeError: logger.error("[FAIL] Agent output is not valid JSON: %s", result_str[:200]) raise ValueError(f"Scout Agent output is not valid JSON: {result_str[:200]}") # ── Harness 保障 0.5:先補齊基礎 Schema,再寫入記憶 ─────────────── from datetime import datetime, timezone if "scan_id" not in output: logger.warning("[WARN] Output missing required field: scan_id") output["scan_id"] = f"scan_{int(time.time())}" if "timestamp" not in output: logger.warning("[WARN] Output missing required field: timestamp") output["timestamp"] = datetime.now(timezone.utc).isoformat() if "tech_stack" not in output: logger.warning("[WARN] Output missing required field: tech_stack") output["tech_stack"] = [ item.strip().lower() for item in str(tech_stack).split(",") if item.strip() ] if "vulnerabilities" not in output: logger.warning("[WARN] Output missing required field: vulnerabilities") output["vulnerabilities"] = [] if "summary" not in output: logger.warning("[WARN] Output missing required field: summary") output["summary"] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0} for vuln in output.get("vulnerabilities", []): severity = str(vuln.get("severity", "MEDIUM")).upper() if severity == "MODERATE": severity = "MEDIUM" elif severity not in {"CRITICAL", "HIGH", "MEDIUM", "LOW"}: severity = _severity_from_cvss(float(vuln.get("cvss_score") or 0.0)) vuln["severity"] = severity # ── Harness 保障 1:強制 write_memory ────────────────────── memory_path = os.path.join(PROJECT_ROOT, "memory", "scout_memory.json") need_write = False if not os.path.exists(memory_path): need_write = True else: try: with open(memory_path, "r", encoding="utf-8") as f: content = f.read().strip() if not content or content == "{}": need_write = True except (IOError, json.JSONDecodeError): need_write = True if need_write: logger.warning("[WARN] Agent did not call write_memory -- code forcing write (Harness)") write_result = write_memory.run(agent_name="scout", data=json.dumps(output, ensure_ascii=False)) logger.info("[OK] Forced memory write: %s", write_result) # ── Harness 保障 2:基礎 Schema 驗證 ────────────────────── required = ["scan_id", "timestamp", "tech_stack", "vulnerabilities", "summary"] for field in required: if field not in output: logger.warning("[WARN] Output missing required field: %s", field) if field == "vulnerabilities": output["vulnerabilities"] = [] elif field == "summary": output["summary"] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0} # ── Harness 保障 2.5:Cache 注入(Anti-LLM-Omission)────── # v5.0:改用 OSV Batch 快取資料(更精確,不會混入 1999 年 CVE) # 當 LLM 輸出 0 vulnerabilities,從 Batch 預熱快取直接注入 if not output.get("vulnerabilities"): injected = [] # 優先用 OSV Batch 預熱快取(最精確) if _osv_batch_cache: for pkg_name, vuln_list in _osv_batch_cache.items(): for v in vuln_list: cve_id = v.get("cve_id", "") if not cve_id.startswith(("CVE-", "GHSA-")): continue ghsa_sev = _extract_ghsa_severity_from_osv(v) injected.append({ "cve_id": cve_id, "package": v.get("package", pkg_name), "cvss_score": v.get("cvss_score", 0.0), "severity": v.get("severity", "MEDIUM"), "description": v.get("description", "")[:300], "published": v.get("published", ""), "is_new": True, "in_cisa_kev": False, "has_public_exploit": False, "source": "OSV", "osv_id": v.get("osv_id", ""), # GHSA 維度:從 OSV database_specific.severity 解析 "ghsa_severity": ghsa_sev, }) else: # Fallback:NVD cache(舊路徑,OSV Batch 失敗時使用) from tools.nvd_tool import _search_nvd_impl for item in (tech_stack or "").split(","): pkg = item.strip().split()[0].lower() if not pkg: continue try: cached_result = json.loads(_search_nvd_impl(pkg)) for v in cached_result.get("vulnerabilities", []): cve_id = v.get("cve_id") or v.get("id", "") if not cve_id.startswith("CVE-"): continue injected.append({ "cve_id": cve_id, "package": v.get("package", pkg), "cvss_score": v.get("cvss_score", 0.0), "severity": v.get("severity", "MEDIUM"), "description": v.get("description", "")[:300], "published": v.get("published_date", ""), "is_new": True, "in_cisa_kev": v.get("in_cisa_kev", False), "has_public_exploit": v.get("has_public_exploit", False), }) except Exception as e: logger.warning("[WARN] NVD cache inject failed for %s: %s", pkg, e) if injected: output["vulnerabilities"] = injected logger.warning( "[HARNESS 2.5] LLM output 0 CVEs, injected %d CVEs from %s for tech_stack=%s", len(injected), "OSV batch cache" if _osv_batch_cache else "NVD cache", tech_stack[:60] ) # 重新查 NVD 建立真實 CVE 清單 + CVE→package 對應表 # v5.0:優先用 OSV Batch Cache(已在 Harness 0 預熱),只有 OSV 無資料才查 NVD # NVD 查到的結果也加年份過濾(< 2005 → 不列入 real_cves) from tools.nvd_tool import _search_nvd_impl real_cves = set() cve_to_package = {} # CVE-XXXX-YYYY → package name # 收集所有需要查的 package:Agent 輸出的 + tech_stack 裡的 packages_to_check = set() for vuln in output.get("vulnerabilities", []): pkg = vuln.get("package", "").lower().strip() if pkg: packages_to_check.add(pkg) # 從 tech_stack 提取(去版本號) for item in tech_stack.split(","): pkg_name = item.strip().split()[0].lower() if pkg_name: packages_to_check.add(pkg_name) # 優先從 OSV Batch Cache 建 real_cves(最精確,不含 CVE-1999) for pkg_name, vuln_list in _osv_batch_cache.items(): for v in vuln_list: cve_id = v.get("cve_id", "") if cve_id.startswith(("CVE-", "GHSA-")): real_cves.add(cve_id) cve_to_package[cve_id] = pkg_name # OSV 無資料的套件,嘗試 NVD(但過濾 < 2005 年份) for pkg in packages_to_check: if any(cve_to_package.get(c) == pkg for c in real_cves): continue # OSV 已有該套件資料 try: import tools.nvd_tool as nvd_mod original_page_size = nvd_mod.RESULTS_PER_PAGE nvd_mod.RESULTS_PER_PAGE = 100 try: nvd_result = json.loads(_search_nvd_impl(pkg)) finally: nvd_mod.RESULTS_PER_PAGE = original_page_size for v in nvd_result.get("vulnerabilities", []): cve_id = v["cve_id"] # 年份過濾:CVE-1999/2000... 遠古 CVE 不計入 real_cves try: cve_year = int(cve_id.split("-")[1]) if cve_year < 2005: logger.debug("[FILTER] NVD verification skip ancient CVE: %s", cve_id) continue except (IndexError, ValueError): pass real_cves.add(cve_id) cve_to_package[cve_id] = pkg except Exception as e: logger.warning("[WARN] CVE verification NVD query failed (%s): %s", pkg, e) if real_cves: original_count = len(output.get("vulnerabilities", [])) verified_vulns = [] suspect_vulns = [] # 可能是真的但 keywordSearch 沒找到 for vuln in output.get("vulnerabilities", []): if vuln.get("cve_id") in real_cves: verified_vulns.append(vuln) else: suspect_vulns.append(vuln) # 對可疑的 CVE 做精確查詢(cveId lookup) hallucinated = [] if suspect_vulns: import re for vuln in suspect_vulns: cve_id = vuln.get("cve_id", "") if not re.match(r"^CVE-\d{4}-\d{4,}$", cve_id): hallucinated.append(cve_id) continue try: resp = requests.get( "https://services.nvd.nist.gov/rest/json/cves/2.0", params={"cveId": cve_id}, headers={"apiKey": os.getenv("NVD_API_KEY", "")}, timeout=10, ) if resp.status_code == 200: data = resp.json() if data.get("totalResults", 0) > 0: logger.info("[OK] CVE exact verification passed: %s", cve_id) verified_vulns.append(vuln) # 補 package:從 description 推斷 if not vuln.get("package"): desc = data["vulnerabilities"][0]["cve"]["descriptions"][0]["value"].lower() for pkg in packages_to_check: if pkg in desc: vuln["package"] = pkg cve_to_package[cve_id] = pkg break continue # NVD 明確回應但找不到 → 才算幻覺 hallucinated.append(cve_id) except Exception: # NVD API 不可達(timeout/connection)→ 保守保留,不當幻覺處理 logger.warning("[WARN] NVD verify unreachable for %s, keeping conservatively", cve_id) verified_vulns.append(vuln) if hallucinated: logger.warning( "[ALERT] Detected %d hallucinated CVEs, removed: %s", len(hallucinated), hallucinated ) output["vulnerabilities"] = verified_vulns # 重新計算 summary output["summary"] = { "total": len(verified_vulns), "new_since_last_scan": sum(1 for v in verified_vulns if v.get("is_new")), "critical": sum(1 for v in verified_vulns if v.get("severity") == "CRITICAL"), "high": sum(1 for v in verified_vulns if v.get("severity") == "HIGH"), "medium": sum(1 for v in verified_vulns if v.get("severity") == "MEDIUM"), "low": sum(1 for v in verified_vulns if v.get("severity") == "LOW"), } logger.info( "[OK] CVE verification result: %d -> %d (removed %d hallucinated)", original_count, len(verified_vulns), len(hallucinated) ) else: logger.info("[OK] All %d CVEs passed verification", original_count) else: logger.warning("[WARN] Cannot build real CVE list, skipping verification") # ── Harness 保障 4:補全 package 欄位 ────────────────────── # Agent 常忘記加 package,用 Layer 3 建好的 cve_to_package 補 patched_count = 0 for vuln in output.get("vulnerabilities", []): if not vuln.get("package"): cve_id = vuln.get("cve_id", "") if cve_id in cve_to_package: vuln["package"] = cve_to_package[cve_id] patched_count += 1 else: # 最後手段:從 description 猜 package desc = vuln.get("description", "").lower() for pkg in packages_to_check: if pkg in desc: vuln["package"] = pkg patched_count += 1 break else: vuln["package"] = "unknown" patched_count += 1 if patched_count: logger.info("[OK] Patched %d CVE package fields", patched_count) # 先合併 Intel Fusion 結果,再做歷史記憶校正,避免後補漏洞被誤標成新發現。 output = _merge_intel_fusion_evidence(output, intel_fusion_result) # ── Harness 保障 5:校正 is_new 標記 ────────────────────── # Agent 常常不正確比對歷史,程式碼代為校正 try: mem_data = {} if os.path.exists(memory_path): with open(memory_path, "r", encoding="utf-8") as f: mem_data = json.load(f) # 從 memory 的所有歷史掃描中提取已知 CVE 集合 historical_cves = set() # 新版 memory_tool 直接存 scan 結構 if "vulnerabilities" in mem_data: for v in mem_data.get("vulnerabilities", []): historical_cves.add(v.get("cve_id", "")) # 舊版 memory 有 latest/history 結構 elif "latest" in mem_data: for v in mem_data.get("latest", {}).get("vulnerabilities", []): historical_cves.add(v.get("cve_id", "")) output = _reconcile_is_new_flags(output, historical_cves) corrected = output.get("_is_new_corrected", 0) if corrected: logger.info("[OK] Corrected %d CVE is_new flags", corrected) except Exception as e: logger.warning("[WARN] is_new correction failed: %s", e) # ── Harness 保障 3.5:CVE 年份過濾器(最後防線)────────────── # 不管哪個 Agent 或 NVD 返回了遠古 CVE,在 Scout 輸出前一律攔截 # 根據:CISA KEV 最早 2002 年,現代套件漏洞幾乎都 >= 2005 # 例外:GHSA- 前綴(OSV/GitHub Advisory,不用年份過濾) CVE_YEAR_MIN = 2005 ancient_removed = [] fresh_vulns = [] for vuln in output.get("vulnerabilities", []): cve_id = vuln.get("cve_id", "") if cve_id.startswith("GHSA-") or not cve_id.startswith("CVE-"): fresh_vulns.append(vuln) # GHSA/非標準 ID 保留 continue try: cve_year = int(cve_id.split("-")[1]) if cve_year < CVE_YEAR_MIN: ancient_removed.append(cve_id) logger.warning( "[HARNESS 3.5] Ancient CVE removed (year=%d < %d): %s", cve_year, CVE_YEAR_MIN, cve_id ) else: fresh_vulns.append(vuln) except (IndexError, ValueError): fresh_vulns.append(vuln) # 解析失敗保留(保守) if ancient_removed: output["vulnerabilities"] = fresh_vulns output["ancient_cves_removed"] = ancient_removed # 審計用 logger.warning( "[HARNESS 3.5] Removed %d ancient CVEs (< %d): %s", len(ancient_removed), CVE_YEAR_MIN, ancient_removed ) # ──────────────────────────────────────────────────────────── # ── 最終 Summary 校正(確保一致性)────────────────────────── vulns = output.get("vulnerabilities", []) output["summary"] = { "total": len(vulns), "new_since_last_scan": sum(1 for v in vulns if v.get("is_new")), "critical": sum(1 for v in vulns if v.get("severity") == "CRITICAL"), "high": sum(1 for v in vulns if v.get("severity") == "HIGH"), "medium": sum(1 for v in vulns if v.get("severity") == "MEDIUM"), "low": sum(1 for v in vulns if v.get("severity") == "LOW"), } vuln_count = output["summary"]["total"] new_count = output["summary"]["new_since_last_scan"] logger.info( "[OK] Scout Pipeline complete: %d CVEs, %d new", vuln_count, new_count ) return output