| |
"""Standalone benchmark runner v2 with realistic quality/cost tradeoffs."""
| import sys, json, os, uuid, random, argparse |
| from datetime import datetime, timedelta |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Dict, List, Optional, Any, Tuple |
| from collections import defaultdict |
|
|
class TaskType(Enum):
    """Categories of user request handled by the benchmark."""

    QUICK_ANSWER = "quick_answer"
    RESEARCH = "research"
    CODING = "coding"
    DOCUMENT_DRAFTING = "document_drafting"
    LEGAL_REGULATED = "legal_regulated"
    TOOL_HEAVY = "tool_heavy"
    RETRIEVAL_HEAVY = "retrieval_heavy"
    LONG_HORIZON = "long_horizon"
    UNKNOWN_AMBIGUOUS = "unknown_ambiguous"
|
|
class Outcome(Enum):
    """Possible results of a single step or of a whole trace."""

    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILURE = "failure"
    FALSE_DONE = "false_done"
    BLOCKED = "blocked"
    ESCALATED_HUMAN = "escalated_human"
    STOPPED_DOOM = "stopped_doom"
|
|
class FailureTag(Enum):
    """Labels attached to a trace to describe what went wrong."""

    MODEL_TOO_WEAK = "model_too_weak"
    CONTEXT_TOO_SMALL = "context_too_small"
    TOOL_FAILED = "tool_failed"
    TOOL_UNNECESSARY = "tool_unnecessary"
    TOOL_MISSED = "tool_missed"
    RETRY_LOOP = "retry_loop"
    CACHE_BREAK = "cache_break"
    HALLUCINATION = "hallucination"
    TIMEOUT = "timeout"
    COST_EXCEEDED = "cost_exceeded"
    UNSAFE_CHEAP_MODEL = "unsafe_cheap_model"
    MISSED_ESCALATION = "missed_escalation"
    VERIFIER_FALSE_PASS = "verifier_false_pass"
|
|
@dataclass
class ToolCall:
    """Record of one tool invocation made within a trace step."""

    tool_name: str
    tool_input: Dict[str, Any]
    tool_output: Optional[str] = None
    tool_cost: float = 0.0
    tool_latency_ms: float = 0.0
    cache_hit: bool = False
    repeated: bool = False
    ignored_result: bool = False
    failed: bool = False
|
|
@dataclass
class ModelCall:
    """Token usage, pricing and latency of a single model invocation."""

    model_id: str
    provider: str
    input_tokens: int = 0
    output_tokens: int = 0
    reasoning_tokens: int = 0
    cost_per_1k_input: float = 0.0
    cost_per_1k_output: float = 0.0
    cache_hit_input_tokens: int = 0
    latency_ms: float = 0.0

    @property
    def total_cost(self):
        """Return input cost plus output cost, minus a cache discount.

        Cached input tokens are refunded at half the input rate.
        ``reasoning_tokens`` do not enter the calculation.
        """
        input_cost = (self.input_tokens / 1000) * self.cost_per_1k_input
        output_cost = (self.output_tokens / 1000) * self.cost_per_1k_output
        cache_discount = (self.cache_hit_input_tokens / 1000) * self.cost_per_1k_input * 0.5
        return input_cost + output_cost - cache_discount
|
|
@dataclass
class VerifierCall:
    """Result of running a verifier model against one step."""

    verifier_model_id: str
    target_step_id: str
    passed: bool = False
    confidence: float = 0.0
    cost: float = 0.0
    latency_ms: float = 0.0
|
|
@dataclass
class TraceStep:
    """One step of an agent trace: a model call plus its tool and verifier calls."""

    step_id: str
    timestamp: datetime
    task_type: TaskType
    model_call: ModelCall
    tool_calls: List[ToolCall] = field(default_factory=list)
    verifier_calls: List[VerifierCall] = field(default_factory=list)
    context_size_tokens: int = 0
    context_sources: List[str] = field(default_factory=list)
    retry_count: int = 0
    recovery_action: Optional[str] = None
    artifacts_created: List[str] = field(default_factory=list)
    step_outcome: Optional[Outcome] = None

    @property
    def step_cost(self):
        """Return the model, tool and verifier cost of this step combined."""
        # A falsy model_call contributes nothing instead of raising.
        model_cost = self.model_call.total_cost if self.model_call else 0.0
        tools_cost = sum(call.tool_cost for call in self.tool_calls)
        verifiers_cost = sum(call.cost for call in self.verifier_calls)
        return model_cost + tools_cost + verifiers_cost

    @property
    def step_latency_ms(self):
        """Return the summed latency of the model, tool and verifier calls."""
        model_latency = self.model_call.latency_ms if self.model_call else 0.0
        tools_latency = sum(call.tool_latency_ms for call in self.tool_calls)
        verifiers_latency = sum(call.latency_ms for call in self.verifier_calls)
        return model_latency + tools_latency + verifiers_latency
