"""Observation β†’ prompt rendering + LLM response β†’ action parsing.

Keeps prompt format aligned with Appendix A of LANDSCAPEFORGE_DESIGN.md while
trimming obs fields that bloat tokens (e.g. full trajectories get summarised).
"""

from __future__ import annotations

import json
import re
from typing import Any

try:
    from .models import LandscapeforgeAction, LandscapeforgeObservation
except ImportError:  # flat layout (HF Space container)
    from models import LandscapeforgeAction, LandscapeforgeObservation  # type: ignore


SYSTEM = """You are OptCoder. You will design an optimization algorithm for a
hidden landscape f: R^n → R by iteratively: running reference optimizers to
observe their behaviour, writing candidate `Optimizer` classes and seeing how
they perform, inspecting past drafts to diagnose failures, and committing when
you are satisfied.

How the episode ends:
  - When you call `commit`, the env runs the full arena evaluation
    (10 seeds × 200 steps) on your MOST RECENT draft and that becomes your
    reward. This is the normal, preferred way to finish.
  - If you never call `commit`, when your budget runs out the env will
    automatically do the same thing: evaluate your most recent draft.
    Your last draft is always what gets evaluated, whether you commit
    explicitly or the budget runs out.
  - So: make sure your last draft is the one you actually want evaluated.
    If you improve a draft then change your mind, re-submit the good one
    before ending the episode.

A typical good episode is ~4 turns:
  draft → (maybe) inspect → (maybe) refine → commit.

Reply with a single JSON object: nothing else, no prose, no markdown.

JSON formatting rules (important, models frequently get this wrong):
  - All strings use standard JSON double-quotes: "like this"
  - Do NOT use Python triple-quoted strings \"\"\"...\"\"\"; they are NOT valid JSON
  - For multi-line code, escape newlines as \\n inside the string value:
      {"kind": "draft", "code": "class Optimizer:\\n    def __init__(self, dim): ..."}
""".strip()


ACTION_SPEC = """
Available actions (cost charged against your budget):

  run_baseline  (cost 2)  Run a reference optimizer on the hidden landscape.
    JSON: {"kind": "run_baseline", "baseline_name": "sgd"|"momentum"|"adam"|"lbfgs"}
    Returns a 30-step trajectory (x_t, f_t, grad_norm_t). Source code not revealed.

  draft         (cost 2)  Submit a full Optimizer class; env auto-tests it.
    JSON: {"kind": "draft", "code": "<python source>"}
    The code MUST be a standalone class with no base class:

        class Optimizer:
            def __init__(self, dim):
                ...
            def step(self, x, f_val, grad):
                ...
                return x_new

    Rules:
      - Top-level line must be exactly:  class Optimizer:
        (no parent class: BaseOptimizer, nn.Module, object, etc. do NOT exist)
      - Use only numpy as `np` and math β€” both pre-injected; DO NOT write import lines
      - step(x, f_val, grad) must return a numpy array of shape (dim,)
      - No I/O, no globals, no file operations
      - Only the class definition is kept; demo code at module level is stripped

  inspect       (cost 1)  Zoom into a prior draft's per-step behaviour.
    JSON: {"kind": "inspect", "draft_idx": 0, "step_range_start": 10, "step_range_end": 20}
    Returns per-step (x, f, grad, update_norm, step_size_eff).

  commit        (cost 0)  Evaluate your most recent draft on the full arena.
    JSON: {"kind": "commit"}
    Preferred way to end the episode. Call it when you have a draft you
    trust. If you don't call it, budget exhaustion triggers the same
    evaluation on whatever your latest draft is, so your most recent
    draft should always be your best one. Committing explicitly just
    ends the episode sooner.
""".strip()


def render_observation(obs: LandscapeforgeObservation) -> str:
    """Turn an Observation into a compact prompt-friendly state summary."""
    lines: list[str] = []
    lines.append(f"Landscape: {obs.landscape_description}")
    lines.append(f"Dim: {obs.dim}")
    lines.append("Structural hints:")
    for k, v in (obs.structural_hints or {}).items():
        lines.append(f"  {k}: {_fmt(v)}")
    lines.append(f"Budget remaining: {obs.budget_remaining}")

    if obs.baseline_history:
        lines.append("\nBaseline runs (diagnostic trajectories):")
        for i, b in enumerate(obs.baseline_history):
            summary = _summarise_trajectory(b.get("trajectory", []))
            lines.append(f"  [{i}] {b['name']}: {summary}")

    if obs.draft_history:
        lines.append("\nDraft history:")
        for i, d in enumerate(obs.draft_history):
            if d.get("compile_error"):
                lines.append(f"  [{i}] COMPILE ERROR: {d['compile_error']}")
            else:
                s = d.get("summary") or {}  # tolerate a missing summary key
                status = "CONVERGED" if s.get("converged") else (
                    "DIVERGED" if s.get("diverged") else "partial"
                )
                lines.append(
                    f"  [{i}] {status} | initial_f={_fmt(s.get('initial_f'))} "
                    f"final_f={_fmt(s.get('final_f'))} "
                    f"step_of_min={s.get('step_of_min')}"
                )
                code = d.get("code") or ""
                lines.append("       code:")
                for cl in code.splitlines()[:40]:    # first 40 lines only
                    lines.append(f"         {cl}")

    if obs.inspect_requests:
        lines.append("\nInspect results:")
        for r in obs.inspect_requests:
            detail = r.get("detail") or []
            lines.append(
                f"  draft={r.get('draft_idx')} range={r.get('step_range')} "
                f"({len(detail)} steps)"
            )
            for d in detail[:8]:    # first 8 of the slice
                lines.append(
                    # str() guards against a missing `t`: None rejects the `>3` format spec.
                    f"    t={str(d.get('t')):>3}  f={_fmt(d.get('f'))}  "
                    f"|g|={_fmt(d.get('grad_norm'))}  "
                    f"|Δx|={_fmt(d.get('update_norm'))}  "
                    f"η_eff={_fmt(d.get('step_size_eff'))}"
                )

    if obs.current_draft:
        lines.append(f"\nCurrent draft ({len(obs.current_draft)} chars) will be evaluated on commit.")

    if obs.last_action_kind:
        lines.append(f"\nLast action: {obs.last_action_kind}")
        feedback = (obs.last_action_result or {}).get("feedback")
        if feedback:
            parts = ", ".join(f"{k}={_fmt(v)}" for k, v in feedback.items())
            lines.append(f"Step feedback: {parts}   "
                         "(signals for your reasoning; not added to final reward)")

    return "\n".join(lines)


def build_prompt(obs: LandscapeforgeObservation) -> list[dict]:
    """Return OpenAI-style messages list for the chat completions endpoint."""
    state_text = render_observation(obs)
    return [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": f"{ACTION_SPEC}\n\nCurrent state:\n{state_text}\n\n"
                                     "Reply with a single JSON object for your next action."},
    ]


# ---------- response → action ----------

_JSON_RE = re.compile(r"\{.*\}", re.DOTALL)


def parse_action(response_text: str) -> LandscapeforgeAction:
    """Extract a JSON object from the LLM's reply and build an Action.

    Accepts code-fenced JSON, raw JSON, and JSON embedded in prose. The match
    is greedy: it spans from the first `{` to the last `}` in the reply, so
    stray braces in trailing prose will make parsing fail. Tolerates the
    common LLM failure mode of emitting unescaped newlines / tabs inside
    string values (especially for the `code` field of a `draft` action).
    Raises ValueError if no parseable object is found.
    """
    text = response_text.strip()
    if text.startswith("```"):
        text = re.sub(r"^```(?:json)?\n?", "", text)
        text = re.sub(r"\n?```\s*$", "", text)

    match = _JSON_RE.search(text)
    if not match:
        raise ValueError(f"No JSON object in response: {response_text[:200]!r}")

    raw_json = match.group(0)

    # First pass: strict.
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError:
        # Second pass: escape raw control chars inside string literals.
        fixed = _escape_string_controls(raw_json)
        try:
            data = json.loads(fixed)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON even after control-char fix: {e}; "
                             f"raw: {raw_json[:200]!r}") from e

    if "kind" not in data:
        raise ValueError(f"Missing `kind`: {data}")

    return LandscapeforgeAction(**data)
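

# Illustrative sketch only (an assumption, not part of the original module and
# not called by parse_action): because `_JSON_RE` is greedy, one way to pick
# out the *first* decodable object is the standard library's
# `json.JSONDecoder.raw_decode`, tried at each `{` in turn. The helper name
# `_first_json_object_sketch` is hypothetical.
def _first_json_object_sketch(text: str) -> dict:
    """Hypothetical helper: return the first decodable JSON object in `text`."""
    import json  # local import keeps the sketch self-contained

    # strict=False also tolerates raw control characters inside string values.
    decoder = json.JSONDecoder(strict=False)
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # This brace did not open a valid object; try the next one.
            start = text.find("{", start + 1)
            continue
        if isinstance(obj, dict):
            return obj
        start = text.find("{", start + 1)
    raise ValueError("no JSON object found")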


def _escape_string_controls(s: str) -> str:
    """Escape raw newlines, carriage returns, and tabs inside JSON string literals.

    Walks character-by-character tracking whether we're inside a double-quoted
    string, and replaces raw control chars with their escaped forms. Handles
    the common case: `"code": "class Optimizer:\\n  def __init__..."` where the
    LLM emitted literal newlines.
    """
    out: list[str] = []
    in_string = False
    escape_next = False
    for ch in s:
        if escape_next:
            out.append(ch)
            escape_next = False
            continue
        if ch == "\\":
            out.append(ch)
            escape_next = True
            continue
        if ch == '"':
            in_string = not in_string
            out.append(ch)
            continue
        if in_string and ch in "\n\r\t":
            # Raw control char inside a string literal: emit its JSON escape.
            out.append({"\n": "\\n", "\r": "\\r", "\t": "\\t"}[ch])
            continue
        out.append(ch)
    return "".join(out)
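

# Illustrative alternative (an assumption, not part of the original module):
# `json.loads` accepts `strict=False`, which allows raw control characters
# such as newlines and tabs inside string values. The hypothetical helper
# below sketches that route; it is not wired into parse_action, and the
# character walk above remains this module's actual fallback.
def _loads_lenient_sketch(raw_json: str) -> dict:
    """Hypothetical helper: parse JSON while tolerating raw control chars in strings."""
    import json  # local import keeps the sketch self-contained

    return json.loads(raw_json, strict=False)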


# ---------- helpers ----------

def _fmt(v: Any) -> str:
    if v is None:
        return "None"
    if isinstance(v, float):
        if abs(v) < 1e-4 or abs(v) >= 1e4:
            return f"{v:.3e}"
        return f"{v:.4f}"
    if isinstance(v, list):
        if len(v) <= 4:
            return "[" + ", ".join(_fmt(x) for x in v) + "]"
        return f"[{_fmt(v[0])}, {_fmt(v[1])}, ..., {_fmt(v[-1])}] (len={len(v)})"
    return str(v)


def _summarise_trajectory(traj: list[dict]) -> str:
    """Condense a 30-step baseline trajectory to head, middle, and tail snapshots."""
    finite = [s for s in traj if s.get("f") is not None]
    if not finite:
        return "diverged immediately"
    head = finite[0]
    mid = finite[len(finite) // 2] if len(finite) > 2 else finite[-1]
    tail = finite[-1]
    diverged_mark = "  (DIVERGED)" if len(finite) < len(traj) else ""
    return (f"t=0: f={_fmt(head['f'])}, |g|={_fmt(head['grad_norm'])}  "
            f"→ t={mid['t']}: f={_fmt(mid['f'])}  "
            f"→ t={tail['t']}: f={_fmt(tail['f'])}{diverged_mark}")