"""Reference optimizers run by `run_baseline` action. These are invoked by the env — not by OptCoder's submitted code. They produce diagnostic trajectories (x_t, f_t, |g_t|) that the agent sees. The source code is NEVER exposed to the agent. """ from typing import Callable import numpy as np def _step_sgd(x, g, state, lr=0.01): return x - lr * g, state def _step_momentum(x, g, state, lr=0.01, beta=0.9): v = state.get("v", np.zeros_like(x)) v = beta * v - lr * g state["v"] = v return x + v, state def _step_adam(x, g, state, lr=0.001, b1=0.9, b2=0.999, eps=1e-8): m = state.get("m", np.zeros_like(x)) v = state.get("v", np.zeros_like(x)) t = state.get("t", 0) + 1 m = b1 * m + (1 - b1) * g v = b2 * v + (1 - b2) * g**2 m_hat = m / (1 - b1**t) v_hat = v / (1 - b2**t) state["m"], state["v"], state["t"] = m, v, t return x - lr * m_hat / (np.sqrt(v_hat) + eps), state def _run_adam_with_lr(f, grad, x0: np.ndarray, lr: float, steps: int) -> tuple[np.ndarray, float]: """Run Adam for `steps` steps from x0 with the given lr. Returns (x_final, f_final). Used by the LR-tuning sweep for the Adam baseline. Returns (x0, inf) on divergence. """ x = x0.copy().astype(float) state: dict = {} for _ in range(steps): g = np.asarray(grad(x), dtype=float) x, state = _step_adam(x, g, state, lr=lr) if not np.all(np.isfinite(x)): return x0, float("inf") return x, float(f(x)) def tune_adam_lr(f, grad, x0: np.ndarray, lrs: tuple[float, ...] = (1e-4, 1e-3, 3e-3, 1e-2, 3e-2, 1e-1, 3e-1), sweep_steps: int = 30) -> float: """Grid-search Adam's LR on a short run from x0. Returns the best LR.""" best_lr = lrs[0] best_f = float("inf") for lr in lrs: _, f_final = _run_adam_with_lr(f, grad, x0, lr=lr, steps=sweep_steps) if f_final < best_f: best_f = f_final best_lr = lr return best_lr def _run_baseline_with_lr(name: str, f, grad, x0: np.ndarray, lr: float, steps: int) -> tuple[np.ndarray, float]: """Run any reference baseline with an overridden LR. Returns (x_final, f_final).""" if name not in BASELINES: raise ValueError(f"Unknown baseline {name!r}") step_fn = BASELINES[name] x = x0.copy().astype(float) state: dict = {} for _ in range(steps): g = np.asarray(grad(x), dtype=float) x, state = step_fn(x, g, state, lr=lr) if not np.all(np.isfinite(x)): return x0, float("inf") return x, float(f(x)) def tune_baseline_lr(name: str, f, grad, x0: np.ndarray, lrs: tuple[float, ...] = (1e-4, 1e-3, 3e-3, 1e-2, 3e-2, 1e-1, 3e-1), sweep_steps: int = 30) -> float: """Grid-search the LR for any named baseline (sgd / momentum / adam / lbfgs). Each baseline's `_step_*` fn accepts `lr` as a kwarg, so the sweep uses the same harness as Adam tuning but is parameterised by the step function. """ best_lr = lrs[0] best_f = float("inf") for lr in lrs: try: _, f_final = _run_baseline_with_lr(name, f, grad, x0, lr=lr, steps=sweep_steps) except Exception: f_final = float("inf") if f_final < best_f: best_f = f_final best_lr = lr return best_lr def run_baseline_tuned(name: str, f, grad, x0: np.ndarray, steps: int = 30, tune_x0: np.ndarray | None = None, sweep_steps: int = 30) -> dict: """Run a baseline with its LR tuned to the landscape first. Returns the same dict shape as `run_baseline`, plus a `lr` field. """ tune_start = tune_x0 if tune_x0 is not None else x0 best_lr = tune_baseline_lr(name, f, grad, tune_start, sweep_steps=sweep_steps) step_fn = BASELINES[name] x = x0.copy().astype(float) state: dict = {} traj: list[dict] = [] for t in range(steps): fv = float(f(x)) g = np.asarray(grad(x), dtype=float) gn = float(np.linalg.norm(g)) traj.append({"t": t, "x": x.tolist(), "f": fv, "grad_norm": gn}) x, state = step_fn(x, g, state, lr=best_lr) if not np.all(np.isfinite(x)): traj.append({"t": t + 1, "x": None, "f": None, "grad_norm": None, "diverged": True}) break if np.all(np.isfinite(x)): traj.append({"t": len(traj), "x": x.tolist(), "f": float(f(x)), "grad_norm": float(np.linalg.norm(np.asarray(grad(x))))}) return {"name": name, "trajectory": traj, "final_x": x.tolist(), "lr": best_lr} def _step_lbfgs(x, g, state, lr=0.01, m_size=5): """Crude L-BFGS with finite-step history. Good enough as a reference.""" xs = state.setdefault("xs", []) # positions gs = state.setdefault("gs", []) # gradients if len(xs) < 2: # First step: plain gradient descent to seed history x_new = x - lr * g else: # Two-loop recursion over last m_size pairs s_list, y_list, rho_list = [], [], [] for i in range(1, min(m_size, len(xs)) + 1): s = xs[-i] - xs[-i - 1] if len(xs) > i else None if s is None: continue y = gs[-i] - gs[-i - 1] denom = float(y @ s) if abs(denom) < 1e-12: continue s_list.append(s); y_list.append(y); rho_list.append(1.0 / denom) q = g.copy() alpha = [] for s, y, rho in zip(s_list, y_list, rho_list): a = rho * float(s @ q) alpha.append(a) q = q - a * y # H0 scaling if y_list: y0 = y_list[0]; s0 = s_list[0] gamma = float(s0 @ y0) / (float(y0 @ y0) + 1e-12) else: gamma = 1.0 r = gamma * q for (s, y, rho), a in zip(reversed(list(zip(s_list, y_list, rho_list))), reversed(alpha)): b = rho * float(y @ r) r = r + (a - b) * s x_new = x - lr * r xs.append(x.copy()) gs.append(g.copy()) return x_new, state BASELINES: dict[str, Callable] = { "sgd": _step_sgd, "momentum": _step_momentum, "adam": _step_adam, "lbfgs": _step_lbfgs, } def run_baseline(name: str, f, grad, x0: np.ndarray, steps: int = 30) -> dict: """Run a reference optimizer from x0 for `steps` steps. Returns a trajectory dict with per-step (x, f, |g|). """ if name not in BASELINES: raise ValueError(f"Unknown baseline {name!r}") step_fn = BASELINES[name] x = x0.copy().astype(float) state: dict = {} traj = [] for t in range(steps): fv = float(f(x)) g = np.asarray(grad(x), dtype=float) gn = float(np.linalg.norm(g)) traj.append({"t": t, "x": x.tolist(), "f": fv, "grad_norm": gn}) x, state = step_fn(x, g, state) if not np.all(np.isfinite(x)): # Pad with the last finite state; record divergence traj.append({"t": t + 1, "x": None, "f": None, "grad_norm": None, "diverged": True}) break # Final state if np.all(np.isfinite(x)): traj.append({"t": len(traj), "x": x.tolist(), "f": float(f(x)), "grad_norm": float(np.linalg.norm(np.asarray(grad(x))))}) return {"name": name, "trajectory": traj, "final_x": x.tolist()}