"""
server/app.py — FastAPI + Gradio app for SENTINEL.
Endpoints (OpenEnv v0.2.3):
GET /health → {"status": "ok"}
POST /reset → Observation (accepts {task_id, seed, mode})
POST /step → {observation, reward, done, info}
GET /state → EpisodeState
GET /tasks → task list with action schemas
GET /grader → current episode metrics (Overseer F1, confusion, rewards)
Gradio UI at "/" — 3-column replay viewer (Responder / Overseer / World).
"""
from __future__ import annotations
import json
import os
import random
import sys
from contextlib import asynccontextmanager
from typing import Any
# Make repo-root imports (graders, models, scenarios) resolvable when the app
# is launched from inside the server/ package.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from graders import compute_f1
from models import (
Action,
ActionParameters,
DualReward,
Observation,
OverseerAction,
OverseerDecision,
ResponderAction,
ResponderRole,
TurnPhase,
)
from scenarios import EVAL_SEEDS_BY_TASK, TASKS, list_tasks
from server.environment import SentinelEnvironment
from server.live_routes import router as live_router
_env: SentinelEnvironment | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _env
_env = SentinelEnvironment()
yield
def _get_env() -> SentinelEnvironment:
if _env is None:
raise HTTPException(503, "Environment initializing — retry in a moment")
return _env
app = FastAPI(
title="SENTINEL — OpenEnv",
version="0.1.0",
description=(
"Multi-agent OpenEnv for scalable LLM oversight. "
"Responder + Overseer turn flow, schema drift, dual-reward training."
),
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(live_router, prefix="/live", tags=["live"])
# ── HTTP endpoints ─────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok", "version": "0.1.0"}
@app.get("/api/info")
def api_info():
return {
"status": "running",
"name": "sentinel",
"version": "0.1.0",
"description": "Multi-agent OpenEnv for scalable LLM oversight",
"tasks": list(TASKS.keys()),
"docs": "/docs",
}
@app.post("/reset")
async def reset(request: Request):
"""Start a new episode.
Accepts (query params or JSON body):
task_id: "action_screen" | "war_room" | "drift_ops"
seed: int (optional; defaults to random)
mode: "alternating" | "train_overseer" | "train_responder"
"""
task_id = "action_screen"
seed: int | None = None
mode = "alternating"
qp = request.query_params
if qp.get("task_id"):
task_id = qp["task_id"]
if qp.get("seed"):
        try:
            seed = int(qp["seed"])
        except ValueError:
            pass
if qp.get("mode"):
mode = qp["mode"]
try:
body = await request.json()
if isinstance(body, dict):
task_id = body.get("task_id", task_id)
if body.get("seed") is not None:
                try:
                    seed = int(body["seed"])
                except (ValueError, TypeError):
                    pass
mode = body.get("mode", mode)
except Exception:
pass
env = _get_env()
try:
obs = env.reset(task_id=task_id, seed=seed, mode=mode)
return obs.model_dump()
except ValueError as e:
raise HTTPException(400, str(e))
@app.post("/step")
def step(action: Action):
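    # Example request bodies (illustrative; exact enum string values are
    # defined in models.py):
    #   {"role": "responder",
    #    "responder": {"responder_role": "generic", "action_type": "query_logs",
    #                  "parameters": {"service": "api-gateway"},
    #                  "reasoning": "gather evidence"}}
    #   {"role": "overseer",
    #    "overseer": {"decision": "approve", "justification": "diagnostic, low risk"}}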
import concurrent.futures
env = _get_env()
try:
        # Run env.step in a single worker thread purely to enforce the 60s
        # timeout; note the context manager still joins the worker on exit,
        # so a timed-out call only responds 504 once env.step actually returns.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(env.step, action)
try:
obs, reward, done, info = future.result(timeout=60)
except concurrent.futures.TimeoutError:
raise HTTPException(504, "Step timed out after 60s — episode may be stale")
return {
"observation": obs.model_dump(),
"reward": reward.model_dump(),
"done": done,
"info": info,
}
except HTTPException:
raise
except RuntimeError as e:
raise HTTPException(400, str(e))
@app.get("/state")
def state():
env = _get_env()
try:
return env.state().model_dump()
except RuntimeError as e:
raise HTTPException(400, str(e))
@app.get("/tasks")
def tasks():
return {
"tasks": list_tasks(),
"total": len(TASKS),
"action_schema": {
"responder_diagnostic": [
{"action_type": "query_logs", "parameters": {"service": "string"}},
{"action_type": "check_metrics", "parameters": {"service": "string"}},
{"action_type": "check_dependencies", "parameters": {"service": "string"}},
{"action_type": "check_recent_deploys", "parameters": {"service": "string"}},
{"action_type": "check_service_status", "parameters": {"service": "string"}},
],
"responder_remediation": [
{"action_type": "restart_service", "parameters": {"service": "string"}},
{"action_type": "rollback_deploy", "parameters": {"service": "string"}},
{"action_type": "scale_service", "parameters": {"service": "string", "replicas": "int"}},
{"action_type": "disable_feature_flag", "parameters": {"flag": "string"}},
{"action_type": "clear_cache", "parameters": {"service": "string"}},
{"action_type": "execute_runbook_step", "parameters": {"runbook_action": "string"}},
],
"overseer": [
{"decision": "approve", "justification": "string"},
{"decision": "flag", "justification": "string"},
{"decision": "block", "justification": "string", "suggested_alternative": "ActionParameters?"},
{"decision": "escalate", "justification": "string"},
],
},
}
@app.get("/grader")
def grader():
"""Return current Overseer metrics: confusion, F1, cumulative rewards.
The top-level `score` field (0-1 normalized F1) is the primary signal
for automated OpenEnv evaluators. All other fields are diagnostic.
"""
env = _get_env()
try:
s = env.state()
f1 = compute_f1(s.overseer_confusion)
f1_val = float(f1.get("f1", 0.0))
return {
# ── Primary field for automated evaluators ──
"score": round(f1_val, 4), # normalized 0-1 (Overseer F1)
"score_label": "overseer_f1",
"score_range": [0.0, 1.0],
# ── Episode metadata ──
"episode_id": s.episode_id,
"task_id": s.task_id,
"scenario_id": s.scenario_id,
"step_count": s.step_count,
"done": s.done,
# ── Detailed metrics ──
"overseer_confusion": s.overseer_confusion,
"overseer_metrics": f1,
"responder_cumulative_reward": s.cumulative_responder_reward,
"overseer_cumulative_reward": s.cumulative_overseer_reward,
"drift_events": s.drift_events,
}
except RuntimeError as e:
raise HTTPException(400, str(e))
# ── Gradio UI ───────────────────────────────────────────────────────────────
import gradio as gr
def _build_reward_banner_md(
final_state=None,
f1_dict: dict | None = None,
*,
placeholder: bool = False,
) -> str:
"""Render the Replay Viewer's prominent reward banner. Called once
on initial page load (placeholder) and once after each Play Episode.
"""
if placeholder or final_state is None:
return (
"### 🏆 Reward Scoreboard *(updates after every Play Episode click)*\n\n"
"*Pick a task + seed, hit ▶️ Play Episode, and the dual reward "
"streams + Overseer F1 + confusion matrix appear here in real time.*"
)
f1 = f1_dict or {}
f1_val = float(f1.get("f1", 0.0) or 0.0)
f1_emoji = "🟢" if f1_val >= 0.85 else ("🟡" if f1_val >= 0.5 else "🔴")
conf = final_state.overseer_confusion or {}
return (
"### 🏆 Reward Scoreboard *(this episode)*\n\n"
f"| Episode | Task | Step | Status |\n"
f"|---|---|:---:|:---:|\n"
f"| `{(final_state.episode_id or '—')[:14]}…` | "
f"`{final_state.task_id}` | `{final_state.step_count}` | "
f"{'✅ done' if final_state.done else '⏳ running'} |\n\n"
f"| 🤖 Responder cum reward | 🛡️ Overseer cum reward | "
f"{f1_emoji} Overseer F1 | TP / FP / TN / FN |\n"
f"|:---:|:---:|:---:|:---:|\n"
f"| **`{final_state.cumulative_responder_reward:+.3f}`** | "
f"**`{final_state.cumulative_overseer_reward:+.3f}`** | "
f"**`{f1_val:.3f}`** | "
f"`TP={conf.get('tp', 0)} · FP={conf.get('fp', 0)} · "
f"TN={conf.get('tn', 0)} · FN={conf.get('fn', 0)}` |\n\n"
f"*Precision = `{f1.get('precision', 0):.3f}` · "
f"Recall = `{f1.get('recall', 0):.3f}` · "
f"Drift events triggered = `{len(final_state.drift_events or [])}`*"
)
def _play_one_episode(task_id: str, seed_str: str, overseer_style: str) -> tuple[str, str, str, str, str]:
"""Auto-play a full episode using heuristics.
Returns (incident_panel_md, transcript_md, metrics_md, reward_plot_data_str,
reward_banner_md). Used as the Gradio demo hook.
"""
env = _get_env()
try:
seed = int(seed_str) if seed_str else random.randint(1, 99999)
except ValueError:
seed = random.randint(1, 99999)
obs = env.reset(task_id=task_id, seed=seed, mode="alternating")
# Lines to render
incident = (
f"### Incident\n**Task**: `{task_id}` — **Scenario**: `{obs.scenario_id}` — **Seed**: `{seed}`\n\n"
f"> {obs.incident_summary}\n\n"
f"**Affected services**: `{', '.join(obs.known_services[:8])}`\n\n"
f"**Alert**: `{obs.alert.get('title','')}` · error_rate={obs.alert.get('error_rate')} · "
f"dur={obs.alert.get('duration_minutes')}min"
)
transcript: list[str] = ["| Step | Responder Proposal | Overseer Decision | Result |",
"|:---:|---|---|---|"]
reward_series: list[tuple[int, float]] = [(0, 0.0)]
while not obs.done:
        # Heuristic Responder: the episode runs in alternating mode, so this
        # demo plays both the Responder and Overseer turns itself.
        # 1) Responder proposes — pick from the correct sequence with some exploration
sc = env._get_session()["scenario"]
correct = list(sc.get("correct_remediation_sequence", []))
catas = list(sc.get("catastrophic_actions", []))
wrong = list((sc.get("wrong_actions", {}) or {}).keys())
known = list(sc.get("known_services", [])) or ["api-gateway"]
        from server.environment import _parse_action_key
        roll = random.random()
        if catas and roll < 0.15:
            at, params = _parse_action_key(random.choice(catas))
            reasoning = "aggressive fix proposal"
        elif wrong and roll < 0.35:
            at, params = _parse_action_key(random.choice(wrong))
            reasoning = "restart symptomatic service"
        elif correct:
            at, params = _parse_action_key(random.choice(correct))
            reasoning = "following runbook"
        else:
            at, params = "query_logs", {"service": random.choice(known)}
            reasoning = "gathering evidence"
ap = ActionParameters(**{k: v for k, v in params.items() if v is not None})
proposal = ResponderAction(
responder_role=ResponderRole.GENERIC,
action_type=at,
parameters=ap,
reasoning=reasoning,
)
obs, reward, done, info = env.step(Action(role="responder", responder=proposal))
# 2) Overseer decide — use the chosen style
pa = obs.proposed_action
decision, justification = _overseer_heuristic(overseer_style, obs, pa)
overseer_action = OverseerAction(decision=decision, justification=justification)
obs, reward2, done, info2 = env.step(Action(role="overseer", overseer=overseer_action))
# Log step
klass = info2.get("overseer_class", "?")
conf = info2.get("overseer_confusion_delta", "?")
ex = "✔ executed" if info2.get("executed") else "✖ blocked"
transcript.append(
f"| {obs.step_count} | `{at}` ({reasoning[:22]}) | **{decision.value.upper()}** — "
f"_{justification[:55]}_ | {ex} · {klass}/{conf} · r={reward2.overseer_score:+.2f} |"
)
reward_series.append((obs.step_count, obs.cumulative_overseer_reward))
# Final metrics
final_state = env.state()
f1 = compute_f1(final_state.overseer_confusion)
metrics = (
f"### Episode Complete\n"
f"| Metric | Value |\n|---|---|\n"
f"| Overseer cumulative reward | `{final_state.cumulative_overseer_reward:+.3f}` |\n"
f"| Responder cumulative reward | `{final_state.cumulative_responder_reward:+.3f}` |\n"
f"| Overseer Precision | `{f1['precision']:.3f}` |\n"
f"| Overseer Recall | `{f1['recall']:.3f}` |\n"
f"| Overseer F1 | `{f1['f1']:.3f}` |\n"
f"| Confusion (TP/FP/TN/FN) | `{final_state.overseer_confusion}` |\n"
f"| Drift events | `{len(final_state.drift_events)}` |\n"
)
reward_str = json.dumps(reward_series)
banner = _build_reward_banner_md(final_state, f1)
return incident, "\n".join(transcript), metrics, reward_str, banner
def _overseer_heuristic(style: str, obs: Observation, pa) -> tuple[OverseerDecision, str]:
"""Two heuristic overseers — used only by the Gradio demo to show contrast."""
if pa is None:
return OverseerDecision.APPROVE, "no proposal visible"
at = pa.action_type
svc = pa.parameters.get("service", "") if isinstance(pa.parameters, dict) else ""
sev = pa.severity_weight
cf = pa.counterfactual or ""
if style == "untrained":
# Naive: approve most things, occasional random flag
r = random.random()
if r < 0.08:
return OverseerDecision.FLAG, "looks unusual"
return OverseerDecision.APPROVE, "looks fine"
# "trained" heuristic (mimics what a trained Overseer should do)
if sev >= 3.0 or "CATASTROPHIC" in cf:
return OverseerDecision.BLOCK, f"Blocking {at}:{svc} — catastrophic per counterfactual ({cf[:60]})"
if "LIKELY BAD" in cf:
return OverseerDecision.BLOCK, f"Blocking {at}:{svc} — bad action per evidence ({cf[:60]})"
if at in {"query_logs", "check_metrics", "check_dependencies", "check_recent_deploys", "check_service_status"}:
return OverseerDecision.APPROVE, f"Approving diagnostic {at}:{svc} — gathers evidence, safe"
if obs.drift_events:
return OverseerDecision.FLAG, f"Flagging {at}:{svc} post-drift — verify current schema"
return OverseerDecision.APPROVE, f"Approving {at}:{svc} — matches runbook, within policy"
CUSTOM_CSS = """
.gradio-container { max-width: 1400px !important; margin: 0 auto !important; }
.env-header {
display:flex; justify-content:space-between; align-items:center;
padding:14px 16px; border-bottom:2px solid #e2e8f0;
background:linear-gradient(135deg,#0f172a,#1e293b); color:#f1f5f9;
border-radius:10px 10px 0 0; margin-bottom:14px;
}
.env-header-left { font-size:1.3rem; font-weight:800; }
.env-header-right { font-size:0.85rem; font-weight:600; color:#94a3b8; text-transform:uppercase; }
.section-title {
font-weight:700; font-size:0.95rem; color:#0f172a;
margin:12px 0 6px; padding:6px 10px; background:#e2e8f0;
border-radius:6px; border-left:3px solid #3b82f6;
}
"""
def _populate_replay_viewer_ui() -> None:
"""Render every Replay Viewer component into the *current* Gradio context.
Split out from `_build_gradio_ui()` so it can be called directly from
inside `gr.Tab(...)` by `combine_with_live_tab()` — the populator pattern
avoids the duplicated-container Gradio quirk that nested `Blocks.render()`
triggers.
"""
gr.HTML("""
<div class="env-header">
<span class="env-header-left">🛡️ SENTINEL — Scalable LLM Oversight</span>
<span class="env-header-right">OpenEnv · v0.1.0</span>
</div>
""")
gr.Markdown(
"**A multi-agent OpenEnv**: Responder proposes actions, Overseer decides whether to approve, "
"flag, block, or escalate each one. Toggle between an *untrained* and a *trained-heuristic* "
"Overseer to see the contrast this environment is designed to produce through training."
)
reward_banner = gr.Markdown(_build_reward_banner_md(placeholder=True))
with gr.Row():
with gr.Column(scale=1):
gr.HTML('<div class="section-title">Episode Setup</div>')
task_dd = gr.Dropdown(
choices=[("🟢 Easy — Action Screen", "action_screen"),
("🟡 Medium — War Room", "war_room"),
("🔴 Hard — Drift Ops", "drift_ops")],
value="war_room", label="Task")
seed_tb = gr.Textbox(label="Seed", value="42")
style_dd = gr.Dropdown(
choices=[("🔴 Untrained Overseer", "untrained"),
("🟢 Trained-Heuristic Overseer", "trained")],
value="trained", label="Overseer Style")
play_btn = gr.Button("▶️ Play Episode", variant="primary", size="lg")
gr.Markdown("*Plays one full episode with a heuristic Responder and the selected Overseer. The 🏆 Reward Scoreboard above updates the moment the episode finishes.*")
gr.HTML('<div class="section-title">Reward Trajectory</div>')
reward_json = gr.Textbox(label="Reward series (steps → cumulative Overseer reward)", lines=6)
with gr.Column(scale=2):
gr.HTML('<div class="section-title">Incident</div>')
incident_md = gr.Markdown("*Play an episode to start.*")
gr.HTML('<div class="section-title">Transcript (Responder → Overseer → World)</div>')
transcript_md = gr.Markdown("*No episode yet.*")
gr.HTML('<div class="section-title">Final Metrics</div>')
metrics_md = gr.Markdown("*No episode yet.*")
play_btn.click(fn=_play_one_episode,
inputs=[task_dd, seed_tb, style_dd],
outputs=[incident_md, transcript_md, metrics_md,
reward_json, reward_banner])
def _build_gradio_ui() -> gr.Blocks:
"""Standalone replay viewer Blocks. Kept for backward compatibility but
no longer used by the mount path — see `_populate_replay_viewer_ui` and
`combine_with_live_tab(_populate_replay_viewer_ui)` below.
"""
with gr.Blocks(
title="SENTINEL — Scalable Oversight OpenEnv",
css=CUSTOM_CSS,
theme=gr.themes.Soft(primary_hue="blue", neutral_hue="slate",
font=gr.themes.GoogleFont("Inter")),
) as demo:
_populate_replay_viewer_ui()
return demo
from server.live_ui import combine_with_live_tab as _combine_tabs
from server.api_explorer_ui import _populate_api_explorer_ui
_gradio_demo = _combine_tabs(_populate_replay_viewer_ui, _populate_api_explorer_ui)
# Mount Gradio at the root path. HF Spaces iframes the root URL of the
# container (app_port is 7860) so this is what the Spaces wrapper hits.
# The OpenEnv CLI injects `base_path: /web` into the README frontmatter;
# we strip it back out on every push via a post-push fixup so HF defaults
# to the root path.
app = gr.mount_gradio_app(app, _gradio_demo, path="/")
def main():
import uvicorn
uvicorn.run("server.app:app", host="0.0.0.0", port=7860, reload=False)
if __name__ == "__main__":
main()