# Threat_Hunter — agents/analyst.py
# Origin: EricChen2005, "Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B" (commit c8d30bc)
# agents/analyst.py
# ๅŠŸ่ƒฝ๏ผšAnalyst Agent ๅฎš็พฉ โ€” ๆผๆดž้€ฃ้Ž–ๅˆ†ๆžๅธซ
# Harness ๆ”ฏๆŸฑ๏ผšConstraints๏ผˆ็ณป็ตฑๆ†ฒๆณ• + Skill SOP๏ผ‰+ Observability๏ผˆverbose=True๏ผ‰
# ๆ“ๆœ‰่€…๏ผšๆˆๅ“ก C๏ผˆAnalyst Agent Pipeline๏ผ‰
#
# ไฝฟ็”จๆ–นๅผ๏ผš
# from agents.analyst import create_analyst_agent, create_analyst_task, run_analyst_pipeline
#
# ๆžถๆง‹ๅฎšไฝ๏ผš
# Pipeline ็š„็ฌฌไบŒ็’ฐ โ€” ๆŽฅๆ”ถ Scout ็š„ๆƒ…ๅ ฑๆธ…ๅ–ฎ โ†’ ๆทฑๅบฆๅˆ†ๆž โ†’ ่ผธๅ‡บ้ขจ้šช่ฉ•ไผฐ JSON โ†’ Advisor ๆŽฅๆ”ถ
# Agent = Tool๏ผˆๆ‰‹๏ผ‰+ Skill๏ผˆ่…ฆ๏ผ‰+ Constitution๏ผˆๆณ•๏ผ‰
import json
import os
import re
import time
import logging
from datetime import datetime, timezone
from typing import Any
from crewai import Agent, Task
from config import get_llm, LLM_RPM
from tools.kev_tool import check_cisa_kev
from tools.exploit_tool import search_exploits
from tools.memory_tool import read_memory, write_memory, history_search
# LLM ๅปถ้ฒๅˆๅง‹ๅŒ–๏ผšๅœจ create_*_agent() ไธญๆ‰ๅ‘ผๅซ get_llm()
logger = logging.getLogger("ThreatHunter")
# ๅฐˆๆกˆๆ น็›ฎ้Œ„๏ผˆagents/ ็š„ไธŠไธ€ๅฑค๏ผ‰
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไธ€้ƒจไปฝ๏ผš็ณป็ตฑๆ†ฒๆณ• + Skill SOP ่ผ‰ๅ…ฅ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# System constitution: non-negotiable rules injected verbatim into every agent
# backstory (Harness pillar: Constraints). Runtime prompt text — do not edit lightly.
CONSTITUTION = """
=== ThreatHunter Constitution ===
1. All CVE IDs must come from Tool-returned data. Fabrication is prohibited.
2. You must use the provided Tools for queries. Skip is not allowed.
3. Output must conform to the specified JSON schema.
4. Uncertain reasoning must be tagged with confidence: HIGH / MEDIUM / NEEDS_VERIFICATION.
5. Each judgment must include a reasoning field.
6. Reports use English; technical terms are not translated.
7. Do not call the same Tool twice for the same data.
8. Risk adjustment can only ESCALATE, never DOWNGRADE.
9. Chain analysis must include chain_with, chain_description, and confidence.
""".strip()
SKILL_PATH = os.path.join(PROJECT_ROOT, "skills", "chain_analysis.md")
# v3.7: Path-Aware Skill Map (mirrors main.py recorder.stage_enter usage) —
# routes each input type to its dedicated SOP file under skills/.
SKILL_MAP: dict[str, str] = {
    "pkg": "chain_analysis.md",        # Path A: package CVE chain
    "code": "code_chain_analysis.md",  # Path B-code: source code chain
    "injection": "ai_chain_analysis.md",  # Path B-inject: AI security chain
    "config": "config_chain_analysis.md",  # Path C: config chain
}
def _resolve_skill_path(skill_filename: str) -> str:
    """Build the absolute path of an Analyst skill file under skills/."""
    skills_dir = os.path.join(PROJECT_ROOT, "skills")
    return os.path.join(skills_dir, skill_filename)
def _load_skill(skill_filename: str = "chain_analysis.md") -> str:
    """Load a Skill SOP file, falling back to the embedded skill on failure.

    Safety valves (graceful degradation):
    - File missing -> embedded minimal skill.
    - Decode error -> retry with utf-8-sig, then latin-1 -> embedded skill.
    - File readable but empty -> embedded skill (re-reading with another
      encoding cannot make an empty file non-empty, so we stop immediately;
      the original re-read pointlessly).

    Args:
        skill_filename: file name under skills/ (see SKILL_MAP).

    Returns:
        The skill text, stripped, or _FALLBACK_SKILL on any failure.
    """
    skill_path = _resolve_skill_path(skill_filename)
    # Existence is checked once, outside the encoding loop (the original
    # re-checked it for every encoding attempt).
    if os.path.exists(skill_path):
        for encoding in ("utf-8", "utf-8-sig", "latin-1"):
            try:
                with open(skill_path, "r", encoding=encoding) as f:
                    content = f.read().strip()
            except (OSError, UnicodeDecodeError):
                # OSError is the modern spelling; IOError is a legacy alias.
                continue
            if content:
                logger.info("[OK] Skill loaded: %s (%d chars)", skill_path, len(content))
                return content
            break  # readable but empty — fall through to the fallback
    logger.warning("[WARN] Skill file load failed, using fallback: %s", skill_path)
    return _FALLBACK_SKILL
