Threat_Hunter / agents /advisor.py
EricChen2005's picture
Deploy ThreatHunter - AMD MI300X + Qwen2.5-32B
c8d30bc
"""
agents/advisor.py
================
Advisor Agent โ€” ThreatHunter ๆœ€็ต‚่ฃๆฑบ่€…๏ผˆJudge๏ผ‰
่ท่ฒฌ๏ผš
ๆŽฅๆ”ถ Analyst Agent๏ผˆๆˆ–้™็ดšๆƒ…ๆณไธ‹ Scout Agent๏ผ‰็š„ๅˆ†ๆž็ตๆžœ๏ผŒ
็”ขๅ‡บๅฏๅŸท่กŒ็š„่ณ‡ๅฎ‰่กŒๅ‹•ๅ ฑๅ‘Šใ€‚ไธ้œ€่ฆ้กๅค–ๆŸฅ่ฉข NVD/OTX/KEV๏ผŒ
ๆ‰€ๆœ‰่ณ‡ๆ–™็”ฑๅ‰ๅบ Agent ๆไพ›ใ€‚
Harness ไฟ่ญทๅฑค๏ผˆ้ตๅพช HARNESS_ENGINEERING.md ไธ‰ๆŸฑๆžถๆง‹๏ผ‰๏ผš
Layer 1 โ€” ๅผทๅˆถ write_memory๏ผˆAgent ่‹ฅๆœชๅ‘ผๅซ๏ผŒ็จ‹ๅผ็ขผไปฃๅŸท่กŒ๏ผ‰
Layer 2 โ€” ่ผธๅ‡บๆ ผๅผ้ฉ—่ญ‰๏ผˆ็ฌฆๅˆ docs/data_contracts.md Advisorโ†’UI ๅฅ‘็ด„๏ผ‰
Layer 3 โ€” ้ขจ้šชๅˆ†ๆ•ธ็ฏ„ๅœ้ฉ—่ญ‰๏ผˆ0-100๏ผ‰
Layer 4 โ€” URGENT ้ …็›ฎๅฟ…้ ˆ้™„ๅธถ command๏ผˆไฟฎ่ฃœๆŒ‡ไปค๏ผ‰
Layer 5 โ€” ๆญทๅฒๆฏ”ๅฐ๏ผš้‡่ค‡ๆœชไฟฎ่ฃœ้ …็›ฎ่ชžๆฐฃ้žๅ‡
ไฝœ่€…๏ผšThreatHunter ็ต„้•ท
้ตๅฎˆ๏ผšproject_CONSTITUTION.md + docs/system_constitution.md
"""
import json
import logging
import os
import re
import time
from datetime import datetime, timezone
from typing import Any
from crewai import Agent, Task
from config import get_llm
from tools.memory_tool import history_search, read_memory, write_memory
# The LLM is initialised lazily: get_llm() is only called inside create_advisor_agent().
# Module logger registered under the "ThreatHunter" name; every harness layer
# in this file reports through it.
logger = logging.getLogger("ThreatHunter")
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไธ€้ƒจไปฝ๏ผš็ณป็ตฑๆ†ฒๆณ• + Skill SOP
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Embedded English version of docs/system_constitution.md.
# create_advisor_agent() interpolates this text into the agent backstory, so
# every character below is part of the prompt sent to the LLM.
CONSTITUTION = """
=== ThreatHunter Constitution ===
1. All CVE IDs must come from Tool-returned data. Fabrication is prohibited.
2. You must use the provided Tools for queries. Skip is not allowed.
3. Output must conform to the specified JSON schema.
4. Uncertain reasoning must be tagged with confidence: HIGH / MEDIUM / NEEDS_VERIFICATION.
5. Each judgment must include a reasoning field.
6. Reports use English; technical terms are not translated.
7. Do not call the same Tool twice for the same data.
"""
# Embedded SOP from skills/action_report.md, resolved relative to the directory
# that contains this file's parent package.
_SKILL_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "skills",
    "action_report.md",
)
try:
    _f = open(_SKILL_PATH, "r", encoding="utf-8")
except FileNotFoundError:
    # No SOP file on disk: fall back to a one-line built-in skill description.
    ADVISOR_SKILL = "## Skill: Action Report\nPrioritize URGENT โ†’ IMPORTANT โ†’ RESOLVED."
else:
    with _f:
        ADVISOR_SKILL = _f.read()
# v3.7: path-aware skill loading (paired with the recorder.stage_enter usage in main.py)
def _load_skill(skill_filename: str = "action_report.md") -> str:
    """
    Return the text of ``skills/<skill_filename>``.

    The ``skills`` directory is resolved two levels above this file. When the
    requested file does not exist, the module-level ADVISOR_SKILL text is
    returned instead.
    """
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    candidate = os.path.join(project_root, "skills", skill_filename)
    try:
        handle = open(candidate, "r", encoding="utf-8")
    except FileNotFoundError:
        return ADVISOR_SKILL
    with handle:
        return handle.read()