|
|
@dataclass
class AgentTrace:
    """A full agent run: the request, its steps, and the final outcome."""

    trace_id: str
    user_request: str
    task_type: TaskType
    steps: List[TraceStep] = field(default_factory=list)
    final_outcome: Optional[Outcome] = None
    final_artifacts: List[str] = field(default_factory=list)
    failure_tags: List[FailureTag] = field(default_factory=list)
    user_satisfaction: Optional[float] = None
    # Stored total; the aggregates below are always recomputed from the steps.
    total_cost: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def total_cost_computed(self):
        """Return the sum of ``step_cost`` over all steps."""
        return sum(step.step_cost for step in self.steps)

    @property
    def total_latency_ms(self):
        """Return the sum of ``step_latency_ms`` over all steps."""
        return sum(step.step_latency_ms for step in self.steps)

    @property
    def total_retries(self):
        """Return the sum of ``retry_count`` over all steps."""
        return sum(step.retry_count for step in self.steps)

    @property
    def total_tool_calls(self):
        """Return the number of tool calls across all steps."""
        return sum(len(step.tool_calls) for step in self.steps)

    @property
    def total_verifier_calls(self):
        """Return the number of verifier calls across all steps."""
        return sum(len(step.verifier_calls) for step in self.steps)

    @property
    def cache_hit_rate(self):
        """Return cached input tokens over total input tokens, or 0.0 if none."""
        total_input = 0
        total_cached = 0
        for step in self.steps:
            call = step.model_call
            if not call:
                continue
            total_input += call.input_tokens
            total_cached += call.cache_hit_input_tokens
        if total_input > 0:
            return total_cached / total_input
        return 0.0

    def to_dict(self):
        """Return a JSON-serialisable dict of the trace and its aggregates.

        The ``total_cost`` entry is the recomputed ``total_cost_computed``,
        not the stored ``total_cost`` field.
        """
        serialized_steps = []
        for step in self.steps:
            call = step.model_call
            model_entry = {
                "model_id": call.model_id,
                "provider": call.provider,
                "input_tokens": call.input_tokens,
                "output_tokens": call.output_tokens,
                "reasoning_tokens": call.reasoning_tokens,
                "cost": call.total_cost,
                "latency_ms": call.latency_ms,
                "cache_hit_input_tokens": call.cache_hit_input_tokens,
            }
            tool_entries = []
            for tool in step.tool_calls:
                tool_entries.append({
                    "tool_name": tool.tool_name,
                    "tool_cost": tool.tool_cost,
                    "tool_latency_ms": tool.tool_latency_ms,
                    "cache_hit": tool.cache_hit,
                    "repeated": tool.repeated,
                    "ignored_result": tool.ignored_result,
                    "failed": tool.failed,
                })
            verifier_entries = []
            for verifier in step.verifier_calls:
                verifier_entries.append({
                    "verifier_model_id": verifier.verifier_model_id,
                    "passed": verifier.passed,
                    "confidence": verifier.confidence,
                    "cost": verifier.cost,
                })
            outcome_value = None
            if step.step_outcome:
                outcome_value = step.step_outcome.value
            serialized_steps.append({
                "step_id": step.step_id,
                "timestamp": step.timestamp.isoformat(),
                "task_type": step.task_type.value,
                "model_call": model_entry,
                "tool_calls": tool_entries,
                "verifier_calls": verifier_entries,
                "context_size_tokens": step.context_size_tokens,
                "retry_count": step.retry_count,
                "recovery_action": step.recovery_action,
                "step_outcome": outcome_value,
                "step_cost": step.step_cost,
                "step_latency_ms": step.step_latency_ms,
            })

        final_value = None
        if self.final_outcome:
            final_value = self.final_outcome.value
        return {
            "trace_id": self.trace_id,
            "user_request": self.user_request,
            "task_type": self.task_type.value,
            "steps": serialized_steps,
            "final_outcome": final_value,
            "failure_tags": [tag.value for tag in self.failure_tags],
            "total_cost": self.total_cost_computed,
            "total_latency_ms": self.total_latency_ms,
            "total_retries": self.total_retries,
            "total_tool_calls": self.total_tool_calls,
            "total_verifier_calls": self.total_verifier_calls,
            "cache_hit_rate": self.cache_hit_rate,
            "metadata": self.metadata,
        }