# Embedded minimal Skill (Graceful Degradation — safety net when the skill
# file on disk is missing or unreadable). Runtime prompt text.
_FALLBACK_SKILL = """
# Skill: Vulnerability Chain Analysis (Fallback)
## SOP
1. read_memory(agent_name="analyst") โ€” read historical data
2. Parse Scout's JSON: extract tech_stack + vulnerabilities
3. For each CVE with CVSS >= 7.0: call check_cisa_kev
4. For each CVE with in_kev=true OR CVSS >= 9.0: call search_exploits
5. Chain analysis: classify attack types, identify prerequisiteโ†’outcome chains
6. Risk scoring: weighted sum (CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5)
7. write_memory(agent_name="analyst", data=report) โ€” save results
8. Output pure JSON (Analyst โ†’ Advisor contract)
## Quality Gates
- CVE must come from Scout's intelligence, never fabricate
- Chain analysis must include reasoning and confidence
- Risk can only escalate, never downgrade
- Output must be pure JSON
""".strip()
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไบŒ้ƒจไปฝ๏ผšAgent ๅทฅๅป ๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _build_analyst_backstory(input_type: str = "pkg") -> str:
    """Compose the shared Analyst backstory (constitution + skill SOP).

    Unknown input types fall back to the default chain_analysis.md skill.
    """
    chosen_skill = SKILL_MAP.get(input_type, "chain_analysis.md")
    sop_text = _load_skill(chosen_skill)
    return f"""You are a senior vulnerability analyst specializing in attack chain analysis
and exploit intelligence. You are precise, methodical, and never fabricate data.
{CONSTITUTION}
---
## ๐Ÿ“‹ Analysis Methodology (Skill SOP)
The following is your standard operating procedure for vulnerability chain analysis:
{sop_text}
"""
def create_analyst_agent(
    excluded_models: list[str] | None = None,
    input_type: str = "pkg",
) -> Agent:
    """
    Create the full-tool Analyst Agent instance (used by main.py).

    Args:
        excluded_models: model names to skip (models rate-limited with 429)
        input_type: Path-Aware Skill routing key (pkg/code/injection/config);
            unknown values fall back to chain_analysis.md

    Returns:
        CrewAI Agent instance, directly usable in a Task and Crew
    """
    backstory = _build_analyst_backstory(input_type=input_type)
    analyst = Agent(
        role="Vulnerability Chain Analyst",
        goal=(
            "Receive Scout intelligence, validate KEV and exploit status, "
            "analyze vulnerability chain attack paths, and assess risk levels."
        ),
        backstory=backstory,
        tools=[check_cisa_kev, search_exploits, read_memory, write_memory, history_search],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,  # Harness: Observability — full ReAct reasoning visible
        max_iter=5,  # v3.5: Gemini-3-Flash ~4s/call; one KEV + one Exploit query each is enough
        max_rpm=LLM_RPM,  # Harness: Graceful Degradation — rate limit for the free tier
        allow_delegation=False,  # Analyst does not delegate; it finishes the job itself
    )
    logger.info(
        "[OK] Analyst Agent created | input_type=%s | tools=%s | max_iter=%s | llm=%s",
        input_type,
        [t.name for t in analyst.tools],
        analyst.max_iter,
        analyst.llm.model if hasattr(analyst.llm, 'model') else 'unknown'
    )
    return analyst
# โ”€โ”€ ๅญ Agent ๅทฅๅป ๏ผˆๅทฅๅ…ท้™็ธฎ็‰ˆ๏ผŒไพ› run_analyst_pipeline ไฝฟ็”จ๏ผ‰โ”€โ”€โ”€โ”€โ”€
# ๆฏๅ€‹ๅญ Agent ๆœ‰ๅฐˆๅฑฌ backstory๏ผŒๅชๅŒ…ๅซๅ…ถ่ฒ ่ฒฌ็š„ SOP ๆญฅ้ฉŸ๏ผŒ
# ้ฟๅ…ๅผฑๆจกๅž‹็œ‹ๅˆฐๅฎŒๆ•ด 8 ๆญฅ SOP ๅพŒๆททๆท†่‡ชๅทฑ็š„่ท่ฒฌใ€‚
def _create_collector_agent(excluded_models: list[str] | None = None) -> Agent:
    """Build the data-collection sub-agent (read_memory tool only).

    Lowest cognitive load of the three sub-agents: it only reads memory
    and parses the Scout JSON.

    Args:
        excluded_models: model names to skip (e.g. rate-limited with 429)
    """
    collector_backstory = f"""You are a data collection specialist. You are precise and methodical.
{CONSTITUTION}
---
## Your Responsibility: Data Collection ONLY
You handle Step 1-2 of the analysis pipeline:
- Step 1: Call `read_memory` tool to retrieve historical analysis data
- Step 2: Parse the Scout Agent's JSON to extract all CVE entries
You do NOT perform KEV validation, exploit search, risk scoring, or write memory.
Those are handled by other agents in the pipeline.
"""
    collector = Agent(
        role="Vulnerability Analyst - Data Collector",
        goal="Read historical memory and parse the Scout intelligence list.",
        backstory=collector_backstory,
        llm=get_llm(exclude_models=excluded_models),
        tools=[read_memory],
        verbose=True,
        max_iter=8,
        max_rpm=LLM_RPM,
        allow_delegation=False,
    )
    logger.info("[OK] Collector Sub-Agent created | tools=%s", [t.name for t in collector.tools])
    return collector
def _create_verifier_agent(excluded_models: list[str] | None = None) -> Agent:
    """Build the verification sub-agent (check_cisa_kev + search_exploits only).

    Focus: KEV validation + exploit search + chain analysis.

    Args:
        excluded_models: model names to skip (e.g. rate-limited with 429)
    """
    verifier_backstory = f"""You are a vulnerability verification specialist. You validate KEV status and search for public exploits.
{CONSTITUTION}
---
## Your Responsibility: Verification & Analysis ONLY
You handle Step 3-5 of the analysis pipeline:
- Step 3: Call `check_cisa_kev` for all CVEs with cvss_score >= 7.0 (comma-separated)
- Step 4: Call `search_exploits` for each CVE where in_kev=true OR cvss_score >= 9.0
- Step 5: Perform chain analysis (classify attack types, identify prerequisiteโ†’outcome chains)
You do NOT read memory, write memory, or calculate risk scores.
Those are handled by other agents in the pipeline.
"""
    verifier = Agent(
        role="Vulnerability Analyst - Verification Specialist",
        goal="Validate CVE KEV status, search public exploits, and analyze vulnerability chain attack paths.",
        backstory=verifier_backstory,
        llm=get_llm(exclude_models=excluded_models),
        tools=[check_cisa_kev, search_exploits],
        verbose=True,
        max_iter=5,  # one KEV call plus one exploit call each is enough
        max_rpm=LLM_RPM,
        allow_delegation=False,
    )
    logger.info("[OK] Verifier Sub-Agent created | tools=%s", [t.name for t in verifier.tools])
    return verifier
