agent-cost-optimizer / training /router_integration_benchmark.py
narcolepticchicken's picture
Upload training/router_integration_benchmark.py with huggingface_hub
ee396fa verified
#!/usr/bin/env python3
"""
Standalone integration: Replace heuristic router with trained XGBoost router.
This script:
1. Generates 2K eval traces (same as standalone_eval_v2.py)
2. Runs the full benchmark with the TRAINED router replacing _route_learned()
3. Compares: heuristic, trained, and oracle routers
"""
import json, os, sys, random, uuid, pickle
import numpy as np
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
from typing import Dict, List, Optional, Any
# ─── Load trained models ─────────────────────────────────────────────
print("="*80)
print("ACO TRAINED ROUTER - INTEGRATION BENCHMARK")
print("="*80)
import xgboost as xgb

# Directory the router artefacts (feature keys, tier config, models) are read from.
MODEL_DIR = "/app/router_models"

# Read the JSON artefacts through context managers so both file handles are
# closed immediately (a bare json.load(open(...)) leaves them open until
# garbage collection). JSON is decoded as UTF-8 regardless of the host locale.
with open(f"{MODEL_DIR}/feat_keys.json", encoding="utf-8") as fh:
    # Ordered list of feature names; f2v() iterates it to lay out the vector.
    feat_keys = json.load(fh)
with open(f"{MODEL_DIR}/tier_config.json", encoding="utf-8") as fh:
    tier_config = json.load(fh)

# JSON object keys are always strings, so convert them back to integer tier ids.
TIER_COST = {int(k): v for k, v in tier_config["tier_cost"].items()}
TIER_STR = {int(k): v for k, v in tier_config["tier_str"].items()}
# Looked up by task-type string; the routers use it as the lowest tier to try.
TASK_FLOOR = tier_config["task_floor"]

print(f"\n[1] Loading trained router models...")
# One classifier per tier (1..5), each restored from tier_<n>_success.json.
tier_clfs = {}
for tier in range(1, 6):
    clf = xgb.XGBClassifier()
    clf.load_model(f"{MODEL_DIR}/tier_{tier}_success.json")
    tier_clfs[tier] = clf
    print(f" Loaded tier_{tier}_success.json")
# ─── Feature extraction (must match training) ────────────────────────
# Task-type names; the position in this list is the integer id stored in TT2IDX.
TASK_TYPES = [
    "quick_answer",
    "coding",
    "research",
    "document_drafting",
    "legal_regulated",
    "tool_heavy",
    "retrieval_heavy",
    "long_horizon",
    "unknown_ambiguous",
]
TT2IDX = dict(zip(TASK_TYPES, range(len(TASK_TYPES))))

# Keyword lists, one per signal family. extract_features() tests each keyword
# against the lower-cased request text.
CODE_KW = [
    "python", "javascript", "code", "function", "bug",
    "debug", "refactor", "implement", "test", "compile",
    "runtime", "class", "module", "async", "thread",
]
LEGAL_KW = [
    "contract", "legal", "compliance", "gdpr",
    "privacy", "policy", "regulatory", "liability",
]
RESEARCH_KW = [
    "research", "find sources", "literature", "investigate",
    "compare", "analyze", "survey",
]
TOOL_KW = [
    "search", "fetch", "retrieve", "query",
    "api", "database", "scrape", "aggregate",
]
LONG_KW = [
    "plan", "project", "roadmap", "orchestrate",
    "multi-step", "migrate", "pipeline", "deploy",
]
MATH_KW = [
    "calculate", "compute", "solve", "equation",
    "formula", "optimize", "probability",
]
def extract_features(request, task_type, difficulty=3):
    """Build the feature dict for one request.

    Args:
        request: Raw request text.
        task_type: Task-type string; names outside TASK_TYPES get tt_idx 8
            and all-zero one-hot columns.
        difficulty: Difficulty value copied into the features unchanged.

    Returns:
        dict with length features, per-family keyword flags/counts,
        ``tt_idx``, ``difficulty`` and one ``tt_<name>`` indicator per entry
        of TASK_TYPES.
    """
    lowered = request.lower()

    def hits(keywords):
        # Plain substring matching (e.g. "test" also matches "latest").
        # Left as-is because these features have to match training.
        return sum(1 for kw in keywords if kw in lowered)

    n_code = hits(CODE_KW)
    n_legal = hits(LEGAL_KW)
    n_research = hits(RESEARCH_KW)
    n_tool = hits(TOOL_KW)

    features = {
        "req_len": len(request),
        "num_words": len(request.split()),
        "has_code": int(n_code > 0),
        "n_code": n_code,
        "has_legal": int(n_legal > 0),
        "n_legal": n_legal,
        "has_research": int(n_research > 0),
        "n_research": n_research,
        "has_tool": int(n_tool > 0),
        "n_tool": n_tool,
        "has_long": int(hits(LONG_KW) > 0),
        "has_math": int(hits(MATH_KW) > 0),
        "tt_idx": TT2IDX.get(task_type, 8),
        "difficulty": difficulty,
    }
    # One-hot encoding of the task type, appended after the scalar features.
    features.update({f"tt_{name}": int(task_type == name) for name in TASK_TYPES})
    return features
