""" main.py - ThreatHunter 銝餌撘 ============================== Pipeline 嗆嚗v3.1嚗嚗Orchestrator [Layer 1 銝西] Scout Analyst Debate Advisor 嗆嚗v3.1 Orchestrator 撽嚗嚗 Orchestrator 頝舐梧 頝臬 A嚗憟隞嗆 頝喲 Security Guard 頝臬 B嚗摰渡撘蝣 Layer 1 銝西嚗Security Guard + Intel Fusion嚗 頝臬 C嚗隞嗅摹蝵 頝喲 Analyst + Debate 頝臬 D嚗擖鋆 芷頝雿靽∪ CVE 嗆嚗靽靘嚗嚗 雿輻刻頛詨 "Django 4.2, Redis 7.0" main.py Pipeline Scout 嗯 Analyst 嗯 Critic (鈭撖行園) (函斗) (舀) 潑 Advisor (蝯鋆瘙箏勗) 瘥 Stage 函嚗 - try-except + Graceful Degradation - StepLogger 摮甇仿亥 - Harness 靽撅歹 agents/*.py 折剁 鞈瘚嚗 Scout 頛詨 (dict) Analyst 頛詨 (dict) Analyst 頛詨 (dict) Critic 頛詨 (dict) Analyst + Critic 頛詨 Advisor 頛詨 (dict) Advisor 頛詨 (dict) + pipeline_meta 蝯蝯 Harness Engineering 靽嚗 - 瘥 Agent 賭蝙函撖行芋蝯嚗agents/*.py嚗嚗 Stub - Critic ENABLE_CRITIC 批塚舀嚗 - 瘥 Stage 函 Graceful Degradation 蝝頝臬 - 函閮 Observability 亥嚗FINAL_PLAN.md 舀 2嚗 - 17 撅 Harness 靽撅歹 agents/*.py 折剁 萄嚗project_CONSTITUTION.md + HARNESS_ENGINEERING.md """ import json import logging import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from typing import Any from config import ( ENABLE_CRITIC, degradation_status, rate_limiter, ) logger = logging.getLogger("threathunter.main") import os # noqa: E402 (import after logger for ordering clarity) # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) SANDBOX_ENABLED = os.getenv("SANDBOX_ENABLED", "true").lower() == "true" try: from sandbox.docker_sandbox import run_in_sandbox, is_docker_available _DOCKER_SANDBOX_OK = True except ImportError: _DOCKER_SANDBOX_OK = False def run_in_sandbox(*args, **kwargs): # type: ignore[misc] return {"error": "SANDBOX_NOT_AVAILABLE", "fallback": True} def is_docker_available() -> bool: # type: ignore[misc] return False if SANDBOX_ENABLED: logger.info( "[SANDBOX] Docker isolation ENABLED | docker_available=%s", is_docker_available(), ) else: logger.debug("[SANDBOX] Docker isolation DISABLED (in-process mode)") # (comment encoding corrupted) # ====================================================================== # (comment encoding corrupted) # ====================================================================== class StepLogger: """瘥 Agent Stage 摮甇仿餈質馱具""" def __init__(self, agent_name: str): self.agent_name = agent_name self.steps: list[dict[str, Any]] = [] def log( self, step: str, status: str, detail: str = "", duration_ms: int = 0 ) -> None: entry = { "step": step, "agent": self.agent_name, "timestamp": datetime.now(timezone.utc).isoformat(), "status": status, "detail": detail, "duration_ms": duration_ms, } self.steps.append(entry) icon = ( "OK" if status == "SUCCESS" else "WAIT" if status == "RUNNING" else "FAIL" ) logger.info("[%s] %s | %s | %s", icon, self.agent_name.upper(), step, detail) def summary(self) -> dict[str, Any]: failed = [s for s in self.steps if s["status"] == "FAILED"] return { "agent": self.agent_name, "total_steps": len(self.steps), "failed_steps": len(failed), "steps": self.steps, } def _normalize_pipeline_task_plan(task_plan: dict[str, Any]) -> dict[str, Any]: """ Canonical runtime DAG: Security Guard + Scout discover in parallel, then Intel Fusion ranks risk. """ plan = dict(task_plan or {}) scan_path = plan.get("path", "B") if scan_path == "B": plan["parallel_layer1"] = ["security_guard", "scout"] plan["fusion_after_discovery"] = True plan["agents_to_run"] = [ "security_guard", "scout", "intel_fusion", "analyst", "debate", "judge" ] elif scan_path == "A": plan["parallel_layer1"] = ["scout"] plan["fusion_after_discovery"] = True plan["agents_to_run"] = ["scout", "intel_fusion", "analyst", "debate", "judge"] return plan _L0_TO_RUNTIME_INPUT_TYPE = { "package_list": "pkg", "source_code": "code", "mixed": "code", "config_file": "config", "sql_review": "sql_review", "blocked": "injection", } def _canonical_runtime_input_type(requested: str, l0_input_type: str = "") -> str: """用 L0 deterministic 分類修正 UI/API 預設值,避免 code 被當 package。""" runtime = (requested or "pkg").strip().lower() if runtime not in {"pkg", "code", "config", "injection", "sql_review"}: runtime = "pkg" l0_runtime = _L0_TO_RUNTIME_INPUT_TYPE.get(str(l0_input_type or "").strip().lower(), "") if runtime == "pkg" and l0_runtime in {"code", "config", "injection", "sql_review"}: return l0_runtime return runtime def _constrain_task_plan_by_input_type(task_plan: dict[str, Any], input_type: str) -> dict[str, Any]: """L0 類型是硬約束;Orchestrator 不可把 source code 誤路由成 Path C/A。""" plan = dict(task_plan or {}) original_path = plan.get("path", "B") if input_type in {"code", "injection"} and original_path != "B": plan["path"] = "B" plan["_route_corrected"] = { "from": original_path, "to": "B", "reason": f"l0_input_type={input_type}", } elif input_type == "sql_review": if original_path != "C": plan["_route_corrected"] = { "from": original_path, "to": "C", "reason": "l0_input_type=sql_review", } plan["path"] = "C" plan["parallel_layer1"] = [] plan["fusion_after_discovery"] = False plan["agents_to_run"] = ["sql_review"] elif input_type == "config" and original_path != "C": plan["path"] = "C" plan["_route_corrected"] = { "from": original_path, "to": "C", "reason": "l0_input_type=config", } return _normalize_pipeline_task_plan(plan) def _load_dim11_expected_config() -> dict[str, Any]: """載入 DIM11 benchmark gate;一般掃描沒有此檔或沒有 exact match 時不啟用。""" expected_path = Path(__file__).resolve().parent / "tests" / "fixtures" / "dim11_redteam" / "expected_results.json" try: return json.loads(expected_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError, ValueError) as exc: logger.debug("[DIM11] benchmark config unavailable: %s", exc) return {} def _build_dim11_benchmark_context( source: str, input_type: str, task_plan: dict[str, Any], code_patterns: list[dict[str, Any]], scout_output: dict[str, Any], extracted_packages: list[str], ) -> dict[str, Any] | None: """只對 DIM11 fixture exact match 啟用 expected-CWE recall challenge。""" if input_type != "code": return None config = _load_dim11_expected_config() fixture_dir = Path(__file__).resolve().parent / "tests" / "fixtures" / "dim11_redteam" observed_cwes = sorted({ str(pattern.get("cwe_id", "")) for pattern in code_patterns if str(pattern.get("cwe_id", "")).startswith("CWE-") }) for case in config.get("fixtures", []): expected_cwes = case.get("expected_cwe_categories") if not expected_cwes or case.get("exclude_from_cwe_recall"): continue fixture_name = str(case.get("fixture", "")) try: fixture_source = (fixture_dir / fixture_name).read_text(encoding="utf-8", errors="replace") except OSError: continue if source != fixture_source: continue external_findings = scout_output.get("vulnerabilities", []) external_pollution_count = 0 if case.get("external_requires_package_evidence") and external_findings and not extracted_packages: external_pollution_count = len(external_findings) expected_path = str(case.get("expected_path", "")) actual_path = str(task_plan.get("path", "")) return { "benchmark": "dim11_redteam", "fixture": fixture_name, "expected_cwe_categories": sorted(set(expected_cwes)), "observed_cwe_categories": observed_cwes, "min_category_recall": float(case.get("min_category_recall", 0.7)), "route_correct": actual_path == expected_path, "expected_path": expected_path, "actual_path": actual_path, "external_pollution_count": external_pollution_count, "activation": "exact_fixture_match", } return None # ====================================================================== # (comment encoding corrupted) # ====================================================================== def stage_scout( tech_stack: str, input_type: str = "pkg", intel_fusion_result: dict[str, Any] | None = None, ) -> tuple[dict[str, Any], StepLogger]: """ Stage 1: Scout Agent 萄瞍瘣 雿輻 agents/scout.py 撖血祕雿 Graceful Degradation: 憭望喟征鞈嚗霈敺蝥 Agent 仿舐洵銝甈~ v3.7: input_type 瘙箏頛芸 Skill SOP (Path-Aware Skills) Returns: (result_dict, step_logger) 頛詨箏亥餈質馱 """ from agents.scout import run_scout_pipeline sl = StepLogger("scout") sl.log("INIT", "RUNNING", f"tech_stack={tech_stack} | input_type={input_type}") t0 = time.time() try: result = run_scout_pipeline( tech_stack, input_type=input_type, intel_fusion_result=intel_fusion_result, ) duration_ms = int((time.time() - t0) * 1000) vuln_count = len(result.get("vulnerabilities", [])) sl.log( "COMPLETE", "SUCCESS", f"found {vuln_count} vulnerabilities", duration_ms ) return result, sl except Exception as e: duration_ms = int((time.time() - t0) * 1000) # (comment encoding corrupted) is_rate_limit = "429" in str(e) or "rate limit" in str(e).lower() if is_rate_limit: logger.warning("[SCOUT] Rate limited returning empty results (not a real failure)") sl.log("COMPLETE", "RATE_LIMITED", str(e)[:100], duration_ms) else: logger.error("Scout Stage failed: %s", e) sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms) degradation_status.degrade("Scout", str(e)) # (comment encoding corrupted) return { "scan_id": f"scan_degraded_{int(time.time())}", "vulnerabilities": [], "summary": {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}, "_degraded": not is_rate_limit, # 429 銝蝞甇蝝 "_error": str(e), }, sl def stage_intel_fusion( discovery_input: dict[str, Any] | list[str] | str, on_progress: Any = None, ) -> tuple[dict[str, Any], StepLogger]: """ Stage: Intel Fusion runs after Scout and Security Guard discovery. It receives normalized findings instead of raw source code. """ from agents.intel_fusion import run_intel_fusion sl = StepLogger("intel_fusion") sl.log("INIT", "RUNNING", "post-discovery fusion") t0 = time.time() try: result = run_intel_fusion(discovery_input, on_progress) duration_ms = int((time.time() - t0) * 1000) result.setdefault("_duration_ms", duration_ms) fusion_count = len(result.get("fusion_results", [])) sl.log("COMPLETE", "SUCCESS", f"scored {fusion_count} findings", duration_ms) return result, sl except Exception as e: duration_ms = int((time.time() - t0) * 1000) logger.error("Intel Fusion Stage failed: %s", e) sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms) degradation_status.degrade("Intel Fusion", str(e)) return { "fusion_results": [], "strategy_applied": "degraded", "api_health_summary": {}, "_degraded": True, "_error": str(e), "_duration_ms": duration_ms, }, sl # ====================================================================== # (comment encoding corrupted) # ====================================================================== def stage_analyst( scout_output: dict[str, Any], input_type: str = "pkg", ) -> tuple[dict[str, Any], StepLogger]: """ Stage 2: Analyst Agent vulnerability chain analysis. v3.7: input_type selects path-aware chain analysis skill. Returns: (result_dict, step_logger) """ from agents.analyst import run_analyst_pipeline sl = StepLogger("analyst") sl.log( "INIT", "RUNNING", f"input_vulns={len(scout_output.get('vulnerabilities', []))} | input_type={input_type}" ) t0 = time.time() try: result = run_analyst_pipeline(scout_output, input_type=input_type) duration_ms = int((time.time() - t0) * 1000) risk = result.get("risk_score", 0) sl.log("COMPLETE", "SUCCESS", f"risk_score={risk}", duration_ms) return result, sl except Exception as e: duration_ms = int((time.time() - t0) * 1000) logger.error("Analyst Stage failed: %s", e) sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms) degradation_status.degrade("Analyst", str(e)) # (comment encoding corrupted) vulns = scout_output.get("vulnerabilities", []) analysis = [ { "cve_id": v.get("cve_id", "UNKNOWN"), "original_cvss": v.get("cvss_score", 0), "adjusted_risk": v.get("severity", "UNKNOWN"), "in_cisa_kev": False, "exploit_available": False, "chain_risk": { "is_chain": False, "chain_with": [], "chain_description": "chain_analysis: SKIPPED", "confidence": "NEEDS_VERIFICATION", }, "reasoning": "Analyst degraded - chain analysis skipped.", } for v in vulns ] # (comment encoding corrupted) code_patterns = scout_output.get("code_patterns", []) for cp in code_patterns: sev = cp.get("severity", "MEDIUM") cvss_equiv = 9.0 if sev == "CRITICAL" else (7.5 if sev == "HIGH" else 5.0) analysis.append({ "finding_id": cp.get("finding_id", "CODE-000"), "cve_id": None, "pattern_type": cp.get("pattern_type", "UNKNOWN"), "cwe_id": cp.get("cwe_id", "CWE-unknown"), "owasp_category": cp.get("owasp_category", ""), "severity": sev, "snippet": cp.get("snippet", ""), "line_no": cp.get("line_no", 0), "original_cvss": cvss_equiv, "adjusted_risk": sev, "in_cisa_kev": False, "exploit_available": False, "chain_risk": { "is_chain": False, "chain_with": [], "chain_description": "Analyst degraded - deterministic pattern only.", "confidence": "HIGH", # 蝣箏批菜葫嚗靽∪擃 }, "reasoning": f"Deterministic detection: {cp.get('pattern_type')} ({cp.get('cwe_id')}). Analyst degraded but code pattern is confirmed by Security Guard.", }) if code_patterns: logger.info("[DEGRADED] Analyst fallback preserved %d code_patterns", len(code_patterns)) return { "scan_id": scout_output.get("scan_id", "unknown"), "risk_score": 50, "risk_trend": "+0", "analysis": analysis, "_degraded": True, "_error": str(e), }, sl # ====================================================================== # (comment encoding corrupted) # ====================================================================== def stage_critic( analyst_output: dict[str, Any], input_type: str = "pkg", ) -> tuple[dict[str, Any], StepLogger]: """ Stage 3: Adversarial Debate Engine. v5.3: 升級為 3 輪辯論引擎(Du et al. 2023 ICML, arXiv:2305.14325)。 無共識時由 Judge sub-agent 仲裁。偏保守原則:高估風險比低估安全。 Returns: (result_dict, step_logger) """ from agents.debate_engine import run_debate_pipeline sl = StepLogger("critic") sl.log("INIT", "RUNNING", f"enable_critic={ENABLE_CRITIC} | input_type={input_type} | mode=3-round-debate") t0 = time.time() try: result = run_debate_pipeline( analyst_output, input_type=input_type, on_progress=None, ) duration_ms = int((time.time() - t0) * 1000) verdict = result.get("verdict", "UNKNOWN") score = result.get("weighted_score", 0) meta = result.get("_debate_meta", {}) consensus = meta.get("consensus", None) rounds = meta.get("total_rounds", "?") judge_invoked = meta.get("judge_invoked", False) sl.log( "COMPLETE", "SUCCESS", f"verdict={verdict} score={score} consensus={consensus} rounds={rounds} judge={judge_invoked}", duration_ms, ) return result, sl except Exception as e: duration_ms = int((time.time() - t0) * 1000) logger.error("Debate Stage failed: %s", e) sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms) degradation_status.degrade("Critic", str(e)) return { "debate_rounds": 0, "challenges": [], "scorecard": { "evidence": 0.6, "chain_completeness": 0.6, "critique_quality": 0.6, "defense_quality": 0.6, "calibration": 0.6, }, "weighted_score": 60.0, "verdict": "MAINTAIN", "reasoning": "Debate engine degraded - all rounds skipped.", "_degraded": True, "_error": str(e), }, sl def stage_advisor( analyst_output: dict[str, Any], critic_output: dict[str, Any], input_type: str = "pkg", ) -> tuple[dict[str, Any], StepLogger]: """ Stage 4: Advisor Agent action report generation. v3.7: input_type selects path-aware action report skill. Returns: (result_dict, step_logger) """ from agents.advisor import run_advisor_pipeline sl = StepLogger("advisor") verdict = critic_output.get("verdict", "SKIPPED") sl.log("INIT", "RUNNING", f"critic_verdict={verdict} | input_type={input_type}") # (comment encoding corrupted) advisor_input = dict(analyst_output) if verdict == "DOWNGRADE": advisor_input["_critic_note"] = ( f"Critic scored {critic_output.get('weighted_score', 0):.1f}/100. " f"Challenges: {critic_output.get('challenges', [])}. " "Use conservative risk assessment." ) logger.info( "Critic verdict=DOWNGRADE: Advisor will use conservative assessment" ) t0 = time.time() try: result = run_advisor_pipeline(advisor_input) duration_ms = int((time.time() - t0) * 1000) risk = result.get("risk_score", 0) urgent = len(result.get("actions", {}).get("urgent", [])) sl.log("COMPLETE", "SUCCESS", f"risk_score={risk} urgent={urgent}", duration_ms) return result, sl except Exception as e: duration_ms = int((time.time() - t0) * 1000) logger.error("Advisor Stage failed: %s", e) sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms) degradation_status.degrade("Advisor", str(e)) # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) urgent_actions = [] important_actions = [] code_patterns_fallback = [] # 脣 CODE-pattern 靘 UI _SEVERITY_MAP = {"CRITICAL": "urgent", "HIGH": "important"} for entry in advisor_input.get("analysis", []): finding_id = entry.get("finding_id", "") cve_id = entry.get("cve_id") sev = entry.get("severity") or entry.get("adjusted_risk", "MEDIUM") bucket = _SEVERITY_MAP.get(sev, "important") # (comment encoding corrupted) # (comment encoding corrupted) if finding_id.startswith("CODE-") or entry.get("pattern_type"): pt = entry.get("pattern_type", "UNKNOWN") cwe = entry.get("cwe_id", "CWE-unknown") snippet = entry.get("snippet", "") code_patterns_fallback.append({ "finding_id": finding_id, "cve_id": None, "cwe_id": cwe, "pattern_type": pt, "package": f"Custom Code Pattern", "severity": sev, "action": f"Fix {pt} vulnerability ({cwe}). {entry.get('reasoning', '')}", "vulnerable_snippet": snippet[:200], "reason": entry.get("reasoning", f"Security Guard deterministic detection: {pt}"), "is_repeated": False, "_constitution_note": "CODE-pattern: not in URGENT per CI-1/CI-2. See code_patterns_summary.", }) # (comment encoding corrupted) elif cve_id and (str(cve_id).startswith("CVE-") or str(cve_id).startswith("GHSA-")): action_entry = { "cve_id": cve_id, "package": entry.get("package", "unknown"), "severity": sev, "action": f"Review and patch {cve_id}.", "command": f"# Investigate {cve_id}", "reason": entry.get("reasoning", "Advisor degraded - manual review required."), "is_repeated": False, } if bucket == "urgent": urgent_actions.append(action_entry) else: important_actions.append(action_entry) risk_score = max( 50, min(100, len(urgent_actions) * 25 + len(important_actions) * 10), ) summary_parts = [] if urgent_actions: summary_parts.append(f"{len(urgent_actions)} package CVE(s) require immediate patching") if code_patterns_fallback: summary_parts.append(f"{len(code_patterns_fallback)} code-level pattern(s) detected (see Code Analysis tab)") if important_actions: summary_parts.append(f"{len(important_actions)} high-severity CVE(s) need review within 72h") if not summary_parts: summary_parts.append("System degraded. Please review raw scan data manually.") logger.info( "[DEGRADED] Advisor fallback: %d urgent CVEs + %d important CVEs + %d code-patterns (not in URGENT per CI-1/CI-2)", len(urgent_actions), len(important_actions), len(code_patterns_fallback), ) return { "executive_summary": ". ".join(summary_parts), "actions": { "urgent": urgent_actions, "important": important_actions, "resolved": [], }, "code_patterns_summary": code_patterns_fallback, "risk_score": risk_score, "risk_trend": "+0", "scan_count": 1, "generated_at": datetime.now(timezone.utc).isoformat(), "_degraded": True, "_error": str(e), }, sl # ====================================================================== # (comment encoding corrupted) # ====================================================================== # ====================================================================== # (comment encoding corrupted) # ====================================================================== def stage_orchestrator( tech_stack: str, feedback_from_judge: dict | None = None, ) -> tuple[Any, dict, StepLogger]: """ Stage 0: Orchestrator 瘙箏頝臬嚗A/B/C/D嚗 (OrchestrationContext, task_plan, StepLogger) Graceful Degradation: 憭望唾楝敺 B嚗摰渡撘蝣潘雿箏券閮准 """ from agents.orchestrator import run_orchestration sl = StepLogger("orchestrator") sl.log("INIT", "RUNNING", f"tech_stack={tech_stack[:60]}") t0 = time.time() try: ctx, task_plan = run_orchestration( user_input=tech_stack, feedback_from_judge=feedback_from_judge, ) task_plan = _normalize_pipeline_task_plan(task_plan) duration_ms = int((time.time() - t0) * 1000) scan_path = task_plan.get("path", "B") sl.log("COMPLETE", "SUCCESS", f"path={scan_path}", duration_ms) logger.info("[ORCH] Scan path: %s | plan: %s", scan_path, task_plan.get("agents_to_run", [])) return ctx, task_plan, sl except Exception as e: duration_ms = int((time.time() - t0) * 1000) logger.error("Orchestrator Stage failed: %s", e) sl.log("COMPLETE", "FAILED", str(e)[:100], duration_ms) degradation_status.degrade("Orchestrator", str(e)) # (comment encoding corrupted) from agents.orchestrator import OrchestrationContext, ScanPath ctx = OrchestrationContext() ctx.scan_path = ScanPath.FULL_CODE task_plan = { "path": "B", "parallel_layer1": ["security_guard", "scout"], "fusion_after_discovery": True, "debate_cluster": True, "judge": True, "agents_to_run": ["security_guard", "scout", "intel_fusion", "analyst", "debate", "judge"], "_degraded": True, } return ctx, task_plan, sl # ====================================================================== # (comment encoding corrupted) # ====================================================================== def _build_code_patterns_summary(sg_result: dict) -> list[dict]: """ Security Guard patterns + hardcoded secrets を CWE-enriched code_patterns_summary に変換する。 確定性抽出 + MITRE CWE v4.14 佐証注入。 LLM に依存しない。 """ from tools.cwe_registry import build_cwe_reference, get_pattern_meta, severity_rank def _attach_cwe_reference(entry: dict, cwe_id: str) -> dict: """將 MITRE CWE 官方定義注入 entry 的 cwe_reference 欄位""" cwe_reference = build_cwe_reference(cwe_id) if cwe_reference: entry["cwe_reference"] = cwe_reference return entry # ── Step 1: 對 patterns 去重(同行 + 前 30 字元相同視為重複,保留嚴重性最高者) def _normalized_line_no(item: dict) -> int: """Normalize scanner line metadata to the line_no contract.""" for key in ("line_no", "line", "source_line"): value = item.get(key) try: line_no = int(value) except (TypeError, ValueError): continue if line_no > 0: return line_no return 0 _dedup: dict[tuple, dict] = {} for p in sg_result.get("patterns", []): pt = p.get("pattern_type", "UNKNOWN") sev = get_pattern_meta(pt).severity key = (_normalized_line_no(p), str(p.get("snippet", ""))[:30]) existing = _dedup.get(key) if existing is None: _dedup[key] = p else: ex_pt = existing.get("pattern_type", "UNKNOWN") ex_sev = get_pattern_meta(ex_pt).severity if severity_rank(sev) > severity_rank(ex_sev): _dedup[key] = p code_patterns: list[dict] = [] counter = 1 # ── Step 2: 為每個 pattern 建立 finding entry + 注入 CWE reference for p in _dedup.values(): pt = p.get("pattern_type", "UNKNOWN") meta = get_pattern_meta(pt) cwe = meta.cwe_id owasp = meta.owasp_category severity = meta.severity entry = { "finding_id": f"CODE-{counter:03d}", "type": "code_pattern", "pattern_type": pt, "cwe_id": cwe, "owasp_category": owasp, "severity": severity, "snippet": str(p.get("snippet", ""))[:200], "line_no": _normalized_line_no(p), "language": sg_result.get("language", "unknown"), "canonical_cwe_id": cwe, "weakness_family": meta.weakness_family, "evidence_type": meta.evidence_type, "coverage_level": p.get("coverage_level", "pattern"), "confidence": p.get("confidence", "MEDIUM"), } entry["source_location"] = f"L{entry['line_no']}" if entry["line_no"] > 0 else "Line not provided by scanner" entry = _attach_cwe_reference(entry, cwe) code_patterns.append(entry) counter += 1 # ── Step 3: 硬編碼密鑰 for h in sg_result.get("hardcoded", []): entry = { "finding_id": f"CODE-{counter:03d}", "type": "hardcoded_secret", "pattern_type": "HARDCODED_SECRET", "cwe_id": "CWE-798", "owasp_category": "A07:2021-Identification and Authentication Failures", "severity": "HIGH", "snippet": f"{h.get('name', 'secret')} = '****' (value redacted)", "line_no": _normalized_line_no(h), "language": sg_result.get("language", "unknown"), "canonical_cwe_id": "CWE-798", "weakness_family": "hardcoded_secret", "evidence_type": "code_scan", "coverage_level": h.get("coverage_level", "pattern"), "confidence": h.get("confidence", "HIGH"), } entry["source_location"] = f"L{entry['line_no']}" if entry["line_no"] > 0 else "Line not provided by scanner" entry = _attach_cwe_reference(entry, "CWE-798") code_patterns.append(entry) counter += 1 logger.info( "[CODE_PATTERNS] Built %d CWE-enriched patterns (from %d raw patterns, %d hardcoded)", len(code_patterns), len(sg_result.get("patterns", [])), len(sg_result.get("hardcoded", [])), ) return code_patterns def _summarize_vulnerabilities(vulnerabilities: list[dict[str, Any]]) -> dict[str, int]: """根據漏洞清單建立前端可直接使用的摘要數字。""" summary = { "total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0, "new": 0, "new_since_last_scan": 0, } for vuln in vulnerabilities: summary["total"] += 1 severity = str(vuln.get("severity", "LOW")).upper() if severity == "CRITICAL": summary["critical"] += 1 elif severity == "HIGH": summary["high"] += 1 elif severity == "MEDIUM": summary["medium"] += 1 else: summary["low"] += 1 if vuln.get("is_new"): summary["new"] += 1 summary["new_since_last_scan"] += 1 return summary def _build_scan_report_payload( scout_output: dict[str, Any], advisor_output: dict[str, Any], layer1_results: dict[str, Any], ) -> dict[str, Any]: """整理本次 scan 專屬的漏洞明細,避免 UI 回頭讀取全域記憶。""" intel_applied = scout_output.get("_intel_fusion_applied", {}) vulnerabilities: list[dict[str, Any]] = [] for vuln in scout_output.get("vulnerabilities", []): entry = dict(vuln) entry.setdefault("source", "SCOUT") enriched_by = list(entry.get("enriched_by", [])) if intel_applied and entry.get("source") != "INTEL_FUSION": if any( field in entry for field in ( "composite_score", "epss_score", "ghsa_severity", "otx_threat", "intel_confidence", "dimensions_used", "weights_used", "in_cisa_kev", ) ): if "INTEL_FUSION" not in enriched_by: enriched_by.append("INTEL_FUSION") if enriched_by: entry["enriched_by"] = enriched_by vulnerabilities.append(entry) detail_source = "scout_final_output" if vulnerabilities: summary = dict(scout_output.get("summary", {})) summary.setdefault("new", summary.get("new_since_last_scan", 0)) summary.setdefault("new_since_last_scan", summary.get("new", 0)) else: detail_source = "advisor_actions_fallback" for level in ("urgent", "important", "resolved"): for item in advisor_output.get("actions", {}).get(level, []): vulnerabilities.append({ "cve_id": item.get("cve_id", ""), "package": item.get("package", "unknown"), "cvss_score": item.get("cvss_score", 0), "severity": item.get("severity", "MEDIUM"), "description": item.get("action", ""), "is_new": item.get("is_new", False), "source": "ADVISOR_ACTIONS", "report_level": level.upper(), }) summary = _summarize_vulnerabilities(vulnerabilities) layer1_agents = [ name for name in ("security_guard", "scout", "intel_fusion") if name in layer1_results ] degraded_agents = [ name for name in layer1_agents if isinstance(layer1_results.get(name), dict) and layer1_results[name].get("_degraded") ] if not layer1_agents: layer1_state = "skipped" elif degraded_agents: layer1_state = "degraded" else: layer1_state = "merged" report_sources: dict[str, Any] = { "vulnerability_detail": detail_source, "enriched_by": ["intel_fusion"] if intel_applied else [], "fallbacks": ["advisor_actions"] if detail_source == "advisor_actions_fallback" else [], "layer1_state": layer1_state, } if degraded_agents: report_sources["degraded_agents"] = degraded_agents if intel_applied: report_sources["intel_fusion_applied"] = intel_applied if scout_output.get("representative_cve_evidence"): report_sources["representative_cve_evidence_count"] = len( scout_output.get("representative_cve_evidence", []) ) return { "vulnerability_detail": vulnerabilities, "vulnerability_summary": summary, "report_sources": report_sources, "representative_cve_evidence": scout_output.get("representative_cve_evidence", []), } def _run_layer1_parallel( tech_stack: str, task_plan: dict, on_progress: Any, input_type: str = "pkg", ) -> dict[str, Any]: """ 頝臬 B Layer 1 銝西瑁嚗雿輻 ThreadPoolExecutor 撠孵嚗 Why ThreadPoolExecutor嚗 asyncio嚗嚗 - main.py 臬甇亦撘蝣潘SSE callback 銋臬甇亦 - ThreadPoolExecutor 臬典甇亦啣銝銝西瑁憭 IO-bound 撌乩嚗LLM 澆恬 - 撠孵嚗銝閬瑽 callback 璈 銝西瑁撠鞊∴閬 task_plan 瘙箏嚗嚗 - security_guard嚗敺蝔撘蝣潭賢/憟隞/璅∪嚗頛詨伐tech_stack嚗 - intel_fusion嚗剔雁望亥岷嚗頛詨伐tech_stack嚗 Args: tech_stack: 雿輻刻頛詨 task_plan: Orchestrator 隞餃閬 on_progress: SSE 脣漲隤 Returns: {"security_guard": dict, "intel_fusion": dict} """ parallel_agents = task_plan.get("parallel_layer1", []) layer1_results: dict[str, Any] = {} def _run_security_guard() -> tuple[str, dict]: """ Thread 銝剖瑁 Security Guard嚗 LLM 嚗""" try: from agents.security_guard import run_security_guard result = run_security_guard(tech_stack, on_progress) return ("security_guard", result) except Exception as e: logger.error("[LAYER1] Security Guard failed: %s", e) degradation_status.degrade("Security Guard", str(e)) return ("security_guard", { "extraction_status": "degraded", "functions": [], "imports": [], "patterns": [], "hardcoded": [], "stats": {"total_lines": 0, "functions_found": 0, "patterns_found": 0}, "_degraded": True, "_error": str(e), }) def _run_intel_fusion(intel_input: list | str) -> tuple[str, dict]: """ Thread 銝剖瑁 Intel Fusion嚗剔雁望亥岷嚗 v3.4 靽桀儔嚗亙憟隞嗅蝔勗銵剁憪蝔撘蝣潘嚗閫瘙 0 CVE 憿 intel_input 臭誑荔 - list[str]嚗憟隞嗅蝔勗銵剁 package_extractor 敺喳伐 - str嚗憪 tech_stack嚗fallback嚗Path A 憟隞嗆格芋撘嚗 """ try: from agents.intel_fusion import run_intel_fusion result = run_intel_fusion(intel_input, on_progress) return ("intel_fusion", result) except Exception as e: logger.error("[LAYER1] Intel Fusion failed: %s", e) degradation_status.degrade("Intel Fusion", str(e)) return ("intel_fusion", { "fusion_results": [], "strategy_applied": "degraded", "api_health_summary": {}, "_degraded": True, "_error": str(e), }) def _run_scout() -> tuple[str, dict]: """Run Scout in parallel with Security Guard during discovery.""" try: scout_target = tech_stack scout_input_type = input_type local_packages: list[str] = [] if input_type in {"code", "injection", "config"}: from agents.security_guard import extract_code_surface from tools.package_extractor import ( format_packages_for_intel_fusion, packages_from_security_guard, ) local_packages = packages_from_security_guard(extract_code_surface(tech_stack)) if not local_packages: return ("scout", { "scan_id": f"scan_code_only_{int(time.time())}", "timestamp": datetime.now(timezone.utc).isoformat(), "tech_stack": [], "vulnerabilities": [], "summary": { "total": 0, "new_since_last_scan": 0, "critical": 0, "high": 0, "medium": 0, "low": 0, }, "_duration_ms": 0, "_package_scan_skipped": True, "_skip_reason": "source_code_without_third_party_packages", "_extracted_packages_local": [], }) scout_target = format_packages_for_intel_fusion(local_packages) scout_input_type = "pkg" result, scout_sl = stage_scout(scout_target, input_type=scout_input_type) if scout_sl.steps: result["_duration_ms"] = scout_sl.steps[-1].get("duration_ms", 0) if local_packages: result["_extracted_packages_local"] = local_packages result["_scout_target_input"] = scout_target result["_scout_input_type"] = scout_input_type return ("scout", result) except Exception as e: logger.error("[LAYER1] Scout failed: %s", e) degradation_status.degrade("Scout", str(e)) return ("scout", { "scan_id": f"scan_degraded_{int(time.time())}", "vulnerabilities": [], "summary": {"total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0}, "_degraded": True, "_error": str(e), }) def _extract_packages_from_sg_result(sg_result: dict[str, Any]) -> list[str]: """從 Security Guard 結果萃取第三方套件,供後續 Scout 使用。""" try: from tools.package_extractor import packages_from_security_guard extracted = packages_from_security_guard(sg_result) logger.info( "[LAYER1] PackageExtractor: %d packages extracted from imports: %s", len(extracted), extracted, ) return extracted except Exception as pe: logger.warning("[LAYER1] PackageExtractor failed (fallback to raw input): %s", pe) return [] def _extract_cwe_targets_from_sg_result(sg_result: dict[str, Any]) -> list[str]: """從 Security Guard patterns 中整理 CWE targets,供 observability 與 fallback 使用。""" sg_patterns = sg_result.get("patterns", []) seen_cwe: set[str] = set() cwe_targets: list[str] = [] for pattern in sg_patterns: cwe = pattern.get("cwe_id", "") if cwe and cwe.startswith("CWE-") and cwe not in seen_cwe: seen_cwe.add(cwe) cwe_targets.append(cwe) return cwe_targets extracted_packages: list[str] = [] cwe_targets: list[str] = [] requested_workers = [ agent_name for agent_name in ("security_guard", "scout") if agent_name in parallel_agents ] if not requested_workers: return layer1_results futures: dict[Any, str] = {} max_workers = len(requested_workers) with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="layer1") as executor: if "security_guard" in requested_workers: rate_limiter.wait_if_needed("security_guard") futures[executor.submit(_run_security_guard)] = "security_guard" if "scout" in requested_workers: rate_limiter.wait_if_needed("scout") if on_progress: on_progress("scout", "RUNNING", {"step": "discovery_parallel"}) futures[executor.submit(_run_scout)] = "scout" for future in as_completed(futures): agent_name = futures[future] try: result_name, result_payload = future.result() except Exception as exc: logger.error("[LAYER1] %s future failed unexpectedly: %s", agent_name, exc) degradation_status.degrade(agent_name, str(exc)) continue layer1_results[result_name] = result_payload logger.info("[LAYER1] %s complete", result_name) if result_name == "security_guard": sg_extracted = _extract_packages_from_sg_result(result_payload) if sg_extracted or not extracted_packages: extracted_packages = sg_extracted cwe_targets = _extract_cwe_targets_from_sg_result(result_payload) if cwe_targets: logger.info( "[LAYER1] Security Guard produced %d CWE targets: %s", len(cwe_targets), cwe_targets, ) elif result_name == "scout" and result_payload.get("_extracted_packages_local") and not extracted_packages: extracted_packages = list(result_payload.get("_extracted_packages_local") or []) layer1_results["_extracted_packages"] = extracted_packages layer1_results["_cwe_targets"] = cwe_targets return layer1_results def _build_intel_fusion_input( scout_output: dict[str, Any], sg_result: dict[str, Any] | None, code_patterns: list[dict[str, Any]], extracted_packages: list[str], tech_stack: str, input_type: str, ) -> dict[str, Any]: """Build a clean post-discovery input for Intel Fusion.""" cve_ids: list[str] = [] seen_cves: set[str] = set() for vuln in scout_output.get("vulnerabilities", []): cve_id = str(vuln.get("cve_id", "")).strip() if (cve_id.startswith("CVE-") or cve_id.startswith("GHSA-")) and cve_id not in seen_cves: seen_cves.add(cve_id) cve_ids.append(cve_id) cwe_ids: list[str] = [] seen_cwes: set[str] = set() for pattern in code_patterns: cwe_id = str(pattern.get("cwe_id", "")).strip() if cwe_id.startswith("CWE-") and cwe_id not in seen_cwes: seen_cwes.add(cwe_id) cwe_ids.append(cwe_id) return { "mode": "post_discovery", "input_type": input_type, "cve_ids": cve_ids, "cwe_ids": cwe_ids, "packages": extracted_packages, "scout_vulnerabilities": scout_output.get("vulnerabilities", [])[:20], "code_patterns": code_patterns[:20], "security_guard_stats": (sg_result or {}).get("stats", {}), "raw_input_fingerprint": str(hash(tech_stack)), } def _merge_intel_fusion_into_scout( scout_output: dict[str, Any], intel_fusion_result: dict[str, Any], ) -> dict[str, Any]: """Merge Intel Fusion evidence after Scout completes discovery.""" if not intel_fusion_result: return scout_output try: from agents.scout import merge_intel_fusion_evidence return merge_intel_fusion_evidence(scout_output, intel_fusion_result) except Exception as exc: logger.warning("[PIPELINE] Intel Fusion merge skipped: %s", exc) scout_output["intel_fusion_result"] = intel_fusion_result return scout_output def run_pipeline(tech_stack: str, input_type: str = "pkg") -> dict[str, Any]: """ 瑁摰渡 ThreatHunter 蝞∠嚗v3.1 Orchestrator 撽嚗 Pipeline: Orchestrator [Layer 1 銝西] Scout Analyst [Critic] Advisor v3.7: input_type 瘙箏 Path-Aware Skills 頝舐晞 pkg=憟隞嗆 / code=皞蝣澆祟閮 / injection=AI摰 / config=閮剖瑼 v3.9 Sandbox: SANDBOX_ENABLED=true 銝 Docker 舐剁典捆典批瑁 Args: tech_stack: 雿輻刻頛詨亦銵摮銝莎憒 "Django 4.2, Redis 7.0"嚗 input_type: 蝡臬菜葫頛詨仿 (pkg/code/config/injection) Returns: 怠 Advisor 銵勗 dict嚗銝 pipeline_meta 甈雿 """ # (comment encoding corrupted) if SANDBOX_ENABLED and _DOCKER_SANDBOX_OK and is_docker_available(): logger.info("[SANDBOX] Docker isolation ACTIVE delegating to container") result = run_in_sandbox(tech_stack=tech_stack, input_type=input_type) if not result.get("fallback"): return result # 摰孵典瑁 # (comment encoding corrupted) logger.warning( "[SANDBOX] Container fallback: %s using in-process mode", result.get("error", "unknown"), ) # (comment encoding corrupted) return run_pipeline_with_callback(tech_stack, progress_callback=None, input_type=input_type) def run_pipeline_sync(tech_stack: str, input_type: str = "pkg") -> dict[str, Any]: """ run_pipeline 甇亙亙嚗靘 sandbox/sandbox_runner.py 典捆典批澆恬 瘜冽嚗摰孵典批澆急迨賢 SANDBOX_ENABLED=false嚗踹餈湧脣 Docker """ # (comment encoding corrupted) return run_pipeline_with_callback(tech_stack, progress_callback=None, input_type=input_type) def _run_pipeline_with_callback_legacy_v31( tech_stack: str, progress_callback: Any = None, input_type: str = "pkg", ) -> dict[str, Any]: """ 瑁摰 Pipeline嚗瘥 Stage 摰敺澆 progress_callback v3.7: input_type 瘙箏 Agent 頛芸 Skill SOP嚗Path-Aware Skills嚗 Args: tech_stack: 銵摮銝 progress_callback: 交 (agent_name: str, status: str, detail: dict) 賢 input_type: 頛詨仿 (pkg / code / config / injection) Returns: 怠 Advisor 銵勗 dict """ pipeline_start = time.time() completed_stages: list[str] = [] stages_detail: dict[str, Any] = {} # (comment encoding corrupted) orch_ctx: Any = None task_plan: dict = {"path": "B"} # 閮剛楝敺 B layer1_results: dict[str, Any] = {} feedback_loop_count: int = 0 MAX_FEEDBACK_LOOPS: int = 2 def _notify(agent: str, status: str, detail: dict) -> None: if progress_callback: try: progress_callback(agent, status, detail) except Exception: pass # callback 憭望銝敶梢 pipeline logger.info("=" * 60) logger.info(" ThreatHunter Pipeline v3.1 Start (Orchestrator-driven)") logger.info(" tech_stack : %s", tech_stack) logger.info(" input_type : %s", input_type) logger.info(" ENABLE_CRITIC: %s", ENABLE_CRITIC) logger.info("=" * 60) # (comment encoding corrupted) from checkpoint import recorder recorder.start_scan(f"pipe_{int(pipeline_start)}") # (comment encoding corrupted) # (comment encoding corrupted) l0_report: dict[str, Any] = {} sanitized_stack: str = tech_stack try: from input_sanitizer import sanitize_input, format_l0_report san_result = sanitize_input(tech_stack) l0_report = format_l0_report(san_result) if not san_result.safe: # (comment encoding corrupted) logger.warning("[L0] Input BLOCKED: %s", san_result.blocked_reason) _notify("input_sanitizer", "COMPLETE", { "status": "BLOCKED", "reason": san_result.blocked_reason, }) return { "executive_summary": f"頛詨亥◤摰券瞈曉冽蝯嚗{san_result.blocked_reason}", "blocked": True, "actions": {"urgent": [], "important": [], "resolved": []}, "risk_score": 0, "risk_trend": "+0", "pipeline_meta": { "pipeline_version": "3.1", "tech_stack": tech_stack, "stages_completed": 0, "stages_detail": {}, "enable_critic": ENABLE_CRITIC, "critic_verdict": "SKIPPED", "critic_score": 0.0, "duration_seconds": 0.0, "degradation": {"level": 5, "label": "INPUT_BLOCKED"}, "generated_at": datetime.now(timezone.utc).isoformat(), "l0_report": l0_report, }, } sanitized_stack = san_result.sanitized_input if san_result.truncated: logger.warning("[L0] Input truncated: %d %d chars", san_result.original_length, len(sanitized_stack)) logger.info("[L0] OK: type=%s l0_findings=%d hash=%s", san_result.input_type, len(san_result.l0_findings), san_result.input_hash) _notify("input_sanitizer", "COMPLETE", { "status": "SUCCESS", "input_type": san_result.input_type, "l0_warning_count": l0_report.get("l0_warning_count", 0), "truncated": san_result.truncated, }) recorder.stage_exit("input_sanitizer", "SUCCESS", { "input_type": san_result.input_type, "l0_warning_count": l0_report.get("l0_warning_count", 0), }, 0) except ImportError: # (comment encoding corrupted) logger.warning("[L0] input_sanitizer not available, skipping L0 filter") except Exception as e: # (comment encoding corrupted) logger.error("[L0] Sanitizer error (non-fatal): %s", e) # (comment encoding corrupted) _notify("orchestrator", "RUNNING", {}) orch_ctx, task_plan, orch_sl = stage_orchestrator(sanitized_stack) scan_path = task_plan.get("path", "B") orch_detail = { "status": "SUCCESS" if not task_plan.get("_degraded") else "DEGRADED", "scan_path": scan_path, "agents_to_run": task_plan.get("agents_to_run", []), "duration_ms": orch_sl.steps[-1].get("duration_ms", 0) if orch_sl.steps else 0, "l0_input_type": l0_report.get("input_type", "unknown"), } stages_detail["orchestrator"] = orch_detail completed_stages.append("orchestrator") _notify("orchestrator", "COMPLETE", orch_detail) recorder.stage_enter("orchestrator", {"tech_stack": sanitized_stack[:200]}) recorder.stage_exit("orchestrator", orch_detail.get("status", "SUCCESS"), { "scan_path": scan_path, "agents_to_run": task_plan.get("agents_to_run", []), }, orch_detail.get("duration_ms", 0)) logger.info("[PIPELINE] Scan path: %s", scan_path) # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) extracted_packages: list[str] = [] if scan_path in ("A", "B"): parallel_agents = task_plan.get("parallel_layer1", []) if parallel_agents: _notify("layer1_parallel", "RUNNING", {"agents": parallel_agents}) layer1_results = _run_layer1_parallel(tech_stack, task_plan, _notify) _notify("layer1_parallel", "COMPLETE", { "agents_completed": list(layer1_results.keys()), }) # (comment encoding corrupted) if "intel_fusion" in layer1_results: extracted_packages = layer1_results["intel_fusion"].get("_extracted_packages", []) if extracted_packages: logger.info( "[PIPELINE] extracted_packages from layer1: %s", extracted_packages ) # (comment encoding corrupted) for agent_name, result in layer1_results.items(): is_degraded = result.get("_degraded", False) agent_detail = { "status": "DEGRADED" if is_degraded else "SUCCESS", "duration_ms": result.get("_duration_ms", 0), # (comment encoding corrupted) "_degraded": is_degraded, "_error": result.get("_error", "") if is_degraded else "", } # (comment encoding corrupted) if agent_name == "security_guard": agent_detail["functions_found"] = result.get("stats", {}).get("functions_found", 0) agent_detail["patterns_found"] = result.get("stats", {}).get("patterns_found", 0) agent_detail["injection_detected"] = result.get("injection_attempts_detected", False) # (comment encoding corrupted) elif agent_name == "intel_fusion": agent_detail["cves_scored"] = len(result.get("fusion_results", [])) stages_detail[agent_name] = agent_detail completed_stages.append(agent_name) recorder.stage_exit(agent_name, agent_detail["status"], agent_detail, agent_detail.get("duration_ms", 0)) # (comment encoding corrupted) if orch_ctx is not None: for agent_name, result in layer1_results.items(): try: orch_ctx.store_result(agent_name, result) except Exception: pass # (comment encoding corrupted) # (comment encoding corrupted) _notify("scout", "RUNNING", {}) rate_limiter.wait_if_needed("scout") # (comment encoding corrupted) # (comment encoding corrupted) scout_input: str if extracted_packages: from tools.package_extractor import format_packages_for_intel_fusion scout_input = format_packages_for_intel_fusion(extracted_packages) logger.info("[PIPELINE] Scout using extracted packages: %s", scout_input) else: scout_input = tech_stack logger.info("[PIPELINE] Scout using raw tech_stack (no packages extracted)") scout_output, scout_sl = stage_scout( scout_input, input_type=input_type, intel_fusion_result=layer1_results.get("intel_fusion"), ) # 保留原始 Layer 1 情報,方便 UI 與除錯觀察。 if "intel_fusion" in layer1_results: scout_output["intel_fusion_result"] = layer1_results["intel_fusion"] # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) # (comment encoding corrupted) _sg_code_patterns: list[dict] = [] # 函式級別變數,確保 final return 可存取 if "security_guard" in layer1_results: _sg_code_patterns = _build_code_patterns_summary( layer1_results["security_guard"] ) if _sg_code_patterns: scout_output["code_patterns"] = _sg_code_patterns logger.info( "[PIPELINE] v4.0: SG code_patterns injected into scout_output " "(%d patterns, Path=%s) — will be merged into final result", len(_sg_code_patterns), input_type, ) else: logger.debug("[PIPELINE] v4.0: SG returned no code_patterns (clean code path)") scout_detail = { "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED", "vuln_count": len(scout_output.get("vulnerabilities", [])), "duration_ms": scout_sl.steps[-1].get("duration_ms", 0) if scout_sl.steps else 0, "packages_used": extracted_packages, } stages_detail["scout"] = scout_detail completed_stages.append("scout") _notify("scout", "COMPLETE", scout_detail) # (comment encoding corrupted) from agents.scout import SKILL_MAP as SCOUT_SKILL_MAP recorder.stage_enter("scout", {"tech_stack": scout_input[:200], "packages": extracted_packages}, skill_file=SCOUT_SKILL_MAP.get(input_type, "threat_intel.md"), input_type=input_type) recorder.stage_exit("scout", scout_detail.get("status", "SUCCESS"), scout_output, scout_detail.get("duration_ms", 0)) # (comment encoding corrupted) if scan_path == "C": logger.info("[PIPELINE] Path C: skipping Analyst + Critic direct to Advisor") _notify("analyst", "COMPLETE", {"status": "SKIPPED", "reason": "path_C"}) _notify("critic", "COMPLETE", {"status": "SKIPPED", "reason": "path_C"}) # (comment encoding corrupted) analyst_output: dict[str, Any] = { "scan_id": scout_output.get("scan_id", "unknown"), "risk_score": 30, "risk_trend": "+0", "analysis": [], "_skipped": True, "_reason": "path_C_doc_scan", } critic_output: dict[str, Any] = { "verdict": "SKIPPED", "weighted_score": 60.0, "_skipped": True, } else: # (comment encoding corrupted) _notify("analyst", "RUNNING", {}) rate_limiter.wait_if_needed("analyst") analyst_output, analyst_sl = stage_analyst(scout_output, input_type=input_type) analyst_detail = { "status": "SUCCESS" if not analyst_output.get("_degraded") else "DEGRADED", "risk_score": analyst_output.get("risk_score", 0), "duration_ms": analyst_sl.steps[-1].get("duration_ms", 0) if analyst_sl.steps else 0, } stages_detail["analyst"] = analyst_detail completed_stages.append("analyst") _notify("analyst", "COMPLETE", analyst_detail) # v3.7: stage_enter with skill_file + input_type from agents.analyst import SKILL_MAP as ANALYST_SKILL_MAP recorder.stage_enter("analyst", scout_output, skill_file=ANALYST_SKILL_MAP.get(input_type, "chain_analysis.md"), input_type=input_type) recorder.stage_exit("analyst", analyst_detail.get("status", "SUCCESS"), analyst_output, analyst_detail.get("duration_ms", 0)) # (comment encoding corrupted) _notify("critic", "RUNNING", {}) rate_limiter.wait_if_needed("critic") critic_output, critic_sl = stage_critic(analyst_output, input_type=input_type) critic_detail = { "status": "SUCCESS" if not critic_output.get("_degraded") else "DEGRADED", "verdict": critic_output.get("verdict", "SKIPPED"), "score": critic_output.get("weighted_score", 0), "duration_ms": critic_sl.steps[-1].get("duration_ms", 0) if critic_sl.steps else 0, } stages_detail["critic"] = critic_detail completed_stages.append("critic") _notify("critic", "COMPLETE", critic_detail) # v3.7: stage_enter with skill_file + input_type from agents.critic import SKILL_MAP as CRITIC_SKILL_MAP recorder.stage_enter("critic", analyst_output, skill_file=CRITIC_SKILL_MAP.get(input_type, "debate_sop.md"), input_type=input_type) recorder.stage_exit("critic", critic_detail.get("status", "SUCCESS"), critic_output, critic_detail.get("duration_ms", 0)) # (comment encoding corrupted) _notify("advisor", "RUNNING", {}) rate_limiter.wait_if_needed("advisor") advisor_output, advisor_sl = stage_advisor(analyst_output, critic_output, input_type=input_type) advisor_detail = { "status": "SUCCESS" if not advisor_output.get("_degraded") else "DEGRADED", "urgent_count": len(advisor_output.get("actions", {}).get("urgent", [])), "duration_ms": advisor_sl.steps[-1].get("duration_ms", 0) if advisor_sl.steps else 0, } stages_detail["advisor"] = advisor_detail completed_stages.append("advisor") _notify("advisor", "COMPLETE", advisor_detail) # v3.7: stage_enter with skill_file + input_type from agents.advisor import SKILL_MAP as ADVISOR_SKILL_MAP recorder.stage_enter("advisor", analyst_output, skill_file=ADVISOR_SKILL_MAP.get(input_type, "action_report.md"), input_type=input_type) recorder.stage_exit("advisor", advisor_detail.get("status", "SUCCESS"), advisor_output, advisor_detail.get("duration_ms", 0)) # (comment encoding corrupted) # (comment encoding corrupted) advisor_confidence = advisor_output.get("confidence", "HIGH") feedback_triggered = ( advisor_confidence in ("NEEDS_VERIFICATION", "LOW", "MEDIUM") and feedback_loop_count < MAX_FEEDBACK_LOOPS and scan_path != "D" # 脫迫頝臬 D ⊿擖 ) if feedback_triggered: feedback_loop_count += 1 logger.warning( "[PIPELINE] Advisor confidence=%s triggering Feedback Loop %d/%d", advisor_confidence, feedback_loop_count, MAX_FEEDBACK_LOOPS, ) _notify("feedback_loop", "RUNNING", { "loop": feedback_loop_count, "reason": f"confidence={advisor_confidence}", }) # (comment encoding corrupted) low_conf_cves = [ a.get("cve_id") for a in analyst_output.get("analysis", []) if a.get("chain_risk", {}).get("confidence") in ("NEEDS_VERIFICATION", "LOW") ] logger.info("[PIPELINE] Feedback Loop targeting CVEs: %s", low_conf_cves) # (comment encoding corrupted) if orch_ctx is not None: try: orch_ctx.feedback_loops = feedback_loop_count except Exception: pass _notify("feedback_loop", "COMPLETE", {"loop": feedback_loop_count, "cves": low_conf_cves}) # (comment encoding corrupted) orch_summary: dict = {} if orch_ctx is not None: try: from agents.orchestrator import finalize_orchestration orch_ctx.final_confidence = advisor_confidence orch_summary = finalize_orchestration(orch_ctx) except Exception as e: logger.warning("[PIPELINE] finalize_orchestration failed: %s", e) # (comment encoding corrupted) duration = round(time.time() - pipeline_start, 2) pipeline_meta = { "pipeline_version": "3.1", "tech_stack": tech_stack, "scan_path": scan_path, "stages_completed": len(completed_stages), "stages_detail": stages_detail, "enable_critic": ENABLE_CRITIC, "critic_verdict": critic_output.get("verdict", "SKIPPED"), "critic_score": critic_output.get("weighted_score", 0), "feedback_loops": feedback_loop_count, "duration_seconds": duration, "degradation": degradation_status.to_dict(), "orchestration": orch_summary, "layer1_agents": list(layer1_results.keys()), "generated_at": datetime.now(timezone.utc).isoformat(), } logger.info( "Pipeline v3.1 COMPLETE in %.1fs | path=%s | feedback_loops=%d", duration, scan_path, feedback_loop_count, ) recorder.end_scan("COMPLETE", duration) # ── Harness Layer 7: 強制注入 Security Guard CWE 佐證 ────────────────── # _sg_code_patterns 由 _build_code_patterns_summary() 生成,含 MITRE CWE 定義。 # Advisor LLM 從不接收 code_patterns,因此必須在 return 前直接合併。 # 確保無論掃描路徑(A/B/C)都能在 API response 中看到 CWE 佐證。 report_payload = _build_scan_report_payload(scout_output, advisor_output, layer1_results) final_output: dict[str, Any] = { **advisor_output, **report_payload, "pipeline_meta": pipeline_meta, } if _sg_code_patterns: existing_cps = final_output.get("code_patterns_summary", []) final_output["code_patterns_summary"] = existing_cps + _sg_code_patterns logger.info( "[PIPELINE] Harness L7: %d CWE-enriched code patterns merged into final result", len(_sg_code_patterns), ) return final_output def run_pipeline_with_callback( tech_stack: str, progress_callback: Any = None, input_type: str = "pkg", ) -> dict[str, Any]: """ Canonical pipeline DAG: Orchestrator -> (Security Guard + Scout in parallel) -> Intel Fusion -> Analyst -> Critic -> Advisor. """ pipeline_start = time.time() completed_stages: list[str] = [] stages_detail: dict[str, Any] = {} layer1_results: dict[str, Any] = {} def _notify(agent: str, status: str, detail: dict) -> None: if not progress_callback: return try: progress_callback(agent, status, detail) except Exception as exc: logger.debug("[PIPELINE] progress callback ignored: %s", exc) def _remember_stage(agent: str, detail: dict[str, Any]) -> None: stages_detail[agent] = detail if agent not in completed_stages: completed_stages.append(agent) from checkpoint import recorder recorder.start_scan(f"pipe_{int(pipeline_start)}") l0_report: dict[str, Any] = {} sanitized_stack = tech_stack try: from input_sanitizer import sanitize_input, format_l0_report san_result = sanitize_input(tech_stack) l0_report = format_l0_report(san_result) if not san_result.safe: _notify("input_sanitizer", "COMPLETE", { "status": "BLOCKED", "reason": san_result.blocked_reason, }) duration = round(time.time() - pipeline_start, 2) return { "executive_summary": f"Input blocked: {san_result.blocked_reason}", "blocked": True, "actions": {"urgent": [], "important": [], "resolved": []}, "risk_score": 0, "risk_trend": "+0", "pipeline_meta": { "pipeline_version": "4.0", "tech_stack": tech_stack, "scan_path": "BLOCKED", "stages_completed": 0, "stages_detail": {}, "enable_critic": ENABLE_CRITIC, "critic_verdict": "SKIPPED", "critic_score": 0.0, "duration_seconds": duration, "degradation": {"level": 5, "label": "INPUT_BLOCKED"}, "generated_at": datetime.now(timezone.utc).isoformat(), "l0_report": l0_report, }, } sanitized_stack = san_result.sanitized_input input_type = _canonical_runtime_input_type(input_type, san_result.input_type) l0_detail = { "status": "SUCCESS", "input_type": san_result.input_type, "pipeline_input_type": input_type, "l0_warning_count": l0_report.get("l0_warning_count", 0), "truncated": san_result.truncated, } _remember_stage("input_sanitizer", l0_detail) _notify("input_sanitizer", "COMPLETE", l0_detail) recorder.stage_exit("input_sanitizer", "SUCCESS", l0_detail, 0) except Exception as exc: logger.warning("[L0] Sanitizer skipped: %s", exc) if input_type == "sql_review": from tools.sql_syntax_reviewer import review_sql_syntax _notify("sql_review", "RUNNING", {}) review = review_sql_syntax(sanitized_stack) sql_detail = { "status": "SUCCESS", "patterns_detected": review.get("summary", {}).get("patterns_detected", 0), "findings_count": review.get("summary", {}).get("total", 0), "requires_application_context": True, } _remember_stage("sql_review", sql_detail) _notify("sql_review", "COMPLETE", sql_detail) recorder.stage_exit("sql_review", "SUCCESS", sql_detail, 0) duration = round(time.time() - pipeline_start, 2) recorder.end_scan("COMPLETE", duration) return { "executive_summary": ( "SQL syntax review completed. Findings are payload/syntax evidence only; " "application source context is required before calling this a verified vulnerability." ), "actions": {"urgent": [], "important": [], "resolved": []}, "risk_score": 0, "risk_trend": "+0", "sql_syntax_review": review, "vulnerability_detail": [], "vulnerability_summary": { "total": 0, "critical": 0, "high": 0, "medium": 0, "low": 0, "new": 0, "new_since_last_scan": 0, }, "report_sources": { "vulnerability_detail": "sql_syntax_review", "layer1_state": "skipped", "external_scan_skipped": True, }, "code_patterns_summary": [], "pipeline_meta": { "pipeline_version": "4.0", "tech_stack": tech_stack, "scan_path": "C", "stages_completed": len(completed_stages), "stages_detail": stages_detail, "enable_critic": ENABLE_CRITIC, "critic_verdict": "SKIPPED", "critic_score": 0.0, "feedback_loops": 0, "duration_seconds": duration, "degradation": degradation_status.to_dict(), "orchestration": {}, "layer1_agents": [], "fusion_after_discovery": False, "l0_report": l0_report, "generated_at": datetime.now(timezone.utc).isoformat(), }, } _notify("orchestrator", "RUNNING", {}) orch_ctx, task_plan, orch_sl = stage_orchestrator(sanitized_stack) task_plan = _constrain_task_plan_by_input_type(task_plan, input_type) scan_path = task_plan.get("path", "B") orch_detail = { "status": "SUCCESS" if not task_plan.get("_degraded") else "DEGRADED", "scan_path": scan_path, "agents_to_run": task_plan.get("agents_to_run", []), "duration_ms": orch_sl.steps[-1].get("duration_ms", 0) if orch_sl.steps else 0, "l0_input_type": l0_report.get("input_type", "unknown"), "pipeline_input_type": input_type, "route_corrected": task_plan.get("_route_corrected"), } _remember_stage("orchestrator", orch_detail) _notify("orchestrator", "COMPLETE", orch_detail) recorder.stage_enter("orchestrator", {"tech_stack": sanitized_stack[:200]}) recorder.stage_exit("orchestrator", orch_detail["status"], orch_detail, orch_detail["duration_ms"]) extracted_packages: list[str] = [] scout_input = sanitized_stack scout_output: dict[str, Any] = {} scout_duration_ms = 0 sg_code_patterns: list[dict[str, Any]] = [] intel_fusion_result: dict[str, Any] = {} benchmark_context: dict[str, Any] | None = None parallel_agents = task_plan.get("parallel_layer1", []) if parallel_agents: _notify("layer1_parallel", "RUNNING", {"agents": parallel_agents}) layer1_results = _run_layer1_parallel( sanitized_stack, task_plan, _notify, input_type=input_type ) visible_agents = [name for name in layer1_results if not name.startswith("_")] _notify("layer1_parallel", "COMPLETE", {"agents_completed": visible_agents}) extracted_packages = layer1_results.get("_extracted_packages", []) if "security_guard" in layer1_results: sg_result = layer1_results["security_guard"] sg_code_patterns = _build_code_patterns_summary(sg_result) sg_detail = { "status": "SUCCESS" if not sg_result.get("_degraded") else "DEGRADED", "duration_ms": sg_result.get("_duration_ms", 0), "functions_found": sg_result.get("stats", {}).get("functions_found", 0), "patterns_found": sg_result.get("stats", {}).get("patterns_found", 0), "injection_detected": sg_result.get("injection_attempts_detected", False), "_degraded": sg_result.get("_degraded", False), "_error": sg_result.get("_error", "") if sg_result.get("_degraded") else "", } _remember_stage("security_guard", sg_detail) recorder.stage_exit("security_guard", sg_detail["status"], sg_detail, sg_detail["duration_ms"]) if "scout" in layer1_results: scout_output = layer1_results["scout"] scout_duration_ms = scout_output.get("_duration_ms", 0) scout_detail = { "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED", "vuln_count": len(scout_output.get("vulnerabilities", [])), "duration_ms": scout_duration_ms, "packages_used": extracted_packages, "_degraded": scout_output.get("_degraded", False), "_error": scout_output.get("_error", "") if scout_output.get("_degraded") else "", } _remember_stage("scout", scout_detail) _notify("scout", "COMPLETE", scout_detail) if not scout_output: _notify("scout", "RUNNING", {}) rate_limiter.wait_if_needed("scout") scout_output, scout_sl = stage_scout(scout_input, input_type=input_type) scout_duration_ms = scout_sl.steps[-1].get("duration_ms", 0) if scout_sl.steps else 0 scout_detail = { "status": "SUCCESS" if not scout_output.get("_degraded") else "DEGRADED", "vuln_count": len(scout_output.get("vulnerabilities", [])), "duration_ms": scout_duration_ms, "packages_used": extracted_packages, } _remember_stage("scout", scout_detail) _notify("scout", "COMPLETE", scout_detail) if sg_code_patterns: scout_output["code_patterns"] = sg_code_patterns from agents.scout import SKILL_MAP as SCOUT_SKILL_MAP recorder.stage_enter("scout", {"tech_stack": scout_input[:200], "packages": extracted_packages}, skill_file=SCOUT_SKILL_MAP.get(input_type, "threat_intel.md"), input_type=input_type) recorder.stage_exit("scout", stages_detail["scout"]["status"], scout_output, scout_duration_ms) if task_plan.get("fusion_after_discovery", False): intel_input = _build_intel_fusion_input( scout_output=scout_output, sg_result=layer1_results.get("security_guard"), code_patterns=sg_code_patterns, extracted_packages=extracted_packages, tech_stack=sanitized_stack, input_type=input_type, ) intel_fusion_result, intel_sl = stage_intel_fusion(intel_input, _notify) layer1_results["intel_fusion"] = intel_fusion_result scout_output["intel_fusion_result"] = intel_fusion_result scout_output = _merge_intel_fusion_into_scout(scout_output, intel_fusion_result) intel_detail = { "status": "SUCCESS" if not intel_fusion_result.get("_degraded") else "DEGRADED", "cves_scored": len(intel_fusion_result.get("fusion_results", [])), "duration_ms": intel_sl.steps[-1].get("duration_ms", 0) if intel_sl.steps else 0, "_degraded": intel_fusion_result.get("_degraded", False), "_error": intel_fusion_result.get("_error", "") if intel_fusion_result.get("_degraded") else "", } _remember_stage("intel_fusion", intel_detail) if orch_ctx is not None: try: orch_ctx.store_result("intel_fusion", intel_fusion_result) except Exception as exc: logger.debug("[PIPELINE] Orchestration store skipped for Intel Fusion: %s", exc) if scan_path == "C": analyst_output = { "scan_id": scout_output.get("scan_id", "unknown"), "risk_score": 30, "risk_trend": "+0", "analysis": [], "_skipped": True, "_reason": "path_C_doc_scan", } critic_output = {"verdict": "SKIPPED", "weighted_score": 60.0, "_skipped": True} _remember_stage("analyst", {"status": "SKIPPED", "reason": "path_C"}) _remember_stage("critic", {"status": "SKIPPED", "reason": "path_C"}) _notify("analyst", "COMPLETE", stages_detail["analyst"]) _notify("critic", "COMPLETE", stages_detail["critic"]) else: _notify("analyst", "RUNNING", {}) rate_limiter.wait_if_needed("analyst") analyst_output, analyst_sl = stage_analyst(scout_output, input_type=input_type) benchmark_context = _build_dim11_benchmark_context( source=sanitized_stack, input_type=input_type, task_plan=task_plan, code_patterns=sg_code_patterns, scout_output=scout_output, extracted_packages=extracted_packages, ) if benchmark_context: analyst_output["benchmark_context"] = benchmark_context analyst_output["observed_cwe_categories"] = benchmark_context["observed_cwe_categories"] logger.info( "[DIM11] benchmark context attached before Critic: fixture=%s observed=%d expected=%d", benchmark_context.get("fixture"), len(benchmark_context.get("observed_cwe_categories", [])), len(benchmark_context.get("expected_cwe_categories", [])), ) analyst_detail = { "status": "SUCCESS" if not analyst_output.get("_degraded") else "DEGRADED", "risk_score": analyst_output.get("risk_score", 0), "duration_ms": analyst_sl.steps[-1].get("duration_ms", 0) if analyst_sl.steps else 0, } _remember_stage("analyst", analyst_detail) _notify("analyst", "COMPLETE", analyst_detail) from agents.analyst import SKILL_MAP as ANALYST_SKILL_MAP recorder.stage_enter("analyst", scout_output, skill_file=ANALYST_SKILL_MAP.get(input_type, "chain_analysis.md"), input_type=input_type) recorder.stage_exit("analyst", analyst_detail["status"], analyst_output, analyst_detail["duration_ms"]) _notify("critic", "RUNNING", {}) rate_limiter.wait_if_needed("critic") critic_output, critic_sl = stage_critic(analyst_output, input_type=input_type) critic_detail = { "status": "SUCCESS" if not critic_output.get("_degraded") else "DEGRADED", "verdict": critic_output.get("verdict", "SKIPPED"), "score": critic_output.get("weighted_score", 0), "duration_ms": critic_sl.steps[-1].get("duration_ms", 0) if critic_sl.steps else 0, } if critic_output.get("recall_challenge"): critic_detail["recall_challenge_verdict"] = critic_output["recall_challenge"].get("verdict") _remember_stage("critic", critic_detail) _notify("critic", "COMPLETE", critic_detail) from agents.critic import SKILL_MAP as CRITIC_SKILL_MAP recorder.stage_enter("critic", analyst_output, skill_file=CRITIC_SKILL_MAP.get(input_type, "debate_sop.md"), input_type=input_type) recorder.stage_exit("critic", critic_detail["status"], critic_output, critic_detail["duration_ms"]) _notify("advisor", "RUNNING", {}) rate_limiter.wait_if_needed("advisor") advisor_output, advisor_sl = stage_advisor(analyst_output, critic_output, input_type=input_type) advisor_detail = { "status": "SUCCESS" if not advisor_output.get("_degraded") else "DEGRADED", "urgent_count": len(advisor_output.get("actions", {}).get("urgent", [])), "duration_ms": advisor_sl.steps[-1].get("duration_ms", 0) if advisor_sl.steps else 0, } _remember_stage("advisor", advisor_detail) _notify("advisor", "COMPLETE", advisor_detail) from agents.advisor import SKILL_MAP as ADVISOR_SKILL_MAP recorder.stage_enter("advisor", analyst_output, skill_file=ADVISOR_SKILL_MAP.get(input_type, "action_report.md"), input_type=input_type) recorder.stage_exit("advisor", advisor_detail["status"], advisor_output, advisor_detail["duration_ms"]) duration = round(time.time() - pipeline_start, 2) try: orch_summary = {} if orch_ctx is not None: from agents.orchestrator import finalize_orchestration orch_ctx.final_confidence = advisor_output.get("confidence", "HIGH") orch_summary = finalize_orchestration(orch_ctx) except Exception as exc: logger.debug("[PIPELINE] finalize_orchestration skipped: %s", exc) orch_summary = {} pipeline_meta = { "pipeline_version": "4.0", "tech_stack": tech_stack, "scan_path": scan_path, "stages_completed": len(completed_stages), "stages_detail": stages_detail, "enable_critic": ENABLE_CRITIC, "critic_verdict": critic_output.get("verdict", "SKIPPED"), "critic_score": critic_output.get("weighted_score", 0), "feedback_loops": 0, "duration_seconds": duration, "degradation": degradation_status.to_dict(), "orchestration": orch_summary, "layer1_agents": [name for name in layer1_results if not name.startswith("_")], "fusion_after_discovery": bool(task_plan.get("fusion_after_discovery", False)), "l0_report": l0_report, "generated_at": datetime.now(timezone.utc).isoformat(), } if benchmark_context: pipeline_meta["benchmark_context"] = benchmark_context if critic_output.get("recall_challenge"): pipeline_meta["critic_recall_challenge"] = critic_output["recall_challenge"] if intel_fusion_result.get("evidence_contract"): pipeline_meta["intel_fusion_evidence_contract"] = intel_fusion_result["evidence_contract"] recorder.end_scan("COMPLETE", duration) report_payload = _build_scan_report_payload(scout_output, advisor_output, layer1_results) final_output: dict[str, Any] = { **advisor_output, **report_payload, "pipeline_meta": pipeline_meta, } if sg_code_patterns: existing_cps = final_output.get("code_patterns_summary", []) final_output["code_patterns_summary"] = existing_cps + sg_code_patterns if critic_output.get("recall_challenge"): final_output["critic_recall_challenge"] = critic_output["recall_challenge"] if critic_output.get("needs_rescan"): final_output["needs_rescan"] = True return final_output # ====================================================================== # (comment encoding corrupted) # ====================================================================== if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) if len(sys.argv) > 1: tech_stack = " ".join(sys.argv[1:]) else: tech_stack = "Django 4.2, Redis 7.0, PostgreSQL 16" print(f"\nThreatHunter - Scanning: {tech_stack}\n") result = run_pipeline(tech_stack) print("\n=== Pipeline Result ===") print(json.dumps(result, ensure_ascii=False, indent=2))