"""Trained Router v3: combined best approach.

Uses per-tier P(success) classifiers + safety floors +
cost-aware routing with ASYMMETRIC penalties (underkill is penalized 5x
harder than overkill).
"""
| import json, os, sys, random, pickle, uuid |
| import numpy as np |
| from datetime import datetime |
| from collections import defaultdict |
| from typing import Dict, List, Tuple, Any, Optional |
|
|
# Closed set of task categories. The position of a name in this list is its
# integer id (see TT2IDX below).
TASK_TYPES = [
    "quick_answer",
    "coding",
    "research",
    "document_drafting",
    "legal_regulated",
    "tool_heavy",
    "retrieval_heavy",
    "long_horizon",
    "unknown_ambiguous",
]

# Reverse lookup: task-type name -> its position in TASK_TYPES.
TT2IDX = dict(zip(TASK_TYPES, range(len(TASK_TYPES))))
|
|
# Keyword lists used for lexical features. Each entry is tested as a plain
# substring of the lower-cased request text.
CODE_KW = [
    "python",
    "javascript",
    "code",
    "function",
    "bug",
    "debug",
    "refactor",
    "implement",
    "test",
    "compile",
    "runtime",
    "class",
    "module",
    "async",
    "thread",
]
LEGAL_KW = [
    "contract",
    "legal",
    "compliance",
    "gdpr",
    "privacy",
    "policy",
    "regulatory",
    "liability",
]
RESEARCH_KW = [
    "research",
    "find sources",
    "literature",
    "investigate",
    "compare",
    "analyze",
    "survey",
]
TOOL_KW = [
    "search",
    "fetch",
    "retrieve",
    "query",
    "api",
    "database",
    "scrape",
    "aggregate",
]
LONG_KW = [
    "plan",
    "project",
    "roadmap",
    "orchestrate",
    "multi-step",
    "migrate",
    "pipeline",
    "deploy",
]
MATH_KW = [
    "calculate",
    "compute",
    "solve",
    "equation",
    "formula",
    "optimize",
    "probability",
]
|
|
# Base "strength" of each model tier (1 = weakest, 5 = strongest); used as the
# base of the simulated success probability.
TIER_STR = {
    1: 0.35,
    2: 0.55,
    3: 0.80,
    4: 0.93,
    5: 0.97,
}

# Cost charged for one request routed to each tier.
TIER_COST = {
    1: 0.05,
    2: 0.15,
    3: 0.75,
    4: 1.0,
    5: 1.5,
}

# Safety floor: the lowest tier the router may choose for each task type.
TASK_FLOOR = {
    "legal_regulated": 4,
    "long_horizon": 3,
    "research": 3,
    "coding": 3,
    "unknown_ambiguous": 3,
    "quick_answer": 1,
    "document_drafting": 2,
    "tool_heavy": 2,
    "retrieval_heavy": 2,
}
|
|
# Example request texts per task type; synthetic traces sample from these.
# Key order matters: the trace generator picks a task type by position.
TASK_TEMPLATES = {
    "quick_answer": [
        "What is the capital of France?",
        "Explain quantum computing briefly.",
        "What is 237*452?",
        "Define photosynthesis.",
        "Who wrote Hamlet?",
        "What is the speed of light?",
        "List the primary colors.",
        "What is GDP?",
    ],
    "coding": [
        "Write a Python function to reverse a linked list.",
        "Fix the bug in this React component.",
        "Refactor auth module to JWT.",
        "Implement LRU cache in Go.",
        "Debug segfault in C++ thread pool.",
        "Add unit tests for the payment module.",
        "Optimize this SQL query.",
        "Create a REST API for user management.",
        "Implement binary search in Rust.",
    ],
    "research": [
        "Research latest transformer advances.",
        "Find sources comparing LoRA and full FT.",
        "Investigate data center climate impact.",
        "Survey privacy-preserving ML techniques.",
        "Compare reinforcement learning algorithms for robotics.",
    ],
    "document_drafting": [
        "Draft project proposal for ML pipeline.",
        "Write email to team about deployment.",
        "Create technical report on performance.",
    ],
    "legal_regulated": [
        "Review this contract for liability clauses.",
        "Check GDPR compliance for data pipeline.",
        "Draft privacy policy section.",
        "Verify regulatory compliance for medical device software.",
    ],
    "tool_heavy": [
        "Search open issues and create summary.",
        "Fetch API docs and generate client code.",
        "Query Q3 sales and produce chart.",
    ],
    "retrieval_heavy": [
        "Answer based on 50-page document.",
        "Find all payment processing mentions.",
        "Retrieve relevant cases for legal query.",
    ],
    "long_horizon": [
        "Plan 3-month roadmap.",
        "Orchestrate multi-region deployment.",
        "Redesign data architecture end-to-end.",
        "Migrate monolith to microservices.",
    ],
    "unknown_ambiguous": [
        "Help me with this thing.",
        "I need something about the server.",
        "Can you look into that issue?",
    ],
}
|
|
def tsp(tier, diff):
    """Return ``TIER_STR[tier]`` raised to the power ``diff * 0.6``.

    Tier strengths are below 1, so the value shrinks as ``diff`` grows.

    Args:
        tier: Key into ``TIER_STR``.
        diff: Numeric difficulty level.

    Raises:
        KeyError: If ``tier`` is not a key of ``TIER_STR``.
    """
    exponent = diff * 0.6
    return pow(TIER_STR[tier], exponent)
