"""
FrontierLabs-Env: FastAPI Server (main.py)

OpenEnv-compliant endpoints + HTML Mission Control Dashboard.
"""

import asyncio
import json
import os
import subprocess
import sys
import threading
from typing import Any, Dict, Optional

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from environment import FrontierLabsEnv, TASKS
from graders import grade


# ---------------------------------------------------------------------------
# Pydantic models (typed per OpenEnv spec)
# ---------------------------------------------------------------------------

class ActionModel(BaseModel):
    """Request body for ``POST /step``: one action to apply to the environment."""

    action_type: str = Field(..., description="One of: write_file, run_script, submit")
    filename: Optional[str] = Field(None, description="Target filename")
    content: Optional[str] = Field(None, description="File content (for write_file)")


class ResetModel(BaseModel):
    """Request body for ``POST /reset``; ``task_id`` may be omitted."""

    task_id: Optional[str] = Field(
        None,
        description="Task to reset to. Defaults to task1_security_audit",
    )


class ObservationModel(BaseModel):
    """Schema of an observation as described by the OpenEnv spec."""

    task_id: str
    step: int
    done: bool
    message: str
    files: Dict[str, str]
    metrics: Dict[str, Any]
    partial_score: float


class RewardModel(BaseModel):
    """Scalar reward plus a human-readable explanation."""

    value: float
    explanation: str


class StepResponseModel(BaseModel):
    """Schema of the ``POST /step`` response payload."""

    observation: ObservationModel
    reward: RewardModel
    done: bool
    info: Dict[str, Any]


# ---------------------------------------------------------------------------
# Application setup
# ---------------------------------------------------------------------------

