| """LandscapeForge OpenEnv environment — OptCoder REPL (Phase C). | |
| For v1 we ship OptCoder-only: LandscapeForge is a fixed template picker | |
| controlled by the env itself (uniform random over the tier menu). The agent | |
| acting through OpenEnv is OptCoder. | |
| Each `reset()` samples a new landscape from the current tier. Each `step()` | |
| executes one OptCoder action (run_baseline / draft / inspect / commit), | |
| mutates env state, and returns an Observation reflecting the new state. | |
| Episode ends when OptCoder commits or budget is exhausted. | |
| """ | |

from __future__ import annotations

from typing import Optional
from uuid import uuid4

import numpy as np

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import (
        ACTION_COSTS,
        LandscapeforgeAction,
        LandscapeforgeObservation,
    )
    from ..landscapes import (
        TIER_MENU,
        Landscape,
        build_landscape,
        structural_hints,
    )
    from ..reference_optimizers import run_baseline as run_reference_baseline
    from ..reference_optimizers import tune_adam_lr
    from ..sandbox import SandboxError, compile_optimizer
    from ..arena import ArenaResult, auto_test_draft, run_arena
    from ..rewards import ast_novelty_score, compute_optcoder_reward, compute_step_reward
except ImportError:
    # Fallback when running from the repo root or under package-layout quirks.
    from models import (  # type: ignore
        ACTION_COSTS,
        LandscapeforgeAction,
        LandscapeforgeObservation,
    )
    from landscapes import (  # type: ignore
        TIER_MENU,
        Landscape,
        build_landscape,
        structural_hints,
    )
    from reference_optimizers import run_baseline as run_reference_baseline  # type: ignore
    from reference_optimizers import tune_adam_lr  # type: ignore
    from sandbox import SandboxError, compile_optimizer  # type: ignore
    from arena import ArenaResult, auto_test_draft, run_arena  # type: ignore
    from rewards import ast_novelty_score, compute_optcoder_reward, compute_step_reward  # type: ignore

BUDGET_TOTAL = 12
ARENA_SEEDS = [101, 202, 303, 404, 505, 606, 707, 808, 909, 1010]
ARENA_STEPS = 200
BASELINE_STEPS = 30  # env-controlled; agent does not choose
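
# Budget accounting: every action charges ACTION_COSTS[action.kind] against
# BUDGET_TOTAL. A non-commit action whose cost would overshoot the budget, or
# any action that exhausts it, triggers a forced commit (see step()).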

# Reference source blobs for AST novelty comparison (short pseudo-implementations).
# Kept minimal — enough to detect "this commit is basically Adam".
_REF_SGD = """
class Optimizer:
    def __init__(self, dim): self.lr = 0.01
    def step(self, x, f, g): return x - self.lr * g
""".strip()


def _adam_source(lr: float) -> str:
    """Adam reference implementation parameterized by LR.

    Used by `_ensure_adam_arena` after LR tuning — the baseline is
    Adam-at-best-LR-for-this-landscape, not Adam-at-fixed-default.
    """
    return f"""
class Optimizer:
    def __init__(self, dim):
        self.lr = {lr}
        self.b1 = 0.9
        self.b2 = 0.999
        self.eps = 1e-8
        self.m = np.zeros(dim)
        self.v = np.zeros(dim)
        self.t = 0

    def step(self, x, f_val, g):
        self.t += 1
        self.m = self.b1 * self.m + (1 - self.b1) * g
        self.v = self.b2 * self.v + (1 - self.b2) * g * g
        mh = self.m / (1 - self.b1 ** self.t)
        vh = self.v / (1 - self.b2 ** self.t)
        return x - self.lr * mh / (np.sqrt(vh) + self.eps)
""".strip()

# Frozen default-LR source used only for AST-novelty comparison (so r_novelty
# measures "structurally different from Adam" regardless of the tuned LR).
_REF_ADAM = _adam_source(0.001)

_REF_MOMENTUM = """
class Optimizer:
    def __init__(self, dim):
        import numpy as np
        self.lr = 0.01; self.beta = 0.9; self.v = np.zeros(dim)
    def step(self, x, f, g):
        self.v = self.beta * self.v - self.lr * g
        return x + self.v
""".strip()

REFERENCE_SOURCES = [_REF_SGD, _REF_ADAM, _REF_MOMENTUM]
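
# Illustrative expectation (assumes ast_novelty_score returns a float in
# [0, 1], higher = more structurally novel):
#   ast_novelty_score(_REF_ADAM, REFERENCE_SOURCES)   # ~0.0: it is Adam
#   ast_novelty_score(novel_src, REFERENCE_SOURCES)   # closer to 1.0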


class LandscapeforgeEnvironment(Environment):
    """OptCoder-facing OpenEnv environment.

    LandscapeForge is internal (template picker) in v1.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, tier: str = "T0", seed: int = 0):
        self._initial_tier = tier
        self._master_rng = np.random.default_rng(seed)
        self._reset_count = 0
        self._tier = tier
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # Populated by reset()
        self._landscape: Optional[Landscape] = None
        self._hints: dict = {}
        self._baseline_history: list[dict] = []
        self._draft_history: list[dict] = []
        self._draft_details: list[list[dict]] = []  # per-draft per-step detail
        self._inspect_requests: list[dict] = []
        self._current_draft: Optional[str] = None
        self._budget_spent: int = 0
        self._committed: bool = False
        self._final_obs: Optional[LandscapeforgeObservation] = None
        # Cache Adam's full arena result per episode (computed lazily, for
        # reward normalization via progress-based r_regret). The baseline is
        # Adam-at-tuned-LR — the per-landscape LR is selected via a short sweep.
        self._adam_arena_cache: Optional[ArenaResult] = None
        self._adam_tuned_lr: Optional[float] = None
        # Stepwise feedback log (PBS delta + compile penalty). This is shown to
        # the LLM in the observation so it can course-correct mid-episode, but
        # NEVER added to the training scalar — final reward is purely terminal
        # arena reward (§9.1) for robustness against reward hacking.
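        # Illustrative entry shape (key names are guesses based on the
        # PBS-delta / compile-penalty description; actual keys come from
        # compute_step_reward's breakdown):
        #   {"turn": 3, "action_kind": "draft", "pbs_delta": 0.12, "compile_penalty": 0.0}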
        self._step_feedback_log: list[dict] = []

    # ---------- OpenEnv API ----------

    def reset(self) -> LandscapeforgeObservation:
        self._reset_count += 1
        self._state = State(episode_id=str(uuid4()), step_count=0)

        # Pick a landscape from the current tier's menu.
        menu = TIER_MENU[self._tier]
        template = str(self._master_rng.choice(menu))
        dim = int(self._master_rng.integers(2, 6))  # small dims for v1
        params = self._sample_params(template)
        self._landscape = build_landscape(
            template=template, dim=dim, params=params,
            rng=np.random.default_rng(int(self._master_rng.integers(0, 2**31))),
        )
        self._hints = structural_hints(
            self._landscape,
            rng=np.random.default_rng(int(self._master_rng.integers(0, 2**31))),
        )

        # Wipe REPL state.
        self._baseline_history = []
        self._draft_history = []
        self._draft_details = []
        self._inspect_requests = []
        self._current_draft = None
        self._budget_spent = 0
        self._committed = False
        self._final_obs = None
        self._adam_arena_cache = None
        self._adam_tuned_lr = None
        self._step_feedback_log = []

        return self._make_observation(
            last_kind=None, last_result={"reset": True}, done=False, reward=0.0,
        )

    def step(self, action: LandscapeforgeAction) -> LandscapeforgeObservation:  # type: ignore[override]
        if self._landscape is None:
            raise RuntimeError("step() called before reset()")
        if self._committed:
            # Episode already done; return the terminal obs.
            assert self._final_obs is not None
            return self._final_obs

        self._state.step_count += 1
        cost = ACTION_COSTS[action.kind]
        # Check the budget before charging: a non-commit action that would
        # overshoot it triggers a forced commit instead of executing.
        if self._budget_spent + cost > BUDGET_TOTAL and action.kind != "commit":
            return self._force_commit(reason="budget_exhausted")
        self._budget_spent += cost

        # Snapshot draft history for PBS computation.
        prev_draft_history_snapshot = list(self._draft_history)

        if action.kind == "run_baseline":
            result = self._do_run_baseline(action)
        elif action.kind == "draft":
            result = self._do_draft(action)
        elif action.kind == "inspect":
            result = self._do_inspect(action)
        elif action.kind == "commit":
            return self._do_commit()
        else:
            raise ValueError(f"Unknown action kind: {action.kind}")

        # Compute stepwise FEEDBACK (NOT reward): signals the LLM can use to
        # course-correct mid-episode, exposed through last_action_result.
        # Explicitly NOT summed into the training reward; the terminal arena
        # reward is the only signal GRPO sees (robust against reward hacking).
        step_feedback = compute_step_reward(
            prev_draft_history=prev_draft_history_snapshot,
            new_draft_history=self._draft_history,
            action_kind=action.kind,
            action_result=result,
        )
        if step_feedback["breakdown"]:
            entry = {
                "turn": self._state.step_count,
                "action_kind": action.kind,
                **step_feedback["breakdown"],
            }
            self._step_feedback_log.append(entry)
            # Surface on this turn's action result so the LLM sees it immediately.
            result = {**result, "feedback": step_feedback["breakdown"]}

        # If the budget is now exhausted, auto-commit.
        if self._budget_spent >= BUDGET_TOTAL:
            return self._force_commit(reason="budget_exhausted")

        return self._make_observation(
            last_kind=action.kind, last_result=result,
            done=False, reward=0.0,  # no reward on non-terminal steps
        )

    def state(self) -> State:
        return self._state

    # ---------- Action handlers ----------

    def _do_run_baseline(self, action: LandscapeforgeAction) -> dict:
        assert self._landscape is not None
        # Fixed init AND fixed step count for baseline comparability across
        # episodes and rollouts (important for GRPO group-relative advantages).
        rng = np.random.default_rng(42)
        x0 = rng.normal(0.0, 0.5, size=self._landscape.dim)
        result = run_reference_baseline(
            name=action.baseline_name, f=self._landscape.f, grad=self._landscape.grad,
            x0=x0, steps=BASELINE_STEPS,
        )
        self._baseline_history.append(result)
        return {
            "baseline_index": len(self._baseline_history) - 1,
            "name": result["name"],
            "n_steps": len(result["trajectory"]),
            "final_f": (result["trajectory"][-1]["f"]
                        if result["trajectory"] and result["trajectory"][-1]["f"] is not None
                        else None),
        }

    def _do_draft(self, action: LandscapeforgeAction) -> dict:
        assert self._landscape is not None
        code = action.code or ""
        self._current_draft = code
        try:
            opt = compile_optimizer(code, dim=self._landscape.dim)
        except SandboxError as e:
            # Record the failed draft; it still counts toward history for inspect.
            self._draft_history.append({
                "code": code,
                "compile_error": str(e),
                "summary": {"converged": False, "diverged": True, "error": str(e),
                            "final_f": None, "step_of_min": None, "min_f": None},
            })
            self._draft_details.append([])
            return {"draft_index": len(self._draft_history) - 1,
                    "compile_error": str(e), "summary": None}

        test = auto_test_draft(opt, self._landscape, seed=0, steps=20)
        self._draft_history.append({
            "code": code,
            "compile_error": None,
            "summary": test["summary"],
        })
        self._draft_details.append(test["detail"])
        return {"draft_index": len(self._draft_history) - 1,
                "compile_error": None, "summary": test["summary"]}

    def _do_inspect(self, action: LandscapeforgeAction) -> dict:
        idx = action.draft_idx
        if idx is None or idx < 0 or idx >= len(self._draft_details):
            return {"error": f"draft_idx {idx} out of range (have {len(self._draft_details)} drafts)"}
        detail = self._draft_details[idx]
        # Default to the full range when bounds are unset; clamp to valid indices.
        start = max(0, action.step_range_start or 0)
        end = min(action.step_range_end if action.step_range_end is not None else len(detail),
                  len(detail))
        sliced = detail[start:end]
        record = {
            "draft_idx": idx,
            "step_range": [start, end],
            "detail": sliced,
        }
        self._inspect_requests.append(record)
        return {"draft_idx": idx, "step_range": [start, end], "n_steps": len(sliced)}

    def _do_commit(self) -> LandscapeforgeObservation:
        return self._finalize_episode(reason="commit")

    def _force_commit(self, reason: str) -> LandscapeforgeObservation:
        return self._finalize_episode(reason=reason)

    # ---------- Episode finalization ----------

    def _finalize_episode(self, reason: str) -> LandscapeforgeObservation:
        assert self._landscape is not None
        self._committed = True

        # Need a current_draft. If there is none, produce a worst-case result.
        if not self._current_draft:
            result = {
                "reason": reason,
                "no_draft": True,
                "final_regret": 1.0,
            }
            r_total = -1.0
            breakdown = {"no_draft": 1.0}
            obs = self._make_observation(
                last_kind="commit", last_result=result,
                done=True, reward=r_total,
            )
            obs.committed = True
            obs.final_regret = 1.0
            obs.r_optcoder = r_total
            obs.r_optcoder_breakdown = breakdown
            self._final_obs = obs
            return obs

        # Full Phase-D arena eval.
        try:
            opt = compile_optimizer(self._current_draft, dim=self._landscape.dim)
            arena = run_arena(opt, self._landscape, seeds=ARENA_SEEDS, steps=ARENA_STEPS)
        except SandboxError:
            # Committed code fails to compile -> worst-case result.
            arena = ArenaResult(
                initial_values=[1.0] * len(ARENA_SEEDS),
                final_values=[float("nan")] * len(ARENA_SEEDS),
                crashed=[True] * len(ARENA_SEEDS),
                trajectories=[[] for _ in ARENA_SEEDS],
            )

        # Adam baseline arena for normalization (always run, for reward stability).
        adam_arena = self._ensure_adam_arena()
        novelty = ast_novelty_score(self._current_draft, REFERENCE_SOURCES)
        # Convergence step: on the first seed's trajectory, the first step
        # where f < 0.01 * f0.
        convergence_step = self._compute_convergence_step(arena)

        reward = compute_optcoder_reward(
            arena=arena,
            adam_arena=adam_arena,
            actions_used_cost=self._budget_spent,
            budget_total=BUDGET_TOTAL,
            novelty_score=novelty,
            convergence_step=convergence_step,
            arena_steps=ARENA_STEPS,
        )

        result = {
            "reason": reason,
            "my_mean_progress": arena.mean_progress,
            "adam_mean_progress": adam_arena.mean_progress,
            "adam_tuned_lr": self._adam_tuned_lr,
            "speedup_vs_adam": reward.breakdown.get("speedup_vs_adam"),
            "crash_fraction": arena.crash_fraction,
            "novelty_score": novelty,
            "convergence_step": convergence_step,
        }
        obs = self._make_observation(
            last_kind="commit", last_result=result,
            done=True, reward=reward.r_total,
        )
        obs.committed = True
        # `final_regret` is reinterpreted (no f_min dependency): Adam-shortfall
        # in [0, 1]. 0 = matched or beat Adam's descent; 1 = made zero progress
        # while Adam descended normally. Capped at 1.
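        # Worked example: speedup_vs_adam = 0.6 -> final_regret = 0.4;
        # speedup_vs_adam >= 1.0 (matched/beat Adam) -> final_regret = 0.0.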
        speedup = reward.breakdown.get("speedup_vs_adam", 0.0)
        obs.final_regret = float(max(0.0, min(1.0, 1.0 - speedup)))
        obs.r_optcoder = reward.r_total
        obs.r_optcoder_breakdown = reward.breakdown
        self._final_obs = obs
        return obs

    # ---------- Helpers ----------

    def _make_observation(self, last_kind: Optional[str], last_result: dict,
                          done: bool, reward: float) -> LandscapeforgeObservation:
        assert self._landscape is not None
        return LandscapeforgeObservation(
            landscape_description=self._landscape.description,
            dim=self._landscape.dim,
            structural_hints=self._hints,
            baseline_history=self._serialize_baseline_history(),
            draft_history=self._serialize_draft_history(),
            inspect_requests=list(self._inspect_requests),
            current_draft=self._current_draft,
            budget_remaining=BUDGET_TOTAL - self._budget_spent,
            last_action_kind=last_kind,
            last_action_result=last_result,
            done=done,
            reward=reward,
        )

    def _serialize_baseline_history(self) -> list[dict]:
        # Serialize each baseline as name + full trajectory (every step, x as a list).
        return [
            {"name": b["name"], "trajectory": b["trajectory"]}
            for b in self._baseline_history
        ]

    def _serialize_draft_history(self) -> list[dict]:
        # For the observation we include code + summary per draft.
        return [
            {"code": d["code"], "summary": d["summary"], "compile_error": d["compile_error"]}
            for d in self._draft_history
        ]

    def _sample_params(self, template: str) -> dict:
        rng = self._master_rng
        if template == "quadratic":
            # Condition-number cap by tier: T0 up to 100, T1 up to 1000, T2 up to 10_000.
            cap = {"T0": 100.0, "T1": 1000.0, "T2": 10_000.0}[self._tier]
            return {"cond": float(rng.uniform(1.0, cap))}
        if template == "gaussian_mix":
            return {
                "k": int(rng.integers(2, 6)),
                "sigma": float(rng.uniform(0.3, 1.0)),
                "spread": float(rng.uniform(1.0, 4.0)),
            }
        if template == "huber":
            return {"delta": float(rng.uniform(0.5, 2.0))}
        return {}

    def _ensure_adam_arena(self) -> ArenaResult:
        """Build the Adam baseline FAIRLY — the LR is tuned per landscape before
        running the arena. The tuning uses a short 30-step sweep on a dedicated
        seed (not one of the arena seeds) to avoid overfitting.

        Cached per episode in `_adam_arena_cache`. The tuned LR is stored in
        `_adam_tuned_lr` for logging / demo surfacing.
        """
        if self._adam_arena_cache is not None:
            return self._adam_arena_cache
        assert self._landscape is not None
        try:
            # Tune the LR on seed 0 (not in ARENA_SEEDS), 30-step sweep.
            tune_rng = np.random.default_rng(0)
            tune_x0 = tune_rng.normal(0.0, 0.5, size=self._landscape.dim)
            best_lr = tune_adam_lr(
                f=self._landscape.f, grad=self._landscape.grad,
                x0=tune_x0, sweep_steps=30,
            )
            self._adam_tuned_lr = best_lr
            adam_opt = compile_optimizer(_adam_source(best_lr), dim=self._landscape.dim)
            self._adam_arena_cache = run_arena(
                adam_opt, self._landscape,
                seeds=ARENA_SEEDS, steps=ARENA_STEPS,
            )
        except Exception:
            # If tuning or compilation fails, fall back to an all-crashed arena
            # so reward normalization still has a well-defined baseline.
            self._adam_tuned_lr = None
            self._adam_arena_cache = ArenaResult(
                initial_values=[1.0] * len(ARENA_SEEDS),
                final_values=[1.0] * len(ARENA_SEEDS),
                crashed=[True] * len(ARENA_SEEDS),
                trajectories=[[] for _ in ARENA_SEEDS],
            )
        return self._adam_arena_cache

    def _compute_convergence_step(self, arena: ArenaResult) -> Optional[int]:
        """First step on the first seed where f drops below 1% of the initial f."""
        if not arena.trajectories or not arena.trajectories[0]:
            return None
        traj = arena.trajectories[0]
        f0 = traj[0]["f"]
        if f0 is None or f0 <= 0:
            return None
        threshold = 0.01 * f0
        for t, snap in enumerate(traj):
            # Crashed steps may record f as None; skip them.
            if snap["f"] is not None and snap["f"] < threshold:
                return t
        return None

    # ---------- Tier advancement API (used by trainer, not agent) ----------

    def advance_tier(self, new_tier: str) -> None:
        if new_tier not in TIER_MENU:
            raise ValueError(f"Unknown tier {new_tier}")
        self._tier = new_tier
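

# Trainer-side sketch (illustrative; `mean_speedup` is a hypothetical metric
# aggregated by the training loop from r_optcoder_breakdown, not part of this
# module):
#   if mean_speedup >= 1.0:
#       env.advance_tier("T1")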