"""ChaosOps AI — Hugging Face Space entry point.
Gradio UI that lets a judge replay any incident scenario with any policy
(random / heuristic / oracle / trained) and watch the multi-agent response
unfold step-by-step. The trained-policy lane activates when the environment
variable ``CHAOSOPS_ADAPTER_PATH`` points at a LoRA adapter directory —
otherwise the Space still runs, silently falling back to the heuristic so
the UI works during cold-start or when no checkpoint has been uploaded yet.
Deploy layout:
hf_space/
app.py — this file (entry point HF Spaces picks up)
requirements.txt — pulls chaosops from GitHub + Gradio + torch stack
README.md — HF Space card (YAML frontmatter)
"""
from __future__ import annotations
import html
import logging
import os
import sys
from pathlib import Path
import gradio as gr
_LOG = logging.getLogger("chaosops.app")
if not _LOG.handlers:
_h = logging.StreamHandler(sys.stderr)
_h.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
_LOG.addHandler(_h)
_LOG.setLevel(logging.INFO)
from chaosops.agents.policies import (
Policy,
heuristic_policy,
oracle_policy,
random_policy,
)
from chaosops.agents.runner import EpisodeResult, run_episode
from chaosops.dashboard.transcript import ROLE_TAG, render_transcript
from chaosops.env.environment import ChaosOpsEnvironment
from chaosops.env.models import AgentRole, DifficultyTier, FailureType
from chaosops.env.world_sim import Scenario
# Env var naming the trained LoRA adapter: either a local directory path or
# an HF Hub repo id (resolved in _lazy_trained_policy).
ADAPTER_ENV = "CHAOSOPS_ADAPTER_PATH"
# Process-wide cache for the loaded trained policy; populated at most once.
_TRAINED_POLICY_CACHE = None
# Last failure reason — surfaced in the run-summary so judges aren't tricked
# by a silent heuristic fallback when the trained lane is broken.
_TRAINED_LOAD_ERROR: str | None = None
# ---------------------------------------------------------------------------
# Policy resolution
# ---------------------------------------------------------------------------
def _lazy_trained_policy():
    """Load the trained LoRA adapter once per process, lazily.

    ``CHAOSOPS_ADAPTER_PATH`` accepts either:

    * a local filesystem path (used in Colab / local dev), or
    * an HF Hub repo id like ``helloAK96/chaosops-grpo-lora`` (Spaces).

    For repo ids we materialise the adapter to local disk via
    ``snapshot_download`` on the first call — the second call hits the
    in-process cache and is free.

    Failures are logged at ERROR level and recorded in
    :data:`_TRAINED_LOAD_ERROR` so the Gradio summary can surface
    "trained adapter unavailable" instead of silently swapping in the
    heuristic policy.
    """
    global _TRAINED_POLICY_CACHE, _TRAINED_LOAD_ERROR
    # Fast path: adapter already loaded in this process.
    if _TRAINED_POLICY_CACHE is not None:
        return _TRAINED_POLICY_CACHE
    adapter_ref = os.environ.get(ADAPTER_ENV)
    if not adapter_ref:
        _TRAINED_LOAD_ERROR = (
            f"{ADAPTER_ENV} env var is unset; trained lane disabled"
        )
        _LOG.warning(_TRAINED_LOAD_ERROR)
        return None
    local_path = Path(adapter_ref)
    if not local_path.exists():
        # Treat the value as an HF Hub repo id and snapshot_download it.
        try:
            from huggingface_hub import snapshot_download
        except ImportError as exc:
            _TRAINED_LOAD_ERROR = (
                f"huggingface_hub import failed ({exc}); cannot fetch adapter"
            )
            _LOG.error(_TRAINED_LOAD_ERROR)
            return None
        try:
            local_path = Path(
                snapshot_download(repo_id=adapter_ref, repo_type="model")
            )
        except Exception as exc:
            _TRAINED_LOAD_ERROR = (
                f"snapshot_download({adapter_ref!r}) failed: {exc!r}"
            )
            _LOG.exception(_TRAINED_LOAD_ERROR)
            return None
    try:
        # Deferred import keeps module start-up light — NOTE(review): this
        # presumably pulls in the heavy model stack (torch/peft); confirm.
        from chaosops.agents.trained_policy import TrainedPolicy

        _TRAINED_POLICY_CACHE = TrainedPolicy.from_adapter(local_path)
    except Exception as exc:
        _TRAINED_LOAD_ERROR = (
            f"TrainedPolicy.from_adapter({local_path}) failed: {exc!r}"
        )
        _LOG.exception(_TRAINED_LOAD_ERROR)
        return None
    # NOTE: on failure the cache stays None, so the next call re-attempts the
    # whole load (including any download) rather than caching the failure.
    _LOG.info("trained adapter loaded from %s", local_path)
    _TRAINED_LOAD_ERROR = None
    return _TRAINED_POLICY_CACHE
