# agents/analyst.py
# 功能:Analyst Agent 定義 — 漏洞連鎖分析師
# Harness 支柱:Constraints(系統憲法 + Skill SOP)+ Observability(verbose=True)
# 擁有者:成員 C(Analyst Agent Pipeline)
#
# 使用方式:
# from agents.analyst import create_analyst_agent, create_analyst_task, run_analyst_pipeline
#
# 架構定位:
# Pipeline 的第二環 — 接收 Scout 的情報清單 → 深度分析 → 輸出風險評估 JSON → Advisor 接收
# Agent = Tool(手)+ Skill(腦)+ Constitution(法)
import json
import os
import re
import time
import logging
from datetime import datetime, timezone
from typing import Any
from crewai import Agent, Task
from config import get_llm, LLM_RPM
from tools.kev_tool import check_cisa_kev
from tools.exploit_tool import search_exploits
from tools.memory_tool import read_memory, write_memory, history_search

# LLM is initialized lazily: get_llm() is only invoked inside the create_*_agent() factories.
logger = logging.getLogger("ThreatHunter")

# Project root directory (one level above agents/).
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# ══════════════════════════════════════════════════════════════
# Part 1: system constitution + Skill SOP loading
# ══════════════════════════════════════════════════════════════
# Non-negotiable behavioural rules injected into every Analyst backstory.
CONSTITUTION = """
=== ThreatHunter Constitution ===
1. All CVE IDs must come from Tool-returned data. Fabrication is prohibited.
2. You must use the provided Tools for queries. Skip is not allowed.
3. Output must conform to the specified JSON schema.
4. Uncertain reasoning must be tagged with confidence: HIGH / MEDIUM / NEEDS_VERIFICATION.
5. Each judgment must include a reasoning field.
6. Reports use English; technical terms are not translated.
7. Do not call the same Tool twice for the same data.
8. Risk adjustment can only ESCALATE, never DOWNGRADE.
9. Chain analysis must include chain_with, chain_description, and confidence.
""".strip()

# Default skill document path.
# NOTE(review): SKILL_PATH appears unused in this module — _resolve_skill_path()
# builds paths from SKILL_MAP filenames instead; confirm external use before removing.
SKILL_PATH = os.path.join(PROJECT_ROOT, "skills", "chain_analysis.md")

# v3.7: Path-aware skill map (matches recorder.stage_enter usage in main.py).
SKILL_MAP: dict[str, str] = {
    "pkg": "chain_analysis.md",            # Path A: package CVE chain
    "code": "code_chain_analysis.md",      # Path B-code: source code chain
    "injection": "ai_chain_analysis.md",   # Path B-inject: AI security chain
    "config": "config_chain_analysis.md",  # Path C: config chain
}
def _resolve_skill_path(skill_filename: str) -> str:
    """Build the absolute filesystem path of an Analyst skill document."""
    skills_dir = os.path.join(PROJECT_ROOT, "skills")
    return os.path.join(skills_dir, skill_filename)
def _load_skill(skill_filename: str = "chain_analysis.md") -> str:
    """
    Load a Skill SOP document and return its text.

    Safety valves (graceful degradation):
    - Missing file  -> embedded fallback skill (_FALLBACK_SKILL)
    - Decode error  -> retry with the next candidate encoding
    - Empty file    -> fallback (an empty SOP is useless to the agent)

    Bug fix: try "utf-8-sig" BEFORE plain "utf-8". utf-8-sig transparently
    strips a leading BOM and reads BOM-less files identically, whereas plain
    utf-8 keeps U+FEFF as the first character of the text (U+FEFF is not
    whitespace, so .strip() does not remove it) and would pollute the prompt.
    The existence check is also hoisted so a missing file is not probed once
    per encoding, and an empty-but-readable file stops retrying immediately.

    Args:
        skill_filename: File name under <PROJECT_ROOT>/skills/.

    Returns:
        The skill document text, or _FALLBACK_SKILL on any failure.
    """
    skill_path = _resolve_skill_path(skill_filename)
    if os.path.exists(skill_path):
        for encoding in ("utf-8-sig", "utf-8", "latin-1"):
            try:
                with open(skill_path, "r", encoding=encoding) as f:
                    content = f.read().strip()
            except (IOError, UnicodeDecodeError):
                continue  # decode failed — try the next candidate encoding
            if content:
                logger.info("[OK] Skill loaded: %s (%d chars)", skill_path, len(content))
                return content
            break  # file read fine but is empty — no other encoding will help
    logger.warning("[WARN] Skill file load failed, using fallback: %s", skill_path)
    return _FALLBACK_SKILL