|
|
class SyntheticTraceGenerator:
    """Generate synthetic ``AgentTrace`` objects from weighted scenarios.

    All random draws that shape a trace come from ``self.rng``, a
    ``random.Random`` seeded in ``__init__``. Trace-ID suffixes come from a
    second RNG seeded with the same value, so IDs are reproducible for a
    given seed without shifting the main random stream. Step timestamps
    still use the wall clock and therefore differ between runs.
    """

    # Pricing is per 1k tokens and "latency" is in ms: both are passed
    # straight into ModelCall(cost_per_1k_*, latency_ms) by _generate_step.
    MODEL_CONFIGS = {
        "tiny_local": {"tier": 1, "cost_input": 0.0001, "cost_output": 0.0002,
                       "latency": 200, "strength": 0.35, "name": "Tiny Local (Qwen-0.5B)"},
        "cheap_cloud": {"tier": 2, "cost_input": 0.00015, "cost_output": 0.0006,
                        "latency": 400, "strength": 0.55, "name": "GPT-4o-mini"},
        "medium": {"tier": 3, "cost_input": 0.0015, "cost_output": 0.006,
                   "latency": 800, "strength": 0.80, "name": "Claude-3.5-Sonnet"},
        "frontier": {"tier": 4, "cost_input": 0.005, "cost_output": 0.015,
                     "latency": 1500, "strength": 0.93, "name": "GPT-4o / Claude-3-Opus"},
        "specialist": {"tier": 5, "cost_input": 0.01, "cost_output": 0.03,
                       "latency": 2000, "strength": 0.97, "name": "o1 / o3-mini"},
    }

    # Flat cost per tool call; tools missing here cost 0.001 (see _generate_step).
    TOOL_COSTS = {
        "search": 0.002, "retrieve": 0.001, "fetch": 0.003, "code_execution": 0.005,
        "linter": 0.001, "test_runner": 0.003, "file_read": 0.0005, "file_write": 0.0005,
        "calculator": 0.0001, "database_query": 0.004, "compliance_check": 0.01,
        "summarize": 0.002, "task_planner": 0.001, "progress_tracker": 0.0005,
    }

    # NOTE(review): not read by any code in this file; the tuple meaning is
    # undocumented -- presumably (difficulty, risk/ambiguity). Confirm.
    TASK_DIFFICULTY = {
        TaskType.QUICK_ANSWER: (1, 0.1),
        TaskType.CODING: (3, 0.4),
        TaskType.RESEARCH: (3, 0.5),
        TaskType.DOCUMENT_DRAFTING: (2, 0.2),
        TaskType.LEGAL_REGULATED: (4, 0.8),
        TaskType.TOOL_HEAVY: (2, 0.3),
        TaskType.RETRIEVAL_HEAVY: (2, 0.35),
        TaskType.LONG_HORIZON: (3, 0.6),
        TaskType.UNKNOWN_AMBIGUOUS: (3, 0.7),
    }

    # "prob" values are relative weights for random.choices; they need not
    # sum to 1. "tier" lists the model tiers the scenario may be run on.
    SCENARIOS = [
        {"name": "quick_answer_success", "prob": 0.18, "task_type": TaskType.QUICK_ANSWER,
         "tier": [1, 2], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 1},
        {"name": "quick_answer_cheap_fail", "prob": 0.02, "task_type": TaskType.QUICK_ANSWER,
         "tier": [1], "outcome": Outcome.FAILURE,
         "failure_tags": [FailureTag.MODEL_TOO_WEAK], "difficulty": 2},
        {"name": "coding_success_frontier", "prob": 0.08, "task_type": TaskType.CODING,
         "tier": [4], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 4},
        {"name": "coding_success_medium", "prob": 0.10, "task_type": TaskType.CODING,
         "tier": [3], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 3},
        {"name": "coding_cheap_fail", "prob": 0.05, "task_type": TaskType.CODING,
         "tier": [1, 2], "outcome": Outcome.FAILURE,
         "failure_tags": [FailureTag.MODEL_TOO_WEAK], "difficulty": 4},
        {"name": "coding_tool_underuse", "prob": 0.04, "task_type": TaskType.CODING,
         "tier": [3, 4], "outcome": Outcome.FAILURE,
         "failure_tags": [FailureTag.TOOL_MISSED], "difficulty": 3},
        {"name": "research_success", "prob": 0.10, "task_type": TaskType.RESEARCH,
         "tier": [3, 4], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 3},
        {"name": "research_cheap_fail", "prob": 0.03, "task_type": TaskType.RESEARCH,
         "tier": [1, 2], "outcome": Outcome.FAILURE,
         "failure_tags": [FailureTag.MODEL_TOO_WEAK], "difficulty": 4},
        {"name": "document_draft_success", "prob": 0.08, "task_type": TaskType.DOCUMENT_DRAFTING,
         "tier": [2, 3], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 2},
        {"name": "legal_frontier_success", "prob": 0.04, "task_type": TaskType.LEGAL_REGULATED,
         "tier": [4, 5], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 5},
        {"name": "legal_cheap_unsafe", "prob": 0.02, "task_type": TaskType.LEGAL_REGULATED,
         "tier": [1, 2], "outcome": Outcome.FAILURE,
         "failure_tags": [FailureTag.UNSAFE_CHEAP_MODEL], "difficulty": 5},
        {"name": "tool_heavy_success", "prob": 0.06, "task_type": TaskType.TOOL_HEAVY,
         "tier": [2, 3], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 2},
        {"name": "retrieval_success", "prob": 0.06, "task_type": TaskType.RETRIEVAL_HEAVY,
         "tier": [2, 3], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 2},
        {"name": "long_horizon_success", "prob": 0.05, "task_type": TaskType.LONG_HORIZON,
         "tier": [3, 4], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 4},
        {"name": "long_horizon_retry_loop", "prob": 0.03, "task_type": TaskType.LONG_HORIZON,
         "tier": [3], "outcome": Outcome.FAILURE,
         "failure_tags": [FailureTag.RETRY_LOOP], "difficulty": 4},
        {"name": "unknown_ambiguous_success", "prob": 0.03, "task_type": TaskType.UNKNOWN_AMBIGUOUS,
         "tier": [3, 4], "outcome": Outcome.SUCCESS, "failure_tags": [], "difficulty": 3},
        {"name": "unknown_ambiguous_blocked", "prob": 0.02, "task_type": TaskType.UNKNOWN_AMBIGUOUS,
         "tier": [3, 4], "outcome": Outcome.BLOCKED,
         "failure_tags": [FailureTag.MISSED_ESCALATION], "difficulty": 3},
        {"name": "tool_overuse", "prob": 0.04, "task_type": TaskType.CODING,
         "tier": [3, 4], "outcome": Outcome.PARTIAL_SUCCESS,
         "failure_tags": [FailureTag.TOOL_UNNECESSARY], "difficulty": 3},
        {"name": "cache_break_scenario", "prob": 0.03, "task_type": TaskType.RESEARCH,
         "tier": [3, 4], "outcome": Outcome.PARTIAL_SUCCESS,
         "failure_tags": [FailureTag.CACHE_BREAK], "difficulty": 3},
        {"name": "false_done_scenario", "prob": 0.02, "task_type": TaskType.CODING,
         "tier": [3, 4], "outcome": Outcome.FALSE_DONE,
         "failure_tags": [FailureTag.VERIFIER_FALSE_PASS], "difficulty": 3},
    ]

    # Lookup tables hoisted out of the helper methods so they are built once.
    _TIER_MODELS = {1: "tiny_local", 2: "cheap_cloud", 3: "medium", 4: "frontier", 5: "specialist"}

    _REQUEST_TEMPLATES = {
        TaskType.QUICK_ANSWER: [
            "What is the capital of France?", "Briefly explain quantum computing.",
            "Summarize article X.", "What is 237 * 452?"],
        TaskType.CODING: [
            "Write a Python function to reverse a linked list.",
            "Fix the bug in this React component.", "Refactor auth module to JWT.",
            "Implement LRU cache in Go.", "Debug this segfault in C++ thread pool."],
        TaskType.RESEARCH: [
            "Research latest transformer advances.", "Find sources comparing LoRA and full FT.",
            "Investigate data center climate impact.",
            "What does literature say on speculative decoding?"],
        TaskType.DOCUMENT_DRAFTING: [
            "Draft project proposal for ML pipeline.", "Write email to team about deployment.",
            "Create technical report on performance."],
        TaskType.LEGAL_REGULATED: [
            "Review this contract for liability clauses.",
            "Check GDPR compliance for data pipeline.", "Draft privacy policy section."],
        TaskType.TOOL_HEAVY: [
            "Search open issues and create summary.", "Fetch API docs and generate client code.",
            "Query Q3 sales and produce chart."],
        TaskType.RETRIEVAL_HEAVY: [
            "Answer based on 50-page document.", "Find all 'payment processing' mentions.",
            "Retrieve relevant cases for legal query."],
        TaskType.LONG_HORIZON: [
            "Plan 3-month roadmap.", "Orchestrate multi-region deployment.",
            "Redesign data architecture end-to-end."],
        TaskType.UNKNOWN_AMBIGUOUS: [
            "Help me with this thing.", "I need something about the server.",
            "Can you look into that issue?"],
    }

    _TASK_TOOLS = {
        TaskType.QUICK_ANSWER: ["calculator", "search"],
        TaskType.CODING: ["file_read", "file_write", "code_execution", "linter", "test_runner"],
        TaskType.RESEARCH: ["search", "retrieve", "fetch", "summarize"],
        TaskType.DOCUMENT_DRAFTING: ["file_read", "summarize"],
        TaskType.LEGAL_REGULATED: ["document_retrieval", "compliance_check", "search"],
        TaskType.TOOL_HEAVY: ["search", "fetch", "api_call", "database_query"],
        TaskType.RETRIEVAL_HEAVY: ["retrieve", "search", "fetch"],
        TaskType.LONG_HORIZON: ["task_planner", "progress_tracker", "file_read"],
        TaskType.UNKNOWN_AMBIGUOUS: ["search"],
    }

    def __init__(self, seed=42):
        """Seed the trace RNG and a separate RNG used only for trace IDs."""
        self.rng = random.Random(seed)
        # Separate stream so ID generation never perturbs the draws that
        # shape the traces themselves.
        self._id_rng = random.Random(seed)

    def generate(self, n=10000):
        """Return a list of ``n`` synthetic traces, indexed 0..n-1."""
        return [self._generate_trace(i) for i in range(n)]

    def _pick_scenario(self):
        """Draw one scenario from ``SCENARIOS`` weighted by its ``prob``."""
        weights = [s["prob"] for s in self.SCENARIOS]
        return self.rng.choices(self.SCENARIOS, weights=weights)[0]

    def _tier_to_model(self, tier):
        """Map a tier (1-5) to a ``MODEL_CONFIGS`` key; unknown tiers give "medium"."""
        return self._TIER_MODELS.get(tier, "medium")

    def _generate_trace(self, idx):
        """Build one ``AgentTrace`` for index ``idx`` from a random scenario."""
        scenario = self._pick_scenario()
        name = scenario["name"]
        # Fix: the 8-hex-digit suffix used to come from uuid4(), which
        # ignored the seed and made trace/step IDs differ on every run.
        trace_id = f"synth_{idx}_{self._id_rng.getrandbits(32):08x}"
        task_type = scenario["task_type"]
        user_request = self._generate_request(task_type, name)

        # The default draw always happens, then may be replaced; this keeps
        # the RNG consumption order stable.
        base_steps = self.rng.randint(1, 8)
        if "long_horizon" in name or "retry_loop" in name:
            base_steps = self.rng.randint(4, 12)
        elif "coding" in name and scenario["outcome"] == Outcome.FAILURE:
            base_steps = self.rng.randint(3, 8)

        tier = self.rng.choice(scenario["tier"])
        model_key = self._tier_to_model(tier)
        model_cfg = self.MODEL_CONFIGS[model_key]

        last_idx = base_steps - 1
        steps = [
            self._generate_step(trace_id, step_idx, task_type, model_key, model_cfg,
                                scenario, step_idx == last_idx)
            for step_idx in range(base_steps)
        ]
        return AgentTrace(
            trace_id=trace_id,
            user_request=user_request,
            task_type=task_type,
            steps=steps,
            final_outcome=scenario["outcome"],
            failure_tags=list(scenario.get("failure_tags", [])),
            total_cost=sum(s.step_cost for s in steps),
            metadata={
                "scenario": name,
                "synthetic": True,
                "difficulty": scenario["difficulty"],
                "optimal_tier": scenario["difficulty"],
                "actual_tier": tier,
            },
        )

    def _generate_request(self, task_type, scenario_name):
        """Return a random canned request for ``task_type``.

        ``scenario_name`` is accepted but not used.
        """
        return self.rng.choice(self._REQUEST_TEMPLATES.get(task_type, ["Generic request"]))

    def _get_tools_for_task(self, task_type):
        """Return the ordered tool names for ``task_type`` (default ``["search"]``)."""
        # Copy so callers cannot mutate the shared class-level table.
        return list(self._TASK_TOOLS.get(task_type, ["search"]))

    def _generate_step(self, trace_id, step_idx, task_type, model_key, model_cfg, scenario, is_last):
        """Build one ``TraceStep`` with random usage, tools, verifiers and retries.

        The last step takes the scenario's outcome. In retry-loop scenarios
        every non-final step from index 2 onward is a failure; all other
        steps are successes.
        """
        rng = self.rng
        scenario_name = scenario["name"]
        in_retry_loop = "retry_loop" in scenario_name
        step_id = f"{trace_id}_step_{step_idx}"

        # --- model call ---
        input_tokens = rng.randint(800, 12000)
        output_tokens = rng.randint(200, 6000)
        cache_hit_tokens = 0
        if rng.random() < 0.35:
            cache_hit_tokens = int(input_tokens * rng.random() * 0.6)
        model_call = ModelCall(
            model_id=model_key,
            provider="synthetic",
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            reasoning_tokens=output_tokens // 4 if model_key in ("frontier", "specialist") else 0,
            cost_per_1k_input=model_cfg["cost_input"],
            cost_per_1k_output=model_cfg["cost_output"],
            cache_hit_input_tokens=cache_hit_tokens,
            latency_ms=model_cfg["latency"] * rng.uniform(0.8, 1.5),
        )

        # --- tool calls: a prefix of the task's tool list ---
        base_tools = self._get_tools_for_task(task_type)
        num_tools = rng.randint(0, len(base_tools))
        if scenario_name == "tool_overuse":
            # Slicing below caps this at len(base_tools), so the bump only
            # raises the minimum number of tools used.
            num_tools += 3
        tool_failure_rate = 0.3 if in_retry_loop else 0.05
        tool_calls = []
        for tool_name in base_tools[:num_tools]:
            tool_calls.append(ToolCall(
                tool_name=tool_name,
                tool_input={"query": f"auto_{tool_name}"},
                tool_cost=self.TOOL_COSTS.get(tool_name, 0.001),
                tool_latency_ms=rng.uniform(100, 1200),
                cache_hit=rng.random() < 0.2,
                repeated=rng.random() < 0.1,
                ignored_result=rng.random() < 0.05,
                failed=rng.random() < tool_failure_rate,
            ))

        # --- verifier calls ---
        if task_type == TaskType.LEGAL_REGULATED:
            num_verifiers = 1
        elif task_type in (TaskType.CODING, TaskType.RESEARCH) and model_key in ("frontier", "specialist"):
            num_verifiers = 1 if rng.random() < 0.4 else 0
        else:
            num_verifiers = 0
        verifier_calls = []
        for _ in range(num_verifiers):
            verifier_calls.append(VerifierCall(
                verifier_model_id="verifier_medium",
                target_step_id=step_id,
                passed=rng.random() < 0.85,
                confidence=rng.uniform(0.6, 0.99),
                cost=0.005,
                latency_ms=500,
            ))

        # --- context size ---
        context_size = rng.randint(1500, 20000)
        if scenario_name == "cache_break_scenario":
            context_size += rng.randint(8000, 30000)

        # --- retries and recovery ---
        retries = 0
        if in_retry_loop:
            retries = rng.randint(4, 8)
        elif rng.random() < 0.12:
            retries = rng.randint(1, 3)
        recovery = None
        if retries > 0:
            recovery = rng.choice(["retry_same", "retry_changed_prompt", "repair_tool",
                                   "switch_model", "ask_clarification"])

        # --- outcome ---
        if is_last:
            step_outcome = scenario["outcome"]
        elif in_retry_loop and step_idx >= 2:
            step_outcome = Outcome.FAILURE
        else:
            step_outcome = Outcome.SUCCESS

        return TraceStep(
            step_id=step_id,
            # Wall-clock base plus 30 s per step index.
            timestamp=datetime.utcnow() + timedelta(seconds=step_idx * 30),
            task_type=task_type,
            model_call=model_call,
            tool_calls=tool_calls,
            verifier_calls=verifier_calls,
            context_size_tokens=context_size,
            context_sources=["system_rules", "tool_descriptions", "user_preferences", "recent_messages"],
            retry_count=retries,
            recovery_action=recovery,
            artifacts_created=[f"artifact_{step_idx}"] if rng.random() < 0.25 else [],
            step_outcome=step_outcome,
        )
