# agents/debate_engine.py
# Purpose: 3-round debate engine + Judge sub-agent
# Architecture follows Du et al. (2023), "Improving Factuality and Reasoning
# in Language Models through Multiagent Debate", ICML 2024, arXiv:2305.14325
#
# Design principles (from the paper):
# - Multiple independent LLM instances (Analyst and Critic are separate agents)
# - At most 3 debate rounds (diminishing returns; gains flatten after round 3)
# - No consensus -> a third-party Judge sub-agent arbitrates
# - Security domain: bias toward conservatism (over-estimating risk beats under-estimating it)
#
# Consensus definition:
# Analyst and Critic agree on the overall risk level
# (e.g., HIGH vs CRITICAL is not consensus; HIGH vs HIGH is)
from __future__ import annotations
import json
import logging
import time
from typing import Any
from crewai import Agent, Task, Crew, Process
from config import SYSTEM_CONSTITUTION, get_llm, degradation_status
logger = logging.getLogger("ThreatHunter.debate_engine")
# ══════════════════════════════════════════════════════════════════
# Risk level mapping (used for consensus checks)
# ══════════════════════════════════════════════════════════════════
RISK_LEVELS: dict[str, int] = {
"LOW": 1,
"MEDIUM": 2,
"HIGH": 3,
"CRITICAL": 4,
}
MAX_DEBATE_ROUNDS = 3
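# Worked example of the mapping (illustrative): RISK_LEVELS["CRITICAL"] - RISK_LEVELS["HIGH"] == 1,
# i.e. a CRITICAL-vs-HIGH split is a one-level gap and the debate continues,
# while HIGH vs HIGH (gap 0) counts as consensus.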
# ══════════════════════════════════════════════════════════════════
# Judge sub-agent construction
# ══════════════════════════════════════════════════════════════════
def _build_judge_agent() -> Agent:
"""
Judge sub-agent: an independent third-party arbitrator.
Design principles (Du et al. 2023):
- Not an extension of either the Analyst or the Critic
- Receives the full debate transcript, then picks the most logically supported position
- Security-domain conservative bias (prefer over-estimating risk)
"""
return Agent(
role="Security Arbitration Judge",
goal=(
"Review the complete debate history between Analyst and Critic. "
"Select the most logically supported risk assessment. "
"In case of equal evidence, err on the side of caution (higher risk). "
"Output a final JSON verdict."
),
backstory=(
f"{SYSTEM_CONSTITUTION}\n\n"
"You are an impartial security arbitration judge. You were not involved in the debate. "
"Your task is to read both sides' arguments and render a final, binding verdict. "
"You must cite which round's argument was most persuasive and why. "
"In security contexts, when evidence is ambiguous, choose the MORE SEVERE rating."
),
llm=get_llm(),
verbose=True,
max_iter=3,
)
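# For reference, the Judge is prompted (in DebateEngine._judge_verdict below) to return a bare
# JSON object of this shape; the field values shown here are purely illustrative:
#   {"verdict": "ESCALATE", "weighted_score": 82, "reasoning": "...",
#    "winning_round": 2, "judge_note": "Critic's later challenges lacked evidence"}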
# ══════════════════════════════════════════════════════════════════
# Debate engine
# ══════════════════════════════════════════════════════════════════
class DebateEngine:
"""
Implements the multiagent debate mechanism of Du et al. (2023).
Flow:
Round 1: Analyst states an initial position -> Critic challenges it
Round 2: Analyst updates its position (incorporating Critic feedback) -> Critic re-reviews
Round 3: Analyst's final position -> Critic's final review
Final: if there is still no consensus after 3 rounds -> Judge sub-agent arbitrates
"""
def __init__(self, max_rounds: int = MAX_DEBATE_ROUNDS):
self.max_rounds = max_rounds
self.min_rounds_with_findings = min(2, max_rounds)
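# Example of the minimum-rounds rule (derived from min(2, max_rounds)): with the default
# max_rounds=3 this is 2, so a debate that has real findings always gets at least one Critic
# challenge plus one Analyst rebuttal before consensus can end it; with max_rounds=1 it is 1.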
def run_debate(
self,
analyst_output: dict[str, Any],
input_type: str = "pkg",
on_progress: Any = None,
) -> dict[str, Any]:
"""
Run the full debate flow.
Args:
    analyst_output: the Analyst's initial analysis (from run_analyst_pipeline)
    input_type: input type (pkg/code/config, etc.; affects which Critic skills run)
    on_progress: SSE progress callback
Returns:
    Final verdict (same format as the run_critic_pipeline output)
"""
from agents.critic import run_critic_pipeline
t0 = time.time()
history: list[dict[str, Any]] = []
current_analyst_output = analyst_output
consensus = False
consensus_round = 0
early_stop_reason = ""
final_critic_result: dict[str, Any] = {}
has_findings = self._has_findings(analyst_output)
logger.info("[DEBATE] Starting %d-round debate (Du et al. 2023)", self.max_rounds)
if on_progress:
try:
on_progress("debate", "RUNNING", {"step": "starting", "max_rounds": self.max_rounds})
except Exception:
pass
for round_num in range(1, self.max_rounds + 1):
logger.info("[DEBATE] Round %d/%d starting", round_num, self.max_rounds)
if on_progress:
try:
on_progress("debate", "RUNNING", {
"step": f"round_{round_num}",
"round": round_num,
"max_rounds": self.max_rounds,
})
except Exception:
pass
# ── Critic reviews the Analyst's current-round position ──────────────
# Inject the round context so the Critic knows which round this is
round_context = dict(current_analyst_output)
round_context["_debate_round"] = round_num
round_context["_debate_max_rounds"] = self.max_rounds
if history:
round_context["_prev_critic_challenges"] = history[-1].get("critic", {}).get("challenges", [])
try:
critic_result = run_critic_pipeline(round_context, input_type=input_type)
except Exception as e:
logger.warning("[DEBATE] Round %d Critic failed: %s โ€” using maintain verdict", round_num, e)
critic_result = {
"verdict": "MAINTAIN",
"weighted_score": 70.0,
"challenges": [],
"reasoning": f"Critic failed in round {round_num}: {e}",
"_degraded": True,
}
history.append({
"round": round_num,
"analyst": current_analyst_output,
"critic": critic_result,
})
final_critic_result = critic_result
# ── Consensus check ──────────────────────────────────────────────
if self._check_consensus(current_analyst_output, critic_result):
if has_findings and round_num < self.min_rounds_with_findings and not critic_result.get("no_challenge"):
logger.info(
"[DEBATE] Consensus detected at round %d, continuing to minimum %d rounds because findings exist",
round_num,
self.min_rounds_with_findings,
)
if round_num < self.max_rounds:
current_analyst_output = self._analyst_rebuttal(
current_analyst_output, critic_result, round_num
)
continue
consensus = True
consensus_round = round_num
early_stop_reason = "consensus_after_min_rounds" if has_findings else "no_findings"
logger.info(
"[DEBATE] Consensus reached at round %d | "
"analyst_risk=%s | critic_verdict=%s",
round_num,
self._get_analyst_risk(current_analyst_output),
critic_result.get("verdict", "UNKNOWN"),
)
break
# ── Analyst updates its position (with Critic feedback) ──────────
if round_num < self.max_rounds:
current_analyst_output = self._analyst_rebuttal(
current_analyst_output, critic_result, round_num
)
# ── Final verdict ────────────────────────────────────────────────
elapsed_ms = int((time.time() - t0) * 1000)
if consensus:
logger.info(
"[DEBATE] โœ… Consensus after %d/%d rounds | elapsed=%dms",
consensus_round, self.max_rounds, elapsed_ms,
)
result = dict(final_critic_result)
result["_debate_meta"] = {
"consensus": True,
"consensus_round": consensus_round,
"total_rounds": round_num,
"elapsed_ms": elapsed_ms,
"method": "multiagent_debate_Du2023",
"early_stop_reason": early_stop_reason,
"rounds": self._summarize_rounds(history),
}
return result
else:
logger.warning(
"[DEBATE] โš ๏ธ No consensus after %d rounds โ€” invoking Judge sub-agent",
self.max_rounds,
)
return self._judge_verdict(history, elapsed_ms)
# ── Private helpers ──────────────────────────────────────────────
def _check_consensus(self, analyst_output: dict, critic_result: dict) -> bool:
"""
Consensus check: the two sides' overall risk levels must effectively match.
CRITICAL vs HIGH     -> no consensus (one-level gap; treated as borderline, keep debating)
CRITICAL vs CRITICAL -> consensus
HIGH vs MEDIUM       -> no consensus
HIGH vs HIGH         -> consensus
Critic verdict=MAINTAIN -> immediate consensus (the Critic agrees with the Analyst)
In practice the check below is driven by the Critic's verdict and weighted_score,
which stand in for an explicit level comparison.
"""
# The Critic agrees outright -> consensus
verdict = critic_result.get("verdict", "MAINTAIN")
if verdict == "MAINTAIN":
return True
# The Critic explicitly asks for a downgrade -> usually keep debating
if verdict == "DOWNGRADE":
analyst_risk = self._get_analyst_risk(analyst_output)
a_level = RISK_LEVELS.get(analyst_risk, 2)
# DOWNGRADE means the Critic wants the risk dropped one level; accept consensus only if the Critic's score still rates the Analyst's case as strong
score = critic_result.get("weighted_score", 70)
if score >= 80:  # a high Critic score means the Analyst's position holds up
return True
return False
# Other verdicts (ESCALATE, etc.) -> decide by score
score = critic_result.get("weighted_score", 70)
return score >= 75
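# Decision table implemented above (score is the Critic's weighted_score):
#   verdict=MAINTAIN                  -> consensus
#   verdict=DOWNGRADE, score >= 80    -> consensus (the Critic still rates the Analyst's case highly)
#   verdict=DOWNGRADE, score < 80     -> keep debating
#   other verdicts (e.g. ESCALATE)    -> consensus only if score >= 75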
def _has_findings(self, analyst_output: dict[str, Any]) -> bool:
"""ๅˆคๆ–ท Analyst ๆ˜ฏๅฆ็œŸ็š„ๆœ‰็™ผ็พ้ …๏ผ›ๆœ‰็™ผ็พๆ™‚่‡ณๅฐ‘้œ€่ฆ็ฌฌไบŒ่ผชไบคๅ‰่ณช็–‘ใ€‚"""
for key in ("analysis", "vulnerabilities", "code_patterns", "code_patterns_summary"):
value = analyst_output.get(key)
if isinstance(value, list) and len(value) > 0:
return True
return False
def _summarize_rounds(self, history: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""็”ข็”Ÿ่ผ•้‡ๅ›žๅˆๆ‘˜่ฆ๏ผŒไพ› UI/ๆธฌ่ฉฆ็ขบ่ช่พฏ่ซ–ๆฒ’ๆœ‰่ขซ็ฌฌไธ€่ผชๅžๆމใ€‚"""
rounds = []
for entry in history:
critic = entry.get("critic", {})
analyst = entry.get("analyst", {})
rounds.append({
"round": entry.get("round"),
"analyst_risk": self._get_analyst_risk(analyst),
"critic_verdict": critic.get("verdict"),
"critic_score": critic.get("weighted_score"),
"challenge_count": len(critic.get("challenges", [])),
})
return rounds
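# Example of one summary entry (values illustrative):
#   {"round": 1, "analyst_risk": "HIGH", "critic_verdict": "DOWNGRADE",
#    "critic_score": 62.0, "challenge_count": 3}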
def _get_analyst_risk(self, analyst_output: dict) -> str:
"""ๅพž Analyst ่ผธๅ‡บๆๅ–ๆ•ด้ซ”้ขจ้šช็ญ‰็ดš"""
analysis = analyst_output.get("analysis", [])
if not analysis:
return "UNKNOWN"
# ๅ–ๆœ€้ซ˜ๅšด้‡ๆ€ง
max_level = 0
max_name = "UNKNOWN"
for entry in analysis:
sev = entry.get("severity") or entry.get("adjusted_risk", "MEDIUM")
level = RISK_LEVELS.get(sev, 0)
if level > max_level:
max_level = level
max_name = sev
return max_name
@staticmethod
def _normalize_challenge(challenge: Any) -> dict[str, str]:
"""Support both string[] contracts and object-style challenge records."""
if isinstance(challenge, dict):
return {
"type": str(challenge.get("type") or "challenge"),
"description": str(challenge.get("description") or challenge.get("text") or challenge),
}
return {"type": "challenge", "description": str(challenge)}
def _analyst_rebuttal(
self,
analyst_output: dict,
critic_result: dict,
round_num: int,
) -> dict:
"""
Analyst position update (simplified: inject the Critic's challenges and annotate the reasoning).
In a full implementation this step would re-invoke the Analyst LLM.
The current lightweight version injects the Critic feedback directly into the output
so that the next round's Critic sees the updated context.
"""
updated = dict(analyst_output)
challenges = [
self._normalize_challenge(challenge)
for challenge in critic_result.get("challenges", [])
]
updated["_critic_challenges_r"] = {
"round": round_num,
"challenges": challenges,
"critic_verdict": critic_result.get("verdict"),
"critic_score": critic_result.get("weighted_score", 70),
}
# Update reasoning to mark that the Analyst has seen the Critic feedback
if updated.get("analysis"):
for entry in updated["analysis"]:
if "reasoning" in entry:
entry["reasoning"] = (
f"[Round {round_num} rebuttal after Critic verdict={critic_result.get('verdict')}: "
f"challenges={[c.get('type') for c in challenges[:3]]}] "
+ entry["reasoning"]
)
logger.info(
"[DEBATE] Round %d analyst rebuttal: addressed %d challenges",
round_num, len(challenges),
)
return updated
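# Example of the reasoning prefix injected above (challenge types are illustrative):
#   "[Round 1 rebuttal after Critic verdict=DOWNGRADE: challenges=['false_positive', 'overstated_impact']]
#    <original reasoning>"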
def _judge_verdict(
self,
debate_history: list[dict],
elapsed_ms: int,
) -> dict[str, Any]:
"""
Judge sub-agent arbitration: a third party decides when no consensus is reached.
Principles (Du et al. 2023, plus the security-conservative bias):
- Read the full debate transcript
- Pick the most logically supported position
- If the evidence is balanced, pick the MORE SEVERE rating (security-conservative)
"""
t_judge = time.time()
logger.info("[DEBATE] Invoking Judge sub-agent...")
# Prepare the debate-summary text
debate_summary = self._format_debate_history(debate_history)
try:
judge_agent = _build_judge_agent()
task_desc = (
f"DEBATE HISTORY ({len(debate_history)} rounds):\n\n"
f"{debate_summary}\n\n"
f"No consensus was reached after {len(debate_history)} rounds.\n\n"
f"Your task:\n"
f"1. Review all rounds of argument and counter-argument\n"
f"2. Identify which side presented stronger evidence\n"
f"3. If evidence is equal, choose the HIGHER risk level (security-conservative principle)\n"
f"4. Output a JSON verdict with:\n"
f" {{\"verdict\": \"MAINTAIN|DOWNGRADE|ESCALATE\", "
f"\"weighted_score\": 0-100, "
f"\"reasoning\": \"...\", "
f"\"winning_round\": 1-3, "
f"\"judge_note\": \"...\"}}\n\n"
f"Output ONLY the JSON, no other text."
)
task = Task(
description=task_desc,
expected_output="Pure JSON judge verdict",
agent=judge_agent,
)
crew = Crew(
agents=[judge_agent],
tasks=[task],
process=Process.sequential,
verbose=False,
)
result_str = str(crew.kickoff()).strip()
# Parse the JSON verdict (strip Markdown code fences if present)
if "```json" in result_str:
result_str = result_str.split("```json")[1].split("```")[0].strip()
elif "```" in result_str:
parts = result_str.split("```")
if len(parts) >= 3:
result_str = parts[1].strip()
judge_verdict = json.loads(result_str)
judge_elapsed = int((time.time() - t_judge) * 1000)
logger.info(
"[DEBATE] Judge verdict: %s (score=%s) in %dms",
judge_verdict.get("verdict"),
judge_verdict.get("weighted_score"),
judge_elapsed,
)
judge_verdict["_debate_meta"] = {
"consensus": False,
"total_rounds": len(debate_history),
"judge_invoked": True,
"judge_elapsed_ms": judge_elapsed,
"total_elapsed_ms": elapsed_ms + judge_elapsed,
"method": "multiagent_debate_Du2023_with_judge",
"rounds": self._summarize_rounds(debate_history),
}
return judge_verdict
except Exception as e:
logger.error("[DEBATE] Judge sub-agent failed: %s โ€” falling back to last Critic result", e)
degradation_status.degrade("DebateJudge", str(e))
# Fallback: take the last round's Critic result and force MAINTAIN (conservative)
last_critic = debate_history[-1].get("critic", {}) if debate_history else {}
fallback = dict(last_critic)
fallback["verdict"] = "MAINTAIN" # ไฟๅฎˆ๏ผš็ถญๆŒ Analyst ็ซ‹ๅ ด
fallback["_debate_meta"] = {
"consensus": False,
"total_rounds": len(debate_history),
"judge_invoked": True,
"judge_failed": True,
"judge_error": str(e),
"total_elapsed_ms": elapsed_ms,
"method": "multiagent_debate_Du2023_judge_fallback",
"rounds": self._summarize_rounds(debate_history),
}
return fallback
def _format_debate_history(self, history: list[dict]) -> str:
"""ๅฐ‡่พฏ่ซ–็ด€้Œ„ๆ ผๅผๅŒ–็‚บๅฏ่ฎ€ๅญ—ไธฒ๏ผŒไพ› Judge ้–ฑ่ฎ€"""
lines = []
for entry in history:
r = entry.get("round", "?")
analyst = entry.get("analyst", {})
critic = entry.get("critic", {})
# Analyst position summary
risk = self._get_analyst_risk(analyst)
findings_count = len(analyst.get("analysis", []))
lines.append(f"=== Round {r} ===")
lines.append(f"ANALYST: overall_risk={risk}, findings={findings_count}")
# ๅ–ๅ‰ 2 ๅ€‹ finding ็š„ reasoning
for f in analyst.get("analysis", [])[:2]:
lines.append(f" - [{f.get('severity')}] {f.get('cwe_id', f.get('cve_id', 'N/A'))}: {f.get('reasoning', '')[:100]}")
# Critic position summary
verdict = critic.get("verdict", "UNKNOWN")
score = critic.get("weighted_score", "?")
lines.append(f"CRITIC: verdict={verdict}, score={score}")
for raw_ch in critic.get("challenges", [])[:2]:
ch = self._normalize_challenge(raw_ch)
lines.append(f" - Challenge [{ch.get('type')}]: {ch.get('description', '')[:100]}")
lines.append("")
return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════
# Convenience function: the integration entry point for main.py
# ══════════════════════════════════════════════════════════════════
def run_debate_pipeline(
analyst_output: dict[str, Any],
input_type: str = "pkg",
on_progress: Any = None,
) -> dict[str, Any]:
"""
Top-level function that runs the full debate flow.
Drop-in replacement for the single-round Critic call in main.py.
Args:
    analyst_output: the Analyst's initial analysis
    input_type: input type
    on_progress: SSE progress callback
Returns:
    Final debate verdict (same format as the run_critic_pipeline output,
    with an extra _debate_meta field recording the debate state)
"""
engine = DebateEngine(max_rounds=MAX_DEBATE_ROUNDS)
try:
return engine.run_debate(analyst_output, input_type=input_type, on_progress=on_progress)
except Exception as e:
logger.error("[DEBATE] DebateEngine failed: %s โ€” falling back to single Critic", e)
degradation_status.degrade("DebateEngine", str(e))
# Full degradation: fall back to a single-round Critic (the original behaviour)
try:
from agents.critic import run_critic_pipeline
result = run_critic_pipeline(analyst_output, input_type=input_type)
result["_debate_meta"] = {
"consensus": None,
"engine_failed": True,
"error": str(e),
"fallback": "single_round_critic",
}
return result
except Exception as e2:
logger.error("[DEBATE] Critic fallback also failed: %s", e2)
return {
"verdict": "MAINTAIN",
"weighted_score": 60.0,
"challenges": [],
"reasoning": f"Debate engine and Critic both failed: {e} / {e2}",
"_degraded": True,
"_debate_meta": {"engine_failed": True, "critic_failed": True},
}
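# Minimal smoke test for the pure-Python helpers. No LLM is invoked here (run_critic_pipeline
# and the Judge are never called); the sample data is entirely illustrative and only assumes the
# module's own imports (crewai, config) resolve.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    engine = DebateEngine()
    sample_analyst = {
        "analysis": [
            {"severity": "HIGH", "cwe_id": "CWE-78",
             "reasoning": "command string built from unsanitized user input"},
        ],
    }
    sample_critic = {
        "verdict": "DOWNGRADE",
        "weighted_score": 62.0,
        "challenges": ["the vulnerable code path is never imported by the package entry point"],
    }
    print("has_findings:", engine._has_findings(sample_analyst))                 # expected: True
    print("consensus:", engine._check_consensus(sample_analyst, sample_critic))  # expected: False
    print(engine._format_debate_history([
        {"round": 1, "analyst": sample_analyst, "critic": sample_critic},
    ]))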