# Embedded minimal skill (graceful degradation — safety net when the skill file is missing).
_FALLBACK_SKILL = """
# Skill: Vulnerability Chain Analysis (Fallback)
## SOP
1. read_memory(agent_name="analyst") — read historical data
2. Parse Scout's JSON: extract tech_stack + vulnerabilities
3. For each CVE with CVSS >= 7.0: call check_cisa_kev
4. For each CVE with in_kev=true OR CVSS >= 9.0: call search_exploits
5. Chain analysis: classify attack types, identify prerequisite→outcome chains
6. Risk scoring: weighted sum (CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5)
7. write_memory(agent_name="analyst", data=report) — save results
8. Output pure JSON (Analyst → Advisor contract)
## Quality Gates
- CVE must come from Scout's intelligence, never fabricate
- Chain analysis must include reasoning and confidence
- Risk can only escalate, never downgrade
- Output must be pure JSON
""".strip()
# ══════════════════════════════════════════════════════════════
# Part 2: Agent factory functions
# ══════════════════════════════════════════════════════════════
def _build_analyst_backstory(input_type: str = "pkg") -> str:
    """
    Build the shared Analyst backstory (system constitution + skill SOP).

    Args:
        input_type: Path-aware skill route ("pkg"/"code"/"injection"/"config");
                    unknown values fall back to the default chain_analysis.md.

    Returns:
        Prompt text combining the analyst persona, CONSTITUTION, and the
        loaded skill SOP.
    """
    skill_filename = SKILL_MAP.get(input_type, "chain_analysis.md")
    skill_content = _load_skill(skill_filename)
    return f"""You are a senior vulnerability analyst specializing in attack chain analysis
and exploit intelligence. You are precise, methodical, and never fabricate data.
{CONSTITUTION}
---
## 📋 Analysis Methodology (Skill SOP)
The following is your standard operating procedure for vulnerability chain analysis:
{skill_content}
"""
def create_analyst_agent(
    excluded_models: list[str] | None = None,
    input_type: str = "pkg",
) -> Agent:
    """
    Create the Analyst Agent instance (full-tool version, used by main.py).

    Args:
        excluded_models: Model names to skip (models rate-limited with 429).
        input_type: Path-aware skill route ("pkg"/"code"/"injection"/"config").

    Returns:
        A CrewAI Agent instance, directly usable in a Task and Crew.
    """
    backstory = _build_analyst_backstory(input_type=input_type)
    analyst = Agent(
        role="Vulnerability Chain Analyst",
        goal=(
            "Receive Scout intelligence, validate KEV and exploit status, "
            "analyze vulnerability chain attack paths, and assess risk levels."
        ),
        backstory=backstory,
        tools=[check_cisa_kev, search_exploits, read_memory, write_memory, history_search],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,            # Harness: observability — full ReAct reasoning is visible
        max_iter=5,              # v3.5: ~4s/call on Gemini-3-Flash; one KEV + one exploit lookup suffices
        max_rpm=LLM_RPM,         # Harness: graceful degradation — free-tier rate limit
        allow_delegation=False,  # The Analyst does not delegate; it finishes its own work
    )
    logger.info(
        "[OK] Analyst Agent created | input_type=%s | tools=%s | max_iter=%s | llm=%s",
        input_type,
        [t.name for t in analyst.tools],
        analyst.max_iter,
        analyst.llm.model if hasattr(analyst.llm, 'model') else 'unknown'
    )
    return analyst
# ── Sub-agent factories (tool-restricted versions, used by run_analyst_pipeline) ──
# Each sub-agent has a dedicated backstory containing ONLY its own SOP steps,
# so a weak model never sees the full 8-step SOP and confuses its responsibility.
def _create_collector_agent(excluded_models: list[str] | None = None) -> Agent:
    """
    Create the data-collection sub-agent (read_memory tool only).

    Lowest cognitive load: read memory + parse the Scout JSON.

    Args:
        excluded_models: Model names to skip (rate-limited models).
    """
    backstory = f"""You are a data collection specialist. You are precise and methodical.
{CONSTITUTION}
---
## Your Responsibility: Data Collection ONLY
You handle Step 1-2 of the analysis pipeline:
- Step 1: Call `read_memory` tool to retrieve historical analysis data
- Step 2: Parse the Scout Agent's JSON to extract all CVE entries
You do NOT perform KEV validation, exploit search, risk scoring, or write memory.
Those are handled by other agents in the pipeline.
"""
    agent = Agent(
        role="Vulnerability Analyst - Data Collector",
        goal="Read historical memory and parse the Scout intelligence list.",
        backstory=backstory,
        tools=[read_memory],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,
        max_iter=8,
        max_rpm=LLM_RPM,
        allow_delegation=False,
    )
    logger.info("[OK] Collector Sub-Agent created | tools=%s", [t.name for t in agent.tools])
    return agent
def _create_verifier_agent(excluded_models: list[str] | None = None) -> Agent:
    """
    Create the verification/analysis sub-agent (check_cisa_kev + search_exploits only).

    Focus: KEV validation + exploit search + chain analysis.

    Args:
        excluded_models: Model names to skip (rate-limited models).
    """
    backstory = f"""You are a vulnerability verification specialist. You validate KEV status and search for public exploits.
{CONSTITUTION}
---
## Your Responsibility: Verification & Analysis ONLY
You handle Step 3-5 of the analysis pipeline:
- Step 3: Call `check_cisa_kev` for all CVEs with cvss_score >= 7.0 (comma-separated)
- Step 4: Call `search_exploits` for each CVE where in_kev=true OR cvss_score >= 9.0
- Step 5: Perform chain analysis (classify attack types, identify prerequisite→outcome chains)
You do NOT read memory, write memory, or calculate risk scores.
Those are handled by other agents in the pipeline.
"""
    agent = Agent(
        role="Vulnerability Analyst - Verification Specialist",
        goal="Validate CVE KEV status, search public exploits, and analyze vulnerability chain attack paths.",
        backstory=backstory,
        tools=[check_cisa_kev, search_exploits],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,
        max_iter=5,  # one KEV call + one exploit call each
        max_rpm=LLM_RPM,
        allow_delegation=False,
    )
    logger.info("[OK] Verifier Sub-Agent created | tools=%s", [t.name for t in agent.tools])
    return agent
