# Threat_Hunter / main.py
# Deploy: ThreatHunter — AMD MI300X + Qwen2.5-32B (commit c8d30bc, EricChen2005)
"""
main.py - ThreatHunter 銝餌撘
==============================
Pipeline 嗆嚗v3.1嚗嚗Orchestrator [Layer 1 銝西] Scout Analyst Debate Advisor
嗆嚗v3.1 Orchestrator 撽嚗嚗
Orchestrator 頝舐梧
頝臬 A嚗憟隞嗆 頝喲 Security Guard
頝臬 B嚗摰渡撘蝣 Layer 1 銝西嚗Security Guard + Intel Fusion嚗
頝臬 C嚗隞嗅摹蝵 頝喲 Analyst + Debate
頝臬 D嚗擖鋆 芷頝雿靽∪ CVE
嗆嚗靽靘嚗嚗
雿輻刻頛詨 "Django 4.2, Redis 7.0"
main.py Pipeline
Scout 嗯 Analyst 嗯 Critic
(鈭撖行園) (函斗) (舀)
Advisor
(蝯鋆瘙箏勗)
瘥 Stage 函嚗
- try-except + Graceful Degradation
- StepLogger 摮甇仿亥
- Harness 靽撅歹 agents/*.py 折剁
鞈瘚嚗
Scout 頛詨 (dict) Analyst 頛詨 (dict)
Analyst 頛詨 (dict) Critic 頛詨 (dict)
Analyst + Critic 頛詨 Advisor 頛詨 (dict)
Advisor 頛詨 (dict) + pipeline_meta 蝯蝯
Harness Engineering 靽嚗
- 瘥 Agent 賭蝙函撖行芋蝯嚗agents/*.py嚗嚗 Stub
- Critic ENABLE_CRITIC 批塚舀嚗
- 瘥 Stage 函 Graceful Degradation 蝝頝臬
- 函閮 Observability 亥嚗FINAL_PLAN.md 舀 2嚗
- 17 撅 Harness 靽撅歹 agents/*.py 折剁
萄嚗project_CONSTITUTION.md + HARNESS_ENGINEERING.md
"""
import json
import logging
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from config import (
ENABLE_CRITIC,
degradation_status,
rate_limiter,
)
logger = logging.getLogger("threathunter.main")
import os  # noqa: E402 (import after logger for ordering clarity)
# Feature flag for Docker-based sandbox isolation.
# Defaults to enabled; set SANDBOX_ENABLED=false to force in-process mode.
SANDBOX_ENABLED = os.getenv("SANDBOX_ENABLED", "true").lower() == "true"
# Optional Docker sandbox: if the sandbox package is missing, install inert
# stubs so the rest of the pipeline keeps working without isolation.
try:
    from sandbox.docker_sandbox import run_in_sandbox, is_docker_available
    _DOCKER_SANDBOX_OK = True
except ImportError:
    _DOCKER_SANDBOX_OK = False

    def run_in_sandbox(*args, **kwargs):  # type: ignore[misc]
        # Stub: tells callers sandboxed execution is unavailable.
        return {"error": "SANDBOX_NOT_AVAILABLE", "fallback": True}

    def is_docker_available() -> bool:  # type: ignore[misc]
        return False

if SANDBOX_ENABLED:
    logger.info(
        "[SANDBOX] Docker isolation ENABLED | docker_available=%s",
        is_docker_available(),
    )
else:
    logger.debug("[SANDBOX] Docker isolation DISABLED (in-process mode)")
# ======================================================================
# StepLogger: per-stage structured step logging (observability)
# ======================================================================
class StepLogger:
    """Record structured, timestamped step entries for a single agent stage."""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.steps: list[dict[str, Any]] = []

    def log(
        self, step: str, status: str, detail: str = "", duration_ms: int = 0
    ) -> None:
        """Append one step record and mirror it to the module logger."""
        record = {
            "step": step,
            "agent": self.agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": status,
            "detail": detail,
            "duration_ms": duration_ms,
        }
        self.steps.append(record)
        if status == "SUCCESS":
            icon = "OK"
        elif status == "RUNNING":
            icon = "WAIT"
        else:
            icon = "FAIL"
        logger.info("[%s] %s | %s | %s", icon, self.agent_name.upper(), step, detail)

    def summary(self) -> dict[str, Any]:
        """Return aggregate step counts plus the raw step list."""
        failure_count = sum(1 for s in self.steps if s["status"] == "FAILED")
        return {
            "agent": self.agent_name,
            "total_steps": len(self.steps),
            "failed_steps": failure_count,
            "steps": self.steps,
        }
def _normalize_pipeline_task_plan(task_plan: dict[str, Any]) -> dict[str, Any]:
"""
Canonical runtime DAG:
Security Guard + Scout discover in parallel, then Intel Fusion ranks risk.
"""
plan = dict(task_plan or {})
scan_path = plan.get("path", "B")
if scan_path == "B":
plan["parallel_layer1"] = ["security_guard", "scout"]
plan["fusion_after_discovery"] = True
plan["agents_to_run"] = [
"security_guard", "scout", "intel_fusion", "analyst", "debate", "judge"
]
elif scan_path == "A":
plan["parallel_layer1"] = ["scout"]
plan["fusion_after_discovery"] = True
plan["agents_to_run"] = ["scout", "intel_fusion", "analyst", "debate", "judge"]
return plan
_L0_TO_RUNTIME_INPUT_TYPE = {
"package_list": "pkg",
"source_code": "code",
"mixed": "code",
"config_file": "config",
"sql_review": "sql_review",
"blocked": "injection",
}
def _canonical_runtime_input_type(requested: str, l0_input_type: str = "") -> str:
"""用 L0 deterministic 分類修正 UI/API 預設值,避免 code 被當 package。"""
runtime = (requested or "pkg").strip().lower()
if runtime not in {"pkg", "code", "config", "injection", "sql_review"}:
runtime = "pkg"
l0_runtime = _L0_TO_RUNTIME_INPUT_TYPE.get(str(l0_input_type or "").strip().lower(), "")
if runtime == "pkg" and l0_runtime in {"code", "config", "injection", "sql_review"}:
return l0_runtime
return runtime
def _constrain_task_plan_by_input_type(task_plan: dict[str, Any], input_type: str) -> dict[str, Any]:
    """The L0 input type is a hard constraint; the Orchestrator must not mis-route source code to Path C/A."""
    plan = dict(task_plan or {})
    previous_path = plan.get("path", "B")

    def _record_correction(target: str, reason: str) -> None:
        # Keep an audit trail of forced route changes.
        plan["_route_corrected"] = {
            "from": previous_path,
            "to": target,
            "reason": reason,
        }

    if input_type in {"code", "injection"}:
        if previous_path != "B":
            plan["path"] = "B"
            _record_correction("B", f"l0_input_type={input_type}")
    elif input_type == "sql_review":
        if previous_path != "C":
            _record_correction("C", "l0_input_type=sql_review")
        # SQL review is a single-agent fast lane.
        plan["path"] = "C"
        plan["parallel_layer1"] = []
        plan["fusion_after_discovery"] = False
        plan["agents_to_run"] = ["sql_review"]
    elif input_type == "config":
        if previous_path != "C":
            plan["path"] = "C"
            _record_correction("C", "l0_input_type=config")
    return _normalize_pipeline_task_plan(plan)
def _load_dim11_expected_config() -> dict[str, Any]:
    """Load the DIM11 benchmark gate; disabled (empty dict) when the fixture file is absent or unparseable."""
    config_file = (
        Path(__file__).resolve().parent
        / "tests"
        / "fixtures"
        / "dim11_redteam"
        / "expected_results.json"
    )
    try:
        return json.loads(config_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError, ValueError) as exc:
        # Normal scans have no benchmark fixture; this is not an error.
        logger.debug("[DIM11] benchmark config unavailable: %s", exc)
        return {}
