"""
FrontierLabs-Env: FastAPI Server (main.py)
OpenEnv-compliant endpoints + HTML Mission Control Dashboard.
"""
import asyncio
import json
import os
import subprocess
import sys
import threading
from typing import Any, Dict, Optional

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel, Field

from environment import FrontierLabsEnv, TASKS
from graders import grade
# ---------------------------------------------------------------------------
# Pydantic models (typed per OpenEnv spec)
# ---------------------------------------------------------------------------
class ActionModel(BaseModel):
    """Request body for POST /step: a single agent action.

    Exactly one of three action types is expected; which optional fields are
    required depends on the action (see /tasks `action_schema`).
    """
    action_type: str = Field(..., description="One of: write_file, run_script, submit")
    # Required for write_file and run_script; unused for submit.
    filename: Optional[str] = Field(None, description="Target filename")
    # Required for write_file only.
    content: Optional[str] = Field(None, description="File content (for write_file)")
class ResetModel(BaseModel):
    """Request body for POST /reset. An empty body selects the default task."""
    task_id: Optional[str] = Field(None, description="Task to reset to. Defaults to task1_security_audit")
class ObservationModel(BaseModel):
    """Typed shape of an environment observation (OpenEnv spec).

    NOTE(review): the /reset and /step handlers currently return the raw dict
    produced by the environment rather than validating through this model —
    it documents the intended schema; confirm it matches FrontierLabsEnv output.
    """
    task_id: str        # active task identifier
    step: int           # steps taken so far in the episode
    done: bool          # whether the episode has terminated
    message: str        # human-readable feedback for the last action
    files: Dict[str, str]       # filename -> file content in the sandbox
    metrics: Dict[str, Any]     # task-specific metrics
    partial_score: float        # running score estimate
class RewardModel(BaseModel):
    """Scalar reward for one step plus a textual justification."""
    value: float        # rounded to 4 decimals by the /step handler
    explanation: str
class StepResponseModel(BaseModel):
    """Full response envelope for POST /step (observation, reward, done, info)."""
    observation: ObservationModel
    reward: RewardModel
    done: bool
    info: Dict[str, Any]