def _create_scorer_agent(excluded_models: list[str] | None = None) -> Agent:
    """
    Create the scoring/output sub-agent (write_memory tool only).

    Focus: risk calculation + memory write + final JSON output.

    Args:
        excluded_models: Model names to skip (rate-limited models).
    """
    backstory = f"""You are a risk scoring specialist. You calculate risk scores and produce final JSON reports.
{CONSTITUTION}
---
## Your Responsibility: Risk Scoring & Output ONLY
You handle Step 6-8 of the analysis pipeline:
- Step 6: Calculate risk_score = min(100, sum of cvss_score * weight)
Weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
- Step 7: Call `write_memory` tool to save your report
- Step 8: Output the final JSON report
!! ABSOLUTE PROHIBITIONS:
- You do NOT have `read_memory` tool. Do NOT try to call it.
- You do NOT have `check_cisa_kev` tool. Do NOT try to call it.
- You do NOT have `search_exploits` tool. Do NOT try to call it.
- Steps 1-5 are ALREADY DONE by other agents. Their results are in your task context.
- Your ONLY tool is `write_memory`. Use it to save, then output Final Answer.
"""
    agent = Agent(
        role="Vulnerability Analyst - Risk Scorer",
        goal="Calculate risk scores, write memory, and output the final JSON report.",
        backstory=backstory,
        tools=[write_memory],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,
        max_iter=8,
        max_rpm=LLM_RPM,
        allow_delegation=False,
    )
    logger.info("[OK] Scorer Sub-Agent created | tools=%s", [t.name for t in agent.tools])
    return agent
# ══════════════════════════════════════════════════════════════
# Part 3: Task factory functions
# ══════════════════════════════════════════════════════════════
# ── Original single Task (backward compatible, used by main.py) ──────────
def create_analyst_task(agent: Agent, context: list | None = None) -> Task:
    """
    Create the Analyst Agent's Task (for the single Crew in main.py).

    Refactored to the standard CrewAI architecture:
    - scout_output is no longer embedded in the description (old approach);
    - instead, context=[scout_task] lets CrewAI automatically pass the previous
      Task's output into this Task (native CrewAI mechanism).

    Args:
        agent: Agent instance returned by create_analyst_agent().
        context: List of upstream Tasks (e.g. [scout_task]).

    Returns:
        A CrewAI Task instance.
    """
    return Task(
        description="""You are the Analyst Agent. The Scout Agent's intelligence report
is available in your context (previous task output).
Execute the following steps in strict order, calling the specified tools:
Step 1: Read historical memory
Action: read_memory
Action Input: analyst
!! CRITICAL MEMORY RULES — DO NOT SKIP !!
Memory contains CVEs from PREVIOUS scans (different packages/code contexts).
STRICT RULES:
a) ONLY use memory to check if a CVE from THIS scan was seen before → mark is_repeated=true
b) NEVER add CVEs from memory to the current analysis if Scout did NOT find them in THIS scan
c) If Scout reports 0 CVEs (empty vulnerabilities[]), analysis[] has 0 CVE entries
d) Old scan data (e.g. Redis, Python2, Django from 6+ months ago) must be IGNORED entirely
e) A CVE is REPEATED only if: Scout found it NOW + it appears in memory history
Step 2: Parse the Scout intelligence from context
Extract all CVE entries from the vulnerabilities array.
Note each CVE's cve_id, cvss_score, severity, package, description, and is_new.
Step 3: KEV validation
Collect all CVE IDs with cvss_score >= 7.0 into a comma-separated string.
Action: check_cisa_kev
Action Input: CVE-XXXX-XXXX,CVE-YYYY-YYYY (all qualifying CVEs in one call)
Record in_kev status for each CVE.
Step 4: Exploit search
For each CVE where in_kev=true OR cvss_score >= 9.0:
Action: search_exploits
Action Input: CVE-XXXX-XXXX (one CVE per call)
Record exploit_available and exploit_count.
Step 5: Chain analysis
Classify each vulnerability's attack type.
Identify prerequisite-outcome chains between vulnerabilities.
Risk adjustment rules:
- in_kev + exploit + chain -> CRITICAL
- in_kev + exploit -> CRITICAL
- chain alone -> at least original severity
Risk can ONLY escalate, never downgrade.
!! CODE-LEVEL PATTERNS (v4.0) !!
If the Scout output contains a `code_patterns` field, you MUST also analyze it:
- Each entry has: finding_id (CODE-NNN), pattern_type, cwe_id, owasp_category,
severity, snippet (up to 200 chars), line_no
- Map each code_pattern to its OWASP attack chain (use the table in your Skill SOP)
- Include them in your analysis[] alongside CVE findings
- Use finding_id starting with CODE- (not CVE-) for these entries
- CRITICAL code patterns (SQL_INJECTION, CMD_INJECTION, EVAL_EXEC, PICKLE_UNSAFE,
PROTOTYPE_POLLUTION): always add to analysis with severity=CRITICAL
- HIGH code patterns (INNERHTML_XSS, SSRF_RISK, HARDCODED_SECRET, PATH_TRAVERSAL,
YAML_UNSAFE): add to analysis with severity=HIGH
Step 6: Risk scoring
risk_score = min(100, sum of (cvss x weight))
Weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
Step 7: Write memory (MANDATORY)
Action: write_memory
Action Input: analyst|{your complete JSON report}
Step 8: Output Final Answer as pure JSON.
Absolute prohibitions:
- Do NOT fabricate CVE IDs.
- Do NOT skip tool calls.
- Do NOT skip write_memory.
- Do NOT downgrade risk.
""",
        expected_output=(
            "Pure JSON following the Analyst -> Advisor contract: "
            "scan_id, risk_score, risk_trend, analysis[] with "
            "cve_id, original_cvss, adjusted_risk, in_cisa_kev, "
            "exploit_available, chain_risk, reasoning for each CVE."
        ),
        agent=agent,
        context=context or [],
    )