def _create_scorer_agent(excluded_models: list[str] | None = None) -> Agent:
    """Build the scoring/output sub-agent (write_memory tool only).

    Focus: risk calculation + memory write + final JSON output.

    Args:
        excluded_models: model names to skip (e.g. rate-limited with 429)
    """
    scorer_backstory = f"""You are a risk scoring specialist. You calculate risk scores and produce final JSON reports.
{CONSTITUTION}
---
## Your Responsibility: Risk Scoring & Output ONLY
You handle Step 6-8 of the analysis pipeline:
- Step 6: Calculate risk_score = min(100, sum of cvss_score * weight)
Weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
- Step 7: Call `write_memory` tool to save your report
- Step 8: Output the final JSON report
!! ABSOLUTE PROHIBITIONS:
- You do NOT have `read_memory` tool. Do NOT try to call it.
- You do NOT have `check_cisa_kev` tool. Do NOT try to call it.
- You do NOT have `search_exploits` tool. Do NOT try to call it.
- Steps 1-5 are ALREADY DONE by other agents. Their results are in your task context.
- Your ONLY tool is `write_memory`. Use it to save, then output Final Answer.
"""
    scorer = Agent(
        role="Vulnerability Analyst - Risk Scorer",
        goal="Calculate risk scores, write memory, and output the final JSON report.",
        backstory=scorer_backstory,
        llm=get_llm(exclude_models=excluded_models),
        tools=[write_memory],
        verbose=True,
        max_iter=8,
        max_rpm=LLM_RPM,
        allow_delegation=False,
    )
    logger.info("[OK] Scorer Sub-Agent created | tools=%s", [t.name for t in scorer.tools])
    return scorer
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไธ‰้ƒจไปฝ๏ผšTask ๅทฅๅป ๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# โ”€โ”€ ๅŽŸๅง‹ๅ–ฎไธ€ Task๏ผˆๅ‘ๅพŒ็›ธๅฎน๏ผŒไพ› main.py ไฝฟ็”จ๏ผ‰โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def create_analyst_task(agent: Agent, context: list | None = None) -> Task:
    """
    Create the Analyst Agent's Task (used by main.py's single Crew).

    Refactored to the standard CrewAI architecture:
    - scout_output is no longer embedded into the description (old approach)
    - context=[scout_task] lets CrewAI pass the previous Task's output into
      this Task automatically (native CrewAI mechanism)

    Args:
        agent: the Agent instance returned by create_analyst_agent()
        context: list of preceding Tasks (e.g. [scout_task])

    Returns:
        CrewAI Task instance
    """
    # NOTE: the description below is the runtime prompt contract — keep verbatim.
    return Task(
        description="""You are the Analyst Agent. The Scout Agent's intelligence report
is available in your context (previous task output).
Execute the following steps in strict order, calling the specified tools:
Step 1: Read historical memory
Action: read_memory
Action Input: analyst
!! CRITICAL MEMORY RULES โ€” DO NOT SKIP !!
Memory contains CVEs from PREVIOUS scans (different packages/code contexts).
STRICT RULES:
a) ONLY use memory to check if a CVE from THIS scan was seen before โ†’ mark is_repeated=true
b) NEVER add CVEs from memory to the current analysis if Scout did NOT find them in THIS scan
c) If Scout reports 0 CVEs (empty vulnerabilities[]), analysis[] has 0 CVE entries
d) Old scan data (e.g. Redis, Python2, Django from 6+ months ago) must be IGNORED entirely
e) A CVE is REPEATED only if: Scout found it NOW + it appears in memory history
Step 2: Parse the Scout intelligence from context
Extract all CVE entries from the vulnerabilities array.
Note each CVE's cve_id, cvss_score, severity, package, description, and is_new.
Step 3: KEV validation
Collect all CVE IDs with cvss_score >= 7.0 into a comma-separated string.
Action: check_cisa_kev
Action Input: CVE-XXXX-XXXX,CVE-YYYY-YYYY (all qualifying CVEs in one call)
Record in_kev status for each CVE.
Step 4: Exploit search
For each CVE where in_kev=true OR cvss_score >= 9.0:
Action: search_exploits
Action Input: CVE-XXXX-XXXX (one CVE per call)
Record exploit_available and exploit_count.
Step 5: Chain analysis
Classify each vulnerability's attack type.
Identify prerequisite-outcome chains between vulnerabilities.
Risk adjustment rules:
- in_kev + exploit + chain -> CRITICAL
- in_kev + exploit -> CRITICAL
- chain alone -> at least original severity
Risk can ONLY escalate, never downgrade.
!! CODE-LEVEL PATTERNS (v4.0) !!
If the Scout output contains a `code_patterns` field, you MUST also analyze it:
- Each entry has: finding_id (CODE-NNN), pattern_type, cwe_id, owasp_category,
severity, snippet (up to 200 chars), line_no
- Map each code_pattern to its OWASP attack chain (use the table in your Skill SOP)
- Include them in your analysis[] alongside CVE findings
- Use finding_id starting with CODE- (not CVE-) for these entries
- CRITICAL code patterns (SQL_INJECTION, CMD_INJECTION, EVAL_EXEC, PICKLE_UNSAFE,
PROTOTYPE_POLLUTION): always add to analysis with severity=CRITICAL
- HIGH code patterns (INNERHTML_XSS, SSRF_RISK, HARDCODED_SECRET, PATH_TRAVERSAL,
YAML_UNSAFE): add to analysis with severity=HIGH
Step 6: Risk scoring
risk_score = min(100, sum of (cvss x weight))
Weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
Step 7: Write memory (MANDATORY)
Action: write_memory
Action Input: analyst|{your complete JSON report}
Step 8: Output Final Answer as pure JSON.
Absolute prohibitions:
- Do NOT fabricate CVE IDs.
- Do NOT skip tool calls.
- Do NOT skip write_memory.
- Do NOT downgrade risk.
""",
        expected_output=(
            "Pure JSON following the Analyst -> Advisor contract: "
            "scan_id, risk_score, risk_trend, analysis[] with "
            "cve_id, original_cvss, adjusted_risk, in_cisa_kev, "
            "exploit_available, chain_risk, reasoning for each CVE."
        ),
        agent=agent,
        context=context or [],
    )
