# landscapeforge/server/landscapeforge_environment.py
"""LandscapeForge OpenEnv environment — OptCoder REPL (Phase C).
For v1 we ship OptCoder-only: LandscapeForge is a fixed template picker
controlled by the env itself (uniform random over the tier menu). The agent
acting through OpenEnv is OptCoder.
Each `reset()` samples a new landscape from the current tier. Each `step()`
executes one OptCoder action (run_baseline / draft / inspect / commit),
mutates env state, and returns an Observation reflecting the new state.
The episode ends when OptCoder commits or the budget is exhausted.
"""
from __future__ import annotations
from typing import Any, Optional
from uuid import uuid4
import numpy as np
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import (
ACTION_COSTS,
LandscapeforgeAction,
LandscapeforgeObservation,
)
from ..landscapes import (
TIER_MENU,
Landscape,
build_landscape,
structural_hints,
)
from ..reference_optimizers import run_baseline as run_reference_baseline
from ..reference_optimizers import tune_adam_lr
from ..sandbox import SandboxError, compile_optimizer
from ..arena import ArenaResult, auto_test_draft, run_arena
from ..rewards import ast_novelty_score, compute_optcoder_reward, compute_step_reward
except ImportError:
    # Fallback for running from the repo root or other flat package layouts.
from models import ( # type: ignore
ACTION_COSTS,
LandscapeforgeAction,
LandscapeforgeObservation,
)
from landscapes import ( # type: ignore
TIER_MENU,
Landscape,
build_landscape,
structural_hints,
)
from reference_optimizers import run_baseline as run_reference_baseline # type: ignore
from reference_optimizers import tune_adam_lr # type: ignore
from sandbox import SandboxError, compile_optimizer # type: ignore
from arena import ArenaResult, auto_test_draft, run_arena # type: ignore
from rewards import ast_novelty_score, compute_optcoder_reward, compute_step_reward # type: ignore
BUDGET_TOTAL = 12
ARENA_SEEDS = [101, 202, 303, 404, 505, 606, 707, 808, 909, 1010]
ARENA_STEPS = 200
BASELINE_STEPS = 30 # env-controlled; agent does not choose
# Reference source blobs for AST novelty comparison (short pseudo-implementations).
# Kept minimal — enough to detect "this commit is basically Adam".
_REF_SGD = """
class Optimizer:
def __init__(self, dim): self.lr = 0.01
def step(self, x, f, g): return x - self.lr * g
""".strip()
def _adam_source(lr: float) -> str:
"""Adam reference implementation parameterized by LR.
Used by `_ensure_adam_arena` after LR tuning — the baseline is
Adam-at-best-LR-for-this-landscape, not Adam-at-fixed-default.
"""
return f"""
class Optimizer:
def __init__(self, dim):
self.lr = {lr}
self.b1 = 0.9
self.b2 = 0.999
self.eps = 1e-8
self.m = np.zeros(dim)
self.v = np.zeros(dim)
self.t = 0
def step(self, x, f_val, g):
self.t += 1
self.m = self.b1*self.m + (1-self.b1)*g
self.v = self.b2*self.v + (1-self.b2)*g*g
mh = self.m/(1-self.b1**self.t)
vh = self.v/(1-self.b2**self.t)
return x - self.lr * mh / (np.sqrt(vh) + self.eps)
""".strip()
# Frozen default-LR source used only for AST-novelty comparison (so r_novelty
# measures "structurally different from Adam" regardless of the tuned LR).
_REF_ADAM = _adam_source(0.001)
_REF_MOMENTUM = """
class Optimizer:
def __init__(self, dim):
import numpy as np
self.lr=0.01; self.beta=0.9; self.v = np.zeros(dim)
def step(self, x, f, g):
self.v = self.beta*self.v - self.lr*g
return x + self.v
""".strip()
REFERENCE_SOURCES = [_REF_SGD, _REF_ADAM, _REF_MOMENTUM]
class LandscapeforgeEnvironment(Environment):
"""OptCoder-facing OpenEnv environment.
LandscapeForge is internal (template picker) in v1.
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self, tier: str = "T0", seed: int = 0):
self._initial_tier = tier
self._master_rng = np.random.default_rng(seed)
self._reset_count = 0
self._tier = tier
self._state = State(episode_id=str(uuid4()), step_count=0)
# Populated by reset()
self._landscape: Optional[Landscape] = None
self._hints: dict = {}
self._baseline_history: list[dict] = []
self._draft_history: list[dict] = []
self._draft_details: list[list[dict]] = [] # per-draft per-step detail
self._inspect_requests: list[dict] = []
self._current_draft: Optional[str] = None
self._budget_spent: int = 0
self._committed: bool = False
self._final_obs: Optional[LandscapeforgeObservation] = None
# Cache Adam's full arena result per episode (computed lazily, for
# reward normalization via progress-based r_regret). The baseline is
# Adam-at-tuned-LR — per-landscape LR is selected via a short sweep.
self._adam_arena_cache: Optional[ArenaResult] = None
self._adam_tuned_lr: Optional[float] = None
# Stepwise feedback log (PBS delta + compile penalty). This is shown to
# the LLM in the observation so it can course-correct mid-episode, but
# NEVER added to the training scalar — final reward is purely terminal
# arena reward (§9.1) for robustness against reward hacking.
self._step_feedback_log: list[dict] = []
# ---------- OpenEnv API ----------
def reset(self) -> LandscapeforgeObservation:
self._reset_count += 1
self._state = State(episode_id=str(uuid4()), step_count=0)
# Pick a landscape from the current tier's menu.
menu = TIER_MENU[self._tier]
template = str(self._master_rng.choice(menu))
dim = int(self._master_rng.integers(2, 6)) # small dims for v1
params = self._sample_params(template)
self._landscape = build_landscape(
template=template, dim=dim, params=params,
rng=np.random.default_rng(int(self._master_rng.integers(0, 2**31))),
)
self._hints = structural_hints(
self._landscape,
rng=np.random.default_rng(int(self._master_rng.integers(0, 2**31))),
)
# Wipe REPL state
self._baseline_history = []
self._draft_history = []
self._draft_details = []
self._inspect_requests = []
self._current_draft = None
self._budget_spent = 0
self._committed = False
self._final_obs = None
self._adam_arena_cache = None
self._adam_tuned_lr = None
self._step_feedback_log = []
return self._make_observation(
last_kind=None, last_result={"reset": True}, done=False, reward=0.0,
)
def step(self, action: LandscapeforgeAction) -> LandscapeforgeObservation: # type: ignore[override]
if self._landscape is None:
raise RuntimeError("step() called before reset()")
if self._committed:
# Episode already done; return terminal obs.
assert self._final_obs is not None
return self._final_obs
self._state.step_count += 1
cost = ACTION_COSTS[action.kind]
        # Check the budget before charging: a non-commit action that would
        # overshoot the budget becomes a forced commit instead.
if self._budget_spent + cost > BUDGET_TOTAL and action.kind != "commit":
return self._force_commit(reason="budget_exhausted")
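        # Example (illustrative costs): at budget_spent = 11 with BUDGET_TOTAL = 12,
        # a cost-2 action is force-committed above, while a cost-1 action is
        # charged and then trips the exhaustion auto-commit further below.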
self._budget_spent += cost
# Snapshot draft history for PBS computation
prev_draft_history_snapshot = list(self._draft_history)
if action.kind == "run_baseline":
result = self._do_run_baseline(action)
elif action.kind == "draft":
result = self._do_draft(action)
elif action.kind == "inspect":
result = self._do_inspect(action)
elif action.kind == "commit":
return self._do_commit()
else:
raise ValueError(f"Unknown action kind: {action.kind}")
# Compute stepwise FEEDBACK (NOT reward). Signals the LLM can use to
# course-correct mid-episode — exposed through last_action_result.
# Explicitly NOT summed into training reward; terminal arena reward
# is the only signal GRPO sees (robust against reward hacking).
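        # Shape note: only step_feedback["breakdown"] is consumed here; per the
        # _step_feedback_log docstring it carries the PBS delta plus any compile
        # penalty. The exact keys are defined by rewards.compute_step_reward.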
step_feedback = compute_step_reward(
prev_draft_history=prev_draft_history_snapshot,
new_draft_history=self._draft_history,
action_kind=action.kind,
action_result=result,
)
if step_feedback["breakdown"]:
entry = {
"turn": self._state.step_count,
"action_kind": action.kind,
**step_feedback["breakdown"],
}
self._step_feedback_log.append(entry)
# Surface on this turn's action result so the LLM sees it immediately.
result = {**result, "feedback": step_feedback["breakdown"]}
# Check if budget now exhausted; if so, auto-commit.
if self._budget_spent >= BUDGET_TOTAL:
return self._force_commit(reason="budget_exhausted")
return self._make_observation(
last_kind=action.kind, last_result=result,
done=False, reward=0.0, # no reward on non-terminal steps
)
@property
def state(self) -> State:
return self._state
# ---------- Action handlers ----------
def _do_run_baseline(self, action: LandscapeforgeAction) -> dict:
assert self._landscape is not None
# Fixed init AND fixed step count for baseline comparability across
# episodes and rollouts (important for GRPO group-relative advantages).
rng = np.random.default_rng(42)
x0 = rng.normal(0.0, 0.5, size=self._landscape.dim)
result = run_reference_baseline(
name=action.baseline_name, f=self._landscape.f, grad=self._landscape.grad,
x0=x0, steps=BASELINE_STEPS,
)
self._baseline_history.append(result)
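        # The surfaced result is summary-shaped, e.g. (values illustrative):
        #     {"baseline_index": 0, "name": "sgd", "n_steps": 31, "final_f": 0.42}
        # final_f is None when the trajectory is empty or its last f is None.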
return {
"baseline_index": len(self._baseline_history) - 1,
"name": result["name"],
"n_steps": len(result["trajectory"]),
"final_f": (result["trajectory"][-1]["f"]
if result["trajectory"] and result["trajectory"][-1]["f"] is not None
else None),
}
def _do_draft(self, action: LandscapeforgeAction) -> dict:
assert self._landscape is not None
code = action.code or ""
self._current_draft = code
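        # A well-formed draft defines an Optimizer with the same protocol as the
        # reference blobs above, e.g. (illustrative plain gradient descent):
        #
        #     class Optimizer:
        #         def __init__(self, dim):
        #             self.lr = 0.05
        #         def step(self, x, f_val, g):
        #             return x - self.lr * g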
try:
opt = compile_optimizer(code, dim=self._landscape.dim)
except SandboxError as e:
# Record failed draft; still counts toward history for inspect.
self._draft_history.append({
"code": code,
"compile_error": str(e),
"summary": {"converged": False, "diverged": True, "error": str(e),
"final_f": None, "step_of_min": None, "min_f": None},
})
self._draft_details.append([])
return {"draft_index": len(self._draft_history) - 1,
"compile_error": str(e), "summary": None}
test = auto_test_draft(opt, self._landscape, seed=0, steps=20)
self._draft_history.append({
"code": code,
"compile_error": None,
"summary": test["summary"],
})
self._draft_details.append(test["detail"])
return {"draft_index": len(self._draft_history) - 1,
"compile_error": None, "summary": test["summary"]}
def _do_inspect(self, action: LandscapeforgeAction) -> dict:
idx = action.draft_idx
if idx is None or idx < 0 or idx >= len(self._draft_details):
return {"error": f"draft_idx {idx} out of range (have {len(self._draft_details)} drafts)"}
detail = self._draft_details[idx]
start = action.step_range_start
end = min(action.step_range_end, len(detail))
sliced = detail[start:end]
record = {
"draft_idx": idx,
"step_range": [start, end],
"detail": sliced,
}
self._inspect_requests.append(record)
return {"draft_idx": idx, "step_range": [start, end], "n_steps": len(sliced)}
def _do_commit(self) -> LandscapeforgeObservation:
return self._finalize_episode(reason="commit")
def _force_commit(self, reason: str) -> LandscapeforgeObservation:
return self._finalize_episode(reason=reason)
# ---------- Episode finalization ----------
def _finalize_episode(self, reason: str) -> LandscapeforgeObservation:
assert self._landscape is not None
self._committed = True
# Need a current_draft. If none, produce a worst-case result.
if not self._current_draft:
result = {
"reason": reason,
"no_draft": True,
"final_regret": 1.0,
}
r_total = -1.0
breakdown = {"no_draft": 1.0}
obs = self._make_observation(
last_kind="commit", last_result=result,
done=True, reward=r_total,
)
obs.committed = True
obs.final_regret = 1.0
obs.r_optcoder = r_total
obs.r_optcoder_breakdown = breakdown
self._final_obs = obs
return obs
# Full Phase-D arena eval
try:
opt = compile_optimizer(self._current_draft, dim=self._landscape.dim)
arena = run_arena(opt, self._landscape, seeds=ARENA_SEEDS, steps=ARENA_STEPS)
        except SandboxError:
            # Committed code fails to compile -> worst-case result.
arena = ArenaResult(
initial_values=[1.0] * len(ARENA_SEEDS),
final_values=[float("nan")] * len(ARENA_SEEDS),
crashed=[True] * len(ARENA_SEEDS),
trajectories=[[] for _ in ARENA_SEEDS],
)
# Adam baseline arena for normalization (always run for reward stability).
adam_arena = self._ensure_adam_arena()
novelty = ast_novelty_score(self._current_draft, REFERENCE_SOURCES)
# Convergence step: first seed's trajectory, first step where f < 0.01 * f0
convergence_step = self._compute_convergence_step(arena)
reward = compute_optcoder_reward(
arena=arena,
adam_arena=adam_arena,
actions_used_cost=self._budget_spent,
budget_total=BUDGET_TOTAL,
novelty_score=novelty,
convergence_step=convergence_step,
arena_steps=ARENA_STEPS,
)
result = {
"reason": reason,
"my_mean_progress": arena.mean_progress,
"adam_mean_progress": adam_arena.mean_progress,
"adam_tuned_lr": self._adam_tuned_lr,
"speedup_vs_adam": reward.breakdown.get("speedup_vs_adam"),
"crash_fraction": arena.crash_fraction,
"novelty_score": novelty,
"convergence_step": convergence_step,
}
obs = self._make_observation(
last_kind="commit", last_result=result,
done=True, reward=reward.r_total,
)
obs.committed = True
# `final_regret` is reinterpreted (no f_min dependency): Adam-shortfall
# in [0, 1]. 0 = matched or beat Adam's descent; 1 = made zero progress
# while Adam descended normally. Capped at 1.
speedup = reward.breakdown.get("speedup_vs_adam", 0.0)
obs.final_regret = float(max(0.0, min(1.0, 1.0 - speedup)))
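        # e.g. speedup 1.0 (matched Adam's progress) -> final_regret 0.0;
        # speedup 0.25 -> 0.75; any speedup above 1.0 still clamps to 0.0.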
obs.r_optcoder = reward.r_total
obs.r_optcoder_breakdown = reward.breakdown
self._final_obs = obs
return obs
# ---------- Helpers ----------
def _make_observation(self, last_kind: Optional[str], last_result: dict,
done: bool, reward: float) -> LandscapeforgeObservation:
assert self._landscape is not None
return LandscapeforgeObservation(
landscape_description=self._landscape.description,
dim=self._landscape.dim,
structural_hints=self._hints,
baseline_history=self._serialize_baseline_history(),
draft_history=self._serialize_draft_history(),
inspect_requests=list(self._inspect_requests),
current_draft=self._current_draft,
budget_remaining=BUDGET_TOTAL - self._budget_spent,
last_action_kind=last_kind,
last_action_result=last_result,
done=done,
reward=reward,
)
def _serialize_baseline_history(self) -> list[dict]:
        # Surface optimizer name + the full trajectory (per-step dicts are
        # already JSON-friendly).
return [
{"name": b["name"], "trajectory": b["trajectory"]}
for b in self._baseline_history
]
def _serialize_draft_history(self) -> list[dict]:
# For the observation we include code + summary per draft.
return [
{"code": d["code"], "summary": d["summary"], "compile_error": d["compile_error"]}
for d in self._draft_history
]
def _sample_params(self, template: str) -> dict:
rng = self._master_rng
if template == "quadratic":
            # Condition-number cap grows with tier: T0 100, T1 1,000, T2 10,000.
cap = {"T0": 100.0, "T1": 1000.0, "T2": 10_000.0}[self._tier]
return {"cond": float(rng.uniform(1.0, cap))}
if template == "gaussian_mix":
return {
"k": int(rng.integers(2, 6)),
"sigma": float(rng.uniform(0.3, 1.0)),
"spread": float(rng.uniform(1.0, 4.0)),
}
if template == "huber":
return {"delta": float(rng.uniform(0.5, 2.0))}
return {}
def _ensure_adam_arena(self) -> ArenaResult:
"""Build the Adam baseline, FAIRLY — LR is tuned per landscape before
running the arena. The tuning uses a short 30-step sweep on a dedicated
seed (not one of the arena seeds) to avoid overfitting.
Cached per episode in `_adam_arena_cache`. Tuned LR is stored in
`_adam_tuned_lr` for logging / demo surfacing.
"""
if self._adam_arena_cache is not None:
return self._adam_arena_cache
assert self._landscape is not None
try:
# Tune LR on seed 0 (not in ARENA_SEEDS), 30-step sweep.
tune_rng = np.random.default_rng(0)
tune_x0 = tune_rng.normal(0.0, 0.5, size=self._landscape.dim)
best_lr = tune_adam_lr(
f=self._landscape.f, grad=self._landscape.grad,
x0=tune_x0, sweep_steps=30,
)
self._adam_tuned_lr = best_lr
adam_opt = compile_optimizer(_adam_source(best_lr), dim=self._landscape.dim)
self._adam_arena_cache = run_arena(
adam_opt, self._landscape,
seeds=ARENA_SEEDS, steps=ARENA_STEPS,
)
        except Exception:
            # Tuning or compilation failed: fall back to a zero-progress,
            # all-crashed baseline so reward normalization stays well-defined.
            self._adam_tuned_lr = None
self._adam_arena_cache = ArenaResult(
initial_values=[1.0] * len(ARENA_SEEDS),
final_values=[1.0] * len(ARENA_SEEDS),
crashed=[True] * len(ARENA_SEEDS),
trajectories=[[] for _ in ARENA_SEEDS],
)
return self._adam_arena_cache
    def _compute_convergence_step(self, arena: ArenaResult) -> Optional[int]:
"""First step on first seed where f < 1% of initial f."""
if not arena.trajectories or not arena.trajectories[0]:
return None
        traj = arena.trajectories[0]
        f0 = traj[0]["f"]
        if f0 is None or f0 <= 0:
            return None
        threshold = 0.01 * f0
        for t, snap in enumerate(traj):
            if snap["f"] is not None and snap["f"] < threshold:
                return t
return None
# ---------- Tier advancement API (used by trainer, not agent) ----------
def advance_tier(self, new_tier: str) -> None:
if new_tier not in TIER_MENU:
raise ValueError(f"Unknown tier {new_tier}")
self._tier = new_tier
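
# Trainer-side curriculum sketch (illustrative; the promotion rule and the
# rolling_mean_speedup_vs_adam statistic are the trainer's choice, not part
# of this env):
#
#     if rolling_mean_speedup_vs_adam > 0.8:
#         env.advance_tier("T1")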