def f2v(feats):
    """Turn a feature dict into a float32 vector ordered by ``feat_keys``.

    Keys listed in ``feat_keys`` but absent from *feats* become 0.0; keys in
    *feats* that are not in ``feat_keys`` are ignored.
    """
    ordered_values = []
    for key in feat_keys:
        ordered_values.append(float(feats.get(key, 0.0)))
    return np.array(ordered_values, dtype=np.float32)
# ─── Routing Functions ────────────────────────────────────────────────
def route_trained(request, task_type, difficulty):
    """Pick the tier with the lowest expected penalty, never below the task floor.

    For every tier from the task's floor (default 2) up to 5, the per-tier
    classifier gives P(success). The tier score is
    ``5.0 * P(fail) + 1.0 * cost / cost_of_tier_5``; the lowest score wins and
    ties keep the lower tier.

    Returns:
        The selected tier number.
    """
    row = f2v(extract_features(request, task_type, difficulty)).reshape(1, -1)
    lowest = TASK_FLOOR.get(task_type, 2)

    chosen, chosen_score = lowest, float("inf")
    for candidate in range(lowest, 6):
        fail_prob = 1.0 - tier_clfs[candidate].predict_proba(row)[0, 1]
        relative_cost = TIER_COST[candidate] / TIER_COST[5]
        # Asymmetric weighting: a predicted failure costs 5x the price term.
        candidate_score = fail_prob * 5.0 + relative_cost * 1.0
        if candidate_score < chosen_score:
            chosen, chosen_score = candidate, candidate_score
    return chosen
def route_heuristic(task_type, difficulty):
    """Baseline heuristic: route to ``difficulty + 1``, capped at tier 5.

    ``task_type`` is accepted but not used.
    """
    bumped = difficulty + 1
    return 5 if 5 < bumped else bumped
def route_frontier():
    """Return the fixed tier used by the always-frontier baseline (tier 4)."""
    frontier_tier = 4
    return frontier_tier
def route_cascade_trained(request, task_type, difficulty, threshold=0.65):
    """Cascade router: cheapest tier at or above the floor that looks good enough.

    Starting at the task's floor tier (default 2), return the first tier whose
    classifier predicts ``P(success) >= threshold``.

    Args:
        request: Raw request text.
        task_type: Task-type string used for the floor lookup and features.
        difficulty: Difficulty value fed into the features.
        threshold: Minimum predicted success probability to accept a tier.
            Defaults to 0.65, the value previously hard-coded.

    Returns:
        The selected tier number. If no tier reaches the threshold, the
        frontier tier is returned, raised to the floor when the floor is
        higher.
    """
    feats = extract_features(request, task_type, difficulty)
    x = f2v(feats).reshape(1, -1)
    floor = TASK_FLOOR.get(task_type, 2)
    for tier in range(floor, 6):
        p_success = tier_clfs[tier].predict_proba(x)[0, 1]
        if p_success >= threshold:
            return tier
    # Fallback to frontier. The previous unconditional `return 4` could hand
    # back a tier *below* the task's floor (floor == 5), so clamp to the floor.
    return max(floor, route_frontier())
# ─── Generate Evaluation Traces ────────────────────────────────────────
print("\n[2] Generating 2K evaluation traces (different seed from training)...")


class TaskType(Enum):
    """Closed set of task-type labels; each value is the string form used as a dict key."""

    QUICK_ANSWER = "quick_answer"
    CODING = "coding"
    RESEARCH = "research"
    DOCUMENT_DRAFTING = "document_drafting"
    LEGAL_REGULATED = "legal_regulated"
    TOOL_HEAVY = "tool_heavy"
    RETRIEVAL_HEAVY = "retrieval_heavy"
    LONG_HORIZON = "long_horizon"
    UNKNOWN_AMBIGUOUS = "unknown_ambiguous"