# โ”€โ”€ ๆ‹†ๅˆ†็‰ˆ Task ๅทฅๅป ๏ผˆ3 ๅ€‹ๅญ Task๏ผŒไพ› run_analyst_pipeline ไฝฟ็”จ๏ผ‰โ”€
def _create_collection_task(agent: Agent, scout_output: str) -> Task:
    """
    Sub-task 1: data collection.
    Reads historical memory + parses the Scout intelligence list.

    Args:
        agent: the collector sub-agent (read_memory tool only)
        scout_output: Scout's JSON report, embedded verbatim in the prompt
    """
    # NOTE: f-string prompt — doubled braces render literal JSON braces.
    return Task(
        description=f"""You are the Analyst Agent performing Step 1 of 3: Data Collection.
Below is the Scout Agent's intelligence report:
{scout_output}
=== YOUR GOAL ===
1. Read your historical memory using the `read_memory` tool.
2. Parse the Scout JSON above to extract:
a) ALL CVEs from the `vulnerabilities` array
b) ALL code-level findings from the `code_patterns` array (if present)
!! CRITICAL MEMORY RULES !!
- Memory is from PREVIOUS scans. Only use it to mark is_repeated=true for CVEs found in THIS scan
- NEVER add CVEs from memory that are NOT in the current vulnerabilities[] list
- If vulnerabilities[] is empty, parsed_cves must be empty too
3. Output your Final Answer in this JSON structure:
{{
"historical_risk_score": <number or null>,
"parsed_cves": [
{{ "cve_id": "...", "package": "...", "cvss_score": 0.0, "severity": "...", "description": "...", "is_new": true }}
],
"code_patterns": [
{{ "finding_id": "CODE-001", "pattern_type": "EVAL_EXEC", "cwe_id": "CWE-94", "owasp_category": "A03:2021-Injection", "severity": "CRITICAL", "snippet": "eval(data)", "line_no": 14, "language": "python" }}
],
"tech_stack": ["..."],
"total_cves": <number>
}}
If no code_patterns exist in Scout output, use an empty array [].
Copy ALL code_patterns entries from Scout output EXACTLY as-is into your output.
=== โ›” CRITICAL RULE FOR FREE LLMS โ›” ===
You MUST NOT generate the JSON response right now.
You MUST call the `read_memory` tool FIRST.
If you generate the Final Answer JSON without calling the tool, you will be penalized.
Stop thinking about the Final Answer and output your thought and action to call `read_memory` immediately!
""",
        expected_output=(
            "JSON with historical_risk_score (number or null), "
            "parsed_cves array containing all CVEs from Scout intelligence, "
            "AND code_patterns array (empty [] if none)."
        ),
        agent=agent,
    )
def _create_analysis_task(agent: Agent) -> Task:
    """
    Sub-task 2: verification & analysis.
    KEV validation + exploit search + chain analysis.
    The previous task's output is passed in automatically as context
    (CrewAI sequential process).

    Args:
        agent: the verifier sub-agent (check_cisa_kev + search_exploits only)
    """
    return Task(
        description="""You are the Analyst Agent performing Step 2 of 3: Verification & Analysis.
The previous task gave you parsed CVE data AND code_patterns. Now you must verify and analyze both.
=== PART A: CVE Analysis (if parsed_cves is non-empty) ===
1. Use `check_cisa_kev` tool to check ALL CVE IDs with cvss_score >= 7.0 (comma-separated).
2. Use `search_exploits` tool for each CVE where in_kev=true OR cvss_score >= 9.0.
3. Perform chain analysis (risk can only escalate).
=== PART B: Code Pattern Analysis (if code_patterns is non-empty) ===
For each code_pattern entry in the previous task output:
- Map pattern_type to OWASP attack chain (e.g. EVAL_EXEC โ†’ A03:2021-Injection โ†’ Arbitrary Code Execution)
- Assign risk using CWE severity:
CRITICAL (cvss_equivalent=9.0): EVAL_EXEC, EVAL_USAGE, SQL_INJECTION, CMD_INJECTION, PICKLE_UNSAFE,
PROTOTYPE_POLLUTION, DESERIALIZE_UNSAFE
HIGH (cvss_equivalent=7.5): INNERHTML_XSS, SSRF_RISK, HARDCODED_SECRET, PATH_TRAVERSAL, YAML_UNSAFE
- Do NOT call check_cisa_kev for CODE- findings (they are code patterns, not CVEs)
- Include CODE- findings in analysis[] with these fields:
finding_id, pattern_type, cwe_id, owasp_category, severity, snippet, line_no,
original_cvss (use cvss_equivalent above), adjusted_risk, in_cisa_kev=false,
exploit_available=false (deterministic scan, no external lookup needed),
chain_risk, reasoning
=== YOUR OUTPUT ===
Once you have ALL tool results (or if no CVEs, directly from code_patterns), output:
{
"analysis": [
{
"cve_id": "CVE-2024-XXXX", <-- for CVE findings
"original_cvss": 9.8,
"adjusted_risk": "CRITICAL",
"in_cisa_kev": true,
"exploit_available": true,
"chain_risk": { "is_chain": true, "chain_with": ["..."], "chain_description": "...", "confidence": "HIGH" },
"reasoning": "..."
},
{
"finding_id": "CODE-001", <-- for code pattern findings
"cve_id": null,
"pattern_type": "EVAL_EXEC",
"cwe_id": "CWE-94",
"owasp_category": "A03:2021-Injection",
"severity": "CRITICAL",
"snippet": "eval(data)",
"line_no": 14,
"original_cvss": 9.0,
"adjusted_risk": "CRITICAL",
"in_cisa_kev": false,
"exploit_available": false,
"chain_risk": { "is_chain": true, "chain_with": [], "chain_description": "eval() with user-controlled input enables remote code execution", "confidence": "HIGH" },
"reasoning": "eval(data) executes arbitrary Python code. If data comes from user input (network, file, env), this is a direct RCE vector. CWE-94: Improper Control of Generation of Code."
}
]
}
=== โ›” CRITICAL RULES โ›” ===
- If parsed_cves is empty but code_patterns is non-empty: ONLY analyze code_patterns, no CVE tool calls
- If both are present: analyze both
- DO NOT fabricate CVE IDs
- DO NOT call check_cisa_kev for CODE- findings
- Stop thinking and call tools immediately!
""",
        expected_output=(
            "JSON with analysis array containing BOTH CVE findings (with KEV/exploit data) "
            "AND code pattern findings (finding_id starting CODE-, with chain_risk and reasoning)."
        ),
        agent=agent,
    )
