#!/usr/bin/env python3
"""v10 Router: Fixed regularization for 500-sample training set.

Problem: XGBoost with 23 features and 500 samples overfits (100% train acc).
Solution: Heavy regularization + fewer estimators + stratified CV.

NOTE(review): FEAT_KEYS below lists 14 features; the "23 features" figure
presumably refers to an earlier feature set -- confirm.

Pipeline:
  1. Load per-model SWE-Router traces and group them by task instance.
  2. Build a keyword/length feature vector per task and, for every tier,
     a binary label "some model of this tier resolved the task".
  3. Train one regularized XGBoost classifier per tier and fit an isotonic
     calibrator on its training-set probabilities.
  4. Sweep routing thresholds (cheapest tier whose calibrated
     P(success) >= threshold) and report success / cost.
  5. Repeat with a one-step escalation when the routed model fails.
  6. Pickle the classifiers, calibrators and configuration.
"""
import sys
import json
import random
import pickle
import warnings
from collections import Counter
from collections import defaultdict
from pathlib import Path

import numpy as np
from datasets import load_dataset

# Silence warnings before the heavy ML imports, as the original script did.
warnings.filterwarnings('ignore')

from xgboost import XGBClassifier
# IsotonicRegression's public home is sklearn.isotonic; sklearn.calibration
# only re-exports it as an implementation detail.
from sklearn.isotonic import IsotonicRegression
from sklearn.model_selection import cross_val_score

print("=" * 80)
print("v10 ROUTER: FIXED REGULARIZATION")
print("=" * 80)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
MODELS = [
    'claude-opus-4.7', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5.2',
    'gemini-2.5-pro', 'gemini-3-pro', 'deepseek-v3.2', 'deepseek-v4-flash',
]
MODEL_TIER = {
    'deepseek-v4-flash': 1, 'gpt-5-nano': 1,
    'gpt-5-mini': 2, 'deepseek-v3.2': 2,
    'gemini-2.5-pro': 3,
    'claude-opus-4.7': 4, 'gpt-5.2': 4,
    'gemini-3-pro': 5,
}
# Fallback per-task cost used when a model has no recorded trace for a task.
TIER_COST = {1: 0.01, 2: 0.05, 3: 0.15, 4: 0.30, 5: 0.50}
# The single model the router actually dispatches to for each tier.
TIER_TO_MODEL = {
    1: 'deepseek-v4-flash',
    2: 'gpt-5-mini',
    3: 'gemini-2.5-pro',
    4: 'claude-opus-4.7',
    5: 'gemini-3-pro',
}
TIERS = range(1, 6)
TOP_TIER = 5
DEFAULT_MODEL = 'claude-opus-4.7'
# Reference average cost used for the "costRed" percentage.
# NOTE(review): presumably the mean per-task cost of an always-premium
# baseline -- confirm where 0.3167 comes from.
BASELINE_AVG_COST = 0.3167
BUNDLE_PATH = Path('/app/router_models/router_bundle_v10_fixed.pkl')

# Feature extraction (same as before)
CODE_KW = ["python", "code", "function", "bug", "debug", "refactor",
           "implement", "test", "error", "traceback", "import"]
CRITICAL_KW = ["critical", "production", "urgent", "emergency", "live",
               "deployed", "safety", "security"]
SIMPLE_KW = ["typo", "simple", "quick", "brief", "minor", "small", "easy",
             "trivial", "just"]
FEAT_KEYS = sorted([
    'req_len', 'num_words', 'has_code', 'n_code', 'has_critical',
    'has_simple', 'has_error_msg', 'has_file_path', 'n_lines', 'has_fix',
    'has_add', 'has_change', 'has_test', 'has_doc',
])


def extract_features(text):
    """Return a dict of simple lexical features for a problem statement.

    All keyword checks are case-insensitive substring matches on *text*.
    The returned dict has exactly the keys listed in ``FEAT_KEYS``; values
    are ints (lengths, counts and 0/1 flags).
    """
    r = text.lower()
    n_code = sum(1 for k in CODE_KW if k in r)
    return {
        'req_len': len(text),
        'num_words': len(text.split()),
        'has_code': int(n_code > 0),
        'n_code': n_code,
        'has_critical': int(any(k in r for k in CRITICAL_KW)),
        'has_simple': int(any(k in r for k in SIMPLE_KW)),
        'has_error_msg': int('error' in r or 'traceback' in r or 'exception' in r),
        'has_file_path': int('/' in r),
        'n_lines': text.count('\n') + 1,
        'has_fix': int('fix' in r or 'bug' in r or 'issue' in r),
        'has_add': int('add' in r or 'new' in r or 'create' in r),
        'has_change': int('change' in r or 'modify' in r or 'update' in r),
        'has_test': int('test' in r or 'spec' in r),
        'has_doc': int('doc' in r or 'readme' in r),
    }


