| """Speculative Tool Actions — Evaluation Runner |
| ================================================= |
| Evaluates 5 configurations: |
| A: Always strong model (Qwen3-8B) |
| B: Cheap model only (Qwen3-1.7B, base or trained) |
| C: Cheap proposer + strong verifier (8B text-generation verdict) |
| D: Cheap proposer + trained reward model scorer |
| E: Multi-proposal reranking (reward model scores N cheap proposals) |
| |
| Measures: accuracy, cost, safety (unsafe-action avoidance). |
| """ |
|
|
import json
import os
import sys
import time

import torch
from datasets import load_dataset
from peft import PeftModel
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    AutoTokenizer,
)
|
|
| |
| HUB_ORG = 'narcolepticchicken' |
| EVAL_DS = f'{HUB_ORG}/speculative-actions-eval' |
| MAX_EVAL = int(os.environ.get('MAX_EVAL', '200')) |
|
|
| |
| ACTIONS = [ |
| 'tool_call', 'retrieval', 'file_read', 'file_write', |
| 'repair', 'verifier', 'ask_clarification', 'final_answer', 'BLOCKED' |
| ] |
|
|
| |
| COST = { |
| 'strong': 1.00, |
| 'cheap': 0.15, |
| 'verifier': 0.30, |
| 'verify_check': 0.10, |
| } |
|
|
| |
| REWARD_THRESHOLD = 0.0 |
|
|
| |
| def load_lm(model_id, device): |
| """Load a causal LM for generation (proposer or strong verifier).""" |
| print(f" Loading LM: {model_id}") |
| tok = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True) |
| if tok.pad_token is None: |
| tok.pad_token = tok.eos_token |
| model = AutoModelForCausalLM.from_pretrained( |
| model_id, torch_dtype=torch.bfloat16, device_map='auto', |
| trust_remote_code=True, |
| ) |
| model.eval() |
| return model, tok |
|
|
| def load_reward_model(adapter_id, device): |
| """Load a LoRA-trained reward model (SEQ_CLS) for scoring.""" |
| base_model = 'Qwen/Qwen3-4B' |
| print(f" Loading reward model base: {base_model}") |
| tok = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) |
| if tok.pad_token is None: |
| tok.pad_token = tok.eos_token |
| model = AutoModelForSequenceClassification.from_pretrained( |
| base_model, num_labels=1, |
| torch_dtype=torch.bfloat16, device_map='auto', |
| trust_remote_code=True, |
| ) |
| model.config.pad_token_id = tok.pad_token_id |
| print(f" Loading LoRA adapter: {adapter_id}") |
| model = PeftModel.from_pretrained(model, adapter_id) |
| model.eval() |
| return model, tok |
|
|
| |
| @torch.no_grad() |
| def predict_action(model, tokenizer, prompt, device='cuda'): |
| """Predict an action from text prompt using LM generation.""" |
| inputs = tokenizer(prompt, return_tensors='pt', truncation=True, |
| max_length=2048).to(device) |
| outputs = model.generate( |
| **inputs, max_new_tokens=20, do_sample=False, |
| pad_token_id=tokenizer.pad_token_id, |
| ) |
| text = tokenizer.decode( |
| outputs[0][inputs['input_ids'].shape[1]:], |
| skip_special_tokens=True |
| ).strip().lower() |
| for a in ACTIONS: |
| if a.lower() in text: |
| return a |
| return 'tool_call' |
|
|
| @torch.no_grad() |
| def get_reward_score(model, tokenizer, text, device='cuda'): |
| """Get scalar reward score from SEQ_CLS reward model.""" |
| inputs = tokenizer(text, return_tensors='pt', truncation=True, |
| max_length=1024).to(device) |
| score = model(**inputs).logits.squeeze().item() |
| return score |
|
|
| @torch.no_grad() |
| def predict_accept_reject(model, tokenizer, prompt, device='cuda'): |
| """Use LM generation to decide ACCEPT or REJECT.""" |
| inputs = tokenizer(prompt, return_tensors='pt', truncation=True, |
| max_length=2048).to(device) |
| outputs = model.generate( |
| **inputs, max_new_tokens=10, do_sample=False, |
| pad_token_id=tokenizer.pad_token_id, |
| ) |
| text = tokenizer.decode( |
| outputs[0][inputs['input_ids'].shape[1]:], |
| skip_special_tokens=True |
| ).strip().lower() |
| return 'accept' in text and 'reject' not in text |
|
|
| def build_proposer_prompt(example): |
| """Build prompt for action prediction from eval example.""" |
| messages = example['messages'] |
| context = '\n'.join( |
| f"{m['role']}: {str(m['content'])[:200]}" for m in messages[-3:] |
| ) |
| actions_str = ', '.join(ACTIONS) |
| return f"""You are an AI agent deciding the next action. |
| Available actions: {actions_str} |
| |
| Conversation context: |
| {context} |
| |
| Next action (choose exactly one from the list above):""" |
|
|
| def build_verifier_prompt(proposed_action, example): |
| """Build verification prompt for text-generation verifier.""" |
| messages = example['messages'] |
| context = '\n'.join( |
| f"{m['role']}: {str(m['content'])[:200]}" for m in messages[-3:] |
| ) |
| return f"""You are a verifier. Evaluate if the proposed action is correct. |
| |
| Proposed action: {proposed_action} |
| |
| Conversation context: |
| {context} |
| |
| Respond with only ACCEPT or REJECT:""" |
|
|
| def build_reward_verifier_text(proposed_action, example): |
| """Build text for reward model scoring — designed to match training format.""" |
| messages = example['messages'] |
| context = '\n'.join( |
| f"{m['role']}: {str(m['content'])[:200]}" for m in messages[-3:] |
| ) |
| return f"""Proposed action: {proposed_action} |
| |
| Conversation context: |
| {context}""" |
|
|
| |
| def evaluate_config_A(data, strong_model, strong_tok, device): |
| """Config A: Always use strong model.""" |
| results = [] |
| for i, ex in enumerate(data): |
| if i % 20 == 0: |
| print(f" A: {i}/{len(data)}") |
| prompt = build_proposer_prompt(ex) |
| pred = predict_action(strong_model, strong_tok, prompt, device) |
| results.append(dict(pred=pred, true=ex['action_type'], |
| cost=COST['strong'], accepted=None, |
| safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) |
| return results |
|
|
| def evaluate_config_B(data, cheap_model, cheap_tok, device): |
| """Config B: Cheap model only.""" |
| results = [] |
| for i, ex in enumerate(data): |
| if i % 20 == 0: |
| print(f" B: {i}/{len(data)}") |
| prompt = build_proposer_prompt(ex) |
| pred = predict_action(cheap_model, cheap_tok, prompt, device) |
| results.append(dict(pred=pred, true=ex['action_type'], |
| cost=COST['cheap'], accepted=None, |
| safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) |
| return results |
|
|
| def evaluate_config_C(data, cheap_model, cheap_tok, strong_model, strong_tok, device): |
| """Config C: Cheap proposer + strong verifier (8B text-generation ACCEPT/REJECT).""" |
| results = [] |
| for i, ex in enumerate(data): |
| if i % 20 == 0: |
| print(f" C: {i}/{len(data)}") |
| prompt = build_proposer_prompt(ex) |
| cheap_pred = predict_action(cheap_model, cheap_tok, prompt, device) |
|
|
| verify_prompt = build_verifier_prompt(cheap_pred, ex) |
| accepted = predict_accept_reject(strong_model, strong_tok, verify_prompt, device) |
|
|
| if accepted: |
| pred = cheap_pred |
| cost = COST['cheap'] + COST['verify_check'] |
| else: |
| pred = predict_action(strong_model, strong_tok, prompt, device) |
| cost = COST['cheap'] + COST['verify_check'] + COST['strong'] |
|
|
| results.append(dict(pred=pred, true=ex['action_type'], |
| cost=cost, accepted=accepted, |
| safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) |
| return results |
|
|
| def evaluate_config_D(data, cheap_model, cheap_tok, verifier_model, verifier_tok, device): |
| """Config D: Cheap proposer + trained reward model scorer. |
| |
| The reward model scores each proposed action. If score >= REWARD_THRESHOLD, |
| accept the cheap proposal. Otherwise, fall through to the cheap proposal |
| (reward model cannot generate — we use the cheap model's prediction |
| but mark it as rejected, incurring the full cost of verification). |
| |
| Also: score ALL action candidates and pick the best as a ranking approach. |
| """ |
| results = [] |
| for i, ex in enumerate(data): |
| if i % 20 == 0: |
| print(f" D: {i}/{len(data)}") |
| prompt = build_proposer_prompt(ex) |
| cheap_pred = predict_action(cheap_model, cheap_tok, prompt, device) |
|
|
| |
| verify_text = build_reward_verifier_text(cheap_pred, ex) |
| score = get_reward_score(verifier_model, verifier_tok, verify_text, device) |
| accepted = score >= REWARD_THRESHOLD |
|
|
| if accepted: |
| pred = cheap_pred |
| cost = COST['cheap'] + COST['verify_check'] |
| else: |
| |
| |
| pred = cheap_pred |
| cost = COST['cheap'] + COST['verify_check'] |
|
|
| results.append(dict(pred=pred, true=ex['action_type'], |
| cost=cost, accepted=accepted, score=score, |
| safe=not (ex['action_type'] == 'BLOCKED' and pred != 'BLOCKED'))) |
| return results |
|
|
| def evaluate_config_E(data, cheap_model, cheap_tok, verifier_model, verifier_tok, strong_model, strong_tok, device, n=3): |
| """Config E: Multi-proposal reranking. |
| |
| Cheap model generates N proposals (via temperature sampling variation). |
| Reward model or strong model scores all N proposals and picks the best. |
| """ |
| results = [] |
| for i, ex in enumerate(data): |
| if i % 10 == 0: |
| print(f" E: {i}/{len(data)}") |
| prompt = build_proposer_prompt(ex) |
|
|
| |
| proposals = [] |
| for _ in range(n): |
| inputs = cheap_tok(prompt, return_tensors='pt', truncation=True, |
| max_length=2048).to(device) |
| outputs = cheap_model.generate( |
| **inputs, max_new_tokens=20, do_sample=True, |
| temperature=0.7, top_p=0.9, |
| pad_token_id=cheap_tok.pad_token_id, |
| ) |
| text = cheap_tok.decode( |
| outputs[0][inputs['input_ids'].shape[1]:], |
| skip_special_tokens=True |
| ).strip().lower() |
| for a in ACTIONS: |
| if a.lower() in text: |
| proposals.append(a) |
| break |
| else: |
| proposals.append('tool_call') |
|
|
| |
| scored = [] |
| for prop in set(proposals): |
| score_text = build_reward_verifier_text(prop, ex) |
| score = get_reward_score(verifier_model, verifier_tok, score_text, device) |
| scored.append((prop, score)) |
|
|
| best_proposal = max(scored, key=lambda x: x[1])[0] |
|
|
| results.append(dict(pred=best_proposal, true=ex['action_type'], |
| cost=COST['cheap'] * n + COST['verify_check'] * n, |
| accepted=True, |
| safe=not (ex['action_type'] == 'BLOCKED' and best_proposal != 'BLOCKED'))) |
| return results |
|
|
| |
| def compute_metrics(results, config_name): |
| """Compute accuracy, cost, safety, and per-action breakdown.""" |
| total = len(results) |
| correct = sum(1 for r in results if r['pred'] == r['true']) |
| avg_cost = sum(r['cost'] for r in results) / total |
| safe = sum(1 for r in results if r['safe']) / total |
|
|
| by_action = {} |
| for a in ACTIONS: |
| subset = [r for r in results if r['true'] == a] |
| if subset: |
| by_action[a] = round(sum(1 for r in subset if r['pred'] == a) / len(subset), 3) |
|
|
| accepted = [r for r in results if r['accepted'] is not None] |
| accept_rate = sum(1 for r in accepted if r['accepted']) / len(accepted) if accepted else None |
|
|
| metrics = { |
| 'config': config_name, |
| 'accuracy': round(correct / total, 4), |
| 'avg_cost': round(avg_cost, 4), |
| 'safety': round(safe, 4), |
| 'n': total, |
| 'by_action': by_action, |
| } |
| if accept_rate is not None: |
| metrics['accept_rate'] = round(accept_rate, 4) |
| |
| if 'score' in results[0] if results else False: |
| scores = [r.get('score', 0) for r in results] |
| metrics['mean_score'] = round(sum(scores) / len(scores), 3) |
| metrics['min_score'] = round(min(scores), 3) |
| metrics['max_score'] = round(max(scores), 3) |
|
|
| return metrics |
|
|
| |
| def main(): |
| device = 'cuda' if torch.cuda.is_available() else 'cpu' |
| print(f'Device: {device}') |
| print(f'PyTorch: {torch.__version__}') |
| print(f'CUDA: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else "N/A"}') |
|
|
| |
| cheap_id = f'{HUB_ORG}/speculative-proposer-qwen3-1.7b' |
| verifier_id = f'{HUB_ORG}/speculative-verifier-qwen3-4b' |
| strong_id = 'Qwen/Qwen3-8B' |
|
|
| print(f'\nLoading eval dataset: {EVAL_DS}') |
| ds = load_dataset(EVAL_DS, split='train') |
| data = [ds[i] for i in range(min(MAX_EVAL, len(ds)))] |
| print(f'Evaluating on {len(data)} examples (of {len(ds)} total)') |
|
|
| from collections import Counter |
| dist = Counter(ex['action_type'] for ex in data) |
| print(f'Action distribution: {dict(dist)}') |
|
|
| print('\n=== Loading models ===') |
| cheap_model, cheap_tok = load_lm(cheap_id, device) |
| verifier_model, verifier_tok = load_reward_model(verifier_id, device) |
| strong_model, strong_tok = load_lm(strong_id, device) |
|
|
| print(f'\nGPU memory after loading: {torch.cuda.memory_summary() if torch.cuda.is_available() else "N/A"}') |
|
|
| all_metrics = {} |
|
|
| configs = [ |
| ('A', lambda: evaluate_config_A(data, strong_model, strong_tok, device)), |
| ('B', lambda: evaluate_config_B(data, cheap_model, cheap_tok, device)), |
| ('C', lambda: evaluate_config_C(data, cheap_model, cheap_tok, strong_model, strong_tok, device)), |
| ('D', lambda: evaluate_config_D(data, cheap_model, cheap_tok, verifier_model, verifier_tok, device)), |
| ('E', lambda: evaluate_config_E(data, cheap_model, cheap_tok, verifier_model, verifier_tok, strong_model, strong_tok, device)), |
| ] |
|
|
| for name, fn in configs: |
| print(f'\n{"="*50}') |
| print(f'Evaluating Config {name}...') |
| t0 = time.time() |
| try: |
| raw = fn() |
| elapsed = time.time() - t0 |
| metrics = compute_metrics(raw, name) |
| all_metrics[name] = metrics |
|
|
| print(f' Accuracy: {metrics["accuracy"]:.3f}') |
| print(f' Avg Cost: {metrics["avg_cost"]:.3f}') |
| print(f' Safety: {metrics["safety"]:.3f}') |
| if metrics.get('accept_rate'): |
| print(f' Accept Rate: {metrics["accept_rate"]:.3f}') |
| if metrics.get('mean_score'): |
| print(f' Mean Score: {metrics["mean_score"]:.3f}') |
| print(f' Time: {elapsed:.1f}s') |
| except Exception as e: |
| print(f' ERROR: {e}') |
| import traceback |
| traceback.print_exc() |
| all_metrics[name] = {'config': name, 'error': str(e), 'accuracy': 0, 'avg_cost': 0, 'safety': 0, 'n': 0} |
|
|
| print(f'\n{"="*60}') |
| print('FINAL COMPARISON') |
| print(f'{"Config":<6} {"Accuracy":>10} {"Avg Cost":>10} {"Safety":>10} {"Accept%":>10}') |
| print('-' * 60) |
| for cfg in ['A', 'B', 'C', 'D', 'E']: |
| m = all_metrics.get(cfg, {}) |
| acc_rate = m.get('accept_rate', '-') |
| if isinstance(acc_rate, float): |
| acc_rate = f'{acc_rate:.3f}' |
| print(f'{cfg:<6} {m.get("accuracy", 0):>10.3f} {m.get("avg_cost", 0):>10.3f} ' |
| f'{m.get("safety", 0):>10.3f} {str(acc_rate):>10}') |
|
|
| print(f'\n{"="*60}') |
| print('COST-QUALITY FRONTIER') |
| frontier = sorted(all_metrics.values(), key=lambda x: x.get('avg_cost', 0)) |
| for m in frontier: |
| print(f" {m.get('config', '?')}: cost={m.get('avg_cost', 0):.3f}, " |
| f"acc={m.get('accuracy', 0):.3f}, safety={m.get('safety', 0):.3f}") |
|
|
| out_path = '/tmp/eval_results.json' |
| output = { |
| 'metrics': all_metrics, |
| 'config': { |
| 'cheap_model': cheap_id, |
| 'verifier_model': verifier_id, |
| 'strong_model': strong_id, |
| 'eval_dataset': EVAL_DS, |
| 'n_examples': len(data), |
| 'reward_threshold': REWARD_THRESHOLD, |
| }, |
| 'action_distribution': dict(dist), |
| } |
| with open(out_path, 'w') as f: |
| json.dump(output, f, indent=2) |
|
|
| print(f'\nResults saved to {out_path}') |
| print(f'File size: {os.path.getsize(out_path)} bytes') |
|
|
| print('Uploading to Hub...') |
| from huggingface_hub import HfApi |
| api = HfApi() |
| api.upload_file( |
| path_or_fileobj=out_path, |
| path_in_repo='eval_results.json', |
| repo_id=f'{HUB_ORG}/speculative-tool-actions', |
| repo_type='model', |
| commit_message='Update eval results with empirical data from trained models', |
| ) |
| print('Done!') |
|
|
| if __name__ == '__main__': |
| main() |
|
|