Spaces:
Sleeping
Sleeping
| """FastAPI endpoints used by the React frontend. | |
| Provides: | |
| - /api/landscape build a template and return a Plotly contour + hints | |
| - /api/baseline_race run 4 LR-tuned baselines and return plots + summary | |
| - /api/arena full Phase-D evaluation of a user optimizer vs Adam | |
| - /api/llm_run SSE-streamed LLM-driven episode | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import re | |
| import time | |
| from typing import Any, Optional | |
| import numpy as np | |
| import requests | |
| from fastapi import APIRouter, Query | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| try: | |
| from ..arena import auto_test_draft, run_arena, ArenaResult | |
| from ..landscapes import BUILDERS, build_landscape, structural_hints | |
| from ..reference_optimizers import ( | |
| run_baseline_tuned, tune_adam_lr, | |
| ) | |
| from ..rewards import ast_novelty_score, compute_optcoder_reward | |
| from ..sandbox import SandboxError, compile_optimizer | |
| from ..models import LandscapeforgeAction | |
| from ..prompts import build_prompt, parse_action | |
| from .landscapeforge_environment import LandscapeforgeEnvironment | |
| except ImportError: # flat layout | |
| from arena import auto_test_draft, run_arena, ArenaResult # type: ignore | |
| from landscapes import BUILDERS, build_landscape, structural_hints # type: ignore | |
| from reference_optimizers import ( # type: ignore | |
| run_baseline_tuned, tune_adam_lr, | |
| ) | |
| from rewards import ast_novelty_score, compute_optcoder_reward # type: ignore | |
| from sandbox import SandboxError, compile_optimizer # type: ignore | |
| from models import LandscapeforgeAction # type: ignore | |
| from prompts import build_prompt, parse_action # type: ignore | |
| from server.landscapeforge_environment import LandscapeforgeEnvironment # type: ignore | |
| router = APIRouter(prefix="/api", tags=["lf-frontend"]) | |
| # ---------- palette constants for Plotly layouts ---------- | |
| _PLOTLY_LAYOUT = dict( | |
| font=dict(family="Inter", color="#f3f0e8", size=12), | |
| paper_bgcolor="#2a2824", plot_bgcolor="#1f1d1a", | |
| hoverlabel=dict(bgcolor="#f3f0e8", font_color="#1f1d1a"), | |
| legend=dict(bgcolor="rgba(31,29,26,0.85)", | |
| bordercolor="#403b34", borderwidth=1, | |
| font=dict(color="#f3f0e8")), | |
| ) | |
| _AXIS = dict(gridcolor="#403b34", zerolinecolor="#554e45", | |
| showline=True, linecolor="#554e45", | |
| tickfont=dict(color="#b5ada0")) | |
| _DEFAULT_MARGIN = dict(l=60, r=30, t=60, b=55) | |
| _TITLE = dict(x=0.02, xanchor="left", font=dict(size=14, color="#f3f0e8")) | |
| OPT_COLORS = { | |
| "sgd": "#c05450", | |
| "momentum": "#d9865b", | |
| "adam": "#5b7a6b", | |
| "lbfgs": "#556b99", | |
| "custom": "#e28763", | |
| } | |
| # ---------- shared plot helpers ---------- | |
| def _color(name: str) -> str: | |
| return OPT_COLORS.get(name.split("(")[0].strip(), "#e28763") | |
| def _contour_fig(ls, trajectories=None, title=None): | |
| import numpy as np | |
| if ls.dim != 2: | |
| return _empty_fig(f"{ls.name} · dim={ls.dim}\nContour is 2-D only", 480) | |
| CLIP = 8.0 | |
| xs_all, ys_all = [0.0], [0.0] | |
| for traj in (trajectories or {}).values(): | |
| arr = np.asarray(traj) | |
| if arr.size == 0: | |
| continue | |
| mask = (np.abs(arr) <= CLIP).all(axis=1) & np.isfinite(arr).all(axis=1) | |
| good = arr[mask] | |
| if good.size: | |
| xs_all.extend(good[:, 0].tolist()) | |
| ys_all.extend(good[:, 1].tolist()) | |
| x_min = max(min(xs_all) - 1.5, -CLIP); x_max = min(max(xs_all) + 1.5, CLIP) | |
| y_min = max(min(ys_all) - 1.5, -CLIP); y_max = min(max(ys_all) + 1.5, CLIP) | |
| x_min, x_max = min(x_min, -3.5), max(x_max, 3.5) | |
| y_min, y_max = min(y_min, -3.5), max(y_max, 3.5) | |
| g = 70 | |
| xs = np.linspace(x_min, x_max, g) | |
| ys = np.linspace(y_min, y_max, g) | |
| X, Y = np.meshgrid(xs, ys) | |
| Z = np.empty_like(X) | |
| for i in range(g): | |
| for j in range(g): | |
| Z[i, j] = ls.f(np.array([X[i, j], Y[i, j]])) | |
| finite = Z[np.isfinite(Z)] | |
| lo, hi = map(float, np.percentile(finite, [2, 95])) | |
| data = [dict( | |
| type="contour", x=xs.tolist(), y=ys.tolist(), z=Z.tolist(), | |
| zmin=lo, zmax=hi, | |
| colorscale=[ | |
| [0.0, "#1f1d1a"], [0.15, "#2f2a22"], [0.3, "#4a2f22"], | |
| [0.5, "#7a4229"], [0.7, "#c25a3a"], [0.85, "#e28763"], | |
| [1.0, "#f4d6c5"], | |
| ], | |
| contours=dict(coloring="heatmap", showlabels=False), | |
| line=dict(width=0.5, color="rgba(243,240,232,0.12)"), | |
| colorbar=dict(title=dict(text="f(x)", | |
| font=dict(size=11, color="#f3f0e8")), | |
| thickness=12, len=0.85, | |
| tickfont=dict(size=10, color="#b5ada0"), | |
| outlinewidth=0), | |
| hovertemplate="x₁=%{x:.3f}<br>x₂=%{y:.3f}<br>f=%{z:.3f}<extra></extra>", | |
| )] | |
| if trajectories: | |
| for name, traj in trajectories.items(): | |
| arr = np.asarray(traj) | |
| if not arr.size: | |
| continue | |
| mask = (np.abs(arr) <= CLIP).all(axis=1) & np.isfinite(arr).all(axis=1) | |
| diverged = not mask.all() | |
| arr = arr[mask] | |
| if arr.shape[0] == 0: | |
| continue | |
| color = _color(name) | |
| label = f"{name} · diverged" if diverged else name | |
| data.append(dict( | |
| type="scatter", mode="lines+markers", | |
| x=arr[:, 0].tolist(), y=arr[:, 1].tolist(), | |
| name=label, | |
| line=dict(color=color, width=2.5, dash="dash" if diverged else "solid"), | |
| marker=dict(size=4, color=color, | |
| line=dict(color="#ffffff", width=0.8)), | |
| hovertemplate="step %{pointNumber}<br>x₁=%{x:.3f}<br>x₂=%{y:.3f}" | |
| "<extra>" + label + "</extra>", | |
| )) | |
| data.append(dict(type="scatter", mode="markers", | |
| x=[arr[0, 0].item()], y=[arr[0, 1].item()], | |
| showlegend=False, | |
| marker=dict(size=12, color=color, symbol="circle-open", | |
| line=dict(color=color, width=2.5)), | |
| hoverinfo="skip")) | |
| end_sym = "x" if diverged else "star" | |
| data.append(dict(type="scatter", mode="markers", | |
| x=[arr[-1, 0].item()], y=[arr[-1, 1].item()], | |
| showlegend=False, | |
| marker=dict(size=14 if diverged else 16, | |
| color=color, symbol=end_sym, | |
| line=dict(color="#ffffff", width=1.2)), | |
| hoverinfo="skip")) | |
| layout = { | |
| **_PLOTLY_LAYOUT, | |
| "title": {"text": title or f"{ls.name} (dim=2)", **_TITLE}, | |
| "height": 480, "margin": _DEFAULT_MARGIN, | |
| "xaxis": {"title": "x₁", "range": [x_min, x_max], **_AXIS}, | |
| "yaxis": {"title": "x₂", "range": [y_min, y_max], | |
| "scaleanchor": "x", "scaleratio": 1, **_AXIS}, | |
| } | |
| return {"data": data, "layout": layout} | |
| def _empty_fig(msg: str, h: int = 480): | |
| return {"data": [], "layout": { | |
| **_PLOTLY_LAYOUT, "height": h, "margin": _DEFAULT_MARGIN, | |
| "xaxis": {"visible": False}, "yaxis": {"visible": False}, | |
| "annotations": [{"text": msg, "showarrow": False, | |
| "x": 0.5, "y": 0.5, "xref": "paper", "yref": "paper", | |
| "font": {"size": 14, "color": "#b5ada0"}}], | |
| }} | |
| def _curves_fig(curves, title): | |
| data = [] | |
| for name, fs in curves.items(): | |
| if not fs: | |
| continue | |
| color = _color(name) | |
| data.append(dict( | |
| type="scatter", mode="lines+markers", name=name, | |
| x=list(range(len(fs))), | |
| y=[v if np.isfinite(v) else None for v in fs], | |
| line=dict(color=color, width=2.2, shape="spline"), | |
| marker=dict(size=4, color=color), | |
| hovertemplate="step=%{x}<br>f=%{y:.4g}<extra>" + name + "</extra>", | |
| connectgaps=False, | |
| )) | |
| layout = { | |
| **_PLOTLY_LAYOUT, "title": {"text": title, **_TITLE}, | |
| "height": 360, "margin": _DEFAULT_MARGIN, | |
| "xaxis": {"title": "optimizer step", **_AXIS}, | |
| "yaxis": {"title": "f(x) (symlog)", "type": "log", **_AXIS}, | |
| } | |
| return {"data": data, "layout": layout} | |
| def _bar_fig(values, title, ylabel): | |
| names = list(values.keys()) | |
| vs = [values[n] for n in names] | |
| data = [dict(type="bar", x=names, y=vs, | |
| marker=dict(color=[_color(n) for n in names], | |
| line=dict(color="#ffffff", width=1)), | |
| text=[f"{v:.3g}" for v in vs], textposition="outside", | |
| textfont=dict(size=11, color="#f3f0e8"), | |
| hovertemplate="%{x}<br>" + ylabel + "=%{y:.4g}<extra></extra>")] | |
| layout = { | |
| **_PLOTLY_LAYOUT, "title": {"text": title, **_TITLE}, | |
| "height": 280, "margin": _DEFAULT_MARGIN, | |
| "xaxis": {**_AXIS}, | |
| "yaxis": {"title": ylabel, **_AXIS}, | |
| "showlegend": False, | |
| } | |
| return {"data": data, "layout": layout} | |
| # ---------- request/response models ---------- | |
| class LandscapeReq(BaseModel): | |
| template: str | |
| dim: int = 2 | |
| seed: int = 0 | |
| class BaselineReq(BaseModel): | |
| template: str | |
| seed: int = 1 | |
| class ArenaReq(BaseModel): | |
| template: str | |
| dim: int = 5 | |
| seed: int = 42 | |
| code: str | |
| # ---------- /api/landscape ---------- | |
| def _landscape_params(template: str) -> dict: | |
| if template == "quadratic": return {"cond": 10.0} | |
| if template == "gaussian_mix": return {"k": 3, "sigma": 0.5, "spread": 2.0} | |
| return {} | |
| def api_landscape(req: LandscapeReq): | |
| rng = np.random.default_rng(req.seed) | |
| dim = 2 if req.template == "himmelblau" else req.dim | |
| ls = build_landscape(template=req.template, dim=dim, | |
| params=_landscape_params(req.template), rng=rng) | |
| hints = structural_hints(ls, rng=rng) | |
| hints_rows = [[k, f"{v:.4g}" if isinstance(v, float) else str(v)] | |
| for k, v in hints.items()] | |
| hints_rows.append(["dim", str(ls.dim)]) | |
| hints_rows.append(["f_min (known)", f"{ls.f_min:.4g}"]) | |
| hints_rows.append(["description", ls.description]) | |
| return { | |
| "contour": _contour_fig(ls, title=f"{req.template} · dim={ls.dim}"), | |
| "hints": hints_rows, | |
| } | |
| # ---------- /api/baseline_race ---------- | |
| def api_baseline_race(req: BaselineReq): | |
| rng = np.random.default_rng(req.seed) | |
| ls = build_landscape(template=req.template, dim=2, | |
| params=_landscape_params(req.template), rng=rng) | |
| x0 = np.random.default_rng(req.seed + 999).normal(0.0, 0.5, size=2) | |
| traj_2d, curves, finals, lrs = {}, {}, {}, {} | |
| for name in ["sgd", "momentum", "adam", "lbfgs"]: | |
| r = run_baseline_tuned(name, ls.f, ls.grad, x0, steps=50) | |
| lrs[name] = r["lr"] | |
| traj = [s for s in r["trajectory"] if s.get("x") is not None] | |
| traj_2d[name] = [(s["x"][0], s["x"][1]) for s in traj] | |
| curves[name] = [s["f"] for s in traj if s.get("f") is not None] | |
| finals[name] = curves[name][-1] if curves[name] else float("inf") | |
| lr_list = " · ".join(f"<code>{n}</code>: <code>{lr:g}</code>" | |
| for n, lr in lrs.items()) | |
| best = min(finals, key=finals.get) | |
| return { | |
| "contour": _contour_fig(ls, trajectories=traj_2d, | |
| title=f"{req.template} — baselines racing (LR-tuned)"), | |
| "curves": _curves_fig(curves, "f(x) vs step"), | |
| "finals": _bar_fig(finals, "Final f after 50 steps", | |
| "f(x) at step 50"), | |
| "summary_md": ( | |
| f"<p><strong>{ls.description}</strong></p>" | |
| f"<p>Tuned LR per baseline (7-point sweep, 30 steps): {lr_list}</p>" | |
| f"<p>Best baseline: <code>{best}</code> at f = " | |
| f"<code>{finals[best]:.4f}</code></p>" | |
| ), | |
| } | |
| # ---------- /api/arena ---------- | |
| ADAM_TEMPLATE = """\ | |
| class Optimizer: | |
| def __init__(self, dim): | |
| self.lr = {lr} | |
| self.b1, self.b2, self.eps = 0.9, 0.999, 1e-8 | |
| self.m = np.zeros(dim); self.v = np.zeros(dim); self.t = 0 | |
| def step(self, x, f_val, grad): | |
| self.t += 1 | |
| self.m = self.b1*self.m + (1-self.b1)*grad | |
| self.v = self.b2*self.v + (1-self.b2)*grad*grad | |
| mh = self.m/(1-self.b1**self.t); vh = self.v/(1-self.b2**self.t) | |
| return x - self.lr * mh / (np.sqrt(vh) + self.eps) | |
| """ | |
| ARENA_SEEDS = [101, 202, 303, 404, 505, 606, 707, 808, 909, 1010] | |
| def api_arena(req: ArenaReq): | |
| rng = np.random.default_rng(req.seed) | |
| dim = 2 if req.template == "himmelblau" else req.dim | |
| ls = build_landscape(template=req.template, dim=dim, | |
| params=_landscape_params(req.template), rng=rng) | |
| tune_x0 = np.random.default_rng(0).normal(0.0, 0.5, size=dim) | |
| best_lr = tune_adam_lr(ls.f, ls.grad, tune_x0, sweep_steps=30) | |
| adam_src = ADAM_TEMPLATE.format(lr=best_lr) | |
| try: | |
| opt = compile_optimizer(req.code, dim=dim) | |
| except SandboxError as e: | |
| return {"error": str(e)} | |
| test = auto_test_draft(opt, ls, seed=req.seed, steps=20) | |
| user_arena = run_arena(opt, ls, seeds=ARENA_SEEDS, steps=200) | |
| adam_opt = compile_optimizer(adam_src, dim=dim) | |
| adam_arena = run_arena(adam_opt, ls, seeds=ARENA_SEEDS, steps=200) | |
| reward = compute_optcoder_reward( | |
| arena=user_arena, adam_arena=adam_arena, | |
| actions_used_cost=0, budget_total=12, | |
| novelty_score=ast_novelty_score(req.code, [adam_src]), | |
| convergence_step=None, arena_steps=200, | |
| ) | |
| # 2-D contour if applicable | |
| contour = None | |
| if dim == 2: | |
| try: | |
| from ..reference_optimizers import run_baseline as _rb | |
| except ImportError: | |
| from reference_optimizers import run_baseline as _rb # type: ignore | |
| user_traj = [(s["x"][0], s["x"][1]) for s in test["detail"]] | |
| adam_run = _rb("adam", ls.f, ls.grad, | |
| np.random.default_rng(req.seed).normal(0.0, 0.5, 2), | |
| steps=50) | |
| adam_traj = [(s["x"][0], s["x"][1]) for s in adam_run["trajectory"] | |
| if s.get("x") is not None] | |
| contour = _contour_fig(ls, | |
| trajectories={"custom": user_traj, "adam": adam_traj}, | |
| title=f"{req.template} — your optimizer vs tuned Adam") | |
| bk = reward.breakdown | |
| speedup = bk.get("speedup_vs_adam", 0.0) | |
| # Narrate the reward decomposition so users aren't confused when reward | |
| # is positive despite speedup≈1× (r_convergence + r_robustness contribute | |
| # independently of beating Adam; see §9.1 of LANDSCAPEFORGE_DESIGN.md). | |
| parts = [] | |
| if abs(bk["r_regret"] * 1.0) > 0.01: | |
| parts.append(f"regret {bk['r_regret']*1.0:+.3f}") | |
| if abs(bk["r_convergence"] * 0.3) > 0.01: | |
| parts.append(f"convergence {bk['r_convergence']*0.3:+.3f}") | |
| if abs(bk["r_robustness"] * 0.3) > 0.01: | |
| parts.append(f"robustness {bk['r_robustness']*0.3:+.3f}") | |
| if abs(bk["r_novelty"] * 0.1) > 0.01: | |
| parts.append(f"novelty {bk['r_novelty']*0.1:+.3f}") | |
| if abs(bk["r_budget"] * 0.05) > 0.01: | |
| parts.append(f"budget {-bk['r_budget']*0.05:+.3f}") | |
| if abs(bk["r_eval_failures"] * 0.5) > 0.01: | |
| parts.append(f"eval {-bk['r_eval_failures']*0.5:+.3f}") | |
| # Speedup phrasing — avoid nonsense like "0.00×" when diverged | |
| my_p, adam_p = user_arena.mean_progress, adam_arena.mean_progress | |
| if my_p < 0: | |
| speedup_line = "your optimizer <strong>diverged</strong> (f moved uphill)" | |
| elif adam_p <= 0: | |
| speedup_line = (f"Adam made no progress on this landscape; " | |
| f"your progress: <code>{my_p:.3g}</code>") | |
| else: | |
| speedup_line = (f"Speedup vs Adam: <code>{speedup:.3g}×</code> " | |
| f"(your descent <code>{my_p:.3g}</code>, Adam's " | |
| f"<code>{adam_p:.3g}</code>)") | |
| return { | |
| "contour": contour or _empty_fig(f"{req.template} · dim={dim}\nContour is 2-D only"), | |
| "progress": _bar_fig( | |
| {"custom": user_arena.mean_progress, | |
| "adam (tuned)": adam_arena.mean_progress}, | |
| "Arena mean progress", | |
| "mean(f₀ − f_N) over 10 seeds", | |
| ), | |
| "breakdown": bk, | |
| "total": reward.r_total, | |
| "summary_md": ( | |
| f"<h3>Results</h3>" | |
| f"<ul>" | |
| f"<li>{speedup_line}</li>" | |
| f"<li>Tuned Adam LR: <code>{best_lr:g}</code></li>" | |
| f"<li>Your crash fraction: <code>{user_arena.crash_fraction:.0%}</code></li>" | |
| f"<li><strong>Total reward: <code>{reward.r_total:+.3f}</code></strong>" | |
| + (f"<span style='color:#b5ada0'> " | |
| f"= {' + '.join(parts)}</span>" if parts else "") | |
| + "</li>" | |
| f"</ul>" | |
| ), | |
| } | |
| # ---------- /api/llm_run (SSE stream) ---------- | |
| def _sse(event: str, data: dict) -> str: | |
| return f"event: {event}\ndata: {json.dumps(data, default=str)}\n\n" | |
| def api_llm_run( | |
| base_url: str = Query(...), | |
| api_key: str = "", | |
| model: str = Query(...), | |
| tier: str = "T0", | |
| seed: int = 42, | |
| temperature: float = 0.7, | |
| max_turns: int = 10, | |
| ): | |
| """SSE-streamed LLM-driven episode. One event per turn.""" | |
| def gen(): | |
| url = base_url.rstrip("/") + "/chat/completions" | |
| headers = {"Content-Type": "application/json"} | |
| if api_key: | |
| headers["Authorization"] = f"Bearer {api_key}" | |
| env = LandscapeforgeEnvironment(tier=tier, seed=int(seed)) | |
| obs = env.reset() | |
| yield _sse("message", { | |
| "kind": "header", "model": model, "base_url": base_url, | |
| "landscape": obs.landscape_description, | |
| "dim": obs.dim, "budget": obs.budget_remaining, | |
| }) | |
| for turn in range(1, int(max_turns) + 1): | |
| messages = build_prompt(obs) | |
| t0 = time.time() | |
| try: | |
| r = requests.post(url, headers=headers, json={ | |
| "model": model, "messages": messages, | |
| "temperature": float(temperature), | |
| "max_tokens": 1200, "stream": False, | |
| }, timeout=180) | |
| if r.status_code >= 400: | |
| yield _sse("message", { | |
| "kind": "error", | |
| "message": f"LLM {r.status_code}: {r.text[:400]}", | |
| }) | |
| return | |
| raw = r.json()["choices"][0]["message"]["content"] | |
| except Exception as e: | |
| yield _sse("message", { | |
| "kind": "error", | |
| "message": f"request failed: {type(e).__name__}: {e}", | |
| }) | |
| return | |
| dt = time.time() - t0 | |
| try: | |
| action = parse_action(raw) | |
| except Exception as e: | |
| yield _sse("message", { | |
| "kind": "error", | |
| "message": f"parse error: {e}. Raw: {raw[:400]}", | |
| }) | |
| return | |
| obs = env.step(action) | |
| lar = obs.last_action_result or {} | |
| output_chips = [] | |
| if lar.get("compile_error"): | |
| output_chips.append({"kind": "bad", "text": "compile error"}) | |
| if lar.get("summary"): | |
| s = lar["summary"] | |
| if s.get("converged"): | |
| output_chips.append({"kind": "good", "text": "auto-test converged"}) | |
| elif s.get("diverged"): | |
| output_chips.append({"kind": "warn", "text": "auto-test diverged"}) | |
| if s.get("final_f") is not None: | |
| output_chips.append({ | |
| "kind": "info", | |
| "text": f"<code>final_f</code>=<b>{s['final_f']:.3g}</b>", | |
| }) | |
| if action.kind == "run_baseline" and lar.get("final_f") is not None: | |
| output_chips.append({ | |
| "kind": "info", | |
| "text": f"<code>final_f</code>=<b>{lar['final_f']:.3g}</b>", | |
| }) | |
| for k, v in (lar.get("feedback") or {}).items(): | |
| output_chips.append({ | |
| "kind": "good" if v >= 0 else "warn", | |
| "text": f"<code>{k}</code> <b>{v:+.3f}</b>", | |
| }) | |
| if action.kind == "draft": | |
| action_str = f"draft ({len(action.code or '')} chars)" | |
| elif action.kind == "run_baseline": | |
| action_str = f"run_baseline({action.baseline_name})" | |
| elif action.kind == "inspect": | |
| action_str = (f"inspect(draft={action.draft_idx}, " | |
| f"[{action.step_range_start},{action.step_range_end}])") | |
| else: | |
| action_str = "commit" | |
| yield _sse("message", { | |
| "kind": "turn", | |
| "turn": turn, "kind_of": action.kind, | |
| "action_str": action_str, "output": output_chips, | |
| "duration_s": dt, | |
| "budget_remaining": obs.budget_remaining, | |
| "code": action.code if action.kind == "draft" else None, | |
| }) | |
| if obs.done: | |
| bk = obs.r_optcoder_breakdown or {} | |
| yield _sse("message", { | |
| "kind": "done", | |
| "reason": (obs.last_action_result or {}).get("reason"), | |
| "reward": obs.r_optcoder or 0.0, | |
| "final_regret": obs.final_regret or 0.0, | |
| "my_progress": bk.get("my_progress", 0.0), | |
| "adam_progress": bk.get("adam_progress", 0.0), | |
| "speedup_vs_adam": bk.get("speedup_vs_adam", 0.0), | |
| "breakdown": bk, | |
| }) | |
| yield "event: end\ndata: {}\n\n" | |
| return | |
| yield _sse("message", { | |
| "kind": "error", | |
| "message": f"reached MAX_TURNS ({max_turns}) without commit", | |
| }) | |
| yield "event: end\ndata: {}\n\n" | |
| return StreamingResponse(gen(), media_type="text/event-stream") | |