# -*- coding: utf-8 -*-
"""Observability, metrics, dashboard, and WebSocket endpoints.
Extracted from app.py - handles /metrics, /render, /leaderboard, /curriculum,
/prometheus/*, /ws, /web, and /sentinel/dashboard.
"""
from __future__ import annotations
import secrets
import time
from typing import Any, Dict
from fastapi import APIRouter, Header, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, PlainTextResponse
from src.environment import IncidentResponseEnv
from src.models import Action, StepResult
from routers.deps import (
_LEADERBOARD,
_SESSION_REGISTRY,
_SENTINEL_REGISTRY,
_SESSION_TTL,
_TELEMETRY,
_log,
WS_ACTIVE_CONNECTIONS,
record_leaderboard,
scenario_live_to_prom_text,
parse_prom_selector,
build_prom_vector,
build_prom_matrix,
)
import routers.deps as _deps
router = APIRouter()
# ---------------------------------------------------------------------------
# Native OpenEnv mount info (prevents 404 at /openenv root)
# ---------------------------------------------------------------------------
@router.get("/openenv")
async def openenv_root():
    """Describe the native OpenEnv sub-mount.

    The OpenEnv adapter lives under ``/openenv`` and serves the standard
    schema / reset / step / state / ws contract. Registering this route on
    the main app means a bare ``GET /openenv`` (no trailing path) answers
    with a pointer to those endpoints rather than a 404.

    Returns:
        A JSON-serialisable dict with the adapter name, a description, the
        mount path, a map of endpoint names to paths, and a fallback note.
    """
    summary = (
        "Exposes the SENTINEL oversight environment through the standard "
        "OpenEnv Environment base-class contract for OpenEnv-compatible clients."
    )
    endpoint_paths = {
        "schema": "/openenv/schema",
        "reset": "/openenv/reset",
        "step": "/openenv/step",
        "state": "/openenv/state",
        "websocket": "/openenv/ws",
    }
    fallback_hint = (
        "If native_openenv_available is false in /health, the openenv package "
        "is not installed in this environment. Use the standard /reset, /step, "
        "/state, /sentinel/reset, /sentinel/step endpoints instead."
    )

    info = {"name": "SENTINEL native OpenEnv adapter"}
    info["description"] = summary
    info["mount"] = "/openenv"
    info["endpoints"] = endpoint_paths
    info["note"] = fallback_hint
    return info
# ---------------------------------------------------------------------------
# Metrics / telemetry
# ---------------------------------------------------------------------------
@router.get("/metrics")
async def metrics(format: str = "json"):
    """Expose the telemetry counters.

    Query parameters:
        format: ``"prometheus"`` selects the Prometheus text format; any
            other value (default ``"json"``) yields a JSON object.

    Returns:
        For ``format=prometheus`` a ``PlainTextResponse`` containing one
        ``irt_<key> <value>`` line per telemetry entry plus
        ``irt_active_sessions``. Otherwise a dict holding every telemetry
        entry together with the active session count, the session TTL and
        a fixed ``max_sessions`` value of 256.
    """
    session_count = len(_SESSION_REGISTRY)

    # JSON is the default, so handle it first and leave the text format below.
    if format != "prometheus":
        return {
            **_TELEMETRY,
            "active_sessions": session_count,
            "session_ttl_seconds": _SESSION_TTL,
            "max_sessions": 256,
        }

    exposition = [
        "# HELP irt_counter OpenEnv IRT telemetry",
        "# TYPE irt_counter gauge",
        *(f"irt_{name} {count}" for name, count in _TELEMETRY.items()),
        f"irt_active_sessions {session_count}",
    ]
    payload = "\n".join(exposition) + "\n"
    return PlainTextResponse(payload, media_type="text/plain; version=0.0.4")
