"""Standalone benchmark runner - no external deps."""
import json, os, uuid, random, argparse
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Any
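
# Module layout: task/outcome/failure enums, trace dataclasses (ToolCall,
# ModelCall, VerifierCall, TraceStep, AgentTrace), a seeded synthetic trace
# generator, a benchmark suite with baselines and ablations, and a CLI entry
# point. All pricing and latency figures below are synthetic stand-ins.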
|
|
class TaskType(Enum):
    QUICK_ANSWER="quick_answer"; RESEARCH="research"; CODING="coding"
    DOCUMENT_DRAFTING="document_drafting"; LEGAL_REGULATED="legal_regulated"
    TOOL_HEAVY="tool_heavy"; RETRIEVAL_HEAVY="retrieval_heavy"
    LONG_HORIZON="long_horizon"; UNKNOWN_AMBIGUOUS="unknown_ambiguous"
|
|
class Outcome(Enum):
    SUCCESS="success"; PARTIAL_SUCCESS="partial_success"; FAILURE="failure"
    FALSE_DONE="false_done"; BLOCKED="blocked"; ESCALATED_HUMAN="escalated_human"
    STOPPED_DOOM="stopped_doom"
|
|
class FailureTag(Enum):
    MODEL_TOO_WEAK="model_too_weak"; CONTEXT_TOO_SMALL="context_too_small"
    TOOL_FAILED="tool_failed"; TOOL_UNNECESSARY="tool_unnecessary"
    TOOL_MISSED="tool_missed"; VERIFIER_FALSE_PASS="verifier_false_pass"
    VERIFIER_FALSE_REJECT="verifier_false_reject"; RETRY_LOOP="retry_loop"
    CACHE_BREAK="cache_break"; HALLUCINATION="hallucination"
    TIMEOUT="timeout"; COST_EXCEEDED="cost_exceeded"
    UNSAFE_CHEAP_MODEL="unsafe_cheap_model"; MISSED_ESCALATION="missed_escalation"
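
# FailureTag values annotate traces with a root-cause label so results can be
# sliced by cause (e.g., routing too cheap vs. tool misuse vs. verifier error).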
|
|
@dataclass
class ToolCall:
    tool_name:str; tool_input:Dict[str,Any]; tool_output:Optional[str]=None
    tool_cost:float=0.0; tool_latency_ms:float=0.0; cache_hit:bool=False
    repeated:bool=False; ignored_result:bool=False; failed:bool=False
|
|
@dataclass
class ModelCall:
    model_id:str; provider:str; input_tokens:int=0; output_tokens:int=0
    reasoning_tokens:int=0; cost_per_1k_input:float=0.0; cost_per_1k_output:float=0.0
    cache_hit_input_tokens:int=0; latency_ms:float=0.0
    @property
    def total_cost(self):
        # Prices are per 1k tokens; cache-hit input tokens are rebated at 50%.
        return ((self.input_tokens/1000)*self.cost_per_1k_input
                +(self.output_tokens/1000)*self.cost_per_1k_output
                -(self.cache_hit_input_tokens/1000)*self.cost_per_1k_input*0.5)
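
# Worked example under the synthetic "frontier" pricing ($0.01/1k in,
# $0.03/1k out): 2,000 input + 1,000 output tokens with 1,000 cached input
# tokens costs 0.020 + 0.030 - 0.005 = $0.045.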
|
|
@dataclass
class VerifierCall:
    verifier_model_id:str; target_step_id:str; passed:bool=False
    confidence:float=0.0; cost:float=0.0; latency_ms:float=0.0
|
|
@dataclass
class TraceStep:
    step_id:str; timestamp:datetime; task_type:TaskType; model_call:ModelCall
    tool_calls:List[ToolCall]=field(default_factory=list)
    verifier_calls:List[VerifierCall]=field(default_factory=list)
    context_size_tokens:int=0; context_sources:List[str]=field(default_factory=list)
    cache_boundary_reached:bool=False; retry_count:int=0
    recovery_action:Optional[str]=None; planned_next:Optional[str]=None
    user_correction:Optional[str]=None; artifacts_created:List[str]=field(default_factory=list)
    step_outcome:Optional[Outcome]=None
    @property
    def step_cost(self):
        return ((self.model_call.total_cost if self.model_call else 0.0)
                +sum(t.tool_cost for t in self.tool_calls)
                +sum(v.cost for v in self.verifier_calls))
    @property
    def step_latency_ms(self):
        return ((self.model_call.latency_ms if self.model_call else 0.0)
                +sum(t.tool_latency_ms for t in self.tool_calls)
                +sum(v.latency_ms for v in self.verifier_calls))
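
# A step's cost and latency aggregate its model call plus all tool and
# verifier calls; AgentTrace then sums these per-step figures across a trace.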
|
|
@dataclass
class AgentTrace:
    trace_id:str; user_request:str; task_type:TaskType
    steps:List[TraceStep]=field(default_factory=list)
    final_outcome:Optional[Outcome]=None; final_artifacts:List[str]=field(default_factory=list)
    failure_tags:List[FailureTag]=field(default_factory=list); user_satisfaction:Optional[float]=None
    total_cost_saved_vs_frontier:Optional[float]=None; total_cost:Optional[float]=None
    optimal_cost:Optional[float]=None; metadata:Dict[str,Any]=field(default_factory=dict)
    @property
    def total_cost_computed(self): return sum(s.step_cost for s in self.steps)
    @property
    def total_latency_ms(self): return sum(s.step_latency_ms for s in self.steps)
    @property
    def total_retries(self): return sum(s.retry_count for s in self.steps)
    @property
    def total_tool_calls(self): return sum(len(s.tool_calls) for s in self.steps)
    @property
    def total_verifier_calls(self): return sum(len(s.verifier_calls) for s in self.steps)
    @property
    def total_context_tokens(self): return sum(s.context_size_tokens for s in self.steps)
    @property
    def cache_hit_rate(self):
        mc=[s.model_call for s in self.steps if s.model_call]
        if not mc: return 0.0
        ti=sum(m.input_tokens for m in mc)
        return sum(m.cache_hit_input_tokens for m in mc)/ti if ti>0 else 0.0
    def to_dict(self):
        return {"trace_id":self.trace_id,"user_request":self.user_request,"task_type":self.task_type.value,
                "steps":[{"step_id":s.step_id,"timestamp":s.timestamp.isoformat(),"task_type":s.task_type.value,
                          "model_call":{"model_id":s.model_call.model_id,"provider":s.model_call.provider,
                                        "input_tokens":s.model_call.input_tokens,"output_tokens":s.model_call.output_tokens,
                                        "reasoning_tokens":s.model_call.reasoning_tokens,"cost":s.model_call.total_cost,
                                        "latency_ms":s.model_call.latency_ms,"cache_hit_input_tokens":s.model_call.cache_hit_input_tokens},
                          "tool_calls":[{"tool_name":t.tool_name,"tool_cost":t.tool_cost,"tool_latency_ms":t.tool_latency_ms,
                                         "cache_hit":t.cache_hit,"repeated":t.repeated,"ignored_result":t.ignored_result,"failed":t.failed} for t in s.tool_calls],
                          "verifier_calls":[{"verifier_model_id":v.verifier_model_id,"passed":v.passed,"confidence":v.confidence,"cost":v.cost} for v in s.verifier_calls],
                          "context_size_tokens":s.context_size_tokens,"retry_count":s.retry_count,
                          "recovery_action":s.recovery_action,"step_outcome":s.step_outcome.value if s.step_outcome else None,
                          "step_cost":s.step_cost,"step_latency_ms":s.step_latency_ms} for s in self.steps],
                "final_outcome":self.final_outcome.value if self.final_outcome else None,
                "failure_tags":[f.value for f in self.failure_tags],
                "total_cost":self.total_cost_computed,"total_latency_ms":self.total_latency_ms,
                "total_retries":self.total_retries,"total_tool_calls":self.total_tool_calls,
                "total_verifier_calls":self.total_verifier_calls,"total_context_tokens":self.total_context_tokens,
                "cache_hit_rate":self.cache_hit_rate,"user_satisfaction":self.user_satisfaction,
                "total_cost_saved_vs_frontier":self.total_cost_saved_vs_frontier,"optimal_cost":self.optimal_cost,
                "metadata":self.metadata}
|
|
class SyntheticTraceGenerator:
    MODEL_CONFIGS={"tiny_local":{"tier":1,"cost_input":0.0001,"cost_output":0.0002,"latency":200,"strength":0.3},
                   "cheap_cloud":{"tier":2,"cost_input":0.0005,"cost_output":0.001,"latency":500,"strength":0.5},
                   "medium":{"tier":3,"cost_input":0.003,"cost_output":0.006,"latency":800,"strength":0.75},
                   "frontier":{"tier":4,"cost_input":0.01,"cost_output":0.03,"latency":1500,"strength":0.95},
                   "specialist":{"tier":5,"cost_input":0.015,"cost_output":0.045,"latency":2000,"strength":0.98}}
    TOOL_COSTS={"search":0.002,"retrieve":0.001,"fetch":0.003,"code_execution":0.005,
                "linter":0.001,"test_runner":0.003,"file_read":0.0005,"file_write":0.0005,
                "calculator":0.0001,"database_query":0.004,"compliance_check":0.01,
                "summarize":0.002,"task_planner":0.001,"progress_tracker":0.0005}
    TASK_TYPE_DISTRIBUTION={TaskType.QUICK_ANSWER:0.20,TaskType.CODING:0.20,TaskType.RESEARCH:0.15,
                            TaskType.DOCUMENT_DRAFTING:0.10,TaskType.LEGAL_REGULATED:0.05,
                            TaskType.TOOL_HEAVY:0.10,TaskType.RETRIEVAL_HEAVY:0.10,
                            TaskType.LONG_HORIZON:0.08,TaskType.UNKNOWN_AMBIGUOUS:0.02}
    SCENARIOS=[
        {"name":"cheap_success","prob":0.15,"tier":[1,2],"outcome":Outcome.SUCCESS,"failure_tags":[]},
        {"name":"cheap_failure","prob":0.10,"tier":[1,2],"outcome":Outcome.FAILURE,"failure_tags":[FailureTag.MODEL_TOO_WEAK]},
        {"name":"frontier_unnecessary","prob":0.08,"tier":[4],"outcome":Outcome.SUCCESS,"failure_tags":[],"optimal_tier":[1,2]},
        {"name":"tool_overuse","prob":0.07,"tier":[3,4],"outcome":Outcome.PARTIAL_SUCCESS,"failure_tags":[FailureTag.TOOL_UNNECESSARY],"extra_tools":3},
        {"name":"tool_underuse","prob":0.05,"tier":[3,4],"outcome":Outcome.FAILURE,"failure_tags":[FailureTag.TOOL_MISSED],"missing_tools":2},
        {"name":"retrieval_overuse","prob":0.04,"tier":[3,4],"outcome":Outcome.SUCCESS,"failure_tags":[],"extra_retrievals":5},
        {"name":"verifier_overuse","prob":0.03,"tier":[3,4],"outcome":Outcome.SUCCESS,"failure_tags":[],"extra_verifiers":2},
        {"name":"retry_loop","prob":0.05,"tier":[3,4],"outcome":Outcome.FAILURE,"failure_tags":[FailureTag.RETRY_LOOP],"retries":5},
        {"name":"cache_break","prob":0.04,"tier":[3,4],"outcome":Outcome.PARTIAL_SUCCESS,"failure_tags":[FailureTag.CACHE_BREAK]},
        {"name":"false_done","prob":0.05,"tier":[3,4],"outcome":Outcome.FALSE_DONE,"failure_tags":[FailureTag.VERIFIER_FALSE_PASS]},
        {"name":"meta_tool_success","prob":0.06,"tier":[2,3],"outcome":Outcome.SUCCESS,"failure_tags":[],"uses_meta_tool":True},
        {"name":"meta_tool_bad","prob":0.02,"tier":[2,3],"outcome":Outcome.FAILURE,"failure_tags":[FailureTag.MODEL_TOO_WEAK],"uses_meta_tool":True},
        {"name":"normal_success","prob":0.20,"tier":[3,4],"outcome":Outcome.SUCCESS,"failure_tags":[]},
        {"name":"blocked","prob":0.03,"tier":[4],"outcome":Outcome.BLOCKED,"failure_tags":[FailureTag.MISSED_ESCALATION]},
        {"name":"human_escalation","prob":0.02,"tier":[4,5],"outcome":Outcome.ESCALATED_HUMAN,"failure_tags":[FailureTag.MISSED_ESCALATION]},
        {"name":"stopped_doom","prob":0.03,"tier":[3,4],"outcome":Outcome.STOPPED_DOOM,"failure_tags":[FailureTag.COST_EXCEEDED]}]
    def __init__(self,seed=42): self.rng=random.Random(seed)
    def generate(self,n=10000): return [self._generate_trace(i) for i in range(n)]
    def _pick_scenario(self): return self.rng.choices(self.SCENARIOS,weights=[s["prob"] for s in self.SCENARIOS])[0]
    def _tier_to_model(self,tier): return {1:"tiny_local",2:"cheap_cloud",3:"medium",4:"frontier",5:"specialist"}.get(tier,"medium")
    def _generate_request(self,task_type,scenario_name):
        templates={TaskType.QUICK_ANSWER:["What is the capital of France?","Briefly explain quantum computing.","Summarize article X.","What is 237 * 452?"],
                   TaskType.CODING:["Write a Python function to reverse a linked list.","Fix the bug in this React component.","Refactor auth module to JWT.","Implement LRU cache in Go."],
                   TaskType.RESEARCH:["Research latest transformer advances.","Find sources comparing LoRA and full FT.","Investigate data center climate impact.","What does literature say on speculative decoding?"],
                   TaskType.DOCUMENT_DRAFTING:["Draft project proposal for ML pipeline.","Write email to team about deployment.","Create technical report on performance."],
                   TaskType.LEGAL_REGULATED:["Review this contract for liability clauses.","Check GDPR compliance for data pipeline.","Draft privacy policy section."],
                   TaskType.TOOL_HEAVY:["Search open issues and create summary.","Fetch API docs and generate client code.","Query Q3 sales and produce chart."],
                   TaskType.RETRIEVAL_HEAVY:["Answer based on 50-page document.","Find all 'payment processing' mentions.","Retrieve relevant cases for legal query."],
                   TaskType.LONG_HORIZON:["Plan 3-month roadmap.","Orchestrate multi-region deployment.","Redesign data architecture end-to-end."],
                   TaskType.UNKNOWN_AMBIGUOUS:["Help me with this thing.","I need something about the server.","Can you look into that issue?"]}
        return self.rng.choice(templates.get(task_type,["Generic request"]))
    def _get_tools_for_task(self,task_type):
        return {TaskType.QUICK_ANSWER:["calculator","search"],
                TaskType.CODING:["file_read","file_write","code_execution","linter","test_runner"],
                TaskType.RESEARCH:["search","retrieve","fetch","summarize"],
                TaskType.DOCUMENT_DRAFTING:["file_read","summarize"],
                TaskType.LEGAL_REGULATED:["document_retrieval","compliance_check","search"],
                TaskType.TOOL_HEAVY:["search","fetch","api_call","database_query"],
                TaskType.RETRIEVAL_HEAVY:["retrieve","search","fetch"],
                TaskType.LONG_HORIZON:["task_planner","progress_tracker","file_read"],
                TaskType.UNKNOWN_AMBIGUOUS:["search"]}.get(task_type,["search"])
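    # Note: a few task-specific tools ("document_retrieval", "api_call") have
    # no TOOL_COSTS entry and fall back to the 0.001 default cost below.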
    def _generate_trace(self,idx):
        trace_id=f"synth_{idx}_{uuid.uuid4().hex[:8]}"
        task_type=self.rng.choices(list(self.TASK_TYPE_DISTRIBUTION.keys()),weights=list(self.TASK_TYPE_DISTRIBUTION.values()))[0]
        scenario=self._pick_scenario()
        user_request=self._generate_request(task_type,scenario["name"])
        base_steps=self.rng.randint(1,8)
        if scenario["name"] in ("retry_loop","false_done"): base_steps=self.rng.randint(5,12)
        if scenario.get("uses_meta_tool"): base_steps=max(2,base_steps//2)
        tier=self.rng.choice(scenario["tier"])
        model_key=self._tier_to_model(tier)
        model_cfg=self.MODEL_CONFIGS[model_key]
        steps=[]
        for step_idx in range(base_steps):
            step_id=f"{trace_id}_step_{step_idx}"
            input_tokens=self.rng.randint(500,8000); output_tokens=self.rng.randint(100,4000)
            cache_hit=self.rng.random()<0.3; cache_hit_tokens=int(input_tokens*self.rng.random()*0.5) if cache_hit else 0
            model_call=ModelCall(model_id=model_key,provider="synthetic",input_tokens=input_tokens,output_tokens=output_tokens,
                                 reasoning_tokens=output_tokens//5 if model_key=="frontier" else 0,
                                 cost_per_1k_input=model_cfg["cost_input"],cost_per_1k_output=model_cfg["cost_output"],
                                 cache_hit_input_tokens=cache_hit_tokens,latency_ms=model_cfg["latency"]*self.rng.uniform(0.8,1.5))
            tool_calls=[]; base_tools=self._get_tools_for_task(task_type); num_tools=self.rng.randint(0,len(base_tools))
            if scenario.get("extra_tools"): num_tools+=scenario["extra_tools"]
            if scenario.get("missing_tools"): num_tools=max(0,num_tools-scenario["missing_tools"])
            for t in range(min(num_tools,len(base_tools))):
                tool_name=base_tools[t]
                tool_calls.append(ToolCall(tool_name=tool_name,tool_input={"query":f"auto_{tool_name}"},
                                           tool_cost=self.TOOL_COSTS.get(tool_name,0.001),tool_latency_ms=self.rng.uniform(100,1000),
                                           cache_hit=self.rng.random()<0.2,repeated=self.rng.random()<0.1,
                                           ignored_result=self.rng.random()<0.05,
                                           failed=self.rng.random()<(0.2 if scenario["name"] in ("retry_loop","tool_underuse") else 0.05)))
            verifier_calls=[]; num_verifiers=0
            if task_type in (TaskType.LEGAL_REGULATED,TaskType.CODING,TaskType.RESEARCH): num_verifiers=1 if self.rng.random()<0.5 else 0
            if scenario.get("extra_verifiers"): num_verifiers+=scenario["extra_verifiers"]
            for _ in range(num_verifiers):
                verifier_calls.append(VerifierCall(verifier_model_id="verifier_medium",target_step_id=step_id,
                                                   passed=self.rng.random()<0.8,confidence=self.rng.uniform(0.6,0.99),cost=0.005,latency_ms=500))
            context_size=self.rng.randint(1000,15000)
            if scenario["name"]=="cache_break": context_size+=self.rng.randint(5000,20000)
            retries=0
            if scenario.get("retries"): retries=self.rng.randint(scenario["retries"]-1,scenario["retries"]+1)
            elif self.rng.random()<0.15: retries=self.rng.randint(1,2)
            recovery=None
            if retries>0: recovery=self.rng.choice(["retry_same","retry_changed_prompt","repair_tool","retrieve_more_context","switch_model","ask_clarification"])
            step_outcome=Outcome.SUCCESS
            if step_idx==base_steps-1: step_outcome=scenario["outcome"]  # covers false_done's FALSE_DONE too
            elif scenario["name"]=="retry_loop" and step_idx>=2: step_outcome=Outcome.FAILURE
            steps.append(TraceStep(step_id=step_id,timestamp=datetime.now(timezone.utc)+timedelta(seconds=step_idx*30),task_type=task_type,
                                   model_call=model_call,tool_calls=tool_calls,verifier_calls=verifier_calls,
                                   context_size_tokens=context_size,context_sources=["system_rules","tool_descriptions","user_preferences","recent_messages"],
                                   retry_count=retries,recovery_action=recovery,
                                   artifacts_created=[f"artifact_{step_idx}"] if self.rng.random()<0.3 else [],
                                   step_outcome=step_outcome))
        total_cost=sum(s.step_cost for s in steps)
        # Reference costs assume a nominal 2,000-token step at per-1k pricing.
        frontier_cost=self.MODEL_CONFIGS["frontier"]["cost_input"]*(2000/1000)*base_steps
        optimal_tier=scenario.get("optimal_tier")
        optimal_cost=total_cost*0.6 if not optimal_tier else self.MODEL_CONFIGS[self._tier_to_model(self.rng.choice(optimal_tier))]["cost_input"]*(2000/1000)*base_steps
        return AgentTrace(trace_id=trace_id,user_request=user_request,task_type=task_type,steps=steps,
                          final_outcome=scenario["outcome"],failure_tags=list(scenario["failure_tags"]),
                          total_cost=total_cost,total_cost_saved_vs_frontier=frontier_cost-total_cost,
                          optimal_cost=optimal_cost,
                          metadata={"scenario":scenario["name"],"synthetic":True,"optimal_tier":optimal_tier[0] if optimal_tier else tier})
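
# Example (illustrative): SyntheticTraceGenerator(seed=7).generate(100) returns
# 100 reproducible AgentTrace objects whose scenario mix follows SCENARIOS.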
|
|
@dataclass
class BenchmarkResult:
    benchmark_name:str; baseline_name:str; num_tasks:int; num_success:int
    num_partial:int; num_failure:int; num_false_done:int; num_blocked:int
    total_cost:float; avg_cost_success:float; avg_latency_ms:float
    total_tool_calls:int; total_verifier_calls:int; total_retries:int
    avg_cache_hit_rate:float; total_context_tokens:int
    cost_reduction_vs_frontier:float; false_done_rate:float
    unsafe_cheap_miss_rate:float; missed_escalation_rate:float; regression_rate:float
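
# All *_rate fields are fractions of num_tasks; cost fields are in dollars
# under the synthetic pricing above.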
|
|
class BenchmarkSuite:
    def __init__(self): pass
    def generate_benchmark_data(self,n=1000,seed=42): return SyntheticTraceGenerator(seed=seed).generate(n)
    def run_all_baselines(self,traces):
        baselines=["always_frontier","always_cheap","cascade","full"]
        results={}
        for baseline in baselines:
            print(f"Running baseline: {baseline}...")
            results[baseline]=self._run_baseline(traces,baseline)
        return results
    def run_ablations(self,traces):
        ablations=["no_router","no_tool_gate","no_early_termination"]
        results={}
        for ablation in ablations:
            print(f"Running ablation: {ablation}...")
            results[ablation]=self._run_baseline(traces,ablation)
        return results
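    # Baseline names denote whole routing policies; ablations each disable one
    # component. Both are scored by the same heuristic _simulate below, which
    # keys off the baseline name and the trace's generating scenario.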
    def _run_baseline(self,traces,baseline_name):
        success_count=0; partial_count=0; failure_count=0; false_done_count=0; blocked_count=0
        total_cost=0.0; total_latency=0.0; total_tools=0; total_verifiers=0; total_retries=0
        total_context=0; cache_rates=[]; cheap_misses=0; escalation_misses=0; regression_count=0
        frontier_costs=[]; actual_costs=[]
        for trace in traces:
            sim_cost,sim_latency,sim_success=self._simulate(trace,baseline_name)
            total_cost+=sim_cost; total_latency+=sim_latency
            total_tools+=trace.total_tool_calls; total_verifiers+=trace.total_verifier_calls
            total_retries+=trace.total_retries; total_context+=trace.total_context_tokens
            cache_rates.append(trace.cache_hit_rate)
            # Same nominal 2,000-token step at per-1k pricing as the generator.
            frontier_cost=SyntheticTraceGenerator.MODEL_CONFIGS["frontier"]["cost_input"]*(2000/1000)*len(trace.steps)
            frontier_costs.append(frontier_cost); actual_costs.append(sim_cost)
            if sim_success:
                if trace.final_outcome==Outcome.SUCCESS: success_count+=1
                elif trace.final_outcome==Outcome.PARTIAL_SUCCESS: partial_count+=1
                else: regression_count+=1
            else:
                if trace.final_outcome==Outcome.FALSE_DONE: false_done_count+=1
                elif trace.final_outcome==Outcome.BLOCKED: blocked_count+=1
                else: failure_count+=1
            scenario=trace.metadata.get("scenario","normal")
            tier=trace.metadata.get("optimal_tier",3)
            if scenario=="cheap_failure" and tier<=2: cheap_misses+=1
            if scenario in ("cheap_failure","tool_underuse") and tier<3: escalation_misses+=1
        n=len(traces); avg_cost_success=total_cost/max(success_count+partial_count,1)
        cost_reduction=(sum(frontier_costs)-sum(actual_costs))/max(sum(frontier_costs),1e-9)
        return BenchmarkResult(benchmark_name="synthetic",baseline_name=baseline_name,num_tasks=n,
                               num_success=success_count,num_partial=partial_count,num_failure=failure_count,
                               num_false_done=false_done_count,num_blocked=blocked_count,
                               total_cost=total_cost,avg_cost_success=avg_cost_success,
                               avg_latency_ms=total_latency/n,total_tool_calls=total_tools,
                               total_verifier_calls=total_verifiers,total_retries=total_retries,
                               avg_cache_hit_rate=sum(cache_rates)/n,total_context_tokens=total_context,
                               cost_reduction_vs_frontier=cost_reduction,false_done_rate=false_done_count/n,
                               unsafe_cheap_miss_rate=cheap_misses/n,missed_escalation_rate=escalation_misses/n,
                               regression_rate=regression_count/n)
    def _simulate(self,trace,baseline):
        base_cost=trace.total_cost_computed
        if baseline=="always_frontier": cost_mult,tier=1.0,4
        elif baseline=="always_cheap": cost_mult,tier=0.25,2
        elif baseline=="no_router": cost_mult,tier=0.9,3
        elif baseline=="no_tool_gate": cost_mult,tier=0.85,3
        elif baseline=="no_early_termination": cost_mult,tier=0.95,3
        else: cost_mult,tier=0.55,3
        sim_cost=base_cost*cost_mult; sim_latency=trace.total_latency_ms*cost_mult*0.8
        scenario=trace.metadata.get("scenario","normal")
        success_prob=0.95 if tier>=3 else 0.7
        if scenario=="cheap_failure": success_prob=0.3 if tier<=2 else 0.85
        elif scenario=="tool_underuse": success_prob=0.8 if baseline!="no_tool_gate" else 0.6
        elif scenario=="retry_loop": success_prob=0.2 if baseline=="no_early_termination" else 0.25
        elif scenario=="frontier_unnecessary": success_prob=0.95
        elif scenario=="meta_tool_success": success_prob=0.9 if baseline=="full" else 0.85
        elif scenario=="meta_tool_bad": success_prob=0.4
        elif scenario=="false_done": success_prob=0.1
        elif scenario in ("blocked","stopped_doom"): success_prob=0.0
        elif scenario=="human_escalation": success_prob=0.5
        return sim_cost,sim_latency,success_prob>0.5
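    # Success is thresholded at 0.5, so the simulation is deterministic given
    # the scenario and baseline; the probabilities act as scores, not samples.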
    def report(self,results):
        lines=["="*80,"AGENT COST OPTIMIZER BENCHMARK REPORT","="*80,""]
        headers=["Baseline","Success","Partial","Fail","Blocked","False-DONE","Total Cost","Avg Cost/Succ","Latency(ms)","Tools","Verifiers","Retries","Cache Hit","Cost Reduction","Regression"]
        header_line=" | ".join(headers)
        lines.append(header_line); lines.append("-"*len(header_line))
        for name,result in results.items():
            row=[name[:20].ljust(20),f"{result.num_success/result.num_tasks:.1%}",
                 f"{result.num_partial/result.num_tasks:.1%}",f"{result.num_failure/result.num_tasks:.1%}",
                 f"{result.num_blocked/result.num_tasks:.1%}",f"{result.false_done_rate:.1%}",
                 f"${result.total_cost:.2f}",f"${result.avg_cost_success:.4f}",f"{result.avg_latency_ms:.0f}",
                 str(result.total_tool_calls),str(result.total_verifier_calls),str(result.total_retries),
                 f"{result.avg_cache_hit_rate:.1%}",f"{result.cost_reduction_vs_frontier:.1%}",
                 f"{result.regression_rate:.1%}"]
            lines.append(" | ".join(row))
        lines.append(""); lines.append("="*80)
        best_score,best_name=-float("inf"),""
        for name,result in results.items():
            success_rate=(result.num_success+result.num_partial)/result.num_tasks
            score=success_rate*10-result.avg_cost_success*100-result.regression_rate*50
            if score>best_score: best_score,best_name=score,name
        lines.append(f"BEST OVERALL: {best_name} (score={best_score:.2f})"); lines.append("")
        return "\n".join(lines)
    def export(self,results,path):
        export_data={}
        for name,result in results.items():
            export_data[name]={"benchmark_name":result.benchmark_name,"baseline_name":result.baseline_name,
                               "num_tasks":result.num_tasks,"num_success":result.num_success,
                               "num_partial":result.num_partial,"num_failure":result.num_failure,
                               "num_false_done":result.num_false_done,"num_blocked":result.num_blocked,
                               "total_cost":result.total_cost,"avg_cost_success":result.avg_cost_success,
                               "avg_latency_ms":result.avg_latency_ms,"total_tool_calls":result.total_tool_calls,
                               "total_verifier_calls":result.total_verifier_calls,"total_retries":result.total_retries,
                               "avg_cache_hit_rate":result.avg_cache_hit_rate,"total_context_tokens":result.total_context_tokens,
                               "cost_reduction_vs_frontier":result.cost_reduction_vs_frontier,
                               "false_done_rate":result.false_done_rate,"unsafe_cheap_miss_rate":result.unsafe_cheap_miss_rate,
                               "missed_escalation_rate":result.missed_escalation_rate,"regression_rate":result.regression_rate}
        with open(path,"w") as f: json.dump(export_data,f,indent=2)
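
# CLI usage, e.g.: python benchmark.py -n 500 -s 7 -o ./eval_results
# (script name illustrative). Writes traces.jsonl, baseline/ablation result
# JSON, report.txt, and cost_quality_frontier.json into the output directory.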
|
|
if __name__=="__main__":
    parser=argparse.ArgumentParser(description="ACO Evaluation Runner")
    parser.add_argument("--tasks","-n",type=int,default=1000,help="Number of tasks")
    parser.add_argument("--seed","-s",type=int,default=42,help="Random seed")
    parser.add_argument("--output","-o",default="./eval_results",help="Output directory")
    args=parser.parse_args()
    os.makedirs(args.output,exist_ok=True)
    suite=BenchmarkSuite()
    print(f"[{datetime.now().isoformat()}] Generating {args.tasks} synthetic traces...")
    traces=suite.generate_benchmark_data(args.tasks,seed=args.seed)
    traces_path=os.path.join(args.output,"traces.jsonl")
    with open(traces_path,"w") as f:
        for trace in traces: f.write(json.dumps(trace.to_dict())+"\n")
    print(f" Saved {len(traces)} traces to {traces_path}")
    print(f"\n[{datetime.now().isoformat()}] Running baselines...")
    baseline_results=suite.run_all_baselines(traces)
    baseline_path=os.path.join(args.output,"baseline_results.json")
    suite.export(baseline_results,baseline_path)
    print(f" Saved to {baseline_path}")
    print(f"\n[{datetime.now().isoformat()}] Running ablations...")
    ablation_results=suite.run_ablations(traces)
    ablation_path=os.path.join(args.output,"ablation_results.json")
    suite.export(ablation_results,ablation_path)
    print(f" Saved to {ablation_path}")
    all_results={**baseline_results,**ablation_results}
    report=suite.report(all_results)
    report_path=os.path.join(args.output,"report.txt")
    with open(report_path,"w") as f: f.write(report)
    print(f" Saved report to {report_path}")
    points=[]
    for name,result in all_results.items():
        sr=(result.num_success+result.num_partial)/result.num_tasks
        points.append({"baseline":name,"success_rate":sr,"avg_cost_per_success":result.avg_cost_success})
    frontier=[]
    for p in points:
        dominated=False
        for q in points:
            if q["baseline"]==p["baseline"]: continue
            if q["success_rate"]>=p["success_rate"] and q["avg_cost_per_success"]<=p["avg_cost_per_success"]:
                if q["success_rate"]>p["success_rate"] or q["avg_cost_per_success"]<p["avg_cost_per_success"]:
                    dominated=True; break
        if not dominated: frontier.append(p)
    frontier.sort(key=lambda x:x["success_rate"],reverse=True)
    frontier_data={"all_points":points,"pareto_frontier":frontier,"frontier_baselines":[p["baseline"] for p in frontier]}
    frontier_path=os.path.join(args.output,"cost_quality_frontier.json")
    with open(frontier_path,"w") as f: json.dump(frontier_data,f,indent=2)
    print(f" Saved frontier to {frontier_path}")
    print("\n"+"="*80)
    print(report)
    print("="*80)
|
|