#!/usr/bin/env python3
"""Tune execution-feedback thresholds for optimal cost-quality tradeoff."""
import sys
import json
import random
import math
import pickle

sys.path.insert(0, "/app")

from collections import defaultdict

from aco.classifier import TaskCostClassifier
from aco.router import ModelCascadeRouter
from aco.execution_feedback import ExecutionFeedbackRouter

# Per-tier base success strength (probability base, exponentiated by difficulty).
TIER_STR = {1: 0.35, 2: 0.55, 3: 0.80, 4: 0.93, 5: 0.97}
# Relative cost of one call at each tier.
TIER_COST = {1: 0.05, 2: 0.15, 3: 0.75, 4: 1.0, 5: 1.5}
# Minimum tier an escalation may land on, per task type.
TASK_FLOOR = {"legal_regulated": 4, "long_horizon": 3, "research": 3, "coding": 3,
              "unknown_ambiguous": 3, "quick_answer": 1, "document_drafting": 2,
              "tool_heavy": 2, "retrieval_heavy": 2}

# Synthetic request pool, keyed by task type.
TASKS = {
    "quick_answer": ["What is 2+2?", "Explain quantum computing briefly.", "Convert 100F to Celsius."],
    "coding": ["Write a Python function to reverse a linked list.", "Fix a typo in the README.",
               "Debug this critical production segfault NOW.", "Just fix the typo in line 42."],
    "research": ["Research latest transformer advances.", "Find sources comparing LoRA and full FT briefly."],
    "document_drafting": ["Draft project proposal for ML pipeline."],
    "legal_regulated": ["Review this contract for liability clauses.", "Check GDPR compliance."],
    "tool_heavy": ["Search open issues and create summary."],
    "retrieval_heavy": ["Answer based on 50-page document."],
    "long_horizon": ["Plan 3-month roadmap.", "Orchestrate multi-region deployment."],
    "unknown_ambiguous": ["Help me with this thing."],
}

classifier = TaskCostClassifier()
router = ModelCascadeRouter(model_path="/app/router_models/router_bundle_v8.pkl")
rng = random.Random(42)
N = 2000  # simulated requests per threshold configuration


def sim_logprobs(tier, diff, success, rng):
    """Simulate per-token logprobs for a (tier, difficulty, success) outcome.

    Higher tiers produce higher (less negative) base logprobs; failures get
    noisier samples than successes. Returns a list of 20-150 floats.
    """
    n = rng.randint(20, 150)
    base = {1: -3.5, 2: -2.5, 3: -1.5, 4: -0.7, 5: -0.3}[tier]
    base *= (1 + diff * 0.15)
    lps = []
    for _ in range(n):
        noise = rng.gauss(0, 1.0 + diff * 0.3)
        # Successful runs are tighter around the base; failures spread wider.
        lps.append(base + noise * (0.3 if success else 0.8))
    return lps


# ---------------------------------------------------------------------------
# Sweep (entropy_threshold, low_conf_ratio_threshold) and report the best
# configuration by a simple success-vs-cost score.
# ---------------------------------------------------------------------------
print("=" * 80)
print("FEEDBACK THRESHOLD SWEEP")
print("=" * 80)
# NOTE: this header line was garbled in the original layout; reconstructed to
# match the six data columns printed below.
print(f"\n{'EntropyThr':>12} {'LowConfThr':>12} {'Success':>10} {'AvgCost':>10} {'CostRed':>10} {'Gap':>10}")
print("-" * 65)

frontier_sr = 0.901   # reference success rate (always-frontier baseline)
frontier_cost = 1.0   # reference avg cost (always-frontier baseline)

best_score = -999
best_config = None

for ent_thr in [1.5, 2.0, 2.5, 3.0, 3.5, 4.0]:
    for lc_thr in [0.05, 0.10, 0.15, 0.20, 0.25, 0.30]:
        ef = ExecutionFeedbackRouter(entropy_threshold=ent_thr,
                                     low_conf_ratio_threshold=lc_thr,
                                     tier_costs=TIER_COST,
                                     task_floors=TASK_FLOOR)
        # Re-seed so every configuration sees the identical request stream.
        rng.seed(42)
        succ = 0
        cost = 0.0
        for i in range(N):
            tt = rng.choice(list(TASKS.keys()))
            req = rng.choice(TASKS[tt])
            pred = classifier.classify(req)
            r = router.route(req, tt, pred["difficulty"], pred)
            tier = r.tier
            diff = r.dynamic_difficulty

            # First attempt at the routed tier.
            ps = TIER_STR[tier] ** (diff * 0.6)
            initial_success = rng.random() < ps
            lps = sim_logprobs(tier, diff, initial_success, rng)
            signal = ef.analyze_output(lps, task_type=tt, current_tier=tier)

            if signal.should_escalate and tier < 5:
                # Escalate one tier, but never below the task's floor;
                # pay for both the failed attempt and the retry.
                final_tier = min(tier + 1, 5)
                final_tier = max(final_tier, TASK_FLOOR.get(tt, 1))
                ps2 = TIER_STR[final_tier] ** (diff * 0.6)
                final_success = rng.random() < ps2
                c = TIER_COST[tier] + TIER_COST[final_tier]
                if final_success:
                    succ += 1
            else:
                c = TIER_COST[tier]
                if initial_success:
                    succ += 1
            cost += c

        sr = succ / N
        ac = cost / N
        cr = (1 - ac / frontier_cost) * 100
        gap = frontier_sr - sr

        # Score: maximize success, minimize cost.
        score = sr * 20 - ac * 10
        if score > best_score:
            best_score = score
            best_config = (ent_thr, lc_thr, sr, ac, cr, gap)

        # Only print the two middle entropy rows to keep the table short.
        if ent_thr in (2.5, 3.0):
            print(f"{ent_thr:>12.1f} {lc_thr:>12.2f} {sr:>10.3f} {ac:>10.4f} {cr:>9.1f}% {gap:>10.3f}")

print(f"\n\nBest config: entropy_thr={best_config[0]}, low_conf_thr={best_config[1]}")
print(f"  success={best_config[2]:.3f}, cost={best_config[3]:.4f}, costRed={best_config[4]:.1f}%, gap={best_config[5]:.3f}")