# agent-cost-optimizer / aco / doom_detector.py
# narcolepticchicken's picture
# Upload aco/doom_detector.py with huggingface_hub
# ff456f8 verified
"""Early Termination / Doom Detector: Stop runs unlikely to succeed."""
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class DoomSignal:
    """A single warning sign observed while assessing a run."""

    # Short machine-readable identifier for the signal, e.g. "failed_tool_calls".
    signal_type: str
    # Strength of this individual signal, on a 0-1 scale.
    severity: float
    # Human-readable description of what was observed.
    evidence: str
@dataclass
class DoomAssessment:
    """Outcome of one doom check: the verdict, its inputs and a suggested action."""

    # True when the run is judged unlikely to succeed.
    doomed: bool
    # Aggregate severity of the assessment, on a 0-1 scale.
    severity: float
    # The individual warning signs that contributed to this assessment.
    signals: List[DoomSignal]
    # One of: "continue", "stop", "mark_blocked", "ask_question", "switch_strategy".
    recommended_action: str
    # Short human-readable justification for the recommended action.
    reasoning: str
# Tunable limits consulted by the doom heuristics.
DOOM_THRESHOLDS = dict(
    # Number of failed tool calls at which the "failed_tool_calls" signal fires.
    max_failed_tool_calls=3,
    # Number of steps with retry_num > 0 at which "repeated_planning" fires.
    max_repeated_planning=2,
    # Cost strictly above which a run that has produced no artifacts is flagged.
    max_cost_without_progress=2.0,
    # NOTE(review): the next two limits are not referenced by the code visible
    # in this file; presumably reserved for other checks -- confirm.
    max_context_confusion=0.7,
    max_escalation_loops=2,
)
class DoomDetector:
    """Heuristic early-termination check for an agent run.

    Each call to :meth:`assess` inspects the run so far, adds a fixed weight
    to an aggregate severity for every warning signal that fires, and reports
    the run as doomed once that aggregate reaches ``doom_threshold``. Every
    assessment produced is also appended to ``self.assessments``.
    """

    def __init__(self, doom_threshold: float = 0.7):
        """Create a detector.

        Args:
            doom_threshold: Aggregate severity (capped at 1.0) at or above
                which a run is reported as doomed.
        """
        self.doom_threshold = doom_threshold
        self.assessments: List[DoomAssessment] = []

    def assess(self, steps: List[Dict], current_cost: float, max_cost: float,
               model_tier: int, verifier_disagreements: int = 0) -> DoomAssessment:
        """Assess whether the run described by ``steps`` should be stopped.

        Args:
            steps: One dict per executed step. The keys read here are
                ``"tool_calls"`` (an iterable of dicts, each with an optional
                boolean ``"success"`` that defaults to True),
                ``"artifacts_created"`` (truthy when the step produced
                something) and ``"retry_num"`` (a number; > 0 marks a retry).
                Missing or ``None`` values for ``"tool_calls"`` and
                ``"retry_num"`` are treated as "no calls" / "not a retry".
            current_cost: Cost accumulated by the run so far.
            max_cost: Cost budget for the run; reaching 90% of it raises a
                signal.
            model_tier: Accepted for interface compatibility; not used by the
                current heuristics.
            verifier_disagreements: Number of verifier disagreements observed;
                two or more raise a signal.

        Returns:
            The resulting DoomAssessment, which is also recorded in
            ``self.assessments``.
        """
        max_failed = DOOM_THRESHOLDS["max_failed_tool_calls"]
        signals: List[DoomSignal] = []
        severity = 0.0

        # `or []` rather than a .get() default: the default only applies when
        # the key is absent, so an explicit None would otherwise crash the loop.
        failed_tools = sum(
            1
            for step in steps
            for call in (step.get("tool_calls") or [])
            if not call.get("success", True)
        )
        too_many_failures = failed_tools >= max_failed
        if too_many_failures:
            signals.append(DoomSignal("failed_tool_calls", 0.6,
                                      f"{failed_tools} failed tool calls"))
            severity += 0.3

        # Vacuously True for an empty step list.
        no_progress = not any(step.get("artifacts_created") for step in steps)
        # NOTE(review): the check is strictly "> 3", i.e. it first fires at
        # four steps, while the evidence text says "3+" -- confirm intent.
        if no_progress and len(steps) > 3:
            signals.append(DoomSignal("no_artifact_progress", 0.5,
                                      "no artifacts after 3+ steps"))
            severity += 0.25

        if current_cost > DOOM_THRESHOLDS["max_cost_without_progress"] and no_progress:
            signals.append(DoomSignal("growing_cost_no_progress", 0.7,
                                      f"cost={current_cost:.2f} with no progress"))
            severity += 0.35

        # Same None-tolerance as for tool_calls above.
        planning_steps = sum(
            1 for step in steps if (step.get("retry_num") or 0) > 0
        )
        if planning_steps >= DOOM_THRESHOLDS["max_repeated_planning"]:
            signals.append(DoomSignal("repeated_planning", 0.4,
                                      f"{planning_steps} retry steps"))
            severity += 0.2

        verifier_conflict = verifier_disagreements >= 2
        if verifier_conflict:
            signals.append(DoomSignal("verifier_disagreement", 0.6,
                                      f"{verifier_disagreements} verifier disagreements"))
            severity += 0.3

        # Computed once so the signal and the action choice cannot drift apart.
        near_cost_limit = current_cost >= max_cost * 0.9
        if near_cost_limit:
            signals.append(DoomSignal("approaching_cost_limit", 0.5,
                                      f"cost={current_cost:.2f} / {max_cost:.2f}"))
            severity += 0.4

        severity = min(severity, 1.0)
        doomed = severity >= self.doom_threshold

        # Pick the action; the first matching condition wins.
        if not doomed:
            action = "continue"
            reasoning = f"severity={severity:.2f} < threshold={self.doom_threshold}"
        elif too_many_failures and no_progress:
            # Uses the same configured threshold as the signal check above
            # instead of a separate hard-coded count.
            action = "mark_blocked"
            reasoning = "too many failures with no progress"
        elif near_cost_limit:
            action = "stop"
            reasoning = "approaching cost limit"
        elif verifier_conflict:
            action = "switch_strategy"
            reasoning = "verifier disagreement suggests wrong approach"
        else:
            action = "ask_question"
            reasoning = "run may be recoverable with user input"

        assessment = DoomAssessment(doomed, severity, signals, action, reasoning)
        self.assessments.append(assessment)
        return assessment