|
|
def extract_features(request, task_type, difficulty=3):
    """Build the flat feature dict for a single request.

    Args:
        request: Raw request text.
        task_type: Task category name; normally one of ``TASK_TYPES``.
        difficulty: Difficulty level, stored unchanged in the result.

    Returns:
        A dict with length features (``req_len``, ``num_words``), keyword
        presence/count features (``has_*`` / ``n_*``), the integer task-type
        id (``tt_idx``), ``difficulty``, and one ``tt_<name>`` 0/1 indicator
        per entry of ``TASK_TYPES``.
    """
    lowered = request.lower()

    def hits(keywords):
        # Plain substring test, not word matching: e.g. "api" also matches
        # inside "capital".
        return sum(1 for keyword in keywords if keyword in lowered)

    n_code = hits(CODE_KW)
    n_legal = hits(LEGAL_KW)
    n_research = hits(RESEARCH_KW)
    n_tool = hits(TOOL_KW)

    features = {
        "req_len": len(request),
        "num_words": len(request.split()),
        "has_code": int(n_code > 0),
        "n_code": n_code,
        "has_legal": int(n_legal > 0),
        "n_legal": n_legal,
        "has_research": int(n_research > 0),
        "n_research": n_research,
        "has_tool": int(n_tool > 0),
        "n_tool": n_tool,
        "has_long": int(hits(LONG_KW) > 0),
        "has_math": int(hits(MATH_KW) > 0),
        # Unrecognised task types fall back to id 8.
        "tt_idx": TT2IDX.get(task_type, 8),
        "difficulty": difficulty,
    }
    # One-hot encoding of the task type.
    features.update({f"tt_{name}": int(task_type == name) for name in TASK_TYPES})
    return features