# ── Split Task factories (3 sub-tasks, used by run_analyst_pipeline) ─────
def _create_collection_task(agent: Agent, scout_output: str) -> Task:
    """
    Sub-task 1: data collection.

    Read historical memory + parse the Scout intelligence list.

    Args:
        agent: The collector sub-agent (read_memory tool only).
        scout_output: Scout Agent JSON, embedded verbatim into the prompt.
    """
    return Task(
        description=f"""You are the Analyst Agent performing Step 1 of 3: Data Collection.
Below is the Scout Agent's intelligence report:
{scout_output}
=== YOUR GOAL ===
1. Read your historical memory using the `read_memory` tool.
2. Parse the Scout JSON above to extract:
a) ALL CVEs from the `vulnerabilities` array
b) ALL code-level findings from the `code_patterns` array (if present)
!! CRITICAL MEMORY RULES !!
- Memory is from PREVIOUS scans. Only use it to mark is_repeated=true for CVEs found in THIS scan
- NEVER add CVEs from memory that are NOT in the current vulnerabilities[] list
- If vulnerabilities[] is empty, parsed_cves must be empty too
3. Output your Final Answer in this JSON structure:
{{
"historical_risk_score": <number or null>,
"parsed_cves": [
{{ "cve_id": "...", "package": "...", "cvss_score": 0.0, "severity": "...", "description": "...", "is_new": true }}
],
"code_patterns": [
{{ "finding_id": "CODE-001", "pattern_type": "EVAL_EXEC", "cwe_id": "CWE-94", "owasp_category": "A03:2021-Injection", "severity": "CRITICAL", "snippet": "eval(data)", "line_no": 14, "language": "python" }}
],
"tech_stack": ["..."],
"total_cves": <number>
}}
If no code_patterns exist in Scout output, use an empty array [].
Copy ALL code_patterns entries from Scout output EXACTLY as-is into your output.
=== ⛔ CRITICAL RULE FOR FREE LLMS ⛔ ===
You MUST NOT generate the JSON response right now.
You MUST call the `read_memory` tool FIRST.
If you generate the Final Answer JSON without calling the tool, you will be penalized.
Stop thinking about the Final Answer and output your thought and action to call `read_memory` immediately!
""",
        expected_output=(
            "JSON with historical_risk_score (number or null), "
            "parsed_cves array containing all CVEs from Scout intelligence, "
            "AND code_patterns array (empty [] if none)."
        ),
        agent=agent,
    )
def _create_analysis_task(agent: Agent) -> Task:
    """
    Sub-task 2: verification & analysis.

    KEV validation + exploit search + chain analysis. The previous task's
    output is passed in automatically by CrewAI as context.

    Args:
        agent: The verifier sub-agent (check_cisa_kev + search_exploits only).
    """
    return Task(
        description="""You are the Analyst Agent performing Step 2 of 3: Verification & Analysis.
The previous task gave you parsed CVE data AND code_patterns. Now you must verify and analyze both.
=== PART A: CVE Analysis (if parsed_cves is non-empty) ===
1. Use `check_cisa_kev` tool to check ALL CVE IDs with cvss_score >= 7.0 (comma-separated).
2. Use `search_exploits` tool for each CVE where in_kev=true OR cvss_score >= 9.0.
3. Perform chain analysis (risk can only escalate).
=== PART B: Code Pattern Analysis (if code_patterns is non-empty) ===
For each code_pattern entry in the previous task output:
- Map pattern_type to OWASP attack chain (e.g. EVAL_EXEC → A03:2021-Injection → Arbitrary Code Execution)
- Assign risk using CWE severity:
CRITICAL (cvss_equivalent=9.0): EVAL_EXEC, EVAL_USAGE, SQL_INJECTION, CMD_INJECTION, PICKLE_UNSAFE,
PROTOTYPE_POLLUTION, DESERIALIZE_UNSAFE
HIGH (cvss_equivalent=7.5): INNERHTML_XSS, SSRF_RISK, HARDCODED_SECRET, PATH_TRAVERSAL, YAML_UNSAFE
- Do NOT call check_cisa_kev for CODE- findings (they are code patterns, not CVEs)
- Include CODE- findings in analysis[] with these fields:
finding_id, pattern_type, cwe_id, owasp_category, severity, snippet, line_no,
original_cvss (use cvss_equivalent above), adjusted_risk, in_cisa_kev=false,
exploit_available=false (deterministic scan, no external lookup needed),
chain_risk, reasoning
=== YOUR OUTPUT ===
Once you have ALL tool results (or if no CVEs, directly from code_patterns), output:
{
"analysis": [
{
"cve_id": "CVE-2024-XXXX", <-- for CVE findings
"original_cvss": 9.8,
"adjusted_risk": "CRITICAL",
"in_cisa_kev": true,
"exploit_available": true,
"chain_risk": { "is_chain": true, "chain_with": ["..."], "chain_description": "...", "confidence": "HIGH" },
"reasoning": "..."
},
{
"finding_id": "CODE-001", <-- for code pattern findings
"cve_id": null,
"pattern_type": "EVAL_EXEC",
"cwe_id": "CWE-94",
"owasp_category": "A03:2021-Injection",
"severity": "CRITICAL",
"snippet": "eval(data)",
"line_no": 14,
"original_cvss": 9.0,
"adjusted_risk": "CRITICAL",
"in_cisa_kev": false,
"exploit_available": false,
"chain_risk": { "is_chain": true, "chain_with": [], "chain_description": "eval() with user-controlled input enables remote code execution", "confidence": "HIGH" },
"reasoning": "eval(data) executes arbitrary Python code. If data comes from user input (network, file, env), this is a direct RCE vector. CWE-94: Improper Control of Generation of Code."
}
]
}
=== ⛔ CRITICAL RULES ⛔ ===
- If parsed_cves is empty but code_patterns is non-empty: ONLY analyze code_patterns, no CVE tool calls
- If both are present: analyze both
- DO NOT fabricate CVE IDs
- DO NOT call check_cisa_kev for CODE- findings
- Stop thinking and call tools immediately!
""",
        expected_output=(
            "JSON with analysis array containing BOTH CVE findings (with KEV/exploit data) "
            "AND code pattern findings (finding_id starting CODE-, with chain_risk and reasoning)."
        ),
        agent=agent,
    )