# ---------------------------------------------------------------------------
# Application setup
# ---------------------------------------------------------------------------
app = FastAPI(
    title="FrontierLabs-Env",
    description="OpenEnv-compliant AI Infrastructure simulation environment.",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
# Wide-open CORS — acceptable for a demo/judging deployment; tighten origins
# before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Single global environment instance. NOTE(review): asyncio.Lock serializes
# concurrent request handlers within ONE event loop; it is NOT thread-safe.
# Safe only when run with a single worker process/loop (as in __main__ below).
_env = FrontierLabsEnv()
_env_lock = asyncio.Lock()
# Cache for baseline results; written by a background thread in /baseline.
_baseline_results: Dict[str, Any] = {}
# ---------------------------------------------------------------------------
# Dashboard HTML
# ---------------------------------------------------------------------------
# NOTE(review): despite the name (and being served via HTMLResponse at "/"),
# this constant contains no HTML markup — it renders as plain text. Presumably
# the markup was stripped at some point; confirm against the intended dashboard.
DASHBOARD_HTML = """
Platform Setup | Environment Settings
Platform Setup
A structured environment for configuring and validating engineering infrastructure tasks.
System Online
3 Scenarios Available
Standardized Validation
REST API
Containerized Environment
Security Protocol Audit
Tier 1
A dataset contains hidden vulnerabilities. Develop a script to scan and neutralize these threats, generating a clean output and a summary report of your findings.
Cluster Memory Reallocation
Tier 2
The cluster is frequently failing due to memory exhaustion during high load periods. Refactor the underlying logic to distribute the load efficiently across the available system resources.
Hardware Bottleneck Resolution
Tier 3
System latency is unusually high due to inefficient internal memory operations. Implement an optimized, lower-level set of instructions to fuse operations and significantly reduce the processing time per cycle.
API Reference
POST/resetInitialize session
POST/stepSubmit step action
GET/stateRetrieve internal state
GET/tasksList task parameters
GET/graderEvaluate score
GET/docsView specification
System Overview
Active Scenarios3
Evaluation Metric0.0 – 1.0
Cycle Limits (T1/T2/T3)20/25/30
Operation TypesRead, Write, Execute
Architecture CoreFastAPI
Usage Example
# Initialize a new session
curl -X POST /reset \
-H "Content-Type: application/json" \
-d '{"task_id":"task1_security_audit"}'
# Submit a script modification
curl -X POST /step \
-H "Content-Type: application/json" \
-d '{"action_type":"write_file",
"filename":"audit.py",
"content":"..."}'
"""
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/", response_class=HTMLResponse, tags=["Dashboard"])
async def dashboard():
    """Serve the Mission Control Dashboard — human-readable environment overview."""
    page = DASHBOARD_HTML
    return HTMLResponse(content=page)
@app.post("/reset", tags=["OpenEnv"])
async def reset(body: ResetModel = ResetModel()):
    """Reset the environment and hand back the episode's initial observation."""
    async with _env_lock:
        initial_obs = _env.reset(task_id=body.task_id)
    return {"observation": initial_obs}
@app.post("/step", tags=["OpenEnv"])
async def step(action: ActionModel):
    """Execute one action in the environment; report observation/reward/done/info."""
    payload = action.model_dump(exclude_none=True)
    async with _env_lock:
        obs, reward_value, done, info = _env.step(payload)
    explanation = info.get("explanation", "")
    return {
        "observation": obs,
        "reward": {"value": round(reward_value, 4), "explanation": explanation},
        "done": done,
        "info": info,
    }
@app.get("/state", tags=["OpenEnv"])
async def state():
    """Expose the full internal environment state (for debugging / judges)."""
    async with _env_lock:
        snapshot = _env.state()
    return snapshot
@app.get("/tasks", tags=["OpenEnv"])
async def tasks():
    """Describe the available tasks and the schema for /step actions."""

    def entry(task_id, name, difficulty, max_steps, description):
        # All tasks currently share the same success threshold.
        return {
            "id": task_id,
            "name": name,
            "difficulty": difficulty,
            "max_steps": max_steps,
            "success_threshold": 0.8,
            "description": description,
        }

    task_list = [
        entry(
            "task1_security_audit",
            "Security Audit & Self-Evaluation",
            "easy",
            20,
            "Detect and remove 50 backdoor prompts from dataset.jsonl. Write audit.py and evaluate.py, run them, then submit.",
        ),
        entry(
            "task2_fsdp_cluster",
            "Distributed Cluster Crash (FSDP)",
            "medium",
            25,
            "Fix OOM crash in train.py by rewriting it as train_fsdp.py using PyTorch FSDP across 8 GPUs.",
        ),
        entry(
            "task3_triton_kernel",
            "Triton Hardware Bottleneck",
            "hard",
            30,
            "Write a fused Triton kernel (fast_silu_kernel.py) replacing slow SiLU+multiply ops, targeting <20ms/step.",
        ),
    ]
    action_schema = {
        "action_type": {
            "type": "string",
            "enum": ["write_file", "run_script", "submit"],
            "required": True,
        },
        "filename": {
            "type": "string",
            "required": "for write_file and run_script",
        },
        "content": {
            "type": "string",
            "required": "for write_file",
        },
    }
    return {"tasks": task_list, "action_schema": action_schema}
@app.get("/grader", tags=["OpenEnv"])
async def grader():
    """Grade the current episode; the returned score lies in [0.0, 1.0]."""
    async with _env_lock:
        snapshot = _env.state()
        # grade() needs read access to the simulated filesystem.
        result = grade(snapshot["task_id"], snapshot, _env.get_filesystem_file)
    return result
@app.post("/baseline", tags=["OpenEnv"])
async def baseline_endpoint():
    """
    Trigger the baseline inference script in a background thread (non-blocking).

    Returns:
        {"status": "cached", "results": ...} once results (or an error) exist,
        {"status": "running", ...} while the baseline is still computing.
    """
    if _baseline_results:
        return {"status": "cached", "results": _baseline_results}

    # Fix: two quick consecutive POSTs used to both see an empty cache and
    # spawn two subprocesses. Track the in-flight worker on the function
    # object so a second request just reports "running".
    worker = getattr(baseline_endpoint, "_worker", None)
    if worker is not None and worker.is_alive():
        return {
            "status": "running",
            "message": "Baseline agent started. Check /baseline again in ~60 seconds for results.",
        }

    def run_baseline():
        """Run inference.py; stash its JSON (or an error) in _baseline_results."""
        try:
            # sys.executable guarantees the same interpreter as this server;
            # a bare "python" may be absent or resolve to a different env.
            result = subprocess.run(
                [sys.executable, "inference.py", "--json-output"],
                capture_output=True, text=True, timeout=300,
            )
            output = result.stdout.strip()
            if output:
                try:
                    # The script may log before the JSON payload; the payload
                    # is expected on the last stdout line.
                    data = json.loads(output.split("\n")[-1])
                    _baseline_results.update(data)
                except json.JSONDecodeError:
                    _baseline_results["error"] = "Could not parse baseline output"
                    _baseline_results["raw"] = output[-500:]
            else:
                _baseline_results["error"] = result.stderr[-500:] if result.stderr else "No output"
        except subprocess.TimeoutExpired:
            _baseline_results["error"] = "Baseline timed out after 300s"
        except Exception as e:
            # Last-resort: surface any unexpected failure to the caller
            # instead of dying silently in the daemon thread.
            _baseline_results["error"] = str(e)

    thread = threading.Thread(target=run_baseline, daemon=True)
    baseline_endpoint._worker = thread
    thread.start()
    return {
        "status": "running",
        "message": "Baseline agent started. Check /baseline again in ~60 seconds for results.",
    }
@app.get("/health", tags=["Meta"])
async def health():
    """Liveness probe: constant payload identifying this service and version."""
    info = {"status": "ok", "env": "FrontierLabs-Env", "version": "1.0.0"}
    return info
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import uvicorn

    # Honor the PORT env var (container/platform convention); default 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port, reload=False)