""" BenchmarkRunnerV2 — Rigorous evaluation with train/val/test splits, memory ablation, shuffle control, and contamination detection. Key difference from V1: BenchmarkRunnerV2 enforces RunMode. In eval_test mode, no memory is written. This is the only mode whose numbers are trustworthy. """ from __future__ import annotations import json import logging import time from dataclasses import dataclass, field from pathlib import Path from typing import Any from purpose_agent.v2_types import RunMode from purpose_agent.evalport import EvalCase, EvalPort, DictEvalPort, ScoreBundle from purpose_agent.orchestrator import Orchestrator, TaskResult from purpose_agent.types import State logger = logging.getLogger(__name__) @dataclass class V2EvalResult: """Result of one evaluation case.""" case_id: str iteration: int split: str bundle: ScoreBundle steps: int = 0 wall_time_s: float = 0.0 @dataclass class V2BenchmarkResult: """Full benchmark result with per-split reporting.""" name: str results: list[V2EvalResult] = field(default_factory=list) config: dict[str, Any] = field(default_factory=dict) started_at: float = field(default_factory=time.time) finished_at: float = 0.0 def get_split_summary(self, split: str) -> dict[str, float]: """Get aggregate metrics for a specific split.""" split_results = [r for r in self.results if r.split == split] if not split_results: return {} n = len(split_results) pass_rate = sum(1 for r in split_results if r.bundle.passed) / n avg_steps = sum(r.steps for r in split_results) / n return { "n": n, "pass_rate": round(pass_rate, 3), "avg_steps": round(avg_steps, 1), } def get_improvement_curve(self, split: str = "test") -> list[dict]: """Get per-iteration metrics for one split.""" by_iter: dict[int, list[V2EvalResult]] = {} for r in self.results: if r.split == split: by_iter.setdefault(r.iteration, []).append(r) curve = [] for it in sorted(by_iter): results = by_iter[it] n = len(results) pass_rate = sum(1 for r in results if r.bundle.passed) / n curve.append({ "iteration": it, "pass_rate": round(pass_rate, 3), "n": n, }) return curve def summary(self) -> str: lines = [f"═══ Benchmark: {self.name} ═══"] for split in ["train", "validation", "test"]: s = self.get_split_summary(split) if s: lines.append(f" {split:>12}: n={s['n']}, pass_rate={s['pass_rate']:.1%}, avg_steps={s['avg_steps']:.1f}") curve = self.get_improvement_curve("test") if len(curve) >= 2: first = curve[0]["pass_rate"] last = curve[-1]["pass_rate"] delta = last - first if abs(delta) < 0.001: lines.append(f"\n Test improvement: {first:.1%} → {last:.1%} (no significant change)") else: lines.append(f"\n Test improvement: {first:.1%} → {last:.1%} ({delta:+.1%})") return "\n".join(lines) def save(self, path: str) -> None: Path(path).parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: json.dump({ "name": self.name, "config": self.config, "splits": { s: self.get_split_summary(s) for s in ["train", "validation", "test"] }, "curve": self.get_improvement_curve("test"), "n_results": len(self.results), }, f, indent=2) class BenchmarkRunnerV2: """ Rigorous benchmark runner with train/val/test splits and ablation controls. Key guarantee: eval_test cases NEVER cause memory writes. 

    Usage:
        cases = [
            EvalCase(id="t1", input_purpose="...", split="train", ...),
            EvalCase(id="t2", input_purpose="...", split="test", ...),
        ]
        runner = BenchmarkRunnerV2(orchestrator=orch)
        result = runner.run(cases, train_iterations=3, eval_iterations=1)
        print(result.summary())
    """

    def __init__(
        self,
        orchestrator: Orchestrator,
        eval_port: EvalPort | None = None,
    ):
        self.orch = orchestrator
        self.eval_port = eval_port or DictEvalPort()

    def run(
        self,
        cases: list[EvalCase],
        train_iterations: int = 3,
        eval_iterations: int = 1,
        name: str = "v2_benchmark",
    ) -> V2BenchmarkResult:
        """
        Run benchmark: train split with learning, test split without.

        1. Train iterations: run train split cases with RunMode.LEARNING_TRAIN
        2. Validation: run validation split with RunMode.LEARNING_VALIDATION
        3. Test: run test split with RunMode.EVAL_TEST (no memory writes)
        """
        result = V2BenchmarkResult(name=name, config={
            "train_iterations": train_iterations,
            "eval_iterations": eval_iterations,
        })

        train_cases = [c for c in cases if c.split == "train"]
        val_cases = [c for c in cases if c.split == "validation"]
        test_cases = [c for c in cases if c.split == "test"]

        # Phase 1: Training
        for it in range(1, train_iterations + 1):
            logger.info(f"Train iteration {it}/{train_iterations}")
            for case in train_cases:
                ev = self._run_case(case, it, RunMode.LEARNING_TRAIN)
                result.results.append(ev)

        # Phase 2: Validation
        for case in val_cases:
            ev = self._run_case(case, 1, RunMode.LEARNING_VALIDATION)
            result.results.append(ev)

        # Phase 3: Test (NO MEMORY WRITES)
        for it in range(1, eval_iterations + 1):
            logger.info(f"Test iteration {it}/{eval_iterations}")
            for case in test_cases:
                ev = self._run_case(case, it, RunMode.EVAL_TEST)
                result.results.append(ev)

        result.finished_at = time.time()
        return result

    def run_cold_warm(
        self,
        test_cases: list[EvalCase],
        train_cases: list[EvalCase],
        name: str = "cold_warm",
    ) -> dict[str, Any]:
        """Compare cold (no memory) vs warm (after training) on the same test set."""
        # Cold: eval test cases with empty memory
        cold_results = []
        for case in test_cases:
            ev = self._run_case(case, 0, RunMode.EVAL_TEST)
            cold_results.append(ev)
        cold_pass = sum(1 for r in cold_results if r.bundle.passed) / max(len(cold_results), 1)

        # Train
        for case in train_cases:
            self._run_case(case, 1, RunMode.LEARNING_TRAIN)

        # Warm: eval same test cases after training
        warm_results = []
        for case in test_cases:
            ev = self._run_case(case, 1, RunMode.EVAL_TEST)
            warm_results.append(ev)
        warm_pass = sum(1 for r in warm_results if r.bundle.passed) / max(len(warm_results), 1)

        delta = warm_pass - cold_pass
        return {
            "cold_pass_rate": round(cold_pass, 3),
            "warm_pass_rate": round(warm_pass, 3),
            "delta": round(delta, 3),
            "improvement_significant": abs(delta) > 0.05,
        }

    def run_memory_ablation(
        self,
        test_cases: list[EvalCase],
    ) -> dict[str, Any]:
        """Run test cases with and without memory to measure memory contribution."""
        # With memory
        with_results = []
        for case in test_cases:
            ev = self._run_case(case, 1, RunMode.EVAL_TEST)
            with_results.append(ev)
        with_pass = sum(1 for r in with_results if r.bundle.passed) / max(len(with_results), 1)

        # Without memory (temporarily clear)
        saved_lib = list(self.orch.optimizer.heuristic_library)
        self.orch.optimizer.heuristic_library = []
        self.orch.sync_memory()

        without_results = []
        for case in test_cases:
            ev = self._run_case(case, 1, RunMode.EVAL_TEST)
            without_results.append(ev)
        without_pass = sum(1 for r in without_results if r.bundle.passed) / max(len(without_results), 1)

        # Restore
        self.orch.optimizer.heuristic_library = saved_lib
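        # sync_memory() is assumed to propagate the library change to the live
        # agent memory; it is therefore called after both the clear and the restore.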
        self.orch.sync_memory()

        return {
            "with_memory_pass_rate": round(with_pass, 3),
            "without_memory_pass_rate": round(without_pass, 3),
            "memory_contribution": round(with_pass - without_pass, 3),
        }

    def _run_case(self, case: EvalCase, iteration: int, mode: RunMode) -> V2EvalResult:
        """Run a single case under a specific RunMode."""
        start = time.time()

        # In eval modes, temporarily disable periodic optimization so the run
        # cannot trigger memory writes.
        saved_optimize = self.orch.optimize_every_n_tasks
        if mode.is_eval:
            self.orch.optimize_every_n_tasks = 999999  # Disable optimization

        try:
            task_result = self.orch.run_task(
                purpose=case.input_purpose,
                initial_state=State(data=case.input_state),
                max_steps=case.max_steps,
            )
        except Exception as e:
            logger.error(f"Case {case.id} failed: {e}")
            task_result = TaskResult(
                trajectory=Trajectory(
                    task_description=case.input_purpose,
                    purpose=case.input_purpose,
                ),
                final_state=State(data={"_error": str(e)}),
            )

        # Restore
        self.orch.optimize_every_n_tasks = saved_optimize

        # Evaluate
        bundle = self.eval_port.evaluate(
            case,
            task_result.final_state.data,
            task_result.trajectory,
        )

        return V2EvalResult(
            case_id=case.id,
            iteration=iteration,
            split=case.split,
            bundle=bundle,
            steps=task_result.total_steps,
            wall_time_s=time.time() - start,
        )
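

# --- Illustrative usage sketch ---------------------------------------------------
# A minimal, hedged example of driving the runner end to end: the split protocol
# via run(), then the cold/warm and memory-ablation controls on the same test set.
# It assumes an already-constructed Orchestrator and that EvalCase accepts the
# keyword fields shown (id, input_purpose, input_state, split, max_steps); any
# expected-output or scoring fields depend on your EvalCase and DictEvalPort
# definitions and are omitted here.
def _example_usage(orch: Orchestrator) -> None:
    cases = [
        EvalCase(id="t1", input_purpose="...", input_state={}, split="train", max_steps=10),
        EvalCase(id="t2", input_purpose="...", input_state={}, split="test", max_steps=10),
    ]
    runner = BenchmarkRunnerV2(orchestrator=orch)

    # Full protocol: learn on the train split, report the held-out test split.
    result = runner.run(cases, train_iterations=2, eval_iterations=1)
    print(result.summary())
    result.save("runs/v2_benchmark.json")

    # Controls on the same test set: cold-vs-warm and memory contribution.
    test_cases = [c for c in cases if c.split == "test"]
    train_cases = [c for c in cases if c.split == "train"]
    print(runner.run_cold_warm(test_cases, train_cases))
    print(runner.run_memory_ablation(test_cases))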