def _create_scoring_task(agent: Agent) -> Task:
    """
    Sub-task 3: scoring & output.

    Risk score calculation + memory write + final JSON output. The previous
    task's output is passed in automatically by CrewAI as context.

    Args:
        agent: The scorer sub-agent (write_memory tool only).
    """
    # scan_id is derived from the current UTC date; the _001 suffix is fixed here.
    now = datetime.now(timezone.utc)
    scan_id = f"scan_{now.strftime('%Y%m%d')}_001"
    return Task(
        description=f"""You are performing the FINAL step: Scoring & Output.
⚠️ IMPORTANT CONTEXT:
- Step 1 (data collection) and Step 2 (KEV/exploit verification) are ALREADY COMPLETED by other agents.
- Their results are provided to you in the task context above.
- You do NOT need to call read_memory, check_cisa_kev, or search_exploits.
- You do NOT have those tools. Your ONLY tool is `write_memory`.
=== YOUR GOAL ===
1. Look at the analysis results from the previous task context.
2. Calculate risk_score: min(100, sum of (each finding's cvss_equivalent x weight))
Weight by adjusted_risk: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
- For CODE- findings: CRITICAL code pattern = cvss_equivalent 9.0, HIGH = 7.5
- For CVE findings: use original_cvss
3. Calculate risk_trend: compare with historical_risk_score from task 1 context.
If no history, use "+0". Format: "+7" or "-3" or "+0".
4. Call `write_memory` tool with these EXACT arguments:
- agent_name: analyst
- data: your complete JSON report as a string
5. After write_memory confirms success, output your Final Answer.
=== FINAL ANSWER FORMAT (pure JSON, no other text) ===
{{
"scan_id": "{scan_id}",
"risk_score": <calculated number 0-100>,
"risk_trend": "<+N or -N or +0>",
"analysis": <copy the COMPLETE analysis array from previous task context, including both CVE and CODE- entries>
}}
=== ⛔ RULES ⛔ ===
- Do NOT call read_memory (you don't have it).
- Do NOT call check_cisa_kev (you don't have it).
- Do NOT call search_exploits (you don't have it).
- DO call write_memory FIRST, then output Final Answer.
- INCLUDE all CODE- findings from analysis[] in your final output.
- Final Answer must be pure JSON only. No markdown, no explanation.
""",
        expected_output=(
            "Pure JSON: scan_id, risk_score (0-100), risk_trend, "
            "and complete analysis array from previous task (including CODE- findings)."
        ),
        agent=agent,
    )
