"""
ImmunoOrg 2.0 — FastAPI OpenEnv Server
=======================================
Implements the OpenEnv REST API without requiring the openenv package.
Endpoints: GET /health, POST /reset, POST /step, GET /state
"""
from __future__ import annotations
import json
import os
import secrets
import subprocess
import sys
import threading
import uuid
from pathlib import Path
from typing import Any, Optional
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, PlainTextResponse
from pydantic import BaseModel
from immunoorg.models import (
ActionType, TacticalAction, StrategicAction, DiagnosticAction, ImmunoAction,
)
from immunoorg.environment import ImmunoOrgEnvironment
from immunoorg.api_models import (
ResetRequest,
ImmunoOrgAction,
StepEnvelope,
ImmunoOrgObservation,
StepResponse,
)
from server.war_room_routes import router as war_room_router
# ─── Global environment instance ─────────────────────────────────────────────
# Module-level state shared by every request handled by this process.
# The active environment; None until POST /reset creates one, and replaced
# wholesale by each subsequent /reset.
_env: Optional[ImmunoOrgEnvironment] = None
# UUID string identifying the current episode; empty until the first /reset.
_episode_id: str = ""
# Held around the "is training running?" check and the spawn in
# /admin/training/start so two requests cannot both start a process.
_training_lock = threading.Lock()
# Handle of the most recently spawned training subprocess (None if never started).
_training_proc: subprocess.Popen | None = None
def _repo_root() -> Path:
    """Return the parent of the directory that contains this module.

    The result is absolute (the module path is resolved first) and is used
    as the working directory for the training subprocess.
    """
    this_file = Path(__file__).resolve()
    containing_dir = this_file.parent
    return containing_dir.parent