|
|
def gen_trace(idx, rng):
    """Simulate one routing trace.

    Args:
        idx: Trace index; accepted but not used by the simulation.
        rng: A ``random.Random``-like object supplying ``choice``,
            ``choices`` and ``random``.

    Returns:
        A dict with keys ``feats`` (feature dict), ``opt`` (lowest tier whose
        simulated run succeeded, or 5 when none did), ``actual`` (the tier
        that was sampled as having handled the request), ``outcome``
        (``"success"``/``"failure"`` of that tier), ``tier_out`` (per-tier
        success flags), ``tt``, ``diff`` and ``req``.
    """
    tiers = [1, 2, 3, 4, 5]
    # Each task type has one fixed difficulty.
    difficulty_by_type = {
        "quick_answer": 1,
        "document_drafting": 2,
        "tool_heavy": 2,
        "retrieval_heavy": 2,
        "research": 3,
        "coding": 3,
        "unknown_ambiguous": 3,
        "long_horizon": 4,
        "legal_regulated": 5,
    }
    # Sampling weights for the tier that "actually" handled the request:
    # harder tasks skew towards higher tiers.
    actual_weights_by_difficulty = {
        1: [3, 4, 2, 1, 0.5],
        2: [3, 4, 2, 1, 0.5],
        3: [1, 2, 4, 2, 1],
        4: [0.5, 1, 2, 4, 2],
        5: [0.2, 0.5, 1, 3, 4],
    }

    task_type = rng.choice(list(TASK_TEMPLATES.keys()))
    difficulty = difficulty_by_type[task_type]

    # One independent success draw per tier, in ascending tier order.
    tier_out = {tier: rng.random() < tsp(tier, difficulty) for tier in tiers}
    opt = next((tier for tier in tiers if tier_out[tier]), 5)

    actual = rng.choices(tiers, weights=actual_weights_by_difficulty[difficulty])[0]
    outcome = "success" if tier_out[actual] else "failure"

    request = rng.choice(TASK_TEMPLATES[task_type])
    return {
        "feats": extract_features(request, task_type, difficulty),
        "opt": opt,
        "actual": actual,
        "outcome": outcome,
        "tier_out": tier_out,
        "tt": task_type,
        "diff": difficulty,
        "req": request,
    }
|
|
# Start-of-run banner.
_BANNER_RULE = "=" * 80
print(_BANNER_RULE)
print("AGENT COST OPTIMIZER - TRAINED ROUTER v3")
print("Asymmetric cost scoring: underkill 5x penalty")
print(_BANNER_RULE)
|
|
| |
# Stage 1: synthesise the training corpus with a fixed seed so runs are
# reproducible.
print("\n[1] Generating 50K training traces...")
rng = random.Random(42)
traces = [gen_trace(trace_idx, rng) for trace_idx in range(50000)]
# Fixed, sorted feature order shared by every feature vector.
FEAT_KEYS = sorted(traces[0]["feats"])
def f2v(feats):
    """Convert a feature dict into a float32 vector ordered by ``FEAT_KEYS``.

    Keys missing from ``feats`` contribute 0.0.
    """
    values = (float(feats.get(key, 0.0)) for key in FEAT_KEYS)
    return np.fromiter(values, dtype=np.float32, count=len(FEAT_KEYS))
|
|
# Feature matrix (one row per trace) and the per-trace optimal tier.
X_all = np.array([f2v(trace["feats"]) for trace in traces])
y_opt = np.array([trace["opt"] for trace in traces])

# Binary targets per tier: 1 where that tier's simulated run succeeded.
per_tier_labels = {
    tier: np.array([1 if trace["tier_out"].get(tier, False) else 0 for trace in traces])
    for tier in range(1, 6)
}
|
|
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import accuracy_score, f1_score |
| import xgboost as xgb |
|
|
# Hold out 20% of the traces, stratified on the optimal tier. The trace
# indices are split alongside the features so labels and raw traces can be
# looked up for either partition later.
_all_indices = range(len(traces))
X_train, X_test, idx_train, idx_test = train_test_split(
    X_all,
    _all_indices,
    test_size=0.2,
    random_state=42,
    stratify=y_opt,
)
print(f" Train: {len(X_train)}, Test: {len(X_test)}")
|
|
| |
# Stage 2: one binary classifier per tier estimating P(success at that tier).
print("\n[2] Training per-tier P(success) classifiers...")
tier_clfs = {}
for tier in range(1, 6):
    y_tr = per_tier_labels[tier][idx_train]
    y_te = per_tier_labels[tier][idx_test]
    # Deliberately no scale_pos_weight: the router consumes predict_proba as
    # a probability inside an expected-cost score, and class re-weighting
    # shifts those outputs away from the true success rate (for the mostly
    # positive high tiers it drags them towards 0.5).
    clf = xgb.XGBClassifier(
        n_estimators=150,
        max_depth=5,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="binary:logistic",
        eval_metric="logloss",
        random_state=42,
        verbosity=0,
    )
    clf.fit(X_train, y_tr)
    # Hard-label metrics (0.5 threshold) are reported for information only.
    y_pred = clf.predict(X_test)
    acc = accuracy_score(y_te, y_pred)
    f1 = f1_score(y_te, y_pred, zero_division=0)
    tier_clfs[tier] = clf
    print(f" Tier {tier}: acc={acc:.3f}, f1={f1:.3f}")
