"""Multi-agent debate engine: Analyst vs Critic cross-examination with a Judge fallback (Du et al. 2023)."""

from __future__ import annotations

import json
import logging
import time
from typing import Any

from crewai import Agent, Task, Crew, Process

from config import SYSTEM_CONSTITUTION, get_llm, degradation_status

logger = logging.getLogger("ThreatHunter.debate_engine")


RISK_LEVELS: dict[str, int] = {
    "LOW": 1,
    "MEDIUM": 2,
    "HIGH": 3,
    "CRITICAL": 4,
}

MAX_DEBATE_ROUNDS = 3
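
# Illustrative note: the ordinal map makes severity strings comparable, which is how
# _get_analyst_risk() below picks the most severe finding. For example:
#     max(["HIGH", "CRITICAL"], key=lambda s: RISK_LEVELS.get(s, 0))  # -> "CRITICAL"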


def _build_judge_agent() -> Agent:
    """
    Judge sub-agent: an independent third-party arbitrator.

    Design principles (Du et al. 2023):
    - Not an extension of either the Analyst or the Critic
    - Receives the full debate transcript and selects the best logically supported position
    - Security-conservative: when in doubt, prefer the higher risk rating
    """
    return Agent(
        role="Security Arbitration Judge",
        goal=(
            "Review the complete debate history between Analyst and Critic. "
            "Select the most logically supported risk assessment. "
            "In case of equal evidence, err on the side of caution (higher risk). "
            "Output a final JSON verdict."
        ),
        backstory=(
            f"{SYSTEM_CONSTITUTION}\n\n"
            "You are an impartial security arbitration judge. You were not involved in the debate. "
            "Your task is to read both sides' arguments and render a final, binding verdict. "
            "You must cite which round's argument was most persuasive and why. "
            "In security contexts, when evidence is ambiguous, choose the MORE SEVERE rating."
        ),
        llm=get_llm(),
        verbose=True,
        max_iter=3,
    )
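
# The judge agent is built lazily inside DebateEngine._judge_verdict(), so no extra
# LLM agent is instantiated unless a debate actually ends without consensus.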


class DebateEngine:
    """
    Implements the Multiagent Debate mechanism from Du et al. (2023).

    Flow:
        Round 1: Analyst states its initial position -> Critic challenges it
        Round 2: Analyst revises its position (with Critic feedback) -> Critic re-evaluates
        Round 3: Analyst gives its final position -> Critic gives its final assessment
        Final:   if there is still no consensus after 3 rounds -> the Judge sub-agent arbitrates
    """

    def __init__(self, max_rounds: int = MAX_DEBATE_ROUNDS):
        self.max_rounds = max_rounds
        self.min_rounds_with_findings = min(2, max_rounds)

    def run_debate(
        self,
        analyst_output: dict[str, Any],
        input_type: str = "pkg",
        on_progress: Any = None,
    ) -> dict[str, Any]:
        """
        Run the full debate loop.

        Args:
            analyst_output: Initial Analyst analysis result (from run_analyst_pipeline)
            input_type: Input type (pkg/code/config, etc.; affects Critic skill selection)
            on_progress: SSE progress callback

        Returns:
            Final verdict, in the same format as the run_critic_pipeline output
        """
        from agents.critic import run_critic_pipeline

        t0 = time.time()
        history: list[dict[str, Any]] = []
        current_analyst_output = analyst_output
        consensus = False
        consensus_round = 0
        early_stop_reason = ""
        final_critic_result: dict[str, Any] = {}
        has_findings = self._has_findings(analyst_output)
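
        # When the Analyst reported findings, a round-1 consensus alone does not end the
        # debate: at least min_rounds_with_findings (2) rounds run, unless the Critic
        # explicitly raises no challenge, so the rebuttal gets cross-examined once.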

        logger.info("[DEBATE] Starting %d-round debate (Du et al. 2023)", self.max_rounds)

        if on_progress:
            try:
                on_progress("debate", "RUNNING", {"step": "starting", "max_rounds": self.max_rounds})
            except Exception:
                pass

        for round_num in range(1, self.max_rounds + 1):
            logger.info("[DEBATE] Round %d/%d starting", round_num, self.max_rounds)

            if on_progress:
                try:
                    on_progress("debate", "RUNNING", {
                        "step": f"round_{round_num}",
                        "round": round_num,
                        "max_rounds": self.max_rounds,
                    })
                except Exception:
                    pass

            # Build this round's context, carrying over the previous Critic challenges.
            round_context = dict(current_analyst_output)
            round_context["_debate_round"] = round_num
            round_context["_debate_max_rounds"] = self.max_rounds
            if history:
                round_context["_prev_critic_challenges"] = history[-1].get("critic", {}).get("challenges", [])

            try:
                critic_result = run_critic_pipeline(round_context, input_type=input_type)
            except Exception as e:
                logger.warning("[DEBATE] Round %d Critic failed: %s -> using maintain verdict", round_num, e)
                critic_result = {
                    "verdict": "MAINTAIN",
                    "weighted_score": 70.0,
                    "challenges": [],
                    "reasoning": f"Critic failed in round {round_num}: {e}",
                    "_degraded": True,
                }

            history.append({
                "round": round_num,
                "analyst": current_analyst_output,
                "critic": critic_result,
            })

            final_critic_result = critic_result

            # Consensus check (with a minimum round count when findings exist).
            if self._check_consensus(current_analyst_output, critic_result):
                if has_findings and round_num < self.min_rounds_with_findings and not critic_result.get("no_challenge"):
                    logger.info(
                        "[DEBATE] Consensus detected at round %d, continuing to minimum %d rounds because findings exist",
                        round_num,
                        self.min_rounds_with_findings,
                    )
                    if round_num < self.max_rounds:
                        current_analyst_output = self._analyst_rebuttal(
                            current_analyst_output, critic_result, round_num
                        )
                    continue
                consensus = True
                consensus_round = round_num
                early_stop_reason = "consensus_after_min_rounds" if has_findings else "no_findings"
                logger.info(
                    "[DEBATE] Consensus reached at round %d | "
                    "analyst_risk=%s | critic_verdict=%s",
                    round_num,
                    self._get_analyst_risk(current_analyst_output),
                    critic_result.get("verdict", "UNKNOWN"),
                )
                break

            # No consensus yet: let the Analyst revise its position for the next round.
            if round_num < self.max_rounds:
                current_analyst_output = self._analyst_rebuttal(
                    current_analyst_output, critic_result, round_num
                )

        elapsed_ms = int((time.time() - t0) * 1000)

        if consensus:
            logger.info(
                "[DEBATE] Consensus after %d/%d rounds | elapsed=%dms",
                consensus_round, self.max_rounds, elapsed_ms,
            )
            result = dict(final_critic_result)
            result["_debate_meta"] = {
                "consensus": True,
                "consensus_round": consensus_round,
                "total_rounds": round_num,
                "elapsed_ms": elapsed_ms,
                "method": "multiagent_debate_Du2023",
                "early_stop_reason": early_stop_reason,
                "rounds": self._summarize_rounds(history),
            }
            return result
        else:
            logger.warning(
                "[DEBATE] No consensus after %d rounds -> invoking Judge sub-agent",
                self.max_rounds,
            )
            return self._judge_verdict(history, elapsed_ms)

    def _check_consensus(self, analyst_output: dict, critic_result: dict) -> bool:
        """
        Consensus check: the Critic must effectively agree with the Analyst's rating.

        CRITICAL vs HIGH        -> no consensus (a one-level gap on a severe finding keeps the debate alive)
        CRITICAL vs CRITICAL    -> consensus
        HIGH vs MEDIUM          -> no consensus
        HIGH vs HIGH            -> consensus
        Critic verdict=MAINTAIN -> immediate consensus (the Critic agrees with the Analyst)
        """
        verdict = critic_result.get("verdict", "MAINTAIN")
        if verdict == "MAINTAIN":
            return True

        # A DOWNGRADE only counts as consensus when the Critic is highly confident.
        if verdict == "DOWNGRADE":
            score = critic_result.get("weighted_score", 70)
            return score >= 80

        # ESCALATE (or any other verdict): accept when the Critic's confidence is reasonably high.
        score = critic_result.get("weighted_score", 70)
        return score >= 75
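
    # Worked example of the rule above: if the Critic answers DOWNGRADE with
    # weighted_score=62, no consensus is recorded and another round runs;
    # DOWNGRADE with weighted_score=85 is accepted as consensus.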

    def _has_findings(self, analyst_output: dict[str, Any]) -> bool:
        """Check whether the Analyst actually produced findings; if so, at least a second round of cross-examination is required."""
        for key in ("analysis", "vulnerabilities", "code_patterns", "code_patterns_summary"):
            value = analyst_output.get(key)
            if isinstance(value, list) and len(value) > 0:
                return True
        return False

    def _summarize_rounds(self, history: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Produce a lightweight per-round summary so the UI/tests can confirm the debate did not collapse after round 1."""
        rounds = []
        for entry in history:
            critic = entry.get("critic", {})
            analyst = entry.get("analyst", {})
            rounds.append({
                "round": entry.get("round"),
                "analyst_risk": self._get_analyst_risk(analyst),
                "critic_verdict": critic.get("verdict"),
                "critic_score": critic.get("weighted_score"),
                "challenge_count": len(critic.get("challenges", [])),
            })
        return rounds

    def _get_analyst_risk(self, analyst_output: dict) -> str:
        """Extract the overall risk level from the Analyst output."""
        analysis = analyst_output.get("analysis", [])
        if not analysis:
            return "UNKNOWN"

        max_level = 0
        max_name = "UNKNOWN"
        for entry in analysis:
            sev = entry.get("severity") or entry.get("adjusted_risk", "MEDIUM")
            level = RISK_LEVELS.get(sev, 0)
            if level > max_level:
                max_level = level
                max_name = sev
        return max_name

    @staticmethod
    def _normalize_challenge(challenge: Any) -> dict[str, str]:
        """Support both string[] contracts and object-style challenge records."""
        if isinstance(challenge, dict):
            return {
                "type": str(challenge.get("type") or "challenge"),
                "description": str(challenge.get("description") or challenge.get("text") or challenge),
            }
        return {"type": "challenge", "description": str(challenge)}
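
    # Example: _normalize_challenge("missing PoC") returns
    #     {"type": "challenge", "description": "missing PoC"}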

    def _analyst_rebuttal(
        self,
        analyst_output: dict,
        critic_result: dict,
        round_num: int,
    ) -> dict:
        """
        Analyst position update (simplified: inject the Critic's challenges and annotate the reasoning).

        A full implementation would re-invoke the Analyst LLM here. The current
        lightweight version injects the Critic feedback directly into the output,
        so that the next round's Critic sees the updated context.
        """
        updated = dict(analyst_output)
        challenges = [
            self._normalize_challenge(challenge)
            for challenge in critic_result.get("challenges", [])
        ]
        updated["_critic_challenges_r"] = {
            "round": round_num,
            "challenges": challenges,
            "critic_verdict": critic_result.get("verdict"),
            "critic_score": critic_result.get("weighted_score", 70),
        }

        # Prepend a rebuttal marker so the next Critic round can see what was addressed.
        if updated.get("analysis"):
            for entry in updated["analysis"]:
                if "reasoning" in entry:
                    entry["reasoning"] = (
                        f"[Round {round_num} rebuttal after Critic verdict={critic_result.get('verdict')}: "
                        f"challenges={[c.get('type') for c in challenges[:3]]}] "
                        + entry["reasoning"]
                    )
        logger.info(
            "[DEBATE] Round %d analyst rebuttal: addressed %d challenges",
            round_num, len(challenges),
        )
        return updated
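
    # The injected "_critic_challenges_r" block looks like (illustrative values):
    #     {"round": 1, "challenges": [{"type": "challenge", "description": "..."}],
    #      "critic_verdict": "ESCALATE", "critic_score": 72}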

    def _judge_verdict(
        self,
        debate_history: list[dict],
        elapsed_ms: int,
    ) -> dict[str, Any]:
        """
        Judge sub-agent arbitration: a third party decides when no consensus is reached.

        Principles (Du et al. 2023 + security-conservative principle):
        - Read the full debate transcript
        - Select the most logically supported position
        - If evidence is comparable, choose the MORE SEVERE rating (security-conservative)
        """
        t_judge = time.time()
        logger.info("[DEBATE] Invoking Judge sub-agent...")

        # Format the full transcript for the Judge.
        debate_summary = self._format_debate_history(debate_history)

        try:
            judge_agent = _build_judge_agent()

            task_desc = (
                f"DEBATE HISTORY ({len(debate_history)} rounds):\n\n"
                f"{debate_summary}\n\n"
                f"No consensus was reached after {len(debate_history)} rounds.\n\n"
                f"Your task:\n"
                f"1. Review all rounds of argument and counter-argument\n"
                f"2. Identify which side presented stronger evidence\n"
                f"3. If evidence is equal, choose the HIGHER risk level (security-conservative principle)\n"
                f"4. Output a JSON verdict with:\n"
                f"   {{\"verdict\": \"MAINTAIN|DOWNGRADE|ESCALATE\", "
                f"\"weighted_score\": 0-100, "
                f"\"reasoning\": \"...\", "
                f"\"winning_round\": 1-3, "
                f"\"judge_note\": \"...\"}}\n\n"
                f"Output ONLY the JSON, no other text."
            )
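
            # A schema-conforming reply parses to something like (illustrative values):
            #     {"verdict": "ESCALATE", "weighted_score": 82, "reasoning": "...",
            #      "winning_round": 2, "judge_note": "..."}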

            task = Task(
                description=task_desc,
                expected_output="Pure JSON judge verdict",
                agent=judge_agent,
            )

            crew = Crew(
                agents=[judge_agent],
                tasks=[task],
                process=Process.sequential,
                verbose=False,
            )

            result_str = str(crew.kickoff()).strip()

            # Strip Markdown code fences if the model wrapped its JSON reply.
            if "```json" in result_str:
                result_str = result_str.split("```json")[1].split("```")[0].strip()
            elif "```" in result_str:
                parts = result_str.split("```")
                if len(parts) >= 3:
                    result_str = parts[1].strip()

            judge_verdict = json.loads(result_str)
            judge_elapsed = int((time.time() - t_judge) * 1000)

            logger.info(
                "[DEBATE] Judge verdict: %s (score=%s) in %dms",
                judge_verdict.get("verdict"),
                judge_verdict.get("weighted_score"),
                judge_elapsed,
            )

            judge_verdict["_debate_meta"] = {
                "consensus": False,
                "total_rounds": len(debate_history),
                "judge_invoked": True,
                "judge_elapsed_ms": judge_elapsed,
                "total_elapsed_ms": elapsed_ms + judge_elapsed,
                "method": "multiagent_debate_Du2023_with_judge",
                "rounds": self._summarize_rounds(debate_history),
            }
            return judge_verdict

        except Exception as e:
            logger.error("[DEBATE] Judge sub-agent failed: %s -> falling back to last Critic result", e)
            degradation_status.degrade("DebateJudge", str(e))

            # Conservative fallback: keep the last Critic result and MAINTAIN the Analyst's rating.
            last_critic = debate_history[-1].get("critic", {}) if debate_history else {}
            fallback = dict(last_critic)
            fallback["verdict"] = "MAINTAIN"
            fallback["_debate_meta"] = {
                "consensus": False,
                "total_rounds": len(debate_history),
                "judge_invoked": True,
                "judge_failed": True,
                "judge_error": str(e),
                "total_elapsed_ms": elapsed_ms,
                "method": "multiagent_debate_Du2023_judge_fallback",
                "rounds": self._summarize_rounds(debate_history),
            }
            return fallback

    def _format_debate_history(self, history: list[dict]) -> str:
        """Format the debate transcript into a readable string for the Judge."""
        lines = []
        for entry in history:
            r = entry.get("round", "?")
            analyst = entry.get("analyst", {})
            critic = entry.get("critic", {})

            # Analyst side of this round.
            risk = self._get_analyst_risk(analyst)
            findings_count = len(analyst.get("analysis", []))
            lines.append(f"=== Round {r} ===")
            lines.append(f"ANALYST: overall_risk={risk}, findings={findings_count}")

            # Show at most the top two findings, truncated.
            for f in analyst.get("analysis", [])[:2]:
                lines.append(f" - [{f.get('severity')}] {f.get('cwe_id', f.get('cve_id', 'N/A'))}: {f.get('reasoning', '')[:100]}")

            # Critic side of this round.
            verdict = critic.get("verdict", "UNKNOWN")
            score = critic.get("weighted_score", "?")
            lines.append(f"CRITIC: verdict={verdict}, score={score}")
            for raw_ch in critic.get("challenges", [])[:2]:
                ch = self._normalize_challenge(raw_ch)
                lines.append(f" - Challenge [{ch.get('type')}]: {ch.get('description', '')[:100]}")

            lines.append("")

        return "\n".join(lines)


def run_debate_pipeline(
    analyst_output: dict[str, Any],
    input_type: str = "pkg",
    on_progress: Any = None,
) -> dict[str, Any]:
    """
    Top-level entry point that runs the full debate flow.
    Drop-in replacement for the single-round Critic call in main.py.

    Args:
        analyst_output: Initial Analyst analysis result
        input_type: Input type
        on_progress: SSE progress callback

    Returns:
        Final debate verdict (same format as the run_critic_pipeline output, plus a
        _debate_meta field recording the debate state)
    """
    engine = DebateEngine(max_rounds=MAX_DEBATE_ROUNDS)

    try:
        return engine.run_debate(analyst_output, input_type=input_type, on_progress=on_progress)
    except Exception as e:
        logger.error("[DEBATE] DebateEngine failed: %s -> falling back to single Critic", e)
        degradation_status.degrade("DebateEngine", str(e))

        # Degraded path: run a single-round Critic without debate.
        try:
            from agents.critic import run_critic_pipeline
            result = run_critic_pipeline(analyst_output, input_type=input_type)
            result["_debate_meta"] = {
                "consensus": None,
                "engine_failed": True,
                "error": str(e),
                "fallback": "single_round_critic",
            }
            return result
        except Exception as e2:
            logger.error("[DEBATE] Critic fallback also failed: %s", e2)
            return {
                "verdict": "MAINTAIN",
                "weighted_score": 60.0,
                "challenges": [],
                "reasoning": f"Debate engine and Critic both failed: {e} / {e2}",
                "_degraded": True,
                "_debate_meta": {"engine_failed": True, "critic_failed": True},
            }
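

# Usage sketch (illustrative only; the analyst payload below shows just the fields this
# module reads, while the real run_analyst_pipeline output carries more detail):
#
#     analyst_output = {
#         "analysis": [
#             {"severity": "HIGH", "cwe_id": "CWE-502",
#              "reasoning": "Unsafe deserialization in the package's setup hook"},
#         ],
#     }
#     verdict = run_debate_pipeline(analyst_output, input_type="pkg")
#     print(verdict.get("verdict"), verdict.get("_debate_meta", {}).get("consensus"))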