app = FastAPI(
    title="FrontierLabs-Env",
    description="OpenEnv-compliant AI Infrastructure simulation environment.",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single global environment instance. Request handlers serialize their access
# to it through an asyncio lock (this orders coroutines on the event loop; it
# is not a cross-thread lock).
_env = FrontierLabsEnv()
_env_lock = asyncio.Lock()

# Baseline bookkeeping. The results dict is written by a worker thread and
# read by request handlers, so every access goes through _baseline_lock.
# _baseline_thread remembers the in-flight worker so that polling /baseline
# does not spawn a second subprocess while the first is still running.
_baseline_results: Dict[str, Any] = {}
_baseline_lock = threading.Lock()
_baseline_thread: Optional[threading.Thread] = None


# ---------------------------------------------------------------------------
# Dashboard HTML
# ---------------------------------------------------------------------------

# NOTE: the backslashes in the curl example are written as "\\" so the string
# keeps a literal backslash without relying on an invalid escape sequence.
DASHBOARD_HTML = """ Platform Setup | Environment Settings

Platform Setup

A structured environment for configuring and validating engineering infrastructure tasks.

System Online
3 Scenarios Available
Standardized Validation
REST API
Containerized Environment
Security Protocol Audit
Tier 1
A dataset contains hidden vulnerabilities. Develop a script to scan and neutralize these threats, generating a clean output and a summary report of your findings.
Cluster Memory Reallocation
Tier 2
The cluster is frequently failing due to memory exhaustion during high load periods. Refactor the underlying logic to distribute the load efficiently across the available system resources.
Hardware Bottleneck Resolution
Tier 3
System latency is unusually high due to inefficient internal memory operations. Implement an optimized, lower-level set of instructions to fuse operations and significantly reduce the processing time per cycle.

API Reference

POST/resetInitialize session
POST/stepSubmit step action
GET/stateRetrieve internal state
GET/tasksList task parameters
GET/graderEvaluate score
GET/docsView specification

System Overview

Active Scenarios3
Evaluation Metric0.0 – 1.0
Cycle Limits (T1/T2/T3)20/25/30
Operation TypesRead, Write, Execute
Architecture CoreFastAPI

Usage Example

# Initialize a new session curl -X POST /reset \\ -H "Content-Type: application/json" \\ -d '{"task_id":"task1_security_audit"}' # Submit a script modification curl -X POST /step \\ -H "Content-Type: application/json" \\ -d '{"action_type":"write_file", "filename":"audit.py", "content":"..."}'
"""


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@app.get("/", response_class=HTMLResponse, tags=["Dashboard"])
async def dashboard():
    """Mission Control Dashboard — human-readable environment overview."""
    return HTMLResponse(content=DASHBOARD_HTML)


@app.post("/reset", tags=["OpenEnv"])
async def reset(body: ResetModel = ResetModel()):
    """Reset the environment and return the initial observation."""
    async with _env_lock:
        obs = _env.reset(task_id=body.task_id)
    return {"observation": obs}


@app.post("/step", tags=["OpenEnv"])
async def step(action: ActionModel):
    """Take one action in the environment. Returns observation, reward, done, info."""
    async with _env_lock:
        # Unset optional fields are dropped so the environment only sees the
        # keys the client actually supplied.
        obs, reward_value, done, info = _env.step(action.model_dump(exclude_none=True))
    return {
        "observation": obs,
        "reward": {
            "value": round(reward_value, 4),
            "explanation": info.get("explanation", ""),
        },
        "done": done,
        "info": info,
    }


@app.get("/state", tags=["OpenEnv"])
async def state():
    """Return full internal environment state (for debugging / judges)."""
    async with _env_lock:
        s = _env.state()
    return s


@app.get("/tasks", tags=["OpenEnv"])
async def tasks():
    """Return list of available tasks and the action schema."""
    return {
        "tasks": [
            {
                "id": "task1_security_audit",
                "name": "Security Audit & Self-Evaluation",
                "difficulty": "easy",
                "max_steps": 20,
                "success_threshold": 0.8,
                "description": "Detect and remove 50 backdoor prompts from dataset.jsonl. Write audit.py and evaluate.py, run them, then submit.",
            },
            {
                "id": "task2_fsdp_cluster",
                "name": "Distributed Cluster Crash (FSDP)",
                "difficulty": "medium",
                "max_steps": 25,
                "success_threshold": 0.8,
                "description": "Fix OOM crash in train.py by rewriting it as train_fsdp.py using PyTorch FSDP across 8 GPUs.",
            },
            {
                "id": "task3_triton_kernel",
                "name": "Triton Hardware Bottleneck",
                "difficulty": "hard",
                "max_steps": 30,
                "success_threshold": 0.8,
                "description": "Write a fused Triton kernel (fast_silu_kernel.py) replacing slow SiLU+multiply ops, targeting <20ms/step.",
            },
        ],
        "action_schema": {
            "action_type": {
                "type": "string",
                "enum": ["write_file", "run_script", "submit"],
                "required": True,
            },
            "filename": {
                "type": "string",
                "required": "for write_file and run_script",
            },
            "content": {
                "type": "string",
                "required": "for write_file",
            },
        },
    }


@app.get("/grader", tags=["OpenEnv"])
async def grader():
    """Grade the current episode and return score in [0.0, 1.0]."""
    async with _env_lock:
        s = _env.state()
        task_id = s["task_id"]
        get_file = _env.get_filesystem_file
        # Grading stays under the lock: it reads environment files through
        # get_file and must not interleave with a concurrent /step or /reset.
        result = grade(task_id, s, get_file)
    return result


def _parse_baseline_output(stdout: str, stderr: str) -> Dict[str, Any]:
    """Turn the baseline subprocess output into a results (or error) dict.

    The last line of ``stdout`` is expected to be a JSON object. When there is
    no output, the tail of ``stderr`` is reported instead.
    """
    output = stdout.strip()
    if not output:
        return {"error": stderr[-500:] if stderr else "No output"}
    try:
        data = json.loads(output.split("\n")[-1])
    except json.JSONDecodeError:
        return {"error": "Could not parse baseline output", "raw": output[-500:]}
    if not isinstance(data, dict):
        # Valid JSON but not an object; it cannot be merged into the results.
        return {"error": "Baseline output was not a JSON object", "raw": output[-500:]}
    return data


def _run_baseline() -> None:
    """Run ``inference.py`` to completion and record its outcome (worker thread)."""
    try:
        completed = subprocess.run(
            # Use the interpreter serving this app so the baseline runs in the
            # same environment; fall back to "python" if it is unknown.
            [sys.executable or "python", "inference.py", "--json-output"],
            capture_output=True,
            text=True,
            timeout=300,
        )
        outcome = _parse_baseline_output(completed.stdout, completed.stderr)
    except subprocess.TimeoutExpired:
        outcome = {"error": "Baseline timed out after 300s"}
    except Exception as e:  # thread boundary: report the failure, don't die silently
        outcome = {"error": str(e)}
    with _baseline_lock:
        _baseline_results.update(outcome)


@app.post("/baseline", tags=["OpenEnv"])
async def baseline_endpoint():
    """
    Trigger the baseline inference script (non-blocking).

    Returns cached results if already computed (an error outcome is cached
    too). While a run is in flight, further calls report ``"running"``
    instead of starting another subprocess.
    """
    global _baseline_thread

    # The lock is held only for bookkeeping, never while the subprocess runs.
    with _baseline_lock:
        if _baseline_results:
            return {"status": "cached", "results": dict(_baseline_results)}
        if _baseline_thread is not None and _baseline_thread.is_alive():
            return {
                "status": "running",
                "message": "Baseline agent is already running. Check /baseline again in ~60 seconds for results.",
            }
        # Run baseline in background thread
        _baseline_thread = threading.Thread(target=_run_baseline, daemon=True)
        _baseline_thread.start()

    return {
        "status": "running",
        "message": "Baseline agent started. Check /baseline again in ~60 seconds for results.",
    }


@app.get("/health", tags=["Meta"])
async def health():
    """Liveness probe returning static service metadata."""
    return {"status": "ok", "env": "FrontierLabs-Env", "version": "1.0.0"}


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import uvicorn

    port = int(os.environ.get("PORT", 7860))
    uvicorn.run("main:app", host="0.0.0.0", port=port, reload=False)