# Request templates per task type; trace generation samples one template
# uniformly for each trace.
TASK_TEMPLATES_EVAL = {
    "quick_answer": [
        "What is the capital of France?",
        "Explain quantum computing briefly.",
        "What is 237*452?",
    ],
    "coding": [
        "Write a Python function to reverse a linked list.",
        "Fix the bug in this React component.",
        "Implement LRU cache in Go.",
        "Debug segfault in C++ thread pool.",
    ],
    "research": [
        "Research latest transformer advances.",
        "Find sources comparing LoRA and full FT.",
        "Investigate data center climate impact.",
    ],
    "document_drafting": [
        "Draft project proposal for ML pipeline.",
        "Write email to team about deployment.",
    ],
    "legal_regulated": [
        "Review this contract for liability clauses.",
        "Check GDPR compliance for data pipeline.",
        "Draft privacy policy section.",
    ],
    "tool_heavy": [
        "Search open issues and create summary.",
        "Fetch API docs and generate client code.",
    ],
    "retrieval_heavy": [
        "Answer based on 50-page document.",
        "Find all payment processing mentions.",
    ],
    "long_horizon": [
        "Plan 3-month roadmap.",
        "Orchestrate multi-region deployment.",
    ],
    "unknown_ambiguous": [
        "Help me with this thing.",
        "I need something about the server.",
    ],
}
def tsp(tier, diff):
    """Simulated success probability of *tier* on a task of difficulty *diff*.

    Each tier (1..5) has a fixed base rate; the result is
    ``base_rate ** (diff * 0.6)``. Raises KeyError for a tier outside 1..5.
    """
    base_rates = {1: 0.35, 2: 0.55, 3: 0.80, 4: 0.93, 5: 0.97}
    exponent = diff * 0.6
    return base_rates[tier] ** exponent
eval_rng = random.Random(999)  # DIFFERENT seed for eval

# Loop invariants, built once instead of on each of the 2000 iterations.
_EVAL_TASK_TYPES = list(TASK_TEMPLATES_EVAL.keys())
_EVAL_DIFFICULTY = {
    "quick_answer": 1,
    "document_drafting": 2, "tool_heavy": 2, "retrieval_heavy": 2,
    "research": 3, "coding": 3, "unknown_ambiguous": 3,
    "long_horizon": 4,
    "legal_regulated": 5,
}

eval_traces = []
for _ in range(2000):
    # The order of RNG draws (task type, five tier outcomes, template) is
    # kept exactly as before so the seeded traces do not change.
    tt = eval_rng.choice(_EVAL_TASK_TYPES)
    diff = _EVAL_DIFFICULTY[tt]
    # Simulated outcome of every tier on this task.
    tier_out = {t: eval_rng.random() < tsp(t, diff) for t in range(1, 6)}
    # Oracle tier: the lowest tier that succeeded, or 5 when none did.
    opt = next((t for t in range(1, 6) if tier_out[t]), 5)
    req = eval_rng.choice(TASK_TEMPLATES_EVAL[tt])
    eval_traces.append({"tt": tt, "diff": diff, "opt": opt, "tier_out": tier_out, "req": req})
print(f" Generated {len(eval_traces)} eval traces")
# ─── Evaluate All Routers ─────────────────────────────────────────────
print("\n[3] Evaluating all routers on 2K traces...")


def eval_router(name, route_fn):
    """Score one routing function over every trace in ``eval_traces``.

    Args:
        name: Router label (accepted but not used in the computation).
        route_fn: Callable taking a trace dict and returning a tier number.

    Returns:
        dict with per-trace averages ``success``, ``avg_cost``,
        ``unsafe_rate`` (failed with a tier below the oracle tier) and
        ``false_done`` (failed at or above the oracle tier), plus
        ``tier_dist`` mapping each chosen tier to how often it was picked.
    """
    n_success = 0
    n_unsafe = 0
    n_false_done = 0
    total_cost = 0.0
    picks = {}

    for trace in eval_traces:
        tier = route_fn(trace)
        picks[tier] = picks.get(tier, 0) + 1
        if trace["tier_out"].get(tier, False):
            n_success += 1
        elif tier < trace["opt"]:
            n_unsafe += 1
        else:
            n_false_done += 1
        total_cost += TIER_COST[tier]

    total = len(eval_traces)
    return {
        "success": n_success / total,
        "avg_cost": total_cost / total,
        "unsafe_rate": n_unsafe / total,
        "false_done": n_false_done / total,
        "tier_dist": picks,
    }
# Router whose average cost is the reference for the "costRed" column.
BASELINE_ROUTER = "always_frontier"

