| """Benchmark Suite for Agent Cost Optimizer. |
| |
| Benchmarks: |
| A. Coding Agent Tasks |
| B. Research Agent Tasks |
| C. Tool-Use Tasks |
| D. Document / Contract / QA Tasks |
| E. Long-Horizon Agent Tasks |
| |
| Baselines: |
| A. always frontier model |
| B. always cheap model |
| C. static model routing |
| D. prompt-only router |
| E. rules-only optimizer |
| F. learned model router |
| G. learned router + context budgeter |
| H. learned router + context + verifier budgeter |
| I. full Agent Cost Optimizer |
| |
| Metrics: |
| - task success |
| - cost per successful task |
| - cost reduction at iso-quality |
| - latency |
| - token usage |
| - model calls |
| - tool calls |
| - verifier calls |
| - retries |
| - cache hit rate |
| - context tokens |
| - false-DONE rate |
| - unsafe cheap-model miss rate |
| - missed escalation rate |
| - user correction rate |
| - regression rate |
| - quality/cost frontier |
| """ |
|
|
import json
import time
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict

from aco.optimizer import AgentCostOptimizer, OptimizationResult
from aco.config import ACOConfig, ModelConfig, ToolConfig, VerifierConfig, RoutingPolicy
from aco.trace_schema import AgentTrace, TraceStep, ModelCall, ToolCall, VerifierCall, TaskType, Outcome, FailureTag
from aco.datasets.synthetic_traces import SyntheticTraceGenerator


@dataclass
class BenchmarkConfig:
    name: str
    task_types: List[TaskType]
    num_tasks: int
    routing_mode: str = "cascade"
    enable_modules: Dict[str, bool] = field(default_factory=dict)
    baseline_name: str = ""


@dataclass
class BenchmarkResult:
    benchmark_name: str
    baseline_name: str
    num_tasks: int
    num_success: int
    num_partial: int
    num_failure: int
    num_false_done: int
    num_blocked: int
    total_cost: float
    avg_cost_success: float
    avg_latency_ms: float
    total_tool_calls: int
    total_verifier_calls: int
    total_retries: int
    avg_cache_hit_rate: float
    total_context_tokens: int
    cost_reduction_vs_frontier: float
    false_done_rate: float
    unsafe_cheap_miss_rate: float
    missed_escalation_rate: float
    regression_rate: float
    quality_cost_frontier: List[Dict[str, float]] = field(default_factory=list)
    per_task_results: List[Dict[str, Any]] = field(default_factory=list)


class BenchmarkSuite:
    """Runs ACO benchmarks across tasks and baselines."""

    def __init__(self, config: Optional[ACOConfig] = None):
        self.config = config or self._default_config()

    def _default_config(self) -> ACOConfig:
        models = {
            "tiny_local": ModelConfig("tiny_local", "local", 0.0001, 0.0002, latency_ms_estimate=200, strength_tier=1),
            "cheap_cloud": ModelConfig("cheap_cloud", "cloud", 0.0005, 0.001, latency_ms_estimate=500, strength_tier=2),
            "medium": ModelConfig("medium", "cloud", 0.003, 0.006, latency_ms_estimate=800, strength_tier=3),
            "frontier": ModelConfig("frontier", "cloud", 0.01, 0.03, latency_ms_estimate=1500, strength_tier=4),
            "specialist": ModelConfig("specialist", "cloud", 0.015, 0.045, latency_ms_estimate=2000, strength_tier=5),
        }
        tools = {
            "search": ToolConfig("search", 0.002, 500),
            "retrieve": ToolConfig("retrieve", 0.001, 300),
            "code_execution": ToolConfig("code_execution", 0.005, 1000),
            "linter": ToolConfig("linter", 0.001, 200),
            "file_read": ToolConfig("file_read", 0.0005, 100),
            "compliance_check": ToolConfig("compliance_check", 0.01, 1500),
            "summarize": ToolConfig("summarize", 0.002, 400),
        }
        verifiers = {
            "verifier_medium": VerifierConfig("verifier_medium", 0.005, 800, 0.8),
        }
        return ACOConfig(
            project_name="aco-benchmark",
            models=models,
            tools=tools,
            verifiers=verifiers,
            routing_policy=RoutingPolicy("benchmark"),
        )

    def generate_benchmark_data(self, n: int = 1000, seed: int = 42) -> List[AgentTrace]:
        """Generate synthetic traces for benchmarking."""
        gen = SyntheticTraceGenerator(seed=seed)
        return gen.generate(n)

    def run_baseline(
        self,
        traces: List[AgentTrace],
        baseline_name: str,
    ) -> BenchmarkResult:
        """Run a single baseline over the benchmark traces."""
        # Map baseline names to optimizer routing modes; unknown names (including
        # the "no_<module>" ablations) fall back to "cascade". Note that always_cheap
        # shares a mode with always_frontier here; its cost multiplier and tier are
        # forced directly in _simulate.
        mode_map = {
            "always_frontier": "always_frontier",
            "always_cheap": "always_frontier",
            "static": "static",
            "prompt_only": "prompt_only",
            "learned": "learned",
            "learned_verifier": "learned_verifier",
            "cascade": "cascade",
            "rules_only": "cascade",
            "full": "cascade",
        }

        config = self._default_config()

        if baseline_name == "always_frontier":
            config.enable_router = False
        elif baseline_name == "always_cheap":
            config.enable_router = False
        elif baseline_name == "static":
            pass
        elif baseline_name == "prompt_only":
            pass
        elif baseline_name == "rules_only":
            config.enable_classifier = True
            config.enable_router = True
            config.enable_context_budgeter = True
            config.enable_cache_layout = True
            config.enable_tool_gate = True
            config.enable_verifier_budgeter = True
            config.enable_retry_optimizer = True
            config.enable_meta_tool_miner = False
            config.enable_early_termination = True
        elif baseline_name == "full":
            pass

        if baseline_name.startswith("no_"):
            module_name = baseline_name.replace("no_", "")
            if hasattr(config, f"enable_{module_name}"):
                setattr(config, f"enable_{module_name}", False)

        optimizer = AgentCostOptimizer(config)

        results = []
        total_cost = 0.0
        total_latency = 0.0
        total_tools = 0
        total_verifiers = 0
        total_retries = 0
        total_context = 0
        cache_rates = []

        success_count = 0
        partial_count = 0
        failure_count = 0
        false_done_count = 0
        blocked_count = 0

        cheap_misses = 0
        escalation_misses = 0
        regression_count = 0

        frontier_costs = []
        actual_costs = []

        for trace in traces:
            run_state = {
                "trace_id": trace.trace_id,
                "routing_mode": mode_map.get(baseline_name, "cascade"),
                "current_cost": 0.0,
                "planned_tools": [
                    (tc.tool_name, tc.tool_input)
                    for step in trace.steps
                    for tc in step.tool_calls
                ],
                "previous_tool_calls": [
                    tc for step in trace.steps for tc in step.tool_calls
                ],
                "step_number": len(trace.steps),
                "total_steps": len(trace.steps),
                "is_irreversible": trace.task_type == TaskType.LEGAL_REGULATED,
            }

            result = optimizer.optimize(trace.user_request, run_state)

            sim_cost, sim_latency, sim_success = self._simulate(trace, result, baseline_name)

            total_cost += sim_cost
            total_latency += sim_latency
            total_tools += len(result.tool_decisions)
            if result.verifier_decision:
                total_verifiers += 1
            total_retries += sum(1 for d in result.tool_decisions if d.decision.value == "skip")
            total_context += sum(s.context_size_tokens for s in trace.steps)

            # Estimate what this task would have cost on the frontier reference.
            frontier_cost = sum(
                s.model_call.total_cost if s.model_call else 0
                for s in trace.steps
            ) if trace.metadata.get("scenario") == "frontier_unnecessary" else trace.total_cost * 2
            frontier_costs.append(frontier_cost)
            actual_costs.append(sim_cost)

            outcome = trace.final_outcome
            if sim_success:
                if outcome == Outcome.SUCCESS:
                    success_count += 1
                elif outcome == Outcome.PARTIAL_SUCCESS:
                    partial_count += 1
                else:
                    regression_count += 1
            else:
                if outcome == Outcome.FALSE_DONE:
                    false_done_count += 1
                elif outcome == Outcome.BLOCKED:
                    blocked_count += 1
                else:
                    failure_count += 1

            # Routing a known cheap-failure task to a cheap tier is an unsafe miss.
            if trace.metadata.get("scenario") == "cheap_failure" and result.routing_decision.tier <= 2:
                cheap_misses += 1

            # Staying below tier 3 on tasks that needed escalation is a missed escalation.
            if trace.metadata.get("scenario") in ("cheap_failure", "tool_underuse") and result.routing_decision.tier < 3:
                escalation_misses += 1

            cache_rates.append(trace.cache_hit_rate)

            results.append({
                "trace_id": trace.trace_id,
                "task_type": trace.task_type.value,
                "scenario": trace.metadata.get("scenario", "normal"),
                "simulated_cost": sim_cost,
                "simulated_success": sim_success,
                "routing_tier": result.routing_decision.tier,
                "model_id": result.routing_decision.model_id,
                "tool_count": len(result.tool_decisions),
                "verifier_used": result.verifier_decision is not None,
            })

        n = len(traces)
        avg_cost_success = total_cost / max(success_count + partial_count, 1)

        cost_reduction = (sum(frontier_costs) - sum(actual_costs)) / max(sum(frontier_costs), 1e-9)
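        # Worked example (illustrative numbers only): if the frontier reference would
        # have cost $12.40 in total and this baseline's simulated spend is $4.90,
        # cost_reduction = (12.40 - 4.90) / 12.40, roughly 0.60, i.e. about 60%
        # cheaper at the measured quality level.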

        return BenchmarkResult(
            benchmark_name="synthetic_benchmark",
            baseline_name=baseline_name,
            num_tasks=n,
            num_success=success_count,
            num_partial=partial_count,
            num_failure=failure_count,
            num_false_done=false_done_count,
            num_blocked=blocked_count,
            total_cost=total_cost,
            avg_cost_success=avg_cost_success,
            avg_latency_ms=total_latency / n,
            total_tool_calls=total_tools,
            total_verifier_calls=total_verifiers,
            total_retries=total_retries,
            avg_cache_hit_rate=sum(cache_rates) / n,
            total_context_tokens=total_context,
            cost_reduction_vs_frontier=cost_reduction,
            false_done_rate=false_done_count / n,
            unsafe_cheap_miss_rate=cheap_misses / n,
            missed_escalation_rate=escalation_misses / n,
            regression_rate=regression_count / n,
            quality_cost_frontier=[
                {"cost": c, "success": 1.0 if s else 0.0}
                for c, s in zip(actual_costs, [r["simulated_success"] for r in results])
            ],
            per_task_results=results,
        )

    def _simulate(self, trace: AgentTrace, result: OptimizationResult, baseline: str) -> Tuple[float, float, bool]:
        """Simulate execution based on optimizer decisions."""
        base_cost = trace.total_cost_computed

        # Cost scales with the strength tier the router selected; the two fixed
        # baselines below force their own tier and multiplier.
        tier = result.routing_decision.tier
        cost_mult = {
            1: 0.05, 2: 0.25, 3: 0.75, 4: 1.0, 5: 1.5,
        }.get(tier, 1.0)

        if baseline == "always_cheap":
            cost_mult = 0.25
            tier = 2

        if baseline == "always_frontier":
            cost_mult = 1.0
            tier = 4

        # Savings from skipped or cached tool calls and from prompt-cache reuse.
        tools_skipped = sum(1 for d in result.tool_decisions if d.decision.value in ("skip", "use_cache"))
        tool_savings = tools_skipped * 0.005

        cache_savings = 0.0
        if result.prompt_layout:
            cache_savings = result.prompt_layout.cache_discount

        sim_cost = base_cost * cost_mult - tool_savings - cache_savings
        sim_cost = max(sim_cost, 0.001)

        sim_latency = trace.total_latency_ms * cost_mult * 0.8

        scenario = trace.metadata.get("scenario", "normal")

        success_prob = 0.95 if tier >= 3 else 0.7
        if scenario == "cheap_failure":
            success_prob = 0.3 if tier <= 2 else 0.85
        elif scenario == "tool_underuse":
            success_prob = 0.6 if tools_skipped > 0 else 0.8
        elif scenario == "retry_loop":
            success_prob = 0.2
        elif scenario == "frontier_unnecessary":
            success_prob = 0.95
        elif scenario == "meta_tool_success":
            success_prob = 0.9
        elif scenario == "meta_tool_bad":
            success_prob = 0.4
        elif scenario == "false_done":
            success_prob = 0.1
        elif scenario in ("blocked", "stopped_doom"):
            success_prob = 0.0
        elif scenario == "human_escalation":
            success_prob = 0.5

        if result.verifier_decision and result.verifier_decision.decision.value == "call_verifier":
            success_prob += 0.05

        if result.meta_tool_match:
            success_prob += 0.03

        sim_success = success_prob > 0.5

        return sim_cost, sim_latency, sim_success

    def run_all_baselines(self, traces: List[AgentTrace]) -> Dict[str, BenchmarkResult]:
        """Run all baseline configurations."""
        baselines = [
            "always_frontier",
            "always_cheap",
            "static",
            "prompt_only",
            "cascade",
            "rules_only",
            "full",
        ]

        results = {}
        for baseline in baselines:
            print(f"Running baseline: {baseline}...")
            results[baseline] = self.run_baseline(traces, baseline)

        return results

    def run_ablations(self, traces: List[AgentTrace]) -> Dict[str, BenchmarkResult]:
        """Run ablation study disabling each module."""
        ablations = [
            "no_router",
            "no_context_budgeter",
            "no_cache_layout",
            "no_tool_gate",
            "no_verifier_budgeter",
            "no_retry_optimizer",
            "no_meta_tool_miner",
            "no_early_termination",
        ]

        results = {}
        for ablation in ablations:
            print(f"Running ablation: {ablation}...")
            results[ablation] = self.run_baseline(traces, ablation)

        return results

    def report(self, results: Dict[str, BenchmarkResult]) -> str:
        """Generate formatted benchmark report."""
        lines = ["=" * 80, "AGENT COST OPTIMIZER BENCHMARK REPORT", "=" * 80, ""]

        headers = ["Baseline", "Success", "Partial", "Fail", "Blocked", "False-DONE",
                   "Total Cost", "Avg Cost/Succ", "Latency(ms)", "Tools", "Verifiers",
                   "Retries", "Cache Hit", "Cost Reduction", "Regression"]

        lines.append(" | ".join(headers))
        lines.append("-" * 120)

        for name, result in results.items():
            row = [
                name[:20].ljust(20),
                f"{result.num_success / result.num_tasks:.1%}",
                f"{result.num_partial / result.num_tasks:.1%}",
                f"{result.num_failure / result.num_tasks:.1%}",
                f"{result.num_blocked / result.num_tasks:.1%}",
                f"{result.false_done_rate:.1%}",
                f"${result.total_cost:.2f}",
                f"${result.avg_cost_success:.4f}",
                f"{result.avg_latency_ms:.0f}",
                str(result.total_tool_calls),
                str(result.total_verifier_calls),
                str(result.total_retries),
                f"{result.avg_cache_hit_rate:.1%}",
                f"{result.cost_reduction_vs_frontier:.1%}",
                f"{result.regression_rate:.1%}",
            ]
            lines.append(" | ".join(row))

        lines.append("")
        lines.append("=" * 80)

        # Rank baselines with a simple quality-vs-cost heuristic score.
        best_score = -float("inf")
        best_name = ""
        for name, result in results.items():
            success_rate = (result.num_success + result.num_partial) / result.num_tasks
            score = success_rate * 10 - result.avg_cost_success * 100 - result.regression_rate * 50
            if score > best_score:
                best_score = score
                best_name = name

        lines.append(f"BEST OVERALL: {best_name} (score={best_score:.2f})")
        lines.append("")

        return "\n".join(lines)

    def export(self, results: Dict[str, BenchmarkResult], path: str) -> None:
        """Export results to JSON."""
        export_data = {}
        for name, result in results.items():
            export_data[name] = {
                "benchmark_name": result.benchmark_name,
                "baseline_name": result.baseline_name,
                "num_tasks": result.num_tasks,
                "num_success": result.num_success,
                "num_partial": result.num_partial,
                "num_failure": result.num_failure,
                "num_false_done": result.num_false_done,
                "num_blocked": result.num_blocked,
                "total_cost": result.total_cost,
                "avg_cost_success": result.avg_cost_success,
                "avg_latency_ms": result.avg_latency_ms,
                "total_tool_calls": result.total_tool_calls,
                "total_verifier_calls": result.total_verifier_calls,
                "total_retries": result.total_retries,
                "avg_cache_hit_rate": result.avg_cache_hit_rate,
                "total_context_tokens": result.total_context_tokens,
                "cost_reduction_vs_frontier": result.cost_reduction_vs_frontier,
                "false_done_rate": result.false_done_rate,
                "unsafe_cheap_miss_rate": result.unsafe_cheap_miss_rate,
                "missed_escalation_rate": result.missed_escalation_rate,
                "regression_rate": result.regression_rate,
            }

        with open(path, "w") as f:
            json.dump(export_data, f, indent=2)
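

# Illustrative usage sketch (an assumption about how this suite is meant to be driven,
# not an official entry point): generate synthetic traces, run every baseline and
# ablation, print the reports, and export the raw metrics. The trace count and output
# path below are arbitrary example values.
if __name__ == "__main__":
    suite = BenchmarkSuite()
    traces = suite.generate_benchmark_data(n=200, seed=42)

    baseline_results = suite.run_all_baselines(traces)
    ablation_results = suite.run_ablations(traces)

    print(suite.report(baseline_results))
    print(suite.report(ablation_results))

    suite.export({**baseline_results, **ablation_results}, "benchmark_results.json")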
|