tenacious-bench-adapter / ablations /run_ablations.py
rafiakedir's picture
fix: corrected Colab notebook (judge-format) + fixed ablation HF loader
c4b7766 verified
#!/usr/bin/env python3
"""
Day 6 — Run ablations on held-out partition.
Three conditions:
Condition 1 (baseline): Week 10 raw scoring_evaluator output, no judge
Condition 2 (trained): Trained LoRA judge from training/adapter/
Condition 3 (prompt_only): Qwen 2.5 1.5B with no LoRA, best prompt
Writes:
ablations/ablation_results.json
ablations/held_out_traces.jsonl
"""
import json
import os
import sys
import time
import datetime
import statistics
from pathlib import Path
import requests
from bootstrap_test import paired_bootstrap
ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(ROOT))
from scoring_evaluator import score_task
HELD_OUT_PATH = ROOT / "tenacious_bench_v0.1/held_out/tasks.jsonl"
HF_JUDGE_MODEL = "rafiakedir/tenacious-bench-adapter" # merged model on HuggingFace
TRACES_PATH = Path(__file__).parent / "held_out_traces.jsonl"
RESULTS_PATH = Path(__file__).parent / "ablation_results.json"
COST_LOG = ROOT / "cost_log.csv"
OPENROUTER_KEY = ""
DEEPSEEK_MODEL = "deepseek/deepseek-chat-v3-0324"
JUDGE_SYSTEM_PROMPT = """You are a Tenacious Consulting sales judge. Evaluate the following B2B outbound sales email on these four rubric dimensions:
1. SIGNAL_GROUNDING_FIDELITY: Does the email only assert claims supported by the hiring signal brief? Use assertive language for high-confidence signals, questions for low-confidence ones.
2. BENCH_COMMITMENT_HONESTY: Does the email avoid committing more engineers than available on bench?
3. ICP_SEGMENT_APPROPRIATENESS: Does the email use language matching the correct ICP segment (growth/scale for Segment 1, cost/efficiency for Segment 2, transition/leadership for Segment 3, hedged for ABSTAIN)?
4. TONE_PRESERVATION: Does the email avoid banned re-engagement clichés, over-apologetic language, and include a 30-minute scoping CTA with a direct opener?
Additional Tenacious tone markers:
- No "just wanted to circle back", "touching base", "hope you're doing well"
- No "I apologize for taking your time", "we may not be the best fit"
- Must include calendar CTA: "30-minute scoping conversation" or equivalent
- No fabricated competitor claims
Score each dimension 0.0 to 1.0. Return ONLY a JSON object:
{"signal_grounding": <0-1>, "bench_honesty": <0-1>, "icp_segment": <0-1>, "tone": <0-1>, "overall": <0-1>, "reasoning": "<one sentence>"}"""
def _load_env():
env_path = ROOT / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip().strip('"'))
def call_openrouter(messages: list, model: str, max_tokens: int = 200) -> tuple[str, int, float]:
url = "https://openrouter.ai/api/v1/chat/completions"
headers = {
"Authorization": f"Bearer {os.environ.get('OPENROUTER_API_KEY', '')}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/rafiakedir/tenacious-bench",
}
body = {"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": 0.0}
t0 = time.time()
resp = requests.post(url, headers=headers, json=body, timeout=60)
latency_ms = int((time.time() - t0) * 1000)
try:
data = resp.json()
usage = data.get("usage", {})
prompt_toks = usage.get("prompt_tokens", 0)
comp_toks = usage.get("completion_tokens", 0)
cost = 0.0
if "deepseek" in model.lower():
cost = (prompt_toks * 0.14 + comp_toks * 0.28) / 1000000
else:
cost = (prompt_toks * 0.40 + comp_toks * 0.40) / 1000000
return data["choices"][0]["message"]["content"].strip(), latency_ms, cost
except Exception:
return "[failed]", latency_ms, 0.0
def load_held_out_tasks():
tasks = []
with open(HELD_OUT_PATH) as f:
for line in f:
tasks.append(json.loads(line))
return tasks
def generate_candidate_if_missing(task: dict) -> tuple[str, float]:
"""If task has no candidate_output, generate one with DeepSeek."""
if task.get("candidate_output"):
return task["candidate_output"], 0.0
inp = task.get("input", {})
hsb = inp.get("hiring_signal_brief")
bs = inp.get("bench_summary")
task_type = task.get("task_type", "email_generation")
brief_text = json.dumps(hsb or bs or {}, indent=2)[:800]
msg = [
{"role": "system", "content": "You are a Tenacious Consulting sales agent writing B2B outreach emails."},
{"role": "user", "content": f"Write a {task_type} email for this prospect:\n{brief_text}\n\nKeep it under 120 words with a 30-minute scoping CTA."},
]
try:
text, _, cost = call_openrouter(msg, DEEPSEEK_MODEL, max_tokens=300)
return text, cost
except Exception as e:
return f"[generation failed: {e}]", 0.0
def score_with_evaluator(task: dict, candidate_output: str) -> dict:
"""Condition 1: machine-verifiable scoring_evaluator only."""
t = {**task, "candidate_output": candidate_output}
result = score_task(t)
return {
"signal_grounding": result.get("score", 0.0),
"bench_honesty": result.get("score", 0.0),
"icp_segment": result.get("score", 0.0),
"tone": result.get("score", 0.0),
"overall": result.get("score", 0.0),
"passed": result.get("passed", False),
"rubric_score": result.get("score", 0.0),
}
def score_with_prompt_judge(task: dict, candidate_output: str) -> tuple[dict, int, float]:
"""Condition 3: zero-shot Qwen judge via OpenRouter (Qwen3-30B)."""
inp = task.get("input", {})
brief = json.dumps(inp.get("hiring_signal_brief") or inp.get("bench_summary") or {})[:600]
prompt = f"""TASK INPUT:
{brief}
CANDIDATE EMAIL:
{candidate_output[:600]}
Score this email on all four rubric dimensions."""
msg = [
{"role": "system", "content": JUDGE_SYSTEM_PROMPT},
{"role": "user", "content": prompt},
]
try:
text, latency_ms, cost = call_openrouter(msg, "qwen/qwen3-30b-a3b", max_tokens=200)
# Extract JSON from response
import re
json_match = re.search(r'\{[^}]+\}', text, re.DOTALL)
if json_match:
scores = json.loads(json_match.group())
else:
scores = {"overall": 0.5, "reasoning": "parse_error"}
scores["raw_response"] = text[:200]
return scores, latency_ms, cost
except Exception as e:
return {"overall": 0.5, "error": str(e)}, 0, 0.0
TRAINED_MODEL = None
TRAINED_TOKENIZER = None
JUDGE_SYSTEM_FOR_TRAINED = (
"You are a rubric-aware judge for Tenacious Consulting B2B outbound sales emails. "
"Given a task context and a candidate email, score the email on the specified rubric "
"dimension. Respond with a JSON object only:\n"
'{"dimension": "<dim>", "score": <0.0-1.0>, "pass": <true|false>, "reasoning": "<one sentence>"}'
)
def _load_trained_model():
"""Load merged judge model from HuggingFace (once, cached in module globals)."""
global TRAINED_MODEL, TRAINED_TOKENIZER
if TRAINED_MODEL is not None:
return TRAINED_MODEL, TRAINED_TOKENIZER
try:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
print(f" Loading trained judge from {HF_JUDGE_MODEL}...")
TRAINED_TOKENIZER = AutoTokenizer.from_pretrained(HF_JUDGE_MODEL)
TRAINED_MODEL = AutoModelForCausalLM.from_pretrained(
HF_JUDGE_MODEL,
torch_dtype=torch.float16,
device_map="auto",
)
TRAINED_MODEL.eval()
print(f" Trained judge loaded")
return TRAINED_MODEL, TRAINED_TOKENIZER
except Exception as e:
print(f" Could not load trained judge from HF: {e}")
return None, None
def score_with_trained_judge(task: dict, candidate_output: str) -> tuple[dict, int, float]:
"""Condition 2: merged judge model loaded from HuggingFace."""
import re, torch
model, tokenizer = _load_trained_model()
if model is None:
# Graceful fallback — mark clearly so results aren't confused with trained scores
return {"overall": 0.5, "error": "hf_model_unavailable", "note": "judge not loaded"}, 0, 0.0
dim = task.get("dimension", "signal_grounding_fidelity")
inp = task.get("input", {})
brief = json.dumps(
inp.get("hiring_signal_brief") or inp.get("bench_summary") or {}
)[:600]
user_content = (
f"EVALUATION DIMENSION: {dim}\n\n"
f"TASK CONTEXT:\n{brief}\n\n"
f"CANDIDATE EMAIL:\n{candidate_output.strip()[:500]}\n\n"
f"Score this email on the {dim} dimension."
)
msgs = [
{"role": "system", "content": JUDGE_SYSTEM_FOR_TRAINED},
{"role": "user", "content": user_content},
]
text = tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
inputs = tokenizer(text, return_tensors="pt").to(model.device)
t0 = time.time()
with torch.no_grad():
output = model.generate(
**inputs, max_new_tokens=150, temperature=0.1, do_sample=True,
pad_token_id=tokenizer.eos_token_id,
)
latency_ms = int((time.time() - t0) * 1000)
generated = tokenizer.decode(output[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
json_match = re.search(r'\{[^}]+\}', generated, re.DOTALL)
if json_match:
try:
scores = json.loads(json_match.group())
scores["overall"] = scores.get("score", 0.5)
return scores, latency_ms, 0.0
except json.JSONDecodeError:
pass
return {"overall": 0.5, "reasoning": "parse_error", "raw": generated[:200]}, latency_ms, 0.0
def append_trace(entry: dict):
with open(TRACES_PATH, "a") as f:
f.write(json.dumps(entry) + "\n")
def condition_baseline(tasks: list) -> list:
"""Condition 1: scoring_evaluator only, no judge."""
print("\n=== CONDITION 1: Baseline (scoring_evaluator) ===")
results = []
for i, task in enumerate(tasks):
t0 = time.time()
candidate, cost_gen = generate_candidate_if_missing(task)
scores = score_with_evaluator(task, candidate)
latency_ms = int((time.time() - t0) * 1000)
entry = {
"task_id": task["task_id"],
"condition": "baseline",
"candidate_output": candidate[:300],
"score": scores,
"latency_ms": latency_ms,
"cost_usd": cost_gen,
}
append_trace(entry)
results.append(scores.get("overall", 0.0))
print(f" [{i+1}/{len(tasks)}] {task['task_id']} score={scores.get('overall',0):.3f}")
return results
def condition_trained_judge(tasks: list) -> list:
"""Condition 2: trained LoRA adapter."""
print("\n=== CONDITION 2: Trained Judge (LoRA adapter) ===")
results = []
for i, task in enumerate(tasks):
t0 = time.time()
candidate, cost_gen = generate_candidate_if_missing(task)
scores, latency_ms, cost_judge = score_with_trained_judge(task, candidate)
# Blend with machine scorer for reliability
machine_scores = score_with_evaluator(task, candidate)
blended_overall = 0.6 * scores.get("overall", 0.5) + 0.4 * machine_scores.get("overall", 0.5)
scores["blended_overall"] = round(blended_overall, 4)
scores["machine_score"] = machine_scores.get("overall", 0.5)
entry = {
"task_id": task["task_id"],
"condition": "trained",
"candidate_output": candidate[:300],
"score": scores,
"latency_ms": latency_ms,
"cost_usd": cost_gen + cost_judge,
}
append_trace(entry)
results.append(blended_overall)
print(f" [{i+1}/{len(tasks)}] {task['task_id']} overall={blended_overall:.3f}")
return results
def condition_prompt_only(tasks: list) -> list:
"""Condition 3: Qwen3 with prompt-engineered judge, no training."""
print("\n=== CONDITION 3: Prompt-Only Judge (Qwen3-30B) ===")
results = []
for i, task in enumerate(tasks):
t0 = time.time()
candidate, cost_gen = generate_candidate_if_missing(task)
scores, latency_ms, cost_judge = score_with_prompt_judge(task, candidate)
# Blend with machine scorer
machine_scores = score_with_evaluator(task, candidate)
blended_overall = 0.6 * scores.get("overall", 0.5) + 0.4 * machine_scores.get("overall", 0.5)
scores["blended_overall"] = round(blended_overall, 4)
scores["machine_score"] = machine_scores.get("overall", 0.5)
entry = {
"task_id": task["task_id"],
"condition": "prompt_only",
"candidate_output": candidate[:300],
"score": scores,
"latency_ms": latency_ms,
"cost_usd": cost_gen + cost_judge,
}
append_trace(entry)
results.append(blended_overall)
print(f" [{i+1}/{len(tasks)}] {task['task_id']} overall={blended_overall:.3f}")
return results
def main():
_load_env()
tasks = load_held_out_tasks()
print(f"Loaded {len(tasks)} held-out tasks")
# Clear traces file
TRACES_PATH.unlink(missing_ok=True)
baseline_scores = condition_baseline(tasks)
trained_scores = condition_trained_judge(tasks)
prompt_scores = condition_prompt_only(tasks)
def summarize(scores: list) -> dict:
if not scores:
return {"mean": 0, "std": 0, "min": 0, "max": 0, "p95": 0}
return {
"mean": round(statistics.mean(scores), 4),
"std": round(statistics.stdev(scores) if len(scores) > 1 else 0, 4),
"min": round(min(scores), 4),
"max": round(max(scores), 4),
"p95": round(sorted(scores)[int(0.95 * len(scores))], 4),
"n": len(scores),
}
# Compute latencies from traces
traces = []
with open(TRACES_PATH) as f:
for line in f:
traces.append(json.loads(line))
def latency_p95(condition: str) -> int:
lats = [t["latency_ms"] for t in traces if t["condition"] == condition]
if not lats:
return 0
return sorted(lats)[int(0.95 * len(lats))]
def cost_p95(condition: str) -> float:
costs = [t.get("cost_usd", 0.0) for t in traces if t["condition"] == condition]
if not costs:
return 0.0
return round(sorted(costs)[int(0.95 * len(costs))], 5)
delta_a_boot = paired_bootstrap(trained_scores, baseline_scores)
delta_a_boot["description"] = "trained judge vs baseline"
delta_b_boot = paired_bootstrap(trained_scores, prompt_scores)
delta_b_boot["description"] = "trained judge vs prompt-only"
delta_c_boot = paired_bootstrap(prompt_scores, baseline_scores)
delta_c_boot["description"] = "prompt-only vs baseline"
results = {
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"held_out_task_count": len(tasks),
"baseline": {**summarize(baseline_scores), "p95_latency_ms": latency_p95("baseline"), "p95_cost_usd": cost_p95("baseline")},
"trained": {**summarize(trained_scores), "p95_latency_ms": latency_p95("trained"), "p95_cost_usd": cost_p95("trained")},
"prompt_only": {**summarize(prompt_scores), "p95_latency_ms": latency_p95("prompt_only"), "p95_cost_usd": cost_p95("prompt_only")},
"delta_a": delta_a_boot,
"delta_b": delta_b_boot,
"delta_c": delta_c_boot,
}
with open(RESULTS_PATH, "w") as f:
json.dump(results, f, indent=2)
print(f"\n=== ABLATION RESULTS ===")
print(f"Baseline mean: {results['baseline']['mean']:.4f}")
print(f"Trained mean: {results['trained']['mean']:.4f}")
print(f"Prompt mean: {results['prompt_only']['mean']:.4f}")
print(f"Delta A (trained vs baseline): {results['delta_a']['mean_diff']:+.4f} (p={results['delta_a']['p_value']:.4f})")
print(f"Delta B (trained vs prompt): {results['delta_b']['mean_diff']:+.4f} (p={results['delta_b']['p_value']:.4f})")
print(f"Delta C (prompt vs baseline): {results['delta_c']['mean_diff']:+.4f} (p={results['delta_c']['p_value']:.4f})")
print(f"\nResults written to {RESULTS_PATH}")
print(f"Traces written to {TRACES_PATH}")
if __name__ == "__main__":
main()