def _create_scoring_task(agent: Agent) -> Task:
    """
    Sub-task 3: scoring & output.
    Risk score calculation + memory write + final JSON output.
    The previous task's output is passed in automatically as context
    (CrewAI sequential process).

    Args:
        agent: the scorer sub-agent (write_memory tool only)
    """
    now = datetime.now(timezone.utc)
    # Daily scan id; the _001 sequence suffix is fixed (one pipeline run per day assumed).
    scan_id = f"scan_{now.strftime('%Y%m%d')}_001"
    return Task(
        description=f"""You are performing the FINAL step: Scoring & Output.
โš ๏ธ IMPORTANT CONTEXT:
- Step 1 (data collection) and Step 2 (KEV/exploit verification) are ALREADY COMPLETED by other agents.
- Their results are provided to you in the task context above.
- You do NOT need to call read_memory, check_cisa_kev, or search_exploits.
- You do NOT have those tools. Your ONLY tool is `write_memory`.
=== YOUR GOAL ===
1. Look at the analysis results from the previous task context.
2. Calculate risk_score: min(100, sum of (each finding's cvss_equivalent x weight))
Weight by adjusted_risk: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
- For CODE- findings: CRITICAL code pattern = cvss_equivalent 9.0, HIGH = 7.5
- For CVE findings: use original_cvss
3. Calculate risk_trend: compare with historical_risk_score from task 1 context.
If no history, use "+0". Format: "+7" or "-3" or "+0".
4. Call `write_memory` tool with these EXACT arguments:
- agent_name: analyst
- data: your complete JSON report as a string
5. After write_memory confirms success, output your Final Answer.
=== FINAL ANSWER FORMAT (pure JSON, no other text) ===
{{
"scan_id": "{scan_id}",
"risk_score": <calculated number 0-100>,
"risk_trend": "<+N or -N or +0>",
"analysis": <copy the COMPLETE analysis array from previous task context, including both CVE and CODE- entries>
}}
=== โ›” RULES โ›” ===
- Do NOT call read_memory (you don't have it).
- Do NOT call check_cisa_kev (you don't have it).
- Do NOT call search_exploits (you don't have it).
- DO call write_memory FIRST, then output Final Answer.
- INCLUDE all CODE- findings from analysis[] in your final output.
- Final Answer must be pure JSON only. No markdown, no explanation.
""",
        expected_output=(
            "Pure JSON: scan_id, risk_score (0-100), risk_trend, "
            "and complete analysis array from previous task (including CODE- findings)."
        ),
        agent=agent,
    )
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌๅ››้ƒจไปฝ๏ผšHarness ไฟ้šœๅฑค๏ผˆ3 ๅฑค๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _strip_react_residue(parsed: dict[str, Any]) -> dict[str, Any]:
"""
ๅตๆธฌไธฆๅ‰้›ข ReAct tool-call ๆฎ˜็•™ๆฌ„ไฝใ€‚
ๅผฑๆจกๅž‹ๅธธๆŠŠ thought/action/action_input ๆททๅ…ฅๆœ€็ต‚ JSON๏ผŒ
้€™ไบ›ไธๅฑฌๆ–ผ Analyst โ†’ Advisor ๅฅ‘็ด„ใ€‚
"""
react_keys = {"thought", "action", "action_input",
"Thought", "Action", "Action Input"}
found_react = react_keys & set(parsed.keys())
if not found_react:
return parsed # ๆฒ’ๆœ‰ ReAct ๆฎ˜็•™๏ผŒๅŽŸๆจฃ่ฟ”ๅ›ž
logger.warning("[WARN] Detected ReAct residual fields %s, stripped", found_react)
cleaned = {k: v for k, v in parsed.items() if k not in react_keys}
# ๅฆ‚ๆžœๅ‰้›ขๅพŒไปๆœ‰ schema ๅฟ…่ฆๆฌ„ไฝ๏ผŒๅ‰‡่ฆ–็‚บๆœ‰ๆ•ˆ
schema_keys = {"scan_id", "risk_score", "risk_trend", "analysis"}
if schema_keys & set(cleaned.keys()):
return cleaned
# ๅ‰้›ขๅพŒ็ฉบ็ฉบๅฆ‚ไนŸ โ†’ ็ด” ReAct ๆ ผๅผ๏ผŒ่ฆ–็‚บ็„กๆ•ˆ่ผธๅ‡บ
logger.warning("[WARN] After stripping ReAct, no valid schema fields remain")
return {}
def _extract_json_from_output(raw: str) -> dict[str, Any]:
    """Extract a JSON object from LLM output (tolerates Markdown fences and ReAct residue).

    Bug fix: json.loads may return a non-dict value (list, int, str — e.g. raw
    output "[1, 2]" or "\"done\""); the original passed that straight to
    _strip_react_residue, which crashed on .keys(). Non-dict parses are now
    treated as failures, and the next extraction strategy is tried.

    Args:
        raw: the raw LLM output text.

    Returns:
        The parsed report dict with ReAct residue stripped, or {} on failure.
    """
    def _try_parse(text: str) -> dict[str, Any] | None:
        """Parse *text* as JSON; accept the result only if it is a dict."""
        try:
            obj = json.loads(text)
        except json.JSONDecodeError:
            return None
        return obj if isinstance(obj, dict) else None

    # Strategy 1: the whole output is a JSON object.
    parsed = _try_parse(raw)
    # Strategy 2: a ```json ... ``` fenced block.
    if parsed is None:
        match = re.search(r"```(?:json)?\s*([\s\S]+?)```", raw)
        if match:
            parsed = _try_parse(match.group(1).strip())
    # Strategy 3: the outermost { ... } span.
    if parsed is None:
        match = re.search(r"\{[\s\S]+\}", raw)
        if match:
            parsed = _try_parse(match.group(0))
    if parsed is None:
        return {}
    # Strip ReAct residue (prevent thought/action fields passing as valid output).
    return _strip_react_residue(parsed)