|
|
| |
print("\n[3] Building asymmetric cost router...")


def route_asymmetric(x, task_type, tier_clfs, underkill_penalty=5.0, overkill_penalty=1.0):
    """Choose the tier with the lowest asymmetric cost score.

    Every tier from the task type's safety floor up to tier 5 is scored as

        score(tier) = P(failure@tier) * underkill_penalty
                      + (cost(tier) / cost(tier 5)) * overkill_penalty

    and the lowest score wins; on ties the lower tier is kept. With the
    default penalties a predicted failure weighs 5x as much as normalised
    tier cost.

    Args:
        x: Feature vector; a 1-D array is reshaped to a single row.
        task_type: Task category used to look up the floor in ``TASK_FLOOR``
            (floor 2 when the type is not listed).
        tier_clfs: Mapping tier -> classifier exposing ``predict_proba``,
            whose column 1 is read as P(success).
        underkill_penalty: Weight on the predicted failure probability.
        overkill_penalty: Weight on the normalised tier cost.

    Returns:
        The selected tier number.
    """
    sample = x.reshape(1, -1) if x.ndim == 1 else x
    floor = TASK_FLOOR.get(task_type, 2)

    chosen_tier = floor
    lowest_score = float("inf")
    for tier in range(floor, 6):
        p_fail = 1.0 - tier_clfs[tier].predict_proba(sample)[0, 1]
        cost_norm = TIER_COST[tier] / TIER_COST[5]
        score = p_fail * underkill_penalty + cost_norm * overkill_penalty
        # Strict comparison: an equal score never displaces a cheaper tier.
        if score < lowest_score:
            chosen_tier, lowest_score = tier, score
    return chosen_tier
|
|
| |
print("\n[4] Evaluating all routers...")
n_test = len(idx_test)
results = {}


def eval_router(name, route_fn):
    """Run ``route_fn`` over the held-out traces and record its metrics.

    Args:
        name: Key under which the metrics are stored in ``results``.
        route_fn: Callable ``(x, trace) -> tier`` where ``x`` is the trace's
            feature vector as a single-row 2-D array.

    Stores in ``results[name]``: ``success`` (share of traces whose chosen
    tier succeeded), ``avg_cost``, ``unsafe_rate`` (failed and routed below
    the optimal tier), ``false_done`` (failed although routed at or above the
    optimal tier) and ``tier_dist`` (count of traces per chosen tier).
    """
    n_success = 0
    n_unsafe = 0
    n_false_done = 0
    total_cost = 0.0
    tier_counts = defaultdict(int)

    for trace_idx in idx_test:
        trace = traces[trace_idx]
        features = f2v(trace["feats"]).reshape(1, -1)
        chosen = route_fn(features, trace)
        tier_counts[chosen] += 1

        if trace["tier_out"].get(chosen, False):
            n_success += 1
        elif chosen < trace["opt"]:
            n_unsafe += 1
        else:
            n_false_done += 1
        total_cost += TIER_COST[chosen]

    results[name] = {
        "success": n_success / n_test,
        "avg_cost": total_cost / n_test,
        "unsafe_rate": n_unsafe / n_test,
        "false_done": n_false_done / n_test,
        "tier_dist": dict(tier_counts),
    }
|
|
# Fixed-rule baselines, evaluated in this order (the order is also the
# insertion order of `results`).
_baseline_routers = [
    ("always_frontier", lambda x, t: 4),
    ("always_cheap", lambda x, t: 1),
    ("heuristic_diff+1", lambda x, t: min(t["diff"] + 1, 5)),
    ("heuristic_floor", lambda x, t: TASK_FLOOR.get(t["tt"], 2)),
]
for _router_name, _router_fn in _baseline_routers:
    eval_router(_router_name, _router_fn)