|
|
@dataclass
class BenchmarkResult:
    """Aggregate metrics for one baseline or ablation run over a trace set."""

    baseline_name: str
    num_tasks: int
    num_success: int
    num_partial: int
    num_failure: int
    num_false_done: int
    num_blocked: int
    total_cost: float
    avg_cost_success: float
    avg_latency_ms: float
    total_tool_calls: int
    total_verifier_calls: int
    total_retries: int
    avg_cache_hit_rate: float
    cost_reduction_vs_frontier: float
    false_done_rate: float
    unsafe_cheap_miss_rate: float
    missed_escalation_rate: float
    regression_rate: float
    per_scenario_stats: Dict[str, Dict[str, Any]] = field(default_factory=dict)
|
|
class BenchmarkSuite:
    """Replay synthetic traces under routing baselines/ablations and report."""

    MODEL_CONFIGS = SyntheticTraceGenerator.MODEL_CONFIGS
    TASK_DIFFICULTY = SyntheticTraceGenerator.TASK_DIFFICULTY

    # Relative cost of running a trace on each tier (tier 4 = 1.0).
    _TIER_COST_MULT = {1: 0.05, 2: 0.15, 3: 0.75, 4: 1.0, 5: 1.5}
    # Per-tier strength used by the success-probability model.
    _TIER_STRENGTH = {1: 0.35, 2: 0.55, 3: 0.80, 4: 0.93, 5: 0.97}
    # BenchmarkResult attributes written by export(), in output order.
    _EXPORT_FIELDS = (
        "baseline_name", "num_tasks", "num_success", "num_partial",
        "num_failure", "num_false_done", "num_blocked", "total_cost",
        "avg_cost_success", "avg_latency_ms", "total_tool_calls",
        "total_verifier_calls", "total_retries", "avg_cache_hit_rate",
        "cost_reduction_vs_frontier", "false_done_rate",
        "unsafe_cheap_miss_rate", "missed_escalation_rate",
        "regression_rate", "per_scenario_stats",
    )

    def __init__(self):
        pass

    def generate_benchmark_data(self, n=1000, seed=42):
        """Return ``n`` synthetic traces from a generator seeded with ``seed``."""
        return SyntheticTraceGenerator(seed=seed).generate(n)

    def run_all_baselines(self, traces):
        """Run every routing baseline; return ``{baseline_name: BenchmarkResult}``."""
        results = {}
        for baseline in ("always_frontier", "always_cheap", "static", "cascade", "full_optimizer"):
            print(f"Running baseline: {baseline}...")
            results[baseline] = self._run_baseline(traces, baseline)
        return results

    def run_ablations(self, traces):
        """Run every ablation; return ``{ablation_name: BenchmarkResult}``."""
        results = {}
        for ablation in ("no_router", "no_tool_gate", "no_verifier", "no_early_term", "no_context_budget"):
            print(f"Running ablation: {ablation}...")
            results[ablation] = self._run_baseline(traces, ablation)
        return results

    def _run_baseline(self, traces, baseline_name):
        """Simulate ``baseline_name`` over ``traces`` and aggregate the metrics.

        Args:
            traces: sized collection of ``AgentTrace`` objects (``len`` is used).
            baseline_name: baseline or ablation name understood by ``_simulate``.

        Returns:
            A ``BenchmarkResult``. An empty ``traces`` gives zero averages and
            rates instead of raising ``ZeroDivisionError``.
        """
        success_count = partial_count = failure_count = 0
        false_done_count = blocked_count = regression_count = 0
        cheap_misses = escalation_misses = 0
        total_cost = 0.0
        total_latency = 0.0
        total_tools = total_verifiers = total_retries = 0
        cache_rate_sum = 0
        frontier_total = 0
        per_scenario = defaultdict(lambda: {"count": 0, "success": 0, "partial": 0, "cost": 0.0})

        for trace in traces:
            sim_cost, sim_success, sim_outcome = self._simulate(trace, baseline_name)
            total_cost += sim_cost
            # Simulated latency is the recorded latency scaled by 0.7.
            total_latency += trace.total_latency_ms * 0.7
            total_tools += trace.total_tool_calls
            total_verifiers += trace.total_verifier_calls
            total_retries += trace.total_retries
            cache_rate_sum += trace.cache_hit_rate
            frontier_total += trace.total_cost_computed * 4

            stats = per_scenario[trace.metadata.get("scenario", "normal")]
            stats["count"] += 1
            stats["cost"] += sim_cost

            if sim_success:
                if sim_outcome == Outcome.SUCCESS:
                    success_count += 1
                    stats["success"] += 1
                elif sim_outcome == Outcome.PARTIAL_SUCCESS:
                    # Fix: partial successes used to be added to success_count,
                    # so num_partial was always 0. "success" in the per-scenario
                    # stats still counts both, as it did before.
                    partial_count += 1
                    stats["success"] += 1
                    stats["partial"] += 1
                else:
                    # NOTE(review): _simulate never pairs sim_success=True
                    # with another outcome, so this stays 0 today.
                    regression_count += 1
            elif sim_outcome == Outcome.FALSE_DONE:
                false_done_count += 1
            elif sim_outcome == Outcome.BLOCKED:
                blocked_count += 1
            else:
                failure_count += 1

            difficulty = trace.metadata.get("difficulty", 3)
            actual_tier = trace.metadata.get("actual_tier", 3)
            if actual_tier < difficulty:
                if actual_tier <= 2 and sim_outcome in (Outcome.FAILURE, Outcome.PARTIAL_SUCCESS):
                    cheap_misses += 1
                if sim_outcome in (Outcome.FAILURE, Outcome.BLOCKED):
                    escalation_misses += 1

        n = len(traces)
        denom = max(n, 1)  # every numerator is 0 when n == 0
        # Same denominator as before the split: full plus partial successes.
        avg_cost_success = total_cost / max(success_count + partial_count, 1)
        # Fix: max(frontier_total, 1) skewed the ratio for totals below 1.
        # NOTE(review): the frontier cost is estimated as 4x the recorded
        # trace cost, not taken from the "always_frontier" simulation.
        if frontier_total > 0:
            cost_reduction = (frontier_total - total_cost) / frontier_total
        else:
            cost_reduction = 0.0

        return BenchmarkResult(
            baseline_name=baseline_name,
            num_tasks=n,
            num_success=success_count,
            num_partial=partial_count,
            num_failure=failure_count,
            num_false_done=false_done_count,
            num_blocked=blocked_count,
            total_cost=total_cost,
            avg_cost_success=avg_cost_success,
            avg_latency_ms=total_latency / denom,
            total_tool_calls=total_tools,
            total_verifier_calls=total_verifiers,
            total_retries=total_retries,
            avg_cache_hit_rate=cache_rate_sum / denom,
            cost_reduction_vs_frontier=cost_reduction,
            false_done_rate=false_done_count / denom,
            unsafe_cheap_miss_rate=cheap_misses / denom,
            missed_escalation_rate=escalation_misses / denom,
            regression_rate=regression_count / denom,
            per_scenario_stats=dict(per_scenario),
        )

    def _simulate(self, trace, baseline):
        """Realistic simulation: tier vs difficulty determines success.

        The simulation is deterministic: success means the modelled
        probability is strictly above 0.5.

        Returns:
            ``(sim_cost, sim_success, sim_outcome)``.
        """
        scenario = trace.metadata.get("scenario", "normal")
        difficulty = trace.metadata.get("difficulty", 3)
        actual_tier = trace.metadata.get("actual_tier", 3)
        is_full = baseline == "full_optimizer"
        routed = baseline in ("cascade", "full_optimizer")
        in_retry_loop = "retry_loop" in scenario

        # --- tier selection ---
        if baseline == "always_frontier":
            chosen_tier = 4
        elif baseline == "always_cheap":
            chosen_tier = 2
        elif routed:
            if difficulty <= 2:
                chosen_tier = 2
            elif difficulty == 3:
                chosen_tier = 3 if self._tier_success_prob(3, difficulty) > 0.7 else 4
            elif difficulty == 4:
                chosen_tier = 3 if self._tier_success_prob(3, difficulty) > 0.6 else 4
            else:
                chosen_tier = 4 if self._tier_success_prob(4, difficulty) > 0.5 else 5
        else:
            # "static" and every ablation keep the tier the trace ran on.
            chosen_tier = actual_tier

        # --- cost: rescale the recorded cost to the chosen tier ---
        tier_cost_mult = self._TIER_COST_MULT.get(chosen_tier, 0.75)
        actual_cost_mult = self._TIER_COST_MULT.get(actual_tier, 0.75)
        cost_ratio = tier_cost_mult / actual_cost_mult if actual_cost_mult > 0 else 1.0
        sim_cost = trace.total_cost_computed * cost_ratio

        # Multiplier order is kept so float results stay bit-identical.
        if routed and "tool_overuse" in scenario:
            sim_cost *= 0.75
        if is_full and "cache_break" not in scenario:
            sim_cost *= 0.92
        if is_full and chosen_tier >= 3 and difficulty < 4:
            sim_cost *= 0.95
        if is_full and in_retry_loop:
            sim_cost *= 0.60
        if baseline == "no_early_term" and in_retry_loop:
            sim_cost *= 1.4

        # --- success probability ---
        success_prob = self._tier_success_prob(chosen_tier, difficulty)
        if baseline == "always_cheap" and difficulty >= 3:
            success_prob *= 0.3
        elif baseline == "no_tool_gate" and "tool" in scenario:
            success_prob *= 0.7
        elif baseline == "no_verifier" and difficulty >= 4:
            success_prob *= 0.85
        elif is_full:
            success_prob = min(1.0, success_prob + 0.05)

        # Scenario-specific overrides replace the modelled probability.
        if "false_done" in scenario:
            success_prob = 0.1
        elif "blocked" in scenario:
            success_prob = 0.0
        elif in_retry_loop:
            if is_full:
                # Exactly 0.5 is not above the threshold, so this still fails.
                success_prob = 0.5
            elif baseline == "no_early_term":
                success_prob = 0.1
            else:
                success_prob = 0.25
        sim_success = success_prob > 0.5

        # --- outcome ---
        if "false_done" in scenario:
            sim_outcome = Outcome.FALSE_DONE
        elif "blocked" in scenario:
            sim_outcome = Outcome.BLOCKED
        elif sim_success:
            sim_outcome = Outcome.SUCCESS if success_prob > 0.85 else Outcome.PARTIAL_SUCCESS
        elif in_retry_loop:
            sim_outcome = Outcome.FAILURE
        elif success_prob < 0.2:
            sim_outcome = Outcome.BLOCKED
        else:
            sim_outcome = Outcome.FAILURE
        return sim_cost, sim_success, sim_outcome

    def _tier_success_prob(self, tier, difficulty):
        """Return ``strength(tier) ** (0.6 * difficulty)``; unknown tiers use 0.5."""
        strength = self._TIER_STRENGTH.get(tier, 0.5)
        return strength ** (difficulty * 0.6)

    def report(self, results):
        """Return a text report: metrics table, best-scoring run, ranked list.

        Args:
            results: mapping of run name to ``BenchmarkResult``.
        """
        lines = ["=" * 100, "AGENT COST OPTIMIZER BENCHMARK REPORT v2", "=" * 100, ""]
        headers = ["Baseline", "Success", "Partial", "Fail", "Blocked", "F-DONE",
                   "Total Cost", "Avg$/Succ", "Lat(ms)", "Tools", "Verif", "Retry",
                   "Cache%", "CostRed%", "Regression", "CheapMiss", "EscMiss"]
        lines.append(" | ".join(headers))
        lines.append("-" * 160)

        for name, result in results.items():
            tasks = max(result.num_tasks, 1)  # tolerate empty runs
            row = [
                name[:22].ljust(22),
                f"{result.num_success / tasks:.1%}",
                f"{result.num_partial / tasks:.1%}",
                f"{result.num_failure / tasks:.1%}",
                f"{result.num_blocked / tasks:.1%}",
                f"{result.false_done_rate:.1%}",
                f"${result.total_cost:.2f}",
                f"${result.avg_cost_success:.4f}",
                f"{result.avg_latency_ms:.0f}",
                str(result.total_tool_calls),
                str(result.total_verifier_calls),
                str(result.total_retries),
                f"{result.avg_cache_hit_rate:.1%}",
                f"{result.cost_reduction_vs_frontier:.1%}",
                f"{result.regression_rate:.1%}",
                f"{result.unsafe_cheap_miss_rate:.1%}",
                f"{result.missed_escalation_rate:.1%}",
            ]
            lines.append(" | ".join(row))
        lines.append("")
        lines.append("=" * 100)

        # Weighted score; the first run with the highest score wins.
        best_score, best_name = -float("inf"), ""
        for name, result in results.items():
            success_rate = (result.num_success + result.num_partial) / max(result.num_tasks, 1)
            score = (success_rate * 20
                     - result.avg_cost_success * 50
                     - result.regression_rate * 30
                     - result.unsafe_cheap_miss_rate * 40)
            if score > best_score:
                best_score, best_name = score, name
        lines.append(f"BEST PARETO: {best_name} (score={best_score:.2f})")

        lines.append("")
        lines.append("QUALITY/COST FRONTIER (Success Rate vs Avg Cost per Success):")
        points = [
            (name, (r.num_success + r.num_partial) / max(r.num_tasks, 1), r.avg_cost_success)
            for name, r in results.items()
        ]
        # Highest success rate first; cheaper run first on ties.
        points.sort(key=lambda point: (-point[1], point[2]))
        for name, sr, cost in points:
            lines.append(f" {name:22s} | Success: {sr:.1%} | Cost/Success: ${cost:.4f}")
        lines.append("")
        lines.append("=" * 100)
        return "\n".join(lines)

    def export(self, results, path):
        """Write ``results`` to ``path`` as indented JSON keyed by run name."""
        export_data = {
            name: {attr: getattr(result, attr) for attr in self._EXPORT_FIELDS}
            for name, result in results.items()
        }
        with open(path, "w") as f:
            json.dump(export_data, f, indent=2)