def _training_secret_ok(provided: str | None) -> bool:
    """Return True when ``provided`` matches the ``TRAINING_SECRET`` env var.

    Both values are stripped of surrounding whitespace before comparison.

    Args:
        provided: Token supplied by the caller, or None when absent.

    Returns:
        False when the secret is unset/blank, when ``provided`` is None, or
        when the values differ; True only on an exact (stripped) match.
    """
    expected = (os.environ.get("TRAINING_SECRET") or "").strip()
    if not expected or provided is None:
        return False
    # Be forgiving about accidental whitespace/newlines in query param or secret UI.
    provided_clean = provided.strip()
    # compare_digest() raises TypeError when given a non-ASCII *str*, which
    # would turn a bad token such as "é" into a 500 instead of a clean
    # rejection. Comparing the UTF-8 bytes keeps the constant-time check and
    # accepts any text. "surrogatepass" avoids an encode error on lone
    # surrogates (e.g. from undecodable environment bytes).
    return secrets.compare_digest(
        provided_clean.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
def _require_training_token(token: str | None) -> None:
    """Reject the request unless ``token`` matches ``TRAINING_SECRET``.

    Raises:
        HTTPException: 503 when no (non-blank) secret is configured, 401 when
            the supplied token does not match it.
    """
    configured = (os.environ.get("TRAINING_SECRET") or "").strip()
    if configured == "":
        # No secret configured: the admin endpoints are switched off entirely.
        raise HTTPException(
            status_code=503,
            detail="Training trigger is disabled. Set TRAINING_SECRET in Space secrets.",
        )
    if _training_secret_ok(token):
        return
    raise HTTPException(status_code=401, detail="Invalid Training Token")
def _get_env() -> ImmunoOrgEnvironment:
    """Return the active environment.

    Raises:
        HTTPException: 400 when no environment has been created yet.
    """
    active = _env
    if active is not None:
        return active
    raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.")
def _build_action(req: ImmunoOrgAction) -> ImmunoAction:
    """Convert an API-level action request into an ``ImmunoAction``.

    Args:
        req: Incoming action payload.

    Returns:
        The action with enum fields resolved. Missing ``target``,
        ``parameters`` and ``reasoning`` default to ``""``, ``{}`` and ``""``.

    Raises:
        HTTPException: 422 when ``tactical_action``, ``strategic_action`` or
            ``diagnostic_action`` is set to a value its enum does not accept.
    """
    try:
        atype = ActionType(req.action_type)
    except ValueError:
        # Deliberate best-effort: an unknown action_type is treated as tactical.
        atype = ActionType.TACTICAL

    def _coerce(enum_cls, raw, field_name: str):
        """Map a raw value onto ``enum_cls``; falsy values mean "not provided"."""
        if not raw:
            return None
        try:
            return enum_cls(raw)
        except ValueError as exc:
            # A bad client value is a request error, not a server fault:
            # report it as 422 instead of letting it surface as a 500.
            raise HTTPException(
                status_code=422,
                detail=f"Invalid {field_name}: {raw!r}",
            ) from exc

    tactical = _coerce(TacticalAction, req.tactical_action, "tactical_action")
    strategic = _coerce(StrategicAction, req.strategic_action, "strategic_action")
    diagnostic = _coerce(DiagnosticAction, req.diagnostic_action, "diagnostic_action")
    return ImmunoAction(
        action_type=atype,
        tactical_action=tactical,
        strategic_action=strategic,
        diagnostic_action=diagnostic,
        target=req.target or "",
        secondary_target=req.secondary_target,
        parameters=req.parameters or {},
        reasoning=req.reasoning or "",
    )
def _obs_to_payload(obs, done: bool) -> ImmunoOrgObservation:
    """Build the API observation payload from an environment observation.

    Collections of models are serialised with ``model_dump()``; only the
    first ten entries of ``obs.recent_logs`` are included. The current
    module-level episode id is stamped onto the payload.
    """

    def _dump_all(models):
        # Serialise each model in the collection to a plain dict.
        return [item.model_dump() for item in models]

    fields = dict(
        done=done,
        episode_id=_episode_id,
        current_phase=obs.current_phase.value,
        step_count=obs.step_count,
        sim_time=obs.sim_time,
        threat_level=obs.threat_level,
        system_downtime=obs.system_downtime,
        action_result=obs.action_result,
        action_success=obs.action_success,
        visible_nodes=_dump_all(obs.visible_nodes),
        detected_attacks=_dump_all(obs.detected_attacks),
        recent_logs=_dump_all(obs.recent_logs[:10]),
        network_health_summary=obs.network_health_summary,
        org_nodes=_dump_all(obs.org_nodes),
        pending_approvals=_dump_all(obs.pending_approvals),
        belief_map_feedback=obs.belief_map_feedback,
        alerts=obs.alerts,
    )
    return ImmunoOrgObservation(**fields)
def _step_response(obs, reward: float, done: bool) -> StepResponse:
    """Wrap an observation, reward and done flag into a ``StepResponse``.

    The ``info`` dict repeats the episode id plus the phase and step count
    taken from the serialised observation.
    """
    payload = _obs_to_payload(obs, done=done)
    return StepResponse(
        observation=payload,
        reward=reward,
        done=done,
        info={
            "episode_id": _episode_id,
            "phase": payload.current_phase,
            "step_count": payload.step_count,
        },
    )
# ─── FastAPI app ──────────────────────────────────────────────────────────────
# The ASGI application object; every route below is registered on it.
app = FastAPI(
    title="ImmunoOrg 2.0 OpenEnv API",
    description="The Autonomous, Self-Healing Enterprise — OpenEnv RL Environment",
    version="2.0.0",
)
# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Routes defined in server.war_room_routes are attached here.
app.include_router(war_room_router)
@app.get("/health")
async def health():
    """Liveness probe reporting version and whether an episode environment exists."""
    has_episode = _env is not None
    report = {
        "status": "healthy",
        "version": "2.0.0",
        "environment": "ImmunoOrg",
    }
    report["episode_active"] = has_episode
    return report
@app.get("/trained_status")
async def trained_status():
    """Report whether a GRPO-trained LoRA adapter is available on the Hub yet.

    The HPC pipeline pushes to ``hirann/immunoorg-grpo-defender`` once
    training completes, so the demo UI can poll this endpoint for a live
    "trained agent: ready / pending" badge without a redeploy.

    Any failure (including the import itself) is reported in the response
    body rather than raised.
    """
    try:
        from immunoorg.trained_agent import TrainedDefender

        defender = TrainedDefender.get()
        return defender.status()
    except Exception as exc:
        failure = f"{type(exc).__name__}: {exc}"
        return {
            "loaded": False,
            "load_attempted": False,
            "error": failure,
        }
# Body returned verbatim by GET /landing, served with an HTML response class.
# NOTE(review): the literal below contains no markup tags in this view —
# confirm against the deployed page that tags were not lost.
_LANDING_HTML = """
ImmunoOrg 2.0 — OpenEnv environment
🛡️ ImmunoOrg 2.0
An OpenEnv RL environment where an LLM defender learns to
contain cyber-attacks and restructure the organization that
lets them succeed. Built for the OpenEnv Hackathon (India 2026).
Judges: use /demo for the episode runner and
the Live LLM War Room (Theme #1) on the same page.
▶ Launch interactive demo
OpenAPI / FastAPI docs
Hugging Face Hub page stuck on “Refreshing”? This is often a CSS/JS cache issue in the Hub's iframe.
Try the Direct URL (bypasses the Hub frame):
hirann-immunoorg-v3.hf.space/demo
OpenEnv API endpoints (Gym-style)
POST /reset — start an episode
POST /step — apply an action
GET /state — full server-side state
GET /health — liveness + version
GET /trained_status — is the trained LoRA loaded yet?
GET /openenv.yaml — manifest
GET /demo — Gradio UI (episode demo + War Room accordion)
POST /api/war-room — LLM debate JSON API (optional)
Resources
"""
@app.get("/landing", response_class=HTMLResponse)
async def landing_page():
    """Serve the former landing page, which now lives at /landing."""
    # HTMLResponse is already imported at module level (the decorator uses it),
    # so no function-local import is needed.
    return HTMLResponse(content=_LANDING_HTML)
@app.get("/admin/training/start")
async def admin_training_start(
    token: str | None = Query(None, description="Must match TRAINING_SECRET"),
    smoke_test: bool = Query(False),
    model: str = Query("Qwen/Qwen2.5-0.5B-Instruct"),
    epochs: int = Query(1, ge=1, le=20),
):
    """Start GRPO training in a background process.

    The child runs ``python -u -m training.grpo_training_pipeline run ...``
    with the repository root as its working directory. Its stdout and stderr
    are appended to the path returned by ``log_file()`` (per the original
    note, persistent /data when available).

    Args:
        token: Must match ``TRAINING_SECRET``.
        smoke_test: When true, ``--smoke-test`` is appended to the command.
        model: Value passed to ``--model``.
        epochs: Value passed to ``--epochs`` (1-20).

    Returns:
        A dict with the child's pid, the log path and the echoed options.

    Raises:
        HTTPException: 503/401 from the token check, 409 when a previously
            started training process is still running.
    """
    global _training_proc
    _require_training_token(token)
    with _training_lock:
        # poll() is None while the child is still alive.
        if _training_proc is not None and _training_proc.poll() is None:
            raise HTTPException(status_code=409, detail="Training already running")
        repo = _repo_root()
        from training.grpo_training_pipeline import log_file, training_root

        out_dir = training_root() / "checkpoints" / "immunoorg-defender"
        log_path = log_file()
        log_path.parent.mkdir(parents=True, exist_ok=True)
        # Argument list (no shell), so ``model`` is passed as a single argv entry.
        cmd = [
            sys.executable,
            "-u",
            "-m",
            "training.grpo_training_pipeline",
            "run",
            "--model",
            model,
            "--epochs",
            str(epochs),
            "--output-dir",
            str(out_dir),
        ]
        if smoke_test:
            cmd.append("--smoke-test")
        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        # The child receives its own duplicate of the log descriptor, so the
        # parent's handle is closed as soon as Popen returns. Leaving it open
        # leaked one file descriptor per training start.
        with open(log_path, "ab", buffering=0) as log_f:
            _training_proc = subprocess.Popen(
                cmd,
                cwd=str(repo),
                env=env,
                stdout=log_f,
                stderr=subprocess.STDOUT,
                stdin=subprocess.DEVNULL,
                close_fds=True,
            )
        return {
            "status": "started",
            "pid": _training_proc.pid,
            "log_file": str(log_path),
            "smoke_test": smoke_test,
            "model": model,
            "epochs": epochs,
        }
@app.get("/admin/training/status")
async def admin_training_status(token: str | None = Query(None)):
    """Return the parsed training status file, or a "never_started" marker.

    Raises:
        HTTPException: 503/401 from the token check; 500 (with the error
            text as detail) when importing, reading or parsing fails.
    """
    _require_training_token(token)
    try:
        from training.grpo_training_pipeline import status_file

        status_path = status_file()
        if status_path.exists():
            raw = status_path.read_text(encoding="utf-8")
            return json.loads(raw)
        return {"state": "never_started"}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
@app.get("/admin/training/log")
async def admin_training_log(
    token: str | None = Query(None),
    lines: int = Query(200, ge=1, le=5000),
):
    """Return the last ``lines`` lines of the training log as plain text.

    Args:
        token: Must match ``TRAINING_SECRET``.
        lines: Number of trailing lines to return (1-5000).

    Returns:
        A UTF-8 plain-text response; ``"(no log yet)"`` when the log file
        does not exist. Undecodable bytes are replaced.

    Raises:
        HTTPException: 503/401 from the token check; 500 (with the error
            text as detail) when importing or reading fails.
    """
    _require_training_token(token)
    try:
        from collections import deque

        from training.grpo_training_pipeline import log_file

        p = log_file()
        if not p.exists():
            return PlainTextResponse("(no log yet)\n", media_type="text/plain; charset=utf-8")
        # Stream the file through a bounded deque so memory use is limited to
        # the requested tail instead of the whole (possibly very large) log.
        # Lines are split on newline characters (\n, \r\n, \r).
        with open(p, encoding="utf-8", errors="replace") as log_fh:
            tail_lines = deque((ln.rstrip("\n") for ln in log_fh), maxlen=lines)
        tail = "\n".join(tail_lines) + ("\n" if tail_lines else "")
        return PlainTextResponse(tail, media_type="text/plain; charset=utf-8")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/reset")
async def reset(req: ResetRequest = ResetRequest()) -> StepResponse:
    """Start a new episode: new episode id, new environment, first observation.

    The response carries reward 0.0 and done=False.
    """
    global _env, _episode_id
    _episode_id = str(uuid.uuid4())
    fresh_env = ImmunoOrgEnvironment(difficulty=req.difficulty, seed=req.seed)
    _env = fresh_env
    first_obs = fresh_env.reset()
    return _step_response(first_obs, reward=0.0, done=False)
@app.post("/step")
async def step(req: ImmunoOrgAction | StepEnvelope):
    """Apply one action to the active environment and return the step result.

    The body may be a bare action or an envelope carrying it under ``action``.
    """
    env = _get_env()
    if isinstance(req, StepEnvelope):
        # Envelope form: the real action is nested one level down.
        payload = req.action
    else:
        payload = req
    obs, reward, done = env.step(_build_action(payload))
    return _step_response(obs, reward=reward, done=done)
# Request body accepted by POST /directive.
class DirectiveRequest(BaseModel):
    # Directive text; handed unchanged to env.inject_directive() by the route.
    directive: str
@app.post("/directive")
async def inject_directive(req: DirectiveRequest):
    """Forward a directive to the active environment and echo it back."""
    text = req.directive
    _get_env().inject_directive(text)
    return {"status": "success", "directive": req.directive}
@app.get("/state")
async def state():
    """Return a snapshot of the active environment's server-side state.

    Core episode counters come from ``env.state``. The "2.0" metrics are read
    from optional sub-components and fall back to ``{}``, ``0`` or ``0.5``
    when the component is absent. ``pipeline_integrity`` is read from the
    environment's private ``_last_pipeline_integrity`` attribute.
    """
    env = _get_env()
    snap = env.state
    summary = {
        "episode_id": _episode_id,
        "step_count": snap.step_count,
        "difficulty_level": snap.difficulty_level,
        "current_phase": snap.current_phase.value,
        "threat_level": snap.threat_level,
        "total_downtime": snap.total_downtime,
        "total_damage": snap.total_damage,
        "org_chaos_score": snap.org_chaos_score,
        "cumulative_reward": snap.cumulative_reward,
        "active_attacks": len(snap.active_attacks),
        "contained_attacks": len(snap.contained_attacks),
        "org_changes_made": snap.org_changes_made,
        "termination_reason": snap.termination_reason,
    }

    # 2.0 metrics
    if env.migration_engine:
        summary["migration_progress"] = env.migration_engine.get_progress()
    else:
        summary["migration_progress"] = {}
    summary["pipeline_integrity"] = env._last_pipeline_integrity
    if env.war_room:
        summary["war_room_debates"] = len(env.war_room.debate_history)
    else:
        summary["war_room_debates"] = 0
    if env.executive_context:
        summary["patronus_score"] = env.executive_context.get_patronus_score()
    else:
        summary["patronus_score"] = 0.5
    summary["reasoning_traces"] = [trace.model_dump() for trace in snap.reasoning_traces]
    return summary
@app.get("/openenv.yaml")
async def get_openenv_yaml():
    """Serve the environment manifest.

    The manifest is looked up in the current working directory first and
    then next to the repository root (see ``_repo_root``), so the endpoint
    also works when the server is started from another directory. The file
    is always decoded as UTF-8, independent of the platform default.

    Raises:
        HTTPException: 404 when the manifest exists in neither location.
    """
    candidates = (Path("openenv.yaml"), _repo_root() / "openenv.yaml")
    for candidate in candidates:
        try:
            content = candidate.read_text(encoding="utf-8")
        except FileNotFoundError:
            continue
        return PlainTextResponse(content, media_type="text/yaml")
    raise HTTPException(status_code=404, detail="openenv.yaml not found")
# ─── Debugging ──────────────────────────────────────────────────────────────
import logging
import io
# In-memory buffer that receives root-logger output; read back by /debug/logs.
# NOTE(review): nothing visible in this file truncates or rotates the buffer,
# so it appears to grow for the lifetime of the process — confirm.
log_stream = io.StringIO()
# Root logger at INFO, writing to the buffer above instead of stderr.
logging.basicConfig(level=logging.INFO, stream=log_stream)
logger = logging.getLogger("immunoorg")
@app.get("/debug/logs")
async def get_logs():
    """Return everything captured in the in-memory log buffer so far."""
    captured = log_stream.getvalue()
    return {"logs": captured}
# ─── Mount the Gradio visual demo at / ──────────────────────────────────────
#
# By mounting at / instead of /demo, the main Space URL on the Hugging Face Hub
# will load the Gradio UI directly, which is the best experience for judges.
# API routes (/health, /reset, /step) defined above still take precedence.
try:
    import gradio as gr
    from server.demo_ui import build_demo

    logger.info("Building Gradio demo UI...")
    _demo = build_demo()

    # Register the convenience redirects BEFORE mounting Gradio. Routes are
    # matched in registration order and a mount at "/" matches every path,
    # so anything added after the mount would be shadowed and never reached.
    # They are still only registered once the demo has been built.
    @app.get("/demo")
    @app.get("/launch")
    async def demo_redirect():
        """Redirect the legacy /demo and /launch URLs to the UI at the root."""
        from fastapi.responses import RedirectResponse

        return RedirectResponse(url="/")

    # Mount the Gradio app at the root /
    app = gr.mount_gradio_app(app, _demo, path="/")
    logger.info("Gradio demo UI mounted at /")
except Exception as _demo_exc:
    # Best-effort: the API must keep working even when the UI cannot load.
    logger.error("Failed to mount Gradio demo UI: %s", _demo_exc, exc_info=True)
if __name__ == "__main__":
    # Direct execution: serve on all interfaces, port taken from $PORT
    # (7860 when the variable is unset).
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))