"""
BenchmarkRunnerV2 — Rigorous evaluation with train/val/test splits,
memory ablation, shuffle control, and contamination detection.
Key difference from V1: BenchmarkRunnerV2 enforces RunMode. In eval_test
mode, no memory is written. This is the only mode whose numbers are trustworthy.
"""
from __future__ import annotations
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from purpose_agent.v2_types import RunMode
from purpose_agent.evalport import EvalCase, EvalPort, DictEvalPort, ScoreBundle
from purpose_agent.orchestrator import Orchestrator, TaskResult
from purpose_agent.types import State, Trajectory
logger = logging.getLogger(__name__)
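# Note on RunMode: this module relies only on the three members used below
# (LEARNING_TRAIN, LEARNING_VALIDATION, EVAL_TEST) and on an `is_eval`
# property that gates memory writes. An illustrative sketch of that contract
# (the real enum lives in purpose_agent.v2_types and may differ):
#
#     class RunMode(Enum):
#         LEARNING_TRAIN = "learning_train"
#         LEARNING_VALIDATION = "learning_validation"
#         EVAL_TEST = "eval_test"
#
#         @property
#         def is_eval(self) -> bool:
#             return self is RunMode.EVAL_TEST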
@dataclass
class V2EvalResult:
"""Result of one evaluation case."""
case_id: str
iteration: int
split: str
bundle: ScoreBundle
steps: int = 0
wall_time_s: float = 0.0
@dataclass
class V2BenchmarkResult:
"""Full benchmark result with per-split reporting."""
name: str
results: list[V2EvalResult] = field(default_factory=list)
config: dict[str, Any] = field(default_factory=dict)
started_at: float = field(default_factory=time.time)
finished_at: float = 0.0
    def get_split_summary(self, split: str) -> dict[str, float | int]:
"""Get aggregate metrics for a specific split."""
split_results = [r for r in self.results if r.split == split]
if not split_results:
return {}
n = len(split_results)
pass_rate = sum(1 for r in split_results if r.bundle.passed) / n
avg_steps = sum(r.steps for r in split_results) / n
return {
"n": n,
"pass_rate": round(pass_rate, 3),
"avg_steps": round(avg_steps, 1),
}
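    # Example (illustrative values only):
    #   result.get_split_summary("test")
    #   -> {"n": 20, "pass_rate": 0.85, "avg_steps": 6.4}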
def get_improvement_curve(self, split: str = "test") -> list[dict]:
"""Get per-iteration metrics for one split."""
by_iter: dict[int, list[V2EvalResult]] = {}
for r in self.results:
if r.split == split:
by_iter.setdefault(r.iteration, []).append(r)
curve = []
for it in sorted(by_iter):
results = by_iter[it]
n = len(results)
pass_rate = sum(1 for r in results if r.bundle.passed) / n
curve.append({
"iteration": it,
"pass_rate": round(pass_rate, 3),
"n": n,
})
return curve
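    # Illustrative output shape for get_improvement_curve("test"):
    #   [{"iteration": 1, "pass_rate": 0.6, "n": 10},
    #    {"iteration": 2, "pass_rate": 0.7, "n": 10}]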
def summary(self) -> str:
lines = [f"═══ Benchmark: {self.name} ═══"]
for split in ["train", "validation", "test"]:
s = self.get_split_summary(split)
if s:
lines.append(f" {split:>12}: n={s['n']}, pass_rate={s['pass_rate']:.1%}, avg_steps={s['avg_steps']:.1f}")
curve = self.get_improvement_curve("test")
if len(curve) >= 2:
first = curve[0]["pass_rate"]
last = curve[-1]["pass_rate"]
delta = last - first
if abs(delta) < 0.001:
lines.append(f"\n Test improvement: {first:.1%}{last:.1%} (no significant change)")
else:
lines.append(f"\n Test improvement: {first:.1%}{last:.1%} ({delta:+.1%})")
return "\n".join(lines)
def save(self, path: str) -> None:
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump({
"name": self.name,
"config": self.config,
"splits": {
s: self.get_split_summary(s) for s in ["train", "validation", "test"]
},
"curve": self.get_improvement_curve("test"),
"n_results": len(self.results),
}, f, indent=2)
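    # Usage (illustrative path): result.save("runs/v2_benchmark.json").
    # Only aggregates are serialized; individual V2EvalResult objects stay in memory.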
class BenchmarkRunnerV2:
"""
Rigorous benchmark runner with train/val/test splits and ablation controls.
Key guarantee: eval_test cases NEVER cause memory writes.
Usage:
cases = [
EvalCase(id="t1", input_purpose="...", split="train", ...),
EvalCase(id="t2", input_purpose="...", split="test", ...),
]
runner = BenchmarkRunnerV2(orchestrator=orch)
result = runner.run(cases, train_iterations=3, eval_iterations=1)
print(result.summary())
"""
def __init__(
self,
orchestrator: Orchestrator,
eval_port: EvalPort | None = None,
):
self.orch = orchestrator
self.eval_port = eval_port or DictEvalPort()
def run(
self,
cases: list[EvalCase],
train_iterations: int = 3,
eval_iterations: int = 1,
name: str = "v2_benchmark",
) -> V2BenchmarkResult:
"""
Run benchmark: train split with learning, test split without.
1. Train iterations: run train split cases with RunMode.LEARNING_TRAIN
2. Validation: run validation split with RunMode.LEARNING_VALIDATION
3. Test: run test split with RunMode.EVAL_TEST (no memory writes)
"""
result = V2BenchmarkResult(name=name, config={
"train_iterations": train_iterations,
"eval_iterations": eval_iterations,
})
train_cases = [c for c in cases if c.split == "train"]
val_cases = [c for c in cases if c.split == "validation"]
test_cases = [c for c in cases if c.split == "test"]
# Phase 1: Training
for it in range(1, train_iterations + 1):
logger.info(f"Train iteration {it}/{train_iterations}")
for case in train_cases:
ev = self._run_case(case, it, RunMode.LEARNING_TRAIN)
result.results.append(ev)
# Phase 2: Validation
for case in val_cases:
ev = self._run_case(case, 1, RunMode.LEARNING_VALIDATION)
result.results.append(ev)
# Phase 3: Test (NO MEMORY WRITES)
for it in range(1, eval_iterations + 1):
logger.info(f"Test iteration {it}/{eval_iterations}")
for case in test_cases:
ev = self._run_case(case, it, RunMode.EVAL_TEST)
result.results.append(ev)
result.finished_at = time.time()
return result
def run_cold_warm(
self,
test_cases: list[EvalCase],
train_cases: list[EvalCase],
name: str = "cold_warm",
) -> dict[str, Any]:
"""Compare cold (no memory) vs warm (after training) on the same test set."""
        # Cold: evaluate test cases before any training (assumes memory starts empty)
cold_results = []
for case in test_cases:
ev = self._run_case(case, 0, RunMode.EVAL_TEST)
cold_results.append(ev)
cold_pass = sum(1 for r in cold_results if r.bundle.passed) / max(len(cold_results), 1)
# Train
for case in train_cases:
self._run_case(case, 1, RunMode.LEARNING_TRAIN)
# Warm: eval same test cases after training
warm_results = []
for case in test_cases:
ev = self._run_case(case, 1, RunMode.EVAL_TEST)
warm_results.append(ev)
warm_pass = sum(1 for r in warm_results if r.bundle.passed) / max(len(warm_results), 1)
delta = warm_pass - cold_pass
return {
"cold_pass_rate": round(cold_pass, 3),
"warm_pass_rate": round(warm_pass, 3),
"delta": round(delta, 3),
"improvement_significant": abs(delta) > 0.05,
}
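    # Illustrative usage (case construction elided; see the class docstring):
    #   report = runner.run_cold_warm(test_cases=tests, train_cases=trains)
    #   print(report["delta"])  # positive delta suggests training helped on test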
def run_memory_ablation(
self,
test_cases: list[EvalCase],
) -> dict[str, Any]:
"""Run test cases with and without memory to measure memory contribution."""
# With memory
with_results = []
for case in test_cases:
ev = self._run_case(case, 1, RunMode.EVAL_TEST)
with_results.append(ev)
with_pass = sum(1 for r in with_results if r.bundle.passed) / max(len(with_results), 1)
        # Without memory: temporarily clear the heuristic library and re-sync,
        # restoring it even if a case raises.
        saved_lib = list(self.orch.optimizer.heuristic_library)
        self.orch.optimizer.heuristic_library = []
        self.orch.sync_memory()
        without_results = []
        try:
            for case in test_cases:
                ev = self._run_case(case, 1, RunMode.EVAL_TEST)
                without_results.append(ev)
        finally:
            # Restore the saved library
            self.orch.optimizer.heuristic_library = saved_lib
            self.orch.sync_memory()
        without_pass = sum(1 for r in without_results if r.bundle.passed) / max(len(without_results), 1)
return {
"with_memory_pass_rate": round(with_pass, 3),
"without_memory_pass_rate": round(without_pass, 3),
"memory_contribution": round(with_pass - without_pass, 3),
}
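    # Interpretation: memory_contribution near zero suggests the heuristic
    # library is not driving test performance; a clearly positive value
    # suggests it is. Single-run deltas are noisy, so treat small values
    # with caution.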
def _run_case(self, case: EvalCase, iteration: int, mode: RunMode) -> V2EvalResult:
"""Run a single case under a specific RunMode."""
start = time.time()
        # In eval modes: disable periodic optimization so no memory is written.
        saved_optimize = self.orch.optimize_every_n_tasks
        if mode.is_eval:
            self.orch.optimize_every_n_tasks = 999999  # threshold never reached
        try:
            task_result = self.orch.run_task(
                purpose=case.input_purpose,
                initial_state=State(data=case.input_state),
                max_steps=case.max_steps,
            )
        except Exception as e:
            logger.error(f"Case {case.id} failed: {e}")
            # Build a minimal failed TaskResult so evaluation can still proceed.
            task_result = TaskResult(
                trajectory=Trajectory(
                    task_description=case.input_purpose, purpose=case.input_purpose,
                ),
                final_state=State(data={"_error": str(e)}),
            )
        finally:
            # Restore the original optimization cadence in all cases.
            self.orch.optimize_every_n_tasks = saved_optimize
# Evaluate
bundle = self.eval_port.evaluate(
case, task_result.final_state.data, task_result.trajectory,
)
return V2EvalResult(
case_id=case.id,
iteration=iteration,
split=case.split,
bundle=bundle,
steps=task_result.total_steps,
wall_time_s=time.time() - start,
)
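
# Minimal end-to-end sketch. Orchestrator construction and the EvalCase
# fields shown here follow the class docstring but are illustrative
# assumptions, not the canonical API.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    orch = Orchestrator()  # assumes a default-constructible Orchestrator
    cases = [
        EvalCase(id="t1", input_purpose="...", split="train"),
        EvalCase(id="t2", input_purpose="...", split="test"),
    ]
    runner = BenchmarkRunnerV2(orchestrator=orch)
    result = runner.run(cases, train_iterations=3, eval_iterations=1)
    print(result.summary())
    result.save("runs/v2_benchmark.json")  # illustrative output path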