|
|
if __name__ == "__main__":
    # CLI entry point: generate traces, run baselines and ablations, then
    # write traces, results, the text report and the Pareto frontier.
    cli = argparse.ArgumentParser(description="ACO Evaluation Runner v2")
    cli.add_argument("--tasks", "-n", type=int, default=2000, help="Number of tasks")
    cli.add_argument("--seed", "-s", type=int, default=42, help="Random seed")
    cli.add_argument("--output", "-o", default="./eval_results_v2", help="Output directory")
    args = cli.parse_args()

    out_dir = args.output
    os.makedirs(out_dir, exist_ok=True)
    suite = BenchmarkSuite()

    # 1. Synthetic traces, one JSON object per line.
    print(f"[{datetime.now().isoformat()}] Generating {args.tasks} synthetic traces...")
    traces = suite.generate_benchmark_data(args.tasks, seed=args.seed)
    traces_path = os.path.join(out_dir, "traces.jsonl")
    with open(traces_path, "w") as handle:
        handle.writelines(json.dumps(item.to_dict()) + "\n" for item in traces)
    print(f" Saved {len(traces)} traces to {traces_path}")

    # 2. Baselines.
    print(f"\n[{datetime.now().isoformat()}] Running baselines...")
    baseline_results = suite.run_all_baselines(traces)
    baseline_path = os.path.join(out_dir, "baseline_results.json")
    suite.export(baseline_results, baseline_path)
    print(f" Saved to {baseline_path}")

    # 3. Ablations.
    print(f"\n[{datetime.now().isoformat()}] Running ablations...")
    ablation_results = suite.run_ablations(traces)
    ablation_path = os.path.join(out_dir, "ablation_results.json")
    suite.export(ablation_results, ablation_path)
    print(f" Saved to {ablation_path}")

    # 4. Combined text report.
    all_results = dict(baseline_results)
    all_results.update(ablation_results)
    report = suite.report(all_results)
    report_path = os.path.join(out_dir, "report.txt")
    with open(report_path, "w") as handle:
        handle.write(report)
    print(f" Saved report to {report_path}")

    # 5. Pareto frontier over (success rate, average cost per success).
    points = [
        {
            "baseline": run_name,
            "success_rate": (res.num_success + res.num_partial) / res.num_tasks,
            "avg_cost_per_success": res.avg_cost_success,
            "total_cost": res.total_cost,
            "regression_rate": res.regression_rate,
            "false_done_rate": res.false_done_rate,
            "cheap_miss_rate": res.unsafe_cheap_miss_rate,
        }
        for run_name, res in all_results.items()
    ]
    frontier = []
    for candidate in points:
        # Dominated: another run is at least as good on both axes and
        # strictly better on one.
        is_dominated = any(
            other["baseline"] != candidate["baseline"]
            and other["success_rate"] >= candidate["success_rate"]
            and other["avg_cost_per_success"] <= candidate["avg_cost_per_success"]
            and (other["success_rate"] > candidate["success_rate"]
                 or other["avg_cost_per_success"] < candidate["avg_cost_per_success"])
            for other in points
        )
        if not is_dominated:
            frontier.append(candidate)
    frontier.sort(key=lambda point: point["success_rate"], reverse=True)

    frontier_data = {
        "all_points": points,
        "pareto_frontier": frontier,
        "frontier_baselines": [point["baseline"] for point in frontier],
    }
    frontier_path = os.path.join(out_dir, "cost_quality_frontier.json")
    with open(frontier_path, "w") as handle:
        json.dump(frontier_data, handle, indent=2)
    print(f" Saved frontier to {frontier_path}")

    print("\n" + "=" * 100)
    print(report)
    print("=" * 100)
|
|