File size: 46,501 Bytes
c8d30bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 | # agents/scout.py
# 功能:Scout Agent 定義 — 威脅情報偵察員
# Harness 支柱:Constraints(系統憲法 + Skill SOP)+ Observability(verbose=True)
# 擁有者:成員 B(Scout Agent Pipeline)
#
# 使用方式:
# from agents.scout import create_scout_agent
#
# 架構定位:
# Pipeline 的第一環 — 收集情報 → 輸出 JSON → Analyst 接收
# Agent = Tool(手)+ Skill(腦)+ Constitution(法)
import os
import logging
import time
from typing import Any, TYPE_CHECKING
import requests
from config import get_llm
# LLM 與 Tool 皆延遲初始化,避免純 helper / 測試 import 路徑觸發 CrewAI 副作用。
logger = logging.getLogger("ThreatHunter")
if TYPE_CHECKING:
from crewai import Agent
# ══════════════════════════════════════════════════════════════
# Skill 載入(Phase 4D:使用 SkillLoader 熱載入系統)
# ══════════════════════════════════════════════════════════════
# ======================================================================
# v3.7: Path-Aware Skill Map
# 每種 input_type 對應一份 Skill SOP
# ======================================================================
SKILL_MAP: dict[str, str] = {
"pkg": "threat_intel.md", # Path A: package CVE scan
"code": "source_code_audit.md", # Path B-code: source code review
"injection": "ai_security_audit.md", # Path B-inject: AI security
"config": "config_audit.md", # Path C: config file audit
}
# 專案根目錄(agents/ 的上一層)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SKILL_PATH = os.path.join(PROJECT_ROOT, "skills", "threat_intel.md") # default fallback
# Phase 4D: 使用 SkillLoader 熱載入系統
try:
from skills.skill_loader import skill_loader as _skill_loader
_SKILL_LOADER_AVAILABLE = True
logger.info("[Scout] Phase 4D: SkillLoader 啟用 ✓")
except ImportError:
_skill_loader = None
_SKILL_LOADER_AVAILABLE = False
logger.warning("[Scout] Phase 4D: SkillLoader 不可用,使用內建 _load_skill")
# ── GHSA Severity 解析輔助(Phase 7.5)──────────────────────
# OSV vuln_dict 中的 database_specific.severity 存有 GitHub Advisory 嚴重度
# 直接解析此欄位填補 Intel Fusion 的 GHSA 維度(10% 權重)
# 來源:https://docs.github.com/en/graphql/reference/enums#securityadvisoryidentifiertype
def _extract_ghsa_severity_from_osv(vuln_dict: dict) -> str:
"""
從 OSV vuln_dict 解析 GHSA severity。
OSV 的 database_specific 欄位:
{
"severity": "HIGH", ← GitHub Advisory severity
"cvss": {...}
}
また、vuln["osv_id"].startswith("GHSA-") 也是 GHSA 直接來源。
Returns:
"CRITICAL" | "HIGH" | "MODERATE" | "MEDIUM" | "LOW" | "UNKNOWN"
"""
# 優先從已解析的欄位取(如果 _parse_osv_vuln 已填入)
if vuln_dict.get("ghsa_severity"):
return vuln_dict["ghsa_severity"]
# 從原始 severity 欄位取
sev = vuln_dict.get("severity", "")
if sev in ("CRITICAL", "HIGH", "MODERATE", "MEDIUM", "LOW"):
return sev
return "UNKNOWN"
# ─────────────────────────────────────────────────────────────
def _severity_from_cvss(cvss_score: float) -> str:
"""將 CVSS 分數轉成標準 severity 字串。"""
if cvss_score >= 9.0:
return "CRITICAL"
if cvss_score >= 7.0:
return "HIGH"
if cvss_score >= 4.0:
return "MEDIUM"
return "LOW"
def _summarize_intel_fusion_for_task(intel_fusion_result: dict | None) -> str:
"""將 Intel Fusion 結果壓縮成給 Scout 任務描述使用的摘要。"""
if not intel_fusion_result:
return ""
fusion_results = intel_fusion_result.get("fusion_results", [])
if not fusion_results:
return ""
lines: list[str] = [
"Layer 1 Intel Fusion evidence is available.",
"Reuse this enrichment instead of re-querying EPSS or OTX.",
"Use OSV/NVD only for CVE discovery, verification, or missing-package fallback.",
"Intel Fusion evidence:",
]
for fusion in fusion_results[:8]:
dims = fusion.get("dimension_scores", {})
cve_id = fusion.get("cve_id", "UNKNOWN")
score = fusion.get("composite_score", "n/a")
kev = bool(dims.get("kev", False))
epss = dims.get("epss", "n/a")
ghsa = dims.get("ghsa_severity", "UNKNOWN")
otx = dims.get("otx_threat", "unknown")
lines.append(
f"- {cve_id}: composite={score}, kev={kev}, epss={epss}, ghsa={ghsa}, otx={otx}"
)
return "\n".join(lines)
def _merge_intel_fusion_evidence(output: dict[str, Any], intel_fusion_result: dict | None) -> dict[str, Any]:
"""把 Intel Fusion 的富化證據併入 Scout 最終漏洞清單。"""
if not intel_fusion_result:
return output
fusion_results = intel_fusion_result.get("fusion_results", [])
if not fusion_results:
return output
representative_fusions: list[dict[str, Any]] = []
fusion_by_cve: dict[str, dict[str, Any]] = {}
for fusion in fusion_results:
cve_id = str(fusion.get("cve_id", "")).strip()
is_representative = (
fusion.get("evidence_type") == "representative_cve"
or bool(fusion.get("must_not_enter_package_actions"))
or bool(fusion.get("not_directly_observed"))
)
if is_representative:
representative_fusions.append(fusion)
continue
if cve_id:
fusion_by_cve[cve_id] = fusion
if not fusion_by_cve:
if representative_fusions:
output["_intel_fusion_applied"] = {
"merged_existing": 0,
"injected_missing": 0,
"fusion_count": 0,
"representative_cves_skipped": len(representative_fusions),
}
output["representative_cve_evidence"] = representative_fusions
return output
vulnerabilities = output.setdefault("vulnerabilities", [])
seen_cves = {str(v.get("cve_id", "")).strip() for v in vulnerabilities}
merged_count = 0
injected_count = 0
for vuln in vulnerabilities:
cve_id = str(vuln.get("cve_id", "")).strip()
fusion = fusion_by_cve.get(cve_id)
if not fusion:
continue
dims = fusion.get("dimension_scores", {})
vuln["composite_score"] = fusion.get("composite_score", vuln.get("composite_score"))
vuln["intel_confidence"] = fusion.get("confidence", vuln.get("intel_confidence", ""))
vuln["dimensions_used"] = fusion.get("dimensions_used", vuln.get("dimensions_used", []))
vuln["weights_used"] = fusion.get("weights_used", vuln.get("weights_used", {}))
vuln["evidence_type"] = fusion.get("evidence_type", vuln.get("evidence_type", "direct_cve"))
vuln["not_directly_observed"] = bool(fusion.get("not_directly_observed", False))
vuln["must_not_enter_package_actions"] = bool(fusion.get("must_not_enter_package_actions", False))
if dims.get("epss") is not None:
vuln["epss_score"] = dims.get("epss")
if "kev" in dims:
vuln["in_cisa_kev"] = bool(dims.get("kev"))
if dims.get("ghsa_severity"):
vuln["ghsa_severity"] = dims.get("ghsa_severity")
if dims.get("otx_threat"):
vuln["otx_threat"] = dims.get("otx_threat")
merged_count += 1
for cve_id, fusion in fusion_by_cve.items():
if cve_id in seen_cves:
continue
dims = fusion.get("dimension_scores", {})
cvss_score = float(dims.get("cvss", 0.0) or 0.0)
vulnerabilities.append({
"cve_id": cve_id,
"package": fusion.get("package", "unknown"),
"cvss_score": cvss_score,
"severity": _severity_from_cvss(cvss_score),
"description": fusion.get("description", ""),
"is_new": True,
"source": "INTEL_FUSION",
"evidence_type": fusion.get("evidence_type", "package_cve"),
"not_directly_observed": bool(fusion.get("not_directly_observed", False)),
"must_not_enter_package_actions": bool(fusion.get("must_not_enter_package_actions", False)),
"composite_score": fusion.get("composite_score", 0.0),
"intel_confidence": fusion.get("confidence", ""),
"dimensions_used": fusion.get("dimensions_used", []),
"weights_used": fusion.get("weights_used", {}),
"epss_score": dims.get("epss"),
"in_cisa_kev": bool(dims.get("kev", False)),
"ghsa_severity": dims.get("ghsa_severity", "UNKNOWN"),
"otx_threat": dims.get("otx_threat", "unknown"),
})
injected_count += 1
output["_intel_fusion_applied"] = {
"merged_existing": merged_count,
"injected_missing": injected_count,
"fusion_count": len(fusion_by_cve),
"representative_cves_skipped": len(representative_fusions),
}
if representative_fusions:
output["representative_cve_evidence"] = representative_fusions
return output
def merge_intel_fusion_evidence(output: dict[str, Any], intel_fusion_result: dict | None) -> dict[str, Any]:
"""Public wrapper for post-discovery Intel Fusion enrichment."""
return _merge_intel_fusion_evidence(output, intel_fusion_result)
def _reconcile_is_new_flags(output: dict[str, Any], historical_cves: set[str]) -> dict[str, Any]:
"""依照歷史記憶重新校正所有漏洞的 is_new 旗標。"""
corrected = 0
for vuln in output.get("vulnerabilities", []):
cve_id = vuln.get("cve_id", "")
expected_is_new = cve_id not in historical_cves
if vuln.get("is_new") != expected_is_new:
vuln["is_new"] = expected_is_new
corrected += 1
summary = output.setdefault("summary", {})
summary["new_since_last_scan"] = sum(
1 for vuln in output.get("vulnerabilities", []) if vuln.get("is_new")
)
output["_is_new_corrected"] = corrected
return output
def _load_skill(skill_filename: str = "threat_intel.md") -> str:
"""
Load Skill SOP file by filename (v3.7 path-aware + Phase 4D 熱載入).
Phase 4D: 優先使用 SkillLoader 單例(支援熱載入、mtime 驗證)。
Fallback: 直接從磁碟讀取(原有實作,確保向後相容)。
"""
# Phase 4D: SkillLoader 熱載入路徑
if _SKILL_LOADER_AVAILABLE and _skill_loader is not None:
try:
return _skill_loader.load_skill(skill_filename)
except Exception as e:
logger.warning("[Scout] SkillLoader 失敗,回退直接讀取: %s", e)
# Fallback: 直接從磁碟讀取(原有實作)
skill_path = os.path.join(PROJECT_ROOT, "skills", skill_filename)
for encoding in ("utf-8", "utf-8-sig", "latin-1"):
try:
if os.path.exists(skill_path):
with open(skill_path, "r", encoding=encoding) as f:
content = f.read().strip()
if content:
logger.info("[OK] Skill loaded: %s (%d chars)", skill_path, len(content))
return content
except (IOError, UnicodeDecodeError):
continue
logger.warning("[WARN] Skill file not found, using fallback: %s", skill_path)
return _FALLBACK_SKILL
# 內嵌精簡版 Skill(Graceful Degradation — Skill 檔案遺失時的保底)
_FALLBACK_SKILL = """
# Skill: Threat Intelligence Collection (minimal fallback)
## SOP
1. First call read_memory(agent_name="scout") to read history.
2. Query vulnerabilities for each technology package with search_nvd.
3. For CVEs with CVSS >= 7.0, call search_otx for threat intelligence.
4. Compare with history and mark is_new.
5. Write the result with write_memory.
6. Output pure JSON with no extra text.
## Quality Red Lines
- CVE IDs must come from search_nvd. Do not fabricate them.
- CVSS scores must come from the NVD API.
- Output pure JSON only.
""".strip()
# ══════════════════════════════════════════════════════════════
# 系統憲法(Constraints 支柱 — 層級 A)
# ══════════════════════════════════════════════════════════════
CONSTITUTION = """
## System Constitution - Non-Negotiable Rules
1. **CVE source constraint**: every CVE ID must come from search_nvd tool results.
Never fabricate, infer, or recall CVE IDs from memory.
Violating this rule creates hallucinated output and causes the Sentinel fact-check to fail the pipeline.
2. **CVSS source constraint**: every CVSS score must come from the NVD API result.
Do not estimate, adjust, or round CVSS values yourself.
3. **Output format constraint**: your Final Answer must be JSON and only JSON.
Do not add explanations, headings, markdown, or natural-language text before or after the JSON.
4. **Tool-use constraint**: you must query vulnerabilities through the search_nvd tool.
Do not skip tool calls and answer from training data; training data may be outdated.
5. **Honesty constraint**: if a package has no results, report count: 0 honestly.
Do not invent vulnerabilities to make the report look useful.
6. **Memory-read constraint**: the first step after startup must be read_memory.
Sentinel Behavior Monitor checks this behavior.
7. **Loop constraint**: run at most 15 ReAct iterations.
If all packages are queried before 15 iterations, output the result immediately and do no extra work.
8. **Memory-write constraint (most important)**: before giving the Final Answer,
you must call write_memory to store the complete report.
Order: query all packages -> assemble JSON -> call write_memory -> confirm success -> then provide Final Answer.
If you are about to answer before calling write_memory, stop and call write_memory first.
""".strip()
# ══════════════════════════════════════════════════════════════
# Agent 工廠函式
# ══════════════════════════════════════════════════════════════
def create_scout_agent(
excluded_models: list[str] | None = None,
input_type: str = "pkg",
) -> "Agent":
"""
Build Scout Agent with Path-Aware Skill SOP (v3.7).
input_type selects which Skill file to embed in backstory:
pkg -> threat_intel.md (NVD CVE scan for packages)
code -> source_code_audit.md (OWASP Top10 + CWE for source code)
injection -> ai_security_audit.md (OWASP LLM Top10 + MITRE ATLAS)
config -> config_audit.md (CIS Benchmark for config files)
Args:
excluded_models: Models to skip (429-rate-limited)
input_type: Path type from frontend detector
Returns:
CrewAI Agent instance ready for Task and Crew
"""
skill_filename = SKILL_MAP.get(input_type, "threat_intel.md")
skill_content = _load_skill(skill_filename)
logger.info("[Scout] Path=%s -> Skill=%s", input_type, skill_filename)
# Goal adapts to the input path
_GOAL_MAP = {
"pkg": "Collect known CVEs for the given package list from OSV/NVD, merge Intel Fusion evidence when available, compare with history, and output structured JSON.",
"code": "Audit source code for OWASP Top10 / CWE vulnerabilities; extract package imports and scan NVD; output structured JSON.",
"injection": "Classify and assess AI security threats (OWASP LLM Top10 / MITRE ATLAS) in the given input; output structured JSON with no CVE hallucination.",
"config": "Audit the given configuration file against CIS Benchmarks for misconfigurations and hardcoded secrets; output structured JSON.",
}
agent_goal = _GOAL_MAP.get(input_type, _GOAL_MAP["pkg"])
backstory = f"""You are an expert security analyst specialized in identifying software and AI system vulnerabilities.
You are rigorous, precise, and never fabricate data.
{CONSTITUTION}
---
## Analysis Methodology (Skill SOP)
You MUST follow this Standard Operating Procedure for the current scan path ({input_type}):
{skill_content}
"""
# 延遲匯入 CrewAI,避免純 helper / 測試路徑在 import 階段觸發本機儲存副作用。
from crewai import Agent
from tools.nvd_tool import search_nvd
from tools.osv_tool import search_osv
from tools.memory_tool import read_memory, write_memory, history_search
llm = get_llm(exclude_models=excluded_models)
scout = Agent(
role="Threat Intelligence Scout",
goal=agent_goal,
backstory=backstory,
tools=[
search_osv, # 主力:OSV.dev ecosystem-aware 精確查詢(不會返回無關 1999 CVE)
search_nvd, # 備用:NVD CPE 查詢(OSV 無結果時使用)
read_memory, write_memory, history_search,
],
llm=llm,
verbose=True,
max_iter=15, # SOP: 最多 15 輪 ReAct(包含 6 步驟程:read_memory+search_nvd+OTX+write_memory)
allow_delegation=False,
)
logger.info(
"[OK] Scout Agent ready | input_type=%s | skill=%s | llm=%s",
input_type,
skill_filename,
llm.model if hasattr(llm, 'model') else 'unknown',
)
return scout
# ══════════════════════════════════════════════════════════════
# CrewAI Task 工廠函式(便利函式,供 main.py 使用)
# ══════════════════════════════════════════════════════════════
def create_scout_task(
agent,
tech_stack: str,
intel_fusion_result: dict | None = None,
):
"""
v3.4: Scout Task - package-aware mode.
When tech_stack is a short comma-separated package list (from PackageExtractor),
explicitly enumerate each package for the LLM to query via search_nvd.
"""
from crewai import Task
intel_summary = _summarize_intel_fusion_for_task(intel_fusion_result)
# Detect if input is a clean package list or raw code/long text
is_package_list = (
len(tech_stack) < 300
and "\n" not in tech_stack
and "def " not in tech_stack
and "import " not in tech_stack
)
if is_package_list:
packages = [p.strip() for p in tech_stack.split(",") if p.strip()]
packages_display = "\n".join(f" {i+1}. {pkg}" for i, pkg in enumerate(packages))
_osv_cmd_lines = "\n".join(f" - search_osv('{pkg}')" for pkg in packages[:8])
task_desc = (
f"You are analyzing security vulnerabilities for packages extracted from source code.\n\n"
f"Package list to scan:\n{packages_display}\n\n"
f"{intel_summary + chr(10) + chr(10) if intel_summary else ''}"
f"Steps to follow (MUST call tools in order):\n\n"
f"Step 1: Call read_memory\n"
f" Action: read_memory\n"
f" Action Input: scout\n\n"
f"Step 2: For EACH package, call search_osv first (more precise), search_nvd as fallback:\n"
f"{_osv_cmd_lines}\n"
f" If search_osv returns count=0, then try: search_nvd('<package>')\n\n"
f"Step 3: Reuse Intel Fusion evidence when available.\n"
f" - Do NOT re-query EPSS or OTX from Scout.\n"
f" - Prefer Intel Fusion values for composite_score, KEV, EPSS, GHSA, and OTX fields.\n"
f" - If Intel Fusion has no matching CVE, continue with OSV/NVD-only evidence.\n\n"
f"Step 4: Assemble JSON report from REAL tool results only\n"
f" - CVE IDs MUST come from search_osv or search_nvd output\n"
f" - Compare with read_memory history, mark is_new\n\n"
f"Step 5: Call write_memory to save results\n"
f" Action: write_memory\n"
f" Action Input: scout|{{JSON report}}\n\n"
f"Step 6: Output JSON report as Final Answer\n\n"
f"FORBIDDEN:\n"
f"- Do NOT skip tool calls\n"
f"- Do NOT fabricate CVE IDs\n"
f"- Do NOT use backstory examples (they are fake)\n"
f"- write_memory MUST be called before Final Answer"
)
else:
task_desc = (
f"You are analyzing security vulnerabilities in: {tech_stack[:800]}\n\n"
f"{intel_summary + chr(10) + chr(10) if intel_summary else ''}"
f"Steps to follow (MUST call tools in order):\n\n"
f"Step 1: Call read_memory\n"
f" Action: read_memory\n"
f" Action Input: scout\n\n"
f"Step 2: Extract PACKAGE NAMES from the code, then call search_osv first:\n"
f" RULE: Package names come from require() or import statements ONLY.\n"
f" Example: require('express') -> search_osv('express')\n"
f" Example: require('lodash') -> search_osv('lodash')\n"
f" If search_osv returns count=0 for a package, fallback: search_nvd('<package>')\n"
f" FORBIDDEN search terms (these are syntax, NOT packages):\n"
f" - eval, exec, Function, innerHTML, script, html, document\n"
f" - const, let, var, function, class, async, await\n"
f" - req, res, app, user, input (these are variable names)\n"
f" If no require()/import found, output empty vulnerabilities list.\n\n"
f"Step 3: Reuse Intel Fusion evidence when available.\n"
f" - Do NOT re-query EPSS or OTX from Scout.\n"
f" - Keep Scout focused on package extraction, OSV/NVD discovery, and schema output.\n"
f" - If Intel Fusion has no matching CVE, continue with OSV/NVD-only evidence.\n\n"
f"Step 4: Assemble JSON report from REAL tool results only\n\n"
f"Step 5: Call write_memory\n"
f" Action: write_memory\n"
f" Action Input: scout|{{JSON report}}\n\n"
f"Step 6: Output JSON report as Final Answer\n\n"
f"FORBIDDEN:\n"
f"- Do NOT search NVD with: eval, html, innerHTML, script, const, function\n"
f"- Do NOT skip tool calls\n"
f"- Do NOT fabricate CVE IDs\n"
f"- write_memory MUST be called before Final Answer"
)
return Task(
description=task_desc,
expected_output="Structured JSON threat intel report with CVEs from search_osv (primary) or search_nvd (fallback), ready for deterministic Intel Fusion evidence merge.",
agent=agent,
)
def run_scout_pipeline(
tech_stack: str,
input_type: str = "pkg",
intel_fusion_result: dict | None = None,
) -> dict:
"""
Execute full Scout Pipeline with Harness code-level guarantees.
v5.0 (Phase 7.5) 新增:
- OSV Batch 預熱:LLM 啟動前批量查詢所有套件,結果預存快取
→ LLM 呼叫 search_osv() 時直接命中快取,無需等待 API
- Harness 2.5:改用 OSV 資料做 LLM 遺漏補充(取代 NVD cache inject)
- GHSA severity 維度:從 OSV database_specific.severity 直接解析
v3.7: input_type selects the correct Skill SOP for path-aware analysis.
Args:
tech_stack: User input (e.g. "Django 4.2, Redis 7.0" or source code)
input_type: Path type (pkg/code/injection/config)
intel_fusion_result: Optional Layer 1 enrichment to merge into Scout output
Returns:
dict: Parsed Scout JSON report
"""
import json
from crewai import Crew, Process
from config import mark_model_failed, get_current_model_name, rate_limiter
from tools.memory_tool import write_memory
# 新版 memory_tool 無 _write_memory_impl,使用公開 Tool 介面
# ── Harness 0:OSV Batch 預熱快取(在 LLM 之前執行)───────────
# 理由:LLM 呼叫 search_osv() 是一個一個套件查,無法自行批量。
# 在 Code-level Harness 層先做 Batch 查詢,結果存入快取,
# Agent 呼叫 search_osv() 時直接命中快取,延遲從 N×API_RTT → 1×API_RTT。
_osv_batch_cache: dict[str, list] = {} # pkg → [vuln_dict, ...]
if input_type == "pkg":
# 從 tech_stack 提取套件名稱(去版本號)
_pkg_list = [item.strip().split()[0] for item in tech_stack.split(",") if item.strip()]
if _pkg_list:
try:
from tools.osv_tool import search_osv_batch
logger.info("[HARNESS 0] OSV Batch warmup: %s", _pkg_list)
_osv_batch_cache = search_osv_batch(_pkg_list)
logger.info("[HARNESS 0] OSV Batch warmup done: %d packages cached",
len(_osv_batch_cache))
except Exception as _e:
logger.warning("[HARNESS 0] OSV Batch warmup failed (non-fatal): %s", _e)
# ────────────────────────────────────────────────────────────
# 429 自動輪替:最多重試 MAX_LLM_RETRIES 次(每次切換模型)
MAX_LLM_RETRIES = 2
excluded_models: list[str] = []
for attempt in range(MAX_LLM_RETRIES + 1):
# v3.7: pass input_type so agent loads the correct Skill SOP
agent = create_scout_agent(excluded_models, input_type=input_type)
task = create_scout_task(agent, tech_stack, intel_fusion_result=intel_fusion_result)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential, verbose=True)
# 執行 Agent
logger.info("[START] Scout Pipeline: %s (attempt %d/%d)", tech_stack, attempt + 1, MAX_LLM_RETRIES + 1)
try:
from checkpoint import recorder as _cp
_current_model = get_current_model_name(agent.llm)
_cp.llm_call("scout", _current_model, "openrouter", f"attempt={attempt+1}")
except Exception:
_current_model = "unknown"
_t_llm = time.time()
try:
result = crew.kickoff()
try:
_cp.llm_result("scout", _current_model, "SUCCESS",
len(str(result)), int((time.time() - _t_llm) * 1000),
thinking=str(result)[:1000])
except Exception:
pass
break # 成功則跳出重試迴圈
except Exception as e:
error_str = str(e)
if "429" in error_str and attempt < MAX_LLM_RETRIES:
# 標記當前模型為冷卻中,下次迴圈會選擇其他模型
current_model = get_current_model_name(agent.llm)
mark_model_failed(current_model)
excluded_models.append(current_model)
# 解析 API 回傳的 retry_after 秒數
import re as _re
_m = _re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, _re.IGNORECASE)
retry_after = float(_m.group(1)) if _m else 0.0
logger.warning("[RETRY] Scout 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
current_model, attempt + 1, MAX_LLM_RETRIES, retry_after)
try:
_cp.llm_retry("scout", current_model, error_str[:200],
attempt + 1, "next_in_waterfall")
except Exception:
pass
rate_limiter.on_429(retry_after=retry_after, caller="scout") # 最少 30s
continue
try:
_cp.llm_error("scout", _current_model, error_str[:300])
except Exception:
pass
raise # 非 429 或已超過重試次數,直接拋出
result_str = str(result).strip()
# 解析 JSON(處理可能的 markdown 包裝)
json_str = result_str
if "```json" in json_str:
json_str = json_str.split("```json")[1].split("```")[0].strip()
elif "```" in json_str:
parts = json_str.split("```")
if len(parts) >= 3:
json_str = parts[1].strip()
try:
output = json.loads(json_str)
except json.JSONDecodeError:
logger.error("[FAIL] Agent output is not valid JSON: %s", result_str[:200])
raise ValueError(f"Scout Agent output is not valid JSON: {result_str[:200]}")
# ── Harness 保障 0.5:先補齊基礎 Schema,再寫入記憶 ───────────────
from datetime import datetime, timezone
if "scan_id" not in output:
logger.warning("[WARN] Output missing required field: scan_id")
output["scan_id"] = f"scan_{int(time.time())}"
if "timestamp" not in output:
logger.warning("[WARN] Output missing required field: timestamp")
output["timestamp"] = datetime.now(timezone.utc).isoformat()
if "tech_stack" not in output:
logger.warning("[WARN] Output missing required field: tech_stack")
output["tech_stack"] = [
item.strip().lower()
for item in str(tech_stack).split(",")
if item.strip()
]
if "vulnerabilities" not in output:
logger.warning("[WARN] Output missing required field: vulnerabilities")
output["vulnerabilities"] = []
if "summary" not in output:
logger.warning("[WARN] Output missing required field: summary")
output["summary"] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}
for vuln in output.get("vulnerabilities", []):
severity = str(vuln.get("severity", "MEDIUM")).upper()
if severity == "MODERATE":
severity = "MEDIUM"
elif severity not in {"CRITICAL", "HIGH", "MEDIUM", "LOW"}:
severity = _severity_from_cvss(float(vuln.get("cvss_score") or 0.0))
vuln["severity"] = severity
# ── Harness 保障 1:強制 write_memory ──────────────────────
memory_path = os.path.join(PROJECT_ROOT, "memory", "scout_memory.json")
need_write = False
if not os.path.exists(memory_path):
need_write = True
else:
try:
with open(memory_path, "r", encoding="utf-8") as f:
content = f.read().strip()
if not content or content == "{}":
need_write = True
except (IOError, json.JSONDecodeError):
need_write = True
if need_write:
logger.warning("[WARN] Agent did not call write_memory -- code forcing write (Harness)")
write_result = write_memory.run(agent_name="scout", data=json.dumps(output, ensure_ascii=False))
logger.info("[OK] Forced memory write: %s", write_result)
# ── Harness 保障 2:基礎 Schema 驗證 ──────────────────────
required = ["scan_id", "timestamp", "tech_stack", "vulnerabilities", "summary"]
for field in required:
if field not in output:
logger.warning("[WARN] Output missing required field: %s", field)
if field == "vulnerabilities":
output["vulnerabilities"] = []
elif field == "summary":
output["summary"] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}
# ── Harness 保障 2.5:Cache 注入(Anti-LLM-Omission)──────
# v5.0:改用 OSV Batch 快取資料(更精確,不會混入 1999 年 CVE)
# 當 LLM 輸出 0 vulnerabilities,從 Batch 預熱快取直接注入
if not output.get("vulnerabilities"):
injected = []
# 優先用 OSV Batch 預熱快取(最精確)
if _osv_batch_cache:
for pkg_name, vuln_list in _osv_batch_cache.items():
for v in vuln_list:
cve_id = v.get("cve_id", "")
if not cve_id.startswith(("CVE-", "GHSA-")):
continue
ghsa_sev = _extract_ghsa_severity_from_osv(v)
injected.append({
"cve_id": cve_id,
"package": v.get("package", pkg_name),
"cvss_score": v.get("cvss_score", 0.0),
"severity": v.get("severity", "MEDIUM"),
"description": v.get("description", "")[:300],
"published": v.get("published", ""),
"is_new": True,
"in_cisa_kev": False,
"has_public_exploit": False,
"source": "OSV",
"osv_id": v.get("osv_id", ""),
# GHSA 維度:從 OSV database_specific.severity 解析
"ghsa_severity": ghsa_sev,
})
else:
# Fallback:NVD cache(舊路徑,OSV Batch 失敗時使用)
from tools.nvd_tool import _search_nvd_impl
for item in (tech_stack or "").split(","):
pkg = item.strip().split()[0].lower()
if not pkg:
continue
try:
cached_result = json.loads(_search_nvd_impl(pkg))
for v in cached_result.get("vulnerabilities", []):
cve_id = v.get("cve_id") or v.get("id", "")
if not cve_id.startswith("CVE-"):
continue
injected.append({
"cve_id": cve_id,
"package": v.get("package", pkg),
"cvss_score": v.get("cvss_score", 0.0),
"severity": v.get("severity", "MEDIUM"),
"description": v.get("description", "")[:300],
"published": v.get("published_date", ""),
"is_new": True,
"in_cisa_kev": v.get("in_cisa_kev", False),
"has_public_exploit": v.get("has_public_exploit", False),
})
except Exception as e:
logger.warning("[WARN] NVD cache inject failed for %s: %s", pkg, e)
if injected:
output["vulnerabilities"] = injected
logger.warning(
"[HARNESS 2.5] LLM output 0 CVEs, injected %d CVEs from %s for tech_stack=%s",
len(injected),
"OSV batch cache" if _osv_batch_cache else "NVD cache",
tech_stack[:60]
)
# 重新查 NVD 建立真實 CVE 清單 + CVE→package 對應表
# v5.0:優先用 OSV Batch Cache(已在 Harness 0 預熱),只有 OSV 無資料才查 NVD
# NVD 查到的結果也加年份過濾(< 2005 → 不列入 real_cves)
from tools.nvd_tool import _search_nvd_impl
real_cves = set()
cve_to_package = {} # CVE-XXXX-YYYY → package name
# 收集所有需要查的 package:Agent 輸出的 + tech_stack 裡的
packages_to_check = set()
for vuln in output.get("vulnerabilities", []):
pkg = vuln.get("package", "").lower().strip()
if pkg:
packages_to_check.add(pkg)
# 從 tech_stack 提取(去版本號)
for item in tech_stack.split(","):
pkg_name = item.strip().split()[0].lower()
if pkg_name:
packages_to_check.add(pkg_name)
# 優先從 OSV Batch Cache 建 real_cves(最精確,不含 CVE-1999)
for pkg_name, vuln_list in _osv_batch_cache.items():
for v in vuln_list:
cve_id = v.get("cve_id", "")
if cve_id.startswith(("CVE-", "GHSA-")):
real_cves.add(cve_id)
cve_to_package[cve_id] = pkg_name
# OSV 無資料的套件,嘗試 NVD(但過濾 < 2005 年份)
for pkg in packages_to_check:
if any(cve_to_package.get(c) == pkg for c in real_cves):
continue # OSV 已有該套件資料
try:
import tools.nvd_tool as nvd_mod
original_page_size = nvd_mod.RESULTS_PER_PAGE
nvd_mod.RESULTS_PER_PAGE = 100
try:
nvd_result = json.loads(_search_nvd_impl(pkg))
finally:
nvd_mod.RESULTS_PER_PAGE = original_page_size
for v in nvd_result.get("vulnerabilities", []):
cve_id = v["cve_id"]
# 年份過濾:CVE-1999/2000... 遠古 CVE 不計入 real_cves
try:
cve_year = int(cve_id.split("-")[1])
if cve_year < 2005:
logger.debug("[FILTER] NVD verification skip ancient CVE: %s", cve_id)
continue
except (IndexError, ValueError):
pass
real_cves.add(cve_id)
cve_to_package[cve_id] = pkg
except Exception as e:
logger.warning("[WARN] CVE verification NVD query failed (%s): %s", pkg, e)
if real_cves:
original_count = len(output.get("vulnerabilities", []))
verified_vulns = []
suspect_vulns = [] # 可能是真的但 keywordSearch 沒找到
for vuln in output.get("vulnerabilities", []):
if vuln.get("cve_id") in real_cves:
verified_vulns.append(vuln)
else:
suspect_vulns.append(vuln)
# 對可疑的 CVE 做精確查詢(cveId lookup)
hallucinated = []
if suspect_vulns:
import re
for vuln in suspect_vulns:
cve_id = vuln.get("cve_id", "")
if not re.match(r"^CVE-\d{4}-\d{4,}$", cve_id):
hallucinated.append(cve_id)
continue
try:
resp = requests.get(
"https://services.nvd.nist.gov/rest/json/cves/2.0",
params={"cveId": cve_id},
headers={"apiKey": os.getenv("NVD_API_KEY", "")},
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
if data.get("totalResults", 0) > 0:
logger.info("[OK] CVE exact verification passed: %s", cve_id)
verified_vulns.append(vuln)
# 補 package:從 description 推斷
if not vuln.get("package"):
desc = data["vulnerabilities"][0]["cve"]["descriptions"][0]["value"].lower()
for pkg in packages_to_check:
if pkg in desc:
vuln["package"] = pkg
cve_to_package[cve_id] = pkg
break
continue
# NVD 明確回應但找不到 → 才算幻覺
hallucinated.append(cve_id)
except Exception:
# NVD API 不可達(timeout/connection)→ 保守保留,不當幻覺處理
logger.warning("[WARN] NVD verify unreachable for %s, keeping conservatively", cve_id)
verified_vulns.append(vuln)
if hallucinated:
logger.warning(
"[ALERT] Detected %d hallucinated CVEs, removed: %s",
len(hallucinated), hallucinated
)
output["vulnerabilities"] = verified_vulns
# 重新計算 summary
output["summary"] = {
"total": len(verified_vulns),
"new_since_last_scan": sum(1 for v in verified_vulns if v.get("is_new")),
"critical": sum(1 for v in verified_vulns if v.get("severity") == "CRITICAL"),
"high": sum(1 for v in verified_vulns if v.get("severity") == "HIGH"),
"medium": sum(1 for v in verified_vulns if v.get("severity") == "MEDIUM"),
"low": sum(1 for v in verified_vulns if v.get("severity") == "LOW"),
}
logger.info(
"[OK] CVE verification result: %d -> %d (removed %d hallucinated)",
original_count, len(verified_vulns), len(hallucinated)
)
else:
logger.info("[OK] All %d CVEs passed verification", original_count)
else:
logger.warning("[WARN] Cannot build real CVE list, skipping verification")
# ── Harness 保障 4:補全 package 欄位 ──────────────────────
# Agent 常忘記加 package,用 Layer 3 建好的 cve_to_package 補
patched_count = 0
for vuln in output.get("vulnerabilities", []):
if not vuln.get("package"):
cve_id = vuln.get("cve_id", "")
if cve_id in cve_to_package:
vuln["package"] = cve_to_package[cve_id]
patched_count += 1
else:
# 最後手段:從 description 猜 package
desc = vuln.get("description", "").lower()
for pkg in packages_to_check:
if pkg in desc:
vuln["package"] = pkg
patched_count += 1
break
else:
vuln["package"] = "unknown"
patched_count += 1
if patched_count:
logger.info("[OK] Patched %d CVE package fields", patched_count)
# 先合併 Intel Fusion 結果,再做歷史記憶校正,避免後補漏洞被誤標成新發現。
output = _merge_intel_fusion_evidence(output, intel_fusion_result)
# ── Harness 保障 5:校正 is_new 標記 ──────────────────────
# Agent 常常不正確比對歷史,程式碼代為校正
try:
mem_data = {}
if os.path.exists(memory_path):
with open(memory_path, "r", encoding="utf-8") as f:
mem_data = json.load(f)
# 從 memory 的所有歷史掃描中提取已知 CVE 集合
historical_cves = set()
# 新版 memory_tool 直接存 scan 結構
if "vulnerabilities" in mem_data:
for v in mem_data.get("vulnerabilities", []):
historical_cves.add(v.get("cve_id", ""))
# 舊版 memory 有 latest/history 結構
elif "latest" in mem_data:
for v in mem_data.get("latest", {}).get("vulnerabilities", []):
historical_cves.add(v.get("cve_id", ""))
output = _reconcile_is_new_flags(output, historical_cves)
corrected = output.get("_is_new_corrected", 0)
if corrected:
logger.info("[OK] Corrected %d CVE is_new flags", corrected)
except Exception as e:
logger.warning("[WARN] is_new correction failed: %s", e)
# ── Harness 保障 3.5:CVE 年份過濾器(最後防線)──────────────
# 不管哪個 Agent 或 NVD 返回了遠古 CVE,在 Scout 輸出前一律攔截
# 根據:CISA KEV 最早 2002 年,現代套件漏洞幾乎都 >= 2005
# 例外:GHSA- 前綴(OSV/GitHub Advisory,不用年份過濾)
CVE_YEAR_MIN = 2005
ancient_removed = []
fresh_vulns = []
for vuln in output.get("vulnerabilities", []):
cve_id = vuln.get("cve_id", "")
if cve_id.startswith("GHSA-") or not cve_id.startswith("CVE-"):
fresh_vulns.append(vuln) # GHSA/非標準 ID 保留
continue
try:
cve_year = int(cve_id.split("-")[1])
if cve_year < CVE_YEAR_MIN:
ancient_removed.append(cve_id)
logger.warning(
"[HARNESS 3.5] Ancient CVE removed (year=%d < %d): %s",
cve_year, CVE_YEAR_MIN, cve_id
)
else:
fresh_vulns.append(vuln)
except (IndexError, ValueError):
fresh_vulns.append(vuln) # 解析失敗保留(保守)
if ancient_removed:
output["vulnerabilities"] = fresh_vulns
output["ancient_cves_removed"] = ancient_removed # 審計用
logger.warning(
"[HARNESS 3.5] Removed %d ancient CVEs (< %d): %s",
len(ancient_removed), CVE_YEAR_MIN, ancient_removed
)
# ────────────────────────────────────────────────────────────
# ── 最終 Summary 校正(確保一致性)──────────────────────────
vulns = output.get("vulnerabilities", [])
output["summary"] = {
"total": len(vulns),
"new_since_last_scan": sum(1 for v in vulns if v.get("is_new")),
"critical": sum(1 for v in vulns if v.get("severity") == "CRITICAL"),
"high": sum(1 for v in vulns if v.get("severity") == "HIGH"),
"medium": sum(1 for v in vulns if v.get("severity") == "MEDIUM"),
"low": sum(1 for v in vulns if v.get("severity") == "LOW"),
}
vuln_count = output["summary"]["total"]
new_count = output["summary"]["new_since_last_scan"]
logger.info(
"[OK] Scout Pipeline complete: %d CVEs, %d new", vuln_count, new_count
)
return output
|