# Maps the scan path key (the ``input_type`` argument of create_advisor_agent)
# to the SOP file name loaded from the skills/ directory.
SKILL_MAP: dict[str, str] = dict(
    pkg="action_report.md",  # Path A: package scan report
    code="code_action_report.md",  # Path B-code: source code report
    injection="ai_action_report.md",  # Path B-inject: AI security report
    config="config_action_report.md",  # Path C: config report
)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไบŒ้ƒจไปฝ๏ผšAgent ๅปบ็ซ‹ๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def create_advisor_agent(
    excluded_models: list[str] | None = None,
    input_type: str = "pkg",
) -> Agent:
    """
    Build the Advisor agent (the final judge of the pipeline).

    Args:
        excluded_models: Model names to skip; forwarded to
            ``get_llm(exclude_models=...)``. The pipeline passes the models
            that answered with HTTP 429.
        input_type: Scan path key looked up in SKILL_MAP to choose the skill
            SOP file; unknown keys fall back to ``action_report.md``.

    Returns:
        A CrewAI Agent equipped with the read_memory, write_memory and
        history_search tools.
    """
    sop_file = SKILL_MAP.get(input_type, "action_report.md")
    sop_text = _load_skill(sop_file)

    advisor_goal = (
        "Review upstream agent vulnerability analysis, combine it with historical advisory memory, "
        "and produce a clear, executable security action report for non-technical stakeholders. "
        "Classify actions as URGENT, IMPORTANT, or RESOLVED, and include concrete remediation commands "
        "for every action item."
    )

    # The backstory doubles as the system prompt: constitution, SOP, the
    # Advisor-to-UI JSON contract, triage rules and the risk score formula.
    advisor_backstory = f"""You are a senior security advisor at CISO level, with deep experience in attack analysis and risk management.
{CONSTITUTION}
## Action Report SOP from skills/{sop_file}
{sop_text}
## Output Specification: Advisor to UI Data Contract
You must output exactly the following JSON shape. Do not include any text outside JSON.
```json
{{
"executive_summary": "One English sentence summarizing the overall risk posture.",
"actions": {{
"urgent": [
{{
"cve_id": "CVE-XXXX-XXXX",
"package": "package name",
"severity": "CRITICAL or HIGH",
"action": "specific remediation guidance",
"command": "pip install package==version or another concrete command",
"reason": "why this item is marked URGENT",
"is_repeated": false
}}
],
"important": [
{{
"cve_id": "CVE-XXXX-XXXX",
"package": "package name",
"severity": "HIGH or MEDIUM",
"action": "specific remediation guidance",
"reason": "why this should be prioritized"
}}
],
"resolved": []
}},
"risk_score": 0,
"risk_trend": "+0",
"scan_count": 1,
"generated_at": "ISO 8601 timestamp"
}}
```
## Triage Rules
- URGENT: CVSS >= 9.0 (CRITICAL), or known exploitation in the wild (in_cisa_kev=true), or public PoC.
- IMPORTANT: CVSS >= 7.0 (HIGH), or credible attack-chain risk.
- Other items (MEDIUM/LOW with no exploitation signal): do not include them in the action list for now.
## Risk Score Calculation
risk_score = min(100, sum of (cvss_score * weight for each vuln))
weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
"""

    return Agent(
        role="Security Advisor and Final Judge",
        goal=advisor_goal,
        backstory=advisor_backstory,
        tools=[read_memory, write_memory, history_search],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,
        max_iter=4,  # v3.5: the Advisor only reads/writes memory, so few iterations are needed
        allow_delegation=False,
    )
