"""Benchmark Suite for Agent Cost Optimizer. Benchmarks: A. Coding Agent Tasks B. Research Agent Tasks C. Tool-Use Tasks D. Document / Contract / QA Tasks E. Long-Horizon Agent Tasks Baselines: A. always frontier model B. always cheap model C. static model routing D. prompt-only router E. rules-only optimizer F. learned model router G. learned router + context budgeter H. learned router + context + verifier budgeter I. full Agent Cost Optimizer Metrics: - task success - cost per successful task - cost reduction at iso-quality - latency - token usage - model calls - tool calls - verifier calls - retries - cache hit rate - context tokens - false-DONE rate - unsafe cheap-model miss rate - missed escalation rate - user correction rate - regression rate - quality/cost frontier """ import json import time from typing import Dict, List, Any, Optional from dataclasses import dataclass, field from collections import defaultdict from aco.optimizer import AgentCostOptimizer, OptimizationResult from aco.config import ACOConfig, ModelConfig, ToolConfig, VerifierConfig, RoutingPolicy from aco.trace_schema import AgentTrace, TraceStep, ModelCall, ToolCall, VerifierCall, TaskType, Outcome, FailureTag from aco.datasets.synthetic_traces import SyntheticTraceGenerator @dataclass class BenchmarkConfig: name: str task_types: List[TaskType] num_tasks: int routing_mode: str = "cascade" enable_modules: Dict[str, bool] = field(default_factory=dict) baseline_name: str = "" @dataclass class BenchmarkResult: benchmark_name: str baseline_name: str num_tasks: int num_success: int num_partial: int num_failure: int num_false_done: int num_blocked: int total_cost: float avg_cost_success: float avg_latency_ms: float total_tool_calls: int total_verifier_calls: int total_retries: int avg_cache_hit_rate: float total_context_tokens: int cost_reduction_vs_frontier: float false_done_rate: float unsafe_cheap_miss_rate: float missed_escalation_rate: float regression_rate: float quality_cost_frontier: List[Dict[str, float]] = field(default_factory=list) per_task_results: List[Dict[str, Any]] = field(default_factory=list) class BenchmarkSuite: """Runs ACO benchmarks across tasks and baselines.""" def __init__(self, config: Optional[ACOConfig] = None): self.config = config or self._default_config() def _default_config(self) -> ACOConfig: models = { "tiny_local": ModelConfig("tiny_local", "local", 0.0001, 0.0002, latency_ms_estimate=200, strength_tier=1), "cheap_cloud": ModelConfig("cheap_cloud", "cloud", 0.0005, 0.001, latency_ms_estimate=500, strength_tier=2), "medium": ModelConfig("medium", "cloud", 0.003, 0.006, latency_ms_estimate=800, strength_tier=3), "frontier": ModelConfig("frontier", "cloud", 0.01, 0.03, latency_ms_estimate=1500, strength_tier=4), "specialist": ModelConfig("specialist", "cloud", 0.015, 0.045, latency_ms_estimate=2000, strength_tier=5), } tools = { "search": ToolConfig("search", 0.002, 500), "retrieve": ToolConfig("retrieve", 0.001, 300), "code_execution": ToolConfig("code_execution", 0.005, 1000), "linter": ToolConfig("linter", 0.001, 200), "file_read": ToolConfig("file_read", 0.0005, 100), "compliance_check": ToolConfig("compliance_check", 0.01, 1500), "summarize": ToolConfig("summarize", 0.002, 400), } verifiers = { "verifier_medium": VerifierConfig("verifier_medium", 0.005, 800, 0.8), } return ACOConfig( project_name="aco-benchmark", models=models, tools=tools, verifiers=verifiers, routing_policy=RoutingPolicy("benchmark"), ) def generate_benchmark_data(self, n: int = 1000, seed: int = 42) -> List[AgentTrace]: """Generate synthetic traces for benchmarking.""" gen = SyntheticTraceGenerator(seed=seed) return gen.generate(n) def run_baseline( self, traces: List[AgentTrace], baseline_name: str, ) -> BenchmarkResult: """Run a single baseline over the benchmark traces.""" # Configure optimizer for baseline mode_map = { "always_frontier": "always_frontier", "always_cheap": "always_frontier", # overridden below "static": "static", "prompt_only": "prompt_only", "learned": "learned", "learned_verifier": "learned_verifier", "cascade": "cascade", "rules_only": "cascade", # uses cascade routing with rules-based modules "full": "cascade", } # Adjust config based on baseline config = self._default_config() if baseline_name == "always_frontier": config.enable_router = False elif baseline_name == "always_cheap": config.enable_router = False # Override all models to cheap tier in simulation by using special handling elif baseline_name == "static": pass # default static routing elif baseline_name == "prompt_only": pass # prompt heuristic routing elif baseline_name == "rules_only": config.enable_classifier = True config.enable_router = True config.enable_context_budgeter = True config.enable_cache_layout = True config.enable_tool_gate = True config.enable_verifier_budgeter = True config.enable_retry_optimizer = True config.enable_meta_tool_miner = False config.enable_early_termination = True elif baseline_name == "full": pass # all enabled # For ablations, disable specific modules if baseline_name.startswith("no_"): module_name = baseline_name.replace("no_", "") if hasattr(config, f"enable_{module_name}"): setattr(config, f"enable_{module_name}", False) optimizer = AgentCostOptimizer(config) results = [] total_cost = 0.0 total_latency = 0.0 total_tools = 0 total_verifiers = 0 total_retries = 0 total_context = 0 cache_rates = [] success_count = 0 partial_count = 0 failure_count = 0 false_done_count = 0 blocked_count = 0 cheap_misses = 0 escalation_misses = 0 regression_count = 0 frontier_costs = [] actual_costs = [] for trace in traces: # Run optimization on this trace's request run_state = { "trace_id": trace.trace_id, "routing_mode": mode_map.get(baseline_name, "cascade"), "current_cost": 0.0, "planned_tools": [ (tc.tool_name, tc.tool_input) for step in trace.steps for tc in step.tool_calls ], "previous_tool_calls": [ tc for step in trace.steps for tc in step.tool_calls ], "step_number": len(trace.steps), "total_steps": len(trace.steps), "is_irreversible": trace.task_type == TaskType.LEGAL_REGULATED, } result = optimizer.optimize(trace.user_request, run_state) # Simulate execution based on optimization decisions sim_cost, sim_latency, sim_success = self._simulate(trace, result, baseline_name) total_cost += sim_cost total_latency += sim_latency total_tools += len(result.tool_decisions) if result.verifier_decision: total_verifiers += 1 total_retries += sum(1 for d in result.tool_decisions if d.decision.value == "skip") total_context += sum(s.context_size_tokens for s in trace.steps) frontier_cost = sum( s.model_call.total_cost if s.model_call else 0 for s in trace.steps ) if trace.metadata.get("scenario") == "frontier_unnecessary" else trace.total_cost * 2 frontier_costs.append(frontier_cost) actual_costs.append(sim_cost) outcome = trace.final_outcome if sim_success: if outcome == Outcome.SUCCESS: success_count += 1 elif outcome == Outcome.PARTIAL_SUCCESS: partial_count += 1 else: regression_count += 1 else: if outcome == Outcome.FALSE_DONE: false_done_count += 1 elif outcome == Outcome.BLOCKED: blocked_count += 1 else: failure_count += 1 # Check for cheap model misses if trace.metadata.get("scenario") == "cheap_failure" and result.routing_decision.tier <= 2: cheap_misses += 1 # Check for missed escalation if trace.metadata.get("scenario") in ("cheap_failure", "tool_underuse") and result.routing_decision.tier < 3: escalation_misses += 1 cache_rates.append(trace.cache_hit_rate) results.append({ "trace_id": trace.trace_id, "task_type": trace.task_type.value, "scenario": trace.metadata.get("scenario", "normal"), "simulated_cost": sim_cost, "simulated_success": sim_success, "routing_tier": result.routing_decision.tier, "model_id": result.routing_decision.model_id, "tool_count": len(result.tool_decisions), "verifier_used": result.verifier_decision is not None, }) n = len(traces) avg_cost_success = total_cost / max(success_count + partial_count, 1) # Cost reduction vs frontier baseline cost_reduction = (sum(frontier_costs) - sum(actual_costs)) / max(sum(frontier_costs), 1) return BenchmarkResult( benchmark_name="synthetic_benchmark", baseline_name=baseline_name, num_tasks=n, num_success=success_count, num_partial=partial_count, num_failure=failure_count, num_false_done=false_done_count, num_blocked=blocked_count, total_cost=total_cost, avg_cost_success=avg_cost_success, avg_latency_ms=total_latency / n, total_tool_calls=total_tools, total_verifier_calls=total_verifiers, total_retries=total_retries, avg_cache_hit_rate=sum(cache_rates) / n, total_context_tokens=total_context, cost_reduction_vs_frontier=cost_reduction, false_done_rate=false_done_count / n, unsafe_cheap_miss_rate=cheap_misses / n, missed_escalation_rate=escalation_misses / n, regression_rate=regression_count / n, quality_cost_frontier=[ {"cost": c, "success": 1.0 if s else 0.0} for c, s in zip(actual_costs, [r["simulated_success"] for r in results]) ], per_task_results=results, ) def _simulate(self, trace: AgentTrace, result: OptimizationResult, baseline: str) -> tuple: """Simulate execution based on optimizer decisions.""" # Base cost from the trace base_cost = trace.total_cost_computed # Adjust cost based on routing decision tier = result.routing_decision.tier cost_mult = { 1: 0.05, 2: 0.25, 3: 0.75, 4: 1.0, 5: 1.5, }.get(tier, 1.0) # Override for always_cheap baseline if baseline == "always_cheap": cost_mult = 0.25 tier = 2 # Override for always_frontier baseline if baseline == "always_frontier": cost_mult = 1.0 tier = 4 # Apply tool gate savings tools_skipped = sum(1 for d in result.tool_decisions if d.decision.value in ("skip", "use_cache")) tool_savings = tools_skipped * 0.005 # Apply cache savings cache_savings = 0.0 if result.prompt_layout: cache_savings = result.prompt_layout.cache_discount sim_cost = base_cost * cost_mult - tool_savings - cache_savings sim_cost = max(sim_cost, 0.001) # Simulate latency sim_latency = trace.total_latency_ms * cost_mult * 0.8 # Simulate success probability scenario = trace.metadata.get("scenario", "normal") # Base success rate by tier and scenario success_prob = 0.95 if tier >= 3 else 0.7 if scenario == "cheap_failure": success_prob = 0.3 if tier <= 2 else 0.85 elif scenario == "tool_underuse": success_prob = 0.6 if tools_skipped > 0 else 0.8 elif scenario == "retry_loop": success_prob = 0.2 elif scenario == "frontier_unnecessary": success_prob = 0.95 elif scenario == "meta_tool_success": success_prob = 0.9 elif scenario == "meta_tool_bad": success_prob = 0.4 elif scenario == "false_done": success_prob = 0.1 elif scenario == "blocked" or scenario == "stopped_doom": success_prob = 0.0 elif scenario == "human_escalation": success_prob = 0.5 # Verifier improves success for high-risk tasks if result.verifier_decision and result.verifier_decision.decision.value == "call_verifier": success_prob += 0.05 # Meta-tool success bonus if result.meta_tool_match: success_prob += 0.03 sim_success = success_prob > 0.5 # simplified threshold return sim_cost, sim_latency, sim_success def run_all_baselines(self, traces: List[AgentTrace]) -> Dict[str, BenchmarkResult]: """Run all baseline configurations.""" baselines = [ "always_frontier", "always_cheap", "static", "prompt_only", "cascade", "rules_only", "full", ] results = {} for baseline in baselines: print(f"Running baseline: {baseline}...") results[baseline] = self.run_baseline(traces, baseline) return results def run_ablations(self, traces: List[AgentTrace]) -> Dict[str, BenchmarkResult]: """Run ablation study disabling each module.""" ablations = [ "no_router", "no_context_budgeter", "no_cache_layout", "no_tool_gate", "no_verifier_budgeter", "no_retry_optimizer", "no_meta_tool_miner", "no_early_termination", ] results = {} for ablation in ablations: print(f"Running ablation: {ablation}...") results[ablation] = self.run_baseline(traces, ablation) return results def report(self, results: Dict[str, BenchmarkResult]) -> str: """Generate formatted benchmark report.""" lines = ["=" * 80, "AGENT COST OPTIMIZER BENCHMARK REPORT", "=" * 80, ""] headers = ["Baseline", "Success", "Partial", "Fail", "Blocked", "False-DONE", "Total Cost", "Avg Cost/Succ", "Latency(ms)", "Tools", "Verifiers", "Retries", "Cache Hit", "Cost Reduction", "Regression"] lines.append(" | ".join(headers)) lines.append("-" * 120) for name, result in results.items(): row = [ name[:20].ljust(20), f"{result.num_success / result.num_tasks:.1%}", f"{result.num_partial / result.num_tasks:.1%}", f"{result.num_failure / result.num_tasks:.1%}", f"{result.num_blocked / result.num_tasks:.1%}", f"{result.false_done_rate:.1%}", f"${result.total_cost:.2f}", f"${result.avg_cost_success:.4f}", f"{result.avg_latency_ms:.0f}", str(result.total_tool_calls), str(result.total_verifier_calls), str(result.total_retries), f"{result.avg_cache_hit_rate:.1%}", f"{result.cost_reduction_vs_frontier:.1%}", f"{result.regression_rate:.1%}", ] lines.append(" | ".join(row)) lines.append("") lines.append("=" * 80) # Find best cost/success tradeoff best_score = -float("inf") best_name = "" for name, result in results.items(): success_rate = (result.num_success + result.num_partial) / result.num_tasks score = success_rate * 10 - result.avg_cost_success * 100 - result.regression_rate * 50 if score > best_score: best_score = score best_name = name lines.append(f"BEST OVERALL: {best_name} (score={best_score:.2f})") lines.append("") return "\n".join(lines) def export(self, results: Dict[str, BenchmarkResult], path: str) -> None: """Export results to JSON.""" export_data = {} for name, result in results.items(): export_data[name] = { "benchmark_name": result.benchmark_name, "baseline_name": result.baseline_name, "num_tasks": result.num_tasks, "num_success": result.num_success, "num_partial": result.num_partial, "num_failure": result.num_failure, "num_false_done": result.num_false_done, "num_blocked": result.num_blocked, "total_cost": result.total_cost, "avg_cost_success": result.avg_cost_success, "avg_latency_ms": result.avg_latency_ms, "total_tool_calls": result.total_tool_calls, "total_verifier_calls": result.total_verifier_calls, "total_retries": result.total_retries, "avg_cache_hit_rate": result.avg_cache_hit_rate, "total_context_tokens": result.total_context_tokens, "cost_reduction_vs_frontier": result.cost_reduction_vs_frontier, "false_done_rate": result.false_done_rate, "unsafe_cheap_miss_rate": result.unsafe_cheap_miss_rate, "missed_escalation_rate": result.missed_escalation_rate, "regression_rate": result.regression_rate, } with open(path, "w") as f: json.dump(export_data, f, indent=2)