""" ThreatHunter Orchestrator Agent ================================ 職責:動態任務規劃 + Agent 分配 + 回饋迴路管理 架構依據:CrewAI Process.hierarchical + MacNet DAG 不規則拓撲 論文:arXiv:2406.07155 (MacNet) + LLM Discussion (arXiv:2405.06373) 邊界規則(AGENTS.md 合規): 本模組屬於 agents/ 層 可引用 tools/ (第1層) 和 config.py 不可引用 harness/constraints/ 或 harness/entropy/ 內容 """ from __future__ import annotations import json import logging import time from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, TYPE_CHECKING from config import ( SKILLS_DIR, SYSTEM_CONSTITUTION, degradation_status, get_llm, ) logger = logging.getLogger("threathunter.orchestrator") if TYPE_CHECKING: from crewai import Agent def _call_tool(tool_obj: Any, **kwargs: Any) -> Any: """相容 CrewAI Tool 的 run / invoke 呼叫介面。""" if hasattr(tool_obj, "run"): return tool_obj.run(**kwargs) if hasattr(tool_obj, "invoke"): return tool_obj.invoke(kwargs) return tool_obj(**kwargs) # ── 掃描路徑類型(MacNet 動態路由)───────────────────────────── class ScanPath(str, Enum): """動態任務路由路徑(對應 skills/orchestrator.md Step 2)""" PACKAGES_ONLY = "A" # 輕量:套件掃描 FULL_CODE = "B" # 完整:程式碼 + 文件 + 套件 DOCUMENTS_ONLY = "C" # 文件弱配置掃描 FEEDBACK_LOOP = "D" # Judge 回饋 → 補充分析 # ── Orchestrator 執行上下文(共享記憶)────────────────────────── @dataclass class OrchestrationContext: """ 跨 Agent 的共享短期記憶。 每次掃描建立一個實例,所有 Worker Agent 可讀寫。 """ scan_path: ScanPath = ScanPath.FULL_CODE agents_invoked: list[str] = field(default_factory=list) agents_skipped: list[str] = field(default_factory=list) shortcuts_taken: list[str] = field(default_factory=list) kev_hits: list[str] = field(default_factory=list) # CISA KEV 命中的 CVE feedback_loops: int = 0 max_feedback_loops: int = 2 api_health: dict[str, str] = field(default_factory=dict) intermediate_results: dict[str, Any] = field(default_factory=dict) final_confidence: str = "NEEDS_VERIFICATION" start_time: float = field(default_factory=time.time) def record_invocation(self, agent_name: str) -> None: """記錄 Agent 被呼叫""" self.agents_invoked.append(agent_name) logger.info("[ORCH] Agent invoked: %s", agent_name) def record_skip(self, agent_name: str, reason: str) -> None: """記錄 Agent 被跳過(MacNet Small-World 優化)""" self.agents_skipped.append(agent_name) logger.info("[ORCH] Agent skipped: %s (reason: %s)", agent_name, reason) def record_shortcut(self, shortcut: str) -> None: """記錄走了捷徑(MacNet Small-World 邊)""" self.shortcuts_taken.append(shortcut) logger.info("[ORCH] Shortcut taken: %s", shortcut) def record_kev_hit(self, cve_id: str) -> None: """記錄 CISA KEV 命中(觸發 Small-World 捷徑)""" self.kev_hits.append(cve_id) logger.warning("[ORCH][CRITICAL] KEV Hit: %s → triggering shortcut", cve_id) def store_result(self, agent_name: str, result: Any) -> None: """儲存 Worker 輸出到共享上下文""" self.intermediate_results[agent_name] = result def get_result(self, agent_name: str) -> Any: """取得 Worker 輸出""" return self.intermediate_results.get(agent_name) def elapsed_seconds(self) -> float: """計算執行時間""" return time.time() - self.start_time def to_summary(self) -> dict: """輸出執行摘要(給 main.py 和 UI)""" return { "scan_path": self.scan_path.value, "agents_invoked": self.agents_invoked, "agents_skipped": self.agents_skipped, "shortcuts_taken": self.shortcuts_taken, "kev_hits": self.kev_hits, "feedback_loops": self.feedback_loops, "final_confidence": self.final_confidence, "elapsed_seconds": round(self.elapsed_seconds(), 1), } # ── 輸入分類器(確定性程式碼,非 LLM)───────────────────────── def classify_input(user_input: str | dict) -> ScanPath: """ 根據用戶輸入類型決定掃描路徑。 這是確定性邏輯,不需要 LLM 推理。 對應 skills/orchestrator.md Step 2。 Args: user_input: 用戶提交的掃描請求 Returns: ScanPath 枚舉值 """ # 支援 dict 格式(含 type 欄位) if isinstance(user_input, dict): input_type = user_input.get("type", "mixed") if input_type == "packages": return ScanPath.PACKAGES_ONLY elif input_type in ("document", "config"): return ScanPath.DOCUMENTS_ONLY elif input_type == "feedback": return ScanPath.FEEDBACK_LOOP return ScanPath.FULL_CODE # 純字串:啟發式分類 text = str(user_input).lower() # 判斷是否是套件清單(無程式碼) if all(tok in text for tok in ["==", "\n"]) and "def " not in text and "class " not in text: return ScanPath.PACKAGES_ONLY # 判斷是否是文件類型 doc_extensions = [".env", ".yaml", ".yml", ".json", ".ini", ".toml", "dockerfile"] if any(ext in text for ext in doc_extensions) and "def " not in text: return ScanPath.DOCUMENTS_ONLY return ScanPath.FULL_CODE # ── MacNet Small-World 捷徑決策器 ─────────────────────────────── def check_shortcuts(ctx: OrchestrationContext, scan_result: dict) -> list[str]: """ 檢查是否有 MacNet Small-World 捷徑可以走。 (不規則拓撲的核心:有條件的長程邊) Args: ctx: 當前執行上下文 scan_result: 最近的掃描結果 Returns: 可走的捷徑列表 """ shortcuts = [] # 捷徑 1:CISA KEV 命中 → Intel Fusion 直接通知 Analyst(跳過 Scout 重新評分) kev_hits = scan_result.get("kev_hits", []) if kev_hits: for cve_id in kev_hits: ctx.record_kev_hit(cve_id) shortcuts.append("kev_to_analyst_direct") logger.warning("[SHORTCUT] KEV hits detected, bypassing Scout re-scoring") # 捷徑 2:L0 正則無可疑點 → 跳過 L2 LLM(省 Token) l0_findings = scan_result.get("l0_findings", []) if len(l0_findings) == 0: shortcuts.append("skip_l2_llm") ctx.record_shortcut("skip_l2_llm") logger.info("[SHORTCUT] L0 found 0 suspicious patterns, skipping L2 LLM") # 捷徑 3:Debate 三方第一輪一致 → 跳過 Phase 2(省 6 次 LLM 呼叫) debate_consensus = scan_result.get("debate_consensus", False) if debate_consensus: shortcuts.append("debate_phase2_skipped") ctx.record_shortcut("debate_phase2_skipped") logger.info("[SHORTCUT] Debate consensus reached in Phase 1, skipping Phase 2") # 捷徑 4:所有 CVE 均為低危(CVSS < 4.0)→ 跳過 Debate Cluster vulnerabilities = scan_result.get("vulnerabilities", []) high_risk_vulns = [v for v in vulnerabilities if float(v.get("cvss_score", 0)) >= 4.0] if vulnerabilities and not high_risk_vulns: shortcuts.append("skip_debate_all_low") ctx.record_shortcut("skip_debate_all_low") logger.info("[SHORTCUT] All vulnerabilities low risk, skipping Debate Cluster") return shortcuts # ── Orchestrator Agent 建構器 ──────────────────────────────────── def build_orchestrator_agent() -> "Agent": """ 建立 Orchestrator Agent(CrewAI Manager)。 使用高推理 LLM,負責動態任務規劃和 Agent 分配。 對應 CrewAI Process.hierarchical 的 manager_agent。 Returns: CrewAI Agent 實例 """ skill_path = SKILLS_DIR / "orchestrator.md" skill_content = skill_path.read_text(encoding="utf-8") if skill_path.exists() else "" backstory = f"""You are ThreatHunter's command-level security operations manager. You dynamically plan the task graph, assign worker agents, review output quality, and manage feedback loops. You do not perform detailed vulnerability analysis yourself; you ensure the whole system operates efficiently and accurately. {SYSTEM_CONSTITUTION} --- Orchestrator SOP --- {skill_content} """ # 延遲匯入 CrewAI,避免純路由 / dataclass / 測試路徑在 import 階段觸發本機儲存副作用。 from crewai import Agent llm = get_llm() return Agent( role="Security Operations Manager (Orchestrator)", goal=( "Dynamically plan the scan task graph, assign the best worker agents by input type, " "review each agent's output quality, and trigger a feedback loop when confidence is insufficient." ), backstory=backstory, llm=llm, verbose=True, allow_delegation=True, # CrewAI Hierarchical 核心:允許委派任務 max_iter=8, # Manager 最多 8 次迭代(防止無限循環) ) # ── Orchestration 主函式 ───────────────────────────────────────── def run_orchestration( user_input: str | dict, worker_results: dict[str, Any] | None = None, feedback_from_judge: dict | None = None, ) -> tuple[OrchestrationContext, dict]: """ 執行 Orchestrator 的任務規劃邏輯。 這個函式實作 skills/orchestrator.md 的完整 SOP。 不直接使用 LLM(規劃邏輯是確定性的),只在必要時呼叫 Agent。 Args: user_input: 用戶的掃描請求 worker_results: 已完成的 Worker 輸出(可選,用於捷徑檢查) feedback_from_judge: Judge 的回饋訊息(Feedback Loop 觸發時) Returns: (OrchestrationContext, task_plan_dict) """ from tools.memory_tool import read_memory logger.info("[ORCH] Starting orchestration...") # Step 1:建立執行上下文 ctx = OrchestrationContext() # Step 1a:讀取全局歷史狀態 try: history_raw = _call_tool(read_memory, agent_name="orchestrator") history = json.loads(history_raw) if history_raw else {} ctx.api_health = history.get("api_health", {}) logger.info("[ORCH] Historical API health loaded: %s", ctx.api_health) except Exception as e: logger.warning("[ORCH] Could not load orchestrator memory: %s", e) # Step 1b:若有 Feedback Loop 請求 if feedback_from_judge: ctx.scan_path = ScanPath.FEEDBACK_LOOP ctx.feedback_loops += 1 logger.info( "[ORCH] Feedback loop triggered (%d/%d): %s", ctx.feedback_loops, ctx.max_feedback_loops, feedback_from_judge.get("specific_question", "") ) # 超過上限 → 強制輸出 if ctx.feedback_loops > ctx.max_feedback_loops: logger.warning("[ORCH] Max feedback loops reached, forcing output with NEEDS_VERIFICATION") return ctx, { "action": "force_output", "confidence": "NEEDS_VERIFICATION", "reason": f"Max feedback loops ({ctx.max_feedback_loops}) reached", "target_cves": feedback_from_judge.get("target_cves", []), } # Step 2:輸入分類 → 決定掃描路徑 if not feedback_from_judge: ctx.scan_path = classify_input(user_input) logger.info("[ORCH] Scan path determined: %s", ctx.scan_path.value) # Step 3:SmallWorld 捷徑檢查(若有中間結果) shortcuts = [] if worker_results: shortcuts = check_shortcuts(ctx, worker_results) # Step 4:根據路徑建立任務規劃 task_plan = _build_task_plan(ctx, shortcuts, feedback_from_judge) logger.info( "[ORCH] Task plan ready | path=%s | agents=%s | shortcuts=%s", ctx.scan_path.value, task_plan.get("agents_to_run", []), shortcuts, ) return ctx, task_plan def _build_task_plan( ctx: OrchestrationContext, shortcuts: list[str], feedback: dict | None, ) -> dict: """ 根據掃描路徑和捷徑建立任務規劃字典。 對應 skills/orchestrator.md 的三條路徑設計。 Args: ctx: 執行上下文 shortcuts: 已確定的捷徑列表 feedback: Judge 回饋(Feedback Loop 時) Returns: task_plan dict,包含要啟動的 Agent 順序和並行組 """ skip_debate = "skip_debate_all_low" in shortcuts skip_l2_llm = "skip_l2_llm" in shortcuts kev_shortcut = "kev_to_analyst_direct" in shortcuts if ctx.scan_path == ScanPath.PACKAGES_ONLY: # 路徑 A:輕量套件掃描 ctx.record_skip("security_guard", "no code input") ctx.record_skip("doc_scanner", "no documents") return { "path": "A", "parallel_layer1": ["intel_fusion"], # 只有情報融合 "layer2": ["scout"], "layer3": ["analyst"] if not skip_debate else [], "debate_cluster": not skip_debate, "judge": True, "skip_l2_llm": True, # 套件掃描不需要 L2 LLM "kev_shortcut": kev_shortcut, "agents_to_run": ["intel_fusion", "scout", "analyst", "debate", "judge"], } elif ctx.scan_path == ScanPath.DOCUMENTS_ONLY: # 路徑 C:文件弱配置掃描 ctx.record_skip("security_guard", "documents don't need LLM isolation") ctx.record_skip("analyst", "doc scanning doesn't need chain analysis") ctx.record_skip("debate_cluster", "doc findings don't need debate") return { "path": "C", "parallel_layer1": ["doc_scanner", "intel_fusion"], "layer2": ["scout"], "layer3": [], "debate_cluster": False, "judge": True, "skip_l2_llm": True, "kev_shortcut": False, "agents_to_run": ["doc_scanner", "intel_fusion", "scout", "judge"], } elif ctx.scan_path == ScanPath.FEEDBACK_LOOP: # 路徑 D:精準補充分析(不重跑整個 Pipeline) target_cves = feedback.get("target_cves", []) if feedback else [] missing_data = feedback.get("missing_data", []) if feedback else [] return { "path": "D", "parallel_layer1": ["intel_fusion"], # 只補充情報 "layer2": [], # 跳過 Scout(已有結果) "layer3": ["analyst"], # 只分析目標 CVE "debate_cluster": True, "judge": True, "targeted_cves": target_cves, "missing_data": missing_data, "skip_l2_llm": skip_l2_llm, "kev_shortcut": kev_shortcut, "agents_to_run": ["intel_fusion", "analyst", "debate", "judge"], } else: # 路徑 B:完整程式碼掃描(預設) return { "path": "B", "parallel_layer1": [ # MacNet Layer 1:並行 "security_guard", "intel_fusion", "l0_l1_scanner", ], "layer2": ["scout"], # MacNet Layer 2:合成 "layer3": ["analyst"] if not skip_debate else [], # MacNet Layer 3:連鎖 "debate_cluster": not skip_debate, # MacNet Layer 4:ColMAD "judge": True, # MacNet Layer 5:裁決 "skip_l2_llm": skip_l2_llm, "kev_shortcut": kev_shortcut, "agents_to_run": [ "security_guard", "intel_fusion", "l0_l1_scanner", "scout", "analyst", "debate", "judge", ], } # ── 結果品質審閱(CrewAI Hierarchical 的 Manager 審閱機制)─────── def review_worker_output(agent_name: str, output: Any, ctx: OrchestrationContext) -> tuple[bool, str]: """ Manager 審閱 Worker 輸出品質。 對應 CrewAI Hierarchical 中 Manager 的審閱機制。 Args: agent_name: 輸出的 Agent 名稱 output: Worker 的輸出(str 或 dict) ctx: 當前執行上下文 Returns: (is_acceptable: bool, issue_description: str) """ # 嘗試解析 JSON if isinstance(output, str): try: output_dict = json.loads(output) except json.JSONDecodeError: return False, f"{agent_name}: output is not valid JSON" else: output_dict = output # 各 Agent 的品質檢查標準 quality_checks = { "security_guard": lambda o: ( "functions" in o and "patterns" in o, "missing functions or patterns in extraction" ), "intel_fusion": lambda o: ( "fusion_results" in o and len(o["fusion_results"]) > 0, "empty fusion_results" ), "scout": lambda o: ( "vulnerabilities" in o, "missing vulnerabilities array" ), "analyst": lambda o: ( "analysis" in o and "risk_score" in o, "missing analysis or risk_score" ), "debate": lambda o: ( "debate_record" in o and "weighted_score" in o, "missing debate_record or weighted_score" ), "judge": lambda o: ( "confidence" in o, "missing confidence field" ), } check = quality_checks.get(agent_name) if check is None: return True, "" # 未知 Agent,放行 is_ok, issue = check(output_dict) if not is_ok: logger.warning("[ORCH][REVIEW] %s output rejected: %s", agent_name, issue) return False, issue # 儲存通過審閱的結果到共享上下文 ctx.store_result(agent_name, output_dict) logger.info("[ORCH][REVIEW] %s output accepted", agent_name) return True, "" # ── 執行結束:寫入 Orchestration 摘要 ─────────────────────────── def finalize_orchestration(ctx: OrchestrationContext) -> dict: """ 掃描結束時,寫入執行摘要到記憶,輸出給 main.py。 Args: ctx: 最終執行上下文 Returns: orchestration_summary dict """ summary = ctx.to_summary() # 寫入長期記憶(包含 API 健康狀態,供下次 Intel Fusion 讀取) try: from tools.memory_tool import write_memory intel_result = ctx.get_result("intel_fusion") or {} api_health = intel_result.get("api_health_summary", {}) memory_payload = json.dumps({ "api_health": api_health, "last_scan_path": summary["scan_path"], "last_shortcuts": summary["shortcuts_taken"], "last_elapsed_s": summary["elapsed_seconds"], }) _call_tool(write_memory, agent_name="orchestrator", data=memory_payload) logger.info("[ORCH] Orchestration summary written to memory") except Exception as e: logger.warning("[ORCH] Could not write orchestration memory: %s", e) logger.info( "[ORCH] Done | path=%s | agents=%d | shortcuts=%d | loops=%d | confidence=%s | time=%.1fs", summary["scan_path"], len(summary["agents_invoked"]), len(summary["shortcuts_taken"]), summary["feedback_loops"], summary["final_confidence"], summary["elapsed_seconds"], ) return summary