def create_advisor_task(agent: Agent, analyst_output: str) -> Task:
    """
    Build the Advisor task.

    Args:
        agent: The Agent returned by create_advisor_agent().
        analyst_output: JSON output string of the Analyst Agent (or of the
            Scout Agent on the degraded path); it is embedded verbatim in the
            task description.

    Returns:
        A CrewAI Task bound to ``agent``.
    """
    # Prompt text sent to the LLM; every character is significant.
    briefing = f"""
You are the final judge. The following is the Analyst Agent result:
{analyst_output}
Follow these steps:
1. First read Advisor memory with read_memory agent_name="advisor".
Use it only to check whether real CVE IDs (CVE-XXXX-XXXX) appeared before and remain unresolved.
!! CRITICAL: is_repeated rules !!
- CVE findings with a real CVE-XXXX-XXXX ID: if seen in history, set is_repeated=true.
- CODE findings where finding_id starts with CODE- and cve_id=null: is_repeated must always be false.
Reason: each scan analyzes different code, so CODE-001 has no stable meaning across scans.
- Do not mark a new XSS or SQLi report as is_repeated=true merely because history contains eval().
2. Classify findings as URGENT, IMPORTANT, or RESOLVED.
3. Every URGENT CVE finding must include a concrete remediation command such as pip install or apt upgrade.
Every URGENT CODE finding must include vulnerable_snippet, fixed_snippet, and why_this_works.
4. Calculate the overall risk_score (0-100) and risk_trend compared with the previous report.
5. Produce a complete JSON action report with no text outside JSON.
6. Finally call write_memory agent_name="advisor" to save this report.
!! ANTI-FABRICATION RULES (v5.1): strict, violations invalidate the output !!
- executive_summary must describe only the vulnerability types actually found in this scan input.
Example: if the input contains XSS, say XSS; do not claim eval/RCE.
Example: if the input contains SQL Injection, say SQL Injection; do not claim XSS.
- Do not include any finding_id or CVE ID that is absent from the Analyst result.
- Do not fabricate vulnerable_snippet or fixed_snippet from SOP sample code such as eval() examples.
- vulnerable_snippet must come from the Analyst-provided snippet field; use an empty string if absent.
- Do not copy "Standard Code Fixes" examples from code_action_report.md or action_report.md as if they
were findings from this scan. They are format examples only.
!! CODE-LEVEL FINDING RULES (v4.0) !!
If Analyst analysis[] contains items whose finding_id starts with CODE-:
- These are code-level vulnerabilities from Security Guard static analysis. They require code-fix advice,
not package-upgrade advice.
- Triage:
URGENT = CODE patterns with severity=CRITICAL (SQL_INJECTION, CMD_INJECTION,
EVAL_EXEC, PICKLE_UNSAFE, PROTOTYPE_POLLUTION)
IMPORTANT = CODE patterns with severity=HIGH (INNERHTML_XSS, SSRF_RISK,
HARDCODED_SECRET, PATH_TRAVERSAL, YAML_UNSAFE)
- Every CODE action item must include:
"action": specific remediation guidance, for example "use parameterized queries" rather than "sanitize inputs"
"vulnerable_snippet": copied from the Analyst snippet field
"fixed_snippet": syntactically correct fixed code in the detected language
"why_this_works": an explanation of why the fix is effective
- Do not use "pip install" or "apt upgrade" as the command for CODE findings.
- Do not use vague advice such as "sanitize your inputs"; name the concrete API or coding pattern.
"""

    deliverable = (
        "Complete JSON action report that matches the Advisor to UI data contract, "
        "including executive_summary, actions (urgent/important/resolved), "
        "risk_score, risk_trend, scan_count, and generated_at."
    )

    return Task(description=briefing, expected_output=deliverable, agent=agent)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไธ‰้ƒจไปฝ๏ผšHarness ไฟ้šœๅฑค๏ผˆ5 ๅฑค๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _extract_json_from_output(raw: str) -> dict[str, Any]:
    """
    Extract the JSON object from an LLM reply (tolerates Markdown wrapping).

    Strategies, tried in order:
      1. parse the whole text;
      2. parse the first ``` / ```json fenced block;
      3. parse the span from the first ``{`` to the last ``}``;
      4. decode one JSON value starting at each ``{`` in turn, which recovers
         the object when prose containing braces follows it.

    Only a JSON *object* is accepted: a reply that parses to a list, number,
    string or null is skipped, because every caller treats the result as a dict.

    Args:
        raw: Raw text returned by the LLM.

    Returns:
        The parsed object, or ``{}`` when nothing usable is found (including
        when ``raw`` is not a string or is blank).
    """
    if not isinstance(raw, str) or not raw.strip():
        return {}

    candidates = [raw]
    fenced = re.search(r"```(?:json)?\s*([\s\S]+?)```", raw)
    if fenced:
        candidates.append(fenced.group(1).strip())
    braced = re.search(r"\{[\s\S]+\}", raw)
    if braced:
        candidates.append(braced.group(0))

    for text in candidates:
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    # Last resort: the greedy brace span above fails when text after the JSON
    # also contains braces, so decode a single value from each "{" onwards.
    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start != -1:
        try:
            parsed, _end = decoder.raw_decode(raw, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(parsed, dict):
                return parsed
        start = raw.find("{", start + 1)
    return {}
def _coerce_cvss(vuln: dict[str, Any]) -> float:
    """
    Return the finding's CVSS score as a finite float, or 0.0 if none is usable.

    ``cvss_score`` takes precedence over ``original_cvss``. A key that is
    missing, ``None``, non-numeric or non-finite is skipped instead of raising.
    """
    for key in ("cvss_score", "original_cvss"):
        raw = vuln.get(key)
        if raw is None:
            continue
        try:
            score = float(raw)
        except (TypeError, ValueError):
            continue
        # Reject NaN / infinity: int() on them would raise further down.
        if score == score and score not in (float("inf"), float("-inf")):
            return score
    return 0.0


def _build_fallback_output(analyst_data: dict[str, Any]) -> dict[str, Any]:
    """
    Harness safeguard: build a minimal viable report from the Analyst/Scout
    data when the LLM output cannot be parsed.

    Findings are read from ``vulnerabilities`` (or ``analysis`` when that key
    is absent or null). Entries that are not dicts are ignored, and malformed
    CVSS values count as 0.0, so this last-resort path cannot itself fail on
    sloppy upstream data.

    Triage: CVSS >= 9.0 or severity CRITICAL goes to ``urgent`` (with a
    ``pip install --upgrade`` command); CVSS >= 7.0 or severity HIGH goes to
    ``important``. Representative / not-directly-observed CVEs never become
    action items, but they still count towards the total and the risk score.

    Args:
        analyst_data: Parsed Analyst (or Scout) output.

    Returns:
        A report dict in the Advisor-to-UI shape, flagged with
        ``"_harness_fallback": True``.
    """
    if not isinstance(analyst_data, dict):
        analyst_data = {}
    raw_vulns = analyst_data.get("vulnerabilities")
    if raw_vulns is None:
        raw_vulns = analyst_data.get("analysis")
    vulns = [v for v in raw_vulns if isinstance(v, dict)] if isinstance(raw_vulns, list) else []

    urgent, important = [], []
    for v in vulns:
        is_representative = (
            v.get("evidence_type") == "representative_cve"
            or bool(v.get("must_not_enter_package_actions"))
            or bool(v.get("not_directly_observed"))
        )
        if is_representative:
            continue
        package = v.get("package", "unknown")
        severity = v.get("severity", "MEDIUM")
        cvss = _coerce_cvss(v)
        item = {
            "cve_id": v.get("cve_id", "UNKNOWN"),
            "package": package,
            "severity": severity,
            "action": f"Update {package} to the latest stable version.",
            "reason": f"CVSS {cvss:.1f} ({severity})",
            "is_repeated": False,
        }
        if cvss >= 9.0 or severity == "CRITICAL":
            item["command"] = f"pip install --upgrade {package}"
            urgent.append(item)
        elif cvss >= 7.0 or severity == "HIGH":
            important.append(item)

    # Risk score: severity-weighted CVSS sum, capped at 100.
    weight_map = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1, "LOW": 0.5}
    risk_score = min(100, int(sum(
        _coerce_cvss(v) * weight_map.get(v.get("severity", "LOW"), 1)
        for v in vulns
    )))

    critical_count = sum(1 for v in vulns if v.get("severity") == "CRITICAL")
    summary = (
        f"{len(vulns)} vulnerabilities found. "
        f"{critical_count} CRITICAL. "
        f"Immediate action required for {len(urgent)} item(s)."
    )
    return {
        "executive_summary": summary,
        "actions": {
            "urgent": urgent,
            "important": important,
            "resolved": [],
        },
        "risk_score": risk_score,
        "risk_trend": "+0",
        "scan_count": 1,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "_harness_fallback": True,
    }
