Spaces:
Sleeping
Sleeping
| """ | |
| CivicAI FastAPI Server — OpenEnv Compliant API | |
| Endpoints: | |
| POST /reset — Reset environment with task_id | |
| POST /step — Execute action | |
| GET /state — Get current state | |
| GET /tasks — List available tasks | |
| GET /metrics — Get evaluation metrics | |
| GET /health — Health check | |
| GET / — Dashboard UI | |
| """ | |
| from __future__ import annotations | |
| import os | |
| from contextlib import asynccontextmanager | |
| from pathlib import Path | |
| from typing import Any | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from civicai.environment import CivicAIEnv | |
| from civicai.models import Action, Observation | |
| from civicai.tasks import TASKS | |
| from agents.orchestrator import Orchestrator | |
| # --------------------------------------------------------------------------- | |
| # App Setup | |
| # --------------------------------------------------------------------------- | |
# Global environment and orchestrator (initialized before lifespan).
# These are module-level singletons: they are created at import time and are
# shared by every request handler in this module.
env = CivicAIEnv()
orchestrator = Orchestrator(env)  # constructed around the shared `env` instance
# Log of step records for the current episode. Handlers that start a new
# episode rebind this name to a fresh list; `step` appends to it.
step_history: list[dict[str, Any]] = []
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Auto-reset the environment on server startup so Step/Auto work immediately.

    Used as the application's lifespan context manager: the code before
    ``yield`` runs once at startup, and the server handles requests while the
    context is suspended at ``yield``.

    Args:
        app: The FastAPI application being started (unused here).

    Yields:
        None. There is no shutdown work after the ``yield``.
    """
    try:
        orchestrator.reset("stabilize_economy")
    except Exception as exc:
        # Best-effort: a failed auto-reset must not prevent the server from
        # starting, so the error is reported and startup continues.
        print(f"[CivicAI] WARNING: Could not auto-initialize environment: {exc}")
    else:
        print("[CivicAI] Environment auto-initialized with task: stabilize_economy")
    yield  # Server runs here
# The ASGI application object. `lifespan` is run by the framework around the
# server's lifetime (startup work happens before its `yield`).
app = FastAPI(
    title="CivicAI Society Simulator",
    description="Multi-agent society decision-making environment (OpenEnv compliant)",
    version="1.0.0",
    lifespan=lifespan,
)
# Fully permissive CORS: any origin, method and header is accepted.
# NOTE(review): FastAPI's CORS documentation states that wildcard origins
# should not be combined with allow_credentials=True — confirm whether
# credentialed cross-origin requests are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # --------------------------------------------------------------------------- | |
| # Request / Response Models | |
| # --------------------------------------------------------------------------- | |
class ResetRequest(BaseModel):
    """Request body consumed by the ``reset`` handler."""

    # Task identifier, forwarded to orchestrator.reset().
    task_id: str = "stabilize_economy"
    # Forwarded to orchestrator.reset() unchanged; None is passed through as-is.
    max_steps: int | None = None
class StartSimulationRequest(BaseModel):
    """Request body consumed by the ``start_simulation`` handler."""

    # Task identifier, forwarded to orchestrator.run_episode().
    task_id: str = "stabilize_economy"
    # Forwarded to orchestrator.run_episode(); defaults to 50 when omitted.
    max_steps: int = 50
class StepRequest(BaseModel):
    """Request body consumed by the ``step`` handler."""

    action: Action | None = None  # None = use multi-agent system
    # When True (the default) the `step` handler lets the orchestrator choose
    # the action even if `action` is supplied; set it to False to have the
    # supplied `action` applied directly to the environment.
    use_agents: bool = True
class StepResponse(BaseModel):
    """Response returned by the ``step`` handler for a single environment step."""

    # The observation after the step, as produced by its model_dump().
    observation: dict[str, Any]
    # Reward value returned by the step call.
    reward: float
    # Done flag returned by the step call.
    done: bool
    # The step's info mapping after passing through _sanitize_info().
    info: dict[str, Any]
| # --------------------------------------------------------------------------- | |
| # OpenEnv Endpoints | |
| # --------------------------------------------------------------------------- | |
@app.post("/reset")
async def reset(req: ResetRequest) -> dict[str, Any]:
    """Reset the environment with a specific task.

    Clears the recorded step history, resets the orchestrator, and records
    the initial observation as turn 0.

    Args:
        req: Task id and optional step limit to reset with.

    Returns:
        A dict with the initial ``observation`` (dumped to plain data) and
        ``task``: the attribute dict of the matching entry in ``TASKS``, or an
        empty dict when the task id is not a key of ``TASKS``.

    Raises:
        HTTPException: 400 when the orchestrator rejects the request with a
            ``ValueError``.
    """
    global step_history
    step_history = []
    try:
        obs = orchestrator.reset(req.task_id, req.max_steps)
    except ValueError as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=400, detail=str(e)) from e

    observation = obs.model_dump()
    step_history.append({"turn": 0, "observation": observation})
    return {
        "observation": observation,
        "task": TASKS[req.task_id].__dict__ if req.task_id in TASKS else {},
    }
@app.post("/step")
async def step(req: StepRequest) -> StepResponse:
    """Execute one step in the environment.

    The orchestrator picks the action when ``req.use_agents`` is true or no
    action was supplied; otherwise ``req.action`` is applied directly to the
    environment. The outcome is appended to the step history.

    Args:
        req: Optional explicit action and the ``use_agents`` switch.

    Returns:
        The resulting observation, reward, done flag and sanitized info.

    Raises:
        HTTPException: 500 when stepping raises a ``RuntimeError``.
    """
    try:
        # Auto-initialize if env was never reset (e.g. first page load)
        if env._state is None:
            orchestrator.reset("stabilize_economy")
        if req.use_agents or req.action is None:
            obs, reward, done, info = orchestrator.run_step()
        else:
            obs, reward, done, info = env.step(req.action)
    except RuntimeError as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e

    observation = obs.model_dump()
    step_history.append({
        "turn": obs.turn,
        "observation": observation,
        "reward": reward,
        "done": done,
    })
    return StepResponse(
        observation=observation,
        reward=reward,
        done=done,
        # Sanitize info for JSON (remove non-serializable)
        info=_sanitize_info(info),
    )
@app.get("/state")
async def get_state() -> dict[str, Any]:
    """Get full internal state.

    Returns:
        The result of ``env.state()`` dumped to plain data via ``model_dump()``.
    """
    # NOTE(review): unlike `step`, this does not auto-reset an environment that
    # was never initialized — confirm how env.state() behaves in that case.
    return env.state().model_dump()
@app.get("/tasks")
async def list_tasks() -> dict[str, Any]:
    """List available tasks.

    Returns:
        A mapping from task id to a summary dict holding that task's
        ``name``, ``difficulty``, ``description`` and ``max_steps``.
    """
    return {
        tid: {
            "name": t.name,
            "difficulty": t.difficulty,
            "description": t.description,
            "max_steps": t.max_steps,
        }
        for tid, t in TASKS.items()
    }
@app.get("/metrics")
async def get_metrics() -> dict[str, Any]:
    """Get current metrics and history.

    Returns:
        A dict with:
        - ``step_history``: at most the 100 most recent step records,
        - ``total_steps``: the number of records currently held,
        - ``emergent_summary``: ``env.tracker.get_summary()`` when the
          environment has a tracker, otherwise an empty dict,
        - ``debate_history``: the 10 most recent orchestrator debate entries,
          each dumped to plain data.
    """
    return {
        "step_history": step_history[-100:],  # Last 100 steps
        "total_steps": len(step_history),
        "emergent_summary": env.tracker.get_summary() if env.tracker else {},
        "debate_history": [
            d.model_dump() for d in orchestrator.debate_history[-10:]
        ],
    }
async def start_simulation(req: StartSimulationRequest) -> dict[str, Any]:
    """Run a complete episode and return the full trajectory.

    Clears the recorded step history, runs one whole episode through the
    orchestrator, and makes each logged step's ``info`` JSON-friendly before
    returning the orchestrator's result.
    """
    # NOTE(review): no route registration for this handler is visible in this
    # view of the file — confirm how it is exposed.
    global step_history
    step_history = []

    episode = orchestrator.run_episode(req.task_id, req.max_steps)

    # Sanitize info dictionary for JSON serialization
    for entry in episode.get("step_log", []):
        if "info" not in entry:
            continue
        entry["info"] = _sanitize_info(entry["info"])
    return episode
@app.get("/health")
async def health_check() -> dict[str, str]:
    """Health check endpoint.

    Returns:
        A constant payload with ``status`` set to ``"healthy"`` and the
        service ``version``.
    """
    return {"status": "healthy", "version": "1.0.0"}
async def run_episode(task_id: str = "stabilize_economy", max_steps: int | None = None) -> dict[str, Any]:
    """Run a complete episode and return results.

    Clears the recorded step history, then hands the whole episode to the
    orchestrator and returns whatever it produces, unmodified.
    """
    # NOTE(review): unlike `start_simulation`, the result is returned without
    # passing each step's info through _sanitize_info — confirm that is intended.
    global step_history
    step_history = []
    return orchestrator.run_episode(task_id, max_steps)
| # --------------------------------------------------------------------------- | |
| # Dashboard Static Files | |
| # --------------------------------------------------------------------------- | |
# The dashboard assets live in a "dashboard" directory two levels above this file.
DASHBOARD_DIR = Path(__file__).parent.parent / "dashboard"

# Only wire up the UI when the directory is actually present. is_dir() (rather
# than exists()) avoids handing a stray regular file to StaticFiles.
if DASHBOARD_DIR.is_dir():

    @app.get("/")
    async def serve_dashboard():
        """Serve the dashboard's ``index.html`` at the site root."""
        return FileResponse(DASHBOARD_DIR / "index.html")

    # Remaining dashboard assets are served as static files under /dashboard.
    app.mount(
        "/dashboard",
        StaticFiles(directory=str(DASHBOARD_DIR)),
        name="dashboard",
    )
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _sanitize_info(info: dict[str, Any]) -> dict[str, Any]: | |
| """Make info dict JSON serializable.""" | |
| clean: dict[str, Any] = {} | |
| for k, v in info.items(): | |
| if isinstance(v, dict): | |
| clean[k] = _sanitize_info(v) | |
| elif isinstance(v, (str, int, float, bool, type(None))): | |
| clean[k] = v | |
| elif isinstance(v, list): | |
| clean[k] = v | |
| else: | |
| clean[k] = str(v) | |
| return clean | |
| # --------------------------------------------------------------------------- | |
| # Entry Point | |
| # --------------------------------------------------------------------------- | |
if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces. The port can be overridden with the PORT
    # environment variable; when it is unset the server uses 8000 as before.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8000")))