"""Reference optimizers run by `run_baseline` action.
These are invoked by the env — not by OptCoder's submitted code. They
produce diagnostic trajectories (x_t, f_t, |g_t|) that the agent sees.
The source code is NEVER exposed to the agent.
"""
from typing import Callable
import numpy as np
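
# Every `_step_*` function below performs one optimizer update under a shared
# contract: step(x, g, state, lr=..., **hyperparams) -> (x_new, state).
# `state` is a mutable dict the caller threads between steps; each optimizer
# stashes its velocity / moment estimates / history there.
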
def _step_sgd(x, g, state, lr=0.01):
return x - lr * g, state

def _step_momentum(x, g, state, lr=0.01, beta=0.9):
    # Classic heavy-ball momentum: v accumulates a decaying sum of -lr * g.
    v = state.get("v", np.zeros_like(x))
    v = beta * v - lr * g
    state["v"] = v
    return x + v, state

def _step_adam(x, g, state, lr=0.001, b1=0.9, b2=0.999, eps=1e-8):
    m = state.get("m", np.zeros_like(x))
    v = state.get("v", np.zeros_like(x))
    t = state.get("t", 0) + 1
    m = b1 * m + (1 - b1) * g        # first-moment EMA
    v = b2 * v + (1 - b2) * g**2     # second-moment EMA
    m_hat = m / (1 - b1**t)          # bias-correct the zero-initialised EMAs
    v_hat = v / (1 - b2**t)
    state["m"], state["v"], state["t"] = m, v, t
    return x - lr * m_hat / (np.sqrt(v_hat) + eps), state
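
# Illustrative sketch (not invoked by the env): a few Adam steps on the toy
# objective f(x) = ||x||^2 drive x toward the origin. `_example_adam` is a
# hypothetical helper added for documentation only.
def _example_adam(steps: int = 100) -> np.ndarray:
    grad = lambda x: 2.0 * x  # analytic gradient of ||x||^2
    x, state = np.array([1.0, -2.0]), {}
    for _ in range(steps):
        x, state = _step_adam(x, grad(x), state, lr=0.1)
    return x  # ends near the origin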

def _run_adam_with_lr(f, grad, x0: np.ndarray, lr: float, steps: int) -> tuple[np.ndarray, float]:
    """Run Adam for `steps` steps from x0 with the given lr. Returns (x_final, f_final).

    Used by the LR-tuning sweep for the Adam baseline. Returns (x0, inf) on
    divergence. Delegates to the generic harness `_run_baseline_with_lr`.
    """
    return _run_baseline_with_lr("adam", f, grad, x0, lr=lr, steps=steps)

def tune_adam_lr(f, grad, x0: np.ndarray,
                 lrs: tuple[float, ...] = (1e-4, 1e-3, 3e-3, 1e-2, 3e-2, 1e-1, 3e-1),
                 sweep_steps: int = 30) -> float:
    """Grid-search Adam's LR on a short run from x0. Returns the best LR."""
    return tune_baseline_lr("adam", f, grad, x0, lrs=lrs, sweep_steps=sweep_steps)
def _run_baseline_with_lr(name: str, f, grad, x0: np.ndarray,
lr: float, steps: int) -> tuple[np.ndarray, float]:
"""Run any reference baseline with an overridden LR. Returns (x_final, f_final)."""
if name not in BASELINES:
raise ValueError(f"Unknown baseline {name!r}")
step_fn = BASELINES[name]
x = x0.copy().astype(float)
state: dict = {}
for _ in range(steps):
g = np.asarray(grad(x), dtype=float)
x, state = step_fn(x, g, state, lr=lr)
if not np.all(np.isfinite(x)):
return x0, float("inf")
return x, float(f(x))
def tune_baseline_lr(name: str, f, grad, x0: np.ndarray,
lrs: tuple[float, ...] = (1e-4, 1e-3, 3e-3, 1e-2, 3e-2, 1e-1, 3e-1),
sweep_steps: int = 30) -> float:
"""Grid-search the LR for any named baseline (sgd / momentum / adam / lbfgs).
Each baseline's `_step_*` fn accepts `lr` as a kwarg, so the sweep uses the
same harness as Adam tuning but is parameterised by the step function.
"""
best_lr = lrs[0]
best_f = float("inf")
for lr in lrs:
try:
_, f_final = _run_baseline_with_lr(name, f, grad, x0,
lr=lr, steps=sweep_steps)
        except Exception:
            # Treat errors raised inside f/grad as a diverged run.
            f_final = float("inf")
if f_final < best_f:
best_f = f_final
best_lr = lr
return best_lr
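
# Illustrative sketch: sweep the LR grid for the momentum baseline on a
# badly-scaled quadratic. `_example_sweep` is a hypothetical helper for
# documentation only; the env calls `tune_baseline_lr` directly.
def _example_sweep() -> float:
    A = np.diag([1.0, 100.0])  # condition number 100
    f = lambda x: float(x @ A @ x)
    grad = lambda x: 2.0 * (A @ x)
    return tune_baseline_lr("momentum", f, grad, np.array([1.0, 1.0]))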

def _record_run(step_fn, f, grad, x0: np.ndarray, steps: int,
                **step_kwargs) -> tuple[list[dict], np.ndarray]:
    """Shared trajectory loop for `run_baseline` / `run_baseline_tuned`."""
    x = x0.copy().astype(float)
    state: dict = {}
    traj: list[dict] = []
    for t in range(steps):
        fv = float(f(x))
        g = np.asarray(grad(x), dtype=float)
        traj.append({"t": t, "x": x.tolist(), "f": fv,
                     "grad_norm": float(np.linalg.norm(g))})
        x, state = step_fn(x, g, state, **step_kwargs)
        if not np.all(np.isfinite(x)):
            # x is no longer finite: record a sentinel entry and stop.
            traj.append({"t": t + 1, "x": None, "f": None, "grad_norm": None,
                         "diverged": True})
            break
    # Final state (skipped if the run diverged)
    if np.all(np.isfinite(x)):
        traj.append({"t": len(traj), "x": x.tolist(), "f": float(f(x)),
                     "grad_norm": float(np.linalg.norm(np.asarray(grad(x))))})
    return traj, x


def run_baseline_tuned(name: str, f, grad, x0: np.ndarray, steps: int = 30,
                       tune_x0: np.ndarray | None = None,
                       sweep_steps: int = 30) -> dict:
    """Run a baseline with its LR tuned to the landscape first.

    Returns the same dict shape as `run_baseline`, plus a `lr` field.
    """
    if name not in BASELINES:
        raise ValueError(f"Unknown baseline {name!r}")
    tune_start = tune_x0 if tune_x0 is not None else x0
    best_lr = tune_baseline_lr(name, f, grad, tune_start, sweep_steps=sweep_steps)
    traj, x = _record_run(BASELINES[name], f, grad, x0, steps, lr=best_lr)
    return {"name": name, "trajectory": traj, "final_x": x.tolist(),
            "lr": best_lr}

def _step_lbfgs(x, g, state, lr=0.01, m_size=5):
    """Crude L-BFGS: two-loop recursion over a short (x, g) history, with a
    fixed step `lr` instead of a line search. Good enough as a reference."""
    xs = state.setdefault("xs", [])  # recent positions
    gs = state.setdefault("gs", [])  # recent gradients
    if len(xs) < 2:
        # Not enough history for a curvature pair: plain gradient descent.
        x_new = x - lr * g
    else:
        # Collect up to m_size (s, y) pairs, newest first.
        s_list, y_list, rho_list = [], [], []
        for i in range(1, min(m_size, len(xs)) + 1):
            if len(xs) <= i:
                continue  # no older entry to pair with
            s = xs[-i] - xs[-i - 1]
            y = gs[-i] - gs[-i - 1]
            denom = float(y @ s)
            if abs(denom) < 1e-12:
                continue  # skip pairs with negligible curvature
            s_list.append(s); y_list.append(y); rho_list.append(1.0 / denom)
        # First loop of the two-loop recursion: newest to oldest.
        q = g.copy()
        alpha = []
        for s, y, rho in zip(s_list, y_list, rho_list):
            a = rho * float(s @ q)
            alpha.append(a)
            q = q - a * y
        # Scale the initial inverse-Hessian guess H0 by the newest pair.
        if y_list:
            y0, s0 = y_list[0], s_list[0]
            gamma = float(s0 @ y0) / (float(y0 @ y0) + 1e-12)
        else:
            gamma = 1.0
        r = gamma * q
        # Second loop: oldest to newest.
        for (s, y, rho), a in zip(reversed(list(zip(s_list, y_list, rho_list))),
                                  reversed(alpha)):
            b = rho * float(y @ r)
            r = r + (a - b) * s
        x_new = x - lr * r  # fixed step along the quasi-Newton direction
    xs.append(x.copy())
    gs.append(g.copy())
    # Trim history to what the recursion can use, so state stays bounded.
    if len(xs) > m_size + 1:
        del xs[0], gs[0]
    return x_new, state
BASELINES: dict[str, Callable] = {
"sgd": _step_sgd,
"momentum": _step_momentum,
"adam": _step_adam,
"lbfgs": _step_lbfgs,
}
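
# Illustrative sketch: step every registered baseline once on a toy gradient
# to show the shared stepper contract. `_example_one_step_each` is a
# hypothetical helper for documentation only.
def _example_one_step_each() -> dict[str, np.ndarray]:
    x0 = np.array([1.0, -1.0])
    g = 2.0 * x0  # gradient of ||x||^2 at x0
    return {name: step(x0, g, {})[0] for name, step in BASELINES.items()}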

def run_baseline(name: str, f, grad, x0: np.ndarray, steps: int = 30) -> dict:
    """Run a reference optimizer from x0 for `steps` steps.

    Returns a trajectory dict with per-step (x, f, |g|).
    """
    if name not in BASELINES:
        raise ValueError(f"Unknown baseline {name!r}")
    traj, x = _record_run(BASELINES[name], f, grad, x0, steps)
    return {"name": name, "trajectory": traj, "final_x": x.tolist()}