@router.get("/render")
async def render(
    x_session_id: str | None = Header(default=None, alias="X-Session-ID"),
):
    """Return a human-readable incident dashboard for the current session.

    Useful for debugging agent behaviour or as a REPL-style interface.

    Args:
        x_session_id: Value of the ``X-Session-ID`` header identifying an
            entry in the session registry.

    Returns:
        A dict with ``"markdown"`` (the rendered dashboard text) and
        ``"state"`` (the result of ``model_dump()`` on the session state).

    Raises:
        HTTPException: 400 when the header is missing or unknown, or when
            ``env.state()`` raises ``RuntimeError``.
    """
    if not x_session_id or x_session_id not in _SESSION_REGISTRY:
        raise HTTPException(
            status_code=400,
            detail="Missing or unknown X-Session-ID header. Call /reset first.",
        )
    env = _SESSION_REGISTRY[x_session_id]
    try:
        s = env.state()
    except RuntimeError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    sev = s.severity_classified.value if s.severity_classified else "(not classified)"
    status_icon = "[done]" if s.done else "[open]"

    # Guard against a zero/missing step budget and clamp the fill so the bar
    # is always exactly `bar_width` characters wide.
    bar_width = 20
    if s.max_steps:
        bar_filled = int((s.step_number / s.max_steps) * bar_width)
        bar_filled = max(0, min(bar_width, bar_filled))
    else:
        bar_filled = 0
    progress_bar = "#" * bar_filled + "." * (bar_width - bar_filled)

    lines = [
        f"## INCIDENT DASHBOARD - {s.task_id.replace('_', ' ').upper()}",
        "",
        "| Field | Value |",
        "|----------------|-------|",
        f"| **Incident ID**| `{s.task_id}` |",
        f"| **Status** | {status_icon} `{s.incident_status.value}` |",
        f"| **Progress** | `[{progress_bar}]` {s.step_number}/{s.max_steps} steps |",
        f"| **Severity** | `{sev}` |",
        f"| **Diagnosis** | `{s.diagnosis or '(none)'}` |",
        f"| **Reward** | `{s.cumulative_reward:.4f}` |",
        "",
        "### Actions Taken",
    ]

    if s.actions_history:
        for i, a in enumerate(s.actions_history, 1):
            action_type = a["action_type"]
            # Accept either an enum member or an already-serialised string.
            action_name = getattr(action_type, "value", action_type)
            # `reasoning` may be present but None; treat that as empty text.
            reasoning = (a.get("reasoning") or "")[:80]
            lines.append(
                f"{i}. `{action_name}` -> `{a.get('target', '')}` | {reasoning}"
            )
    else:
        lines.append("_No actions yet._")

    # Distinct loop names below avoid shadowing the state object `s`.
    lines += [
        "",
        "### Investigated Services",
        ", ".join(f"`{svc}`" for svc in s.investigated_services) or "_None_",
        "",
        "### Remediations Applied",
        ", ".join(f"`{rem}`" for rem in s.remediations_applied) or "_None_",
        "",
        "### Escalations",
        ", ".join(f"`{esc}`" for esc in s.escalations_made) or "_None_",
    ]
    return {"markdown": "\n".join(lines), "state": s.model_dump()}
@router.get("/leaderboard")
async def leaderboard():
    """Return the recorded score boards, keyed by task id.

    Per the ranking convention used by this service, scores are ordered by
    (score DESC, steps ASC) - accuracy first, then efficiency. This handler
    does no sorting itself; it returns each board exactly as stored.

    Returns:
        A new dict mapping every task id in the leaderboard store to its
        board object (shallow copy: the boards themselves are not copied).
    """
    return dict(_LEADERBOARD.items())
