import os
import logging
import time
from typing import Any, TYPE_CHECKING

import requests

from config import get_llm

logger = logging.getLogger("ThreatHunter")

if TYPE_CHECKING:
    from crewai import Agent


SKILL_MAP: dict[str, str] = {
    "pkg": "threat_intel.md",
    "code": "source_code_audit.md",
    "injection": "ai_security_audit.md",
    "config": "config_audit.md",
}
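
# Illustrative lookups (the fallback mirrors how create_scout_agent below
# resolves the skill file; "sbom" is a hypothetical, unmapped input type):
#
#     SKILL_MAP.get("code")                     # -> "source_code_audit.md"
#     SKILL_MAP.get("sbom", "threat_intel.md")  # -> "threat_intel.md"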

PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SKILL_PATH = os.path.join(PROJECT_ROOT, "skills", "threat_intel.md")

try:
    from skills.skill_loader import skill_loader as _skill_loader
    _SKILL_LOADER_AVAILABLE = True
    logger.info("[Scout] Phase 4D: SkillLoader enabled ✓")
except ImportError:
    _skill_loader = None
    _SKILL_LOADER_AVAILABLE = False
    logger.warning("[Scout] Phase 4D: SkillLoader unavailable, using built-in _load_skill")


def _extract_ghsa_severity_from_osv(vuln_dict: dict) -> str:
    """
    Parse the GHSA severity from an OSV vuln_dict.

    OSV's database_specific field looks like:
        {
            "severity": "HIGH",   ← GitHub Advisory severity
            "cvss": {...}
        }
    A vuln whose vuln["osv_id"] starts with "GHSA-" is also a direct GHSA source.

    Returns:
        "CRITICAL" | "HIGH" | "MODERATE" | "MEDIUM" | "LOW" | "UNKNOWN"
    """
    if vuln_dict.get("ghsa_severity"):
        return vuln_dict["ghsa_severity"]

    sev = vuln_dict.get("severity", "")
    if sev in ("CRITICAL", "HIGH", "MODERATE", "MEDIUM", "LOW"):
        return sev
    return "UNKNOWN"


def _severity_from_cvss(cvss_score: float) -> str:
    """Map a CVSS score to a standard severity string."""
    if cvss_score >= 9.0:
        return "CRITICAL"
    if cvss_score >= 7.0:
        return "HIGH"
    if cvss_score >= 4.0:
        return "MEDIUM"
    return "LOW"


def _summarize_intel_fusion_for_task(intel_fusion_result: dict | None) -> str:
    """Compress the Intel Fusion result into a summary for the Scout task description."""
    if not intel_fusion_result:
        return ""

    fusion_results = intel_fusion_result.get("fusion_results", [])
    if not fusion_results:
        return ""

    lines: list[str] = [
        "Layer 1 Intel Fusion evidence is available.",
        "Reuse this enrichment instead of re-querying EPSS or OTX.",
        "Use OSV/NVD only for CVE discovery, verification, or missing-package fallback.",
        "Intel Fusion evidence:",
    ]

    for fusion in fusion_results[:8]:
        dims = fusion.get("dimension_scores", {})
        cve_id = fusion.get("cve_id", "UNKNOWN")
        score = fusion.get("composite_score", "n/a")
        kev = bool(dims.get("kev", False))
        epss = dims.get("epss", "n/a")
        ghsa = dims.get("ghsa_severity", "UNKNOWN")
        otx = dims.get("otx_threat", "unknown")
        lines.append(
            f"- {cve_id}: composite={score}, kev={kev}, epss={epss}, ghsa={ghsa}, otx={otx}"
        )

    return "\n".join(lines)


def _merge_intel_fusion_evidence(output: dict[str, Any], intel_fusion_result: dict | None) -> dict[str, Any]:
    """Merge Intel Fusion enrichment evidence into Scout's final vulnerability list."""
    if not intel_fusion_result:
        return output

    fusion_results = intel_fusion_result.get("fusion_results", [])
    if not fusion_results:
        return output

    representative_fusions: list[dict[str, Any]] = []
    fusion_by_cve: dict[str, dict[str, Any]] = {}
    for fusion in fusion_results:
        cve_id = str(fusion.get("cve_id", "")).strip()
        is_representative = (
            fusion.get("evidence_type") == "representative_cve"
            or bool(fusion.get("must_not_enter_package_actions"))
            or bool(fusion.get("not_directly_observed"))
        )
        if is_representative:
            representative_fusions.append(fusion)
            continue
        if cve_id:
            fusion_by_cve[cve_id] = fusion

    if not fusion_by_cve:
        if representative_fusions:
            output["_intel_fusion_applied"] = {
                "merged_existing": 0,
                "injected_missing": 0,
                "fusion_count": 0,
                "representative_cves_skipped": len(representative_fusions),
            }
            output["representative_cve_evidence"] = representative_fusions
        return output

    vulnerabilities = output.setdefault("vulnerabilities", [])
    seen_cves = {str(v.get("cve_id", "")).strip() for v in vulnerabilities}
    merged_count = 0
    injected_count = 0

    # Enrich vulnerabilities the LLM already reported.
    for vuln in vulnerabilities:
        cve_id = str(vuln.get("cve_id", "")).strip()
        fusion = fusion_by_cve.get(cve_id)
        if not fusion:
            continue

        dims = fusion.get("dimension_scores", {})
        vuln["composite_score"] = fusion.get("composite_score", vuln.get("composite_score"))
        vuln["intel_confidence"] = fusion.get("confidence", vuln.get("intel_confidence", ""))
        vuln["dimensions_used"] = fusion.get("dimensions_used", vuln.get("dimensions_used", []))
        vuln["weights_used"] = fusion.get("weights_used", vuln.get("weights_used", {}))
        vuln["evidence_type"] = fusion.get("evidence_type", vuln.get("evidence_type", "direct_cve"))
        vuln["not_directly_observed"] = bool(fusion.get("not_directly_observed", False))
        vuln["must_not_enter_package_actions"] = bool(fusion.get("must_not_enter_package_actions", False))

        if dims.get("epss") is not None:
            vuln["epss_score"] = dims.get("epss")
        if "kev" in dims:
            vuln["in_cisa_kev"] = bool(dims.get("kev"))
        if dims.get("ghsa_severity"):
            vuln["ghsa_severity"] = dims.get("ghsa_severity")
        if dims.get("otx_threat"):
            vuln["otx_threat"] = dims.get("otx_threat")

        merged_count += 1

    # Inject fusion CVEs the LLM missed entirely.
    for cve_id, fusion in fusion_by_cve.items():
        if cve_id in seen_cves:
            continue

        dims = fusion.get("dimension_scores", {})
        cvss_score = float(dims.get("cvss", 0.0) or 0.0)
        vulnerabilities.append({
            "cve_id": cve_id,
            "package": fusion.get("package", "unknown"),
            "cvss_score": cvss_score,
            "severity": _severity_from_cvss(cvss_score),
            "description": fusion.get("description", ""),
            "is_new": True,
            "source": "INTEL_FUSION",
            "evidence_type": fusion.get("evidence_type", "package_cve"),
            "not_directly_observed": bool(fusion.get("not_directly_observed", False)),
            "must_not_enter_package_actions": bool(fusion.get("must_not_enter_package_actions", False)),
            "composite_score": fusion.get("composite_score", 0.0),
            "intel_confidence": fusion.get("confidence", ""),
            "dimensions_used": fusion.get("dimensions_used", []),
            "weights_used": fusion.get("weights_used", {}),
            "epss_score": dims.get("epss"),
            "in_cisa_kev": bool(dims.get("kev", False)),
            "ghsa_severity": dims.get("ghsa_severity", "UNKNOWN"),
            "otx_threat": dims.get("otx_threat", "unknown"),
        })
        injected_count += 1

    output["_intel_fusion_applied"] = {
        "merged_existing": merged_count,
        "injected_missing": injected_count,
        "fusion_count": len(fusion_by_cve),
        "representative_cves_skipped": len(representative_fusions),
    }
    if representative_fusions:
        output["representative_cve_evidence"] = representative_fusions
    return output
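

# A self-contained, hedged sketch of the merge contract above; the CVE IDs are
# hypothetical and this function is never called by the pipeline.
def _example_merge_intel_fusion() -> None:
    """Enrich one known CVE, inject one missed CVE, and check the audit trail."""
    scout_output = {"vulnerabilities": [{"cve_id": "CVE-2024-0001", "severity": "HIGH"}]}
    fusion = {"fusion_results": [
        {"cve_id": "CVE-2024-0001", "composite_score": 0.9,
         "dimension_scores": {"kev": True, "epss": 0.5}},
        {"cve_id": "CVE-2024-0002", "dimension_scores": {"cvss": 9.8}},
    ]}
    merged = _merge_intel_fusion_evidence(scout_output, fusion)
    assert merged["_intel_fusion_applied"] == {
        "merged_existing": 1, "injected_missing": 1,
        "fusion_count": 2, "representative_cves_skipped": 0,
    }
    assert merged["vulnerabilities"][0]["in_cisa_kev"] is True
    assert merged["vulnerabilities"][1]["severity"] == "CRITICAL"  # from CVSS 9.8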


def merge_intel_fusion_evidence(output: dict[str, Any], intel_fusion_result: dict | None) -> dict[str, Any]:
    """Public wrapper for post-discovery Intel Fusion enrichment."""
    return _merge_intel_fusion_evidence(output, intel_fusion_result)


def _reconcile_is_new_flags(output: dict[str, Any], historical_cves: set[str]) -> dict[str, Any]:
    """Recalibrate every vulnerability's is_new flag against historical memory."""
    corrected = 0
    for vuln in output.get("vulnerabilities", []):
        cve_id = vuln.get("cve_id", "")
        expected_is_new = cve_id not in historical_cves
        if vuln.get("is_new") != expected_is_new:
            vuln["is_new"] = expected_is_new
            corrected += 1

    summary = output.setdefault("summary", {})
    summary["new_since_last_scan"] = sum(
        1 for vuln in output.get("vulnerabilities", []) if vuln.get("is_new")
    )
    output["_is_new_corrected"] = corrected
    return output
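
# Reconciliation sketch (hypothetical CVE ID): a CVE already present in memory
# has is_new flipped back to False and the correction is counted.
#
#     out = {"vulnerabilities": [{"cve_id": "CVE-2023-1111", "is_new": True}]}
#     _reconcile_is_new_flags(out, {"CVE-2023-1111"})
#     # out["vulnerabilities"][0]["is_new"] -> False; out["_is_new_corrected"] -> 1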


def _load_skill(skill_filename: str = "threat_intel.md") -> str:
    """
    Load a Skill SOP file by filename (v3.7 path-aware + Phase 4D hot reload).

    Phase 4D: prefer the SkillLoader singleton (hot reload, mtime validation).
    Fallback: read directly from disk (the original implementation, kept for
    backward compatibility).
    """
    if _SKILL_LOADER_AVAILABLE and _skill_loader is not None:
        try:
            return _skill_loader.load_skill(skill_filename)
        except Exception as e:
            logger.warning("[Scout] SkillLoader failed, falling back to direct read: %s", e)

    skill_path = os.path.join(PROJECT_ROOT, "skills", skill_filename)
    for encoding in ("utf-8", "utf-8-sig", "latin-1"):
        try:
            if os.path.exists(skill_path):
                with open(skill_path, "r", encoding=encoding) as f:
                    content = f.read().strip()
                    if content:
                        logger.info("[OK] Skill loaded: %s (%d chars)", skill_path, len(content))
                        return content
        except (IOError, UnicodeDecodeError):
            continue

    logger.warning("[WARN] Skill file not found, using fallback: %s", skill_path)
    return _FALLBACK_SKILL
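
# Resolution order sketch for _load_skill: SkillLoader singleton (hot reload)
# -> skills/<filename> on disk (utf-8, then utf-8-sig, then latin-1) -> the
# minimal _FALLBACK_SKILL below. E.g. _load_skill("config_audit.md") reads
# skills/config_audit.md when the SkillLoader import failed.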


_FALLBACK_SKILL = """
# Skill: Threat Intelligence Collection (minimal fallback)

## SOP
1. First call read_memory(agent_name="scout") to read history.
2. Query vulnerabilities for each technology package with search_nvd.
3. For CVEs with CVSS >= 7.0, call search_otx for threat intelligence.
4. Compare with history and mark is_new.
5. Write the result with write_memory.
6. Output pure JSON with no extra text.

## Quality Red Lines
- CVE IDs must come from search_nvd. Do not fabricate them.
- CVSS scores must come from the NVD API.
- Output pure JSON only.
""".strip()


CONSTITUTION = """
## System Constitution - Non-Negotiable Rules

1. **CVE source constraint**: every CVE ID must come from search_nvd tool results.
   Never fabricate, infer, or recall CVE IDs from memory.
   Violating this rule creates hallucinated output and causes the Sentinel fact-check to fail the pipeline.

2. **CVSS source constraint**: every CVSS score must come from the NVD API result.
   Do not estimate, adjust, or round CVSS values yourself.

3. **Output format constraint**: your Final Answer must be JSON and only JSON.
   Do not add explanations, headings, markdown, or natural-language text before or after the JSON.

4. **Tool-use constraint**: you must query vulnerabilities through the search_nvd tool.
   Do not skip tool calls and answer from training data; training data may be outdated.

5. **Honesty constraint**: if a package has no results, report count: 0 honestly.
   Do not invent vulnerabilities to make the report look useful.

6. **Memory-read constraint**: the first step after startup must be read_memory.
   Sentinel Behavior Monitor checks this behavior.

7. **Loop constraint**: run at most 15 ReAct iterations.
   If all packages are queried before 15 iterations, output the result immediately and do no extra work.

8. **Memory-write constraint (most important)**: before giving the Final Answer,
   you must call write_memory to store the complete report.
   Order: query all packages -> assemble JSON -> call write_memory -> confirm success -> then provide Final Answer.
   If you are about to answer before calling write_memory, stop and call write_memory first.
""".strip()


def create_scout_agent(
    excluded_models: list[str] | None = None,
    input_type: str = "pkg",
) -> "Agent":
    """
    Build Scout Agent with Path-Aware Skill SOP (v3.7).

    input_type selects which Skill file to embed in backstory:
        pkg       -> threat_intel.md      (NVD CVE scan for packages)
        code      -> source_code_audit.md (OWASP Top10 + CWE for source code)
        injection -> ai_security_audit.md (OWASP LLM Top10 + MITRE ATLAS)
        config    -> config_audit.md      (CIS Benchmark for config files)

    Args:
        excluded_models: Models to skip (429-rate-limited)
        input_type: Path type from frontend detector

    Returns:
        CrewAI Agent instance ready for Task and Crew
    """
    skill_filename = SKILL_MAP.get(input_type, "threat_intel.md")
    skill_content = _load_skill(skill_filename)
    logger.info("[Scout] Path=%s -> Skill=%s", input_type, skill_filename)

    _GOAL_MAP = {
        "pkg": "Collect known CVEs for the given package list from OSV/NVD, merge Intel Fusion evidence when available, compare with history, and output structured JSON.",
        "code": "Audit source code for OWASP Top10 / CWE vulnerabilities; extract package imports and scan NVD; output structured JSON.",
        "injection": "Classify and assess AI security threats (OWASP LLM Top10 / MITRE ATLAS) in the given input; output structured JSON with no CVE hallucination.",
        "config": "Audit the given configuration file against CIS Benchmarks for misconfigurations and hardcoded secrets; output structured JSON.",
    }
    agent_goal = _GOAL_MAP.get(input_type, _GOAL_MAP["pkg"])

    backstory = f"""You are an expert security analyst specialized in identifying software and AI system vulnerabilities.
You are rigorous, precise, and never fabricate data.

{CONSTITUTION}

---

## Analysis Methodology (Skill SOP)

You MUST follow this Standard Operating Procedure for the current scan path ({input_type}):

{skill_content}
"""

    # Deferred imports keep CrewAI and the tool modules off the module-import path.
    from crewai import Agent
    from tools.nvd_tool import search_nvd
    from tools.osv_tool import search_osv
    from tools.memory_tool import read_memory, write_memory, history_search

    llm = get_llm(exclude_models=excluded_models)
    scout = Agent(
        role="Threat Intelligence Scout",
        goal=agent_goal,
        backstory=backstory,
        tools=[
            search_osv,
            search_nvd,
            read_memory, write_memory, history_search,
        ],
        llm=llm,
        verbose=True,
        max_iter=15,
        allow_delegation=False,
    )

    logger.info(
        "[OK] Scout Agent ready | input_type=%s | skill=%s | llm=%s",
        input_type,
        skill_filename,
        llm.model if hasattr(llm, "model") else "unknown",
    )
    return scout
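
# Construction sketch (needs CrewAI and a configured LLM; the excluded model
# name is a hypothetical placeholder):
#
#     agent = create_scout_agent(excluded_models=["provider/some-model"], input_type="code")
#     # -> Agent wired with search_osv/search_nvd/memory tools and the
#     #    source_code_audit.md SOP embedded in its backstory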


def create_scout_task(
    agent,
    tech_stack: str,
    intel_fusion_result: dict | None = None,
):
    """
    v3.4: Scout Task - package-aware mode.
    When tech_stack is a short comma-separated package list (from PackageExtractor),
    explicitly enumerate each package for the LLM to query via search_nvd.
    """
    from crewai import Task

    intel_summary = _summarize_intel_fusion_for_task(intel_fusion_result)

    # Heuristic: short single-line input with no code keywords is treated as a
    # comma-separated package list rather than raw source code.
    is_package_list = (
        len(tech_stack) < 300
        and "\n" not in tech_stack
        and "def " not in tech_stack
        and "import " not in tech_stack
    )
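
    # Example: "express, lodash 4.17, axios" satisfies the heuristic above and
    # is enumerated package-by-package; anything containing a newline, "def ",
    # or "import " falls through to the source-code branch below.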

    if is_package_list:
        packages = [p.strip() for p in tech_stack.split(",") if p.strip()]
        packages_display = "\n".join(f" {i+1}. {pkg}" for i, pkg in enumerate(packages))
        _osv_cmd_lines = "\n".join(f" - search_osv('{pkg}')" for pkg in packages[:8])
        task_desc = (
            f"You are analyzing security vulnerabilities for packages extracted from source code.\n\n"
            f"Package list to scan:\n{packages_display}\n\n"
            f"{intel_summary + chr(10) + chr(10) if intel_summary else ''}"
            f"Steps to follow (MUST call tools in order):\n\n"
            f"Step 1: Call read_memory\n"
            f" Action: read_memory\n"
            f" Action Input: scout\n\n"
            f"Step 2: For EACH package, call search_osv first (more precise), search_nvd as fallback:\n"
            f"{_osv_cmd_lines}\n"
            f" If search_osv returns count=0, then try: search_nvd('<package>')\n\n"
            f"Step 3: Reuse Intel Fusion evidence when available.\n"
            f" - Do NOT re-query EPSS or OTX from Scout.\n"
            f" - Prefer Intel Fusion values for composite_score, KEV, EPSS, GHSA, and OTX fields.\n"
            f" - If Intel Fusion has no matching CVE, continue with OSV/NVD-only evidence.\n\n"
            f"Step 4: Assemble JSON report from REAL tool results only\n"
            f" - CVE IDs MUST come from search_osv or search_nvd output\n"
            f" - Compare with read_memory history, mark is_new\n\n"
            f"Step 5: Call write_memory to save results\n"
            f" Action: write_memory\n"
            f" Action Input: scout|{{JSON report}}\n\n"
            f"Step 6: Output JSON report as Final Answer\n\n"
            f"FORBIDDEN:\n"
            f"- Do NOT skip tool calls\n"
            f"- Do NOT fabricate CVE IDs\n"
            f"- Do NOT use backstory examples (they are fake)\n"
            f"- write_memory MUST be called before Final Answer"
        )
    else:
        task_desc = (
            f"You are analyzing security vulnerabilities in: {tech_stack[:800]}\n\n"
            f"{intel_summary + chr(10) + chr(10) if intel_summary else ''}"
            f"Steps to follow (MUST call tools in order):\n\n"
            f"Step 1: Call read_memory\n"
            f" Action: read_memory\n"
            f" Action Input: scout\n\n"
            f"Step 2: Extract PACKAGE NAMES from the code, then call search_osv first:\n"
            f" RULE: Package names come from require() or import statements ONLY.\n"
            f" Example: require('express') -> search_osv('express')\n"
            f" Example: require('lodash') -> search_osv('lodash')\n"
            f" If search_osv returns count=0 for a package, fallback: search_nvd('<package>')\n"
            f" FORBIDDEN search terms (these are syntax, NOT packages):\n"
            f" - eval, exec, Function, innerHTML, script, html, document\n"
            f" - const, let, var, function, class, async, await\n"
            f" - req, res, app, user, input (these are variable names)\n"
            f" If no require()/import found, output empty vulnerabilities list.\n\n"
            f"Step 3: Reuse Intel Fusion evidence when available.\n"
            f" - Do NOT re-query EPSS or OTX from Scout.\n"
            f" - Keep Scout focused on package extraction, OSV/NVD discovery, and schema output.\n"
            f" - If Intel Fusion has no matching CVE, continue with OSV/NVD-only evidence.\n\n"
            f"Step 4: Assemble JSON report from REAL tool results only\n\n"
            f"Step 5: Call write_memory\n"
            f" Action: write_memory\n"
            f" Action Input: scout|{{JSON report}}\n\n"
            f"Step 6: Output JSON report as Final Answer\n\n"
            f"FORBIDDEN:\n"
            f"- Do NOT search NVD with: eval, html, innerHTML, script, const, function\n"
            f"- Do NOT skip tool calls\n"
            f"- Do NOT fabricate CVE IDs\n"
            f"- write_memory MUST be called before Final Answer"
        )

    return Task(
        description=task_desc,
        expected_output="Structured JSON threat intel report with CVEs from search_osv (primary) or search_nvd (fallback), ready for deterministic Intel Fusion evidence merge.",
        agent=agent,
    )
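
# Task construction sketch (agent comes from create_scout_agent; passing a
# Layer 1 intel_fusion_result prepends the evidence summary to the description):
#
#     task = create_scout_task(agent, "express, lodash", intel_fusion_result=None)
#     # task.description then enumerates both packages and the six-step protocol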


def run_scout_pipeline(
    tech_stack: str,
    input_type: str = "pkg",
    intel_fusion_result: dict | None = None,
) -> dict:
    """
    Execute the full Scout Pipeline with Harness code-level guarantees.

    New in v5.0 (Phase 7.5):
    - OSV Batch warmup: batch-query every package before the LLM starts and
      pre-populate the cache, so the LLM's search_osv() calls hit the cache
      with no API wait.
    - Harness 2.5: backfill LLM misses from OSV data (replaces NVD cache inject).
    - GHSA severity dimension: parsed directly from OSV database_specific.severity.

    v3.7: input_type selects the correct Skill SOP for path-aware analysis.

    Args:
        tech_stack: User input (e.g. "Django 4.2, Redis 7.0" or source code)
        input_type: Path type (pkg/code/injection/config)
        intel_fusion_result: Optional Layer 1 enrichment to merge into Scout output

    Returns:
        dict: Parsed Scout JSON report
    """
    import json
    from crewai import Crew, Process
    from config import mark_model_failed, get_current_model_name, rate_limiter
    from tools.memory_tool import write_memory

    # Harness 0: OSV Batch warmup (pkg path only).
    _osv_batch_cache: dict[str, list] = {}
    if input_type == "pkg":
        # "express 4.18" -> "express": keep the package name, drop the version.
        _pkg_list = [item.strip().split()[0] for item in tech_stack.split(",") if item.strip()]
        if _pkg_list:
            try:
                from tools.osv_tool import search_osv_batch
                logger.info("[HARNESS 0] OSV Batch warmup: %s", _pkg_list)
                _osv_batch_cache = search_osv_batch(_pkg_list)
                logger.info("[HARNESS 0] OSV Batch warmup done: %d packages cached",
                            len(_osv_batch_cache))
            except Exception as _e:
                logger.warning("[HARNESS 0] OSV Batch warmup failed (non-fatal): %s", _e)

    # LLM call with a 429-aware model waterfall.
    MAX_LLM_RETRIES = 2
    excluded_models: list[str] = []

    for attempt in range(MAX_LLM_RETRIES + 1):
        agent = create_scout_agent(excluded_models, input_type=input_type)
        task = create_scout_task(agent, tech_stack, intel_fusion_result=intel_fusion_result)
        crew = Crew(agents=[agent], tasks=[task], process=Process.sequential, verbose=True)

        logger.info("[START] Scout Pipeline: %s (attempt %d/%d)", tech_stack, attempt + 1, MAX_LLM_RETRIES + 1)
        # Checkpoint recording is best-effort: failures (including a missing
        # checkpoint module) are swallowed.
        try:
            from checkpoint import recorder as _cp
            _current_model = get_current_model_name(agent.llm)
            _cp.llm_call("scout", _current_model, "openrouter", f"attempt={attempt+1}")
        except Exception:
            _current_model = "unknown"
        _t_llm = time.time()
        try:
            result = crew.kickoff()
            try:
                _cp.llm_result("scout", _current_model, "SUCCESS",
                               len(str(result)), int((time.time() - _t_llm) * 1000),
                               thinking=str(result)[:1000])
            except Exception:
                pass
            break
        except Exception as e:
            error_str = str(e)
            if "429" in error_str and attempt < MAX_LLM_RETRIES:
                # Rate limited: exclude this model and move down the waterfall.
                current_model = get_current_model_name(agent.llm)
                mark_model_failed(current_model)
                excluded_models.append(current_model)
                # Parse the provider's suggested delay, e.g. "... retry in 12.5s".
                import re as _re
                _m = _re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, _re.IGNORECASE)
                retry_after = float(_m.group(1)) if _m else 0.0
                logger.warning("[RETRY] Scout 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                               current_model, attempt + 1, MAX_LLM_RETRIES, retry_after)
                try:
                    _cp.llm_retry("scout", current_model, error_str[:200],
                                  attempt + 1, "next_in_waterfall")
                except Exception:
                    pass
                rate_limiter.on_429(retry_after=retry_after, caller="scout")
                continue

            try:
                _cp.llm_error("scout", _current_model, error_str[:300])
            except Exception:
                pass
            raise
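
    # Illustration of the retry-delay parse above, on a made-up provider message:
    #
    #     _re.search(r'retry.{1,10}(\d+\.?\d*)s', "429: retry in 12.5s").group(1)
    #     # -> "12.5"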

    result_str = str(result).strip()

    # Strip a markdown code fence if the model wrapped its JSON in one.
    json_str = result_str
    if "```json" in json_str:
        json_str = json_str.split("```json")[1].split("```")[0].strip()
    elif "```" in json_str:
        parts = json_str.split("```")
        if len(parts) >= 3:
            json_str = parts[1].strip()

    try:
        output = json.loads(json_str)
    except json.JSONDecodeError:
        logger.error("[FAIL] Agent output is not valid JSON: %s", result_str[:200])
        raise ValueError(f"Scout Agent output is not valid JSON: {result_str[:200]}")

    # Backfill required top-level fields the model may have omitted.
    from datetime import datetime, timezone

    if "scan_id" not in output:
        logger.warning("[WARN] Output missing required field: scan_id")
        output["scan_id"] = f"scan_{int(time.time())}"
    if "timestamp" not in output:
        logger.warning("[WARN] Output missing required field: timestamp")
        output["timestamp"] = datetime.now(timezone.utc).isoformat()
    if "tech_stack" not in output:
        logger.warning("[WARN] Output missing required field: tech_stack")
        output["tech_stack"] = [
            item.strip().lower()
            for item in str(tech_stack).split(",")
            if item.strip()
        ]
    if "vulnerabilities" not in output:
        logger.warning("[WARN] Output missing required field: vulnerabilities")
        output["vulnerabilities"] = []
    if "summary" not in output:
        logger.warning("[WARN] Output missing required field: summary")
        output["summary"] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}

    # Normalize severity labels (GHSA says MODERATE where NVD says MEDIUM).
    for vuln in output.get("vulnerabilities", []):
        severity = str(vuln.get("severity", "MEDIUM")).upper()
        if severity == "MODERATE":
            severity = "MEDIUM"
        elif severity not in {"CRITICAL", "HIGH", "MEDIUM", "LOW"}:
            severity = _severity_from_cvss(float(vuln.get("cvss_score") or 0.0))
        vuln["severity"] = severity

    # Harness: force a memory write if the agent skipped write_memory.
    memory_path = os.path.join(PROJECT_ROOT, "memory", "scout_memory.json")
    need_write = False
    if not os.path.exists(memory_path):
        need_write = True
    else:
        try:
            with open(memory_path, "r", encoding="utf-8") as f:
                content = f.read().strip()
                if not content or content == "{}":
                    need_write = True
        except (IOError, json.JSONDecodeError):
            need_write = True

    if need_write:
        logger.warning("[WARN] Agent did not call write_memory -- code forcing write (Harness)")
        write_result = write_memory.run(agent_name="scout", data=json.dumps(output, ensure_ascii=False))
        logger.info("[OK] Forced memory write: %s", write_result)

    # Final guard on required fields before the downstream harness steps.
    required = ["scan_id", "timestamp", "tech_stack", "vulnerabilities", "summary"]
    for field in required:
        if field not in output:
            logger.warning("[WARN] Output missing required field: %s", field)
            if field == "vulnerabilities":
                output["vulnerabilities"] = []
            elif field == "summary":
                output["summary"] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}

    # Harness 2.5: if the LLM reported zero CVEs, backfill from cached scan data.
    if not output.get("vulnerabilities"):
        injected = []

        if _osv_batch_cache:
            # Preferred source: the OSV batch warmup cache.
            for pkg_name, vuln_list in _osv_batch_cache.items():
                for v in vuln_list:
                    cve_id = v.get("cve_id", "")
                    if not cve_id.startswith(("CVE-", "GHSA-")):
                        continue
                    ghsa_sev = _extract_ghsa_severity_from_osv(v)
                    injected.append({
                        "cve_id": cve_id,
                        "package": v.get("package", pkg_name),
                        "cvss_score": v.get("cvss_score", 0.0),
                        "severity": v.get("severity", "MEDIUM"),
                        "description": v.get("description", "")[:300],
                        "published": v.get("published", ""),
                        "is_new": True,
                        "in_cisa_kev": False,
                        "has_public_exploit": False,
                        "source": "OSV",
                        "osv_id": v.get("osv_id", ""),
                        "ghsa_severity": ghsa_sev,
                    })
        else:
            # Fallback source: the NVD query cache.
            from tools.nvd_tool import _search_nvd_impl
            for item in (tech_stack or "").split(","):
                parts = item.strip().split()
                if not parts:
                    continue
                pkg = parts[0].lower()
                try:
                    cached_result = json.loads(_search_nvd_impl(pkg))
                    for v in cached_result.get("vulnerabilities", []):
                        cve_id = v.get("cve_id") or v.get("id", "")
                        if not cve_id.startswith("CVE-"):
                            continue
                        injected.append({
                            "cve_id": cve_id,
                            "package": v.get("package", pkg),
                            "cvss_score": v.get("cvss_score", 0.0),
                            "severity": v.get("severity", "MEDIUM"),
                            "description": v.get("description", "")[:300],
                            "published": v.get("published_date", ""),
                            "is_new": True,
                            "in_cisa_kev": v.get("in_cisa_kev", False),
                            "has_public_exploit": v.get("has_public_exploit", False),
                        })
                except Exception as e:
                    logger.warning("[WARN] NVD cache inject failed for %s: %s", pkg, e)

        if injected:
            output["vulnerabilities"] = injected
            logger.warning(
                "[HARNESS 2.5] LLM output 0 CVEs, injected %d CVEs from %s for tech_stack=%s",
                len(injected),
                "OSV batch cache" if _osv_batch_cache else "NVD cache",
                tech_stack[:60]
            )

    # CVE verification: build the set of real CVEs from OSV and NVD so that
    # hallucinated IDs can be filtered out below.
    from tools.nvd_tool import _search_nvd_impl
    real_cves = set()
    cve_to_package = {}

    # Collect package names from both the report and the original tech_stack.
    packages_to_check = set()
    for vuln in output.get("vulnerabilities", []):
        pkg = vuln.get("package", "").lower().strip()
        if pkg:
            packages_to_check.add(pkg)
    for item in tech_stack.split(","):
        parts = item.strip().split()
        if not parts:
            continue
        packages_to_check.add(parts[0].lower())

    # Seed the known-good CVE set from the OSV batch cache.
    for pkg_name, vuln_list in _osv_batch_cache.items():
        for v in vuln_list:
            cve_id = v.get("cve_id", "")
            if cve_id.startswith(("CVE-", "GHSA-")):
                real_cves.add(cve_id)
                cve_to_package[cve_id] = pkg_name

    # For packages OSV did not cover, query NVD with a widened page size.
    for pkg in packages_to_check:
        if any(cve_to_package.get(c) == pkg for c in real_cves):
            continue
        try:
            import tools.nvd_tool as nvd_mod
            original_page_size = nvd_mod.RESULTS_PER_PAGE
            nvd_mod.RESULTS_PER_PAGE = 100
            try:
                nvd_result = json.loads(_search_nvd_impl(pkg))
            finally:
                nvd_mod.RESULTS_PER_PAGE = original_page_size
            for v in nvd_result.get("vulnerabilities", []):
                cve_id = v["cve_id"]
                # Skip ancient CVEs so they cannot legitimize stale LLM output.
                try:
                    cve_year = int(cve_id.split("-")[1])
                    if cve_year < 2005:
                        logger.debug("[FILTER] NVD verification skip ancient CVE: %s", cve_id)
                        continue
                except (IndexError, ValueError):
                    pass
                real_cves.add(cve_id)
                cve_to_package[cve_id] = pkg
        except Exception as e:
            logger.warning("[WARN] CVE verification NVD query failed (%s): %s", pkg, e)

    if real_cves:
        original_count = len(output.get("vulnerabilities", []))
        verified_vulns = []
        suspect_vulns = []
        for vuln in output.get("vulnerabilities", []):
            if vuln.get("cve_id") in real_cves:
                verified_vulns.append(vuln)
            else:
                suspect_vulns.append(vuln)

        # Suspect CVEs get one exact-match NVD lookup before being declared
        # hallucinated.
        hallucinated = []
        if suspect_vulns:
            import re
            for vuln in suspect_vulns:
                cve_id = vuln.get("cve_id", "")
                if not re.match(r"^CVE-\d{4}-\d{4,}$", cve_id):
                    hallucinated.append(cve_id)
                    continue
                try:
                    resp = requests.get(
                        "https://services.nvd.nist.gov/rest/json/cves/2.0",
                        params={"cveId": cve_id},
                        headers={"apiKey": os.getenv("NVD_API_KEY", "")},
                        timeout=10,
                    )
                    if resp.status_code == 200:
                        data = resp.json()
                        if data.get("totalResults", 0) > 0:
                            logger.info("[OK] CVE exact verification passed: %s", cve_id)
                            verified_vulns.append(vuln)
                            # Backfill the package field from the NVD description.
                            if not vuln.get("package"):
                                desc = data["vulnerabilities"][0]["cve"]["descriptions"][0]["value"].lower()
                                for pkg in packages_to_check:
                                    if pkg in desc:
                                        vuln["package"] = pkg
                                        cve_to_package[cve_id] = pkg
                                        break
                            continue
                        # NVD answered authoritatively with zero results: hallucinated.
                        hallucinated.append(cve_id)
                    else:
                        # Non-200 (rate limit, outage): keep conservatively, like a
                        # network error, instead of declaring the CVE hallucinated.
                        logger.warning("[WARN] NVD verify returned HTTP %d for %s, keeping conservatively",
                                       resp.status_code, cve_id)
                        verified_vulns.append(vuln)
                except Exception:
                    logger.warning("[WARN] NVD verify unreachable for %s, keeping conservatively", cve_id)
                    verified_vulns.append(vuln)

        if hallucinated:
            logger.warning(
                "[ALERT] Detected %d hallucinated CVEs, removed: %s",
                len(hallucinated), hallucinated
            )
            output["vulnerabilities"] = verified_vulns
            # Rebuild the summary from the verified list.
            output["summary"] = {
                "total": len(verified_vulns),
                "new_since_last_scan": sum(1 for v in verified_vulns if v.get("is_new")),
                "critical": sum(1 for v in verified_vulns if v.get("severity") == "CRITICAL"),
                "high": sum(1 for v in verified_vulns if v.get("severity") == "HIGH"),
                "medium": sum(1 for v in verified_vulns if v.get("severity") == "MEDIUM"),
                "low": sum(1 for v in verified_vulns if v.get("severity") == "LOW"),
            }
            logger.info(
                "[OK] CVE verification result: %d -> %d (removed %d hallucinated)",
                original_count, len(verified_vulns), len(hallucinated)
            )
        else:
            logger.info("[OK] All %d CVEs passed verification", original_count)
    else:
        logger.warning("[WARN] Cannot build real CVE list, skipping verification")

    # Backfill missing package fields so downstream agents can act per package.
    patched_count = 0
    for vuln in output.get("vulnerabilities", []):
        if not vuln.get("package"):
            cve_id = vuln.get("cve_id", "")
            if cve_id in cve_to_package:
                vuln["package"] = cve_to_package[cve_id]
                patched_count += 1
            else:
                # Last resort: look for a known package name in the description.
                desc = vuln.get("description", "").lower()
                for pkg in packages_to_check:
                    if pkg in desc:
                        vuln["package"] = pkg
                        patched_count += 1
                        break
                else:
                    vuln["package"] = "unknown"
                    patched_count += 1
    if patched_count:
        logger.info("[OK] Patched %d CVE package fields", patched_count)

    # Merge Layer 1 Intel Fusion evidence deterministically (post-verification).
    output = _merge_intel_fusion_evidence(output, intel_fusion_result)

    # Reconcile is_new flags against the on-disk scout memory.
    try:
        mem_data = {}
        if os.path.exists(memory_path):
            with open(memory_path, "r", encoding="utf-8") as f:
                mem_data = json.load(f)

        historical_cves = set()
        # Flat layout: {"vulnerabilities": [...]}
        if "vulnerabilities" in mem_data:
            for v in mem_data.get("vulnerabilities", []):
                historical_cves.add(v.get("cve_id", ""))
        # Nested layout: {"latest": {"vulnerabilities": [...]}}
        elif "latest" in mem_data:
            for v in mem_data.get("latest", {}).get("vulnerabilities", []):
                historical_cves.add(v.get("cve_id", ""))

        output = _reconcile_is_new_flags(output, historical_cves)
        corrected = output.get("_is_new_corrected", 0)
        if corrected:
            logger.info("[OK] Corrected %d CVE is_new flags", corrected)
    except Exception as e:
        logger.warning("[WARN] is_new correction failed: %s", e)

    # Harness 3.5: drop ancient CVEs that slipped past verification.
    CVE_YEAR_MIN = 2005
    ancient_removed = []
    fresh_vulns = []
    for vuln in output.get("vulnerabilities", []):
        cve_id = vuln.get("cve_id", "")
        if cve_id.startswith("GHSA-") or not cve_id.startswith("CVE-"):
            fresh_vulns.append(vuln)
            continue
        try:
            cve_year = int(cve_id.split("-")[1])
            if cve_year < CVE_YEAR_MIN:
                ancient_removed.append(cve_id)
                logger.warning(
                    "[HARNESS 3.5] Ancient CVE removed (year=%d < %d): %s",
                    cve_year, CVE_YEAR_MIN, cve_id
                )
            else:
                fresh_vulns.append(vuln)
        except (IndexError, ValueError):
            fresh_vulns.append(vuln)

    if ancient_removed:
        output["vulnerabilities"] = fresh_vulns
        output["ancient_cves_removed"] = ancient_removed
        logger.warning(
            "[HARNESS 3.5] Removed %d ancient CVEs (< %d): %s",
            len(ancient_removed), CVE_YEAR_MIN, ancient_removed
        )

    # Rebuild the summary once, after all harness mutations.
    vulns = output.get("vulnerabilities", [])
    output["summary"] = {
        "total": len(vulns),
        "new_since_last_scan": sum(1 for v in vulns if v.get("is_new")),
        "critical": sum(1 for v in vulns if v.get("severity") == "CRITICAL"),
        "high": sum(1 for v in vulns if v.get("severity") == "HIGH"),
        "medium": sum(1 for v in vulns if v.get("severity") == "MEDIUM"),
        "low": sum(1 for v in vulns if v.get("severity") == "LOW"),
    }

    vuln_count = output["summary"]["total"]
    new_count = output["summary"]["new_since_last_scan"]
    logger.info(
        "[OK] Scout Pipeline complete: %d CVEs, %d new", vuln_count, new_count
    )

    return output
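

if __name__ == "__main__":
    # Smoke-test sketch, not part of the pipeline. Assumes a fully configured
    # environment (OSV/NVD tools, scout memory directory, and an LLM API key).
    import json as _json

    logging.basicConfig(level=logging.INFO)
    _report = run_scout_pipeline("express, lodash", input_type="pkg")
    print(_json.dumps(_report.get("summary", {}), indent=2, ensure_ascii=False))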