def _harness_validate_schema(output: dict[str, Any]) -> list[str]:
    """
    Harness Layer 2: check that the report matches the Advisor-to-UI contract.

    Verifies the required top-level keys and that ``actions`` is an object
    holding the ``urgent``, ``important`` and ``resolved`` sections. A
    non-object ``actions`` (null, list, string, ...) is reported as one error
    instead of raising or yielding misleading membership results.

    Args:
        output: Candidate report.

    Returns:
        A list of error messages; an empty list means the report passed.
    """
    errors: list[str] = []
    for k in ("executive_summary", "actions", "risk_score", "risk_trend"):
        if k not in output:
            errors.append(f"็ผบๅฐ‘ๅฟ…่ฆๆฌ„ไฝ๏ผš{k}")

    actions = output.get("actions", {})
    if not isinstance(actions, dict):
        errors.append(
            f"actions must be an object with urgent/important/resolved, got {type(actions).__name__}"
        )
        return errors

    for section in ("urgent", "important", "resolved"):
        if section not in actions:
            errors.append(f"actions ็ผบๅฐ‘ {section} ๆฌ„ไฝ")
    return errors
def _harness_validate_risk_score(output: dict[str, Any]) -> None:
    """
    Harness Layer 3: force ``risk_score`` to be a number within 0-100.

    ``output`` is modified in place:
      * a missing key is left alone;
      * numeric strings such as ``"85"`` are converted to numbers;
      * values that are not numbers at all (null, free text, NaN) become 0;
      * out-of-range numbers are clamped to the nearest bound.

    Args:
        output: Report whose ``risk_score`` is validated.
    """
    if "risk_score" not in output:
        return

    raw = output["risk_score"]
    if isinstance(raw, (int, float)) and not isinstance(raw, bool):
        score = raw
    else:
        # LLMs occasionally quote the number or emit null / a label instead.
        try:
            score = float(raw)
        except (TypeError, ValueError):
            score = float("nan")
        else:
            if score.is_integer():
                score = int(score)

    if score != score:  # NaN: nothing meaningful to clamp
        logger.warning("[WARN] Harness Layer 3: risk_score=%r is not a number, resetting to 0", raw)
        output["risk_score"] = 0
        return

    if not (0 <= score <= 100):
        logger.warning("โš ๏ธ Harness Layer 3๏ผšrisk_score=%s ่ถ…ๅ‡บ็ฏ„ๅœ๏ผŒๅผทๅˆถไฟฎๆญฃ็‚บ clamp(0,100)", score)
        score = max(0, min(100, score))
    output["risk_score"] = score
def _harness_ensure_commands(output: dict[str, Any]) -> None:
    """
    Harness Layer 4: every URGENT item must carry a ``command`` field.

    Items whose ``command`` is missing or empty are completed in place:
    entries with a CVE-/GHSA- identifier get ``pip install --upgrade <package>``,
    everything else is treated as a CODE pattern and gets a manual-fix notice
    (v5.1: CODE patterns are never tied to a package manager). Each completion
    is logged as a warning.
    """
    for entry in output.get("actions", {}).get("urgent", []):
        if entry.get("command"):
            continue
        identifier = entry.get("cve_id") or ""
        if identifier.startswith(("CVE-", "GHSA-")):
            package_name = entry.get("package", "package")
            entry["command"] = f"pip install --upgrade {package_name}"
        else:
            # CODE pattern: no package-manager command applies.
            entry["command"] = "Manual code fix required (see fixed_snippet)"
        logger.warning("โš ๏ธ Harness Layer 4๏ผš%s ็ผบๅฐ‘ command๏ผŒ่‡ชๅ‹•่ฃœๅ…จ", entry.get("cve_id", "?"))