@router.get("/curriculum")
async def curriculum():
    """Return the ordered curriculum learning progression.

    Stages run from easiest to hardest so a training loop can start at the
    first stage and advance. Every stage entry carries what a curriculum
    sampler needs: task id, difficulty label, reward component count, step
    budget, per-step degradation rate, variant count, the graded dimensions
    and a short rationale.

    Returns:
        A dict with a ``"description"`` string and a ``"stages"`` list of
        three stage dicts.
    """
    overview = (
        "Curriculum from easy to hard: agents accumulate reward signal "
        "from the first episode and progressively face more complex scenarios."
    )

    easy_stage = {
        "stage": 1,
        "task_id": "severity_classification",
        "difficulty": "easy",
        "reward_components": 3,
        "max_steps": 10,
        "degradation_per_step": 0.005,
        "variants": 2,
        "graded_dimensions": ["severity_accuracy", "investigation_quality", "efficiency"],
        "rationale": (
            "Introduces the action loop. Model must investigate then classify. "
            "Guaranteed non-zero reward even with minimal exploration."
        ),
    }

    medium_stage = {
        "stage": 2,
        "task_id": "root_cause_analysis",
        "difficulty": "medium",
        "reward_components": 5,
        "max_steps": 15,
        "degradation_per_step": 0.010,
        "variants": 2,
        "graded_dimensions": [
            "severity_accuracy",
            "investigated_root_cause",
            "diagnosis_accuracy",
            "remediation_quality",
            "efficiency",
        ],
        "rationale": (
            "Requires causal reasoning: distinguish root cause from downstream symptoms. "
            "Adds diagnosis and remediation actions not present in stage 1."
        ),
    }

    hard_stage = {
        "stage": 3,
        "task_id": "full_incident_management",
        "difficulty": "hard",
        "reward_components": 8,
        "max_steps": 20,
        "degradation_per_step": 0.015,
        "variants": 3,
        "graded_dimensions": [
            "severity_accuracy",
            "diagnosis_accuracy",
            "remediation_quality",
            "escalation_quality",
            "communication",
            "investigation_thoroughness",
            "investigation_precision",
            "efficiency",
        ],
        "rationale": (
            "Full incident commander workflow requiring all 6 action types. "
            "Includes red-herring services. Tests strategic investigation under "
            "cascading blast-radius temporal pressure."
        ),
    }

    return {
        "description": overview,
        "stages": [easy_stage, medium_stage, hard_stage],
    }
# ---------------------------------------------------------------------------
# Prometheus endpoints
# ---------------------------------------------------------------------------
@router.get("/prometheus/metrics")
async def prometheus_scenario_metrics(
    fmt: str = "text",
    x_session_id: str | None = Header(default=None, alias="X-Session-ID"),
):
    """Scrape endpoint for the current scenario's live service metrics.

    Reads ``env.live_metrics()`` for the session and returns it without
    stepping the environment.

    - ``?fmt=text`` (default) - Prometheus text exposition format
    - ``?fmt=json`` - JSON dict keyed by service name

    Raises:
        HTTPException: 400 when the session header is missing/unknown or
            when the session has no live metrics.
    """
    session_known = bool(x_session_id) and x_session_id in _SESSION_REGISTRY
    if not session_known:
        raise HTTPException(
            status_code=400,
            detail="Missing or unknown X-Session-ID. Call /reset first.",
        )

    session_env = _SESSION_REGISTRY[x_session_id]
    live_snapshot = session_env.live_metrics()
    if not live_snapshot:
        raise HTTPException(status_code=400, detail="No active episode. Call /reset first.")

    # State is fetched before branching on the format, as either branch may follow.
    state = session_env.state()

    if fmt != "json":
        exposition = scenario_live_to_prom_text(
            live_snapshot,
            state.scenario_id,
            state.task_id,
            state.step_number,
        )
        return PlainTextResponse(exposition, media_type="text/plain; version=0.0.4")

    by_service = {}
    for service_name, service_metrics in live_snapshot.items():
        by_service[service_name] = service_metrics.model_dump()
    return by_service
