""" ImmunoOrg 2.0 — FastAPI OpenEnv Server ======================================= Implements the OpenEnv REST API without requiring the openenv package. Endpoints: GET /health POST /reset POST /step GET /state """ from __future__ import annotations import json import os import uuid from typing import Any, Optional from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import PlainTextResponse from pydantic import BaseModel from immunoorg.models import ( ActionType, TacticalAction, StrategicAction, DiagnosticAction, ImmunoAction, ) from immunoorg.environment import ImmunoOrgEnvironment # ─── Request / Response schemas ────────────────────────────────────────────── class ResetRequest(BaseModel): seed: Optional[int] = None difficulty: int = 1 task: Optional[str] = None class ImmunoOrgAction(BaseModel): action_type: str = "tactical" tactical_action: Optional[str] = None strategic_action: Optional[str] = None diagnostic_action: Optional[str] = None target: str = "" secondary_target: Optional[str] = None parameters: dict[str, Any] = {} reasoning: str = "" class StepEnvelope(BaseModel): """OpenEnv-style request body: { action: {...} }""" action: ImmunoOrgAction class ImmunoOrgObservation(BaseModel): """OpenEnv-style observation payload returned in responses.""" done: bool episode_id: str current_phase: str step_count: int sim_time: float threat_level: float system_downtime: float action_result: str action_success: bool visible_nodes: list[dict[str, Any]] detected_attacks: list[dict[str, Any]] recent_logs: list[dict[str, Any]] network_health_summary: dict[str, Any] org_nodes: list[dict[str, Any]] pending_approvals: list[dict[str, Any]] belief_map_feedback: str alerts: list[str] class StepResponse(BaseModel): observation: ImmunoOrgObservation reward: float done: bool info: dict[str, Any] # ─── Global environment instance ───────────────────────────────────────────── _env: Optional[ImmunoOrgEnvironment] = None _episode_id: str = "" def _get_env() -> ImmunoOrgEnvironment: if _env is None: raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.") return _env def _build_action(req: ImmunoOrgAction) -> ImmunoAction: try: atype = ActionType(req.action_type) except ValueError: atype = ActionType.TACTICAL tactical = TacticalAction(req.tactical_action) if req.tactical_action else None strategic = StrategicAction(req.strategic_action) if req.strategic_action else None diagnostic = DiagnosticAction(req.diagnostic_action) if req.diagnostic_action else None return ImmunoAction( action_type=atype, tactical_action=tactical, strategic_action=strategic, diagnostic_action=diagnostic, target=req.target or "", secondary_target=req.secondary_target, parameters=req.parameters or {}, reasoning=req.reasoning or "", ) def _obs_to_payload(obs, done: bool) -> ImmunoOrgObservation: return ImmunoOrgObservation( done=done, episode_id=_episode_id, current_phase=obs.current_phase.value, step_count=obs.step_count, sim_time=obs.sim_time, threat_level=obs.threat_level, system_downtime=obs.system_downtime, action_result=obs.action_result, action_success=obs.action_success, visible_nodes=[n.model_dump() for n in obs.visible_nodes], detected_attacks=[a.model_dump() for a in obs.detected_attacks], recent_logs=[lg.model_dump() for lg in obs.recent_logs[:10]], network_health_summary=obs.network_health_summary, org_nodes=[n.model_dump() for n in obs.org_nodes], pending_approvals=[a.model_dump() for a in obs.pending_approvals], belief_map_feedback=obs.belief_map_feedback, alerts=obs.alerts, ) def _step_response(obs, reward: float, done: bool) -> StepResponse: observation = _obs_to_payload(obs, done=done) info = { "episode_id": _episode_id, "phase": observation.current_phase, "step_count": observation.step_count, } return StepResponse(observation=observation, reward=reward, done=done, info=info) # ─── FastAPI app ────────────────────────────────────────────────────────────── app = FastAPI( title="ImmunoOrg 2.0 OpenEnv API", description="The Autonomous, Self-Healing Enterprise — OpenEnv RL Environment", version="2.0.0", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) @app.get("/health") async def health(): return { "status": "healthy", "version": "2.0.0", "environment": "ImmunoOrg", "episode_active": _env is not None, } @app.get("/") async def root(): return { "name": "ImmunoOrg 2.0", "description": "Autonomous, Self-Healing Enterprise — OpenEnv RL Environment", "endpoints": ["/health", "/reset", "/step", "/state"], "hf_space": "https://huggingface.co/spaces/hirann/immunoorg-2", "version": "2.0.0", } @app.post("/reset") async def reset(req: ResetRequest = ResetRequest()) -> StepResponse: global _env, _episode_id _episode_id = str(uuid.uuid4()) _env = ImmunoOrgEnvironment(difficulty=req.difficulty, seed=req.seed) obs = _env.reset() return _step_response(obs, reward=0.0, done=False) @app.post("/step") async def step(req: ImmunoOrgAction | StepEnvelope): env = _get_env() action_req = req.action if isinstance(req, StepEnvelope) else req action = _build_action(action_req) obs, reward, done = env.step(action) return _step_response(obs, reward=reward, done=done) @app.get("/state") async def state(): env = _get_env() s = env.state return { "episode_id": _episode_id, "step_count": s.step_count, "difficulty_level": s.difficulty_level, "current_phase": s.current_phase.value, "threat_level": s.threat_level, "total_downtime": s.total_downtime, "total_damage": s.total_damage, "org_chaos_score": s.org_chaos_score, "cumulative_reward": s.cumulative_reward, "active_attacks": len(s.active_attacks), "contained_attacks": len(s.contained_attacks), "org_changes_made": s.org_changes_made, "termination_reason": s.termination_reason, # 2.0 metrics "migration_progress": env.migration_engine.get_progress() if env.migration_engine else {}, "pipeline_integrity": env._last_pipeline_integrity, "war_room_debates": len(env.war_room.debate_history) if env.war_room else 0, "patronus_score": env.executive_context.get_patronus_score() if env.executive_context else 0.5, } @app.get("/openenv.yaml") async def get_openenv_yaml(): """Serve the environment manifest.""" try: with open("openenv.yaml", "r") as f: content = f.read() return PlainTextResponse(content, media_type="text/yaml") except FileNotFoundError: raise HTTPException(status_code=404, detail="openenv.yaml not found") if __name__ == "__main__": import uvicorn port = int(os.environ.get("PORT", "7860")) uvicorn.run(app, host="0.0.0.0", port=port)