# Process-wide latch: _harness_constitution_guard() sets it to True after its
# first CODE-pattern removal warning, so that warning is logged only once.
_CONSTITUTION_VIOLATION_WARNED = False
def _harness_enrich_cwe_evidence(output: dict) -> None:
    """
    Harness Layer 6.5: attach CWE reference data to ``code_patterns_summary``.

    For each summary entry the CWE id is taken from ``cwe_id`` (or ``cve_id``
    when that already holds a ``CWE-`` value); otherwise it is inferred from
    ``pattern_type`` through ``tools.cwe_registry.pattern_type_to_cwe``. When
    ``build_cwe_reference`` returns data for the id, the entry gains
    ``cwe_reference`` and, unless already present, ``canonical_cwe_id``.

    The reference payload is whatever ``build_cwe_reference`` returns; the
    original notes describe it as MITRE CWE name, severity, typical CVSS base
    score, OWASP 2021 mapping, official URL, remediation advice and
    representative CVEs.

    Disclaimer kept from the original notes: representative CVEs are real
    cases of the same weakness class; they do not mean the scanned code *is*
    that CVE.

    The step is skipped with a warning when ``tools.cwe_registry`` cannot be
    imported.
    """
    try:
        from tools.cwe_registry import build_cwe_reference, pattern_type_to_cwe
    except ImportError:
        logger.warning("[ADVISOR] tools/cwe_registry not found, skipping CWE enrichment")
        return

    summary = output.get("code_patterns_summary", [])
    if not summary:
        return

    hits = 0
    for entry in summary:
        candidate = entry.get("cwe_id") or entry.get("cve_id", "")
        if not (candidate and candidate.startswith("CWE-")):
            # No explicit CWE id: infer one from the pattern type.
            inferred = pattern_type_to_cwe(entry.get("pattern_type", ""))
            if inferred:
                candidate = inferred
        if not (candidate and candidate.startswith("CWE-")):
            continue
        reference = build_cwe_reference(candidate)
        if not reference:
            continue
        entry["cwe_reference"] = reference
        entry.setdefault("canonical_cwe_id", reference.get("id", candidate))
        hits += 1

    if hits:
        logger.info(
            "[ADVISOR] CWE enrichment: %d/%d code_patterns enriched with MITRE data",
            hits, len(summary),
        )
def _pattern_type_to_cwe(pattern_type: str) -> str | None:
"""ๅพž pattern_type ๅ็จฑๆŽจๆ–ท CWE ID๏ผˆfallback ็”จ๏ผ‰"""
try:
from tools.cwe_registry import pattern_type_to_cwe
except ImportError:
return None
return pattern_type_to_cwe(pattern_type)
def _harness_constitution_guard(output: dict[str, Any]) -> None:
    """
    Harness Layer 6: constitution CI-1 / CI-2 guard.

    Constitution rules enforced here:
      CI-1: every CVE identifier must come from real, tool-returned API data.
      CI-2: the LLM must not invent CVE identifiers or vulnerability details.

    Only items with a real identifier (``cve_id`` starting with ``CVE-`` or
    ``GHSA-``) may stay in the URGENT / IMPORTANT sections. ``output`` is
    modified in place:

      * Representative CVEs (``evidence_type == "representative_cve"``, or a
        truthy ``must_not_enter_package_actions`` / ``not_directly_observed``)
        are moved to ``representative_cve_evidence`` and counted under
        ``evidence_gate``.
      * Every other item without a real identifier is moved to
        ``code_patterns_summary`` for the UI to show as reference. This covers
        CODE-xxx findings, CWE- ids, null/empty ids and placeholders such as
        "UNKNOWN", which is the same classification Layers 4 and 5 apply.

    The CODE-pattern warning is logged once per process, through the
    module-level ``_CONSTITUTION_VIOLATION_WARNED`` latch.
    """
    global _CONSTITUTION_VIOLATION_WARNED
    actions = output.get("actions", {})
    if not isinstance(actions, dict):
        # Nothing can be filtered in a malformed container.
        return

    code_patterns_removed = []
    representative_cves_removed = []
    for section in ("urgent", "important"):
        kept = []
        for item in actions.get(section, []):
            cve_id = item.get("cve_id") or ""
            finding_id = item.get("finding_id") or ""
            has_real_cve = isinstance(cve_id, str) and cve_id.startswith(("CVE-", "GHSA-"))
            is_representative_cve = (
                item.get("evidence_type") == "representative_cve"
                or bool(item.get("must_not_enter_package_actions"))
                or bool(item.get("not_directly_observed"))
            )
            if is_representative_cve:
                representative_cves_removed.append(item)
                logger.warning(
                    "[ADVISOR EVIDENCE] Representative CVE removed from %s: %s",
                    section, cve_id or finding_id or "unknown"
                )
            elif not has_real_cve:
                code_patterns_removed.append(item)
                if not _CONSTITUTION_VIOLATION_WARNED:
                    logger.warning(
                        "๐Ÿ›ก๏ธ Harness Layer 6 [CONSTITUTION CI-1/CI-2]๏ผš"
                        "CODE-pattern %r ๅพž %s ็งป้™ค(้žๅฏ้ฉ—่ญ‰ไพ†ๆบ)",
                        finding_id or cve_id, section
                    )
                    _CONSTITUTION_VIOLATION_WARNED = True
            else:
                kept.append(item)
        actions[section] = kept

    # Removed CODE patterns are preserved in their own field.
    if code_patterns_removed:
        existing = output.get("code_patterns_summary", [])
        existing = list(existing) if isinstance(existing, list) else []
        output["code_patterns_summary"] = existing + code_patterns_removed
        logger.info(
            "๐Ÿ›ก๏ธ Harness Layer 6๏ผšๅฐ‡ %d ๅ€‹ CODE-pattern ็งป้™คๅ‡บ URGENT/IMPORTANT๏ผŒ"
            "็งปๅ…ฅ code_patterns_summary",
            len(code_patterns_removed)
        )
    if representative_cves_removed:
        existing = output.get("representative_cve_evidence", [])
        existing = list(existing) if isinstance(existing, list) else []
        output["representative_cve_evidence"] = existing + representative_cves_removed
        output.setdefault("evidence_gate", {})["representative_cves_removed_from_actions"] = len(
            representative_cves_removed
        )