# ══════════════════════════════════════════════════════════════
# 第四部份:Harness 保障層(3 層)
# ══════════════════════════════════════════════════════════════
def _strip_react_residue(parsed: dict[str, Any]) -> dict[str, Any]:
"""
偵測並剝離 ReAct tool-call 殘留欄位。
弱模型常把 thought/action/action_input 混入最終 JSON,
這些不屬於 Analyst → Advisor 契約。
"""
react_keys = {"thought", "action", "action_input",
"Thought", "Action", "Action Input"}
found_react = react_keys & set(parsed.keys())
if not found_react:
return parsed # 沒有 ReAct 殘留,原樣返回
logger.warning("[WARN] Detected ReAct residual fields %s, stripped", found_react)
cleaned = {k: v for k, v in parsed.items() if k not in react_keys}
# 如果剝離後仍有 schema 必要欄位,則視為有效
schema_keys = {"scan_id", "risk_score", "risk_trend", "analysis"}
if schema_keys & set(cleaned.keys()):
return cleaned
# 剝離後空空如也 → 純 ReAct 格式,視為無效輸出
logger.warning("[WARN] After stripping ReAct, no valid schema fields remain")
return {}
def _extract_json_from_output(raw: str) -> dict[str, Any]:
"""從 LLM 輸出中提取 JSON(容忍 Markdown 包裝 + 剝離 ReAct 殘留)"""
parsed = None
# 嘗試 1:直接解析
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
pass
# 嘗試 2:提取 ```json ... ``` 區塊
if parsed is None:
match = re.search(r"```(?:json)?\s*([\s\S]+?)```", raw)
if match:
try:
parsed = json.loads(match.group(1).strip())
except json.JSONDecodeError:
pass
# 嘗試 3:提取 { ... } 區塊
if parsed is None:
match = re.search(r"\{[\s\S]+\}", raw)
if match:
try:
parsed = json.loads(match.group(0))
except json.JSONDecodeError:
pass
if parsed is None:
return {}
# 剝離 ReAct 殘留(防止 thought/action 被當成有效輸出)
return _strip_react_residue(parsed)
def _harness_validate_schema(output: dict[str, Any]) -> list[str]:
"""
Harness Layer 2:JSON Schema 驗證。
驗證 Analyst → Advisor 契約的必要欄位。
回傳錯誤清單,空清單表示通過。
"""
errors = []
required_keys = ["scan_id", "risk_score", "risk_trend", "analysis"]
for k in required_keys:
if k not in output:
errors.append(f"缺少必要欄位:{k}")
# 驗證 analysis 陣列中的每個項目
for i, item in enumerate(output.get("analysis", [])):
item_required = ["cve_id", "original_cvss", "adjusted_risk", "reasoning"]
for k in item_required:
if k not in item:
errors.append(f"analysis[{i}] 缺少欄位:{k}")
return errors
def _harness_validate_chain_risk(output: dict[str, Any]) -> None:
"""
Harness Layer 3:chain_risk 邏輯驗證。
is_chain=true 必須有 chain_with + chain_description。
"""
for i, item in enumerate(output.get("analysis", [])):
chain_risk = item.get("chain_risk", {})
if chain_risk.get("is_chain", False):
if not chain_risk.get("chain_with"):
logger.warning(
"[WARN] Harness Layer 3: analysis[%d] is_chain=true but missing chain_with, "
"auto-set to empty array", i
)
chain_risk["chain_with"] = []
if not chain_risk.get("chain_description"):
logger.warning(
"[WARN] Harness Layer 3: analysis[%d] is_chain=true but missing chain_description, "
"auto-patched", i
)
chain_risk["chain_description"] = "Chain detected but description not provided by Agent"
if not chain_risk.get("confidence"):
chain_risk["confidence"] = "NEEDS_VERIFICATION"
# CVE year cut-off: software targeted by pre-2005 vulnerabilities has mostly been retired.
# Evidence: EPSS research (Jacobs et al. 2023) shows pre-2005 CVEs average EPSS < 0.01.
# Open-source precedent: Trivy --ignore-unfixed and Grype's suppression mechanism.
_CVE_YEAR_CUTOFF = 2005
def _harness_filter_ancient_cves(output: dict[str, Any]) -> None:
"""
Harness Layer 3.5:CVE 年份過濾。
對 year < _CVE_YEAR_CUTOFF 的 CVE 標記 NEEDS_VERIFICATION,
不刪除(保留審計軌跡),但讓 Advisor 知道這些 CVE 可疑。
設計依據:
1. EPSS (Jacobs et al. 2023):pre-2005 CVE 的 EPSS 平均 < 0.01
2. NIST CVSS v3.1 User Guide §7.3:Temporal Metrics 應納入評估
3. Trivy/Grype 都有類似的年份過濾/suppress 機制
"""
for item in output.get("analysis", []):
cve_id = item.get("cve_id") or ""
year_m = re.match(r"CVE-(\d{4})-", cve_id)
if not year_m:
continue
year = int(year_m.group(1))
if year < _CVE_YEAR_CUTOFF:
# 設為 NEEDS_VERIFICATION,不強制刪除
if item.get("chain_risk", {}).get("confidence") not in ("NEEDS_VERIFICATION",):
item.setdefault("chain_risk", {})["confidence"] = "NEEDS_VERIFICATION"
item["_ancient_cve_warning"] = (
f"CVE year {year} < {_CVE_YEAR_CUTOFF}: "
f"target software likely retired. "
f"Verify relevance to current tech stack before acting."
)
logger.warning(
"[ANALYST] Ancient CVE flagged: %s (year=%d) → confidence=NEEDS_VERIFICATION",
cve_id, year,
)
def _build_fallback_output(scout_data: dict[str, Any]) -> dict[str, Any]:
"""
Harness 保障:當 LLM 輸出無法解析時,
根據 Scout 輸出建立最小可行的 Analyst 報告。
"""
vulns = scout_data.get("vulnerabilities", [])
code_patterns = scout_data.get("code_patterns", [])
analysis = []
for v in vulns:
cve_id = v.get("cve_id", "UNKNOWN")
cvss = float(v.get("cvss_score", 0))
severity = v.get("severity", "LOW")
analysis.append({
"cve_id": cve_id,
"package": v.get("package", "unknown"),
"severity": severity,
"original_cvss": cvss,
"adjusted_risk": severity,
"in_cisa_kev": False,
"exploit_available": False,
"chain_risk": {
"is_chain": False,
"chain_with": [],
"chain_description": "",
"confidence": "NEEDS_VERIFICATION",
},
"reasoning": f"Fallback analysis: CVSS {cvss:.1f} ({severity}), KEV/Exploit status unknown (Harness fallback)",
})
for pattern in code_patterns:
severity = pattern.get("severity", "MEDIUM")
if severity == "CRITICAL":
cvss_equivalent = 9.0
elif severity == "HIGH":
cvss_equivalent = 7.5
elif severity == "MEDIUM":
cvss_equivalent = 5.0
else:
cvss_equivalent = 2.5
analysis.append({
"finding_id": pattern.get("finding_id", "CODE-000"),
"cve_id": None,
"pattern_type": pattern.get("pattern_type", "UNKNOWN"),
"cwe_id": pattern.get("cwe_id", "CWE-unknown"),
"owasp_category": pattern.get("owasp_category", ""),
"severity": severity,
"snippet": pattern.get("snippet", ""),
"line_no": pattern.get("line_no", 0),
"original_cvss": cvss_equivalent,
"adjusted_risk": severity,
"in_cisa_kev": False,
"exploit_available": False,
"chain_risk": {
"is_chain": False,
"chain_with": [],
"chain_description": "Deterministic code pattern preserved in Analyst fallback.",
"confidence": "HIGH",
},
"reasoning": (
f"Fallback analysis: deterministic {pattern.get('pattern_type', 'UNKNOWN')} "
f"pattern confirmed by Security Guard ({pattern.get('cwe_id', 'CWE-unknown')})."
),
})
# 計算風險分數
weight_map = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1, "LOW": 0.5}
risk_score = min(100, int(sum(
float(item.get("original_cvss", 0))
* weight_map.get(item.get("adjusted_risk", item.get("severity", "LOW")), 1)
for item in analysis
)))
now = datetime.now(timezone.utc)
scan_id = f"scan_{now.strftime('%Y%m%d')}_001"
return {
"scan_id": scan_id,
"risk_score": risk_score,
"risk_trend": "+0",
"analysis": analysis,
"_harness_fallback": True,
}
# ══════════════════════════════════════════════════════════════
# 第五部份:Pipeline 執行函式(含 Harness 保障層)
# ══════════════════════════════════════════════════════════════
def run_analyst_pipeline(scout_output: str | dict, input_type: str = "pkg") -> dict:
    """
    Run the full Analyst pipeline: agent execution plus code-level guarantees.

    Harness-engineering core idea:
        Never rely 100% on the LLM obeying instructions.
        The agent is responsible for "trying its best"; the code is
        responsible for "making sure it actually happens".

    Architecture: 3-task sequential pipeline (reduces cognitive load on weak models)
        Task 1: data collection (Collector) -- read_memory + parse Scout JSON
        Task 2: verification/analysis (Verifier) -- KEV + exploit + chain analysis
        Task 3: scoring/output (Scorer) -- risk calculation + write_memory + JSON output

    Code-level guarantees:
        Layer 1: forced write_memory (if the agent skipped the call, code performs it)
        Layer 2: JSON schema validation (required-field checks)
        Layer 3: chain_risk logic validation (is_chain=true requires chain_with + chain_description)

    Args:
        scout_output: JSON output from the Scout agent (string or dict).
        input_type: Path-Aware Skill routing key (pkg/code/injection/config).
            NOTE(review): not referenced inside this function body -- presumably
            consumed by callers or kept for interface compatibility; confirm.

    Returns:
        dict: parsed Analyst report JSON (conforms to the Analyst -> Advisor contract).
    """
    from crewai import Crew, Process
    # Normalize the input into both a dict form and a str form, since the
    # agents take a string prompt while the harness layers need the dict.
    if isinstance(scout_output, dict):
        scout_dict = scout_output
        scout_str = json.dumps(scout_output, ensure_ascii=False, indent=2)
    else:
        scout_str = scout_output
        try:
            scout_dict = json.loads(scout_output)
        except json.JSONDecodeError:
            # Unparseable input: harness layers then operate on an empty dict.
            scout_dict = {}
    logger.info("[START] Analyst Pipeline (3-Task split architecture)")
    # Record the memory file's mtime before the pipeline starts; comparing
    # against it afterwards tells us whether the agent called write_memory.
    memory_path_check = os.path.join(PROJECT_ROOT, "memory", "analyst_memory.json")
    pre_mtime = os.path.getmtime(memory_path_check) if os.path.exists(memory_path_check) else 0
    # Automatic rotation on HTTP 429: retry up to MAX_LLM_RETRIES times,
    # switching to the next model in the waterfall on each retry.
    from config import mark_model_failed, get_current_model_name
    MAX_LLM_RETRIES = 2
    excluded_models: list[str] = []
    raw_output = ""
    output: dict[str, Any] = {}
    crew_success = False
    for attempt in range(MAX_LLM_RETRIES + 1):
        # -- Create the 3 specialized sub-agents (fresh model on each retry) --
        collector = _create_collector_agent(excluded_models)
        verifier = _create_verifier_agent(excluded_models)
        scorer = _create_scorer_agent(excluded_models)
        # -- Create the 3 sub-tasks ----------------------------------------
        task_1 = _create_collection_task(collector, scout_str)
        task_2 = _create_analysis_task(verifier)
        task_3 = _create_scoring_task(scorer)
        # -- Run the CrewAI sequential pipeline ----------------------------
        try:
            crew = Crew(
                agents=[collector, verifier, scorer],
                tasks=[task_1, task_2, task_3],
                process=Process.sequential,
                verbose=True,
            )
            logger.info("[START] Analyst Crew kickoff (attempt %d/%d)", attempt + 1, MAX_LLM_RETRIES + 1)
            # Checkpoint recording is best-effort: if the import or the call
            # fails we fall back to an "unknown" model label and carry on.
            try:
                from checkpoint import recorder as _cp
                _a_model = get_current_model_name(collector.llm)
                _cp.llm_call("analyst", _a_model, "openrouter", f"3-task-split attempt={attempt+1}")
            except Exception:
                _a_model = "unknown"
            _t_a = time.time()
            result = crew.kickoff()
            raw_output = str(result.raw) if hasattr(result, "raw") else str(result)
            try:
                _cp.llm_result("analyst", _a_model, "SUCCESS",
                               len(raw_output), int((time.time() - _t_a) * 1000),
                               thinking=raw_output[:1000])
            except Exception:
                pass
            output = _extract_json_from_output(raw_output)
            # Success is defined as "some JSON was extractable", not merely
            # "kickoff did not raise".
            crew_success = bool(output)
            logger.info("[OK] CrewAI 3-Task Pipeline done | crew_success=%s", crew_success)
            break  # Success: exit the retry loop.
        except Exception as e:
            error_str = str(e)
            if "429" in error_str and attempt < MAX_LLM_RETRIES:
                # Mark the current model as cooling down; any sub-agent can
                # supply the model name since all three share the waterfall.
                current_model = get_current_model_name(collector.llm)
                mark_model_failed(current_model)
                excluded_models.append(current_model)
                import re as _re
                # Best-effort extraction of the provider's "retry after Ns" hint.
                _m = _re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, _re.IGNORECASE)
                retry_after = float(_m.group(1)) if _m else 0.0
                logger.warning("[RETRY] Analyst 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                               current_model, attempt + 1, MAX_LLM_RETRIES, retry_after)
                try:
                    _cp.llm_retry("analyst", current_model, error_str[:200],
                                  attempt + 1, "next_in_waterfall")
                except Exception:
                    pass
                from config import rate_limiter as _rl
                _rl.on_429(retry_after=retry_after, caller="analyst")  # at least 30s
                continue
            # Non-429 (or retries exhausted): log and fall through to the
            # harness fallback below rather than propagating the exception.
            logger.error("[FAIL] CrewAI execution failed: %s", e)
            try:
                _cp.llm_error("analyst", _a_model, error_str[:300])
            except Exception:
                pass
    # -- Harness Layer 1: force an output + force write_memory -------------
    need_fallback = not output or not crew_success
    if need_fallback:
        logger.warning("[WARN] Harness Layer 1: LLM output unparseable, using fallback")
        output = _build_fallback_output(scout_dict)
    # Force write_memory, using an mtime comparison to decide whether the
    # agent already wrote the file during the crew run.
    memory_path = os.path.join(PROJECT_ROOT, "memory", "analyst_memory.json")
    post_mtime = os.path.getmtime(memory_path) if os.path.exists(memory_path) else 0
    need_write = (post_mtime <= pre_mtime)
    if need_write:
        logger.warning("[WARN] Agent did not call write_memory -- code forcing write (Harness)")
        try:
            write_result = write_memory.run(
                agent_name="analyst",
                data=json.dumps(output, ensure_ascii=False),
            )
            logger.info("[OK] Forced memory write: %s", write_result)
        except Exception as e:
            logger.error("[FAIL] Forced write_memory failed: %s", e)
    else:
        logger.info("[OK] Agent already called write_memory (mtime updated)")
    # -- Harness Layer 2: JSON schema validation ---------------------------
    schema_errors = _harness_validate_schema(output)
    if schema_errors:
        # Merge only the MISSING keys from the fallback; keys the LLM did
        # produce are kept as-is.
        logger.warning("[WARN] Harness Layer 2: Schema errors %s, merging fallback", schema_errors)
        fallback = _build_fallback_output(scout_dict)
        for k, v in fallback.items():
            if k not in output:
                output[k] = v
    # -- Harness Layer 3: chain_risk logic validation ----------------------
    _harness_validate_chain_risk(output)
    # -- Harness Layer 3.5: ancient-CVE year filter ------------------------
    # Supporting evidence: EPSS (Jacobs et al. 2023), NIST CVSS §7.3,
    # and the suppress mechanisms in Trivy/Grype.
    _harness_filter_ancient_cves(output)
    # -- Clamp risk_score to a sane range ----------------------------------
    risk_score = output.get("risk_score", 0)
    if not (0 <= risk_score <= 100):
        logger.warning("[WARN] risk_score=%s out of range, forcing correction", risk_score)
        output["risk_score"] = max(0, min(100, risk_score))
    # -- Harness Layer 4: risk-escalation rule (downgrades forbidden) ------
    scout_vulns = {v.get("cve_id"): v.get("severity", "LOW") for v in scout_dict.get("vulnerabilities", [])}
    severity_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    # -- Ensure every analysis item carries the full per-item contract -----
    for item in output.get("analysis", []):
        cve_id = item.get("cve_id") or ""
        orig_severity = scout_vulns.get(cve_id, "LOW")
        adj_risk = item.get("adjusted_risk", orig_severity)
        # Detect an attempted severity downgrade relative to Scout's rating.
        if severity_rank.get(adj_risk, 0) < severity_rank.get(orig_severity, 0):
            logger.warning(
                "[WARN] Harness Layer 4: %s tried to downgrade from %s to %s, "
                "violates SOP, forcing back to %s",
                cve_id, orig_severity, adj_risk, orig_severity
            )
            item["adjusted_risk"] = orig_severity
        if "chain_risk" not in item:
            item["chain_risk"] = {
                "is_chain": False,
                "chain_with": [],
                "chain_description": "",
                "confidence": "NEEDS_VERIFICATION",
            }
        if "in_cisa_kev" not in item:
            item["in_cisa_kev"] = False
        if "exploit_available" not in item:
            item["exploit_available"] = False
    analysis_count = len(output.get("analysis", []))
    logger.info(
        "[OK] Analyst Pipeline complete | risk_score=%s | risk_trend=%s | analysis_count=%d",
        output.get('risk_score', 0),
        output.get('risk_trend', '+0'),
        analysis_count
    )
    return output
# ══════════════════════════════════════════════════════════════
# 第六部份:本地測試入口(直接執行此檔案時)
# ══════════════════════════════════════════════════════════════
# Local test entry point (when running this file directly).
# Fix: removed the unused `import sys` (it was never referenced in this block).
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(message)s",
    )
    # Prefer the persisted Scout memory as the test input; fall back to a
    # hard-coded two-CVE sample when no memory file exists yet.
    _scout_output_path = os.path.join(PROJECT_ROOT, "memory", "scout_memory.json")
    if os.path.exists(_scout_output_path):
        with open(_scout_output_path, encoding="utf-8") as _f:
            _test_input = _f.read()
        print(f"[TEST] 使用 Scout 記憶作為輸入:{_scout_output_path}")
    else:
        # Synthetic Scout output: one recent CVE and one ancient (2015) CVE,
        # both CRITICAL, so the harness layers (ancient-CVE filter,
        # escalation rule) have material to act on.
        _test_input = json.dumps({
            "scan_id": "scan_test_001",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tech_stack": ["Django 4.2", "Redis 7.0"],
            "vulnerabilities": [
                {
                    "cve_id": "CVE-2024-42005",
                    "package": "django",
                    "cvss_score": 9.8,
                    "severity": "CRITICAL",
                    "description": "Django SQL injection vulnerability in QuerySet.values() and values_list()",
                    "is_new": True,
                },
                {
                    "cve_id": "CVE-2015-4335",
                    "package": "redis",
                    "cvss_score": 10.0,
                    "severity": "CRITICAL",
                    "description": "Redis Lua Sandbox Escape and Remote Code Execution",
                    "is_new": True,
                },
            ],
            "summary": {"total": 2, "critical": 2, "high": 0, "medium": 0, "low": 0},
        })
        print("[TEST] 使用預設測試輸入")
    result = run_analyst_pipeline(_test_input)
    print("\n=== Analyst 輸出 ===")
    print(json.dumps(result, ensure_ascii=False, indent=2))
    print(f"\nrisk_score: {result.get('risk_score', 0)}")
    print(f"risk_trend: {result.get('risk_trend', '+0')}")
    print(f"analysis count: {len(result.get('analysis', []))}")
|