| """Early Termination / Doom Detector: Stop runs unlikely to succeed.""" |
| from typing import Dict, List, Optional |
| from dataclasses import dataclass |
|
|
@dataclass
class DoomSignal:
    """A single piece of evidence that a run may be failing.

    Attributes are plain data; instances are collected into a
    DoomAssessment by the detector.
    """

    signal_type: str   # machine-readable signal name, e.g. "failed_tool_calls"
    severity: float    # per-signal weight (0.0 - 1.0)
    evidence: str      # human-readable explanation of why this fired
|
|
@dataclass
class DoomAssessment:
    """The outcome of one DoomDetector.assess() call."""

    doomed: bool                 # True once accumulated severity crosses the detector's threshold
    severity: float              # accumulated severity score, capped at 1.0
    signals: List[DoomSignal]    # every signal that fired during the assessment
    recommended_action: str      # "continue", "mark_blocked", "stop", "switch_strategy", or "ask_question"
    reasoning: str               # short human-readable justification for the action
|
|
# Heuristic limits consulted by DoomDetector.assess when deciding
# whether a signal fires.
DOOM_THRESHOLDS = {
    "max_failed_tool_calls": 3,        # failed tool calls before flagging
    "max_repeated_planning": 2,        # retry steps before flagging re-planning churn
    "max_cost_without_progress": 2.0,  # cost spent with zero artifacts produced
    "max_context_confusion": 0.7,      # NOTE(review): not referenced in this file — confirm it is used elsewhere
    "max_escalation_loops": 2,         # NOTE(review): not referenced in this file — confirm it is used elsewhere
}
|
|
class DoomDetector:
    """Scores heuristic "doom" signals for an agent run and recommends an action.

    Each call to :meth:`assess` inspects the run's step history and spend,
    accumulates a severity score in [0.0, 1.0], and once the score reaches
    ``doom_threshold`` the run is marked doomed with a recommended action.
    """

    def __init__(self, doom_threshold: float = 0.7):
        # Severity at or above this value marks the run as doomed.
        self.doom_threshold = doom_threshold
        # Every assessment ever produced, in call order.
        self.assessments: List[DoomAssessment] = []

    def assess(self, steps: List[Dict], current_cost: float, max_cost: float,
               model_tier: int, verifier_disagreements: int = 0) -> DoomAssessment:
        """Assess whether the current run should be terminated early.

        Args:
            steps: Step records. Each dict may carry "tool_calls" (a list of
                dicts with a "success" flag), "artifacts_created", and
                "retry_num"; missing keys are treated as benign.
            current_cost: Cost spent so far.
            max_cost: Total cost budget for the run.
            model_tier: Currently unused; kept for interface compatibility.
            verifier_disagreements: Number of verifier disagreements observed.

        Returns:
            A DoomAssessment, which is also appended to ``self.assessments``.
        """
        signals: List[DoomSignal] = []
        severity = 0.0

        # Repeated tool failures. A tool call without a "success" key is
        # assumed to have succeeded.
        failed_tools = sum(1 for s in steps for tc in s.get("tool_calls", [])
                           if not tc.get("success", True))
        if failed_tools >= DOOM_THRESHOLDS["max_failed_tool_calls"]:
            signals.append(DoomSignal("failed_tool_calls", 0.6,
                                      f"{failed_tools} failed tool calls"))
            severity += 0.3

        # No artifacts produced at all. Vacuously true for an empty run,
        # but the length guard keeps short runs from triggering the signal.
        no_progress = all(not s.get("artifacts_created") for s in steps)
        if no_progress and len(steps) > 3:
            # Report the actual count: the old "3+ steps" wording was
            # misleading since this only fires from the 4th step onward.
            signals.append(DoomSignal("no_artifact_progress", 0.5,
                                      f"no artifacts after {len(steps)} steps"))
            severity += 0.25

        # Spending money with nothing to show for it.
        if current_cost > DOOM_THRESHOLDS["max_cost_without_progress"] and no_progress:
            signals.append(DoomSignal("growing_cost_no_progress", 0.7,
                                      f"cost={current_cost:.2f} with no progress"))
            severity += 0.35

        # Steps that are retries of earlier work (re-planning churn).
        retry_steps = sum(1 for s in steps if s.get("retry_num", 0) > 0)
        if retry_steps >= DOOM_THRESHOLDS["max_repeated_planning"]:
            signals.append(DoomSignal("repeated_planning", 0.4,
                                      f"{retry_steps} retry steps"))
            severity += 0.2

        if verifier_disagreements >= 2:
            signals.append(DoomSignal("verifier_disagreement", 0.6,
                                      f"{verifier_disagreements} verifier disagreements"))
            severity += 0.3

        # Nearly out of budget regardless of progress. Computed once and
        # reused in the action selection below.
        near_budget = current_cost >= max_cost * 0.9
        if near_budget:
            signals.append(DoomSignal("approaching_cost_limit", 0.5,
                                      f"cost={current_cost:.2f} / {max_cost:.2f}"))
            severity += 0.4

        severity = min(severity, 1.0)
        doomed = severity >= self.doom_threshold

        # Pick the most decisive applicable action; branch order encodes
        # priority. The failure count is compared against the shared
        # threshold constant instead of a duplicated magic number so the
        # signal and the action can never diverge.
        if not doomed:
            action = "continue"
            reasoning = f"severity={severity:.2f} < threshold={self.doom_threshold}"
        elif failed_tools >= DOOM_THRESHOLDS["max_failed_tool_calls"] and no_progress:
            action = "mark_blocked"
            reasoning = "too many failures with no progress"
        elif near_budget:
            action = "stop"
            reasoning = "approaching cost limit"
        elif verifier_disagreements >= 2:
            action = "switch_strategy"
            reasoning = "verifier disagreement suggests wrong approach"
        else:
            action = "ask_question"
            reasoning = "run may be recoverable with user input"

        assessment = DoomAssessment(doomed, severity, signals, action, reasoning)
        self.assessments.append(assessment)
        return assessment
|
|