@router.get("/prometheus/query")
async def prometheus_instant_query(
    query: str,
    x_session_id: str | None = Header(default=None, alias="X-Session-ID"),
):
    """Simplified Prometheus instant-query API (subset of /api/v1/query).

    The reply uses the Prometheus JSON envelope (``status`` / ``data`` /
    ``resultType`` / ``result``) with a ``vector`` result. Only selectors
    are supported - complex PromQL is not evaluated server-side.

    Supported selectors::

        irt_error_rate                          # all services
        irt_error_rate{service="auth-service"}  # specific service
        error_rate{service="payment-api"}       # irt_ prefix auto-added
        {service="payment-api"}                 # all metrics for one service

    Raises:
        HTTPException: 400 when the session header is missing/unknown or
            when the session has no live metrics.
    """
    session_known = bool(x_session_id) and x_session_id in _SESSION_REGISTRY
    if not session_known:
        raise HTTPException(
            status_code=400,
            detail="Missing or unknown X-Session-ID. Call /reset first.",
        )

    session_env = _SESSION_REGISTRY[x_session_id]
    live_snapshot = session_env.live_metrics()
    if not live_snapshot:
        raise HTTPException(status_code=400, detail="No active episode. Call /reset first.")

    state = session_env.state()
    selector = parse_prom_selector(query)
    metric_name, label_filters = selector
    samples = build_prom_vector(
        live_snapshot,
        metric_name,
        label_filters,
        state.scenario_id,
        state.task_id,
    )

    data = {"resultType": "vector", "result": samples}
    return {"status": "success", "data": data}
@router.get("/prometheus/query_range")
async def prometheus_range_query(
    query: str,
    start: float | None = None,
    end: float | None = None,
    step: float = 1.0,
    x_session_id: str | None = Header(default=None, alias="X-Session-ID"),
):
    """Prometheus range-query API (subset of /api/v1/query_range).

    Returns a Prometheus-style **matrix** response built from the history
    returned by ``env.metric_history`` for the requested time window.

    Parameters:
        query: PromQL selector (same syntax as /prometheus/query).
        start: Unix timestamp for the window start. Defaults to one hour
            before the current time.
        end: Unix timestamp for the window end. Defaults to the current time.
        step: Step duration in seconds; forwarded to ``env.metric_history``
            as ``step_seconds``.

    Example::

        GET /prometheus/query_range?query=irt_error_rate&start=1712500000&end=1712500060

    Raises:
        HTTPException: 400 when the session header is missing/unknown, when
            ``start`` is later than ``end``, or when there is neither
            history nor live metrics for the session.
    """
    if not x_session_id or x_session_id not in _SESSION_REGISTRY:
        raise HTTPException(
            status_code=400,
            detail="Missing or unknown X-Session-ID. Call /reset first.",
        )
    env = _SESSION_REGISTRY[x_session_id]

    now = time.time()
    start_ts = start if start is not None else now - 3600
    end_ts = end if end is not None else now
    if start_ts > end_ts:
        raise HTTPException(status_code=400, detail="start must be <= end")

    history = env.metric_history(start_ts, end_ts, step_seconds=step)

    # An empty window is only an error when the session also has no live
    # metrics. Use the same falsy test as the sibling Prometheus endpoints so
    # any empty/None return is treated as "no active episode".
    no_episode = history is None or (not history and not env.live_metrics())
    if no_episode:
        raise HTTPException(status_code=400, detail="No active episode. Call /reset first.")

    s = env.state()
    metric_name, label_filters = parse_prom_selector(query)
    matrix = build_prom_matrix(history, metric_name, label_filters, s.scenario_id, s.task_id)
    return {
        "status": "success",
        "data": {
            "resultType": "matrix",
            "result": matrix,
        },
    }