def select_tier(probs, thr):
    """Return the cheapest tier whose calibrated P(success) >= *thr*.

    *probs* maps tier -> calibrated probability. Falls back to the top
    tier when no tier clears the threshold.
    """
    return next((t for t in TIERS if probs[t] >= thr), TOP_TIER)


def run_tier(model_results, tier):
    """Look up the outcome of dispatching a task to *tier*'s model.

    Returns ``(resolved, cost)``. When the tier's model has no trace for
    the task it counts as unresolved and is charged the nominal
    ``TIER_COST`` for that tier.
    """
    model = TIER_TO_MODEL.get(tier, DEFAULT_MODEL)
    result = model_results.get(model)
    if result is None:
        return False, TIER_COST[tier]
    return bool(result['resolved']), result['cost']


# ---------------------------------------------------------------------------
# [1] Load traces: traces[instance_id][model] -> outcome record
# ---------------------------------------------------------------------------
print("\n[1] Loading traces...")
traces = defaultdict(dict)
for model in MODELS:
    ds = load_dataset(f'SWE-Router/swebench-verified-{model}', split='test')
    for row in ds:
        traces[row['instance_id']][model] = {
            'resolved': row['resolved'],
            'cost': float(row['instance_cost']),
            'problem': row['problem_statement'],
        }
print(f" {len(traces)} tasks loaded")

# ---------------------------------------------------------------------------
# [2] Features and per-tier labels
# ---------------------------------------------------------------------------
print("\n[2] Building features...")
X = []
tier_labels = {t: [] for t in TIERS}
optimal_tiers = []
for iid, model_results in traces.items():
    # Every model's record carries the problem text; take the first one.
    problem = next(iter(model_results.values()))['problem']
    feats = extract_features(problem)
    X.append([float(feats.get(k, 0.0)) for k in FEAT_KEYS])

    # A tier "succeeds" on a task if any of its models resolved it.
    tier_success = {}
    for model, result in model_results.items():
        tier = MODEL_TIER[model]
        tier_success[tier] = tier_success.get(tier, False) or bool(result['resolved'])

    for t in TIERS:
        tier_labels[t].append(int(tier_success.get(t, False)))

    # Cheapest succeeding tier; top tier when nothing resolved the task.
    optimal_tiers.append(
        next((t for t in TIERS if tier_success.get(t, False)), TOP_TIER)
    )

X = np.array(X, dtype=np.float32)
print(f" X shape: {X.shape}")
print(f" Optimal tier dist: {Counter(optimal_tiers)}")

# ---------------------------------------------------------------------------
# [3] Train with HEAVY regularization
# ---------------------------------------------------------------------------
print("\n[3] Training with heavy regularization...")
tier_clfs = {}
tier_calibs = {}
# Calibrated P(success) of every task (row order == traces order) per tier.
# Reused by the evaluation sections so nothing is re-predicted per threshold.
tier_cal_probs = {}
for t in TIERS:
    y = np.array(tier_labels[t])
    n_pos = y.sum()
    # Up-weight positives when they are the minority; never below 1.
    spw = float(max(1, (len(y) - n_pos) / max(n_pos, 1)))

    # Heavy regularization to prevent overfitting on 500 samples
    clf = XGBClassifier(
        n_estimators=50,        # Reduced from 200
        max_depth=3,            # Reduced from 5
        learning_rate=0.1,
        subsample=0.7,
        colsample_bytree=0.6,
        min_child_weight=10,    # Prevent memorization
        gamma=1.0,              # Require significant splits
        reg_alpha=1.0,          # L1 regularization
        reg_lambda=5.0,         # L2 regularization
        scale_pos_weight=spw,
        eval_metric='logloss',
        random_state=42,
    )

    # Cross-validate. Kept best-effort (a degenerate label vector must not
    # abort the run) but no longer swallows KeyboardInterrupt/SystemExit,
    # and the reason for the fallback is reported.
    try:
        scores = cross_val_score(clf, X, y, cv=5, scoring='f1')
        cv_f1 = scores.mean()
    except Exception as exc:
        print(f" Tier {t}: cross-validation failed ({exc!r}); reporting cv_f1=0.0")
        cv_f1 = 0.0

    clf.fit(X, y)

    # Check train accuracy
    train_pred = clf.predict(X)
    train_acc = np.mean(train_pred == y)

    # Calibrate.
    # NOTE(review): the calibrator is fit on the classifier's own
    # training-set predictions, so calibration (and every metric printed
    # in sections [4] and [5]) is in-sample and therefore optimistic.
    p_raw = clf.predict_proba(X)[:, 1]
    cal = IsotonicRegression(out_of_bounds='clip')
    cal.fit(p_raw, y)
    p_cal = cal.transform(p_raw)

    # Check calibration range
    p_min, p_max = p_cal.min(), p_cal.max()
    p_mean = p_cal.mean()

    tier_clfs[t] = clf
    tier_calibs[t] = cal
    tier_cal_probs[t] = p_cal

    print(f" Tier {t}: cv_f1={cv_f1:.3f}, train_acc={train_acc:.3f}, "
          f"P(success) range=[{p_min:.3f},{p_max:.3f}], mean={p_mean:.3f}")