def _build_dim11_benchmark_context(
source: str,
input_type: str,
task_plan: dict[str, Any],
code_patterns: list[dict[str, Any]],
scout_output: dict[str, Any],
extracted_packages: list[str],
) -> dict[str, Any] | None:
"""只對 DIM11 fixture exact match 啟用 expected-CWE recall challenge。"""
if input_type != "code":
return None
config = _load_dim11_expected_config()
fixture_dir = Path(__file__).resolve().parent / "tests" / "fixtures" / "dim11_redteam"
observed_cwes = sorted({
str(pattern.get("cwe_id", ""))
for pattern in code_patterns
if str(pattern.get("cwe_id", "")).startswith("CWE-")
})
for case in config.get("fixtures", []):
expected_cwes = case.get("expected_cwe_categories")
if not expected_cwes or case.get("exclude_from_cwe_recall"):
continue
fixture_name = str(case.get("fixture", ""))
try:
fixture_source = (fixture_dir / fixture_name).read_text(encoding="utf-8", errors="replace")
except OSError:
continue
if source != fixture_source:
continue
external_findings = scout_output.get("vulnerabilities", [])
external_pollution_count = 0
if case.get("external_requires_package_evidence") and external_findings and not extracted_packages:
external_pollution_count = len(external_findings)
expected_path = str(case.get("expected_path", ""))
actual_path = str(task_plan.get("path", ""))
return {
"benchmark": "dim11_redteam",
"fixture": fixture_name,
"expected_cwe_categories": sorted(set(expected_cwes)),
"observed_cwe_categories": observed_cwes,
"min_category_recall": float(case.get("min_category_recall", 0.7)),
"route_correct": actual_path == expected_path,
"expected_path": expected_path,
"actual_path": actual_path,
"external_pollution_count": external_pollution_count,
"activation": "exact_fixture_match",
}
return None
# ======================================================================
# Stage 1: Scout — external vulnerability discovery
# ======================================================================
def stage_scout(
    tech_stack: str,
    input_type: str = "pkg",
    intel_fusion_result: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 1: Scout Agent — external vulnerability discovery.

    Delegates to agents/scout.py. Graceful degradation: on failure an
    empty-but-well-formed result is returned so downstream stages still run.

    v3.7: input_type selects the path-aware Skill SOP.

    Args:
        tech_stack: Raw user input (package list or source code).
        input_type: Runtime input classification ("pkg", "code", ...).
        intel_fusion_result: Optional pre-computed Intel Fusion output.

    Returns:
        (result_dict, step_logger)
    """
    from agents.scout import run_scout_pipeline

    sl = StepLogger("scout")
    sl.log("INIT", "RUNNING", f"tech_stack={tech_stack} | input_type={input_type}")
    t0 = time.time()
    try:
        result = run_scout_pipeline(
            tech_stack,
            input_type=input_type,
            intel_fusion_result=intel_fusion_result,
        )
        duration_ms = int((time.time() - t0) * 1000)
        vuln_count = len(result.get("vulnerabilities", []))
        sl.log(
            "COMPLETE", "SUCCESS", f"found {vuln_count} vulnerabilities", duration_ms
        )
        return result, sl
    except Exception as e:
        duration_ms = int((time.time() - t0) * 1000)
        # A 429 / rate-limit is transient back-pressure, not a real failure.
        is_rate_limit = "429" in str(e) or "rate limit" in str(e).lower()
        if is_rate_limit:
            logger.warning("[SCOUT] Rate limited returning empty results (not a real failure)")
            sl.log("COMPLETE", "RATE_LIMITED", str(e)[:100], duration_ms)
        else:
            logger.error("Scout Stage failed: %s", e)
            sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms)
            # Bug fix: only flag system degradation on genuine failures.
            # Previously a transient 429 also flipped the degradation status,
            # contradicting `_degraded: not is_rate_limit` below.
            degradation_status.degrade("Scout", str(e))
        # Empty-but-well-formed payload keeps downstream stages alive.
        return {
            "scan_id": f"scan_degraded_{int(time.time())}",
            "vulnerabilities": [],
            "summary": {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0},
            "_degraded": not is_rate_limit,  # a 429 does not count as degradation
            "_error": str(e),
        }, sl
def stage_intel_fusion(
    discovery_input: dict[str, Any] | list[str] | str,
    on_progress: Any = None,
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage: Intel Fusion runs after Scout and Security Guard discovery.
    It receives normalized findings instead of raw source code.
    """
    from agents.intel_fusion import run_intel_fusion

    sl = StepLogger("intel_fusion")
    sl.log("INIT", "RUNNING", "post-discovery fusion")
    started = time.time()
    try:
        fused = run_intel_fusion(discovery_input, on_progress)
        elapsed_ms = int((time.time() - started) * 1000)
        fused.setdefault("_duration_ms", elapsed_ms)
        scored = len(fused.get("fusion_results", []))
        sl.log("COMPLETE", "SUCCESS", f"scored {scored} findings", elapsed_ms)
        return fused, sl
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        logger.error("Intel Fusion Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
        degradation_status.degrade("Intel Fusion", str(e))
        # Empty fusion payload keeps downstream consumers well-formed.
        return {
            "fusion_results": [],
            "strategy_applied": "degraded",
            "api_health_summary": {},
            "_degraded": True,
            "_error": str(e),
            "_duration_ms": elapsed_ms,
        }, sl
# ======================================================================
# Stage 2: Analyst — vulnerability chain analysis
# ======================================================================
def stage_analyst(
    scout_output: dict[str, Any],
    input_type: str = "pkg",
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 2: Analyst Agent vulnerability chain analysis.

    v3.7: input_type selects path-aware chain analysis skill.

    Returns:
        (result_dict, step_logger)
    """
    from agents.analyst import run_analyst_pipeline

    sl = StepLogger("analyst")
    sl.log(
        "INIT", "RUNNING",
        f"input_vulns={len(scout_output.get('vulnerabilities', []))} | input_type={input_type}"
    )
    started = time.time()
    try:
        result = run_analyst_pipeline(scout_output, input_type=input_type)
        elapsed_ms = int((time.time() - started) * 1000)
        sl.log(
            "COMPLETE", "SUCCESS",
            f"risk_score={result.get('risk_score', 0)}", elapsed_ms
        )
        return result, sl
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        logger.error("Analyst Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
        degradation_status.degrade("Analyst", str(e))
        # Degraded fallback: pass Scout CVEs through with chain analysis skipped.
        analysis: list[dict[str, Any]] = []
        for v in scout_output.get("vulnerabilities", []):
            analysis.append({
                "cve_id": v.get("cve_id", "UNKNOWN"),
                "original_cvss": v.get("cvss_score", 0),
                "adjusted_risk": v.get("severity", "UNKNOWN"),
                "in_cisa_kev": False,
                "exploit_available": False,
                "chain_risk": {
                    "is_chain": False,
                    "chain_with": [],
                    "chain_description": "chain_analysis: SKIPPED",
                    "confidence": "NEEDS_VERIFICATION",
                },
                "reasoning": "Analyst degraded - chain analysis skipped.",
            })
        # Preserve Security Guard deterministic code patterns even when degraded.
        code_patterns = scout_output.get("code_patterns", [])
        cvss_by_severity = {"CRITICAL": 9.0, "HIGH": 7.5}
        for cp in code_patterns:
            sev = cp.get("severity", "MEDIUM")
            analysis.append({
                "finding_id": cp.get("finding_id", "CODE-000"),
                "cve_id": None,
                "pattern_type": cp.get("pattern_type", "UNKNOWN"),
                "cwe_id": cp.get("cwe_id", "CWE-unknown"),
                "owasp_category": cp.get("owasp_category", ""),
                "severity": sev,
                "snippet": cp.get("snippet", ""),
                "line_no": cp.get("line_no", 0),
                "original_cvss": cvss_by_severity.get(sev, 5.0),
                "adjusted_risk": sev,
                "in_cisa_kev": False,
                "exploit_available": False,
                "chain_risk": {
                    "is_chain": False,
                    "chain_with": [],
                    "chain_description": "Analyst degraded - deterministic pattern only.",
                    "confidence": "HIGH",  # deterministic detection keeps high confidence
                },
                "reasoning": f"Deterministic detection: {cp.get('pattern_type')} ({cp.get('cwe_id')}). Analyst degraded but code pattern is confirmed by Security Guard.",
            })
        if code_patterns:
            logger.info("[DEGRADED] Analyst fallback preserved %d code_patterns", len(code_patterns))
        return {
            "scan_id": scout_output.get("scan_id", "unknown"),
            "risk_score": 50,
            "risk_trend": "+0",
            "analysis": analysis,
            "_degraded": True,
            "_error": str(e),
        }, sl
# ======================================================================
# Stage 3: Critic — adversarial debate engine
# ======================================================================
def stage_critic(
    analyst_output: dict[str, Any],
    input_type: str = "pkg",
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 3: Adversarial Debate Engine.

    v5.3: upgraded to a 3-round debate engine (Du et al. 2023 ICML,
    arXiv:2305.14325). When no consensus is reached, a Judge sub-agent
    arbitrates. Conservative bias: over-estimating risk beats
    under-estimating it.

    Returns:
        (result_dict, step_logger)
    """
    from agents.debate_engine import run_debate_pipeline

    sl = StepLogger("critic")
    sl.log("INIT", "RUNNING", f"enable_critic={ENABLE_CRITIC} | input_type={input_type} | mode=3-round-debate")
    started = time.time()
    try:
        result = run_debate_pipeline(
            analyst_output,
            input_type=input_type,
            on_progress=None,
        )
        elapsed_ms = int((time.time() - started) * 1000)
        verdict_label = result.get("verdict", "UNKNOWN")
        weighted = result.get("weighted_score", 0)
        debate_meta = result.get("_debate_meta", {})
        consensus = debate_meta.get("consensus", None)
        rounds = debate_meta.get("total_rounds", "?")
        judge_used = debate_meta.get("judge_invoked", False)
        sl.log(
            "COMPLETE", "SUCCESS",
            f"verdict={verdict_label} score={weighted} consensus={consensus} rounds={rounds} judge={judge_used}",
            elapsed_ms,
        )
        return result, sl
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        logger.error("Debate Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
        degradation_status.degrade("Critic", str(e))
        # Neutral fallback: MAINTAIN verdict with mid-band scorecard values.
        neutral = 0.6
        return {
            "debate_rounds": 0,
            "challenges": [],
            "scorecard": {
                "evidence": neutral,
                "chain_completeness": neutral,
                "critique_quality": neutral,
                "defense_quality": neutral,
                "calibration": neutral,
            },
            "weighted_score": 60.0,
            "verdict": "MAINTAIN",
            "reasoning": "Debate engine degraded - all rounds skipped.",
            "_degraded": True,
            "_error": str(e),
        }, sl
def stage_advisor(
    analyst_output: dict[str, Any],
    critic_output: dict[str, Any],
    input_type: str = "pkg",
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 4: Advisor Agent action report generation.

    v3.7: input_type selects path-aware action report skill.

    When the Critic verdict is DOWNGRADE, a conservative-assessment note is
    attached to the Advisor input. On failure, a deterministic fallback
    report is built from the Analyst output so the UI always receives
    actionable data.

    Args:
        analyst_output: Stage 2 result (contains the "analysis" list).
        critic_output: Stage 3 result (verdict / weighted_score / challenges).
        input_type: Runtime input classification.

    Returns:
        (result_dict, step_logger)
    """
    from agents.advisor import run_advisor_pipeline

    sl = StepLogger("advisor")
    verdict = critic_output.get("verdict", "SKIPPED")
    sl.log("INIT", "RUNNING", f"critic_verdict={verdict} | input_type={input_type}")
    # Propagate a conservative-assessment hint when the Critic downgraded.
    advisor_input = dict(analyst_output)
    if verdict == "DOWNGRADE":
        advisor_input["_critic_note"] = (
            f"Critic scored {critic_output.get('weighted_score', 0):.1f}/100. "
            f"Challenges: {critic_output.get('challenges', [])}. "
            "Use conservative risk assessment."
        )
        logger.info(
            "Critic verdict=DOWNGRADE: Advisor will use conservative assessment"
        )
    t0 = time.time()
    try:
        result = run_advisor_pipeline(advisor_input)
        duration_ms = int((time.time() - t0) * 1000)
        risk = result.get("risk_score", 0)
        urgent = len(result.get("actions", {}).get("urgent", []))
        sl.log("COMPLETE", "SUCCESS", f"risk_score={risk} urgent={urgent}", duration_ms)
        return result, sl
    except Exception as e:
        duration_ms = int((time.time() - t0) * 1000)
        logger.error("Advisor Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms)
        degradation_status.degrade("Advisor", str(e))
        # Deterministic fallback report:
        # - package CVEs are bucketed into urgent/important by severity
        # - CODE-pattern findings go to code_patterns_summary and are never
        #   placed in URGENT (constitution rules CI-1/CI-2)
        urgent_actions = []
        important_actions = []
        code_patterns_fallback = []  # CODE-pattern findings surfaced to the UI
        _SEVERITY_MAP = {"CRITICAL": "urgent", "HIGH": "important"}
        for entry in advisor_input.get("analysis", []):
            finding_id = entry.get("finding_id", "")
            cve_id = entry.get("cve_id")
            sev = entry.get("severity") or entry.get("adjusted_risk", "MEDIUM")
            bucket = _SEVERITY_MAP.get(sev, "important")
            # Code-level pattern (deterministic Security Guard detection).
            if finding_id.startswith("CODE-") or entry.get("pattern_type"):
                pt = entry.get("pattern_type", "UNKNOWN")
                cwe = entry.get("cwe_id", "CWE-unknown")
                snippet = entry.get("snippet", "")
                code_patterns_fallback.append({
                    "finding_id": finding_id,
                    "cve_id": None,
                    "cwe_id": cwe,
                    "pattern_type": pt,
                    # Fix: was a placeholder-free f-string (ruff F541).
                    "package": "Custom Code Pattern",
                    "severity": sev,
                    "action": f"Fix {pt} vulnerability ({cwe}). {entry.get('reasoning', '')}",
                    "vulnerable_snippet": snippet[:200],
                    "reason": entry.get("reasoning", f"Security Guard deterministic detection: {pt}"),
                    "is_repeated": False,
                    "_constitution_note": "CODE-pattern: not in URGENT per CI-1/CI-2. See code_patterns_summary.",
                })
            # Known package CVE / GHSA advisory.
            elif cve_id and (str(cve_id).startswith("CVE-") or str(cve_id).startswith("GHSA-")):
                action_entry = {
                    "cve_id": cve_id,
                    "package": entry.get("package", "unknown"),
                    "severity": sev,
                    "action": f"Review and patch {cve_id}.",
                    "command": f"# Investigate {cve_id}",
                    "reason": entry.get("reasoning", "Advisor degraded - manual review required."),
                    "is_repeated": False,
                }
                if bucket == "urgent":
                    urgent_actions.append(action_entry)
                else:
                    important_actions.append(action_entry)
        # Heuristic risk score clamped to [50, 100].
        risk_score = max(
            50,
            min(100, len(urgent_actions) * 25 + len(important_actions) * 10),
        )
        summary_parts = []
        if urgent_actions:
            summary_parts.append(f"{len(urgent_actions)} package CVE(s) require immediate patching")
        if code_patterns_fallback:
            summary_parts.append(f"{len(code_patterns_fallback)} code-level pattern(s) detected (see Code Analysis tab)")
        if important_actions:
            summary_parts.append(f"{len(important_actions)} high-severity CVE(s) need review within 72h")
        if not summary_parts:
            summary_parts.append("System degraded. Please review raw scan data manually.")
        logger.info(
            "[DEGRADED] Advisor fallback: %d urgent CVEs + %d important CVEs + %d code-patterns (not in URGENT per CI-1/CI-2)",
            len(urgent_actions), len(important_actions), len(code_patterns_fallback),
        )
        return {
            "executive_summary": ". ".join(summary_parts),
            "actions": {
                "urgent": urgent_actions,
                "important": important_actions,
                "resolved": [],
            },
            "code_patterns_summary": code_patterns_fallback,
            "risk_score": risk_score,
            "risk_trend": "+0",
            "scan_count": 1,
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "_degraded": True,
            "_error": str(e),
        }, sl
# ======================================================================
# Stage 0: Orchestrator — scan-path routing (A/B/C/D)
# ======================================================================
def stage_orchestrator(
    tech_stack: str,
    feedback_from_judge: dict | None = None,
) -> tuple[Any, dict, StepLogger]:
    """
    Stage 0: Orchestrator decides the scan path (A/B/C/D).

    Returns:
        (OrchestrationContext, task_plan, StepLogger)

    Graceful degradation: on failure, fall back to Path B (full source-code
    scan) as the safest default.
    """
    from agents.orchestrator import run_orchestration

    sl = StepLogger("orchestrator")
    sl.log("INIT", "RUNNING", f"tech_stack={tech_stack[:60]}")
    started = time.time()
    try:
        ctx, task_plan = run_orchestration(
            user_input=tech_stack,
            feedback_from_judge=feedback_from_judge,
        )
        task_plan = _normalize_pipeline_task_plan(task_plan)
        elapsed_ms = int((time.time() - started) * 1000)
        chosen_path = task_plan.get("path", "B")
        sl.log("COMPLETE", "SUCCESS", f"path={chosen_path}", elapsed_ms)
        logger.info("[ORCH] Scan path: %s | plan: %s", chosen_path, task_plan.get("agents_to_run", []))
        return ctx, task_plan, sl
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        logger.error("Orchestrator Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
        degradation_status.degrade("Orchestrator", str(e))
        # Fallback: Path B full-code scan with the complete agent roster.
        from agents.orchestrator import OrchestrationContext, ScanPath
        ctx = OrchestrationContext()
        ctx.scan_path = ScanPath.FULL_CODE
        fallback_plan = {
            "path": "B",
            "parallel_layer1": ["security_guard", "scout"],
            "fusion_after_discovery": True,
            "debate_cluster": True,
            "judge": True,
            "agents_to_run": ["security_guard", "scout", "intel_fusion", "analyst", "debate", "judge"],
            "_degraded": True,
        }
        return ctx, fallback_plan, sl
# ======================================================================
# Pipeline helpers: code-pattern enrichment, summaries, report payload
# ======================================================================
def _build_code_patterns_summary(sg_result: dict) -> list[dict]:
    """
    Convert Security Guard patterns + hardcoded secrets into a
    CWE-enriched code_patterns_summary.

    Deterministic extraction plus MITRE CWE v4.14 evidence injection;
    no LLM involved.
    """
    from tools.cwe_registry import build_cwe_reference, get_pattern_meta, severity_rank

    language = sg_result.get("language", "unknown")

    def _line_of(item: dict) -> int:
        """Normalize scanner line metadata to the line_no contract."""
        for field in ("line_no", "line", "source_line"):
            try:
                candidate = int(item.get(field))
            except (TypeError, ValueError):
                continue
            if candidate > 0:
                return candidate
        return 0

    def _location(line_no: int) -> str:
        return f"L{line_no}" if line_no > 0 else "Line not provided by scanner"

    def _with_cwe_reference(entry: dict, cwe_id: str) -> dict:
        """Inject the official MITRE CWE definition under `cwe_reference`."""
        reference = build_cwe_reference(cwe_id)
        if reference:
            entry["cwe_reference"] = reference
        return entry

    # Step 1: de-duplicate patterns — same line + same first 30 snippet
    # chars counts as a duplicate; keep the highest-severity one.
    deduped: dict[tuple, dict] = {}
    for raw in sg_result.get("patterns", []):
        key = (_line_of(raw), str(raw.get("snippet", ""))[:30])
        held = deduped.get(key)
        if held is None:
            deduped[key] = raw
            continue
        new_sev = get_pattern_meta(raw.get("pattern_type", "UNKNOWN")).severity
        old_sev = get_pattern_meta(held.get("pattern_type", "UNKNOWN")).severity
        if severity_rank(new_sev) > severity_rank(old_sev):
            deduped[key] = raw

    findings: list[dict] = []

    # Step 2: one finding entry per surviving pattern, with CWE reference.
    for raw in deduped.values():
        pattern_type = raw.get("pattern_type", "UNKNOWN")
        meta = get_pattern_meta(pattern_type)
        line_no = _line_of(raw)
        entry = {
            "finding_id": f"CODE-{len(findings) + 1:03d}",
            "type": "code_pattern",
            "pattern_type": pattern_type,
            "cwe_id": meta.cwe_id,
            "owasp_category": meta.owasp_category,
            "severity": meta.severity,
            "snippet": str(raw.get("snippet", ""))[:200],
            "line_no": line_no,
            "language": language,
            "canonical_cwe_id": meta.cwe_id,
            "weakness_family": meta.weakness_family,
            "evidence_type": meta.evidence_type,
            "coverage_level": raw.get("coverage_level", "pattern"),
            "confidence": raw.get("confidence", "MEDIUM"),
            "source_location": _location(line_no),
        }
        findings.append(_with_cwe_reference(entry, meta.cwe_id))

    # Step 3: hardcoded secrets (values are always redacted).
    for secret in sg_result.get("hardcoded", []):
        line_no = _line_of(secret)
        entry = {
            "finding_id": f"CODE-{len(findings) + 1:03d}",
            "type": "hardcoded_secret",
            "pattern_type": "HARDCODED_SECRET",
            "cwe_id": "CWE-798",
            "owasp_category": "A07:2021-Identification and Authentication Failures",
            "severity": "HIGH",
            "snippet": f"{secret.get('name', 'secret')} = '****' (value redacted)",
            "line_no": line_no,
            "language": language,
            "canonical_cwe_id": "CWE-798",
            "weakness_family": "hardcoded_secret",
            "evidence_type": "code_scan",
            "coverage_level": secret.get("coverage_level", "pattern"),
            "confidence": secret.get("confidence", "HIGH"),
            "source_location": _location(line_no),
        }
        findings.append(_with_cwe_reference(entry, "CWE-798"))

    logger.info(
        "[CODE_PATTERNS] Built %d CWE-enriched patterns (from %d raw patterns, %d hardcoded)",
        len(findings),
        len(sg_result.get("patterns", [])),
        len(sg_result.get("hardcoded", [])),
    )
    return findings
def _summarize_vulnerabilities(vulnerabilities: list[dict[str, Any]]) -> dict[str, int]:
"""根據漏洞清單建立前端可直接使用的摘要數字。"""
summary = {
"total": 0,
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"new": 0,
"new_since_last_scan": 0,
}
for vuln in vulnerabilities:
summary["total"] += 1
severity = str(vuln.get("severity", "LOW")).upper()
if severity == "CRITICAL":
summary["critical"] += 1
elif severity == "HIGH":
summary["high"] += 1
elif severity == "MEDIUM":
summary["medium"] += 1
else:
summary["low"] += 1
if vuln.get("is_new"):
summary["new"] += 1
summary["new_since_last_scan"] += 1
return summary
def _build_scan_report_payload(
scout_output: dict[str, Any],
advisor_output: dict[str, Any],
layer1_results: dict[str, Any],
) -> dict[str, Any]:
"""整理本次 scan 專屬的漏洞明細,避免 UI 回頭讀取全域記憶。"""
intel_applied = scout_output.get("_intel_fusion_applied", {})
vulnerabilities: list[dict[str, Any]] = []
for vuln in scout_output.get("vulnerabilities", []):
entry = dict(vuln)
entry.setdefault("source", "SCOUT")
enriched_by = list(entry.get("enriched_by", []))
if intel_applied and entry.get("source") != "INTEL_FUSION":
if any(
field in entry
for field in (
"composite_score",
"epss_score",
"ghsa_severity",
"otx_threat",
"intel_confidence",
"dimensions_used",
"weights_used",
"in_cisa_kev",
)
):
if "INTEL_FUSION" not in enriched_by:
enriched_by.append("INTEL_FUSION")
if enriched_by:
entry["enriched_by"] = enriched_by
vulnerabilities.append(entry)
detail_source = "scout_final_output"
if vulnerabilities:
summary = dict(scout_output.get("summary", {}))
summary.setdefault("new", summary.get("new_since_last_scan", 0))
summary.setdefault("new_since_last_scan", summary.get("new", 0))
else:
detail_source = "advisor_actions_fallback"
for level in ("urgent", "important", "resolved"):
for item in advisor_output.get("actions", {}).get(level, []):
vulnerabilities.append({
"cve_id": item.get("cve_id", ""),
"package": item.get("package", "unknown"),
"cvss_score": item.get("cvss_score", 0),
"severity": item.get("severity", "MEDIUM"),
"description": item.get("action", ""),
"is_new": item.get("is_new", False),
"source": "ADVISOR_ACTIONS",
"report_level": level.upper(),
})
summary = _summarize_vulnerabilities(vulnerabilities)
layer1_agents = [
name for name in ("security_guard", "scout", "intel_fusion")
if name in layer1_results
]
degraded_agents = [
name for name in layer1_agents
if isinstance(layer1_results.get(name), dict) and layer1_results[name].get("_degraded")
]
if not layer1_agents:
layer1_state = "skipped"
elif degraded_agents:
layer1_state = "degraded"
else:
layer1_state = "merged"
report_sources: dict[str, Any] = {
"vulnerability_detail": detail_source,
"enriched_by": ["intel_fusion"] if intel_applied else [],
"fallbacks": ["advisor_actions"] if detail_source == "advisor_actions_fallback" else [],
"layer1_state": layer1_state,
}
if degraded_agents:
report_sources["degraded_agents"] = degraded_agents
if intel_applied:
report_sources["intel_fusion_applied"] = intel_applied
if scout_output.get("representative_cve_evidence"):
report_sources["representative_cve_evidence_count"] = len(
scout_output.get("representative_cve_evidence", [])
)
return {
"vulnerability_detail": vulnerabilities,
"vulnerability_summary": summary,
"report_sources": report_sources,
"representative_cve_evidence": scout_output.get("representative_cve_evidence", []),
}
def _run_layer1_parallel(
tech_stack: str,
task_plan: dict,
on_progress: Any,
input_type: str = "pkg",
) -> dict[str, Any]:
"""
頝臬 B Layer 1 銝西瑁嚗雿輻 ThreadPoolExecutor 撠孵嚗
Why ThreadPoolExecutor嚗 asyncio嚗嚗
- main.py 臬甇亦撘蝣潘SSE callback 銋臬甇亦
- ThreadPoolExecutor 臬典甇亦啣銝銝西瑁憭 IO-bound 撌乩嚗LLM 澆恬
- 撠孵嚗銝閬瑽 callback 璈
銝西瑁撠鞊∴閬 task_plan 瘙箏嚗嚗
- security_guard嚗敺蝔撘蝣潭賢/憟隞/璅∪嚗頛詨伐tech_stack嚗
- intel_fusion嚗剔雁望亥岷嚗頛詨伐tech_stack嚗
Args:
tech_stack: 雿輻刻頛詨
task_plan: Orchestrator 隞餃閬
on_progress: SSE 脣漲隤
Returns:
{"security_guard": dict, "intel_fusion": dict}
"""
parallel_agents = task_plan.get("parallel_layer1", [])
layer1_results: dict[str, Any] = {}
def _run_security_guard() -> tuple[str, dict]:
""" Thread 銝剖瑁 Security Guard嚗 LLM 嚗"""
try:
from agents.security_guard import run_security_guard
result = run_security_guard(tech_stack, on_progress)
return ("security_guard", result)
except Exception as e:
logger.error("[LAYER1] Security Guard failed: %s", e)
degradation_status.degrade("Security Guard", str(e))
return ("security_guard", {
"extraction_status": "degraded",
"functions": [], "imports": [], "patterns": [], "hardcoded": [],
"stats": {"total_lines": 0, "functions_found": 0, "patterns_found": 0},
"_degraded": True, "_error": str(e),
})
def _run_intel_fusion(intel_input: list | str) -> tuple[str, dict]:
""" Thread 銝剖瑁 Intel Fusion嚗剔雁望亥岷嚗
v3.4 靽桀儔嚗亙憟隞嗅蝔勗銵剁憪蝔撘蝣潘嚗閫瘙 0 CVE 憿
intel_input 臭誑荔
- list[str]嚗憟隞嗅蝔勗銵剁 package_extractor 敺喳伐
- str嚗憪 tech_stack嚗fallback嚗Path A 憟隞嗆格芋撘嚗
"""
try:
from agents.intel_fusion import run_intel_fusion
result = run_intel_fusion(intel_input, on_progress)
return ("intel_fusion", result)
except Exception as e:
logger.error("[LAYER1] Intel Fusion failed: %s", e)
degradation_status.degrade("Intel Fusion", str(e))
return ("intel_fusion", {
"fusion_results": [],
"strategy_applied": "degraded",
"api_health_summary": {},
"_degraded": True, "_error": str(e),
})
def _run_scout() -> tuple[str, dict]:
"""Run Scout in parallel with Security Guard during discovery."""
try:
scout_target = tech_stack
scout_input_type = input_type
local_packages: list[str] = []
if input_type in {"code", "injection", "config"}:
from agents.security_guard import extract_code_surface
from tools.package_extractor import (
format_packages_for_intel_fusion,
packages_from_security_guard,
)
local_packages = packages_from_security_guard(extract_code_surface(tech_stack))
if not local_packages:
return ("scout", {
"scan_id": f"scan_code_only_{int(time.time())}",
"timestamp": datetime.now(timezone.utc).isoformat(),
"tech_stack": [],
"vulnerabilities": [],
"summary": {
"total": 0,
"new_since_last_scan": 0,
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
},
"_duration_ms": 0,
"_package_scan_skipped": True,
"_skip_reason": "source_code_without_third_party_packages",
"_extracted_packages_local": [],
})
scout_target = format_packages_for_intel_fusion(local_packages)
scout_input_type = "pkg"
result, scout_sl = stage_scout(scout_target, input_type=scout_input_type)
if scout_sl.steps:
result["_duration_ms"] = scout_sl.steps[-1].get("duration_ms", 0)
if local_packages:
result["_extracted_packages_local"] = local_packages
result["_scout_target_input"] = scout_target
result["_scout_input_type"] = scout_input_type
return ("scout", result)
except Exception as e:
logger.error("[LAYER1] Scout failed: %s", e)
degradation_status.degrade("Scout", str(e))
return ("scout", {
"scan_id": f"scan_degraded_{int(time.time())}",
"vulnerabilities": [],
"summary": {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0},
"_degraded": True, "_error": str(e),
})
def _extract_packages_from_sg_result(sg_result: dict[str, Any]) -> list[str]:
"""從 Security Guard 結果萃取第三方套件,供後續 Scout 使用。"""
try:
from tools.package_extractor import packages_from_security_guard
extracted = packages_from_security_guard(sg_result)
logger.info(
"[LAYER1] PackageExtractor: %d packages extracted from imports: %s",
len(extracted), extracted,
)
return extracted
except Exception as pe:
logger.warning("[LAYER1] PackageExtractor failed (fallback to raw input): %s", pe)
return []
def _extract_cwe_targets_from_sg_result(sg_result: dict[str, Any]) -> list[str]:
"""從 Security Guard patterns 中整理 CWE targets,供 observability 與 fallback 使用。"""
sg_patterns = sg_result.get("patterns", [])
seen_cwe: set[str] = set()
cwe_targets: list[str] = []
for pattern in sg_patterns:
cwe = pattern.get("cwe_id", "")
if cwe and cwe.startswith("CWE-") and cwe not in seen_cwe:
seen_cwe.add(cwe)
cwe_targets.append(cwe)
return cwe_targets
extracted_packages: list[str] = []
cwe_targets: list[str] = []
requested_workers = [
agent_name
for agent_name in ("security_guard", "scout")
if agent_name in parallel_agents
]
if not requested_workers:
return layer1_results
futures: dict[Any, str] = {}
max_workers = len(requested_workers)
with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="layer1") as executor:
if "security_guard" in requested_workers:
rate_limiter.wait_if_needed("security_guard")
futures[executor.submit(_run_security_guard)] = "security_guard"
if "scout" in requested_workers:
rate_limiter.wait_if_needed("scout")
if on_progress:
on_progress("scout", "RUNNING", {"step": "discovery_parallel"})
futures[executor.submit(_run_scout)] = "scout"
for future in as_completed(futures):
agent_name = futures[future]
try:
result_name, result_payload = future.result()
except Exception as exc:
logger.error("[LAYER1] %s future failed unexpectedly: %s", agent_name, exc)
degradation_status.degrade(agent_name, str(exc))
continue
layer1_results[result_name] = result_payload
logger.info("[LAYER1] %s complete", result_name)
if result_name == "security_guard":
sg_extracted = _extract_packages_from_sg_result(result_payload)
if sg_extracted or not extracted_packages:
extracted_packages = sg_extracted
cwe_targets = _extract_cwe_targets_from_sg_result(result_payload)
if cwe_targets:
logger.info(
"[LAYER1] Security Guard produced %d CWE targets: %s",
len(cwe_targets), cwe_targets,
)
elif result_name == "scout" and result_payload.get("_extracted_packages_local") and not extracted_packages:
extracted_packages = list(result_payload.get("_extracted_packages_local") or [])
layer1_results["_extracted_packages"] = extracted_packages
layer1_results["_cwe_targets"] = cwe_targets
return layer1_results
def _build_intel_fusion_input(
scout_output: dict[str, Any],
sg_result: dict[str, Any] | None,
code_patterns: list[dict[str, Any]],
extracted_packages: list[str],
tech_stack: str,
input_type: str,
) -> dict[str, Any]:
"""Build a clean post-discovery input for Intel Fusion."""
cve_ids: list[str] = []
seen_cves: set[str] = set()
for vuln in scout_output.get("vulnerabilities", []):
cve_id = str(vuln.get("cve_id", "")).strip()
if (cve_id.startswith("CVE-") or cve_id.startswith("GHSA-")) and cve_id not in seen_cves:
seen_cves.add(cve_id)
cve_ids.append(cve_id)
cwe_ids: list[str] = []
seen_cwes: set[str] = set()
for pattern in code_patterns:
cwe_id = str(pattern.get("cwe_id", "")).strip()
if cwe_id.startswith("CWE-") and cwe_id not in seen_cwes:
seen_cwes.add(cwe_id)
cwe_ids.append(cwe_id)
return {
"mode": "post_discovery",
"input_type": input_type,
"cve_ids": cve_ids,
"cwe_ids": cwe_ids,
"packages": extracted_packages,
"scout_vulnerabilities": scout_output.get("vulnerabilities", [])[:20],
"code_patterns": code_patterns[:20],
"security_guard_stats": (sg_result or {}).get("stats", {}),
"raw_input_fingerprint": str(hash(tech_stack)),
}
def _merge_intel_fusion_into_scout(
scout_output: dict[str, Any],
intel_fusion_result: dict[str, Any],
) -> dict[str, Any]:
"""Merge Intel Fusion evidence after Scout completes discovery."""
if not intel_fusion_result:
return scout_output
try:
from agents.scout import merge_intel_fusion_evidence
return merge_intel_fusion_evidence(scout_output, intel_fusion_result)
except Exception as exc:
logger.warning("[PIPELINE] Intel Fusion merge skipped: %s", exc)
scout_output["intel_fusion_result"] = intel_fusion_result
return scout_output
def run_pipeline(tech_stack: str, input_type: str = "pkg") -> dict[str, Any]:
    """
    Execute the full ThreatHunter pipeline (v3.1 Orchestrator architecture).

    Pipeline: Orchestrator -> [Layer 1 parallel] -> Scout -> Analyst -> [Critic] -> Advisor

    v3.7: ``input_type`` selects the Path-Aware Skills route
    (pkg = package list / code = source code / injection = AI-safety probe /
    config = configuration file).
    v3.9 Sandbox: when SANDBOX_ENABLED=true and Docker is usable, the whole run
    is delegated to an isolated container; on container failure it degrades to
    in-process execution.

    Args:
        tech_stack: user-supplied tech-stack string (e.g. "Django 4.2, Redis 7.0").
        input_type: detected input category (pkg/code/config/injection).

    Returns:
        The Advisor report dict, including the ``pipeline_meta`` observability field.
    """
    # Prefer the Docker-isolated path when every precondition holds.
    if SANDBOX_ENABLED and _DOCKER_SANDBOX_OK and is_docker_available():
        logger.info("[SANDBOX] Docker isolation ACTIVE delegating to container")
        sandbox_result = run_in_sandbox(tech_stack=tech_stack, input_type=input_type)
        if not sandbox_result.get("fallback"):
            # The container produced a complete report; return it as-is.
            return sandbox_result
        # Container could not run — degrade to the in-process pipeline.
        logger.warning(
            "[SANDBOX] Container fallback: %s using in-process mode",
            sandbox_result.get("error", "unknown"),
        )
    # In-process execution path.
    return run_pipeline_with_callback(tech_stack, progress_callback=None, input_type=input_type)
def run_pipeline_sync(tech_stack: str, input_type: str = "pkg") -> dict[str, Any]:
    """Synchronous equivalent of ``run_pipeline`` for sandbox/sandbox_runner.py.

    Intended for use inside the container, where SANDBOX_ENABLED=false prevents
    recursive Docker delegation; this entry point therefore always runs
    in-process.
    """
    # Never re-enter the Docker sandbox from here — go straight in-process.
    return run_pipeline_with_callback(tech_stack, progress_callback=None, input_type=input_type)
def _run_pipeline_with_callback_legacy_v31(
    tech_stack: str,
    progress_callback: Any = None,
    input_type: str = "pkg",
) -> dict[str, Any]:
    """
    Legacy v3.1 pipeline: execute the full stage chain and invoke
    ``progress_callback`` after each stage completes.

    v3.7: ``input_type`` selects each agent's Skill SOP (Path-Aware Skills).

    Args:
        tech_stack: raw tech-stack string from the user.
        progress_callback: callable accepting (agent_name: str, status: str, detail: dict).
        input_type: input category (pkg / code / config / injection).

    Returns:
        The Advisor action-report dict, plus ``pipeline_meta`` observability fields.
    """
    pipeline_start = time.time()
    completed_stages: list[str] = []
    stages_detail: dict[str, Any] = {}
    # Orchestrator state; safe defaults let later code run even if that stage degrades.
    orch_ctx: Any = None
    task_plan: dict = {"path": "B"}  # default scan path is B
    layer1_results: dict[str, Any] = {}
    feedback_loop_count: int = 0
    MAX_FEEDBACK_LOOPS: int = 2

    def _notify(agent: str, status: str, detail: dict) -> None:
        # Forward progress to the caller; callback errors must never break the pipeline.
        if progress_callback:
            try:
                progress_callback(agent, status, detail)
            except Exception:
                pass  # callback failure is deliberately swallowed (non-fatal)

    logger.info("=" * 60)
    logger.info(" ThreatHunter Pipeline v3.1 Start (Orchestrator-driven)")
    logger.info(" tech_stack : %s", tech_stack)
    logger.info(" input_type : %s", input_type)
    logger.info(" ENABLE_CRITIC: %s", ENABLE_CRITIC)
    logger.info("=" * 60)
    # Checkpoint recorder: persists stage enter/exit events for observability.
    from checkpoint import recorder
    recorder.start_scan(f"pipe_{int(pipeline_start)}")
    # ── L0: input sanitizer (runs before any agent sees the input) ──────────
    l0_report: dict[str, Any] = {}
    sanitized_stack: str = tech_stack
    try:
        from input_sanitizer import sanitize_input, format_l0_report
        san_result = sanitize_input(tech_stack)
        l0_report = format_l0_report(san_result)
        if not san_result.safe:
            # Input blocked outright: return a terminal "blocked" report without
            # running any downstream agent.
            logger.warning("[L0] Input BLOCKED: %s", san_result.blocked_reason)
            _notify("input_sanitizer", "COMPLETE", {
                "status": "BLOCKED",
                "reason": san_result.blocked_reason,
            })
            return {
                "executive_summary": f"頛詨亥◤摰券瞈曉冽蝯嚗{san_result.blocked_reason}",
                "blocked": True,
                "actions": {"urgent": [], "important": [], "resolved": []},
                "risk_score": 0,
                "risk_trend": "+0",
                "pipeline_meta": {
                    "pipeline_version": "3.1",
                    "tech_stack": tech_stack,
                    "stages_completed": 0,
                    "stages_detail": {},
                    "enable_critic": ENABLE_CRITIC,
                    "critic_verdict": "SKIPPED",
                    "critic_score": 0.0,
                    "duration_seconds": 0.0,
                    "degradation": {"level": 5, "label": "INPUT_BLOCKED"},
                    "generated_at": datetime.now(timezone.utc).isoformat(),
                    "l0_report": l0_report,
                },
            }
        sanitized_stack = san_result.sanitized_input
        if san_result.truncated:
            logger.warning("[L0] Input truncated: %d %d chars",
                san_result.original_length, len(sanitized_stack))
        logger.info("[L0] OK: type=%s l0_findings=%d hash=%s",
            san_result.input_type, len(san_result.l0_findings), san_result.input_hash)
        _notify("input_sanitizer", "COMPLETE", {
            "status": "SUCCESS",
            "input_type": san_result.input_type,
            "l0_warning_count": l0_report.get("l0_warning_count", 0),
            "truncated": san_result.truncated,
        })
        recorder.stage_exit("input_sanitizer", "SUCCESS", {
            "input_type": san_result.input_type,
            "l0_warning_count": l0_report.get("l0_warning_count", 0),
        }, 0)
    except ImportError:
        # Sanitizer module absent: proceed without L0 filtering.
        logger.warning("[L0] input_sanitizer not available, skipping L0 filter")
    except Exception as e:
        # Sanitizer crash is non-fatal; continue with the raw input.
        logger.error("[L0] Sanitizer error (non-fatal): %s", e)
    # ── Stage: Orchestrator (routing decision) ──────────────────────────────
    _notify("orchestrator", "RUNNING", {})
    orch_ctx, task_plan, orch_sl = stage_orchestrator(sanitized_stack)
    scan_path = task_plan.get("path", "B")
    orch_detail = {
        "status": "SUCCESS" if not task_plan.get("_degraded") else "DEGRADED",
        "scan_path": scan_path,
        "agents_to_run": task_plan.get("agents_to_run", []),
        "duration_ms": orch_sl.steps[-1].get("duration_ms", 0) if orch_sl.steps else 0,
        "l0_input_type": l0_report.get("input_type", "unknown"),
    }
    stages_detail["orchestrator"] = orch_detail
    completed_stages.append("orchestrator")
    _notify("orchestrator", "COMPLETE", orch_detail)
    recorder.stage_enter("orchestrator", {"tech_stack": sanitized_stack[:200]})
    recorder.stage_exit("orchestrator", orch_detail.get("status", "SUCCESS"), {
        "scan_path": scan_path,
        "agents_to_run": task_plan.get("agents_to_run", []),
    }, orch_detail.get("duration_ms", 0))
    logger.info("[PIPELINE] Scan path: %s", scan_path)
    # ── Layer 1: optional parallel agents (paths A and B only) ──────────────
    # Packages extracted here steer the Scout input below.
    extracted_packages: list[str] = []
    if scan_path in ("A", "B"):
        parallel_agents = task_plan.get("parallel_layer1", [])
        if parallel_agents:
            _notify("layer1_parallel", "RUNNING", {"agents": parallel_agents})
            # NOTE(review): passes the raw tech_stack, not sanitized_stack —
            # confirm whether the legacy path intentionally bypasses L0 output here.
            layer1_results = _run_layer1_parallel(tech_stack, task_plan, _notify)
            _notify("layer1_parallel", "COMPLETE", {
                "agents_completed": list(layer1_results.keys()),
            })
        # Legacy location: packages are read from the intel_fusion entry.
        if "intel_fusion" in layer1_results:
            extracted_packages = layer1_results["intel_fusion"].get("_extracted_packages", [])
            if extracted_packages:
                logger.info(
                    "[PIPELINE] extracted_packages from layer1: %s", extracted_packages
                )
        # Record per-agent observability details for pipeline_meta.
        for agent_name, result in layer1_results.items():
            is_degraded = result.get("_degraded", False)
            agent_detail = {
                "status": "DEGRADED" if is_degraded else "SUCCESS",
                "duration_ms": result.get("_duration_ms", 0),
                # degradation flags surface in pipeline_meta.stages_detail
                "_degraded": is_degraded,
                "_error": result.get("_error", "") if is_degraded else "",
            }
            # Agent-specific metrics.
            if agent_name == "security_guard":
                agent_detail["functions_found"] = result.get("stats", {}).get("functions_found", 0)
                agent_detail["patterns_found"] = result.get("stats", {}).get("patterns_found", 0)
                agent_detail["injection_detected"] = result.get("injection_attempts_detected", False)
            # Intel Fusion reports how many CVEs it scored.
            elif agent_name == "intel_fusion":
                agent_detail["cves_scored"] = len(result.get("fusion_results", []))
            stages_detail[agent_name] = agent_detail
            completed_stages.append(agent_name)
            recorder.stage_exit(agent_name, agent_detail["status"], agent_detail, agent_detail.get("duration_ms", 0))
        # Mirror Layer 1 outputs into the orchestration context (best effort).
        if orch_ctx is not None:
            for agent_name, result in layer1_results.items():
                try:
                    orch_ctx.store_result(agent_name, result)
                except Exception:
                    pass
    # ── Stage: Scout (vulnerability discovery) ──────────────────────────────
    _notify("scout", "RUNNING", {})
    rate_limiter.wait_if_needed("scout")
    # Prefer the packages extracted in Layer 1 as the Scout target; otherwise
    # fall back to scanning the raw input string.
    scout_input: str
    if extracted_packages:
        from tools.package_extractor import format_packages_for_intel_fusion
        scout_input = format_packages_for_intel_fusion(extracted_packages)
        logger.info("[PIPELINE] Scout using extracted packages: %s", scout_input)
    else:
        scout_input = tech_stack
        logger.info("[PIPELINE] Scout using raw tech_stack (no packages extracted)")
    scout_output, scout_sl = stage_scout(
        scout_input,
        input_type=input_type,
        intel_fusion_result=layer1_results.get("intel_fusion"),
    )
    # Keep the raw Layer 1 intel attached so the UI and debugging can inspect it.
    if "intel_fusion" in layer1_results:
        scout_output["intel_fusion_result"] = layer1_results["intel_fusion"]
    # ── v4.0: inject Security Guard code patterns into the Scout output ─────
    # The summary carries MITRE CWE context; it is merged again into the final
    # result just before returning (Harness Layer 7 below).
    _sg_code_patterns: list[dict] = []  # function-scope so the final return can access it
    if "security_guard" in layer1_results:
        _sg_code_patterns = _build_code_patterns_summary(
            layer1_results["security_guard"]
        )
        if _sg_code_patterns:
            scout_output["code_patterns"] = _sg_code_patterns
            logger.info(
                "[PIPELINE] v4.0: SG code_patterns injected into scout_output "
                "(%d patterns, Path=%s) — will be merged into final result",
                len(_sg_code_patterns),
                input_type,
            )
        else:
            logger.debug("[PIPELINE] v4.0: SG returned no code_patterns (clean code path)")
    scout_detail = {
        "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED",
        "vuln_count": len(scout_output.get("vulnerabilities", [])),
        "duration_ms": scout_sl.steps[-1].get("duration_ms", 0) if scout_sl.steps else 0,
        "packages_used": extracted_packages,
    }
    stages_detail["scout"] = scout_detail
    completed_stages.append("scout")
    _notify("scout", "COMPLETE", scout_detail)
    # v3.7: record which skill file Scout used for this input type.
    from agents.scout import SKILL_MAP as SCOUT_SKILL_MAP
    recorder.stage_enter("scout", {"tech_stack": scout_input[:200], "packages": extracted_packages},
        skill_file=SCOUT_SKILL_MAP.get(input_type, "threat_intel.md"),
        input_type=input_type)
    recorder.stage_exit("scout", scout_detail.get("status", "SUCCESS"), scout_output, scout_detail.get("duration_ms", 0))
    # ── Path C short-circuit: doc scans skip Analyst + Critic ───────────────
    if scan_path == "C":
        logger.info("[PIPELINE] Path C: skipping Analyst + Critic direct to Advisor")
        _notify("analyst", "COMPLETE", {"status": "SKIPPED", "reason": "path_C"})
        _notify("critic", "COMPLETE", {"status": "SKIPPED", "reason": "path_C"})
        # Synthesize neutral stand-in outputs so the Advisor still has inputs.
        analyst_output: dict[str, Any] = {
            "scan_id": scout_output.get("scan_id", "unknown"),
            "risk_score": 30,
            "risk_trend": "+0",
            "analysis": [],
            "_skipped": True,
            "_reason": "path_C_doc_scan",
        }
        critic_output: dict[str, Any] = {
            "verdict": "SKIPPED",
            "weighted_score": 60.0,
            "_skipped": True,
        }
    else:
        # ── Stage: Analyst (chain-risk analysis) ────────────────────────────
        _notify("analyst", "RUNNING", {})
        rate_limiter.wait_if_needed("analyst")
        analyst_output, analyst_sl = stage_analyst(scout_output, input_type=input_type)
        analyst_detail = {
            "status": "SUCCESS" if not analyst_output.get("_degraded") else "DEGRADED",
            "risk_score": analyst_output.get("risk_score", 0),
            "duration_ms": analyst_sl.steps[-1].get("duration_ms", 0) if analyst_sl.steps else 0,
        }
        stages_detail["analyst"] = analyst_detail
        completed_stages.append("analyst")
        _notify("analyst", "COMPLETE", analyst_detail)
        # v3.7: stage_enter with skill_file + input_type
        from agents.analyst import SKILL_MAP as ANALYST_SKILL_MAP
        recorder.stage_enter("analyst", scout_output,
            skill_file=ANALYST_SKILL_MAP.get(input_type, "chain_analysis.md"),
            input_type=input_type)
        recorder.stage_exit("analyst", analyst_detail.get("status", "SUCCESS"), analyst_output, analyst_detail.get("duration_ms", 0))
        # ── Stage: Critic (debate / verdict on the Analyst output) ──────────
        _notify("critic", "RUNNING", {})
        rate_limiter.wait_if_needed("critic")
        critic_output, critic_sl = stage_critic(analyst_output, input_type=input_type)
        critic_detail = {
            "status": "SUCCESS" if not critic_output.get("_degraded") else "DEGRADED",
            "verdict": critic_output.get("verdict", "SKIPPED"),
            "score": critic_output.get("weighted_score", 0),
            "duration_ms": critic_sl.steps[-1].get("duration_ms", 0) if critic_sl.steps else 0,
        }
        stages_detail["critic"] = critic_detail
        completed_stages.append("critic")
        _notify("critic", "COMPLETE", critic_detail)
        # v3.7: stage_enter with skill_file + input_type
        from agents.critic import SKILL_MAP as CRITIC_SKILL_MAP
        recorder.stage_enter("critic", analyst_output,
            skill_file=CRITIC_SKILL_MAP.get(input_type, "debate_sop.md"),
            input_type=input_type)
        recorder.stage_exit("critic", critic_detail.get("status", "SUCCESS"), critic_output, critic_detail.get("duration_ms", 0))
    # ── Stage: Advisor (final action report; runs on every path) ────────────
    _notify("advisor", "RUNNING", {})
    rate_limiter.wait_if_needed("advisor")
    advisor_output, advisor_sl = stage_advisor(analyst_output, critic_output, input_type=input_type)
    advisor_detail = {
        "status": "SUCCESS" if not advisor_output.get("_degraded") else "DEGRADED",
        "urgent_count": len(advisor_output.get("actions", {}).get("urgent", [])),
        "duration_ms": advisor_sl.steps[-1].get("duration_ms", 0) if advisor_sl.steps else 0,
    }
    stages_detail["advisor"] = advisor_detail
    completed_stages.append("advisor")
    _notify("advisor", "COMPLETE", advisor_detail)
    # v3.7: stage_enter with skill_file + input_type
    from agents.advisor import SKILL_MAP as ADVISOR_SKILL_MAP
    recorder.stage_enter("advisor", analyst_output,
        skill_file=ADVISOR_SKILL_MAP.get(input_type, "action_report.md"),
        input_type=input_type)
    recorder.stage_exit("advisor", advisor_detail.get("status", "SUCCESS"), advisor_output, advisor_detail.get("duration_ms", 0))
    # ── Feedback loop (bounded): low Advisor confidence triggers a re-check ─
    # NOTE(review): in this legacy path the loop only logs its targets and
    # updates counters; no stage is actually re-run — confirm this is intended.
    advisor_confidence = advisor_output.get("confidence", "HIGH")
    feedback_triggered = (
        advisor_confidence in ("NEEDS_VERIFICATION", "LOW", "MEDIUM")
        and feedback_loop_count < MAX_FEEDBACK_LOOPS
        and scan_path != "D"  # path D is excluded from the feedback loop
    )
    if feedback_triggered:
        feedback_loop_count += 1
        logger.warning(
            "[PIPELINE] Advisor confidence=%s triggering Feedback Loop %d/%d",
            advisor_confidence, feedback_loop_count, MAX_FEEDBACK_LOOPS,
        )
        _notify("feedback_loop", "RUNNING", {
            "loop": feedback_loop_count,
            "reason": f"confidence={advisor_confidence}",
        })
        # Collect the CVEs whose chain-risk confidence was low for targeted review.
        low_conf_cves = [
            a.get("cve_id") for a in analyst_output.get("analysis", [])
            if a.get("chain_risk", {}).get("confidence") in ("NEEDS_VERIFICATION", "LOW")
        ]
        logger.info("[PIPELINE] Feedback Loop targeting CVEs: %s", low_conf_cves)
        # Record the loop count on the orchestration context (best effort).
        if orch_ctx is not None:
            try:
                orch_ctx.feedback_loops = feedback_loop_count
            except Exception:
                pass
        _notify("feedback_loop", "COMPLETE", {"loop": feedback_loop_count, "cves": low_conf_cves})
    # ── Finalize orchestration summary (best effort) ────────────────────────
    orch_summary: dict = {}
    if orch_ctx is not None:
        try:
            from agents.orchestrator import finalize_orchestration
            orch_ctx.final_confidence = advisor_confidence
            orch_summary = finalize_orchestration(orch_ctx)
        except Exception as e:
            logger.warning("[PIPELINE] finalize_orchestration failed: %s", e)
    # ── Assemble pipeline_meta (observability envelope) ─────────────────────
    duration = round(time.time() - pipeline_start, 2)
    pipeline_meta = {
        "pipeline_version": "3.1",
        "tech_stack": tech_stack,
        "scan_path": scan_path,
        "stages_completed": len(completed_stages),
        "stages_detail": stages_detail,
        "enable_critic": ENABLE_CRITIC,
        "critic_verdict": critic_output.get("verdict", "SKIPPED"),
        "critic_score": critic_output.get("weighted_score", 0),
        "feedback_loops": feedback_loop_count,
        "duration_seconds": duration,
        "degradation": degradation_status.to_dict(),
        "orchestration": orch_summary,
        "layer1_agents": list(layer1_results.keys()),
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    logger.info(
        "Pipeline v3.1 COMPLETE in %.1fs | path=%s | feedback_loops=%d",
        duration, scan_path, feedback_loop_count,
    )
    recorder.end_scan("COMPLETE", duration)
    # ── Harness Layer 7: force-inject Security Guard CWE evidence ───────────
    # _sg_code_patterns comes from _build_code_patterns_summary() and carries
    # MITRE CWE definitions. The Advisor LLM never receives code_patterns, so
    # they must be merged directly before returning — ensuring every scan path
    # (A/B/C) exposes the CWE evidence in the API response.
    report_payload = _build_scan_report_payload(scout_output, advisor_output, layer1_results)
    final_output: dict[str, Any] = {
        **advisor_output,
        **report_payload,
        "pipeline_meta": pipeline_meta,
    }
    if _sg_code_patterns:
        existing_cps = final_output.get("code_patterns_summary", [])
        final_output["code_patterns_summary"] = existing_cps + _sg_code_patterns
        logger.info(
            "[PIPELINE] Harness L7: %d CWE-enriched code patterns merged into final result",
            len(_sg_code_patterns),
        )
    return final_output
def run_pipeline_with_callback(
tech_stack: str,
progress_callback: Any = None,
input_type: str = "pkg",
) -> dict[str, Any]:
"""
Canonical pipeline DAG:
Orchestrator -> (Security Guard + Scout in parallel) -> Intel Fusion
-> Analyst -> Critic -> Advisor.
"""
pipeline_start = time.time()
completed_stages: list[str] = []
stages_detail: dict[str, Any] = {}
layer1_results: dict[str, Any] = {}
def _notify(agent: str, status: str, detail: dict) -> None:
if not progress_callback:
return
try:
progress_callback(agent, status, detail)
except Exception as exc:
logger.debug("[PIPELINE] progress callback ignored: %s", exc)
def _remember_stage(agent: str, detail: dict[str, Any]) -> None:
stages_detail[agent] = detail
if agent not in completed_stages:
completed_stages.append(agent)
from checkpoint import recorder
recorder.start_scan(f"pipe_{int(pipeline_start)}")
l0_report: dict[str, Any] = {}
sanitized_stack = tech_stack
try:
from input_sanitizer import sanitize_input, format_l0_report
san_result = sanitize_input(tech_stack)
l0_report = format_l0_report(san_result)
if not san_result.safe:
_notify("input_sanitizer", "COMPLETE", {
"status": "BLOCKED",
"reason": san_result.blocked_reason,
})
duration = round(time.time() - pipeline_start, 2)
return {
"executive_summary": f"Input blocked: {san_result.blocked_reason}",
"blocked": True,
"actions": {"urgent": [], "important": [], "resolved": []},
"risk_score": 0,
"risk_trend": "+0",
"pipeline_meta": {
"pipeline_version": "4.0",
"tech_stack": tech_stack,
"scan_path": "BLOCKED",
"stages_completed": 0,
"stages_detail": {},
"enable_critic": ENABLE_CRITIC,
"critic_verdict": "SKIPPED",
"critic_score": 0.0,
"duration_seconds": duration,
"degradation": {"level": 5, "label": "INPUT_BLOCKED"},
"generated_at": datetime.now(timezone.utc).isoformat(),
"l0_report": l0_report,
},
}
sanitized_stack = san_result.sanitized_input
input_type = _canonical_runtime_input_type(input_type, san_result.input_type)
l0_detail = {
"status": "SUCCESS",
"input_type": san_result.input_type,
"pipeline_input_type": input_type,
"l0_warning_count": l0_report.get("l0_warning_count", 0),
"truncated": san_result.truncated,
}
_remember_stage("input_sanitizer", l0_detail)
_notify("input_sanitizer", "COMPLETE", l0_detail)
recorder.stage_exit("input_sanitizer", "SUCCESS", l0_detail, 0)
except Exception as exc:
logger.warning("[L0] Sanitizer skipped: %s", exc)
if input_type == "sql_review":
from tools.sql_syntax_reviewer import review_sql_syntax
_notify("sql_review", "RUNNING", {})
review = review_sql_syntax(sanitized_stack)
sql_detail = {
"status": "SUCCESS",
"patterns_detected": review.get("summary", {}).get("patterns_detected", 0),
"findings_count": review.get("summary", {}).get("total", 0),
"requires_application_context": True,
}
_remember_stage("sql_review", sql_detail)
_notify("sql_review", "COMPLETE", sql_detail)
recorder.stage_exit("sql_review", "SUCCESS", sql_detail, 0)
duration = round(time.time() - pipeline_start, 2)
recorder.end_scan("COMPLETE", duration)
return {
"executive_summary": (
"SQL syntax review completed. Findings are payload/syntax evidence only; "
"application source context is required before calling this a verified vulnerability."
),
"actions": {"urgent": [], "important": [], "resolved": []},
"risk_score": 0,
"risk_trend": "+0",
"sql_syntax_review": review,
"vulnerability_detail": [],
"vulnerability_summary": {
"total": 0,
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"new": 0,
"new_since_last_scan": 0,
},
"report_sources": {
"vulnerability_detail": "sql_syntax_review",
"layer1_state": "skipped",
"external_scan_skipped": True,
},
"code_patterns_summary": [],
"pipeline_meta": {
"pipeline_version": "4.0",
"tech_stack": tech_stack,
"scan_path": "C",
"stages_completed": len(completed_stages),
"stages_detail": stages_detail,
"enable_critic": ENABLE_CRITIC,
"critic_verdict": "SKIPPED",
"critic_score": 0.0,
"feedback_loops": 0,
"duration_seconds": duration,
"degradation": degradation_status.to_dict(),
"orchestration": {},
"layer1_agents": [],
"fusion_after_discovery": False,
"l0_report": l0_report,
"generated_at": datetime.now(timezone.utc).isoformat(),
},
}
_notify("orchestrator", "RUNNING", {})
orch_ctx, task_plan, orch_sl = stage_orchestrator(sanitized_stack)
task_plan = _constrain_task_plan_by_input_type(task_plan, input_type)
scan_path = task_plan.get("path", "B")
orch_detail = {
"status": "SUCCESS" if not task_plan.get("_degraded") else "DEGRADED",
"scan_path": scan_path,
"agents_to_run": task_plan.get("agents_to_run", []),
"duration_ms": orch_sl.steps[-1].get("duration_ms", 0) if orch_sl.steps else 0,
"l0_input_type": l0_report.get("input_type", "unknown"),
"pipeline_input_type": input_type,
"route_corrected": task_plan.get("_route_corrected"),
}
_remember_stage("orchestrator", orch_detail)
_notify("orchestrator", "COMPLETE", orch_detail)
recorder.stage_enter("orchestrator", {"tech_stack": sanitized_stack[:200]})
recorder.stage_exit("orchestrator", orch_detail["status"], orch_detail, orch_detail["duration_ms"])
extracted_packages: list[str] = []
scout_input = sanitized_stack
scout_output: dict[str, Any] = {}
scout_duration_ms = 0
sg_code_patterns: list[dict[str, Any]] = []
intel_fusion_result: dict[str, Any] = {}
benchmark_context: dict[str, Any] | None = None
parallel_agents = task_plan.get("parallel_layer1", [])
if parallel_agents:
_notify("layer1_parallel", "RUNNING", {"agents": parallel_agents})
layer1_results = _run_layer1_parallel(
sanitized_stack, task_plan, _notify, input_type=input_type
)
visible_agents = [name for name in layer1_results if not name.startswith("_")]
_notify("layer1_parallel", "COMPLETE", {"agents_completed": visible_agents})
extracted_packages = layer1_results.get("_extracted_packages", [])
if "security_guard" in layer1_results:
sg_result = layer1_results["security_guard"]
sg_code_patterns = _build_code_patterns_summary(sg_result)
sg_detail = {
"status": "SUCCESS" if not sg_result.get("_degraded") else "DEGRADED",
"duration_ms": sg_result.get("_duration_ms", 0),
"functions_found": sg_result.get("stats", {}).get("functions_found", 0),
"patterns_found": sg_result.get("stats", {}).get("patterns_found", 0),
"injection_detected": sg_result.get("injection_attempts_detected", False),
"_degraded": sg_result.get("_degraded", False),
"_error": sg_result.get("_error", "") if sg_result.get("_degraded") else "",
}
_remember_stage("security_guard", sg_detail)
recorder.stage_exit("security_guard", sg_detail["status"], sg_detail, sg_detail["duration_ms"])
if "scout" in layer1_results:
scout_output = layer1_results["scout"]
scout_duration_ms = scout_output.get("_duration_ms", 0)
scout_detail = {
"status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED",
"vuln_count": len(scout_output.get("vulnerabilities", [])),
"duration_ms": scout_duration_ms,
"packages_used": extracted_packages,
"_degraded": scout_output.get("_degraded", False),
"_error": scout_output.get("_error", "") if scout_output.get("_degraded") else "",
}
_remember_stage("scout", scout_detail)
_notify("scout", "COMPLETE", scout_detail)
if not scout_output:
_notify("scout", "RUNNING", {})
rate_limiter.wait_if_needed("scout")
scout_output, scout_sl = stage_scout(scout_input, input_type=input_type)
scout_duration_ms = scout_sl.steps[-1].get("duration_ms", 0) if scout_sl.steps else 0
scout_detail = {
"status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED",
"vuln_count": len(scout_output.get("vulnerabilities", [])),
"duration_ms": scout_duration_ms,
"packages_used": extracted_packages,
}
_remember_stage("scout", scout_detail)
_notify("scout", "COMPLETE", scout_detail)
if sg_code_patterns:
scout_output["code_patterns"] = sg_code_patterns
from agents.scout import SKILL_MAP as SCOUT_SKILL_MAP
recorder.stage_enter("scout", {"tech_stack": scout_input[:200], "packages": extracted_packages},
skill_file=SCOUT_SKILL_MAP.get(input_type, "threat_intel.md"),
input_type=input_type)
recorder.stage_exit("scout", stages_detail["scout"]["status"], scout_output, scout_duration_ms)
if task_plan.get("fusion_after_discovery", False):
intel_input = _build_intel_fusion_input(
scout_output=scout_output,
sg_result=layer1_results.get("security_guard"),
code_patterns=sg_code_patterns,
extracted_packages=extracted_packages,
tech_stack=sanitized_stack,
input_type=input_type,
)
intel_fusion_result, intel_sl = stage_intel_fusion(intel_input, _notify)
layer1_results["intel_fusion"] = intel_fusion_result
scout_output["intel_fusion_result"] = intel_fusion_result
scout_output = _merge_intel_fusion_into_scout(scout_output, intel_fusion_result)
intel_detail = {
"status": "SUCCESS" if not intel_fusion_result.get("_degraded") else "DEGRADED",
"cves_scored": len(intel_fusion_result.get("fusion_results", [])),
"duration_ms": intel_sl.steps[-1].get("duration_ms", 0) if intel_sl.steps else 0,
"_degraded": intel_fusion_result.get("_degraded", False),
"_error": intel_fusion_result.get("_error", "") if intel_fusion_result.get("_degraded") else "",
}
_remember_stage("intel_fusion", intel_detail)
if orch_ctx is not None:
try:
orch_ctx.store_result("intel_fusion", intel_fusion_result)
except Exception as exc:
logger.debug("[PIPELINE] Orchestration store skipped for Intel Fusion: %s", exc)
if scan_path == "C":
analyst_output = {
"scan_id": scout_output.get("scan_id", "unknown"),
"risk_score": 30,
"risk_trend": "+0",
"analysis": [],
"_skipped": True,
"_reason": "path_C_doc_scan",
}
critic_output = {"verdict": "SKIPPED", "weighted_score": 60.0, "_skipped": True}
_remember_stage("analyst", {"status": "SKIPPED", "reason": "path_C"})
_remember_stage("critic", {"status": "SKIPPED", "reason": "path_C"})
_notify("analyst", "COMPLETE", stages_detail["analyst"])
_notify("critic", "COMPLETE", stages_detail["critic"])
else:
_notify("analyst", "RUNNING", {})
# ---- Stage: Analyst --------------------------------------------------
# Rate-limit before the LLM-backed stage, then run chain analysis over
# the Scout output.
rate_limiter.wait_if_needed("analyst")
analyst_output, analyst_sl = stage_analyst(scout_output, input_type=input_type)

# Build the Dim-11 benchmark context (observed vs expected CWE categories)
# and attach it to the Analyst output so the Critic sees it downstream.
benchmark_context = _build_dim11_benchmark_context(
    source=sanitized_stack,
    input_type=input_type,
    task_plan=task_plan,
    code_patterns=sg_code_patterns,
    scout_output=scout_output,
    extracted_packages=extracted_packages,
)
if benchmark_context:
    analyst_output["benchmark_context"] = benchmark_context
    analyst_output["observed_cwe_categories"] = benchmark_context["observed_cwe_categories"]
    logger.info(
        "[DIM11] benchmark context attached before Critic: fixture=%s observed=%d expected=%d",
        benchmark_context.get("fixture"),
        len(benchmark_context.get("observed_cwe_categories", [])),
        len(benchmark_context.get("expected_cwe_categories", [])),
    )

# Stage summary for observability: _remember_stage records it in the
# ordered stage log, _notify reports progress to the caller/UI layer.
analyst_detail = {
    "status": "SUCCESS" if not analyst_output.get("_degraded") else "DEGRADED",
    "risk_score": analyst_output.get("risk_score", 0),
    "duration_ms": analyst_sl.steps[-1].get("duration_ms", 0) if analyst_sl.steps else 0,
}
_remember_stage("analyst", analyst_detail)
_notify("analyst", "COMPLETE", analyst_detail)
# NOTE(review): local import — presumably deferred to avoid an import
# cycle between main.py and agents.*; confirm before hoisting to the top.
from agents.analyst import SKILL_MAP as ANALYST_SKILL_MAP
recorder.stage_enter("analyst", scout_output,
                     skill_file=ANALYST_SKILL_MAP.get(input_type, "chain_analysis.md"),
                     input_type=input_type)
recorder.stage_exit("analyst", analyst_detail["status"], analyst_output, analyst_detail["duration_ms"])

# ---- Stage: Critic ---------------------------------------------------
# Challenges the Analyst's findings; stage_critic handles the
# ENABLE_CRITIC toggle itself (verdict defaults to "SKIPPED" below).
_notify("critic", "RUNNING", {})
rate_limiter.wait_if_needed("critic")
critic_output, critic_sl = stage_critic(analyst_output, input_type=input_type)
critic_detail = {
    "status": "SUCCESS" if not critic_output.get("_degraded") else "DEGRADED",
    "verdict": critic_output.get("verdict", "SKIPPED"),
    "score": critic_output.get("weighted_score", 0),
    "duration_ms": critic_sl.steps[-1].get("duration_ms", 0) if critic_sl.steps else 0,
}
# Surface the recall-challenge verdict (when the Critic ran one) in the
# stage summary.
if critic_output.get("recall_challenge"):
    critic_detail["recall_challenge_verdict"] = critic_output["recall_challenge"].get("verdict")
_remember_stage("critic", critic_detail)
_notify("critic", "COMPLETE", critic_detail)
from agents.critic import SKILL_MAP as CRITIC_SKILL_MAP
recorder.stage_enter("critic", analyst_output,
                     skill_file=CRITIC_SKILL_MAP.get(input_type, "debate_sop.md"),
                     input_type=input_type)
recorder.stage_exit("critic", critic_detail["status"], critic_output, critic_detail["duration_ms"])

# ---- Stage: Advisor --------------------------------------------------
# Consumes both the Analyst findings and the Critic verdict to produce
# the final actionable recommendations.
_notify("advisor", "RUNNING", {})
rate_limiter.wait_if_needed("advisor")
advisor_output, advisor_sl = stage_advisor(analyst_output, critic_output, input_type=input_type)
advisor_detail = {
    "status": "SUCCESS" if not advisor_output.get("_degraded") else "DEGRADED",
    "urgent_count": len(advisor_output.get("actions", {}).get("urgent", [])),
    "duration_ms": advisor_sl.steps[-1].get("duration_ms", 0) if advisor_sl.steps else 0,
}
_remember_stage("advisor", advisor_detail)
_notify("advisor", "COMPLETE", advisor_detail)
from agents.advisor import SKILL_MAP as ADVISOR_SKILL_MAP
recorder.stage_enter("advisor", analyst_output,
                     skill_file=ADVISOR_SKILL_MAP.get(input_type, "action_report.md"),
                     input_type=input_type)
recorder.stage_exit("advisor", advisor_detail["status"], advisor_output, advisor_detail["duration_ms"])

# ---- Finalization ----------------------------------------------------
duration = round(time.time() - pipeline_start, 2)
# Best-effort orchestration wrap-up: a failure here must never sink an
# otherwise-complete scan, so it only logs at DEBUG and falls back to {}.
try:
    orch_summary = {}
    if orch_ctx is not None:
        from agents.orchestrator import finalize_orchestration
        orch_ctx.final_confidence = advisor_output.get("confidence", "HIGH")
        orch_summary = finalize_orchestration(orch_ctx)
except Exception as exc:
    logger.debug("[PIPELINE] finalize_orchestration skipped: %s", exc)
    orch_summary = {}

# Pipeline-level metadata merged into the final report payload.
pipeline_meta = {
    "pipeline_version": "4.0",
    "tech_stack": tech_stack,
    "scan_path": scan_path,
    "stages_completed": len(completed_stages),
    "stages_detail": stages_detail,
    "enable_critic": ENABLE_CRITIC,
    "critic_verdict": critic_output.get("verdict", "SKIPPED"),
    "critic_score": critic_output.get("weighted_score", 0),
    "feedback_loops": 0,
    "duration_seconds": duration,
    "degradation": degradation_status.to_dict(),
    "orchestration": orch_summary,
    # Layer-1 agent names; keys starting with "_" are internal bookkeeping.
    "layer1_agents": [name for name in layer1_results if not name.startswith("_")],
    "fusion_after_discovery": bool(task_plan.get("fusion_after_discovery", False)),
    "l0_report": l0_report,
    "generated_at": datetime.now(timezone.utc).isoformat(),
}
# Optional metadata, attached only when the producing stage emitted it.
if benchmark_context:
    pipeline_meta["benchmark_context"] = benchmark_context
if critic_output.get("recall_challenge"):
    pipeline_meta["critic_recall_challenge"] = critic_output["recall_challenge"]
if intel_fusion_result.get("evidence_contract"):
    pipeline_meta["intel_fusion_evidence_contract"] = intel_fusion_result["evidence_contract"]

recorder.end_scan("COMPLETE", duration)
report_payload = _build_scan_report_payload(scout_output, advisor_output, layer1_results)
# Advisor output forms the base of the result; the report payload and
# pipeline metadata are layered on top (later keys win on collision).
final_output: dict[str, Any] = {
    **advisor_output,
    **report_payload,
    "pipeline_meta": pipeline_meta,
}
# Append Security Guard code-pattern findings to any existing summary.
if sg_code_patterns:
    existing_cps = final_output.get("code_patterns_summary", [])
    final_output["code_patterns_summary"] = existing_cps + sg_code_patterns
if critic_output.get("recall_challenge"):
    final_output["critic_recall_challenge"] = critic_output["recall_challenge"]
# Propagate the Critic's rescan flag so callers can trigger another pass.
if critic_output.get("needs_rescan"):
    final_output["needs_rescan"] = True
return final_output
# ======================================================================
# CLI entry point (run this module directly to scan a tech stack)
# ======================================================================
if __name__ == "__main__":
    # Route log output to stdout so pipeline progress is visible when the
    # module is executed directly from the command line.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    # Tech stack comes from the CLI arguments when given; otherwise fall
    # back to a demo stack.
    cli_args = sys.argv[1:]
    tech_stack = " ".join(cli_args) if cli_args else "Django 4.2, Redis 7.0, PostgreSQL 16"
    print(f"\nThreatHunter - Scanning: {tech_stack}\n")
    scan_result = run_pipeline(tech_stack)
    print("\n=== Pipeline Result ===")
    print(json.dumps(scan_result, ensure_ascii=False, indent=2))