| """ |
| main.py - ThreatHunter 銝餌撘 |
| ============================== |
| Pipeline 嗆嚗v3.1嚗嚗Orchestrator [Layer 1 銝西] Scout Analyst Debate Advisor |
| |
| 嗆嚗v3.1 Orchestrator 撽嚗嚗 |
| Orchestrator 頝舐梧 |
| 頝臬 A嚗憟隞嗆 頝喲 Security Guard |
| 頝臬 B嚗摰渡撘蝣 Layer 1 銝西嚗Security Guard + Intel Fusion嚗 |
| 頝臬 C嚗隞嗅摹蝵 頝喲 Analyst + Debate |
| 頝臬 D嚗擖鋆 芷頝雿靽∪ CVE |
| |
| 嗆嚗靽靘嚗嚗 |
| 雿輻刻頛詨 "Django 4.2, Redis 7.0" |
| main.py Pipeline |
| Scout 嗯 Analyst 嗯 Critic |
| (鈭撖行園) (函斗) (舀) |
| 潑 |
| Advisor |
| (蝯鋆瘙箏勗) |
| 瘥 Stage 函嚗 |
| - try-except + Graceful Degradation |
| - StepLogger 摮甇仿亥 |
| - Harness 靽撅歹 agents/*.py 折剁 |
| |
| 鞈瘚嚗 |
| Scout 頛詨 (dict) Analyst 頛詨 (dict) |
| Analyst 頛詨 (dict) Critic 頛詨 (dict) |
| Analyst + Critic 頛詨 Advisor 頛詨 (dict) |
| Advisor 頛詨 (dict) + pipeline_meta 蝯蝯 |
| |
| Harness Engineering 靽嚗 |
| - 瘥 Agent 賭蝙函撖行芋蝯嚗agents/*.py嚗嚗 Stub |
| - Critic ENABLE_CRITIC 批塚舀嚗 |
| - 瘥 Stage 函 Graceful Degradation 蝝頝臬 |
| - 函閮 Observability 亥嚗FINAL_PLAN.md 舀 2嚗 |
| - 17 撅 Harness 靽撅歹 agents/*.py 折剁 |
| |
| 萄嚗project_CONSTITUTION.md + HARNESS_ENGINEERING.md |
| """ |
|
|
| import json |
| import logging |
| import sys |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
| from config import ( |
| ENABLE_CRITIC, |
| degradation_status, |
| rate_limiter, |
| ) |
|
|
|
|
|
|
| logger = logging.getLogger("threathunter.main") |
|
|
| import os |
|
|
| |
| |
| |
# Feature flag: Docker sandbox isolation (env SANDBOX_ENABLED, default "true").
SANDBOX_ENABLED = os.getenv("SANDBOX_ENABLED", "true").lower() == "true"

# Optional dependency: fall back to no-op stubs when the sandbox package is
# not installed, so the rest of the pipeline keeps working without Docker.
try:
    from sandbox.docker_sandbox import run_in_sandbox, is_docker_available
    _DOCKER_SANDBOX_OK = True
except ImportError:
    _DOCKER_SANDBOX_OK = False

    def run_in_sandbox(*args, **kwargs):
        # Stub: tells callers that sandboxed execution is unavailable.
        return {"error": "SANDBOX_NOT_AVAILABLE", "fallback": True}

    def is_docker_available() -> bool:
        # Stub: without the sandbox package we never probe Docker.
        return False

# Log the effective sandbox mode once at import time.
if SANDBOX_ENABLED:
    logger.info(
        "[SANDBOX] Docker isolation ENABLED | docker_available=%s",
        is_docker_available(),
    )
else:
    logger.debug("[SANDBOX] Docker isolation DISABLED (in-process mode)")
| |
|
|
|
|
|
|
| |
| |
| |
|
|
|
|
class StepLogger:
    """Record a step-by-step audit trail for one agent stage."""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        # Chronological list of audit entries appended by log().
        self.steps: list[dict[str, Any]] = []

    def log(
        self, step: str, status: str, detail: str = "", duration_ms: int = 0
    ) -> None:
        """Append one audit entry and echo it to the module logger."""
        record = {
            "step": step,
            "agent": self.agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": status,
            "detail": detail,
            "duration_ms": duration_ms,
        }
        self.steps.append(record)
        # Map status onto a short console icon; anything that is neither
        # SUCCESS nor RUNNING (FAILED, RATE_LIMITED, ...) renders as FAIL.
        if status == "SUCCESS":
            icon = "OK"
        elif status == "RUNNING":
            icon = "WAIT"
        else:
            icon = "FAIL"
        logger.info("[%s] %s | %s | %s", icon, self.agent_name.upper(), step, detail)

    def summary(self) -> dict[str, Any]:
        """Return aggregate counts plus the raw step list."""
        failure_count = sum(1 for entry in self.steps if entry["status"] == "FAILED")
        return {
            "agent": self.agent_name,
            "total_steps": len(self.steps),
            "failed_steps": failure_count,
            "steps": self.steps,
        }
|
|
|
|
| def _normalize_pipeline_task_plan(task_plan: dict[str, Any]) -> dict[str, Any]: |
| """ |
| Canonical runtime DAG: |
| Security Guard + Scout discover in parallel, then Intel Fusion ranks risk. |
| """ |
| plan = dict(task_plan or {}) |
| scan_path = plan.get("path", "B") |
|
|
| if scan_path == "B": |
| plan["parallel_layer1"] = ["security_guard", "scout"] |
| plan["fusion_after_discovery"] = True |
| plan["agents_to_run"] = [ |
| "security_guard", "scout", "intel_fusion", "analyst", "debate", "judge" |
| ] |
| elif scan_path == "A": |
| plan["parallel_layer1"] = ["scout"] |
| plan["fusion_after_discovery"] = True |
| plan["agents_to_run"] = ["scout", "intel_fusion", "analyst", "debate", "judge"] |
|
|
| return plan |
|
|
|
|
| _L0_TO_RUNTIME_INPUT_TYPE = { |
| "package_list": "pkg", |
| "source_code": "code", |
| "mixed": "code", |
| "config_file": "config", |
| "sql_review": "sql_review", |
| "blocked": "injection", |
| } |
|
|
|
|
| def _canonical_runtime_input_type(requested: str, l0_input_type: str = "") -> str: |
| """用 L0 deterministic 分類修正 UI/API 預設值,避免 code 被當 package。""" |
| runtime = (requested or "pkg").strip().lower() |
| if runtime not in {"pkg", "code", "config", "injection", "sql_review"}: |
| runtime = "pkg" |
|
|
| l0_runtime = _L0_TO_RUNTIME_INPUT_TYPE.get(str(l0_input_type or "").strip().lower(), "") |
| if runtime == "pkg" and l0_runtime in {"code", "config", "injection", "sql_review"}: |
| return l0_runtime |
| return runtime |
|
|
|
|
def _constrain_task_plan_by_input_type(task_plan: dict[str, Any], input_type: str) -> dict[str, Any]:
    """The L0 input type is a hard constraint: the Orchestrator must not
    route source code down Path C/A by mistake.

    Corrections are recorded under ``_route_corrected`` for observability.
    """
    constrained = dict(task_plan or {})
    requested_path = constrained.get("path", "B")

    def _mark_correction(target: str, reason: str) -> None:
        # Remember the original routing decision and why it was overridden.
        constrained["_route_corrected"] = {
            "from": requested_path,
            "to": target,
            "reason": reason,
        }

    if input_type in {"code", "injection"}:
        # Code-like input must take the full-code path B.
        if requested_path != "B":
            constrained["path"] = "B"
            _mark_correction("B", f"l0_input_type={input_type}")
    elif input_type == "sql_review":
        # SQL review is a dedicated single-agent path: no Layer-1 fan-out.
        if requested_path != "C":
            _mark_correction("C", "l0_input_type=sql_review")
        constrained["path"] = "C"
        constrained["parallel_layer1"] = []
        constrained["fusion_after_discovery"] = False
        constrained["agents_to_run"] = ["sql_review"]
    elif input_type == "config":
        if requested_path != "C":
            constrained["path"] = "C"
            _mark_correction("C", "l0_input_type=config")

    return _normalize_pipeline_task_plan(constrained)
|
|
|
|
def _load_dim11_expected_config() -> dict[str, Any]:
    """Load the DIM11 benchmark gate configuration.

    Returns:
        The parsed ``expected_results.json`` fixture, or an empty dict when
        the file is missing or malformed. Normal scans do not ship this
        fixture, so a missing file is an expected, non-fatal condition
        (logged at DEBUG only).
    """
    expected_path = (
        Path(__file__).resolve().parent
        / "tests" / "fixtures" / "dim11_redteam" / "expected_results.json"
    )
    try:
        return json.loads(expected_path.read_text(encoding="utf-8"))
    # json.JSONDecodeError subclasses ValueError, so (OSError, ValueError)
    # already covers missing files, decode errors, and malformed JSON;
    # the original 3-tuple listed JSONDecodeError redundantly.
    except (OSError, ValueError) as exc:
        logger.debug("[DIM11] benchmark config unavailable: %s", exc)
        return {}