# Learned router at several underkill penalties. The penalty is bound as a
# lambda default so each router keeps its own value.
for ukp in [3.0, 5.0, 8.0, 10.0, 15.0]:
    eval_router(
        f"asymmetric_uk{ukp:.0f}",
        lambda x, t, uk=ukp: route_asymmetric(x, t["tt"], tier_clfs, underkill_penalty=uk),
    )

# Reference point: always route to the recorded optimal tier.
eval_router("oracle", lambda x, t: t["opt"])
|
|
| |
# Results table, best success first; ties broken by lower average cost.
print(f"\n{'Router':<25} {'Success':>10} {'AvgCost':>10} {'CostRed':>10} {'Unsafe':>10} {'F-DONE':>10}")
print("-"*75)
# Cost reduction is reported relative to the always_frontier baseline.
fc = results["always_frontier"]["avg_cost"]


def _table_rank(item):
    """Sort key: descending success, then ascending average cost."""
    _, metrics = item
    return (-metrics["success"], metrics["avg_cost"])


for name, r in sorted(results.items(), key=_table_rank):
    cr = (1 - r["avg_cost"]/fc)*100
    print(f"{name:<25} {r['success']:>10.3f} {r['avg_cost']:>10.4f} {cr:>9.1f}% {r['unsafe_rate']:>10.3f} {r['false_done']:>10.3f}")
|
|
| |
# Routers not dominated on (success, avg_cost). A router is dominated when
# another one is at least as good on both axes and strictly better on one.
# "always_cheap" is never listed, but still takes part as a comparator.
print("\nPARETO FRONTIER:")
pareto = []
for name, r in results.items():
    if name == "always_cheap":
        continue
    dominated = any(
        other_name != name
        and other["success"] >= r["success"]
        and other["avg_cost"] <= r["avg_cost"]
        and (other["success"] > r["success"] or other["avg_cost"] < r["avg_cost"])
        for other_name, other in results.items()
    )
    if dominated:
        continue
    pareto.append((name, r))
    cr = (1 - r["avg_cost"]/fc)*100
    print(f" {name:<25} success={r['success']:.3f} cost={r['avg_cost']:.4f} costRed={cr:.1f}% unsafe={r['unsafe_rate']:.3f}")
|
|
| |
# Stage 5: persist the classifiers, the feature order, the tier
# configuration and the evaluation metrics.
print("\n[5] Saving models...")
_MODEL_DIR = "/app/router_models"
os.makedirs(_MODEL_DIR, exist_ok=True)

for tier, clf in tier_clfs.items():
    clf.save_model(f"{_MODEL_DIR}/tier_{tier}_success.json")

with open(f"{_MODEL_DIR}/feat_keys.json", "w") as f:
    json.dump(FEAT_KEYS, f)

_tier_config = {"tier_cost": TIER_COST, "tier_str": TIER_STR, "task_floor": TASK_FLOOR}
with open(f"{_MODEL_DIR}/tier_config.json", "w") as f:
    json.dump(_tier_config, f, indent=2)

with open(f"{_MODEL_DIR}/eval_results_v3.json", "w") as f:
    # default=str stringifies any value json cannot encode natively.
    json.dump(results, f, indent=2, default=str)

print(" Saved all models to /app/router_models/")
|
|
| |
# Pick the best realisable router by a composite score that rewards success
# and penalises average cost and, most heavily, the unsafe rate. The oracle
# and the always_cheap baseline are not candidates.
_EXCLUDED_FROM_BEST = ("oracle", "always_cheap")
best_name = ""
best_score = -float("inf")
for name, r in results.items():
    if name not in _EXCLUDED_FROM_BEST:
        score = r["success"]*20 - r["avg_cost"]*50 - r["unsafe_rate"]*100
        # Strict comparison: the first router reaching the top score wins.
        if score > best_score:
            best_name, best_score = name, score
print(f"\n BEST CONFIG: {best_name} (composite score: {best_score:.2f})")
print("\nDONE!")
|
|