def _harness_check_repeated(output: dict[str, Any]) -> None:
    """
    Harness Layer 5: compare against advisor memory and flag repeated,
    still-unpatched items.

    ``output`` is modified in place, in two phases:

    1. Every URGENT / IMPORTANT item without a real ``CVE-`` / ``GHSA-``
       identifier gets ``is_repeated = False``. This always runs, even when
       no history exists or memory cannot be read, so an LLM-set ``True`` on
       a CODE pattern never survives.
       CRITICAL RULE (v5.1): CODE-level patterns must never be marked
       REPEATED. Each scan analyses different code, so CODE-001 means a
       different finding in every scan and cross-scan matching only produces
       false positives.
    2. For items with a real identifier, advisor memory is read through
       ``read_memory``. An identifier that already appears in a previous
       scan's ``urgent`` / ``important`` section gets ``is_repeated = True``
       and its ``action`` text is prefixed with the REPEATED marker (once).

    Reading or parsing the history is best-effort: any failure is logged at
    debug level and phase 2 is skipped.
    """
    actions = output.get("actions")
    if not isinstance(actions, dict):
        return
    real_prefixes = ("CVE-", "GHSA-")

    # Phase 1: history-independent rule for CODE patterns.
    cve_items = []
    for section in ("urgent", "important"):
        for item in actions.get(section) or []:
            if not isinstance(item, dict):
                continue
            cve_id = item.get("cve_id") or ""
            if isinstance(cve_id, str) and cve_id.startswith(real_prefixes):
                cve_items.append((cve_id, item))
            else:
                item["is_repeated"] = False  # clear a True the LLM may have set
    if not cve_items:
        return

    # Phase 2: collect the real CVE ids recommended in earlier scans.
    try:
        history_str = read_memory.run(agent_name="advisor")
        history_data = json.loads(history_str) if history_str else {}
        prev_vulns = set()
        for scan in history_data.get("history", []):
            for section in ("urgent", "important"):
                for past in scan.get("actions", {}).get(section, []):
                    past_id = past.get("cve_id") or ""
                    if past_id.startswith(real_prefixes):
                        prev_vulns.add(past_id)
    except Exception as e:  # memory backend is opaque here; stay best-effort
        logger.debug("Harness Layer 5 ่ทณ้Ž๏ผˆๆญทๅฒ่จ˜ๆ†ถๅฐš็„ก่จ˜้Œ„๏ผ‰๏ผš%s", e)
        return

    for cve_id, item in cve_items:
        if cve_id not in prev_vulns:
            continue
        item["is_repeated"] = True
        # Escalate the wording, as required by the skill SOP.
        existing = item.get("action") or ""
        if not isinstance(existing, str):
            existing = str(existing)
        if not existing.startswith("[REPEATED"):
            item["action"] = "[REPEATED โ€” STILL NOT PATCHED] " + existing
        logger.info("๐Ÿ“‹ Harness Layer 5๏ผš%s ๆจ™่จ˜็‚บ้‡่ค‡ๆœชไฟฎ่ฃœ", cve_id)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌๅ››้ƒจไปฝ๏ผšๅฎŒๆ•ด Pipeline ๅŸท่กŒๅ‡ฝๅผ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _advisor_parse_retry_after(error_text: str) -> float:
    """
    Return the retry delay in seconds announced in a rate-limit error, or 0.0.

    The filler between "retry" and the number is matched with ``\\D`` so it
    cannot swallow leading digits ("retry after 30s" must give 30, not 0).
    """
    found = re.search(r"retry\D{1,10}?(\d+(?:\.\d+)?)\s*s", error_text, re.IGNORECASE)
    return float(found.group(1)) if found else 0.0


def _advisor_checkpoint(recorder: Any, event: str, *args: Any, **kwargs: Any) -> None:
    """Best-effort ``recorder.<event>(...)`` call; telemetry must never break the pipeline."""
    if recorder is None:
        return
    try:
        getattr(recorder, event)(*args, **kwargs)
    except Exception as exc:
        logger.debug("[ADVISOR] checkpoint %s skipped: %s", event, exc)


def _advisor_normalize_actions(output: dict[str, Any]) -> None:
    """
    Make ``output["actions"]`` an object with three list-valued sections.

    Missing sections become empty lists and a single object is wrapped in a
    list. Non-dict entries are dropped from ``urgent`` / ``important`` because
    the later harness layers call ``.get`` on every item.
    """
    actions = output.get("actions")
    if not isinstance(actions, dict):
        actions = {}
        output["actions"] = actions
    for section in ("urgent", "important", "resolved"):
        items = actions.get(section)
        if isinstance(items, dict):
            items = [items]
        elif not isinstance(items, list):
            items = []
        if section != "resolved":
            items = [entry for entry in items if isinstance(entry, dict)]
        actions[section] = items