# ---------------------------------------------------------------------------
# [4] Evaluate with different thresholds
# ---------------------------------------------------------------------------
print("\n[4] Evaluating with threshold sweep...")
best_thr = None
best_score = -999
for thr in [0.60, 0.65, 0.70, 0.75, 0.80, 0.85]:
    succ = 0
    cost = 0.0
    for i, model_results in enumerate(traces.values()):
        # Route: cheapest tier with P(success) >= thr
        probs = {t: float(tier_cal_probs[t][i]) for t in TIERS}
        selected_tier = select_tier(probs, thr)
        resolved, spent = run_tier(model_results, selected_tier)
        succ += resolved
        cost += spent

    sr = succ / len(traces)
    ac = cost / len(traces)
    cr = (1 - ac / BASELINE_AVG_COST) * 100
    score = sr * 20 - ac * 10  # weighted score
    print(f" thr={thr:.2f}: success={sr:.3f}, cost=${ac:.4f}, costRed={cr:.1f}%")
    if score > best_score:
        best_score = score
        best_thr = thr

print(f"\n Best threshold: {best_thr}")

# ---------------------------------------------------------------------------
# [5] v10 + feedback: route cheap, escalate on failure
# ---------------------------------------------------------------------------
print("\n[5] v10 + feedback evaluation...")
for thr in [0.70, 0.75, 0.80]:
    succ = 0
    cost = 0.0
    escalated = 0
    for i, model_results in enumerate(traces.values()):
        probs = {t: float(tier_cal_probs[t][i]) for t in TIERS}
        selected_tier = select_tier(probs, thr)

        # Try cheap model first
        resolved, spent = run_tier(model_results, selected_tier)
        if not resolved and selected_tier < TOP_TIER:
            # Escalate exactly one tier; both attempts are paid for. The
            # first attempt's cost uses the same missing-trace fallback as
            # everywhere else instead of indexing the trace directly,
            # which raised KeyError when the routed model had no record.
            escalated += 1
            resolved, up_cost = run_tier(model_results, min(selected_tier + 1, TOP_TIER))
            spent += up_cost
        succ += resolved
        cost += spent

    sr = succ / len(traces)
    ac = cost / len(traces)
    cr = (1 - ac / BASELINE_AVG_COST) * 100
    print(f" v10_feedback(thr={thr:.2f}): success={sr:.3f}, cost=${ac:.4f}, costRed={cr:.1f}%, escalated={escalated}")

# ---------------------------------------------------------------------------
# [6] Save fixed bundle
# ---------------------------------------------------------------------------
v10_fixed = {
    'tier_clfs': {str(k): v for k, v in tier_clfs.items()},
    'tier_calibrators': {str(k): v for k, v in tier_calibs.items()},
    'feat_keys': FEAT_KEYS,
    'tier_config': {str(k): v for k, v in TIER_COST.items()},
    'version': '10.1',
    'description': 'ACO v10.1: Regularized XGBoost on SWE-Router data',
    'best_threshold': best_thr,
}
# Make sure the target directory exists so a finished training run is not
# lost to a FileNotFoundError at the very last step.
BUNDLE_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(BUNDLE_PATH, 'wb') as f:
    pickle.dump(v10_fixed, f)
print(f"\nSaved v10.1 bundle")
print("DONE!")