|
|
|
|
| def _build_dim11_benchmark_context( |
| source: str, |
| input_type: str, |
| task_plan: dict[str, Any], |
| code_patterns: list[dict[str, Any]], |
| scout_output: dict[str, Any], |
| extracted_packages: list[str], |
| ) -> dict[str, Any] | None: |
| """只對 DIM11 fixture exact match 啟用 expected-CWE recall challenge。""" |
| if input_type != "code": |
| return None |
|
|
| config = _load_dim11_expected_config() |
| fixture_dir = Path(__file__).resolve().parent / "tests" / "fixtures" / "dim11_redteam" |
| observed_cwes = sorted({ |
| str(pattern.get("cwe_id", "")) |
| for pattern in code_patterns |
| if str(pattern.get("cwe_id", "")).startswith("CWE-") |
| }) |
|
|
| for case in config.get("fixtures", []): |
| expected_cwes = case.get("expected_cwe_categories") |
| if not expected_cwes or case.get("exclude_from_cwe_recall"): |
| continue |
|
|
| fixture_name = str(case.get("fixture", "")) |
| try: |
| fixture_source = (fixture_dir / fixture_name).read_text(encoding="utf-8", errors="replace") |
| except OSError: |
| continue |
|
|
| if source != fixture_source: |
| continue |
|
|
| external_findings = scout_output.get("vulnerabilities", []) |
| external_pollution_count = 0 |
| if case.get("external_requires_package_evidence") and external_findings and not extracted_packages: |
| external_pollution_count = len(external_findings) |
|
|
| expected_path = str(case.get("expected_path", "")) |
| actual_path = str(task_plan.get("path", "")) |
| return { |
| "benchmark": "dim11_redteam", |
| "fixture": fixture_name, |
| "expected_cwe_categories": sorted(set(expected_cwes)), |
| "observed_cwe_categories": observed_cwes, |
| "min_category_recall": float(case.get("min_category_recall", 0.7)), |
| "route_correct": actual_path == expected_path, |
| "expected_path": expected_path, |
| "actual_path": actual_path, |
| "external_pollution_count": external_pollution_count, |
| "activation": "exact_fixture_match", |
| } |
|
|
| return None |
|
|
|
|
| |
| |
| |
|
|
|
|
def stage_scout(
    tech_stack: str,
    input_type: str = "pkg",
    intel_fusion_result: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 1: Scout Agent vulnerability discovery.

    Delegates to agents/scout.py. Graceful degradation: on failure an empty
    result is returned so the downstream agents still receive first-class
    input. v3.7: input_type selects the path-aware skill SOP.

    Returns:
        (result_dict, step_logger)
    """
    from agents.scout import run_scout_pipeline

    audit = StepLogger("scout")
    audit.log("INIT", "RUNNING", f"tech_stack={tech_stack} | input_type={input_type}")

    started = time.time()
    try:
        scout_result = run_scout_pipeline(
            tech_stack,
            input_type=input_type,
            intel_fusion_result=intel_fusion_result,
        )
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        # 429 / rate-limit errors are transient: log them but do not mark
        # the pipeline as degraded.
        rate_limited = "429" in str(e) or "rate limit" in str(e).lower()
        if rate_limited:
            logger.warning("[SCOUT] Rate limited returning empty results (not a real failure)")
            audit.log("COMPLETE", "RATE_LIMITED", str(e)[:100], elapsed_ms)
        else:
            logger.error("Scout Stage failed: %s", e)
            audit.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
            degradation_status.degrade("Scout", str(e))
        fallback = {
            "scan_id": f"scan_degraded_{int(time.time())}",
            "vulnerabilities": [],
            "summary": {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0},
            "_degraded": not rate_limited,
            "_error": str(e),
        }
        return fallback, audit

    elapsed_ms = int((time.time() - started) * 1000)
    found = len(scout_result.get("vulnerabilities", []))
    audit.log("COMPLETE", "SUCCESS", f"found {found} vulnerabilities", elapsed_ms)
    return scout_result, audit
|
|
|
|
def stage_intel_fusion(
    discovery_input: dict[str, Any] | list[str] | str,
    on_progress: Any = None,
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage: Intel Fusion runs after Scout and Security Guard discovery.
    It receives normalized findings instead of raw source code.

    Returns:
        (result_dict, step_logger); on failure a degraded empty fusion
        payload is returned instead of raising.
    """
    from agents.intel_fusion import run_intel_fusion

    audit = StepLogger("intel_fusion")
    audit.log("INIT", "RUNNING", "post-discovery fusion")
    started = time.time()

    try:
        fusion = run_intel_fusion(discovery_input, on_progress)
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        logger.error("Intel Fusion Stage failed: %s", e)
        audit.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
        degradation_status.degrade("Intel Fusion", str(e))
        return {
            "fusion_results": [],
            "strategy_applied": "degraded",
            "api_health_summary": {},
            "_degraded": True,
            "_error": str(e),
            "_duration_ms": elapsed_ms,
        }, audit

    elapsed_ms = int((time.time() - started) * 1000)
    # Preserve an upstream-provided duration if the agent already set one.
    fusion.setdefault("_duration_ms", elapsed_ms)
    scored = len(fusion.get("fusion_results", []))
    audit.log("COMPLETE", "SUCCESS", f"scored {scored} findings", elapsed_ms)
    return fusion, audit
|
|
|
|
| |
| |
| |
|
|
|
|
def stage_analyst(
    scout_output: dict[str, Any],
    input_type: str = "pkg",
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 2: Analyst Agent vulnerability chain analysis.
    v3.7: input_type selects path-aware chain analysis skill.

    Graceful degradation: if the Analyst fails, a fallback analysis list is
    synthesized from the Scout CVEs and the Security Guard code patterns so
    downstream stages still receive well-formed input.

    Returns:
        (result_dict, step_logger)
    """
    from agents.analyst import run_analyst_pipeline

    sl = StepLogger("analyst")
    sl.log(
        "INIT", "RUNNING",
        f"input_vulns={len(scout_output.get('vulnerabilities', []))} | input_type={input_type}"
    )

    t0 = time.time()
    try:
        result = run_analyst_pipeline(scout_output, input_type=input_type)
        duration_ms = int((time.time() - t0) * 1000)
        risk = result.get("risk_score", 0)
        sl.log("COMPLETE", "SUCCESS", f"risk_score={risk}", duration_ms)
        return result, sl
    except Exception as e:
        duration_ms = int((time.time() - t0) * 1000)
        logger.error("Analyst Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms)
        degradation_status.degrade("Analyst", str(e))

        # Fallback 1: pass CVE findings through without chain analysis,
        # marked NEEDS_VERIFICATION so consumers treat them cautiously.
        vulns = scout_output.get("vulnerabilities", [])
        analysis = [
            {
                "cve_id": v.get("cve_id", "UNKNOWN"),
                "original_cvss": v.get("cvss_score", 0),
                "adjusted_risk": v.get("severity", "UNKNOWN"),
                "in_cisa_kev": False,
                "exploit_available": False,
                "chain_risk": {
                    "is_chain": False,
                    "chain_with": [],
                    "chain_description": "chain_analysis: SKIPPED",
                    "confidence": "NEEDS_VERIFICATION",
                },
                "reasoning": "Analyst degraded - chain analysis skipped.",
            }
            for v in vulns
        ]

        # Fallback 2: preserve deterministic Security Guard code patterns.
        # These are confirmed detections, so confidence stays HIGH; severity
        # is mapped onto a rough CVSS equivalent for ranking.
        code_patterns = scout_output.get("code_patterns", [])
        for cp in code_patterns:
            sev = cp.get("severity", "MEDIUM")
            cvss_equiv = 9.0 if sev == "CRITICAL" else (7.5 if sev == "HIGH" else 5.0)
            analysis.append({
                "finding_id": cp.get("finding_id", "CODE-000"),
                "cve_id": None,
                "pattern_type": cp.get("pattern_type", "UNKNOWN"),
                "cwe_id": cp.get("cwe_id", "CWE-unknown"),
                "owasp_category": cp.get("owasp_category", ""),
                "severity": sev,
                "snippet": cp.get("snippet", ""),
                "line_no": cp.get("line_no", 0),
                "original_cvss": cvss_equiv,
                "adjusted_risk": sev,
                "in_cisa_kev": False,
                "exploit_available": False,
                "chain_risk": {
                    "is_chain": False,
                    "chain_with": [],
                    "chain_description": "Analyst degraded - deterministic pattern only.",
                    "confidence": "HIGH",
                },
                "reasoning": f"Deterministic detection: {cp.get('pattern_type')} ({cp.get('cwe_id')}). Analyst degraded but code pattern is confirmed by Security Guard.",
            })
        if code_patterns:
            logger.info("[DEGRADED] Analyst fallback preserved %d code_patterns", len(code_patterns))

        # Neutral midpoint risk score: "unknown", not "safe".
        return {
            "scan_id": scout_output.get("scan_id", "unknown"),
            "risk_score": 50,
            "risk_trend": "+0",
            "analysis": analysis,
            "_degraded": True,
            "_error": str(e),
        }, sl
|
|
|
|
| |
| |
| |
|
|
|
|
def stage_critic(
    analyst_output: dict[str, Any],
    input_type: str = "pkg",
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 3: Adversarial Debate Engine.

    v5.3: upgraded to a 3-round debate engine (Du et al. 2023 ICML,
    arXiv:2305.14325). When no consensus is reached, a Judge sub-agent
    arbitrates. Conservative bias: overestimating risk is preferred to
    underestimating it.

    Returns:
        (result_dict, step_logger)
    """
    from agents.debate_engine import run_debate_pipeline

    audit = StepLogger("critic")
    audit.log("INIT", "RUNNING", f"enable_critic={ENABLE_CRITIC} | input_type={input_type} | mode=3-round-debate")

    started = time.time()
    try:
        debate = run_debate_pipeline(
            analyst_output,
            input_type=input_type,
            on_progress=None,
        )
    except Exception as e:
        elapsed_ms = int((time.time() - started) * 1000)
        logger.error("Debate Stage failed: %s", e)
        audit.log("COMPLETE", "FAILED", str(e)[:100], elapsed_ms)
        degradation_status.degrade("Critic", str(e))
        # Neutral fallback: MAINTAIN verdict with a flat 0.6 scorecard.
        return {
            "debate_rounds": 0,
            "challenges": [],
            "scorecard": {
                "evidence": 0.6,
                "chain_completeness": 0.6,
                "critique_quality": 0.6,
                "defense_quality": 0.6,
                "calibration": 0.6,
            },
            "weighted_score": 60.0,
            "verdict": "MAINTAIN",
            "reasoning": "Debate engine degraded - all rounds skipped.",
            "_degraded": True,
            "_error": str(e),
        }, audit

    elapsed_ms = int((time.time() - started) * 1000)
    meta = debate.get("_debate_meta", {})
    audit.log(
        "COMPLETE", "SUCCESS",
        f"verdict={debate.get('verdict', 'UNKNOWN')} "
        f"score={debate.get('weighted_score', 0)} "
        f"consensus={meta.get('consensus', None)} "
        f"rounds={meta.get('total_rounds', '?')} "
        f"judge={meta.get('judge_invoked', False)}",
        elapsed_ms,
    )
    return debate, audit
|
|
|
|
def stage_advisor(
    analyst_output: dict[str, Any],
    critic_output: dict[str, Any],
    input_type: str = "pkg",
) -> tuple[dict[str, Any], StepLogger]:
    """
    Stage 4: Advisor Agent action report generation.
    v3.7: input_type selects path-aware action report skill.

    Graceful degradation: if the Advisor fails, a deterministic fallback
    report is assembled from the Analyst analysis entries - CVE/GHSA
    findings become urgent/important actions, code patterns are reported
    separately under code_patterns_summary.

    Returns:
        (result_dict, step_logger)
    """
    from agents.advisor import run_advisor_pipeline

    sl = StepLogger("advisor")
    verdict = critic_output.get("verdict", "SKIPPED")
    sl.log("INIT", "RUNNING", f"critic_verdict={verdict} | input_type={input_type}")

    # On a DOWNGRADE verdict, forward the Critic's score and challenges so
    # the Advisor produces a conservative assessment.
    advisor_input = dict(analyst_output)
    if verdict == "DOWNGRADE":
        advisor_input["_critic_note"] = (
            f"Critic scored {critic_output.get('weighted_score', 0):.1f}/100. "
            f"Challenges: {critic_output.get('challenges', [])}. "
            "Use conservative risk assessment."
        )
        logger.info(
            "Critic verdict=DOWNGRADE: Advisor will use conservative assessment"
        )

    t0 = time.time()
    try:
        result = run_advisor_pipeline(advisor_input)
        duration_ms = int((time.time() - t0) * 1000)
        risk = result.get("risk_score", 0)
        urgent = len(result.get("actions", {}).get("urgent", []))
        sl.log("COMPLETE", "SUCCESS", f"risk_score={risk} urgent={urgent}", duration_ms)
        return result, sl
    except Exception as e:
        duration_ms = int((time.time() - t0) * 1000)
        logger.error("Advisor Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms)
        degradation_status.degrade("Advisor", str(e))

        # ---- Deterministic fallback report ------------------------------
        # Severity-to-bucket mapping: CRITICAL -> urgent, HIGH -> important,
        # everything else defaults to important.
        urgent_actions = []
        important_actions = []
        code_patterns_fallback = []
        _SEVERITY_MAP = {"CRITICAL": "urgent", "HIGH": "important"}

        for entry in advisor_input.get("analysis", []):
            finding_id = entry.get("finding_id", "")
            cve_id = entry.get("cve_id")
            sev = entry.get("severity") or entry.get("adjusted_risk", "MEDIUM")
            bucket = _SEVERITY_MAP.get(sev, "important")

            # Code patterns are kept out of URGENT (per CI-1/CI-2) and
            # surfaced separately in code_patterns_summary.
            if finding_id.startswith("CODE-") or entry.get("pattern_type"):
                pt = entry.get("pattern_type", "UNKNOWN")
                cwe = entry.get("cwe_id", "CWE-unknown")
                snippet = entry.get("snippet", "")
                code_patterns_fallback.append({
                    "finding_id": finding_id,
                    "cve_id": None,
                    "cwe_id": cwe,
                    "pattern_type": pt,
                    "package": f"Custom Code Pattern",
                    "severity": sev,
                    "action": f"Fix {pt} vulnerability ({cwe}). {entry.get('reasoning', '')}",
                    "vulnerable_snippet": snippet[:200],
                    "reason": entry.get("reasoning", f"Security Guard deterministic detection: {pt}"),
                    "is_repeated": False,
                    "_constitution_note": "CODE-pattern: not in URGENT per CI-1/CI-2. See code_patterns_summary.",
                })
            # Only real CVE/GHSA identifiers become actionable entries.
            elif cve_id and (str(cve_id).startswith("CVE-") or str(cve_id).startswith("GHSA-")):
                action_entry = {
                    "cve_id": cve_id,
                    "package": entry.get("package", "unknown"),
                    "severity": sev,
                    "action": f"Review and patch {cve_id}.",
                    "command": f"# Investigate {cve_id}",
                    "reason": entry.get("reasoning", "Advisor degraded - manual review required."),
                    "is_repeated": False,
                }
                if bucket == "urgent":
                    urgent_actions.append(action_entry)
                else:
                    important_actions.append(action_entry)

        # Heuristic risk score: 25 per urgent + 10 per important,
        # clamped to the [50, 100] range.
        risk_score = max(
            50,
            min(100, len(urgent_actions) * 25 + len(important_actions) * 10),
        )
        summary_parts = []
        if urgent_actions:
            summary_parts.append(f"{len(urgent_actions)} package CVE(s) require immediate patching")
        if code_patterns_fallback:
            summary_parts.append(f"{len(code_patterns_fallback)} code-level pattern(s) detected (see Code Analysis tab)")
        if important_actions:
            summary_parts.append(f"{len(important_actions)} high-severity CVE(s) need review within 72h")
        if not summary_parts:
            summary_parts.append("System degraded. Please review raw scan data manually.")

        logger.info(
            "[DEGRADED] Advisor fallback: %d urgent CVEs + %d important CVEs + %d code-patterns (not in URGENT per CI-1/CI-2)",
            len(urgent_actions), len(important_actions), len(code_patterns_fallback),
        )

        return {
            "executive_summary": ". ".join(summary_parts),
            "actions": {
                "urgent": urgent_actions,
                "important": important_actions,
                "resolved": [],
            },
            "code_patterns_summary": code_patterns_fallback,
            "risk_score": risk_score,
            "risk_trend": "+0",
            "scan_count": 1,
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "_degraded": True,
            "_error": str(e),
        }, sl
|
|
|
|
| |
| |
| |
|
|
|
|
| |
| |
| |
|
|
|
|
def stage_orchestrator(
    tech_stack: str,
    feedback_from_judge: dict | None = None,
) -> tuple[Any, dict, StepLogger]:
    """
    Stage 0: Orchestrator decides the scan path (A/B/C/D).

    Returns:
        (OrchestrationContext, task_plan, StepLogger)

    Graceful degradation: on failure, falls back to Path B (full source-code
    scan) with every downstream agent enabled, as the conservative default.
    """
    from agents.orchestrator import run_orchestration

    sl = StepLogger("orchestrator")
    sl.log("INIT", "RUNNING", f"tech_stack={tech_stack[:60]}")
    t0 = time.time()

    try:
        ctx, task_plan = run_orchestration(
            user_input=tech_stack,
            feedback_from_judge=feedback_from_judge,
        )
        # Normalize into the canonical runtime DAG before returning.
        task_plan = _normalize_pipeline_task_plan(task_plan)
        duration_ms = int((time.time() - t0) * 1000)
        scan_path = task_plan.get("path", "B")
        sl.log("COMPLETE", "SUCCESS", f"path={scan_path}", duration_ms)
        logger.info("[ORCH] Scan path: %s | plan: %s", scan_path, task_plan.get("agents_to_run", []))
        return ctx, task_plan, sl
    except Exception as e:
        duration_ms = int((time.time() - t0) * 1000)
        logger.error("Orchestrator Stage failed: %s", e)
        sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms)
        degradation_status.degrade("Orchestrator", str(e))
        # Fallback: a full Path-B plan so nothing downstream is skipped.
        from agents.orchestrator import OrchestrationContext, ScanPath
        ctx = OrchestrationContext()
        ctx.scan_path = ScanPath.FULL_CODE
        task_plan = {
            "path": "B",
            "parallel_layer1": ["security_guard", "scout"],
            "fusion_after_discovery": True,
            "debate_cluster": True,
            "judge": True,
            "agents_to_run": ["security_guard", "scout", "intel_fusion", "analyst", "debate", "judge"],
            "_degraded": True,
        }
        return ctx, task_plan, sl
|
|
|
|
| |
| |
| |
|
|
|
|
def _build_code_patterns_summary(sg_result: dict) -> list[dict]:
    """
    Convert Security Guard patterns + hardcoded secrets into a
    CWE-enriched ``code_patterns_summary`` list.

    Deterministic extraction with MITRE CWE v4.14 evidence injection;
    no LLM involvement. (Docstring translated from Japanese.)
    """
    from tools.cwe_registry import build_cwe_reference, get_pattern_meta, severity_rank

    def _attach_cwe_reference(entry: dict, cwe_id: str) -> dict:
        """Inject the official MITRE CWE definition into entry['cwe_reference']."""
        cwe_reference = build_cwe_reference(cwe_id)
        if cwe_reference:
            entry["cwe_reference"] = cwe_reference
        return entry

    def _normalized_line_no(item: dict) -> int:
        """Normalize scanner line metadata to the line_no contract."""
        # Scanners disagree on the key name; take the first positive int.
        for key in ("line_no", "line", "source_line"):
            value = item.get(key)
            try:
                line_no = int(value)
            except (TypeError, ValueError):
                continue
            if line_no > 0:
                return line_no
        return 0

    # Deduplicate patterns by (line, snippet prefix); on collision keep the
    # pattern whose registry severity ranks higher.
    _dedup: dict[tuple, dict] = {}
    for p in sg_result.get("patterns", []):
        pt = p.get("pattern_type", "UNKNOWN")
        sev = get_pattern_meta(pt).severity
        key = (_normalized_line_no(p), str(p.get("snippet", ""))[:30])
        existing = _dedup.get(key)
        if existing is None:
            _dedup[key] = p
        else:
            ex_pt = existing.get("pattern_type", "UNKNOWN")
            ex_sev = get_pattern_meta(ex_pt).severity
            if severity_rank(sev) > severity_rank(ex_sev):
                _dedup[key] = p

    code_patterns: list[dict] = []
    counter = 1

    # Code patterns: CWE / OWASP / severity metadata comes from the registry.
    for p in _dedup.values():
        pt = p.get("pattern_type", "UNKNOWN")
        meta = get_pattern_meta(pt)
        cwe = meta.cwe_id
        owasp = meta.owasp_category
        severity = meta.severity
        entry = {
            "finding_id": f"CODE-{counter:03d}",
            "type": "code_pattern",
            "pattern_type": pt,
            "cwe_id": cwe,
            "owasp_category": owasp,
            "severity": severity,
            "snippet": str(p.get("snippet", ""))[:200],
            "line_no": _normalized_line_no(p),
            "language": sg_result.get("language", "unknown"),
            "canonical_cwe_id": cwe,
            "weakness_family": meta.weakness_family,
            "evidence_type": meta.evidence_type,
            "coverage_level": p.get("coverage_level", "pattern"),
            "confidence": p.get("confidence", "MEDIUM"),
        }
        entry["source_location"] = f"L{entry['line_no']}" if entry["line_no"] > 0 else "Line not provided by scanner"
        entry = _attach_cwe_reference(entry, cwe)
        code_patterns.append(entry)
        counter += 1

    # Hardcoded secrets: always CWE-798; the secret value itself is redacted.
    for h in sg_result.get("hardcoded", []):
        entry = {
            "finding_id": f"CODE-{counter:03d}",
            "type": "hardcoded_secret",
            "pattern_type": "HARDCODED_SECRET",
            "cwe_id": "CWE-798",
            "owasp_category": "A07:2021-Identification and Authentication Failures",
            "severity": "HIGH",
            "snippet": f"{h.get('name', 'secret')} = '****' (value redacted)",
            "line_no": _normalized_line_no(h),
            "language": sg_result.get("language", "unknown"),
            "canonical_cwe_id": "CWE-798",
            "weakness_family": "hardcoded_secret",
            "evidence_type": "code_scan",
            "coverage_level": h.get("coverage_level", "pattern"),
            "confidence": h.get("confidence", "HIGH"),
        }
        entry["source_location"] = f"L{entry['line_no']}" if entry["line_no"] > 0 else "Line not provided by scanner"
        entry = _attach_cwe_reference(entry, "CWE-798")
        code_patterns.append(entry)
        counter += 1

    logger.info(
        "[CODE_PATTERNS] Built %d CWE-enriched patterns (from %d raw patterns, %d hardcoded)",
        len(code_patterns),
        len(sg_result.get("patterns", [])),
        len(sg_result.get("hardcoded", [])),
    )
    return code_patterns
|
|
|
|
| def _summarize_vulnerabilities(vulnerabilities: list[dict[str, Any]]) -> dict[str, int]: |
| """根據漏洞清單建立前端可直接使用的摘要數字。""" |
| summary = { |
| "total": 0, |
| "critical": 0, |
| "high": 0, |
| "medium": 0, |
| "low": 0, |
| "new": 0, |
| "new_since_last_scan": 0, |
| } |
|
|
| for vuln in vulnerabilities: |
| summary["total"] += 1 |
| severity = str(vuln.get("severity", "LOW")).upper() |
| if severity == "CRITICAL": |
| summary["critical"] += 1 |
| elif severity == "HIGH": |
| summary["high"] += 1 |
| elif severity == "MEDIUM": |
| summary["medium"] += 1 |
| else: |
| summary["low"] += 1 |
|
|
| if vuln.get("is_new"): |
| summary["new"] += 1 |
| summary["new_since_last_scan"] += 1 |
|
|
| return summary |
|
|
|
|
def _build_scan_report_payload(
    scout_output: dict[str, Any],
    advisor_output: dict[str, Any],
    layer1_results: dict[str, Any],
) -> dict[str, Any]:
    """Collect this scan's own vulnerability detail so the UI never has to
    read back from global memory. (Docstring translated from Chinese.)"""
    intel_applied = scout_output.get("_intel_fusion_applied", {})
    vulnerabilities: list[dict[str, Any]] = []

    for vuln in scout_output.get("vulnerabilities", []):
        entry = dict(vuln)
        entry.setdefault("source", "SCOUT")

        # Mark entries carrying Intel Fusion enrichment fields so the UI
        # can attribute the extra scoring data to that agent.
        enriched_by = list(entry.get("enriched_by", []))
        if intel_applied and entry.get("source") != "INTEL_FUSION":
            if any(
                field in entry
                for field in (
                    "composite_score",
                    "epss_score",
                    "ghsa_severity",
                    "otx_threat",
                    "intel_confidence",
                    "dimensions_used",
                    "weights_used",
                    "in_cisa_kev",
                )
            ):
                if "INTEL_FUSION" not in enriched_by:
                    enriched_by.append("INTEL_FUSION")

        if enriched_by:
            entry["enriched_by"] = enriched_by
        vulnerabilities.append(entry)

    detail_source = "scout_final_output"
    if vulnerabilities:
        summary = dict(scout_output.get("summary", {}))
        # Keep both "new" count aliases populated for UI compatibility.
        summary.setdefault("new", summary.get("new_since_last_scan", 0))
        summary.setdefault("new_since_last_scan", summary.get("new", 0))
    else:
        # Fallback: rebuild the detail list from the Advisor action buckets.
        detail_source = "advisor_actions_fallback"
        for level in ("urgent", "important", "resolved"):
            for item in advisor_output.get("actions", {}).get(level, []):
                vulnerabilities.append({
                    "cve_id": item.get("cve_id", ""),
                    "package": item.get("package", "unknown"),
                    "cvss_score": item.get("cvss_score", 0),
                    "severity": item.get("severity", "MEDIUM"),
                    "description": item.get("action", ""),
                    "is_new": item.get("is_new", False),
                    "source": "ADVISOR_ACTIONS",
                    "report_level": level.upper(),
                })
        summary = _summarize_vulnerabilities(vulnerabilities)

    # Layer-1 merge state: skipped / degraded / merged.
    layer1_agents = [
        name for name in ("security_guard", "scout", "intel_fusion")
        if name in layer1_results
    ]
    degraded_agents = [
        name for name in layer1_agents
        if isinstance(layer1_results.get(name), dict) and layer1_results[name].get("_degraded")
    ]
    if not layer1_agents:
        layer1_state = "skipped"
    elif degraded_agents:
        layer1_state = "degraded"
    else:
        layer1_state = "merged"

    # Provenance metadata so the UI can explain where each number came from.
    report_sources: dict[str, Any] = {
        "vulnerability_detail": detail_source,
        "enriched_by": ["intel_fusion"] if intel_applied else [],
        "fallbacks": ["advisor_actions"] if detail_source == "advisor_actions_fallback" else [],
        "layer1_state": layer1_state,
    }
    if degraded_agents:
        report_sources["degraded_agents"] = degraded_agents
    if intel_applied:
        report_sources["intel_fusion_applied"] = intel_applied
    if scout_output.get("representative_cve_evidence"):
        report_sources["representative_cve_evidence_count"] = len(
            scout_output.get("representative_cve_evidence", [])
        )

    return {
        "vulnerability_detail": vulnerabilities,
        "vulnerability_summary": summary,
        "report_sources": report_sources,
        "representative_cve_evidence": scout_output.get("representative_cve_evidence", []),
    }
|
|
| def _run_layer1_parallel( |
| tech_stack: str, |
| task_plan: dict, |
| on_progress: Any, |
| input_type: str = "pkg", |
| ) -> dict[str, Any]: |
| """ |
| 頝臬 B Layer 1 銝西瑁嚗雿輻 ThreadPoolExecutor 撠孵嚗 |
| |
| Why ThreadPoolExecutor嚗 asyncio嚗嚗 |
| - main.py 臬甇亦撘蝣潘SSE callback 銋臬甇亦 |
| - ThreadPoolExecutor 臬典甇亦啣銝銝西瑁憭 IO-bound 撌乩嚗LLM 澆恬 |
| - 撠孵嚗銝閬瑽 callback 璈 |
| |
| 銝西瑁撠鞊∴閬 task_plan 瘙箏嚗嚗 |
| - security_guard嚗敺蝔撘蝣潭賢/憟隞/璅∪嚗頛詨伐tech_stack嚗 |
| - intel_fusion嚗剔雁望亥岷嚗頛詨伐tech_stack嚗 |
| |
| Args: |
| tech_stack: 雿輻刻頛詨 |
| task_plan: Orchestrator 隞餃閬 |
| on_progress: SSE 脣漲隤 |
| |
| Returns: |
| {"security_guard": dict, "intel_fusion": dict} |
| """ |
| parallel_agents = task_plan.get("parallel_layer1", []) |
| layer1_results: dict[str, Any] = {} |
|
|
| def _run_security_guard() -> tuple[str, dict]: |
| """ Thread 銝剖瑁 Security Guard嚗 LLM 嚗""" |
| try: |
| from agents.security_guard import run_security_guard |
| result = run_security_guard(tech_stack, on_progress) |
| return ("security_guard", result) |
| except Exception as e: |
| logger.error("[LAYER1] Security Guard failed: %s", e) |
| degradation_status.degrade("Security Guard", str(e)) |
| return ("security_guard", { |
| "extraction_status": "degraded", |
| "functions": [], "imports": [], "patterns": [], "hardcoded": [], |
| "stats": {"total_lines": 0, "functions_found": 0, "patterns_found": 0}, |
| "_degraded": True, "_error": str(e), |
| }) |
|
|
| def _run_intel_fusion(intel_input: list | str) -> tuple[str, dict]: |
| """ Thread 銝剖瑁 Intel Fusion嚗剔雁望亥岷嚗 |
| |
| v3.4 靽桀儔嚗亙憟隞嗅蝔勗銵剁憪蝔撘蝣潘嚗閫瘙 0 CVE 憿 |
| intel_input 臭誑荔 |
| - list[str]嚗憟隞嗅蝔勗銵剁 package_extractor 敺喳伐 |
| - str嚗憪 tech_stack嚗fallback嚗Path A 憟隞嗆格芋撘嚗 |
| """ |
| try: |
| from agents.intel_fusion import run_intel_fusion |
| result = run_intel_fusion(intel_input, on_progress) |
| return ("intel_fusion", result) |
| except Exception as e: |
| logger.error("[LAYER1] Intel Fusion failed: %s", e) |
| degradation_status.degrade("Intel Fusion", str(e)) |
| return ("intel_fusion", { |
| "fusion_results": [], |
| "strategy_applied": "degraded", |
| "api_health_summary": {}, |
| "_degraded": True, "_error": str(e), |
| }) |
|
|
| def _run_scout() -> tuple[str, dict]: |
| """Run Scout in parallel with Security Guard during discovery.""" |
| try: |
| scout_target = tech_stack |
| scout_input_type = input_type |
| local_packages: list[str] = [] |
|
|
| if input_type in {"code", "injection", "config"}: |
| from agents.security_guard import extract_code_surface |
| from tools.package_extractor import ( |
| format_packages_for_intel_fusion, |
| packages_from_security_guard, |
| ) |
|
|
| local_packages = packages_from_security_guard(extract_code_surface(tech_stack)) |
| if not local_packages: |
| return ("scout", { |
| "scan_id": f"scan_code_only_{int(time.time())}", |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "tech_stack": [], |
| "vulnerabilities": [], |
| "summary": { |
| "total": 0, |
| "new_since_last_scan": 0, |
| "critical": 0, |
| "high": 0, |
| "medium": 0, |
| "low": 0, |
| }, |
| "_duration_ms": 0, |
| "_package_scan_skipped": True, |
| "_skip_reason": "source_code_without_third_party_packages", |
| "_extracted_packages_local": [], |
| }) |
|
|
| scout_target = format_packages_for_intel_fusion(local_packages) |
| scout_input_type = "pkg" |
|
|
| result, scout_sl = stage_scout(scout_target, input_type=scout_input_type) |
| if scout_sl.steps: |
| result["_duration_ms"] = scout_sl.steps[-1].get("duration_ms", 0) |
| if local_packages: |
| result["_extracted_packages_local"] = local_packages |
| result["_scout_target_input"] = scout_target |
| result["_scout_input_type"] = scout_input_type |
| return ("scout", result) |
| except Exception as e: |
| logger.error("[LAYER1] Scout failed: %s", e) |
| degradation_status.degrade("Scout", str(e)) |
| return ("scout", { |
| "scan_id": f"scan_degraded_{int(time.time())}", |
| "vulnerabilities": [], |
| "summary": {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}, |
| "_degraded": True, "_error": str(e), |
| }) |
|
|
| def _extract_packages_from_sg_result(sg_result: dict[str, Any]) -> list[str]: |
| """從 Security Guard 結果萃取第三方套件,供後續 Scout 使用。""" |
| try: |
| from tools.package_extractor import packages_from_security_guard |
|
|
| extracted = packages_from_security_guard(sg_result) |
| logger.info( |
| "[LAYER1] PackageExtractor: %d packages extracted from imports: %s", |
| len(extracted), extracted, |
| ) |
| return extracted |
| except Exception as pe: |
| logger.warning("[LAYER1] PackageExtractor failed (fallback to raw input): %s", pe) |
| return [] |
|
|
| def _extract_cwe_targets_from_sg_result(sg_result: dict[str, Any]) -> list[str]: |
| """從 Security Guard patterns 中整理 CWE targets,供 observability 與 fallback 使用。""" |
| sg_patterns = sg_result.get("patterns", []) |
| seen_cwe: set[str] = set() |
| cwe_targets: list[str] = [] |
| for pattern in sg_patterns: |
| cwe = pattern.get("cwe_id", "") |
| if cwe and cwe.startswith("CWE-") and cwe not in seen_cwe: |
| seen_cwe.add(cwe) |
| cwe_targets.append(cwe) |
| return cwe_targets |
|
|
    # ---- Worker selection: only agents the task plan scheduled for Layer 1.
    extracted_packages: list[str] = []
    cwe_targets: list[str] = []
    requested_workers = [
        agent_name
        for agent_name in ("security_guard", "scout")
        if agent_name in parallel_agents
    ]

    if not requested_workers:
        # NOTE(review): on this early exit the "_extracted_packages" /
        # "_cwe_targets" meta keys are never set; callers seen in this file
        # read them via .get(..., []) so that appears intentional — confirm.
        return layer1_results

    futures: dict[Any, str] = {}
    max_workers = len(requested_workers)

    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="layer1") as executor:
        if "security_guard" in requested_workers:
            rate_limiter.wait_if_needed("security_guard")
            futures[executor.submit(_run_security_guard)] = "security_guard"

        if "scout" in requested_workers:
            rate_limiter.wait_if_needed("scout")
            if on_progress:
                on_progress("scout", "RUNNING", {"step": "discovery_parallel"})
            futures[executor.submit(_run_scout)] = "scout"

        # Harvest results as they finish; an unexpectedly crashed future only
        # degrades that one agent instead of failing the whole layer.
        for future in as_completed(futures):
            agent_name = futures[future]
            try:
                result_name, result_payload = future.result()
            except Exception as exc:
                logger.error("[LAYER1] %s future failed unexpectedly: %s", agent_name, exc)
                degradation_status.degrade(agent_name, str(exc))
                continue

            layer1_results[result_name] = result_payload
            logger.info("[LAYER1] %s complete", result_name)

            if result_name == "security_guard":
                # Security Guard's package extraction takes priority unless it
                # found nothing and another agent already supplied packages.
                sg_extracted = _extract_packages_from_sg_result(result_payload)
                if sg_extracted or not extracted_packages:
                    extracted_packages = sg_extracted
                cwe_targets = _extract_cwe_targets_from_sg_result(result_payload)
                if cwe_targets:
                    logger.info(
                        "[LAYER1] Security Guard produced %d CWE targets: %s",
                        len(cwe_targets), cwe_targets,
                    )
            elif result_name == "scout" and result_payload.get("_extracted_packages_local") and not extracted_packages:
                # Fallback: packages Scout derived locally from the source code.
                extracted_packages = list(result_payload.get("_extracted_packages_local") or [])

    # Expose aggregate discovery metadata alongside the per-agent payloads.
    layer1_results["_extracted_packages"] = extracted_packages
    layer1_results["_cwe_targets"] = cwe_targets

    return layer1_results
|
|
|
|
| def _build_intel_fusion_input( |
| scout_output: dict[str, Any], |
| sg_result: dict[str, Any] | None, |
| code_patterns: list[dict[str, Any]], |
| extracted_packages: list[str], |
| tech_stack: str, |
| input_type: str, |
| ) -> dict[str, Any]: |
| """Build a clean post-discovery input for Intel Fusion.""" |
| cve_ids: list[str] = [] |
| seen_cves: set[str] = set() |
| for vuln in scout_output.get("vulnerabilities", []): |
| cve_id = str(vuln.get("cve_id", "")).strip() |
| if (cve_id.startswith("CVE-") or cve_id.startswith("GHSA-")) and cve_id not in seen_cves: |
| seen_cves.add(cve_id) |
| cve_ids.append(cve_id) |
|
|
| cwe_ids: list[str] = [] |
| seen_cwes: set[str] = set() |
| for pattern in code_patterns: |
| cwe_id = str(pattern.get("cwe_id", "")).strip() |
| if cwe_id.startswith("CWE-") and cwe_id not in seen_cwes: |
| seen_cwes.add(cwe_id) |
| cwe_ids.append(cwe_id) |
|
|
| return { |
| "mode": "post_discovery", |
| "input_type": input_type, |
| "cve_ids": cve_ids, |
| "cwe_ids": cwe_ids, |
| "packages": extracted_packages, |
| "scout_vulnerabilities": scout_output.get("vulnerabilities", [])[:20], |
| "code_patterns": code_patterns[:20], |
| "security_guard_stats": (sg_result or {}).get("stats", {}), |
| "raw_input_fingerprint": str(hash(tech_stack)), |
| } |
|
|
|
|
| def _merge_intel_fusion_into_scout( |
| scout_output: dict[str, Any], |
| intel_fusion_result: dict[str, Any], |
| ) -> dict[str, Any]: |
| """Merge Intel Fusion evidence after Scout completes discovery.""" |
| if not intel_fusion_result: |
| return scout_output |
| try: |
| from agents.scout import merge_intel_fusion_evidence |
|
|
| return merge_intel_fusion_evidence(scout_output, intel_fusion_result) |
| except Exception as exc: |
| logger.warning("[PIPELINE] Intel Fusion merge skipped: %s", exc) |
| scout_output["intel_fusion_result"] = intel_fusion_result |
| return scout_output |
|
|
|
|
|
|
def run_pipeline(tech_stack: str, input_type: str = "pkg") -> dict[str, Any]:
    """Run the complete ThreatHunter pipeline for one scan request.

    Pipeline: Orchestrator -> [Layer 1 parallel] -> Scout -> Analyst ->
    [Critic] -> Advisor.

    When SANDBOX_ENABLED is on and Docker is reachable, the scan is delegated
    to an isolated container; if the container signals fallback, execution
    continues in-process.

    Args:
        tech_stack: Raw user input (e.g. "Django 4.2, Redis 7.0").
        input_type: Frontend-detected input kind (pkg/code/config/injection).

    Returns:
        The full Advisor report dict, including a ``pipeline_meta`` entry.
    """
    sandbox_ready = SANDBOX_ENABLED and _DOCKER_SANDBOX_OK and is_docker_available()
    if sandbox_ready:
        logger.info("[SANDBOX] Docker isolation ACTIVE delegating to container")
        sandbox_result = run_in_sandbox(tech_stack=tech_stack, input_type=input_type)
        if not sandbox_result.get("fallback"):
            return sandbox_result
        logger.warning(
            "[SANDBOX] Container fallback: %s using in-process mode",
            sandbox_result.get("error", "unknown"),
        )
    return run_pipeline_with_callback(tech_stack, progress_callback=None, input_type=input_type)
|
|
|
|
def run_pipeline_sync(tech_stack: str, input_type: str = "pkg") -> dict[str, Any]:
    """Synchronous alias of run_pipeline for in-container execution.

    ``sandbox/sandbox_runner.py`` calls this entry point directly; the
    container sets SANDBOX_ENABLED=false, so jumping straight to the
    in-process runner here avoids any recursive re-entry into Docker.
    """
    return run_pipeline_with_callback(tech_stack, None, input_type=input_type)
|
|
|
|
def _run_pipeline_with_callback_legacy_v31(
    tech_stack: str,
    progress_callback: Any = None,
    input_type: str = "pkg",
) -> dict[str, Any]:
    """Legacy v3.1 sequential pipeline driver (superseded by the v4.0 DAG in
    run_pipeline_with_callback; kept for reference/rollback).

    Runs Orchestrator -> [Layer 1 parallel] -> Scout -> Analyst -> [Critic]
    -> Advisor, invoking ``progress_callback(agent, status, detail)`` after
    every stage.

    Args:
        tech_stack: Raw user input string (e.g. "Django 4.2, Redis 7.0").
        progress_callback: Optional hook; exceptions it raises are swallowed.
        input_type: Input kind (pkg / code / config / injection) driving
            Path-Aware Skill selection per agent.

    Returns:
        The Advisor report dict, augmented with ``pipeline_meta``.
    """
    pipeline_start = time.time()
    completed_stages: list[str] = []
    stages_detail: dict[str, Any] = {}

    orch_ctx: Any = None
    task_plan: dict = {"path": "B"}
    layer1_results: dict[str, Any] = {}
    feedback_loop_count: int = 0
    MAX_FEEDBACK_LOOPS: int = 2

    def _notify(agent: str, status: str, detail: dict) -> None:
        # Progress reporting is best-effort: a broken UI callback must never
        # abort a scan.
        if progress_callback:
            try:
                progress_callback(agent, status, detail)
            except Exception:
                pass

    logger.info("=" * 60)
    logger.info(" ThreatHunter Pipeline v3.1 Start (Orchestrator-driven)")
    logger.info(" tech_stack : %s", tech_stack)
    logger.info(" input_type : %s", input_type)
    logger.info(" ENABLE_CRITIC: %s", ENABLE_CRITIC)
    logger.info("=" * 60)

    from checkpoint import recorder
    recorder.start_scan(f"pipe_{int(pipeline_start)}")

    # ---------------------------------------------------------------- L0 ----
    # Optional input sanitization: blocks hostile input outright, otherwise
    # yields a cleaned copy of the tech stack for downstream stages.
    l0_report: dict[str, Any] = {}
    sanitized_stack: str = tech_stack
    try:
        from input_sanitizer import sanitize_input, format_l0_report
        san_result = sanitize_input(tech_stack)
        l0_report = format_l0_report(san_result)

        if not san_result.safe:
            logger.warning("[L0] Input BLOCKED: %s", san_result.blocked_reason)
            _notify("input_sanitizer", "COMPLETE", {
                "status": "BLOCKED",
                "reason": san_result.blocked_reason,
            })
            return {
                "executive_summary": f"頛詨亥◤摰券瞈曉冽蝯嚗{san_result.blocked_reason}",
                "blocked": True,
                "actions": {"urgent": [], "important": [], "resolved": []},
                "risk_score": 0,
                "risk_trend": "+0",
                "pipeline_meta": {
                    "pipeline_version": "3.1",
                    "tech_stack": tech_stack,
                    "stages_completed": 0,
                    "stages_detail": {},
                    "enable_critic": ENABLE_CRITIC,
                    "critic_verdict": "SKIPPED",
                    "critic_score": 0.0,
                    "duration_seconds": 0.0,
                    "degradation": {"level": 5, "label": "INPUT_BLOCKED"},
                    "generated_at": datetime.now(timezone.utc).isoformat(),
                    "l0_report": l0_report,
                },
            }

        sanitized_stack = san_result.sanitized_input
        if san_result.truncated:
            logger.warning("[L0] Input truncated: %d %d chars",
                           san_result.original_length, len(sanitized_stack))

        logger.info("[L0] OK: type=%s l0_findings=%d hash=%s",
                    san_result.input_type, len(san_result.l0_findings), san_result.input_hash)
        _notify("input_sanitizer", "COMPLETE", {
            "status": "SUCCESS",
            "input_type": san_result.input_type,
            "l0_warning_count": l0_report.get("l0_warning_count", 0),
            "truncated": san_result.truncated,
        })
        recorder.stage_exit("input_sanitizer", "SUCCESS", {
            "input_type": san_result.input_type,
            "l0_warning_count": l0_report.get("l0_warning_count", 0),
        }, 0)

    except ImportError:
        # Sanitizer module not deployed: run without the L0 filter.
        logger.warning("[L0] input_sanitizer not available, skipping L0 filter")
    except Exception as e:
        # Sanitizer crashes are non-fatal; the raw input is used downstream.
        logger.error("[L0] Sanitizer error (non-fatal): %s", e)

    # ------------------------------------------------------- Orchestrator ---
    _notify("orchestrator", "RUNNING", {})
    orch_ctx, task_plan, orch_sl = stage_orchestrator(sanitized_stack)
    scan_path = task_plan.get("path", "B")
    orch_detail = {
        "status": "SUCCESS" if not task_plan.get("_degraded") else "DEGRADED",
        "scan_path": scan_path,
        "agents_to_run": task_plan.get("agents_to_run", []),
        "duration_ms": orch_sl.steps[-1].get("duration_ms", 0) if orch_sl.steps else 0,
        "l0_input_type": l0_report.get("input_type", "unknown"),
    }
    stages_detail["orchestrator"] = orch_detail
    completed_stages.append("orchestrator")
    _notify("orchestrator", "COMPLETE", orch_detail)
    recorder.stage_enter("orchestrator", {"tech_stack": sanitized_stack[:200]})
    recorder.stage_exit("orchestrator", orch_detail.get("status", "SUCCESS"), {
        "scan_path": scan_path,
        "agents_to_run": task_plan.get("agents_to_run", []),
    }, orch_detail.get("duration_ms", 0))
    logger.info("[PIPELINE] Scan path: %s", scan_path)

    # ------------------------------------------------------------ Layer 1 ---
    extracted_packages: list[str] = []

    if scan_path in ("A", "B"):
        parallel_agents = task_plan.get("parallel_layer1", [])
        if parallel_agents:
            _notify("layer1_parallel", "RUNNING", {"agents": parallel_agents})
            # NOTE(review): this legacy path feeds the RAW tech_stack to
            # Layer 1 (the v4.0 driver uses sanitized_stack) — confirm before
            # re-enabling this code path.
            layer1_results = _run_layer1_parallel(tech_stack, task_plan, _notify)
            _notify("layer1_parallel", "COMPLETE", {
                "agents_completed": list(layer1_results.keys()),
            })

            # BUG FIX: _run_layer1_parallel publishes "_extracted_packages"
            # at the TOP level of its result map; the old code read it from
            # inside the "intel_fusion" entry and therefore always got [].
            extracted_packages = list(layer1_results.get("_extracted_packages", []) or [])
            if extracted_packages:
                logger.info(
                    "[PIPELINE] extracted_packages from layer1: %s", extracted_packages
                )

            for agent_name, result in layer1_results.items():
                # BUG FIX: skip "_extracted_packages" / "_cwe_targets" meta
                # entries — they are lists, and calling .get() on them raised
                # AttributeError, crashing the legacy pipeline.
                if agent_name.startswith("_"):
                    continue
                is_degraded = result.get("_degraded", False)
                agent_detail = {
                    "status": "DEGRADED" if is_degraded else "SUCCESS",
                    "duration_ms": result.get("_duration_ms", 0),
                    "_degraded": is_degraded,
                    "_error": result.get("_error", "") if is_degraded else "",
                }
                if agent_name == "security_guard":
                    agent_detail["functions_found"] = result.get("stats", {}).get("functions_found", 0)
                    agent_detail["patterns_found"] = result.get("stats", {}).get("patterns_found", 0)
                    agent_detail["injection_detected"] = result.get("injection_attempts_detected", False)
                elif agent_name == "intel_fusion":
                    agent_detail["cves_scored"] = len(result.get("fusion_results", []))
                stages_detail[agent_name] = agent_detail
                completed_stages.append(agent_name)
                recorder.stage_exit(agent_name, agent_detail["status"], agent_detail, agent_detail.get("duration_ms", 0))

            if orch_ctx is not None:
                for agent_name, result in layer1_results.items():
                    if agent_name.startswith("_"):
                        continue  # meta entries are not agent results
                    try:
                        orch_ctx.store_result(agent_name, result)
                    except Exception:
                        pass

    # -------------------------------------------------------------- Scout ---
    _notify("scout", "RUNNING", {})
    rate_limiter.wait_if_needed("scout")

    scout_input: str
    if extracted_packages:
        from tools.package_extractor import format_packages_for_intel_fusion
        scout_input = format_packages_for_intel_fusion(extracted_packages)
        logger.info("[PIPELINE] Scout using extracted packages: %s", scout_input)
    else:
        scout_input = tech_stack
        logger.info("[PIPELINE] Scout using raw tech_stack (no packages extracted)")
    scout_output, scout_sl = stage_scout(
        scout_input,
        input_type=input_type,
        intel_fusion_result=layer1_results.get("intel_fusion"),
    )
    # Keep the full fusion payload reachable from the Scout result.
    if "intel_fusion" in layer1_results:
        scout_output["intel_fusion_result"] = layer1_results["intel_fusion"]

    # v4.0: surface Security Guard code patterns through the Scout output so
    # they reach the final report.
    _sg_code_patterns: list[dict] = []
    if "security_guard" in layer1_results:
        _sg_code_patterns = _build_code_patterns_summary(
            layer1_results["security_guard"]
        )
        if _sg_code_patterns:
            scout_output["code_patterns"] = _sg_code_patterns
            logger.info(
                "[PIPELINE] v4.0: SG code_patterns injected into scout_output "
                "(%d patterns, Path=%s) — will be merged into final result",
                len(_sg_code_patterns),
                input_type,
            )
        else:
            logger.debug("[PIPELINE] v4.0: SG returned no code_patterns (clean code path)")

    scout_detail = {
        "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED",
        "vuln_count": len(scout_output.get("vulnerabilities", [])),
        "duration_ms": scout_sl.steps[-1].get("duration_ms", 0) if scout_sl.steps else 0,
        "packages_used": extracted_packages,
    }
    stages_detail["scout"] = scout_detail
    completed_stages.append("scout")
    _notify("scout", "COMPLETE", scout_detail)

    from agents.scout import SKILL_MAP as SCOUT_SKILL_MAP
    recorder.stage_enter("scout", {"tech_stack": scout_input[:200], "packages": extracted_packages},
                         skill_file=SCOUT_SKILL_MAP.get(input_type, "threat_intel.md"),
                         input_type=input_type)
    recorder.stage_exit("scout", scout_detail.get("status", "SUCCESS"), scout_output, scout_detail.get("duration_ms", 0))

    # --------------------------------------------------- Analyst + Critic ---
    if scan_path == "C":
        # Documentation-only scans skip deep analysis entirely.
        logger.info("[PIPELINE] Path C: skipping Analyst + Critic direct to Advisor")
        _notify("analyst", "COMPLETE", {"status": "SKIPPED", "reason": "path_C"})
        _notify("critic", "COMPLETE", {"status": "SKIPPED", "reason": "path_C"})

        analyst_output: dict[str, Any] = {
            "scan_id": scout_output.get("scan_id", "unknown"),
            "risk_score": 30,
            "risk_trend": "+0",
            "analysis": [],
            "_skipped": True,
            "_reason": "path_C_doc_scan",
        }
        critic_output: dict[str, Any] = {
            "verdict": "SKIPPED",
            "weighted_score": 60.0,
            "_skipped": True,
        }
    else:
        _notify("analyst", "RUNNING", {})
        rate_limiter.wait_if_needed("analyst")
        analyst_output, analyst_sl = stage_analyst(scout_output, input_type=input_type)
        analyst_detail = {
            "status": "SUCCESS" if not analyst_output.get("_degraded") else "DEGRADED",
            "risk_score": analyst_output.get("risk_score", 0),
            "duration_ms": analyst_sl.steps[-1].get("duration_ms", 0) if analyst_sl.steps else 0,
        }
        stages_detail["analyst"] = analyst_detail
        completed_stages.append("analyst")
        _notify("analyst", "COMPLETE", analyst_detail)

        from agents.analyst import SKILL_MAP as ANALYST_SKILL_MAP
        recorder.stage_enter("analyst", scout_output,
                             skill_file=ANALYST_SKILL_MAP.get(input_type, "chain_analysis.md"),
                             input_type=input_type)
        recorder.stage_exit("analyst", analyst_detail.get("status", "SUCCESS"), analyst_output, analyst_detail.get("duration_ms", 0))

        _notify("critic", "RUNNING", {})
        rate_limiter.wait_if_needed("critic")
        critic_output, critic_sl = stage_critic(analyst_output, input_type=input_type)
        critic_detail = {
            "status": "SUCCESS" if not critic_output.get("_degraded") else "DEGRADED",
            "verdict": critic_output.get("verdict", "SKIPPED"),
            "score": critic_output.get("weighted_score", 0),
            "duration_ms": critic_sl.steps[-1].get("duration_ms", 0) if critic_sl.steps else 0,
        }
        stages_detail["critic"] = critic_detail
        completed_stages.append("critic")
        _notify("critic", "COMPLETE", critic_detail)

        from agents.critic import SKILL_MAP as CRITIC_SKILL_MAP
        recorder.stage_enter("critic", analyst_output,
                             skill_file=CRITIC_SKILL_MAP.get(input_type, "debate_sop.md"),
                             input_type=input_type)
        recorder.stage_exit("critic", critic_detail.get("status", "SUCCESS"), critic_output, critic_detail.get("duration_ms", 0))

    # ------------------------------------------------------------ Advisor ---
    _notify("advisor", "RUNNING", {})
    rate_limiter.wait_if_needed("advisor")
    advisor_output, advisor_sl = stage_advisor(analyst_output, critic_output, input_type=input_type)
    advisor_detail = {
        "status": "SUCCESS" if not advisor_output.get("_degraded") else "DEGRADED",
        "urgent_count": len(advisor_output.get("actions", {}).get("urgent", [])),
        "duration_ms": advisor_sl.steps[-1].get("duration_ms", 0) if advisor_sl.steps else 0,
    }
    stages_detail["advisor"] = advisor_detail
    completed_stages.append("advisor")
    _notify("advisor", "COMPLETE", advisor_detail)

    from agents.advisor import SKILL_MAP as ADVISOR_SKILL_MAP
    recorder.stage_enter("advisor", analyst_output,
                         skill_file=ADVISOR_SKILL_MAP.get(input_type, "action_report.md"),
                         input_type=input_type)
    recorder.stage_exit("advisor", advisor_detail.get("status", "SUCCESS"), advisor_output, advisor_detail.get("duration_ms", 0))

    # ------------------------------------------------------ Feedback loop ---
    # Low Advisor confidence triggers a (bounded) re-verification pass marker.
    advisor_confidence = advisor_output.get("confidence", "HIGH")
    feedback_triggered = (
        advisor_confidence in ("NEEDS_VERIFICATION", "LOW", "MEDIUM")
        and feedback_loop_count < MAX_FEEDBACK_LOOPS
        and scan_path != "D"
    )

    if feedback_triggered:
        feedback_loop_count += 1
        logger.warning(
            "[PIPELINE] Advisor confidence=%s triggering Feedback Loop %d/%d",
            advisor_confidence, feedback_loop_count, MAX_FEEDBACK_LOOPS,
        )
        _notify("feedback_loop", "RUNNING", {
            "loop": feedback_loop_count,
            "reason": f"confidence={advisor_confidence}",
        })
        low_conf_cves = [
            a.get("cve_id") for a in analyst_output.get("analysis", [])
            if a.get("chain_risk", {}).get("confidence") in ("NEEDS_VERIFICATION", "LOW")
        ]
        logger.info("[PIPELINE] Feedback Loop targeting CVEs: %s", low_conf_cves)
        if orch_ctx is not None:
            try:
                orch_ctx.feedback_loops = feedback_loop_count
            except Exception:
                pass
        _notify("feedback_loop", "COMPLETE", {"loop": feedback_loop_count, "cves": low_conf_cves})

    # ----------------------------------------------------------- Finalize ---
    orch_summary: dict = {}
    if orch_ctx is not None:
        try:
            from agents.orchestrator import finalize_orchestration
            orch_ctx.final_confidence = advisor_confidence
            orch_summary = finalize_orchestration(orch_ctx)
        except Exception as e:
            logger.warning("[PIPELINE] finalize_orchestration failed: %s", e)

    duration = round(time.time() - pipeline_start, 2)
    pipeline_meta = {
        "pipeline_version": "3.1",
        "tech_stack": tech_stack,
        "scan_path": scan_path,
        "stages_completed": len(completed_stages),
        "stages_detail": stages_detail,
        "enable_critic": ENABLE_CRITIC,
        "critic_verdict": critic_output.get("verdict", "SKIPPED"),
        "critic_score": critic_output.get("weighted_score", 0),
        "feedback_loops": feedback_loop_count,
        "duration_seconds": duration,
        "degradation": degradation_status.to_dict(),
        "orchestration": orch_summary,
        "layer1_agents": list(layer1_results.keys()),
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }

    logger.info(
        "Pipeline v3.1 COMPLETE in %.1fs | path=%s | feedback_loops=%d",
        duration, scan_path, feedback_loop_count,
    )
    recorder.end_scan("COMPLETE", duration)

    report_payload = _build_scan_report_payload(scout_output, advisor_output, layer1_results)
    final_output: dict[str, Any] = {
        **advisor_output,
        **report_payload,
        "pipeline_meta": pipeline_meta,
    }
    if _sg_code_patterns:
        # Harness L7: append CWE-enriched code patterns to whatever the report
        # payload already contributed.
        existing_cps = final_output.get("code_patterns_summary", [])
        final_output["code_patterns_summary"] = existing_cps + _sg_code_patterns
        logger.info(
            "[PIPELINE] Harness L7: %d CWE-enriched code patterns merged into final result",
            len(_sg_code_patterns),
        )
    return final_output
|
|
|
|
| def run_pipeline_with_callback( |
| tech_stack: str, |
| progress_callback: Any = None, |
| input_type: str = "pkg", |
| ) -> dict[str, Any]: |
| """ |
| Canonical pipeline DAG: |
| Orchestrator -> (Security Guard + Scout in parallel) -> Intel Fusion |
| -> Analyst -> Critic -> Advisor. |
| """ |
| pipeline_start = time.time() |
| completed_stages: list[str] = [] |
| stages_detail: dict[str, Any] = {} |
| layer1_results: dict[str, Any] = {} |
|
|
| def _notify(agent: str, status: str, detail: dict) -> None: |
| if not progress_callback: |
| return |
| try: |
| progress_callback(agent, status, detail) |
| except Exception as exc: |
| logger.debug("[PIPELINE] progress callback ignored: %s", exc) |
|
|
| def _remember_stage(agent: str, detail: dict[str, Any]) -> None: |
| stages_detail[agent] = detail |
| if agent not in completed_stages: |
| completed_stages.append(agent) |
|
|
| from checkpoint import recorder |
| recorder.start_scan(f"pipe_{int(pipeline_start)}") |
|
|
| l0_report: dict[str, Any] = {} |
| sanitized_stack = tech_stack |
| try: |
| from input_sanitizer import sanitize_input, format_l0_report |
|
|
| san_result = sanitize_input(tech_stack) |
| l0_report = format_l0_report(san_result) |
| if not san_result.safe: |
| _notify("input_sanitizer", "COMPLETE", { |
| "status": "BLOCKED", |
| "reason": san_result.blocked_reason, |
| }) |
| duration = round(time.time() - pipeline_start, 2) |
| return { |
| "executive_summary": f"Input blocked: {san_result.blocked_reason}", |
| "blocked": True, |
| "actions": {"urgent": [], "important": [], "resolved": []}, |
| "risk_score": 0, |
| "risk_trend": "+0", |
| "pipeline_meta": { |
| "pipeline_version": "4.0", |
| "tech_stack": tech_stack, |
| "scan_path": "BLOCKED", |
| "stages_completed": 0, |
| "stages_detail": {}, |
| "enable_critic": ENABLE_CRITIC, |
| "critic_verdict": "SKIPPED", |
| "critic_score": 0.0, |
| "duration_seconds": duration, |
| "degradation": {"level": 5, "label": "INPUT_BLOCKED"}, |
| "generated_at": datetime.now(timezone.utc).isoformat(), |
| "l0_report": l0_report, |
| }, |
| } |
|
|
| sanitized_stack = san_result.sanitized_input |
| input_type = _canonical_runtime_input_type(input_type, san_result.input_type) |
| l0_detail = { |
| "status": "SUCCESS", |
| "input_type": san_result.input_type, |
| "pipeline_input_type": input_type, |
| "l0_warning_count": l0_report.get("l0_warning_count", 0), |
| "truncated": san_result.truncated, |
| } |
| _remember_stage("input_sanitizer", l0_detail) |
| _notify("input_sanitizer", "COMPLETE", l0_detail) |
| recorder.stage_exit("input_sanitizer", "SUCCESS", l0_detail, 0) |
| except Exception as exc: |
| logger.warning("[L0] Sanitizer skipped: %s", exc) |
|
|
| if input_type == "sql_review": |
| from tools.sql_syntax_reviewer import review_sql_syntax |
|
|
| _notify("sql_review", "RUNNING", {}) |
| review = review_sql_syntax(sanitized_stack) |
| sql_detail = { |
| "status": "SUCCESS", |
| "patterns_detected": review.get("summary", {}).get("patterns_detected", 0), |
| "findings_count": review.get("summary", {}).get("total", 0), |
| "requires_application_context": True, |
| } |
| _remember_stage("sql_review", sql_detail) |
| _notify("sql_review", "COMPLETE", sql_detail) |
| recorder.stage_exit("sql_review", "SUCCESS", sql_detail, 0) |
| duration = round(time.time() - pipeline_start, 2) |
| recorder.end_scan("COMPLETE", duration) |
| return { |
| "executive_summary": ( |
| "SQL syntax review completed. Findings are payload/syntax evidence only; " |
| "application source context is required before calling this a verified vulnerability." |
| ), |
| "actions": {"urgent": [], "important": [], "resolved": []}, |
| "risk_score": 0, |
| "risk_trend": "+0", |
| "sql_syntax_review": review, |
| "vulnerability_detail": [], |
| "vulnerability_summary": { |
| "total": 0, |
| "critical": 0, |
| "high": 0, |
| "medium": 0, |
| "low": 0, |
| "new": 0, |
| "new_since_last_scan": 0, |
| }, |
| "report_sources": { |
| "vulnerability_detail": "sql_syntax_review", |
| "layer1_state": "skipped", |
| "external_scan_skipped": True, |
| }, |
| "code_patterns_summary": [], |
| "pipeline_meta": { |
| "pipeline_version": "4.0", |
| "tech_stack": tech_stack, |
| "scan_path": "C", |
| "stages_completed": len(completed_stages), |
| "stages_detail": stages_detail, |
| "enable_critic": ENABLE_CRITIC, |
| "critic_verdict": "SKIPPED", |
| "critic_score": 0.0, |
| "feedback_loops": 0, |
| "duration_seconds": duration, |
| "degradation": degradation_status.to_dict(), |
| "orchestration": {}, |
| "layer1_agents": [], |
| "fusion_after_discovery": False, |
| "l0_report": l0_report, |
| "generated_at": datetime.now(timezone.utc).isoformat(), |
| }, |
| } |
|
|
| _notify("orchestrator", "RUNNING", {}) |
| orch_ctx, task_plan, orch_sl = stage_orchestrator(sanitized_stack) |
| task_plan = _constrain_task_plan_by_input_type(task_plan, input_type) |
| scan_path = task_plan.get("path", "B") |
| orch_detail = { |
| "status": "SUCCESS" if not task_plan.get("_degraded") else "DEGRADED", |
| "scan_path": scan_path, |
| "agents_to_run": task_plan.get("agents_to_run", []), |
| "duration_ms": orch_sl.steps[-1].get("duration_ms", 0) if orch_sl.steps else 0, |
| "l0_input_type": l0_report.get("input_type", "unknown"), |
| "pipeline_input_type": input_type, |
| "route_corrected": task_plan.get("_route_corrected"), |
| } |
| _remember_stage("orchestrator", orch_detail) |
| _notify("orchestrator", "COMPLETE", orch_detail) |
| recorder.stage_enter("orchestrator", {"tech_stack": sanitized_stack[:200]}) |
| recorder.stage_exit("orchestrator", orch_detail["status"], orch_detail, orch_detail["duration_ms"]) |
|
|
| extracted_packages: list[str] = [] |
| scout_input = sanitized_stack |
| scout_output: dict[str, Any] = {} |
| scout_duration_ms = 0 |
| sg_code_patterns: list[dict[str, Any]] = [] |
| intel_fusion_result: dict[str, Any] = {} |
| benchmark_context: dict[str, Any] | None = None |
|
|
| parallel_agents = task_plan.get("parallel_layer1", []) |
| if parallel_agents: |
| _notify("layer1_parallel", "RUNNING", {"agents": parallel_agents}) |
| layer1_results = _run_layer1_parallel( |
| sanitized_stack, task_plan, _notify, input_type=input_type |
| ) |
| visible_agents = [name for name in layer1_results if not name.startswith("_")] |
| _notify("layer1_parallel", "COMPLETE", {"agents_completed": visible_agents}) |
|
|
| extracted_packages = layer1_results.get("_extracted_packages", []) |
| if "security_guard" in layer1_results: |
| sg_result = layer1_results["security_guard"] |
| sg_code_patterns = _build_code_patterns_summary(sg_result) |
| sg_detail = { |
| "status": "SUCCESS" if not sg_result.get("_degraded") else "DEGRADED", |
| "duration_ms": sg_result.get("_duration_ms", 0), |
| "functions_found": sg_result.get("stats", {}).get("functions_found", 0), |
| "patterns_found": sg_result.get("stats", {}).get("patterns_found", 0), |
| "injection_detected": sg_result.get("injection_attempts_detected", False), |
| "_degraded": sg_result.get("_degraded", False), |
| "_error": sg_result.get("_error", "") if sg_result.get("_degraded") else "", |
| } |
| _remember_stage("security_guard", sg_detail) |
| recorder.stage_exit("security_guard", sg_detail["status"], sg_detail, sg_detail["duration_ms"]) |
|
|
| if "scout" in layer1_results: |
| scout_output = layer1_results["scout"] |
| scout_duration_ms = scout_output.get("_duration_ms", 0) |
| scout_detail = { |
| "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED", |
| "vuln_count": len(scout_output.get("vulnerabilities", [])), |
| "duration_ms": scout_duration_ms, |
| "packages_used": extracted_packages, |
| "_degraded": scout_output.get("_degraded", False), |
| "_error": scout_output.get("_error", "") if scout_output.get("_degraded") else "", |
| } |
| _remember_stage("scout", scout_detail) |
| _notify("scout", "COMPLETE", scout_detail) |
|
|
| if not scout_output: |
| _notify("scout", "RUNNING", {}) |
| rate_limiter.wait_if_needed("scout") |
| scout_output, scout_sl = stage_scout(scout_input, input_type=input_type) |
| scout_duration_ms = scout_sl.steps[-1].get("duration_ms", 0) if scout_sl.steps else 0 |
| scout_detail = { |
| "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED", |
| "vuln_count": len(scout_output.get("vulnerabilities", [])), |
| "duration_ms": scout_duration_ms, |
| "packages_used": extracted_packages, |
| } |
| _remember_stage("scout", scout_detail) |
| _notify("scout", "COMPLETE", scout_detail) |
|
|
| if sg_code_patterns: |
| scout_output["code_patterns"] = sg_code_patterns |
|
|
| from agents.scout import SKILL_MAP as SCOUT_SKILL_MAP |
| recorder.stage_enter("scout", {"tech_stack": scout_input[:200], "packages": extracted_packages}, |
| skill_file=SCOUT_SKILL_MAP.get(input_type, "threat_intel.md"), |
| input_type=input_type) |
| recorder.stage_exit("scout", stages_detail["scout"]["status"], scout_output, scout_duration_ms) |
|
|
| if task_plan.get("fusion_after_discovery", False): |
| intel_input = _build_intel_fusion_input( |
| scout_output=scout_output, |
| sg_result=layer1_results.get("security_guard"), |
| code_patterns=sg_code_patterns, |
| extracted_packages=extracted_packages, |
| tech_stack=sanitized_stack, |
| input_type=input_type, |
| ) |
| intel_fusion_result, intel_sl = stage_intel_fusion(intel_input, _notify) |
| layer1_results["intel_fusion"] = intel_fusion_result |
| scout_output["intel_fusion_result"] = intel_fusion_result |
| scout_output = _merge_intel_fusion_into_scout(scout_output, intel_fusion_result) |
| intel_detail = { |
| "status": "SUCCESS" if not intel_fusion_result.get("_degraded") else "DEGRADED", |
| "cves_scored": len(intel_fusion_result.get("fusion_results", [])), |
| "duration_ms": intel_sl.steps[-1].get("duration_ms", 0) if intel_sl.steps else 0, |
| "_degraded": intel_fusion_result.get("_degraded", False), |
| "_error": intel_fusion_result.get("_error", "") if intel_fusion_result.get("_degraded") else "", |
| } |
| _remember_stage("intel_fusion", intel_detail) |
| if orch_ctx is not None: |
| try: |
| orch_ctx.store_result("intel_fusion", intel_fusion_result) |
| except Exception as exc: |
| logger.debug("[PIPELINE] Orchestration store skipped for Intel Fusion: %s", exc) |
|
|
| if scan_path == "C": |
| analyst_output = { |
| "scan_id": scout_output.get("scan_id", "unknown"), |
| "risk_score": 30, |
| "risk_trend": "+0", |
| "analysis": [], |
| "_skipped": True, |
| "_reason": "path_C_doc_scan", |
| } |
| critic_output = {"verdict": "SKIPPED", "weighted_score": 60.0, "_skipped": True} |
| _remember_stage("analyst", {"status": "SKIPPED", "reason": "path_C"}) |
| _remember_stage("critic", {"status": "SKIPPED", "reason": "path_C"}) |
| _notify("analyst", "COMPLETE", stages_detail["analyst"]) |
| _notify("critic", "COMPLETE", stages_detail["critic"]) |
| else: |
| _notify("analyst", "RUNNING", {}) |
| rate_limiter.wait_if_needed("analyst") |
| analyst_output, analyst_sl = stage_analyst(scout_output, input_type=input_type) |
| benchmark_context = _build_dim11_benchmark_context( |
| source=sanitized_stack, |
| input_type=input_type, |
| task_plan=task_plan, |
| code_patterns=sg_code_patterns, |
| scout_output=scout_output, |
| extracted_packages=extracted_packages, |
| ) |
| if benchmark_context: |
| analyst_output["benchmark_context"] = benchmark_context |
| analyst_output["observed_cwe_categories"] = benchmark_context["observed_cwe_categories"] |
| logger.info( |
| "[DIM11] benchmark context attached before Critic: fixture=%s observed=%d expected=%d", |
| benchmark_context.get("fixture"), |
| len(benchmark_context.get("observed_cwe_categories", [])), |
| len(benchmark_context.get("expected_cwe_categories", [])), |
| ) |
| analyst_detail = { |
| "status": "SUCCESS" if not analyst_output.get("_degraded") else "DEGRADED", |
| "risk_score": analyst_output.get("risk_score", 0), |
| "duration_ms": analyst_sl.steps[-1].get("duration_ms", 0) if analyst_sl.steps else 0, |
| } |
| _remember_stage("analyst", analyst_detail) |
| _notify("analyst", "COMPLETE", analyst_detail) |
| from agents.analyst import SKILL_MAP as ANALYST_SKILL_MAP |
| recorder.stage_enter("analyst", scout_output, |
| skill_file=ANALYST_SKILL_MAP.get(input_type, "chain_analysis.md"), |
| input_type=input_type) |
| recorder.stage_exit("analyst", analyst_detail["status"], analyst_output, analyst_detail["duration_ms"]) |
|
|
| _notify("critic", "RUNNING", {}) |
| rate_limiter.wait_if_needed("critic") |
| critic_output, critic_sl = stage_critic(analyst_output, input_type=input_type) |
| critic_detail = { |
| "status": "SUCCESS" if not critic_output.get("_degraded") else "DEGRADED", |
| "verdict": critic_output.get("verdict", "SKIPPED"), |
| "score": critic_output.get("weighted_score", 0), |
| "duration_ms": critic_sl.steps[-1].get("duration_ms", 0) if critic_sl.steps else 0, |
| } |
| if critic_output.get("recall_challenge"): |
| critic_detail["recall_challenge_verdict"] = critic_output["recall_challenge"].get("verdict") |
| _remember_stage("critic", critic_detail) |
| _notify("critic", "COMPLETE", critic_detail) |
| from agents.critic import SKILL_MAP as CRITIC_SKILL_MAP |
| recorder.stage_enter("critic", analyst_output, |
| skill_file=CRITIC_SKILL_MAP.get(input_type, "debate_sop.md"), |
| input_type=input_type) |
| recorder.stage_exit("critic", critic_detail["status"], critic_output, critic_detail["duration_ms"]) |
|
|
| _notify("advisor", "RUNNING", {}) |
| rate_limiter.wait_if_needed("advisor") |
| advisor_output, advisor_sl = stage_advisor(analyst_output, critic_output, input_type=input_type) |
| advisor_detail = { |
| "status": "SUCCESS" if not advisor_output.get("_degraded") else "DEGRADED", |
| "urgent_count": len(advisor_output.get("actions", {}).get("urgent", [])), |
| "duration_ms": advisor_sl.steps[-1].get("duration_ms", 0) if advisor_sl.steps else 0, |
| } |
| _remember_stage("advisor", advisor_detail) |
| _notify("advisor", "COMPLETE", advisor_detail) |
| from agents.advisor import SKILL_MAP as ADVISOR_SKILL_MAP |
| recorder.stage_enter("advisor", analyst_output, |
| skill_file=ADVISOR_SKILL_MAP.get(input_type, "action_report.md"), |
| input_type=input_type) |
| recorder.stage_exit("advisor", advisor_detail["status"], advisor_output, advisor_detail["duration_ms"]) |
|
|
| duration = round(time.time() - pipeline_start, 2) |
| try: |
| orch_summary = {} |
| if orch_ctx is not None: |
| from agents.orchestrator import finalize_orchestration |
| orch_ctx.final_confidence = advisor_output.get("confidence", "HIGH") |
| orch_summary = finalize_orchestration(orch_ctx) |
| except Exception as exc: |
| logger.debug("[PIPELINE] finalize_orchestration skipped: %s", exc) |
| orch_summary = {} |
|
|
| pipeline_meta = { |
| "pipeline_version": "4.0", |
| "tech_stack": tech_stack, |
| "scan_path": scan_path, |
| "stages_completed": len(completed_stages), |
| "stages_detail": stages_detail, |
| "enable_critic": ENABLE_CRITIC, |
| "critic_verdict": critic_output.get("verdict", "SKIPPED"), |
| "critic_score": critic_output.get("weighted_score", 0), |
| "feedback_loops": 0, |
| "duration_seconds": duration, |
| "degradation": degradation_status.to_dict(), |
| "orchestration": orch_summary, |
| "layer1_agents": [name for name in layer1_results if not name.startswith("_")], |
| "fusion_after_discovery": bool(task_plan.get("fusion_after_discovery", False)), |
| "l0_report": l0_report, |
| "generated_at": datetime.now(timezone.utc).isoformat(), |
| } |
| if benchmark_context: |
| pipeline_meta["benchmark_context"] = benchmark_context |
| if critic_output.get("recall_challenge"): |
| pipeline_meta["critic_recall_challenge"] = critic_output["recall_challenge"] |
| if intel_fusion_result.get("evidence_contract"): |
| pipeline_meta["intel_fusion_evidence_contract"] = intel_fusion_result["evidence_contract"] |
|
|
| recorder.end_scan("COMPLETE", duration) |
| report_payload = _build_scan_report_payload(scout_output, advisor_output, layer1_results) |
| final_output: dict[str, Any] = { |
| **advisor_output, |
| **report_payload, |
| "pipeline_meta": pipeline_meta, |
| } |
| if sg_code_patterns: |
| existing_cps = final_output.get("code_patterns_summary", []) |
| final_output["code_patterns_summary"] = existing_cps + sg_code_patterns |
| if critic_output.get("recall_challenge"): |
| final_output["critic_recall_challenge"] = critic_output["recall_challenge"] |
| if critic_output.get("needs_rescan"): |
| final_output["needs_rescan"] = True |
| return final_output |
|
|
|
|
|
|
| |
| |
| |
|
|
def _main() -> None:
    """CLI entry point for a standalone scan.

    Configures stdout logging, builds the tech-stack string from positional
    command-line arguments (falling back to a demo stack when none are given),
    runs the full pipeline, and prints the result as pretty-printed JSON.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    # All positional args are joined into one free-text tech-stack description;
    # without args we fall back to a representative demo stack.
    if len(sys.argv) > 1:
        tech_stack = " ".join(sys.argv[1:])
    else:
        tech_stack = "Django 4.2, Redis 7.0, PostgreSQL 16"

    print(f"\nThreatHunter - Scanning: {tech_stack}\n")
    result = run_pipeline(tech_stack)
    print("\n=== Pipeline Result ===")
    # ensure_ascii=False keeps non-ASCII (e.g. CJK) text readable in the output.
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    _main()
|
|