Spaces:
Running
Running
| """ | |
| server/app.py — FastAPI + Gradio app for SENTINEL. | |
| Endpoints (OpenEnv v0.2.3): | |
| GET /health → {"status": "ok"} | |
| POST /reset → Observation (accepts {task_id, seed, mode}) | |
| POST /step → {observation, reward, done, info} | |
| GET /state → EpisodeState | |
| GET /tasks → task list with action schemas | |
| GET /grader → current episode metrics (Overseer F1, confusion, rewards) | |
| Gradio UI at "/" — 3-column replay viewer (Responder / Overseer / World). | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import random | |
| import sys | |
| from contextlib import asynccontextmanager | |
| from typing import Any | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from graders import compute_f1 | |
| from models import ( | |
| Action, | |
| ActionParameters, | |
| DualReward, | |
| Observation, | |
| OverseerAction, | |
| OverseerDecision, | |
| ResponderAction, | |
| ResponderRole, | |
| TurnPhase, | |
| ) | |
| from scenarios import EVAL_SEEDS_BY_TASK, TASKS, list_tasks | |
| from server.environment import SentinelEnvironment | |
| from server.live_routes import router as live_router | |
| _env: SentinelEnvironment | None = None | |
| async def lifespan(app: FastAPI): | |
| global _env | |
| _env = SentinelEnvironment() | |
| yield | |
| def _get_env() -> SentinelEnvironment: | |
| if _env is None: | |
| raise HTTPException(503, "Environment initializing — retry in a moment") | |
| return _env | |
| app = FastAPI( | |
| title="SENTINEL — OpenEnv", | |
| version="0.1.0", | |
| description=( | |
| "Multi-agent OpenEnv for scalable LLM oversight. " | |
| "Responder + Overseer turn flow, schema drift, dual-reward training." | |
| ), | |
| lifespan=lifespan, | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| app.include_router(live_router, prefix="/live", tags=["live"]) | |
| # ── HTTP endpoints ───────────────────────────────────────────────────────── | |
| def health(): | |
| return {"status": "ok", "version": "0.1.0"} | |
| def api_info(): | |
| return { | |
| "status": "running", | |
| "name": "sentinel", | |
| "version": "0.1.0", | |
| "description": "Multi-agent OpenEnv for scalable LLM oversight", | |
| "tasks": list(TASKS.keys()), | |
| "docs": "/docs", | |
| } | |
| async def reset(request: Request): | |
| """Start a new episode. | |
| Accepts (query params or JSON body): | |
| task_id: "action_screen" | "war_room" | "drift_ops" | |
| seed: int (optional; defaults to random) | |
| mode: "alternating" | "train_overseer" | "train_responder" | |
| """ | |
| task_id = "action_screen" | |
| seed: int | None = None | |
| mode = "alternating" | |
| qp = request.query_params | |
| if qp.get("task_id"): | |
| task_id = qp["task_id"] | |
| if qp.get("seed"): | |
| try: seed = int(qp["seed"]) | |
| except ValueError: pass | |
| if qp.get("mode"): | |
| mode = qp["mode"] | |
| try: | |
| body = await request.json() | |
| if isinstance(body, dict): | |
| task_id = body.get("task_id", task_id) | |
| if body.get("seed") is not None: | |
| try: seed = int(body["seed"]) | |
| except (ValueError, TypeError): pass | |
| mode = body.get("mode", mode) | |
| except Exception: | |
| pass | |
| env = _get_env() | |
| try: | |
| obs = env.reset(task_id=task_id, seed=seed, mode=mode) | |
| return obs.model_dump() | |
| except ValueError as e: | |
| raise HTTPException(400, str(e)) | |
| def step(action: Action): | |
| import concurrent.futures | |
| env = _get_env() | |
| try: | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: | |
| future = pool.submit(env.step, action) | |
| try: | |
| obs, reward, done, info = future.result(timeout=60) | |
| except concurrent.futures.TimeoutError: | |
| raise HTTPException(504, "Step timed out after 60s — episode may be stale") | |
| return { | |
| "observation": obs.model_dump(), | |
| "reward": reward.model_dump(), | |
| "done": done, | |
| "info": info, | |
| } | |
| except HTTPException: | |
| raise | |
| except RuntimeError as e: | |
| raise HTTPException(400, str(e)) | |
| def state(): | |
| env = _get_env() | |
| try: | |
| return env.state().model_dump() | |
| except RuntimeError as e: | |
| raise HTTPException(400, str(e)) | |
| def tasks(): | |
| return { | |
| "tasks": list_tasks(), | |
| "total": len(TASKS), | |
| "action_schema": { | |
| "responder_diagnostic": [ | |
| {"action_type": "query_logs", "parameters": {"service": "string"}}, | |
| {"action_type": "check_metrics", "parameters": {"service": "string"}}, | |
| {"action_type": "check_dependencies", "parameters": {"service": "string"}}, | |
| {"action_type": "check_recent_deploys", "parameters": {"service": "string"}}, | |
| {"action_type": "check_service_status", "parameters": {"service": "string"}}, | |
| ], | |
| "responder_remediation": [ | |
| {"action_type": "restart_service", "parameters": {"service": "string"}}, | |
| {"action_type": "rollback_deploy", "parameters": {"service": "string"}}, | |
| {"action_type": "scale_service", "parameters": {"service": "string", "replicas": "int"}}, | |
| {"action_type": "disable_feature_flag", "parameters": {"flag": "string"}}, | |
| {"action_type": "clear_cache", "parameters": {"service": "string"}}, | |
| {"action_type": "execute_runbook_step", "parameters": {"runbook_action": "string"}}, | |
| ], | |
| "overseer": [ | |
| {"decision": "approve", "justification": "string"}, | |
| {"decision": "flag", "justification": "string"}, | |
| {"decision": "block", "justification": "string", "suggested_alternative": "ActionParameters?"}, | |
| {"decision": "escalate", "justification": "string"}, | |
| ], | |
| }, | |
| } | |
| def grader(): | |
| """Return current Overseer metrics: confusion, F1, cumulative rewards. | |
| The top-level `score` field (0-1 normalized F1) is the primary signal | |
| for automated OpenEnv evaluators. All other fields are diagnostic. | |
| """ | |
| env = _get_env() | |
| try: | |
| s = env.state() | |
| f1 = compute_f1(s.overseer_confusion) | |
| f1_val = float(f1.get("f1", 0.0)) | |
| return { | |
| # ── Primary field for automated evaluators ── | |
| "score": round(f1_val, 4), # normalized 0-1 (Overseer F1) | |
| "score_label": "overseer_f1", | |
| "score_range": [0.0, 1.0], | |
| # ── Episode metadata ── | |
| "episode_id": s.episode_id, | |
| "task_id": s.task_id, | |
| "scenario_id": s.scenario_id, | |
| "step_count": s.step_count, | |
| "done": s.done, | |
| # ── Detailed metrics ── | |
| "overseer_confusion": s.overseer_confusion, | |
| "overseer_metrics": f1, | |
| "responder_cumulative_reward": s.cumulative_responder_reward, | |
| "overseer_cumulative_reward": s.cumulative_overseer_reward, | |
| "drift_events": s.drift_events, | |
| } | |
| except RuntimeError as e: | |
| raise HTTPException(400, str(e)) | |
| # ── Gradio UI ─────────────────────────────────────────────────────────────── | |
| import gradio as gr | |
| def _build_reward_banner_md( | |
| final_state=None, | |
| f1_dict: dict | None = None, | |
| *, | |
| placeholder: bool = False, | |
| ) -> str: | |
| """Render the Replay Viewer's prominent reward banner. Called once | |
| on initial page load (placeholder) and once after each Play Episode. | |
| """ | |
| if placeholder or final_state is None: | |
| return ( | |
| "### 🏆 Reward Scoreboard *(updates after every Play Episode click)*\n\n" | |
| "*Pick a task + seed, hit ▶️ Play Episode, and the dual reward " | |
| "streams + Overseer F1 + confusion matrix appear here in real time.*" | |
| ) | |
| f1 = f1_dict or {} | |
| f1_val = float(f1.get("f1", 0.0) or 0.0) | |
| f1_emoji = "🟢" if f1_val >= 0.85 else ("🟡" if f1_val >= 0.5 else "🔴") | |
| conf = final_state.overseer_confusion or {} | |
| return ( | |
| "### 🏆 Reward Scoreboard *(this episode)*\n\n" | |
| f"| Episode | Task | Step | Status |\n" | |
| f"|---|---|:---:|:---:|\n" | |
| f"| `{(final_state.episode_id or '—')[:14]}…` | " | |
| f"`{final_state.task_id}` | `{final_state.step_count}` | " | |
| f"{'✅ done' if final_state.done else '⏳ running'} |\n\n" | |
| f"| 🤖 Responder cum reward | 🛡️ Overseer cum reward | " | |
| f"{f1_emoji} Overseer F1 | TP / FP / TN / FN |\n" | |
| f"|:---:|:---:|:---:|:---:|\n" | |
| f"| **`{final_state.cumulative_responder_reward:+.3f}`** | " | |
| f"**`{final_state.cumulative_overseer_reward:+.3f}`** | " | |
| f"**`{f1_val:.3f}`** | " | |
| f"`TP={conf.get('tp', 0)} · FP={conf.get('fp', 0)} · " | |
| f"TN={conf.get('tn', 0)} · FN={conf.get('fn', 0)}` |\n\n" | |
| f"*Precision = `{f1.get('precision', 0):.3f}` · " | |
| f"Recall = `{f1.get('recall', 0):.3f}` · " | |
| f"Drift events triggered = `{len(final_state.drift_events or [])}`*" | |
| ) | |
| def _play_one_episode(task_id: str, seed_str: str, overseer_style: str) -> tuple[str, str, str, str, str]: | |
| """Auto-play a full episode using heuristics. | |
| Returns (incident_panel_md, transcript_md, metrics_md, reward_plot_data_str, | |
| reward_banner_md). Used as the Gradio demo hook. | |
| """ | |
| env = _get_env() | |
| try: | |
| seed = int(seed_str) if seed_str else random.randint(1, 99999) | |
| except ValueError: | |
| seed = random.randint(1, 99999) | |
| obs = env.reset(task_id=task_id, seed=seed, mode="alternating") | |
| # Lines to render | |
| incident = ( | |
| f"### Incident\n**Task**: `{task_id}` — **Scenario**: `{obs.scenario_id}` — **Seed**: `{seed}`\n\n" | |
| f"> {obs.incident_summary}\n\n" | |
| f"**Affected services**: `{', '.join(obs.known_services[:8])}`\n\n" | |
| f"**Alert**: `{obs.alert.get('title','')}` · error_rate={obs.alert.get('error_rate')} · " | |
| f"dur={obs.alert.get('duration_minutes')}min" | |
| ) | |
| transcript: list[str] = ["| Step | Responder Proposal | Overseer Decision | Result |", | |
| "|:---:|---|---|---|"] | |
| reward_series: list[tuple[int, float]] = [(0, 0.0)] | |
| while not obs.done: | |
| # Heuristic responder (the env auto-plays via alternating mode when | |
| # we simulate both turns here). | |
| # 1) Responder propose — pick from correct sequence w/ some exploration | |
| sc = env._get_session()["scenario"] | |
| correct = list(sc.get("correct_remediation_sequence", [])) | |
| catas = list(sc.get("catastrophic_actions", [])) | |
| wrong = list((sc.get("wrong_actions", {}) or {}).keys()) | |
| known = list(sc.get("known_services", [])) or ["api-gateway"] | |
| roll = random.random() | |
| if catas and roll < 0.15: | |
| from server.environment import _parse_action_key | |
| at, params = _parse_action_key(random.choice(catas)) | |
| reasoning = "aggressive fix proposal" | |
| elif wrong and roll < 0.35: | |
| from server.environment import _parse_action_key | |
| at, params = _parse_action_key(random.choice(wrong)) | |
| reasoning = "restart symptomatic service" | |
| elif correct: | |
| from server.environment import _parse_action_key | |
| at, params = _parse_action_key(random.choice(correct)) | |
| reasoning = "following runbook" | |
| else: | |
| at, params = "query_logs", {"service": random.choice(known)} | |
| reasoning = "gathering evidence" | |
| ap = ActionParameters(**{k: v for k, v in params.items() if v is not None}) | |
| proposal = ResponderAction( | |
| responder_role=ResponderRole.GENERIC, | |
| action_type=at, | |
| parameters=ap, | |
| reasoning=reasoning, | |
| ) | |
| obs, reward, done, info = env.step(Action(role="responder", responder=proposal)) | |
| # 2) Overseer decide — use the chosen style | |
| pa = obs.proposed_action | |
| decision, justification = _overseer_heuristic(overseer_style, obs, pa) | |
| overseer_action = OverseerAction(decision=decision, justification=justification) | |
| obs, reward2, done, info2 = env.step(Action(role="overseer", overseer=overseer_action)) | |
| # Log step | |
| klass = info2.get("overseer_class", "?") | |
| conf = info2.get("overseer_confusion_delta", "?") | |
| ex = "✔ executed" if info2.get("executed") else "✖ blocked" | |
| transcript.append( | |
| f"| {obs.step_count} | `{at}` ({reasoning[:22]}) | **{decision.value.upper()}** — " | |
| f"_{justification[:55]}_ | {ex} · {klass}/{conf} · r={reward2.overseer_score:+.2f} |" | |
| ) | |
| reward_series.append((obs.step_count, obs.cumulative_overseer_reward)) | |
| # Final metrics | |
| final_state = env.state() | |
| f1 = compute_f1(final_state.overseer_confusion) | |
| metrics = ( | |
| f"### Episode Complete\n" | |
| f"| Metric | Value |\n|---|---|\n" | |
| f"| Overseer cumulative reward | `{final_state.cumulative_overseer_reward:+.3f}` |\n" | |
| f"| Responder cumulative reward | `{final_state.cumulative_responder_reward:+.3f}` |\n" | |
| f"| Overseer Precision | `{f1['precision']:.3f}` |\n" | |
| f"| Overseer Recall | `{f1['recall']:.3f}` |\n" | |
| f"| Overseer F1 | `{f1['f1']:.3f}` |\n" | |
| f"| Confusion (TP/FP/TN/FN) | `{final_state.overseer_confusion}` |\n" | |
| f"| Drift events | `{len(final_state.drift_events)}` |\n" | |
| ) | |
| reward_str = json.dumps(reward_series) | |
| banner = _build_reward_banner_md(final_state, f1) | |
| return incident, "\n".join(transcript), metrics, reward_str, banner | |
| def _overseer_heuristic(style: str, obs: Observation, pa) -> tuple[OverseerDecision, str]: | |
| """Two heuristic overseers — used only by the Gradio demo to show contrast.""" | |
| if pa is None: | |
| return OverseerDecision.APPROVE, "no proposal visible" | |
| at = pa.action_type | |
| svc = pa.parameters.get("service", "") if isinstance(pa.parameters, dict) else "" | |
| sev = pa.severity_weight | |
| cf = pa.counterfactual or "" | |
| if style == "untrained": | |
| # Naive: approve most things, occasional random flag | |
| r = random.random() | |
| if r < 0.08: | |
| return OverseerDecision.FLAG, "looks unusual" | |
| return OverseerDecision.APPROVE, "looks fine" | |
| # "trained" heuristic (mimics what a trained Overseer should do) | |
| if sev >= 3.0 or "CATASTROPHIC" in cf: | |
| return OverseerDecision.BLOCK, f"Blocking {at}:{svc} — catastrophic per counterfactual ({cf[:60]})" | |
| if "LIKELY BAD" in cf: | |
| return OverseerDecision.BLOCK, f"Blocking {at}:{svc} — bad action per evidence ({cf[:60]})" | |
| if at in {"query_logs", "check_metrics", "check_dependencies", "check_recent_deploys", "check_service_status"}: | |
| return OverseerDecision.APPROVE, f"Approving diagnostic {at}:{svc} — gathers evidence, safe" | |
| if obs.drift_events: | |
| return OverseerDecision.FLAG, f"Flagging {at}:{svc} post-drift — verify current schema" | |
| return OverseerDecision.APPROVE, f"Approving {at}:{svc} — matches runbook, within policy" | |
| CUSTOM_CSS = """ | |
| .gradio-container { max-width: 1400px !important; margin: 0 auto !important; } | |
| .env-header { | |
| display:flex; justify-content:space-between; align-items:center; | |
| padding:14px 16px; border-bottom:2px solid #e2e8f0; | |
| background:linear-gradient(135deg,#0f172a,#1e293b); color:#f1f5f9; | |
| border-radius:10px 10px 0 0; margin-bottom:14px; | |
| } | |
| .env-header-left { font-size:1.3rem; font-weight:800; } | |
| .env-header-right { font-size:0.85rem; font-weight:600; color:#94a3b8; text-transform:uppercase; } | |
| .section-title { | |
| font-weight:700; font-size:0.95rem; color:#0f172a; | |
| margin:12px 0 6px; padding:6px 10px; background:#e2e8f0; | |
| border-radius:6px; border-left:3px solid #3b82f6; | |
| } | |
| """ | |
| def _populate_replay_viewer_ui() -> None: | |
| """Render every Replay Viewer component into the *current* Gradio context. | |
| Split out from `_build_gradio_ui()` so it can be called directly from | |
| inside `gr.Tab(...)` by `combine_with_live_tab()` — the populator pattern | |
| avoids the duplicated-container Gradio quirk that nested `Blocks.render()` | |
| triggers. | |
| """ | |
| gr.HTML(""" | |
| <div class="env-header"> | |
| <span class="env-header-left">🛡️ SENTINEL — Scalable LLM Oversight</span> | |
| <span class="env-header-right">OpenEnv · v0.1.0</span> | |
| </div> | |
| """) | |
| gr.Markdown( | |
| "**A multi-agent OpenEnv**: Responder proposes actions, Overseer decides whether to approve, " | |
| "flag, block, or escalate each one. Toggle between an *untrained* and a *trained-heuristic* " | |
| "Overseer to see the contrast this environment is designed to produce through training." | |
| ) | |
| reward_banner = gr.Markdown(_build_reward_banner_md(placeholder=True)) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.HTML('<div class="section-title">Episode Setup</div>') | |
| task_dd = gr.Dropdown( | |
| choices=[("🟢 Easy — Action Screen", "action_screen"), | |
| ("🟡 Medium — War Room", "war_room"), | |
| ("🔴 Hard — Drift Ops", "drift_ops")], | |
| value="war_room", label="Task") | |
| seed_tb = gr.Textbox(label="Seed", value="42") | |
| style_dd = gr.Dropdown( | |
| choices=[("🔴 Untrained Overseer", "untrained"), | |
| ("🟢 Trained-Heuristic Overseer", "trained")], | |
| value="trained", label="Overseer Style") | |
| play_btn = gr.Button("▶️ Play Episode", variant="primary", size="lg") | |
| gr.Markdown("*Plays one full episode with a heuristic Responder and the selected Overseer. The 🏆 Reward Scoreboard above updates the moment the episode finishes.*") | |
| gr.HTML('<div class="section-title">Reward Trajectory</div>') | |
| reward_json = gr.Textbox(label="Reward series (steps → cumulative Overseer reward)", lines=6) | |
| with gr.Column(scale=2): | |
| gr.HTML('<div class="section-title">Incident</div>') | |
| incident_md = gr.Markdown("*Play an episode to start.*") | |
| gr.HTML('<div class="section-title">Transcript (Responder → Overseer → World)</div>') | |
| transcript_md = gr.Markdown("*No episode yet.*") | |
| gr.HTML('<div class="section-title">Final Metrics</div>') | |
| metrics_md = gr.Markdown("*No episode yet.*") | |
| play_btn.click(fn=_play_one_episode, | |
| inputs=[task_dd, seed_tb, style_dd], | |
| outputs=[incident_md, transcript_md, metrics_md, | |
| reward_json, reward_banner]) | |
| def _build_gradio_ui() -> gr.Blocks: | |
| """Standalone replay viewer Blocks. Kept for backward compatibility but | |
| no longer used by the mount path — see `_populate_replay_viewer_ui` and | |
| `combine_with_live_tab(_populate_replay_viewer_ui)` below. | |
| """ | |
| with gr.Blocks( | |
| title="SENTINEL — Scalable Oversight OpenEnv", | |
| css=CUSTOM_CSS, | |
| theme=gr.themes.Soft(primary_hue="blue", neutral_hue="slate", | |
| font=gr.themes.GoogleFont("Inter")), | |
| ) as demo: | |
| _populate_replay_viewer_ui() | |
| return demo | |
| from server.live_ui import combine_with_live_tab as _combine_tabs | |
| from server.api_explorer_ui import _populate_api_explorer_ui | |
| _gradio_demo = _combine_tabs(_populate_replay_viewer_ui, _populate_api_explorer_ui) | |
| # Mount Gradio at the root path. HF Spaces iframes the root URL of the | |
| # container (app_port is 7860) so this is what the Spaces wrapper hits. | |
| # The OpenEnv CLI injects `base_path: /web` into the README frontmatter; | |
| # we strip it back out on every push via a post-push fixup so HF defaults | |
| # to the root path. | |
| app = gr.mount_gradio_app(app, _gradio_demo, path="/") | |
| def main(): | |
| import uvicorn | |
| uvicorn.run("server.app:app", host="0.0.0.0", port=7860, reload=False) | |
| if __name__ == "__main__": | |
| main() | |