# -*- coding: utf-8 -*- """FastAPI server exposing the OpenEnv API endpoints. Endpoints: POST /reset - Reset environment for a task (returns session_id) POST /step - Take an agent action (requires X-Session-ID header) GET /state - Get current environment state (requires X-Session-ID) GET /tasks - List available tasks with action schema POST /grader - Get grader score for episode (requires X-Session-ID) POST /baseline - Run rule-based baseline on all tasks (in-process) POST /sentinel/reset - Reset SENTINEL oversight environment (returns session_id) POST /sentinel/step - Execute SENTINEL decision (requires X-Session-ID header) GET /sentinel/state - Get current SENTINEL environment state (requires X-Session-ID) POST /sentinel/grade - Get SENTINEL grader score (requires X-Session-ID) GET /metrics - Telemetry counters (JSON or Prometheus text) GET /curriculum - Curriculum learning progression (ordered task stages) GET /prometheus/metrics - Live scenario service metrics (Prometheus text scrape) GET /prometheus/query - PromQL instant query (standard Prometheus JSON envelope) GET /prometheus/query_range - PromQL range query (matrix, from TSDB ring buffer) GET /render - Human-readable incident dashboard (requires X-Session-ID) GET /leaderboard - Top scores per task from completed episodes GET /health - Standard OpenEnv liveness probe GET / - Human landing page for the live demo GET /try - Human landing page for trying SENTINEL GET /info - Rich JSON service info with telemetry WS /ws - WebSocket persistent session (no session header needed) GET /web - Interactive browser-based incident dashboard """ from __future__ import annotations import asyncio import logging import os from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from src.tasks import get_all_tasks from routers.deps import ( _SESSION_REGISTRY, _TELEMETRY, WS_ACTIVE_CONNECTIONS, purge_expired_sessions, _log, ) import routers.deps as _deps from routers.irt import router as irt_router from routers.sentinel import router as sentinel_router from routers.observability import router as observability_router # --------------------------------------------------------------------------- # Structured JSON logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format='{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s"}', datefmt="%Y-%m-%dT%H:%M:%SZ", ) # --------------------------------------------------------------------------- # Lifespan # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app: FastAPI): """Start background TTL-cleanup task; cancel it on shutdown.""" async def _cleanup_loop(): while True: await asyncio.sleep(300) # run every 5 minutes purge_expired_sessions() task = asyncio.create_task(_cleanup_loop()) _log.info("IRT environment started - TTL cleanup every 300s") try: yield finally: task.cancel() # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- app = FastAPI( title="Incident Response Triage - OpenEnv", description=( "An OpenEnv environment that simulates production incident response. " "Agents must triage alerts, investigate services, diagnose root causes, " "apply remediations, and communicate status updates." ), version="1.0.0", lifespan=lifespan, ) app.title = "SENTINEL Oversight Command - OpenEnv" app.description = ( "An OpenEnv environment for multi-agent AI oversight. SENTINEL supervises " "worker agents during production incident response and decides which " "proposed actions should execute." ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------------------- # Include routers # --------------------------------------------------------------------------- app.include_router(irt_router) app.include_router(sentinel_router) app.include_router(observability_router) # --------------------------------------------------------------------------- # Native OpenEnv adapter mount # --------------------------------------------------------------------------- # The custom endpoints above expose the full hackathon demo surface. This mount # also gives latest OpenEnv clients the standard schema/reset/step/state/ws API # backed by OpenEnv's Environment base class. NATIVE_OPENENV_AVAILABLE = False try: # pragma: no cover - availability depends on the local OpenEnv install from openenv.core.env_server.http_server import create_app as create_openenv_app from server.openenv_native import ( SentinelNativeAction, SentinelNativeEnvironment, SentinelNativeObservation, ) app.mount( "/openenv", create_openenv_app( SentinelNativeEnvironment, SentinelNativeAction, SentinelNativeObservation, env_name="sentinel_oversight_command", max_concurrent_envs=32, ), ) NATIVE_OPENENV_AVAILABLE = True _log.info("native OpenEnv adapter mounted at /openenv") except Exception as exc: # pragma: no cover _log.warning("native OpenEnv adapter unavailable: %s", exc) # --------------------------------------------------------------------------- # MCP Server โ€” Model Context Protocol (step/state/done as MCP tools) # --------------------------------------------------------------------------- MCP_AVAILABLE = False try: from server.mcp_server import mcp_router app.include_router(mcp_router, prefix="/mcp") MCP_AVAILABLE = True _log.info("MCP server mounted at /mcp (Streamable HTTP transport)") except Exception as exc: # pragma: no cover _log.warning("MCP server unavailable: %s", exc) # --------------------------------------------------------------------------- # A2A Protocol โ€” Agent-to-Agent discovery and task handling # --------------------------------------------------------------------------- A2A_AVAILABLE = False try: from server.a2a_server import a2a_router app.include_router(a2a_router) A2A_AVAILABLE = True _log.info("A2A agent card at /.well-known/agent.json, endpoint at /a2a") except Exception as exc: # pragma: no cover _log.warning("A2A protocol unavailable: %s", exc) # --------------------------------------------------------------------------- # Root-level endpoints (health checks) # --------------------------------------------------------------------------- @app.get("/health") async def health_check(): """Standard OpenEnv health check.""" worker_backend = os.environ.get("SENTINEL_WORKER_BACKEND", "rule") return { "status": "healthy", "native_openenv_available": NATIVE_OPENENV_AVAILABLE, "native_openenv_mount": "/openenv" if NATIVE_OPENENV_AVAILABLE else None, "mcp_available": MCP_AVAILABLE, "mcp_endpoint": "/mcp" if MCP_AVAILABLE else None, "a2a_available": A2A_AVAILABLE, "a2a_agent_card": "/.well-known/agent.json" if A2A_AVAILABLE else None, "sentinel_worker_backend": worker_backend, "llm_worker_configured": bool(os.environ.get("GROQ_API_KEY")), } def _service_info(): """Return environment info and live telemetry for JSON endpoints.""" worker_backend = os.environ.get("SENTINEL_WORKER_BACKEND", "rule") return { "status": "ok", "environment": "sentinel-oversight-command", "version": "1.0.0", "tasks": [t.task_id for t in get_all_tasks()], "primary_theme": "multi-agent interactions", "native_openenv_available": NATIVE_OPENENV_AVAILABLE, "native_openenv_mount": "/openenv" if NATIVE_OPENENV_AVAILABLE else None, "mcp_available": MCP_AVAILABLE, "mcp_endpoint": "/mcp" if MCP_AVAILABLE else None, "a2a_available": A2A_AVAILABLE, "a2a_agent_card": "/.well-known/agent.json" if A2A_AVAILABLE else None, "protocols": { "http_rest": True, "openenv_native": NATIVE_OPENENV_AVAILABLE, "mcp": MCP_AVAILABLE, "a2a": A2A_AVAILABLE, }, "sentinel_worker_backend": worker_backend, "llm_worker_configured": bool(os.environ.get("GROQ_API_KEY")), "active_sessions": len(_SESSION_REGISTRY), "ws_active_connections": _deps.WS_ACTIVE_CONNECTIONS, "telemetry": _TELEMETRY, } _TRY_LANDING_HTML = """\ SENTINEL Oversight Command
OpenEnv Hackathon · Multi-agent oversight · Live Space