def _build_policy(name: str, scenario: Scenario) -> Policy:
if name == "random":
return random_policy(seed=scenario.seed)
if name == "heuristic":
return heuristic_policy(seed=scenario.seed)
if name == "oracle":
return oracle_policy(scenario.failure_type)
if name == "trained":
trained = _lazy_trained_policy()
if trained is None:
# Graceful fallback — Space is still useful before adapter lands.
return heuristic_policy(seed=scenario.seed)
return trained.as_policy()
raise ValueError(f"unknown policy '{name}'")
# ---------------------------------------------------------------------------
# Rendering helpers
# ---------------------------------------------------------------------------
# Hex colour per role tag for the rendered chat log; tags not listed here
# render in the neutral "#333" fallback.
_ROLE_COLOR: dict[str, str] = {
    "SRE": "#2980b9",
    "DEV": "#16a085",
    "MGR": "#8e44ad",
    "OVS": "#c0392b",
}
def _render_chat_html(result: EpisodeResult) -> str:
"""Render the episode as a coloured chat log for the Gradio HTML widget."""
blocks: list[str] = []
for step in result.steps:
tag = ROLE_TAG[step.role]
color = _ROLE_COLOR.get(tag, "#333")
args = step.action.args or {}
args_str = " ".join(f"{k}={v}" for k, v in args.items())
target = step.action.target or "-"
summary = (
f"{step.action.action_type.value} target={target}"
+ (f" {args_str}" if args_str else "")
)
blocks.append(
f'<div style="margin-bottom:6px;">'
f'<span style="color:{color};font-weight:600;">t{step.turn:02d} [{tag}]</span> '
f'<span style="font-family:monospace;">{html.escape(summary)}</span> '
f'<span style="color:#888;">reward={step.reward:+.1f}</span>'
f"</div>"
)
footer = (
f'<hr style="margin:10px 0;">'
f'<div><b>resolved:</b> {result.resolved} · '
f'<b>steps:</b> {result.final_step} · '
f'<b>cum_reward:</b> {result.cumulative_reward:+.1f} · '
f'<b>wrong_fixes:</b> {result.wrong_fixes} · '
f'<b>oversight_flags:</b> {result.oversight_flags or "[]"}</div>'
)
return '<div style="font-size:13px;line-height:1.5;">' + "".join(blocks) + footer + "</div>"
# ---------------------------------------------------------------------------
# Episode runner (called from the Gradio button)
# ---------------------------------------------------------------------------
def run_scenario(failure: str, difficulty: str, policy_name: str, seed: int):
    """Run one full episode and return ``(chat_html, summary, transcript)``.

    Wired to the Gradio "Run episode" button; all four agent roles are
    driven by the same policy callable.
    """
    seed_int = int(seed)
    scenario = Scenario.from_type(
        FailureType(failure),
        seed=seed_int,
        difficulty=DifficultyTier(difficulty),
    )
    policy = _build_policy(policy_name, scenario)
    role_policies = {role: policy for role in AgentRole}
    result = run_episode(ChaosOpsEnvironment(), scenario, role_policies)
    summary = {
        "failure_type": failure,
        "difficulty": difficulty,
        "policy": policy_name,
        "seed": seed_int,
        "resolved": result.resolved,
        "steps_to_resolve": result.final_step if result.resolved else None,
        "cumulative_reward": round(result.cumulative_reward, 2),
        "wrong_fixes": result.wrong_fixes,
        "oversight_flags": result.oversight_flags,
    }
    if policy_name == "trained":
        # Be explicit about the fallback so judges aren't misled by a
        # heuristic run masquerading as the trained policy.
        if _TRAINED_POLICY_CACHE is not None:
            summary["trained_adapter_status"] = "loaded"
        else:
            summary["trained_adapter_status"] = (
                f"UNAVAILABLE (fell back to heuristic): "
                f"{_TRAINED_LOAD_ERROR or 'unknown'}"
            )
    return _render_chat_html(result), summary, render_transcript(result)
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
# Landing copy rendered as Markdown at the top of the Gradio page.
INTRO_MARKDOWN = """
# ChaosOps AI — Multi-Agent Incident-Response Gym
A reinforcement-learning environment where a **four-agent fleet**
(SRE · Dev · Manager · **Oversight**) resolves a randomly injected
infrastructure incident. The fourth agent is a **scalable-oversight model**
whose job is to detect when *another AI in the fleet* (autoscaler,
load_balancer, deploy_bot) caused the incident — before the remediation
team touches the services.
**Policies**
- `random` · hard lower bound
- `heuristic` · what a decent human SRE would try
- `oracle` · cheats (knows ground truth) — upper-bound curve
- `trained` · our GRPO-tuned Qwen 2.5 1.5B LoRA checkpoint
Pick a failure type, smash **Run episode**, watch the team coordinate (or fail).
"""
def build_demo() -> gr.Blocks:
    """Assemble the Gradio app: controls in the left column, results on the right."""
    with gr.Blocks(title="ChaosOps AI") as demo:
        gr.Markdown(INTRO_MARKDOWN)
        with gr.Row():
            with gr.Column(scale=1):
                failure_dd = gr.Dropdown(
                    [f.value for f in FailureType],
                    value="rogue_deploy_bot",
                    label="Failure type",
                )
                tier_dd = gr.Dropdown(
                    [t.value for t in DifficultyTier],
                    value="hard",
                    label="Difficulty",
                )
                policy_dd = gr.Dropdown(
                    ["random", "heuristic", "oracle", "trained"],
                    value="oracle",
                    label="Policy",
                )
                seed_box = gr.Number(value=42, precision=0, label="Seed")
                run_btn = gr.Button("▶ Run episode", variant="primary")
                gr.Markdown(
                    "_Trained policy requires `CHAOSOPS_ADAPTER_PATH` to be "
                    "set on the Space. It falls back to the heuristic otherwise._"
                )
            with gr.Column(scale=2):
                chat_panel = gr.HTML(label="Episode chat")
                summary_panel = gr.JSON(label="Summary")
                transcript_panel = gr.Textbox(
                    label="Full transcript (reward breakdown)",
                    lines=18,
                )
        run_btn.click(
            run_scenario,
            inputs=[failure_dd, tier_dd, policy_dd, seed_box],
            outputs=[chat_panel, summary_panel, transcript_panel],
        )
    return demo
if __name__ == "__main__":
    # Docker Spaces route external traffic to port 7860; bind on 0.0.0.0 so
    # the container's network namespace exposes the server beyond localhost.
    demo = build_demo()
    demo.launch(server_name="0.0.0.0", server_port=7860)