"""LandscapeForge OpenEnv environment — OptCoder REPL (Phase C). For v1 we ship OptCoder-only: LandscapeForge is a fixed template picker controlled by the env itself (uniform random over the tier menu). The agent acting through OpenEnv is OptCoder. Each `reset()` samples a new landscape from the current tier. Each `step()` executes one OptCoder action (run_baseline / draft / inspect / commit), mutates env state, and returns an Observation reflecting the new state. Episode ends when OptCoder commits or budget is exhausted. """ from __future__ import annotations from typing import Any, Optional from uuid import uuid4 import numpy as np from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import State try: from ..models import ( ACTION_COSTS, LandscapeforgeAction, LandscapeforgeObservation, ) from ..landscapes import ( TIER_MENU, Landscape, build_landscape, structural_hints, ) from ..reference_optimizers import run_baseline as run_reference_baseline from ..reference_optimizers import tune_adam_lr from ..sandbox import SandboxError, compile_optimizer from ..arena import ArenaResult, auto_test_draft, run_arena from ..rewards import ast_novelty_score, compute_optcoder_reward, compute_step_reward except ImportError: # Running from repo root or package layout quirks from models import ( # type: ignore ACTION_COSTS, LandscapeforgeAction, LandscapeforgeObservation, ) from landscapes import ( # type: ignore TIER_MENU, Landscape, build_landscape, structural_hints, ) from reference_optimizers import run_baseline as run_reference_baseline # type: ignore from reference_optimizers import tune_adam_lr # type: ignore from sandbox import SandboxError, compile_optimizer # type: ignore from arena import ArenaResult, auto_test_draft, run_arena # type: ignore from rewards import ast_novelty_score, compute_optcoder_reward, compute_step_reward # type: ignore BUDGET_TOTAL = 12 ARENA_SEEDS = [101, 202, 303, 404, 505, 606, 707, 808, 909, 1010] ARENA_STEPS = 200 BASELINE_STEPS = 30 # env-controlled; agent does not choose # Reference source blobs for AST novelty comparison (short pseudo-implementations). # Kept minimal — enough to detect "this commit is basically Adam". _REF_SGD = """ class Optimizer: def __init__(self, dim): self.lr = 0.01 def step(self, x, f, g): return x - self.lr * g """.strip() def _adam_source(lr: float) -> str: """Adam reference implementation parameterized by LR. Used by `_ensure_adam_arena` after LR tuning — the baseline is Adam-at-best-LR-for-this-landscape, not Adam-at-fixed-default. """ return f""" class Optimizer: def __init__(self, dim): self.lr = {lr} self.b1 = 0.9 self.b2 = 0.999 self.eps = 1e-8 self.m = np.zeros(dim) self.v = np.zeros(dim) self.t = 0 def step(self, x, f_val, g): self.t += 1 self.m = self.b1*self.m + (1-self.b1)*g self.v = self.b2*self.v + (1-self.b2)*g*g mh = self.m/(1-self.b1**self.t) vh = self.v/(1-self.b2**self.t) return x - self.lr * mh / (np.sqrt(vh) + self.eps) """.strip() # Frozen default-LR source used only for AST-novelty comparison (so r_novelty # measures "structurally different from Adam" regardless of the tuned LR). _REF_ADAM = _adam_source(0.001) _REF_MOMENTUM = """ class Optimizer: def __init__(self, dim): import numpy as np self.lr=0.01; self.beta=0.9; self.v = np.zeros(dim) def step(self, x, f, g): self.v = self.beta*self.v - self.lr*g return x + self.v """.strip() REFERENCE_SOURCES = [_REF_SGD, _REF_ADAM, _REF_MOMENTUM] class LandscapeforgeEnvironment(Environment): """OptCoder-facing OpenEnv environment. 

    LandscapeForge is internal (template picker) in v1.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, tier: str = "T0", seed: int = 0):
        self._initial_tier = tier
        self._master_rng = np.random.default_rng(seed)
        self._reset_count = 0
        self._tier = tier
        self._state = State(episode_id=str(uuid4()), step_count=0)

        # Populated by reset()
        self._landscape: Optional[Landscape] = None
        self._hints: dict = {}
        self._baseline_history: list[dict] = []
        self._draft_history: list[dict] = []
        self._draft_details: list[list[dict]] = []  # per-draft per-step detail
        self._inspect_requests: list[dict] = []
        self._current_draft: Optional[str] = None
        self._budget_spent: int = 0
        self._committed: bool = False
        self._final_obs: Optional[LandscapeforgeObservation] = None

        # Cache Adam's full arena result per episode (computed lazily, for
        # reward normalization via progress-based r_regret). The baseline is
        # Adam-at-tuned-LR — per-landscape LR is selected via a short sweep.
        self._adam_arena_cache: Optional[ArenaResult] = None
        self._adam_tuned_lr: Optional[float] = None

        # Stepwise feedback log (PBS delta + compile penalty). This is shown to
        # the LLM in the observation so it can course-correct mid-episode, but
        # NEVER added to the training scalar — final reward is purely terminal
        # arena reward (§9.1) for robustness against reward hacking.
        self._step_feedback_log: list[dict] = []

    # ---------- OpenEnv API ----------

    def reset(self) -> LandscapeforgeObservation:
        self._reset_count += 1
        self._state = State(episode_id=str(uuid4()), step_count=0)

        # Pick a landscape from the current tier's menu.
        menu = TIER_MENU[self._tier]
        template = str(self._master_rng.choice(menu))
        dim = int(self._master_rng.integers(2, 6))  # small dims for v1
        params = self._sample_params(template)
        self._landscape = build_landscape(
            template=template,
            dim=dim,
            params=params,
            rng=np.random.default_rng(int(self._master_rng.integers(0, 2**31))),
        )
        self._hints = structural_hints(
            self._landscape,
            rng=np.random.default_rng(int(self._master_rng.integers(0, 2**31))),
        )

        # Wipe REPL state
        self._baseline_history = []
        self._draft_history = []
        self._draft_details = []
        self._inspect_requests = []
        self._current_draft = None
        self._budget_spent = 0
        self._committed = False
        self._final_obs = None
        self._adam_arena_cache = None
        self._adam_tuned_lr = None
        self._step_feedback_log = []

        return self._make_observation(
            last_kind=None,
            last_result={"reset": True},
            done=False,
            reward=0.0,
        )

    def step(self, action: LandscapeforgeAction) -> LandscapeforgeObservation:  # type: ignore[override]
        if self._landscape is None:
            raise RuntimeError("step() called before reset()")
        if self._committed:
            # Episode already done; return terminal obs.
            assert self._final_obs is not None
            return self._final_obs

        self._state.step_count += 1
        cost = ACTION_COSTS[action.kind]

        # Charge budget first so over-limit actions are rejected.
        if self._budget_spent + cost > BUDGET_TOTAL and action.kind != "commit":
            return self._force_commit(reason="budget_exhausted")
        self._budget_spent += cost

        # Snapshot draft history for PBS computation
        prev_draft_history_snapshot = list(self._draft_history)

        if action.kind == "run_baseline":
            result = self._do_run_baseline(action)
        elif action.kind == "draft":
            result = self._do_draft(action)
        elif action.kind == "inspect":
            result = self._do_inspect(action)
        elif action.kind == "commit":
            return self._do_commit()
        else:
            raise ValueError(f"Unknown action kind: {action.kind}")

        # Compute stepwise FEEDBACK (NOT reward).
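        # The breakdown's exact keys are owned by compute_step_reward; per the
        # feedback-log note in __init__ it carries a PBS delta and a compile
        # penalty when applicable.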
        # These are signals the LLM can use to course-correct mid-episode —
        # exposed through last_action_result. Explicitly NOT summed into
        # training reward; terminal arena reward is the only signal GRPO sees
        # (robust against reward hacking).
        step_feedback = compute_step_reward(
            prev_draft_history=prev_draft_history_snapshot,
            new_draft_history=self._draft_history,
            action_kind=action.kind,
            action_result=result,
        )
        if step_feedback["breakdown"]:
            entry = {
                "turn": self._state.step_count,
                "action_kind": action.kind,
                **step_feedback["breakdown"],
            }
            self._step_feedback_log.append(entry)
            # Surface on this turn's action result so the LLM sees it immediately.
            result = {**result, "feedback": step_feedback["breakdown"]}

        # Check if budget now exhausted; if so, auto-commit.
        if self._budget_spent >= BUDGET_TOTAL:
            return self._force_commit(reason="budget_exhausted")

        return self._make_observation(
            last_kind=action.kind,
            last_result=result,
            done=False,
            reward=0.0,  # no reward on non-terminal steps
        )

    @property
    def state(self) -> State:
        return self._state

    # ---------- Action handlers ----------

    def _do_run_baseline(self, action: LandscapeforgeAction) -> dict:
        assert self._landscape is not None
        # Fixed init AND fixed step count for baseline comparability across
        # episodes and rollouts (important for GRPO group-relative advantages).
        rng = np.random.default_rng(42)
        x0 = rng.normal(0.0, 0.5, size=self._landscape.dim)
        result = run_reference_baseline(
            name=action.baseline_name,
            f=self._landscape.f,
            grad=self._landscape.grad,
            x0=x0,
            steps=BASELINE_STEPS,
        )
        self._baseline_history.append(result)
        return {
            "baseline_index": len(self._baseline_history) - 1,
            "name": result["name"],
            "n_steps": len(result["trajectory"]),
            "final_f": (result["trajectory"][-1]["f"]
                        if result["trajectory"] and result["trajectory"][-1]["f"] is not None
                        else None),
        }

    def _do_draft(self, action: LandscapeforgeAction) -> dict:
        assert self._landscape is not None
        code = action.code or ""
        self._current_draft = code
        try:
            opt = compile_optimizer(code, dim=self._landscape.dim)
        except SandboxError as e:
            # Record failed draft; still counts toward history for inspect.
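            # (The stand-in summary below is assumed to mirror the field names
            # auto_test_draft produces, so serialization and inspect handling
            # see one uniform shape for failed and successful drafts.)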
            self._draft_history.append({
                "code": code,
                "compile_error": str(e),
                "summary": {
                    "converged": False,
                    "diverged": True,
                    "error": str(e),
                    "final_f": None,
                    "step_of_min": None,
                    "min_f": None,
                },
            })
            self._draft_details.append([])
            return {
                "draft_index": len(self._draft_history) - 1,
                "compile_error": str(e),
                "summary": None,
            }

        test = auto_test_draft(opt, self._landscape, seed=0, steps=20)
        self._draft_history.append({
            "code": code,
            "compile_error": None,
            "summary": test["summary"],
        })
        self._draft_details.append(test["detail"])
        return {
            "draft_index": len(self._draft_history) - 1,
            "compile_error": None,
            "summary": test["summary"],
        }

    def _do_inspect(self, action: LandscapeforgeAction) -> dict:
        idx = action.draft_idx
        if idx is None or idx < 0 or idx >= len(self._draft_details):
            return {"error": f"draft_idx {idx} out of range (have {len(self._draft_details)} drafts)"}
        detail = self._draft_details[idx]
        start = action.step_range_start
        end = min(action.step_range_end, len(detail))
        sliced = detail[start:end]
        record = {
            "draft_idx": idx,
            "step_range": [start, end],
            "detail": sliced,
        }
        self._inspect_requests.append(record)
        return {"draft_idx": idx, "step_range": [start, end], "n_steps": len(sliced)}

    def _do_commit(self) -> LandscapeforgeObservation:
        return self._finalize_episode(reason="commit")

    def _force_commit(self, reason: str) -> LandscapeforgeObservation:
        return self._finalize_episode(reason=reason)

    # ---------- Episode finalization ----------

    def _finalize_episode(self, reason: str) -> LandscapeforgeObservation:
        assert self._landscape is not None
        self._committed = True

        # Need a current_draft. If none, produce a worst-case result.
        if not self._current_draft:
            result = {
                "reason": reason,
                "no_draft": True,
                "final_regret": 1.0,
            }
            r_total = -1.0
            breakdown = {"no_draft": 1.0}
            obs = self._make_observation(
                last_kind="commit",
                last_result=result,
                done=True,
                reward=r_total,
            )
            obs.committed = True
            obs.final_regret = 1.0
            obs.r_optcoder = r_total
            obs.r_optcoder_breakdown = breakdown
            self._final_obs = obs
            return obs

        # Full Phase-D arena eval
        try:
            opt = compile_optimizer(self._current_draft, dim=self._landscape.dim)
            arena = run_arena(opt, self._landscape, seeds=ARENA_SEEDS, steps=ARENA_STEPS)
        except SandboxError:
            # Committed code fails to compile -> worst-case result
            arena = ArenaResult(
                initial_values=[1.0] * len(ARENA_SEEDS),
                final_values=[float("nan")] * len(ARENA_SEEDS),
                crashed=[True] * len(ARENA_SEEDS),
                trajectories=[[] for _ in ARENA_SEEDS],
            )

        # Adam baseline arena for normalization (always run for reward stability).
        adam_arena = self._ensure_adam_arena()

        novelty = ast_novelty_score(self._current_draft, REFERENCE_SOURCES)

        # Convergence step: first seed's trajectory, first step where f < 0.01 * f0
        convergence_step = self._compute_convergence_step(arena)

        reward = compute_optcoder_reward(
            arena=arena,
            adam_arena=adam_arena,
            actions_used_cost=self._budget_spent,
            budget_total=BUDGET_TOTAL,
            novelty_score=novelty,
            convergence_step=convergence_step,
            arena_steps=ARENA_STEPS,
        )

        result = {
            "reason": reason,
            "my_mean_progress": arena.mean_progress,
            "adam_mean_progress": adam_arena.mean_progress,
            "adam_tuned_lr": self._adam_tuned_lr,
            "speedup_vs_adam": reward.breakdown.get("speedup_vs_adam"),
            "crash_fraction": arena.crash_fraction,
            "novelty_score": novelty,
            "convergence_step": convergence_step,
        }
        obs = self._make_observation(
            last_kind="commit",
            last_result=result,
            done=True,
            reward=reward.r_total,
        )
        obs.committed = True
        # `final_regret` is reinterpreted (no f_min dependency): Adam-shortfall
        # in [0, 1].
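        # Concretely, the code below computes
        # final_regret = clip(1 - speedup_vs_adam, 0, 1).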
        # 0 = matched or beat Adam's descent; 1 = made zero progress while
        # Adam descended normally. Capped at 1.
        speedup = reward.breakdown.get("speedup_vs_adam", 0.0)
        obs.final_regret = float(max(0.0, min(1.0, 1.0 - speedup)))
        obs.r_optcoder = reward.r_total
        obs.r_optcoder_breakdown = reward.breakdown
        self._final_obs = obs
        return obs

    # ---------- Helpers ----------

    def _make_observation(
        self,
        last_kind: Optional[str],
        last_result: dict,
        done: bool,
        reward: float,
    ) -> LandscapeforgeObservation:
        assert self._landscape is not None
        return LandscapeforgeObservation(
            landscape_description=self._landscape.description,
            dim=self._landscape.dim,
            structural_hints=self._hints,
            baseline_history=self._serialize_baseline_history(),
            draft_history=self._serialize_draft_history(),
            inspect_requests=list(self._inspect_requests),
            current_draft=self._current_draft,
            budget_remaining=BUDGET_TOTAL - self._budget_spent,
            last_action_kind=last_kind,
            last_action_result=last_result,
            done=done,
            reward=reward,
        )

    def _serialize_baseline_history(self) -> list[dict]:
        # Keep only name + trajectory; the trajectory is already
        # summary-friendly (one record per step, x stored as a list).
        return [
            {"name": b["name"], "trajectory": b["trajectory"]}
            for b in self._baseline_history
        ]

    def _serialize_draft_history(self) -> list[dict]:
        # For the observation we include code + summary per draft.
        return [
            {"code": d["code"], "summary": d["summary"], "compile_error": d["compile_error"]}
            for d in self._draft_history
        ]

    def _sample_params(self, template: str) -> dict:
        rng = self._master_rng
        if template == "quadratic":
            # T0 uses cond up to 100; T1 up to 1000; T2 higher.
            cap = {"T0": 100.0, "T1": 1000.0, "T2": 10_000.0}[self._tier]
            return {"cond": float(rng.uniform(1.0, cap))}
        if template == "gaussian_mix":
            return {
                "k": int(rng.integers(2, 6)),
                "sigma": float(rng.uniform(0.3, 1.0)),
                "spread": float(rng.uniform(1.0, 4.0)),
            }
        if template == "huber":
            return {"delta": float(rng.uniform(0.5, 2.0))}
        return {}

    def _ensure_adam_arena(self) -> ArenaResult:
        """Build the Adam baseline, FAIRLY — LR is tuned per landscape before
        running the arena. The tuning uses a short 30-step sweep on a dedicated
        seed (not one of the arena seeds) to avoid overfitting.

        Cached per episode in `_adam_arena_cache`. Tuned LR is stored in
        `_adam_tuned_lr` for logging / demo surfacing.
        """
        if self._adam_arena_cache is not None:
            return self._adam_arena_cache
        assert self._landscape is not None
        try:
            # Tune LR on seed 0 (not in ARENA_SEEDS), 30-step sweep.
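            # (Seed 0 is safe as a tuning seed: ARENA_SEEDS are 101..1010, so
            # the tuning trajectory never overlaps an evaluation seed.)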
            tune_rng = np.random.default_rng(0)
            tune_x0 = tune_rng.normal(0.0, 0.5, size=self._landscape.dim)
            best_lr = tune_adam_lr(
                f=self._landscape.f,
                grad=self._landscape.grad,
                x0=tune_x0,
                sweep_steps=30,
            )
            self._adam_tuned_lr = best_lr
            adam_opt = compile_optimizer(_adam_source(best_lr), dim=self._landscape.dim)
            self._adam_arena_cache = run_arena(
                adam_opt,
                self._landscape,
                seeds=ARENA_SEEDS,
                steps=ARENA_STEPS,
            )
        except Exception:
            self._adam_tuned_lr = None
            self._adam_arena_cache = ArenaResult(
                initial_values=[1.0] * len(ARENA_SEEDS),
                final_values=[1.0] * len(ARENA_SEEDS),
                crashed=[True] * len(ARENA_SEEDS),
                trajectories=[[] for _ in ARENA_SEEDS],
            )
        return self._adam_arena_cache

    def _compute_convergence_step(self, arena) -> Optional[int]:
        """First step on first seed where f < 1% of initial f."""
        if not arena.trajectories or not arena.trajectories[0]:
            return None
        traj = arena.trajectories[0]
        f0 = traj[0]["f"]
        if f0 <= 0:
            return None
        threshold = 0.01 * f0
        for t, snap in enumerate(traj):
            if snap["f"] < threshold:
                return t
        return None

    # ---------- Tier advancement API (used by trainer, not agent) ----------

    def advance_tier(self, new_tier: str) -> None:
        if new_tier not in TIER_MENU:
            raise ValueError(f"Unknown tier {new_tier}")
        self._tier = new_tier
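

# Minimal local smoke-run sketch, not part of the environment API. Assumptions
# are labeled inline: LandscapeforgeAction's constructor fields are inferred
# from how step() reads them, and "sgd" is a guessed baseline name (the valid
# names live in reference_optimizers). Adjust both if they differ.
if __name__ == "__main__":
    env = LandscapeforgeEnvironment(tier="T0", seed=0)
    obs = env.reset()
    print("landscape:", obs.landscape_description, "dim:", obs.dim)

    # Probe with a reference baseline (name is an assumption), then draft the
    # plain gradient-descent reference source and commit it.
    obs = env.step(LandscapeforgeAction(kind="run_baseline", baseline_name="sgd"))
    obs = env.step(LandscapeforgeAction(kind="draft", code=_REF_SGD))
    obs = env.step(LandscapeforgeAction(kind="commit"))
    print("done:", obs.done, "reward:", obs.reward)
    print("breakdown:", obs.r_optcoder_breakdown)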