SENTINEL supervises AI workers before they act.

Try a control-room environment where worker agents propose actions during production incidents. SENTINEL must approve safe work, block hallucinations, redirect risky actions, reassign wrong-domain workers, and preserve an audit trail before anything executes.

Demo beat

Worker proposal
"Restart auth-service now. Confidence 0.99."
SENTINEL check
No investigation, high blast radius, prior over-escalation pattern.
Decision
REDIRECT: inspect deployment timeline and error-rate metrics first.
Proof
Trust, reward, counterfactual damage, and audit log update after the step.
7OpenEnv tasks
4worker-agent roles
200Phase 1 GRPO steps
18proof dashboard plots

Full Episode Dashboard

Run the real SENTINEL environment end to end: choose a task, inspect the worker proposal, make decisions, step the environment, and grade the episode.

Best for showing the full OpenEnv loop: reset → observe → decide → step → reward → audit.

Universal Oversight Playground

Paste any agent action from infrastructure, healthcare, finance, or generic workflows and see SENTINEL's constitutional and counterfactual analysis.

Best for quickly testing hallucination, prompt injection, destructive action, and missing-evidence cases.

OpenEnv API

Use the native OpenEnv routes for programmatic evaluation. The API remains available for judges, trainers, and automated clients.

Also available: /tasks, /sentinel/reset, /sentinel/step, /metrics, /mcp, and A2A discovery.