def run_advisor_pipeline(
    analyst_output: str | dict[str, Any],
    input_type: str = "pkg",
) -> dict[str, Any]:
    """
    Run the Advisor agent pipeline, including the harness safeguard layers.

    Flow:
      1. Run the CrewAI advisor for up to ``MAX_LLM_RETRIES + 1`` attempts.
         On an error mentioning 429 the current model is marked failed and
         excluded, the rate limiter is notified and the next attempt starts.
         Other failures are logged and the next attempt (if any) runs with a
         freshly built agent.
      2. Layer 1: fall back to ``_build_fallback_output`` when no JSON object
         could be extracted.
      3. Layer 2: schema validation; missing top-level keys are filled from
         the fallback report and ``actions`` is normalised to three lists.
      4. Layers 3, 4, 4.5 (constitution guard plus CWE enrichment) and 5.
      5. Layer 6: CVEs older than ``CVE_YEAR_MIN`` are dropped from
         URGENT / IMPORTANT and listed under ``ancient_cves_removed``.
      6. ``generated_at`` is added if absent and the report is written to
         advisor memory; a write failure is logged, not raised.

    Args:
        analyst_output: Analyst Agent output as a JSON string or dict. The
            Scout output may be passed instead as a degraded path.
        input_type: Scan path key forwarded to create_advisor_agent().

    Returns:
        The action report dict in the Advisor-to-UI contract shape.
    """
    from crewai import Crew, Process

    # Keep both forms: a dict for the fallback builder, a string for the prompt.
    if isinstance(analyst_output, dict):
        analyst_dict = analyst_output
        analyst_str = json.dumps(analyst_output, ensure_ascii=False, indent=2)
    else:
        analyst_str = analyst_output
        try:
            parsed_input = json.loads(analyst_output)
        except (json.JSONDecodeError, TypeError):
            parsed_input = {}
        # Valid JSON that is not an object (list, number, ...) has no findings to read.
        analyst_dict = parsed_input if isinstance(parsed_input, dict) else {}

    logger.info("[START] Advisor Pipeline")

    # 429 auto-rotation: at most MAX_LLM_RETRIES retries, switching model each time.
    from config import mark_model_failed, get_current_model_name
    MAX_LLM_RETRIES = 2
    excluded_models: list[str] = []

    raw_output = ""
    output: dict[str, Any] = {}
    crew_success = False
    for attempt in range(MAX_LLM_RETRIES + 1):
        agent = create_advisor_agent(excluded_models, input_type=input_type)
        task = create_advisor_task(agent, analyst_str)
        # Bound up front so the handlers below never touch an unbound name.
        _cp = None
        _adv_model = "unknown"
        try:
            crew = Crew(
                agents=[agent],
                tasks=[task],
                process=Process.sequential,
                verbose=True,
            )
            logger.info("[START] Advisor Crew kickoff (attempt %d/%d)", attempt + 1, MAX_LLM_RETRIES + 1)
            try:
                from checkpoint import recorder as _cp
            except Exception:
                _cp = None  # checkpoint recording is optional
            try:
                _adv_model = get_current_model_name(agent.llm)
            except Exception:
                _adv_model = "unknown"
            _advisor_checkpoint(_cp, "llm_call", "advisor", _adv_model, "openrouter", f"attempt={attempt+1}")

            _t_adv = time.time()
            result = crew.kickoff()
            raw_output = str(result.raw) if hasattr(result, "raw") else str(result)
            _advisor_checkpoint(
                _cp, "llm_result", "advisor", _adv_model, "SUCCESS",
                len(raw_output), int((time.time() - _t_adv) * 1000),
                thinking=raw_output[:1000],
            )
            output = _extract_json_from_output(raw_output)
            crew_success = bool(output)
            break  # the crew ran; leave the retry loop
        except Exception as e:
            error_str = str(e)
            if "429" in error_str and attempt < MAX_LLM_RETRIES:
                current_model = get_current_model_name(agent.llm)
                mark_model_failed(current_model)
                excluded_models.append(current_model)
                retry_after = _advisor_parse_retry_after(error_str)
                logger.warning("[RETRY] Advisor 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                               current_model, attempt + 1, MAX_LLM_RETRIES, retry_after)
                _advisor_checkpoint(_cp, "llm_retry", "advisor", current_model, error_str[:200],
                                    attempt + 1, "next_in_waterfall")
                from config import rate_limiter as _rl
                # Original note: the limiter waits at least 30s - TODO confirm in config.rate_limiter.
                _rl.on_429(retry_after=retry_after, caller="advisor")
                continue
            logger.error("[FAIL] CrewAI execution failed: %s", e)
            _advisor_checkpoint(_cp, "llm_error", "advisor", _adv_model, error_str[:300])

    # Harness Layer 1: always end up with a report object.
    if not isinstance(output, dict):
        output = {}
    need_fallback = not output or not crew_success
    if need_fallback:
        logger.warning("[WARN] Harness Layer 1: LLM output unparseable, using fallback")
        output = _build_fallback_output(analyst_dict)

    # Harness Layer 2: schema validation.
    if "actions" in output and not isinstance(output["actions"], dict):
        # A non-object "actions" cannot be repaired item by item; drop it so
        # validation reports it missing and the fallback sections are merged.
        logger.warning("[WARN] Harness Layer 2: 'actions' is %s, not an object; discarding it",
                       type(output["actions"]).__name__)
        del output["actions"]
    schema_errors = _harness_validate_schema(output)
    if schema_errors:
        logger.warning("[WARN] Harness Layer 2: Schema errors %s, merging fallback", schema_errors)
        fallback = _build_fallback_output(analyst_dict)
        for k, v in fallback.items():
            if k not in output:
                output[k] = v
    _advisor_normalize_actions(output)

    # Harness Layer 3: risk score range.
    _harness_validate_risk_score(output)
    # Harness Layer 4: URGENT items must carry a command.
    _harness_ensure_commands(output)
    # Harness Layer 4.5: constitution CI-1/CI-2 guard. CODE patterns
    # (finding_id = CODE-xxx, cve_id = null) must not appear in
    # URGENT/IMPORTANT (hard enforcement of project_CONSTITUTION.md, article 3.2).
    _harness_constitution_guard(output)
    _harness_enrich_cwe_evidence(output)
    # Harness Layer 5: history comparison, escalate repeated unpatched items.
    _harness_check_repeated(output)

    # Harness Layer 6: CVE year filter (last line of defence). Whatever agent
    # or tool introduced an old CVE, it is removed before the report leaves.
    CVE_YEAR_MIN = 2005
    ancient_cves_removed = []
    for section in ("urgent", "important"):
        clean_items = []
        for item in output["actions"].get(section, []):
            cve_id = item.get("cve_id") or ""
            if not (isinstance(cve_id, str) and cve_id.startswith("CVE-")):
                clean_items.append(item)
                continue
            try:
                yr = int(cve_id.split("-")[1])
            except (IndexError, ValueError):
                # Unparseable year: keep the item rather than guess.
                clean_items.append(item)
                continue
            if yr < CVE_YEAR_MIN:
                ancient_cves_removed.append(cve_id)
                logger.warning(
                    "[ADVISOR HARNESS 6] Ancient CVE removed from %s (year=%d < %d): %s",
                    section, yr, CVE_YEAR_MIN, cve_id
                )
            else:
                clean_items.append(item)
        output["actions"][section] = clean_items
    if ancient_cves_removed:
        logger.warning(
            "[ADVISOR HARNESS 6] Total ancient CVEs removed: %d โ€” %s",
            len(ancient_cves_removed), ancient_cves_removed
        )
        output["ancient_cves_removed"] = ancient_cves_removed

    # Add generated_at when the report lacks it.
    if "generated_at" not in output:
        output["generated_at"] = datetime.now(timezone.utc).isoformat()

    # Forced memory write. The original note calls this idempotent when the
    # LLM already wrote - TODO confirm against tools.memory_tool.
    try:
        write_result = write_memory.run(
            agent_name="advisor",
            data=json.dumps(output, ensure_ascii=False),
        )
        logger.info("[OK] Advisor memory saved: %s", write_result)
    except Exception as e:
        logger.error("[FAIL] write_memory failed: %s", e)

    logger.info(
        "[OK] Advisor Pipeline complete | risk_score=%s | urgent=%s | important=%s",
        output.get("risk_score", 0),
        len(output.get("actions", {}).get("urgent", [])),
        len(output.get("actions", {}).get("important", [])),
    )
    return output
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ็ฌฌไบ”้ƒจไปฝ๏ผšๆœฌๅœฐๆธฌ่ฉฆๅ…ฅๅฃ๏ผˆ็›ดๆŽฅๅŸท่กŒๆญคๆช”ๆกˆๆ™‚๏ผ‰
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
if __name__ == "__main__":
    import sys

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(message)s",
    )

    # Degraded-path smoke test: feed the Scout memory file when it exists,
    # otherwise a built-in two-CVE sample scan.
    _project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    _scout_output_path = os.path.join(_project_root, "memory", "scout_memory.json")

    if not os.path.exists(_scout_output_path):
        _sample_scan = {
            "scan_id": "scan_test_001",
            "vulnerabilities": [
                {
                    "cve_id": "CVE-2024-42005",
                    "package": "django",
                    "cvss_score": 9.8,
                    "severity": "CRITICAL",
                    "description": "Django SQL injection vulnerability",
                    "is_new": True,
                },
                {
                    "cve_id": "CVE-2015-4335",
                    "package": "redis",
                    "cvss_score": 10.0,
                    "severity": "CRITICAL",
                    "description": "Redis RCE via Lua bytecode",
                    "is_new": True,
                },
            ],
            "summary": {"total": 2, "critical": 2, "high": 0, "medium": 0, "low": 0},
        }
        _test_input = json.dumps(_sample_scan)
        print("[TEST] ไฝฟ็”จ้ ่จญๆธฌ่ฉฆ่ผธๅ…ฅ")
    else:
        with open(_scout_output_path, encoding="utf-8") as _f:
            _test_input = _f.read()
        print(f"[TEST] ไฝฟ็”จ Scout ่จ˜ๆ†ถไฝœ็‚บ่ผธๅ…ฅ๏ผš{_scout_output_path}")

    result = run_advisor_pipeline(_test_input)

    print("\n=== Advisor ่ผธๅ‡บ ===")
    print(json.dumps(result, ensure_ascii=False, indent=2))
    _urgent_count = len(result.get("actions", {}).get("urgent", []))
    _important_count = len(result.get("actions", {}).get("important", []))
    print(f"\nrisk_score: {result.get('risk_score', 0)}")
    print(f"urgent: {_urgent_count}")
    print(f"important: {_important_count}")