def _harness_validate_schema(output: dict[str, Any]) -> list[str]:
"""
Harness Layer 2๏ผšJSON Schema ้ฉ—่ญ‰ใ€‚
้ฉ—่ญ‰ Analyst โ†’ Advisor ๅฅ‘็ด„็š„ๅฟ…่ฆๆฌ„ไฝใ€‚
ๅ›žๅ‚ณ้Œฏ่ชคๆธ…ๅ–ฎ๏ผŒ็ฉบๆธ…ๅ–ฎ่กจ็คบ้€š้Žใ€‚
"""
errors = []
required_keys = ["scan_id", "risk_score", "risk_trend", "analysis"]
for k in required_keys:
if k not in output:
errors.append(f"็ผบๅฐ‘ๅฟ…่ฆๆฌ„ไฝ๏ผš{k}")
# ้ฉ—่ญ‰ analysis ้™ฃๅˆ—ไธญ็š„ๆฏๅ€‹้ …็›ฎ
for i, item in enumerate(output.get("analysis", [])):
item_required = ["cve_id", "original_cvss", "adjusted_risk", "reasoning"]
for k in item_required:
if k not in item:
errors.append(f"analysis[{i}] ็ผบๅฐ‘ๆฌ„ไฝ๏ผš{k}")
return errors
def _harness_validate_chain_risk(output: dict[str, Any]) -> None:
"""
Harness Layer 3๏ผšchain_risk ้‚่ผฏ้ฉ—่ญ‰ใ€‚
is_chain=true ๅฟ…้ ˆๆœ‰ chain_with + chain_descriptionใ€‚
"""
for i, item in enumerate(output.get("analysis", [])):
chain_risk = item.get("chain_risk", {})
if chain_risk.get("is_chain", False):
if not chain_risk.get("chain_with"):
logger.warning(
"[WARN] Harness Layer 3: analysis[%d] is_chain=true but missing chain_with, "
"auto-set to empty array", i
)
chain_risk["chain_with"] = []
if not chain_risk.get("chain_description"):
logger.warning(
"[WARN] Harness Layer 3: analysis[%d] is_chain=true but missing chain_description, "
"auto-patched", i
)
chain_risk["chain_description"] = "Chain detected but description not provided by Agent"
if not chain_risk.get("confidence"):
chain_risk["confidence"] = "NEEDS_VERIFICATION"
# CVE ๅนดไปฝๅˆ‡ๅ‰ฒ้ปž๏ผš2005 ๅนดๅ‰็š„ๆผๆดž็›ฎๆจ™่ปŸ้ซ”ๅŸบๆœฌๅทฒ้€€ๅ ด
# ไฝ่ญ‰๏ผšEPSS ็ ”็ฉถ๏ผˆJacobs et al. 2023๏ผ‰้กฏ็คบ pre-2005 CVE ็š„ EPSS < 0.01
# ้–‹ๆบๅƒ่€ƒ๏ผšTrivy --ignore-unfixedใ€Grype suppression ๆฉŸๅˆถ
_CVE_YEAR_CUTOFF = 2005
def _harness_filter_ancient_cves(output: dict[str, Any]) -> None:
"""
Harness Layer 3.5๏ผšCVE ๅนดไปฝ้Žๆฟพใ€‚
ๅฐ year < _CVE_YEAR_CUTOFF ็š„ CVE ๆจ™่จ˜ NEEDS_VERIFICATION๏ผŒ
ไธๅˆช้™ค๏ผˆไฟ็•™ๅฏฉ่จˆ่ปŒ่ทก๏ผ‰๏ผŒไฝ†่ฎ“ Advisor ็Ÿฅ้“้€™ไบ› CVE ๅฏ็–‘ใ€‚
่จญ่จˆไพๆ“š๏ผš
1. EPSS (Jacobs et al. 2023)๏ผšpre-2005 CVE ็š„ EPSS ๅนณๅ‡ < 0.01
2. NIST CVSS v3.1 User Guide ยง7.3๏ผšTemporal Metrics ๆ‡‰็ดๅ…ฅ่ฉ•ไผฐ
3. Trivy/Grype ้ƒฝๆœ‰้กžไผผ็š„ๅนดไปฝ้Žๆฟพ/suppress ๆฉŸๅˆถ
"""
for item in output.get("analysis", []):
cve_id = item.get("cve_id") or ""
year_m = re.match(r"CVE-(\d{4})-", cve_id)
if not year_m:
continue
year = int(year_m.group(1))
if year < _CVE_YEAR_CUTOFF:
# ่จญ็‚บ NEEDS_VERIFICATION๏ผŒไธๅผทๅˆถๅˆช้™ค
if item.get("chain_risk", {}).get("confidence") not in ("NEEDS_VERIFICATION",):
item.setdefault("chain_risk", {})["confidence"] = "NEEDS_VERIFICATION"
item["_ancient_cve_warning"] = (
f"CVE year {year} < {_CVE_YEAR_CUTOFF}: "
f"target software likely retired. "
f"Verify relevance to current tech stack before acting."
)
logger.warning(
"[ANALYST] Ancient CVE flagged: %s (year=%d) โ†’ confidence=NEEDS_VERIFICATION",
cve_id, year,
)
def _build_fallback_output(scout_data: dict[str, Any]) -> dict[str, Any]:
"""
Harness ไฟ้šœ๏ผš็•ถ LLM ่ผธๅ‡บ็„กๆณ•่งฃๆžๆ™‚๏ผŒ
ๆ นๆ“š Scout ่ผธๅ‡บๅปบ็ซ‹ๆœ€ๅฐๅฏ่กŒ็š„ Analyst ๅ ฑๅ‘Šใ€‚
"""
vulns = scout_data.get("vulnerabilities", [])
code_patterns = scout_data.get("code_patterns", [])
analysis = []
for v in vulns:
cve_id = v.get("cve_id", "UNKNOWN")
cvss = float(v.get("cvss_score", 0))
severity = v.get("severity", "LOW")
analysis.append({
"cve_id": cve_id,
"package": v.get("package", "unknown"),
"severity": severity,
"original_cvss": cvss,
"adjusted_risk": severity,
"in_cisa_kev": False,
"exploit_available": False,
"chain_risk": {
"is_chain": False,
"chain_with": [],
"chain_description": "",
"confidence": "NEEDS_VERIFICATION",
},
"reasoning": f"Fallback analysis: CVSS {cvss:.1f} ({severity}), KEV/Exploit status unknown (Harness fallback)",
})
for pattern in code_patterns:
severity = pattern.get("severity", "MEDIUM")
if severity == "CRITICAL":
cvss_equivalent = 9.0
elif severity == "HIGH":
cvss_equivalent = 7.5
elif severity == "MEDIUM":
cvss_equivalent = 5.0
else:
cvss_equivalent = 2.5
analysis.append({
"finding_id": pattern.get("finding_id", "CODE-000"),
"cve_id": None,
"pattern_type": pattern.get("pattern_type", "UNKNOWN"),
"cwe_id": pattern.get("cwe_id", "CWE-unknown"),
"owasp_category": pattern.get("owasp_category", ""),
"severity": severity,
"snippet": pattern.get("snippet", ""),
"line_no": pattern.get("line_no", 0),
"original_cvss": cvss_equivalent,
"adjusted_risk": severity,
"in_cisa_kev": False,
"exploit_available": False,
"chain_risk": {
"is_chain": False,
"chain_with": [],
"chain_description": "Deterministic code pattern preserved in Analyst fallback.",
"confidence": "HIGH",
},
"reasoning": (
f"Fallback analysis: deterministic {pattern.get('pattern_type', 'UNKNOWN')} "
f"pattern confirmed by Security Guard ({pattern.get('cwe_id', 'CWE-unknown')})."
),
})
# ่จˆ็ฎ—้ขจ้šชๅˆ†ๆ•ธ
weight_map = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1, "LOW": 0.5}
risk_score = min(100, int(sum(
float(item.get("original_cvss", 0))
* weight_map.get(item.get("adjusted_risk", item.get("severity", "LOW")), 1)
for item in analysis
)))
now = datetime.now(timezone.utc)
scan_id = f"scan_{now.strftime('%Y%m%d')}_001"
return {
"scan_id": scan_id,
"risk_score": risk_score,
"risk_trend": "+0",
"analysis": analysis,
"_harness_fallback": True,
}
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไบ”้ƒจไปฝ๏ผšPipeline ๅŸท่กŒๅ‡ฝๅผ๏ผˆๅซ Harness ไฟ้šœๅฑค๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def run_analyst_pipeline(scout_output: str | dict, input_type: str = "pkg") -> dict:
    """
    Run the complete Analyst pipeline: CrewAI agent execution plus
    code-level harness safeguards.

    Harness Engineering core idea:
        Never rely 100% on the LLM obeying instructions.
        The agents are responsible for "trying their best";
        the code is responsible for "making sure it happens".

    Architecture: 3-task sequential pipeline (lowers the cognitive load on
    weaker models):
        Task 1: data collection (Collector) -- read_memory + parse Scout JSON
        Task 2: verification    (Verifier)  -- KEV + exploit + chain analysis
        Task 3: scoring/output  (Scorer)    -- risk calc + write_memory + JSON

    Code-level safeguard layers:
        Layer 1: forced write_memory (if the agent skipped it, code writes)
        Layer 2: JSON schema validation (required-field check)
        Layer 3: chain_risk logic validation (is_chain=true requires
                 chain_with + chain_description)

    Args:
        scout_output: Scout Agent JSON output (string or dict).
        input_type: Path-Aware Skill routing key (pkg/code/injection/config).

    Returns:
        dict: Parsed Analyst report JSON (conforms to the
        Analyst -> Advisor contract).
    """
    from crewai import Crew, Process
    # Normalize the input into both dict and str forms.
    if isinstance(scout_output, dict):
        scout_dict = scout_output
        scout_str = json.dumps(scout_output, ensure_ascii=False, indent=2)
    else:
        scout_str = scout_output
        try:
            scout_dict = json.loads(scout_output)
        except json.JSONDecodeError:
            scout_dict = {}
    logger.info("[START] Analyst Pipeline (3-Task split architecture)")
    # Record the memory file mtime before kickoff; used afterwards to decide
    # whether the agent actually called write_memory.
    memory_path_check = os.path.join(PROJECT_ROOT, "memory", "analyst_memory.json")
    pre_mtime = os.path.getmtime(memory_path_check) if os.path.exists(memory_path_check) else 0
    # 429 auto-rotation: retry up to MAX_LLM_RETRIES times, switching model each time.
    from config import mark_model_failed, get_current_model_name
    MAX_LLM_RETRIES = 2
    excluded_models: list[str] = []
    raw_output = ""
    output: dict[str, Any] = {}
    crew_success = False
    for attempt in range(MAX_LLM_RETRIES + 1):
        # -- Build the 3 specialized sub-agents (fresh model on every retry) --
        collector = _create_collector_agent(excluded_models)
        verifier = _create_verifier_agent(excluded_models)
        scorer = _create_scorer_agent(excluded_models)
        # -- Build the 3 sub-tasks --------------------------------------------
        task_1 = _create_collection_task(collector, scout_str)
        task_2 = _create_analysis_task(verifier)
        task_3 = _create_scoring_task(scorer)
        # -- Run the CrewAI sequential pipeline -------------------------------
        try:
            crew = Crew(
                agents=[collector, verifier, scorer],
                tasks=[task_1, task_2, task_3],
                process=Process.sequential,
                verbose=True,
            )
            logger.info("[START] Analyst Crew kickoff (attempt %d/%d)", attempt + 1, MAX_LLM_RETRIES + 1)
            try:
                from checkpoint import recorder as _cp
                _a_model = get_current_model_name(collector.llm)
                _cp.llm_call("analyst", _a_model, "openrouter", f"3-task-split attempt={attempt+1}")
            except Exception:
                # Checkpoint recording is best-effort; never block the pipeline.
                _a_model = "unknown"
            _t_a = time.time()
            result = crew.kickoff()
            raw_output = str(result.raw) if hasattr(result, "raw") else str(result)
            try:
                _cp.llm_result("analyst", _a_model, "SUCCESS",
                               len(raw_output), int((time.time() - _t_a) * 1000),
                               thinking=raw_output[:1000])
            except Exception:
                pass
            output = _extract_json_from_output(raw_output)
            crew_success = bool(output)
            logger.info("[OK] CrewAI 3-Task Pipeline done | crew_success=%s", crew_success)
            break  # Success: leave the retry loop.
        except Exception as e:
            error_str = str(e)
            if "429" in error_str and attempt < MAX_LLM_RETRIES:
                # Mark the current model as cooling down; read the model name
                # from any sub-agent (all three share the waterfall).
                current_model = get_current_model_name(collector.llm)
                mark_model_failed(current_model)
                excluded_models.append(current_model)
                # `re` is already imported at module level; no local re-import needed.
                _m = re.search(r'retry.{1,10}(\d+\.?\d*)s', error_str, re.IGNORECASE)
                retry_after = float(_m.group(1)) if _m else 0.0
                logger.warning("[RETRY] Analyst 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                               current_model, attempt + 1, MAX_LLM_RETRIES, retry_after)
                try:
                    _cp.llm_retry("analyst", current_model, error_str[:200],
                                  attempt + 1, "next_in_waterfall")
                except Exception:
                    pass
                from config import rate_limiter as _rl
                _rl.on_429(retry_after=retry_after, caller="analyst")  # at least 30s
                continue
            logger.error("[FAIL] CrewAI execution failed: %s", e)
            try:
                _cp.llm_error("analyst", _a_model, error_str[:300])
            except Exception:
                pass
    # -- Harness Layer 1: forced output build + forced write_memory -----------
    need_fallback = not output or not crew_success
    if need_fallback:
        logger.warning("[WARN] Harness Layer 1: LLM output unparseable, using fallback")
        output = _build_fallback_output(scout_dict)
    # Forced write_memory (mtime comparison decides whether the agent wrote).
    memory_path = os.path.join(PROJECT_ROOT, "memory", "analyst_memory.json")
    post_mtime = os.path.getmtime(memory_path) if os.path.exists(memory_path) else 0
    need_write = (post_mtime <= pre_mtime)
    if need_write:
        logger.warning("[WARN] Agent did not call write_memory -- code forcing write (Harness)")
        try:
            write_result = write_memory.run(
                agent_name="analyst",
                data=json.dumps(output, ensure_ascii=False),
            )
            logger.info("[OK] Forced memory write: %s", write_result)
        except Exception as e:
            logger.error("[FAIL] Forced write_memory failed: %s", e)
    else:
        logger.info("[OK] Agent already called write_memory (mtime updated)")
    # -- Harness Layer 2: JSON schema validation -------------------------------
    schema_errors = _harness_validate_schema(output)
    if schema_errors:
        logger.warning("[WARN] Harness Layer 2: Schema errors %s, merging fallback", schema_errors)
        fallback = _build_fallback_output(scout_dict)
        for k, v in fallback.items():
            if k not in output:
                output[k] = v
    # -- Harness Layer 3: chain_risk logic validation --------------------------
    _harness_validate_chain_risk(output)
    # -- Harness Layer 3.5: ancient-CVE year filter ----------------------------
    # Rationale: EPSS (Jacobs et al. 2023), NIST CVSS section 7.3, Trivy/Grype suppress
    _harness_filter_ancient_cves(output)
    # -- Ensure risk_score is numeric and within range -------------------------
    # LLM-produced JSON may carry risk_score as a string (e.g. "85");
    # coerce before comparing so the harness itself cannot crash on a TypeError.
    risk_score = output.get("risk_score", 0)
    if not isinstance(risk_score, (int, float)):
        try:
            risk_score = float(risk_score)
        except (TypeError, ValueError):
            logger.warning("[WARN] risk_score=%r is not numeric, forcing 0", risk_score)
            risk_score = 0
        output["risk_score"] = risk_score
    if not (0 <= risk_score <= 100):
        logger.warning("[WARN] risk_score=%s out of range, forcing correction", risk_score)
        output["risk_score"] = max(0, min(100, risk_score))
    # -- Harness Layer 4: risk escalation rule (downgrades are forbidden) ------
    scout_vulns = {v.get("cve_id"): v.get("severity", "LOW") for v in scout_dict.get("vulnerabilities", [])}
    severity_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    # -- Ensure every analysis item carries chain_risk / KEV / exploit fields --
    for item in output.get("analysis", []):
        cve_id = item.get("cve_id") or ""
        orig_severity = scout_vulns.get(cve_id, "LOW")
        adj_risk = item.get("adjusted_risk", orig_severity)
        # Check for a forbidden severity downgrade.
        if severity_rank.get(adj_risk, 0) < severity_rank.get(orig_severity, 0):
            logger.warning(
                "[WARN] Harness Layer 4: %s tried to downgrade from %s to %s, "
                "violates SOP, forcing back to %s",
                cve_id, orig_severity, adj_risk, orig_severity
            )
            item["adjusted_risk"] = orig_severity
        if "chain_risk" not in item:
            item["chain_risk"] = {
                "is_chain": False,
                "chain_with": [],
                "chain_description": "",
                "confidence": "NEEDS_VERIFICATION",
            }
        if "in_cisa_kev" not in item:
            item["in_cisa_kev"] = False
        if "exploit_available" not in item:
            item["exploit_available"] = False
    analysis_count = len(output.get("analysis", []))
    logger.info(
        "[OK] Analyst Pipeline complete | risk_score=%s | risk_trend=%s | analysis_count=%d",
        output.get('risk_score', 0),
        output.get('risk_trend', '+0'),
        analysis_count
    )
    return output
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌๅ…ญ้ƒจไปฝ๏ผšๆœฌๅœฐๆธฌ่ฉฆๅ…ฅๅฃ๏ผˆ็›ดๆŽฅๅŸท่กŒๆญคๆช”ๆกˆๆ™‚๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
if __name__ == "__main__":
import sys
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(message)s",
)
# ไฝฟ็”จ Scout ่จ˜ๆ†ถไฝœ็‚บๆธฌ่ฉฆ่ผธๅ…ฅ
_scout_output_path = os.path.join(PROJECT_ROOT, "memory", "scout_memory.json")
if os.path.exists(_scout_output_path):
with open(_scout_output_path, encoding="utf-8") as _f:
_test_input = _f.read()
print(f"[TEST] ไฝฟ็”จ Scout ่จ˜ๆ†ถไฝœ็‚บ่ผธๅ…ฅ๏ผš{_scout_output_path}")
else:
_test_input = json.dumps({
"scan_id": "scan_test_001",
"timestamp": datetime.now(timezone.utc).isoformat(),
"tech_stack": ["Django 4.2", "Redis 7.0"],
"vulnerabilities": [
{
"cve_id": "CVE-2024-42005",
"package": "django",
"cvss_score": 9.8,
"severity": "CRITICAL",
"description": "Django SQL injection vulnerability in QuerySet.values() and values_list()",
"is_new": True,
},
{
"cve_id": "CVE-2015-4335",
"package": "redis",
"cvss_score": 10.0,
"severity": "CRITICAL",
"description": "Redis Lua Sandbox Escape and Remote Code Execution",
"is_new": True,
},
],
"summary": {"total": 2, "critical": 2, "high": 0, "medium": 0, "low": 0},
})
print("[TEST] ไฝฟ็”จ้ ่จญๆธฌ่ฉฆ่ผธๅ…ฅ")
result = run_analyst_pipeline(_test_input)
print("\n=== Analyst ่ผธๅ‡บ ===")
print(json.dumps(result, ensure_ascii=False, indent=2))
print(f"\nrisk_score: {result.get('risk_score', 0)}")
print(f"risk_trend: {result.get('risk_trend', '+0')}")
print(f"analysis count: {len(result.get('analysis', []))}")