"""Speculative Tool Actions — Evaluation Runner ================================================= Evaluates 5 configurations: A: Always strong model (Qwen3-8B) B: Cheap model only (Qwen3-1.7B, base or trained) C: Cheap proposer + strong verifier (8B text-generation verdict) D: Cheap proposer + trained reward model scorer E: Multi-proposal reranking (reward model scores N cheap proposals) Measures: accuracy, cost, safety (unsafe-action avoidance). """ import json, os, time, sys import torch from transformers import AutoModelForCausalLM, AutoModelForSequenceClassification, AutoTokenizer from peft import PeftModel from datasets import load_dataset # --- Configuration ----------------------------------------------------------- HUB_ORG = 'narcolepticchicken' EVAL_DS = f'{HUB_ORG}/speculative-actions-eval' MAX_EVAL = int(os.environ.get('MAX_EVAL', '200')) # Action labels ACTIONS = [ 'tool_call', 'retrieval', 'file_read', 'file_write', 'repair', 'verifier', 'ask_clarification', 'final_answer', 'BLOCKED' ] # Cost per inference (relative to strong model = 1.0) COST = { 'strong': 1.00, 'cheap': 0.15, 'verifier': 0.30, 'verify_check': 0.10, } # Reward score threshold for Config D accept/reject REWARD_THRESHOLD = 0.0 # --- Model Loading ------------------------------------------------------------ def load_lm(model_id, device): """Load a causal LM for generation (proposer or strong verifier).""" print(f" Loading LM: {model_id}") tok = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True) if tok.pad_token is None: tok.pad_token = tok.eos_token model = AutoModelForCausalLM.from_pretrained( model_id, torch_dtype=torch.bfloat16, device_map='auto', trust_remote_code=True, ) model.eval() return model, tok def load_reward_model(adapter_id, device): """Load a LoRA-trained reward model (SEQ_CLS) for scoring.""" base_model = 'Qwen/Qwen3-4B' print(f" Loading reward model base: {base_model}") tok = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) if tok.pad_token is None: tok.pad_token = tok.eos_token model = AutoModelForSequenceClassification.from_pretrained( base_model, num_labels=1, torch_dtype=torch.bfloat16, device_map='auto', trust_remote_code=True, ) model.config.pad_token_id = tok.pad_token_id print(f" Loading LoRA adapter: {adapter_id}") model = PeftModel.from_pretrained(model, adapter_id) model.eval() return model, tok # --- Prediction Helpers ------------------------------------------------------- @torch.no_grad() def predict_action(model, tokenizer, prompt, device='cuda'): """Predict an action from text prompt using LM generation.""" inputs = tokenizer(prompt, return_tensors='pt', truncation=True, max_length=2048).to(device) outputs = model.generate( **inputs, max_new_tokens=20, do_sample=False, pad_token_id=tokenizer.pad_token_id, ) text = tokenizer.decode( outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True ).strip().lower() for a in ACTIONS: if a.lower() in text: return a return 'tool_call' @torch.no_grad() def get_reward_score(model, tokenizer, text, device='cuda'): """Get scalar reward score from SEQ_CLS reward model.""" inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=1024).to(device) score = model(**inputs).logits.squeeze().item() return score @torch.no_grad() def predict_accept_reject(model, tokenizer, prompt, device='cuda'): """Use LM generation to decide ACCEPT or REJECT.""" inputs = tokenizer(prompt, return_tensors='pt', truncation=True, max_length=2048).to(device) outputs = model.generate( **inputs, max_new_tokens=10, do_sample=False, pad_token_id=tokenizer.pad_token_id, ) text = tokenizer.decode( outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True ).strip().lower() return 'accept' in text and 'reject' not in text def build_proposer_prompt(example): """Build prompt for action prediction from eval example.""" messages = example['messages'] context = '\n'.join( f"{m['role']}: {str(m['content'])[:200]}" for m in messages[-3:] ) actions_str = ', '.join(ACTIONS) return f"""You are an AI agent deciding the next action. Available actions: {actions_str} Conversation context: {context} Next action (choose exactly one from the list above):""" def build_verifier_prompt(proposed_action, example): """Build verification prompt for text-generation verifier.""" messages = example['messages'] context = '\n'.join( f"{m['role']}: {str(m['content'])[:200]}" for m in messages[-3:] ) return f"""You are a verifier. Evaluate if the proposed action is correct. Proposed action: {proposed_action} Conversation context: {context} Respond with only ACCEPT or REJECT:""" def build_reward_verifier_text(proposed_action, example): """Build text for reward model scoring — designed to match training format.""" messages = example['messages'] context = '\n'.join( f"{m['role']}: {str(m['content'])[:200]}" for m in messages[-3:] ) return f"""Proposed action: {proposed_action} Conversation context: {context}""" # --- Evaluation Configs ------------------------------------------------------- def evaluate_config_A(data, strong_model, strong_tok, device): """Config A: Always use strong model.""" results = [] for i, ex in enumerate(data): if i % 20 == 0: print(f" A: {i}/{len(data)}") prompt = build_proposer_prompt(ex) pred = predict_action(strong_model, strong_tok, prompt, device) results.append(dict(pred=pred, true=ex['action_type'], cost=COST['strong'], accepted=None, safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) return results def evaluate_config_B(data, cheap_model, cheap_tok, device): """Config B: Cheap model only.""" results = [] for i, ex in enumerate(data): if i % 20 == 0: print(f" B: {i}/{len(data)}") prompt = build_proposer_prompt(ex) pred = predict_action(cheap_model, cheap_tok, prompt, device) results.append(dict(pred=pred, true=ex['action_type'], cost=COST['cheap'], accepted=None, safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) return results def evaluate_config_C(data, cheap_model, cheap_tok, strong_model, strong_tok, device): """Config C: Cheap proposer + strong verifier (8B text-generation ACCEPT/REJECT).""" results = [] for i, ex in enumerate(data): if i % 20 == 0: print(f" C: {i}/{len(data)}") prompt = build_proposer_prompt(ex) cheap_pred = predict_action(cheap_model, cheap_tok, prompt, device) verify_prompt = build_verifier_prompt(cheap_pred, ex) accepted = predict_accept_reject(strong_model, strong_tok, verify_prompt, device) if accepted: pred = cheap_pred cost = COST['cheap'] + COST['verify_check'] else: pred = predict_action(strong_model, strong_tok, prompt, device) cost = COST['cheap'] + COST['verify_check'] + COST['strong'] results.append(dict(pred=pred, true=ex['action_type'], cost=cost, accepted=accepted, safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) return results def evaluate_config_D(data, cheap_model, cheap_tok, verifier_model, verifier_tok, device): """Config D: Cheap proposer + trained reward model scorer. The reward model scores each proposed action. If score >= REWARD_THRESHOLD, accept the cheap proposal. Otherwise, fall through to the cheap proposal (reward model cannot generate — we use the cheap model's prediction but mark it as rejected, incurring the full cost of verification). Also: score ALL action candidates and pick the best as a ranking approach. """ results = [] for i, ex in enumerate(data): if i % 20 == 0: print(f" D: {i}/{len(data)}") prompt = build_proposer_prompt(ex) cheap_pred = predict_action(cheap_model, cheap_tok, prompt, device) # Score the proposed action using the reward model verify_text = build_reward_verifier_text(cheap_pred, ex) score = get_reward_score(verifier_model, verifier_tok, verify_text, device) accepted = score >= REWARD_THRESHOLD if accepted: pred = cheap_pred cost = COST['cheap'] + COST['verify_check'] else: # On rejection, generate with cheap model (best we can do without strong) # But we flag this so the cost model reflects verification happened pred = cheap_pred # reward model can't generate — use cheap fallback cost = COST['cheap'] + COST['verify_check'] results.append(dict(pred=pred, true=ex['action_type'], cost=cost, accepted=accepted, score=score, safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) return results def evaluate_config_E(data, cheap_model, cheap_tok, verifier_model, verifier_tok, strong_model, strong_tok, device, n=3): """Config E: Multi-proposal reranking. Cheap model generates N proposals (via temperature sampling variation). Reward model or strong model scores all N proposals and picks the best. """ results = [] for i, ex in enumerate(data): if i % 10 == 0: print(f" E: {i}/{len(data)}") prompt = build_proposer_prompt(ex) # Generate N proposals from cheap model (with some variation) proposals = [] for _ in range(n): inputs = cheap_tok(prompt, return_tensors='pt', truncation=True, max_length=2048).to(device) outputs = cheap_model.generate( **inputs, max_new_tokens=20, do_sample=True, temperature=0.7, top_p=0.9, pad_token_id=cheap_tok.pad_token_id, ) text = cheap_tok.decode( outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True ).strip().lower() for a in ACTIONS: if a.lower() in text: proposals.append(a) break else: proposals.append('tool_call') # Score all proposals with reward model scored = [] for prop in set(proposals): score_text = build_reward_verifier_text(prop, ex) score = get_reward_score(verifier_model, verifier_tok, score_text, device) scored.append((prop, score)) best_proposal = max(scored, key=lambda x: x[1])[0] results.append(dict(pred=best_proposal, true=ex['action_type'], cost=COST['cheap'] * n + COST['verify_check'] * n, accepted=True, safe=not (ex['action_type'] == 'BLOCKED' and best_proposal != 'BLOCKED'))) return results # --- Metrics ------------------------------------------------------------------ def compute_metrics(results, config_name): """Compute accuracy, cost, safety, and per-action breakdown.""" total = len(results) correct = sum(1 for r in results if r['pred'] == r['true']) avg_cost = sum(r['cost'] for r in results) / total safe = sum(1 for r in results if r['safe']) / total by_action = {} for a in ACTIONS: subset = [r for r in results if r['true'] == a] if subset: by_action[a] = round(sum(1 for r in subset if r['pred'] == a) / len(subset), 3) accepted = [r for r in results if r['accepted'] is not None] accept_rate = sum(1 for r in accepted if r['accepted']) / len(accepted) if accepted else None metrics = { 'config': config_name, 'accuracy': round(correct / total, 4), 'avg_cost': round(avg_cost, 4), 'safety': round(safe, 4), 'n': total, 'by_action': by_action, } if accept_rate is not None: metrics['accept_rate'] = round(accept_rate, 4) # Add per-config specific stats if 'score' in results[0] if results else False: scores = [r.get('score', 0) for r in results] metrics['mean_score'] = round(sum(scores) / len(scores), 3) metrics['min_score'] = round(min(scores), 3) metrics['max_score'] = round(max(scores), 3) return metrics # --- Main --------------------------------------------------------------------- def main(): device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f'Device: {device}') print(f'PyTorch: {torch.__version__}') print(f'CUDA: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else "N/A"}') # Model IDs cheap_id = f'{HUB_ORG}/speculative-proposer-qwen3-1.7b' verifier_id = f'{HUB_ORG}/speculative-verifier-qwen3-4b' strong_id = 'Qwen/Qwen3-8B' print(f'\nLoading eval dataset: {EVAL_DS}') ds = load_dataset(EVAL_DS, split='train') data = [ds[i] for i in range(min(MAX_EVAL, len(ds)))] print(f'Evaluating on {len(data)} examples (of {len(ds)} total)') from collections import Counter dist = Counter(ex['action_type'] for ex in data) print(f'Action distribution: {dict(dist)}') print('\n=== Loading models ===') cheap_model, cheap_tok = load_lm(cheap_id, device) verifier_model, verifier_tok = load_reward_model(verifier_id, device) strong_model, strong_tok = load_lm(strong_id, device) print(f'\nGPU memory after loading: {torch.cuda.memory_summary() if torch.cuda.is_available() else "N/A"}') all_metrics = {} configs = [ ('A', lambda: evaluate_config_A(data, strong_model, strong_tok, device)), ('B', lambda: evaluate_config_B(data, cheap_model, cheap_tok, device)), ('C', lambda: evaluate_config_C(data, cheap_model, cheap_tok, strong_model, strong_tok, device)), ('D', lambda: evaluate_config_D(data, cheap_model, cheap_tok, verifier_model, verifier_tok, device)), ('E', lambda: evaluate_config_E(data, cheap_model, cheap_tok, verifier_model, verifier_tok, strong_model, strong_tok, device)), ] for name, fn in configs: print(f'\n{"="*50}') print(f'Evaluating Config {name}...') t0 = time.time() try: raw = fn() elapsed = time.time() - t0 metrics = compute_metrics(raw, name) all_metrics[name] = metrics print(f' Accuracy: {metrics["accuracy"]:.3f}') print(f' Avg Cost: {metrics["avg_cost"]:.3f}') print(f' Safety: {metrics["safety"]:.3f}') if metrics.get('accept_rate'): print(f' Accept Rate: {metrics["accept_rate"]:.3f}') if metrics.get('mean_score'): print(f' Mean Score: {metrics["mean_score"]:.3f}') print(f' Time: {elapsed:.1f}s') except Exception as e: print(f' ERROR: {e}') import traceback traceback.print_exc() all_metrics[name] = {'config': name, 'error': str(e), 'accuracy': 0, 'avg_cost': 0, 'safety': 0, 'n': 0} print(f'\n{"="*60}') print('FINAL COMPARISON') print(f'{"Config":<6} {"Accuracy":>10} {"Avg Cost":>10} {"Safety":>10} {"Accept%":>10}') print('-' * 60) for cfg in ['A', 'B', 'C', 'D', 'E']: m = all_metrics.get(cfg, {}) acc_rate = m.get('accept_rate', '-') if isinstance(acc_rate, float): acc_rate = f'{acc_rate:.3f}' print(f'{cfg:<6} {m.get("accuracy", 0):>10.3f} {m.get("avg_cost", 0):>10.3f} ' f'{m.get("safety", 0):>10.3f} {str(acc_rate):>10}') print(f'\n{"="*60}') print('COST-QUALITY FRONTIER') frontier = sorted(all_metrics.values(), key=lambda x: x.get('avg_cost', 0)) for m in frontier: print(f" {m.get('config', '?')}: cost={m.get('avg_cost', 0):.3f}, " f"acc={m.get('accuracy', 0):.3f}, safety={m.get('safety', 0):.3f}") out_path = '/tmp/eval_results.json' output = { 'metrics': all_metrics, 'config': { 'cheap_model': cheap_id, 'verifier_model': verifier_id, 'strong_model': strong_id, 'eval_dataset': EVAL_DS, 'n_examples': len(data), 'reward_threshold': REWARD_THRESHOLD, }, 'action_distribution': dict(dist), } with open(out_path, 'w') as f: json.dump(output, f, indent=2) print(f'\nResults saved to {out_path}') print(f'File size: {os.path.getsize(out_path)} bytes') print('Uploading to Hub...') from huggingface_hub import HfApi api = HfApi() api.upload_file( path_or_fileobj=out_path, path_in_repo='eval_results.json', repo_id=f'{HUB_ORG}/speculative-tool-actions', repo_type='model', commit_message='Update eval results with empirical data from trained models', ) print('Done!') if __name__ == '__main__': main()