| """ |
| BenchmarkRunnerV2 — Rigorous evaluation with train/val/test splits, |
| memory ablation, shuffle control, and contamination detection. |
| |
| Key difference from V1: BenchmarkRunnerV2 enforces RunMode. In eval_test |
| mode, no memory is written. This is the only mode whose numbers are trustworthy. |
| """ |
from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from purpose_agent.v2_types import RunMode
from purpose_agent.evalport import EvalCase, EvalPort, DictEvalPort, ScoreBundle
from purpose_agent.orchestrator import Orchestrator, TaskResult
from purpose_agent.types import State, Trajectory

logger = logging.getLogger(__name__)


@dataclass
class V2EvalResult:
    """Result of one evaluation case."""
    case_id: str
    iteration: int
    split: str
    bundle: ScoreBundle
    steps: int = 0
    wall_time_s: float = 0.0


@dataclass
class V2BenchmarkResult:
    """Full benchmark result with per-split reporting."""
    name: str
    results: list[V2EvalResult] = field(default_factory=list)
    config: dict[str, Any] = field(default_factory=dict)
    started_at: float = field(default_factory=time.time)
    finished_at: float = 0.0

    def get_split_summary(self, split: str) -> dict[str, float]:
        """Get aggregate metrics for a specific split."""
        split_results = [r for r in self.results if r.split == split]
        if not split_results:
            return {}
        n = len(split_results)
        pass_rate = sum(1 for r in split_results if r.bundle.passed) / n
        avg_steps = sum(r.steps for r in split_results) / n
        return {
            "n": n,
            "pass_rate": round(pass_rate, 3),
            "avg_steps": round(avg_steps, 1),
        }

    def get_improvement_curve(self, split: str = "test") -> list[dict]:
        """Get per-iteration metrics for one split."""
        by_iter: dict[int, list[V2EvalResult]] = {}
        for r in self.results:
            if r.split == split:
                by_iter.setdefault(r.iteration, []).append(r)

        curve = []
        for it in sorted(by_iter):
            results = by_iter[it]
            n = len(results)
            pass_rate = sum(1 for r in results if r.bundle.passed) / n
            curve.append({
                "iteration": it,
                "pass_rate": round(pass_rate, 3),
                "n": n,
            })
        return curve

    def summary(self) -> str:
        lines = [f"═══ Benchmark: {self.name} ═══"]
        for split in ["train", "validation", "test"]:
            s = self.get_split_summary(split)
            if s:
                lines.append(f"  {split:>12}: n={s['n']}, pass_rate={s['pass_rate']:.1%}, avg_steps={s['avg_steps']:.1f}")

        curve = self.get_improvement_curve("test")
        if len(curve) >= 2:
            first = curve[0]["pass_rate"]
            last = curve[-1]["pass_rate"]
            delta = last - first
            if abs(delta) < 0.001:
                lines.append(f"\n  Test improvement: {first:.1%} → {last:.1%} (no change)")
            else:
                lines.append(f"\n  Test improvement: {first:.1%} → {last:.1%} ({delta:+.1%})")
        return "\n".join(lines)

    def save(self, path: str) -> None:
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            json.dump({
                "name": self.name,
                "config": self.config,
                "splits": {
                    s: self.get_split_summary(s) for s in ["train", "validation", "test"]
                },
                "curve": self.get_improvement_curve("test"),
                "n_results": len(self.results),
            }, f, indent=2)


class BenchmarkRunnerV2:
| """ |
| Rigorous benchmark runner with train/val/test splits and ablation controls. |
| |
| Key guarantee: eval_test cases NEVER cause memory writes. |
| |
| Usage: |
| cases = [ |
| EvalCase(id="t1", input_purpose="...", split="train", ...), |
| EvalCase(id="t2", input_purpose="...", split="test", ...), |
| ] |
| runner = BenchmarkRunnerV2(orchestrator=orch) |
| result = runner.run(cases, train_iterations=3, eval_iterations=1) |
| print(result.summary()) |
| """ |

    def __init__(
        self,
        orchestrator: Orchestrator,
        eval_port: EvalPort | None = None,
    ):
        self.orch = orchestrator
        self.eval_port = eval_port or DictEvalPort()

    def run(
        self,
        cases: list[EvalCase],
        train_iterations: int = 3,
        eval_iterations: int = 1,
        name: str = "v2_benchmark",
    ) -> V2BenchmarkResult:
        """
        Run benchmark: train split with learning, test split without.

        1. Train iterations: run train split cases with RunMode.LEARNING_TRAIN
        2. Validation: run validation split with RunMode.LEARNING_VALIDATION
        3. Test: run test split with RunMode.EVAL_TEST (no memory writes)
        """
        result = V2BenchmarkResult(name=name, config={
            "train_iterations": train_iterations,
            "eval_iterations": eval_iterations,
        })

        train_cases = [c for c in cases if c.split == "train"]
        val_cases = [c for c in cases if c.split == "validation"]
        test_cases = [c for c in cases if c.split == "test"]
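
        # Phase 1: train iterations (learning mode, memory writes allowed).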
        for it in range(1, train_iterations + 1):
            logger.info(f"Train iteration {it}/{train_iterations}")
            for case in train_cases:
                ev = self._run_case(case, it, RunMode.LEARNING_TRAIN)
                result.results.append(ev)
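
        # Phase 2: single validation pass (learning-validation mode).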
        for case in val_cases:
            ev = self._run_case(case, 1, RunMode.LEARNING_VALIDATION)
            result.results.append(ev)
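
        # Phase 3: held-out test iterations (EVAL_TEST mode, no memory writes).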
        for it in range(1, eval_iterations + 1):
            logger.info(f"Test iteration {it}/{eval_iterations}")
            for case in test_cases:
                ev = self._run_case(case, it, RunMode.EVAL_TEST)
                result.results.append(ev)

        result.finished_at = time.time()
        return result

    def run_cold_warm(
        self,
        test_cases: list[EvalCase],
        train_cases: list[EvalCase],
        name: str = "cold_warm",
    ) -> dict[str, Any]:
        """Compare cold (no memory) vs warm (after training) on the same test set."""
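
        # Cold pass: score the test set before any training (iteration 0).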
        cold_results = []
        for case in test_cases:
            ev = self._run_case(case, 0, RunMode.EVAL_TEST)
            cold_results.append(ev)
        cold_pass = sum(1 for r in cold_results if r.bundle.passed) / max(len(cold_results), 1)
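
        # Train once on the training cases (memory writes allowed).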
        for case in train_cases:
            self._run_case(case, 1, RunMode.LEARNING_TRAIN)
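
        # Warm pass: score the same test set again, now with whatever was learned.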
        warm_results = []
        for case in test_cases:
            ev = self._run_case(case, 1, RunMode.EVAL_TEST)
            warm_results.append(ev)
        warm_pass = sum(1 for r in warm_results if r.bundle.passed) / max(len(warm_results), 1)

        delta = warm_pass - cold_pass
        return {
            "cold_pass_rate": round(cold_pass, 3),
            "warm_pass_rate": round(warm_pass, 3),
            "delta": round(delta, 3),
            # Flags any shift larger than 5 points in either direction; not a statistical test.
            "improvement_significant": abs(delta) > 0.05,
        }

    def run_memory_ablation(
        self,
        test_cases: list[EvalCase],
    ) -> dict[str, Any]:
        """Run test cases with and without memory to measure memory contribution."""
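
        # With memory: evaluate with the current heuristic library in place.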
        with_results = []
        for case in test_cases:
            ev = self._run_case(case, 1, RunMode.EVAL_TEST)
            with_results.append(ev)
        with_pass = sum(1 for r in with_results if r.bundle.passed) / max(len(with_results), 1)
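
        # Without memory: temporarily empty the heuristic library and re-sync.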
        saved_lib = list(self.orch.optimizer.heuristic_library)
        self.orch.optimizer.heuristic_library = []
        self.orch.sync_memory()

        without_results = []
        for case in test_cases:
            ev = self._run_case(case, 1, RunMode.EVAL_TEST)
            without_results.append(ev)
        without_pass = sum(1 for r in without_results if r.bundle.passed) / max(len(without_results), 1)
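
        # Restore the saved heuristic library.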
        self.orch.optimizer.heuristic_library = saved_lib
        self.orch.sync_memory()

        return {
            "with_memory_pass_rate": round(with_pass, 3),
            "without_memory_pass_rate": round(without_pass, 3),
            "memory_contribution": round(with_pass - without_pass, 3),
        }

    def _run_case(self, case: EvalCase, iteration: int, mode: RunMode) -> V2EvalResult:
        """Run a single case under a specific RunMode."""
        start = time.time()
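
        # In eval modes, push the optimization trigger out of reach so the case cannot cause memory writes.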
        saved_optimize = self.orch.optimize_every_n_tasks
        if mode.is_eval:
            self.orch.optimize_every_n_tasks = 999999

        try:
            task_result = self.orch.run_task(
                purpose=case.input_purpose,
                initial_state=State(data=case.input_state),
                max_steps=case.max_steps,
            )
        except Exception as e:
            logger.error(f"Case {case.id} failed: {e}")
            task_result = TaskResult(
                trajectory=Trajectory(
                    task_description=case.input_purpose, purpose=case.input_purpose,
                ),
                final_state=State(data={"_error": str(e)}),
            )
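
        # Restore the optimization cadence before scoring.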
        self.orch.optimize_every_n_tasks = saved_optimize
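
        # Score the final state and trajectory against the case's expectations.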
        bundle = self.eval_port.evaluate(
            case, task_result.final_state.data, task_result.trajectory,
        )

        return V2EvalResult(
            case_id=case.id,
            iteration=iteration,
            split=case.split,
            bundle=bundle,
            steps=task_result.total_steps,
            wall_time_s=time.time() - start,
        )