# Each entry maps a label to a callable: trace dict -> tier number.
routers = {
    "always_frontier": lambda t: 4,
    "always_cheap": lambda t: 1,
    "heuristic_diff+1": lambda t: min(t["diff"]+1, 5),
    "heuristic_floor": lambda t: TASK_FLOOR.get(t["tt"], 2),
    "trained_asymmetric": lambda t: route_trained(t["req"], t["tt"], t["diff"]),
    "trained_cascade_t0.65": lambda t: route_cascade_trained(t["req"], t["tt"], t["diff"]),
    "oracle": lambda t: t["opt"],
}

results = {}
# Evaluate the baseline first, explicitly. The in-loop baseline lookup used to
# work only because "always_frontier" happened to be the first key of
# `routers`; reordering the dict would have raised KeyError.
eval_order = [BASELINE_ROUTER] + [n for n in routers if n != BASELINE_ROUTER]
for name in eval_order:
    results[name] = eval_router(name, routers[name])
    r = results[name]
    fc = results[BASELINE_ROUTER]["avg_cost"]
    # Cost reduction relative to the baseline, in percent.
    cr = (1 - r["avg_cost"]/fc)*100
    print(f" {name:<25} success={r['success']:.3f} cost={r['avg_cost']:.4f} costRed={cr:.1f}% unsafe={r['unsafe_rate']:.3f}")
# ─── Per-Task-Type Breakdown ──────────────────────────────────────────
print("\n\n[4] Per-task-type breakdown (trained vs heuristic vs frontier)...")

# Group the traces in one pass instead of re-filtering eval_traces for every
# task type. Every group is non-empty by construction.
traces_by_type = defaultdict(list)
for trace in eval_traces:
    traces_by_type[trace["tt"]].append(trace)

# Routers compared in the breakdown, in print order.
breakdown_routers = [
    ("frontier", lambda t: 4),
    ("heuristic", lambda t: min(t["diff"]+1, 5)),
    ("trained", lambda t: route_trained(t["req"], t["tt"], t["diff"])),
]

for tt in sorted(traces_by_type):
    tt_traces = traces_by_type[tt]
    n_tt = len(tt_traces)
    print(f"\n {tt} (n={n_tt}):")
    for rname, rfn in breakdown_routers:
        # Route each trace exactly once. Previously every router was called
        # twice per trace (success count and cost sum), which doubled the
        # model inference for the trained router.
        preds = [rfn(t) for t in tt_traces]
        succ = sum(1 for t, p in zip(tt_traces, preds) if t["tier_out"].get(p, False))
        cost = sum(TIER_COST[p] for p in preds)
        sr = succ/n_tt; ac = cost/n_tt
        print(f" {rname:<12} success={sr:.3f} cost={ac:.4f}")
# ─── Final Comparison ─────────────────────────────────────────────────
print(f"\n\n{'='*80}")
print("FINAL INTEGRATION BENCHMARK")
print(f"{'='*80}")
print(f"\n{'Router':<25} {'Success':>10} {'AvgCost':>10} {'CostRed':>10} {'Unsafe':>10} {'F-DONE':>10}")
print("-"*75)


def _rank_key(entry):
    """Sort key for the table: highest success first, then lowest average cost."""
    _, metrics = entry
    return (-metrics["success"], metrics["avg_cost"])


# Average cost of the always-frontier router: the reference for cost reduction.
fc = results["always_frontier"]["avg_cost"]
ranked = sorted(results.items(), key=_rank_key)
for name, r in ranked:
    cr = (1 - r["avg_cost"]/fc)*100
    print(f"{name:<25} {r['success']:>10.3f} {r['avg_cost']:>10.4f} {cr:>9.1f}% {r['unsafe_rate']:>10.3f} {r['false_done']:>10.3f}")
# Quality/cost frontier
def _dominates(challenger, incumbent):
    """True when *challenger* is at least as good on success and cost and strictly better on one."""
    at_least_as_good = (
        challenger["success"] >= incumbent["success"]
        and challenger["avg_cost"] <= incumbent["avg_cost"]
    )
    strictly_better = (
        challenger["success"] > incumbent["success"]
        or challenger["avg_cost"] < incumbent["avg_cost"]
    )
    return at_least_as_good and strictly_better


print("\nPARETO FRONTIER:")
pareto = []
for name, r in results.items():
    # "always_cheap" is never listed, but it still takes part as a challenger.
    if name == "always_cheap":
        continue
    beaten = any(
        _dominates(r2, r) for name2, r2 in results.items() if name2 != name
    )
    if beaten:
        continue
    pareto.append((name, r))
    cr = (1 - r["avg_cost"]/fc)*100
    print(f" {name:<25} success={r['success']:.3f} cost={r['avg_cost']:.4f} costRed={cr:.1f}%")
print(f"\n\nDONE!")