The live UI uses the deterministic SENTINEL verifier/gate so it runs reliably on the Space. The trained LoRA model is published at srikrish2004/sentinel-qwen3-4b-grpo and the proof pack is in the GitHub repository.

""" _DEMO_HTML = """\ SENTINEL ยท MCP & A2A Live Demo

๐Ÿ›ก SENTINEL ยท Protocol Demo

Live MCP + A2A calls โ€” all running from your browser against the real API

โ€”
Total tests
โ€”
Passed
โ€”
Failed

MCP Model Context Protocol ยท /mcp

initializePENDING
POST /mcp ยท method: initialize
Waiting...
tools/list โ€” 6 toolsPENDING
POST /mcp ยท method: tools/list
Waiting...
intercept โ†’ BLOCK (hallucination)PENDING
POST /mcp ยท tools/call: intercept ยท target not in available_services
Waiting...
intercept โ†’ APPROVE (safe)PENDING
POST /mcp ยท tools/call: intercept ยท safe investigate
Waiting...
intercept โ†’ FLAG (loop exploitation)PENDING
POST /mcp ยท tools/call: intercept ยท same service investigated ร—2
Waiting...
reset โ†’ step โ†’ grade (episode)PENDING
POST /mcp ยท reset + step + grade tool chain
Waiting...

A2A Agent-to-Agent Protocol ยท /a2a

Agent Card discoveryPENDING
GET /.well-known/agent.json ยท A2A skill discovery
Waiting...
message/send (A2A v0.3+)PENDING
POST /a2a ยท method: message/send ยท kind: text (v0.3 schema)
Waiting...
tasks/send (A2A v0.2)PENDING
POST /a2a ยท method: tasks/send ยท type: text (v0.2 schema)
Waiting...
Human instruction endpointPENDING
POST /a2a/human ยท plain English โ†’ oversight decision
Waiting...
tasks/get (retrieve result)PENDING
POST /a2a ยท method: tasks/get ยท retrieve submitted task
Waiting...
tasks/cancelPENDING
POST /a2a ยท method: tasks/cancel
Waiting...
""" @app.get("/", response_class=HTMLResponse) async def landing_page(): """Human landing page for Hugging Face Spaces.""" return HTMLResponse(_TRY_LANDING_HTML) @app.get("/try", response_class=HTMLResponse) async def try_page(): """Alias for the human landing page.""" return HTMLResponse(_TRY_LANDING_HTML) @app.get("/demo", response_class=HTMLResponse) async def demo_page(): """Live interactive demo of MCP and A2A protocol communication with SENTINEL.""" return HTMLResponse(_DEMO_HTML) @app.get("/info") async def info(): """JSON service information and live telemetry.""" return _service_info() # --------------------------------------------------------------------------- # Dashboard HTML templates (kept here as large string constants) # --------------------------------------------------------------------------- # NOTE: The SENTINEL dashboard and IRT web UI HTML are large inline templates. # They are kept in this file to avoid adding template dependencies. # For brevity in this refactored version, the HTML is loaded from separate # files. If you need the inline versions, see the git history. _SENTINEL_DASHBOARD_HTML = None _WEB_UI_HTML = None def _load_dashboard_html(): """Load dashboard HTML from inline templates (lazy-loaded on first request).""" global _SENTINEL_DASHBOARD_HTML, _WEB_UI_HTML if _SENTINEL_DASHBOARD_HTML is not None: return # The HTML templates are stored as module-level strings. # We import them here to keep the main module clean. try: from routers._dashboard_html import SENTINEL_DASHBOARD_HTML, WEB_UI_HTML _SENTINEL_DASHBOARD_HTML = SENTINEL_DASHBOARD_HTML _WEB_UI_HTML = WEB_UI_HTML except ImportError: _SENTINEL_DASHBOARD_HTML = "

SENTINEL Dashboard

Dashboard template not found.

" _WEB_UI_HTML = "

IRT Dashboard

Dashboard template not found.

" @app.get("/sentinel/dashboard", response_class=HTMLResponse) async def sentinel_dashboard(): """Interactive browser dashboard for the SENTINEL oversight environment.""" _load_dashboard_html() return HTMLResponse(_SENTINEL_DASHBOARD_HTML) @app.get("/web", response_class=HTMLResponse) async def web_ui(): """Interactive browser-based incident dashboard (uses WebSocket under the hood).""" _load_dashboard_html() return HTMLResponse(_WEB_UI_HTML) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": import uvicorn port = int(os.environ.get("PORT", 7860)) uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False)