# ---------------------------------------------------------------------------
# WebSocket endpoint - one env instance per connection, no session header
# ---------------------------------------------------------------------------
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket persistent session - one isolated env instance per connection.

    Message protocol (JSON):
        Client sends: {"type": "reset", "task_id": "...", "variant_seed": 0}
        Client sends: {"type": "step", "action": {action_type, target, parameters, reasoning}}
        Client sends: {"type": "state"}
        Client sends: {"type": "grade"}
        Server replies: {"type": "reset"|"step"|"state"|"grade"|"error", ...payload}

    No X-Session-ID header needed - the connection itself is the session.

    Failures in a single message are reported back as ``{"type": "error"}``
    and the loop keeps running. The connection ends on client disconnect or
    on an unexpected exception, for which a final error message is attempted.
    """
    await websocket.accept()
    env = IncidentResponseEnv()

    # Increment immediately before `try` so the `finally` decrement always
    # pairs with it, even if the telemetry/log lines below raise.
    _deps.WS_ACTIVE_CONNECTIONS += 1
    try:
        _TELEMETRY["ws_connections_total"] += 1
        _log.info("ws connected - active=%d", _deps.WS_ACTIVE_CONNECTIONS)

        while True:
            raw = await websocket.receive_json()

            # Valid JSON that is not an object (list, number, string...) has
            # no "type"; answer with an error instead of dropping the socket.
            if not isinstance(raw, dict):
                _TELEMETRY["errors_total"] += 1
                await websocket.send_json({
                    "type": "error",
                    "detail": "Message must be a JSON object with a 'type' field.",
                })
                continue

            msg_type = raw.get("type", "")

            if msg_type == "reset":
                task_id = raw.get("task_id", "severity_classification")
                seed = raw.get("variant_seed")
                # No seed supplied: pick a random variant.
                seed = seed if seed is not None else secrets.randbelow(100)
                try:
                    obs = env.reset(task_id, variant_seed=seed)
                    _TELEMETRY["episodes_total"] += 1
                    await websocket.send_json({"type": "reset", **obs.model_dump(mode="json")})
                except ValueError as exc:
                    await websocket.send_json({"type": "error", "detail": str(exc)})

            elif msg_type == "step":
                action_data = raw.get("action", {})
                try:
                    action = Action(**action_data)
                    result: StepResult = env.step(action)
                    _TELEMETRY["steps_total"] += 1
                    await websocket.send_json({"type": "step", **result.model_dump(mode="json")})
                except Exception as exc:
                    # Broad on purpose: a bad action payload or a failing step
                    # is reported to the client without ending the session.
                    _TELEMETRY["errors_total"] += 1
                    await websocket.send_json({"type": "error", "detail": str(exc)})

            elif msg_type == "state":
                try:
                    await websocket.send_json({"type": "state", **env.state().model_dump(mode="json")})
                except RuntimeError as exc:
                    await websocket.send_json({"type": "error", "detail": str(exc)})

            elif msg_type == "grade":
                try:
                    grade_result = env.grade()
                    _TELEMETRY["grader_calls"] += 1
                    s = env.state()
                    record_leaderboard(s.task_id, grade_result.score, s.total_steps_taken)
                    await websocket.send_json({"type": "grade", **grade_result.model_dump(mode="json")})
                except RuntimeError as exc:
                    await websocket.send_json({"type": "error", "detail": str(exc)})

            else:
                await websocket.send_json({
                    "type": "error",
                    "detail": f"Unknown type '{msg_type}'. Supported: reset, step, state, grade",
                })
    except WebSocketDisconnect:
        # Normal client hang-up; nothing to report.
        pass
    except Exception as exc:
        _TELEMETRY["errors_total"] += 1
        # Surface the failure server-side; previously it was only sent to the client.
        _log.warning("ws handler error - %s", exc)
        try:
            await websocket.send_json({"type": "error", "detail": str(exc)})
        except Exception:
            # Best effort only: the socket may already be closed.
            pass
    finally:
        _deps.WS_ACTIVE_CONNECTIONS -= 1
        _log.info("ws disconnected - active=%d", _deps.WS_ACTIVE_CONNECTIONS)
|