A modular, phase-gated engineering plan for building the first RL environment for LLM inference control. Every phase ends with a fully functional, testable system. No phase leaves you broken. Deadline: April 7, 2026 · 11 days · 3 people.
Stub everything so Persons B and C can code against `simulate(action)` right now. Lock `ServeAction`, `ServeObservation`, and `MetricsSnapshot` on Day 1, before writing a single line of logic — changing the schema mid-build is the #1 cause of integration hell.

Exit criterion: anyone can `curl http://localhost:7860/health` and get a 200 OK. All three people have cloned the repo, installed deps, and can run the stub server locally. The data schemas are written and committed to `models.py`. Nobody can start Day 2 until this is true.

Day 1 tasks:

- Create `simulator/trace_sim.py` with class stubs: `TraceSimulator.__init__` and `simulate(action, workload)` returning a hardcoded `MetricsSnapshot`.
- Create `simulator/workload.py` — a stub that returns a fixed `WorkloadState` dict every time.
- Create `server/app.py` with all 8 endpoint stubs that return hardcoded valid responses.
- Run `openenv init` and understand what `openenv validate` checks. Make sure the stub server passes basic validation.
- Set up branches (`feat/simulator`, `feat/api`, etc.) and `.gitignore`.
- Write `simulator/data/workload_configs.json` with the exact parameters for Tasks 1, 2, and 3 (arrival rate, SLO, prompt distribution params).

```python
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum

# ── Action space ─────────────────────────────────────────────────────────────
class QuantTier(Enum):
    FP16 = 0
    INT8 = 1
    INT4 = 2

@dataclass
class ServeAction:
    kv_budget: float        # 0.1–1.0 : fraction of KV cache allocated
    spec_length: int        # 0,1,2,4,8 : speculative draft tokens
    batch_size: int         # 1–512 : max concurrent requests
    prefill_disagg: bool    # True/False : disaggregate prefill GPU
    quant_tier: QuantTier   # FP16/INT8/INT4

    def validate(self) -> bool:
        assert 0.1 <= self.kv_budget <= 1.0
        assert self.spec_length in {0, 1, 2, 4, 8}
        assert 1 <= self.batch_size <= 512
        return True

# ── Simulator output ─────────────────────────────────────────────────────────
@dataclass
class MetricsSnapshot:
    ttft_p50_ms: float       # median time to first token
    ttft_p99_ms: float       # tail latency
    tpot_ms: float           # time per output token
    tokens_per_sec: float    # throughput
    gpu_memory_gb: float     # simulated memory pressure
    cost_per_1k: float       # compute cost (normalised units)
    spec_accept_rate: float  # 0.0 if spec_length == 0
    eviction_events: int     # KV cache evictions this step
    slo_violations: int      # requests that exceeded SLO this step

# ── Observation (what the agent sees) ────────────────────────────────────────
@dataclass
class ServeObservation:
    queue_depth: float
    mean_prompt_len: float
    arrival_rate: float
    kv_cache_occupancy: float
    ttft_p50: float
    tpot_p50: float
    slo_violation_rate: float
    gpu_memory_used_gb: float
    spec_accept_rate: float
    priority_distribution: List[float]  # [interactive, batch, best_effort]
    timestep: int
    cost_so_far: float

# ── Workload state ───────────────────────────────────────────────────────────
@dataclass
class WorkloadState:
    arrival_rate: float
    mean_prompt_len: float
    prompt_len_bucket: int  # 0–7, discrete bucket for lookup table
    queue_depth: int
    priority_distribution: List[float]
    is_burst: bool
    phase: str              # "warmup" | "steady" | "burst" | "cooldown"
```
```bash
# From repo root:
docker build -t inferencegym . && docker run -p 7860:7860 inferencegym &
curl http://localhost:7860/health   # → {"status": "ok"}
curl http://localhost:7860/tasks    # → {"tasks": [{...}, {...}, {...}]}
python -c "from inferencegym.models import ServeAction, ServeObservation; print('schemas OK')"
```
Once `TraceSimulator.simulate(action, workload) → MetricsSnapshot` works, Person B can wire it into the API and Person C can build the grader; both can proceed in parallel. Person A must finish this by the end of Day 3, even if that means simplifying the interpolation.
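Until then, a hardcoded stub that honours the signature keeps the other two tracks unblocked — a minimal sketch, with arbitrary placeholder values:

```python
from inferencegym.models import ServeAction, WorkloadState, MetricsSnapshot

class TraceSimulator:
    """Day 1–3 stub: correct interface, hardcoded output."""

    def __init__(self, trace_path: str, seed: int = 42):
        pass  # real table loading lands on Day 3–4

    def simulate(self, action: ServeAction, workload: WorkloadState) -> MetricsSnapshot:
        # Placeholder numbers; only the shape of the return value matters here.
        return MetricsSnapshot(
            ttft_p50_ms=200.0, ttft_p99_ms=350.0, tpot_ms=20.0,
            tokens_per_sec=2000.0, gpu_memory_gb=24.0, cost_per_1k=0.001,
            spec_accept_rate=0.0, eviction_events=0, slo_violations=0,
        )
```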
Exit criteria: `python tests/test_simulator.py` passes all tests. The simulator returns realistic-shaped numbers for a variety of (action, workload) inputs. The workload generator produces a different workload state on every call. These are the two things that must be true before Phase 2 begins.

Simulator design:

- The lookup table is keyed by `(batch_bucket, kv_bucket, spec_bucket, prompt_bucket)`; each value is a `MetricsSnapshot`. The lookup table must be loaded once at startup and cached in memory.
- Use `scipy.interpolate.RegularGridInterpolator` for continuous actions (`kv_budget`, `batch_size`) between discrete lookup points. For discrete actions (`spec_length`, `quant_tier`), use nearest-neighbour lookup.
- Add ±5% Gaussian noise to `ttft_p50_ms` and `tpot_ms` to simulate hardware jitter. Use `np.random.default_rng(seed)` so episodes are reproducible.
- If `gpu_memory_gb > 40.0`, set a hard OOM flag, cap memory at 40 GB, and multiply `slo_violations` by 5 as a penalty signal.

Workload generator:

- Arrivals: `np.random.poisson(lam=arrival_rate)` per step. The arrival rate varies by task config loaded from `workload_configs.json`.
- Prompt lengths — Task 1: `np.random.uniform(64, 128)`. Task 2: `np.random.lognormal(5.2, 1.3)` clamped to [32, 8192]. Task 3: bimodal — 70% uniform(32, 128), 30% uniform(4096, 8192).
- Bucket prompt lengths with `np.digitize` against [64, 128, 256, 512, 1024, 2048, 4096]. This is the lookup table key.
- Maintain a `queue_depth` counter. Each step: add new arrivals, subtract `min(batch_size, queue_depth)` served requests. The queue cannot go negative.
- Burst windows set `is_burst=True` in `WorkloadState` during those steps.
- The priority mix is exposed via `WorkloadState.priority_distribution`.
- Speculative acceptance: `accept_rate = base_rate * (1 - complexity_penalty) * depth_decay`, where `depth_decay = 1.0 / (1 + 0.15 * spec_length)`. Base rate by task: Task 1 = 0.80, Task 2 = 0.65, Task 3 = 0.45.

Unit tests:

- Call `simulate(action, workload)` with 20 random valid actions — all return a non-null `MetricsSnapshot` with values in expected ranges.
- Increasing `batch_size` while holding other actions constant should strictly increase `tokens_per_sec` (up to a threshold). This validates that the lookup table is correctly loaded.
- Set `batch_size=512, kv_budget=1.0` — confirm `gpu_memory_gb` triggers the overflow flag.

```python
import numpy as np
import pandas as pd
from scipy.interpolate import RegularGridInterpolator
from pathlib import Path

from inferencegym.models import ServeAction, WorkloadState, MetricsSnapshot, QuantTier

class TraceSimulator:
    """
    CPU-only trace-driven simulator. Loads a pre-built lookup table and
    interpolates (action, workload) → MetricsSnapshot.
    """
    BATCH_POINTS  = [1, 4, 8, 16, 32, 64, 128, 256, 512]
    KV_POINTS     = [0.1, 0.25, 0.5, 0.75, 1.0]
    SPEC_POINTS   = [0, 1, 2, 4, 8]
    PLEN_BUCKETS  = [64, 128, 256, 512, 1024, 2048, 4096, 8192]
    OOM_THRESHOLD = 40.0   # GB
    NOISE_STD     = 0.05   # ±5% Gaussian jitter on latency metrics

    def __init__(self, trace_path: str, seed: int = 42):
        self.rng = np.random.default_rng(seed)
        self._load_tables(Path(trace_path))
        self._build_interpolators()

    def _load_tables(self, path: Path) -> None:
        df = pd.read_parquet(path)
        # Expected columns: batch_size, kv_budget, spec_length, quant_tier,
        # prompt_len_bucket, ttft_p50, ttft_p99, tpot, tps, gpu_mem_gb, cost_per_1k
        self._df = df

    def _reshape_for_interp(self, df: pd.DataFrame, col: str) -> np.ndarray:
        # Pivot the flat table into a dense 4-D grid ordered as
        # (batch_size, kv_budget, spec_length, prompt_len_bucket).
        ordered = df.sort_values(['batch_size', 'kv_budget', 'spec_length', 'prompt_len_bucket'])
        shape = (len(self.BATCH_POINTS), len(self.KV_POINTS),
                 len(self.SPEC_POINTS), len(self.PLEN_BUCKETS))
        return ordered[col].to_numpy().reshape(shape)

    def _build_interpolators(self) -> None:
        # Build a 4-D interpolator over (batch_size, kv_budget, spec_len, prompt_bucket)
        # for the FP16 baseline. INT8/INT4 are handled via multiplicative correction factors.
        fp16_df = self._df[self._df['quant_tier'] == 0]
        grid_vals = {
            'ttft_p50': self._reshape_for_interp(fp16_df, 'ttft_p50'),
            'ttft_p99': self._reshape_for_interp(fp16_df, 'ttft_p99'),
            'tpot':     self._reshape_for_interp(fp16_df, 'tpot'),
            'tps':      self._reshape_for_interp(fp16_df, 'tps'),
            'gpu_mem':  self._reshape_for_interp(fp16_df, 'gpu_mem_gb'),
        }
        points = (self.BATCH_POINTS, self.KV_POINTS, self.SPEC_POINTS, self.PLEN_BUCKETS)
        # fill_value=None extrapolates instead of returning NaN at the grid edges
        self._interps = {k: RegularGridInterpolator(points, v, method='linear',
                                                    bounds_error=False, fill_value=None)
                         for k, v in grid_vals.items()}

    def simulate(self, action: ServeAction, workload: WorkloadState) -> MetricsSnapshot:
        action.validate()
        query = [[action.batch_size, action.kv_budget,
                  action.spec_length, workload.mean_prompt_len]]
        # Interpolate base metrics
        base = {k: float(fn(query)[0]) for k, fn in self._interps.items()}
        # Apply quant tier correction factors (from benchmark data)
        quant_factors = {QuantTier.FP16: 1.0, QuantTier.INT8: 0.82, QuantTier.INT4: 0.68}
        q_factor = quant_factors[action.quant_tier]
        base['ttft_p50'] *= q_factor
        base['tps']      /= q_factor   # quantised models serve faster
        base['gpu_mem']  *= q_factor   # quantised models use less memory
        # Apply speculative decoding acceptance bonus
        if action.spec_length > 0:
            depth_decay = 1.0 / (1 + 0.15 * action.spec_length)
            accept_rate = 0.75 * (1 - 0.1 * workload.prompt_len_bucket) * depth_decay
            accept_rate = max(0.0, min(1.0, accept_rate))
            speedup = 1.0 + accept_rate * action.spec_length * 0.1
            base['ttft_p50'] /= speedup
        else:
            accept_rate = 0.0
        # Inject Gaussian noise
        noise = self.rng.normal(1.0, self.NOISE_STD, size=3)
        base['ttft_p50'] *= noise[0]
        base['ttft_p99'] *= noise[1]
        base['tpot']     *= noise[2]
        # OOM detection
        oom = base['gpu_mem'] > self.OOM_THRESHOLD
        slo_violations = 0   # computed by env, not simulator
        if oom:
            base['gpu_mem'] = self.OOM_THRESHOLD
            slo_violations = action.batch_size   # all requests fail on OOM
        return MetricsSnapshot(
            ttft_p50_ms      = max(1.0, base['ttft_p50']),
            ttft_p99_ms      = max(1.0, base['ttft_p99']),
            tpot_ms          = max(1.0, base['tpot']),
            tokens_per_sec   = max(0.0, base['tps']),
            gpu_memory_gb    = base['gpu_mem'],
            cost_per_1k      = base['tps'] * q_factor * 0.001,
            spec_accept_rate = accept_rate,
            eviction_events  = int(max(0, (1.0 - action.kv_budget) * workload.queue_depth)),
            slo_violations   = slo_violations,
        )
```
Trace data options, in order of preference:

- Option A: Pull published benchmark data from github.com/vllm-project/vllm/tree/main/benchmarks and the HuggingFace llm-perf-leaderboard. These have real measured latencies across batch sizes. Fit a pandas pivot table to get the lookup grid.
- Option B: Run `llmperf` on a Colab free T4 with Llama-3.2-1B-Instruct (free tier works). Grid search over batch_size=[1,4,8,16,32] × prompt_len=[64,128,256,512] — that's 20 measurements, about 2 hours of Colab time.
- Option C: A fully synthetic analytic model — `ttft = base_ms + batch_factor * batch_size + memory_factor * prompt_len`. These constants are documented in vLLM's OSDI paper. Fully deterministic, always works.

Exit criteria: `obs = env.reset(task_id=1); [env.step(random_action()) for _ in range(200)]` runs cleanly. Rewards are floats in [-1, 1]. The episode terminates at step 200. Session IDs are unique per reset call.

```python
import uuid
import numpy as np
from dataclasses import dataclass
from typing import Optional

from inferencegym.models import (ServeAction, ServeObservation, WorkloadState,
                                 MetricsSnapshot, QuantTier)
from simulator.trace_sim import TraceSimulator
from simulator.workload import WorkloadGenerator

@dataclass
class EnvConfig:
    task_id: int
    episode_len: int = 200
    slo_target_ms: float = 300.0
    max_memory_gb: float = 40.0
    # Reward weights
    alpha: float = 0.40   # throughput
    beta: float = 0.25    # latency
    gamma: float = 0.25   # SLO violations
    delta: float = 0.10   # cost

# Task configs — loaded from workload_configs.json
TASK_CONFIGS = {
    1: EnvConfig(task_id=1, slo_target_ms=500.0),
    2: EnvConfig(task_id=2, slo_target_ms=300.0, gamma=0.30),
    3: EnvConfig(task_id=3, slo_target_ms=200.0, gamma=0.35, delta=0.15),
}

# Max achievable throughput per task (set after running the optimal solver)
MAX_THROUGHPUT = {1: 8500.0, 2: 6200.0, 3: 4800.0}

class InferenceEnv:
    def __init__(self, simulator: TraceSimulator, task_id: int, seed: int = 42):
        self.sim = simulator
        self.config = TASK_CONFIGS[task_id]
        self.gen = WorkloadGenerator(task_id=task_id, seed=seed)
        self.session_id = str(uuid.uuid4())
        self._step = 0
        self._cost_so_far = 0.0
        self._workload = self.gen.reset()
        self._last_metrics: Optional[MetricsSnapshot] = None
        self._episode_log: list = []

    def reset(self) -> ServeObservation:
        self.session_id = str(uuid.uuid4())
        self._step = 0
        self._cost_so_far = 0.0
        self._workload = self.gen.reset()
        self._episode_log = []
        return self._build_obs(MetricsSnapshot(
            ttft_p50_ms=200.0, ttft_p99_ms=350.0, tpot_ms=20.0,
            tokens_per_sec=2000.0, gpu_memory_gb=24.0, cost_per_1k=0.001,
            spec_accept_rate=0.0, eviction_events=0, slo_violations=0))

    def step(self, action: ServeAction):
        if self._step >= self.config.episode_len:
            raise RuntimeError("Episode already done. Call reset() first.")
        # Task 1 & 2: lock certain actions
        action = self._enforce_action_mask(action)
        # Advance workload one step
        self._workload = self.gen.step(action)
        # Simulate this step
        metrics = self.sim.simulate(action, self._workload)
        self._last_metrics = metrics
        # Compute SLO violations from simulator metrics + SLO target
        metrics.slo_violations += int(
            metrics.ttft_p50_ms > self.config.slo_target_ms) * self._workload.queue_depth
        # Compute reward
        reward = self._compute_reward(metrics)
        # Update episode state
        self._cost_so_far += metrics.cost_per_1k
        self._step += 1
        done = self._step >= self.config.episode_len
        obs = self._build_obs(metrics)
        info = {"timestep": self._step, "metrics": metrics.__dict__,
                "workload": self._workload.__dict__}
        self._episode_log.append({"action": action.__dict__, "reward": reward,
                                  "metrics": metrics.__dict__})
        return obs, reward, done, info

    def _compute_reward(self, m: MetricsSnapshot) -> float:
        c = self.config
        T = m.tokens_per_sec / MAX_THROUGHPUT[c.task_id]
        L = m.ttft_p50_ms / c.slo_target_ms
        V = m.slo_violations / max(self._workload.queue_depth, 1)
        C = m.cost_per_1k / 0.005   # normalise against the budget ceiling
        reward = c.alpha * T - c.beta * L - c.gamma * V - c.delta * C
        return float(np.clip(reward, -1.0, 1.0))

    def _enforce_action_mask(self, action: ServeAction) -> ServeAction:
        if self.config.task_id == 1:
            action.spec_length = 0
            action.prefill_disagg = False
            action.quant_tier = QuantTier.FP16
        elif self.config.task_id == 2:
            action.prefill_disagg = False
            action.quant_tier = QuantTier.FP16
        return action

    def _build_obs(self, m: MetricsSnapshot) -> ServeObservation:
        w = self._workload
        return ServeObservation(
            queue_depth=float(w.queue_depth),
            mean_prompt_len=w.mean_prompt_len,
            arrival_rate=w.arrival_rate,
            kv_cache_occupancy=(1.0 - (m.eviction_events / max(w.queue_depth, 1))),
            ttft_p50=m.ttft_p50_ms,
            tpot_p50=m.tpot_ms,
            slo_violation_rate=m.slo_violations / max(w.queue_depth, 1),
            gpu_memory_used_gb=m.gpu_memory_gb,
            spec_accept_rate=m.spec_accept_rate,
            priority_distribution=w.priority_distribution,
            timestep=self._step,
            cost_so_far=self._cost_so_far,
        )
```
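As a sanity check on the weights, here is `_compute_reward` evaluated by hand for one plausible Task 1 step (the metric values are assumed for illustration, not measured):

```python
# Task 1: alpha=0.40, beta=0.25, gamma=0.25, delta=0.10; SLO target 500 ms.
# Assume: tokens_per_sec=2800, ttft_p50=200 ms, no SLO violations, cost_per_1k=0.001.
T = 2800.0 / 8500.0    # ≈ 0.329  (normalised by MAX_THROUGHPUT[1])
L = 200.0 / 500.0      # = 0.400  (median TTFT vs SLO target)
V = 0.0                # no violations this step
C = 0.001 / 0.005      # = 0.200  (cost vs budget ceiling)
reward = 0.40 * T - 0.25 * L - 0.25 * V - 0.10 * C   # ≈ +0.012
```

A near-zero reward for baseline-ish behaviour is intentional: the agent only earns a strongly positive reward by raising throughput without breaching the SLO.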
Exit criteria: `openenv validate --url http://localhost:7860` passes. Every endpoint returns the correct shape. The Docker image is under 2GB. A full reset → step×200 → grader cycle completes in under 60 seconds.

| Endpoint | Method | Owner | Wired to | Key behaviour |
|---|---|---|---|---|
| `/health` | GET | Person B | Session cache count | Returns `{"status":"ok","active_sessions":N,"uptime_seconds":T}` |
| `/tasks` | GET | Person B | Static task config dict | Returns a list of 3 tasks with id, name, difficulty, description, active_actions |
| `/reset` | POST | Person B | `InferenceEnv.reset()` | Creates a new session_id, instantiates an InferenceEnv for that task, stores it in the LRU cache. Returns session_id + observation. |
| `/step` | POST | Person B | `InferenceEnv.step()` | Looks up the session by session_id, validates the ServeAction, calls step(), returns obs + reward + done + info. 404 if the session is not found. |
| `/state` | GET | Person B | `InferenceEnv.state()` | Returns current episode metadata: step_count, cumulative_reward, done, workload_phase. |
| `/grader` | POST | Person C | `GraderModule.score()` | Accepts episode_log JSON, returns a score 0–1 with a breakdown. Stateless — the same input always yields the same output. |
| `/baseline` | GET | Person C | `BaselineAgent.run()` | Runs the fixed-config baseline agent on all 3 tasks, returns scores. A fixed seed guarantees reproducibility. |
| `/info` | GET | Person B | Static schema | Returns the full JSON schema for the action space, observation space, and reward weights. Used by agent frameworks. |
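For reference, a minimal client loop against these endpoints — the payload shapes follow the FastAPI app code below, and the localhost URL assumes the Docker run from the Day 1 smoke test:

```python
# Minimal episode loop over the HTTP API.
import httpx

BASE = "http://localhost:7860"
r = httpx.post(f"{BASE}/reset", json={"task_id": 1, "seed": 42}).json()
sid = r["session_id"]
done = False
while not done:
    out = httpx.post(f"{BASE}/step", json={
        "session_id": sid,
        "action": {"kv_budget": 1.0, "batch_size": 32, "spec_length": 0,
                   "prefill_disagg": False, "quant_tier": 0},
    }).json()
    done = out["done"]
print("final step reward:", out["reward"])
```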
```python
import threading
from collections import OrderedDict
from typing import Optional

from env.inference_env import InferenceEnv

class SessionManager:
    """Thread-safe LRU cache of active InferenceEnv instances."""
    MAX_SESSIONS = 50

    def __init__(self, simulator):
        self._sim = simulator
        self._lock = threading.Lock()
        self._sessions: OrderedDict[str, InferenceEnv] = OrderedDict()

    def create(self, task_id: int, seed: int) -> InferenceEnv:
        with self._lock:
            if len(self._sessions) >= self.MAX_SESSIONS:
                self._sessions.popitem(last=False)  # evict oldest
            env = InferenceEnv(self._sim, task_id, seed)
            self._sessions[env.session_id] = env
            return env

    def get(self, session_id: str) -> Optional[InferenceEnv]:
        with self._lock:
            env = self._sessions.get(session_id)
            if env:
                # move to end (mark as recently used)
                self._sessions.move_to_end(session_id)
            return env

    def remove(self, session_id: str) -> None:
        with self._lock:
            self._sessions.pop(session_id, None)

    def count(self) -> int:
        return len(self._sessions)
```
```python
import time
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from simulator.trace_sim import TraceSimulator
from simulator.session_manager import SessionManager
from inferencegym.models import ServeAction, QuantTier

app = FastAPI(title="InferenceGym", version="1.0.0")
app.add_middleware(CORSMiddleware, allow_origins=["*"],
                   allow_methods=["*"], allow_headers=["*"])

# ── App startup: load simulator once, create session manager ─────────────────
_sim = None
_sessions = None
_start_time = time.time()

@app.on_event("startup")
async def startup():
    global _sim, _sessions
    _sim = TraceSimulator("simulator/data/traces_llama3_8b.parquet")
    _sessions = SessionManager(_sim)

# ── Pydantic request/response models ─────────────────────────────────────────
class ResetRequest(BaseModel):
    task_id: int
    seed: int = 42
    config: Optional[dict] = None   # override alpha/beta/gamma/delta

class StepRequest(BaseModel):
    session_id: str
    action: dict

class GraderRequest(BaseModel):
    task_id: int
    episode_log: list

# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/health")
def health():
    return {"status": "ok", "active_sessions": _sessions.count(),
            "uptime_seconds": int(time.time() - _start_time)}

@app.get("/tasks")
def get_tasks():
    return {"tasks": [
        {"id": 1, "name": "Static Uniform", "difficulty": "easy",
         "active_actions": ["kv_budget", "batch_size"]},
        {"id": 2, "name": "Bursty ShareGPT", "difficulty": "medium",
         "active_actions": ["kv_budget", "batch_size", "spec_length"]},
        {"id": 3, "name": "Adversarial Multi-Tenant", "difficulty": "hard",
         "active_actions": ["kv_budget", "batch_size", "spec_length",
                            "prefill_disagg", "quant_tier"]},
    ]}

@app.post("/reset")
def reset(req: ResetRequest):
    if req.task_id not in {1, 2, 3}:
        raise HTTPException(422, f"task_id must be 1, 2, or 3. Got {req.task_id}")
    env = _sessions.create(req.task_id, req.seed)
    obs = env.reset()
    return {"session_id": env.session_id, "observation": obs.__dict__,
            "episode_length": 200}

@app.post("/step")
def step(req: StepRequest):
    env = _sessions.get(req.session_id)
    if not env:
        raise HTTPException(404, f"Session '{req.session_id}' not found. Call /reset first.")
    action = ServeAction(
        kv_budget      = req.action.get("kv_budget", 1.0),
        spec_length    = req.action.get("spec_length", 0),
        batch_size     = req.action.get("batch_size", 32),
        prefill_disagg = req.action.get("prefill_disagg", False),
        quant_tier     = QuantTier(req.action.get("quant_tier", 0)),
    )
    obs, reward, done, info = env.step(action)
    if done:
        _sessions.remove(req.session_id)
    return {"observation": obs.__dict__, "reward": reward, "done": done, "info": info}
```
```dockerfile
# Stage 1: Install dependencies only
FROM python:3.11-slim AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Stage 2: Minimal runtime (no build tools)
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .
ENV PATH=/root/.local/bin:$PATH
ENV PYTHONPATH=/app
# HuggingFace Spaces convention: port 7860
EXPOSE 7860
CMD ["uvicorn", "server.app:app", "--host", "0.0.0.0", "--port", "7860", "--workers", "2"]

## requirements.txt (CPU-only — NO torch, NO CUDA)
# fastapi==0.115.0
# uvicorn[standard]==0.30.0
# pydantic==2.7.0
# numpy==1.26.4
# scipy==1.13.0
# pandas==2.2.0
# pyarrow==15.0.0     (for parquet reading)
# httpx==0.27.0       (for integration tests)

## requirements-demo.txt (Colab only — NOT installed in the image; see the risk table)
# stable-baselines3==2.3.0   (PPO demo only; pulls torch)
# gymnasium==0.29.1
```
Task 1 grader — throughput normalisation:

```python
# All values are means over the 200-step episode log
score = (agent_tps - baseline_tps) / (optimal_tps - baseline_tps)
score = max(0.0, min(1.0, score))
# baseline_tps ≈ 2800 tokens/s (batch=32, kv=1.0)
# optimal_tps  ≈ 8200 tokens/s (batch=128, kv=0.5)
```
Task 2 grader — TTFT + memory compliance:

```python
ttft_score = max(0.0, 1.0 - mean_ttft_p50 / 300.0)
peak_mem = max(x['metrics']['gpu_memory_gb'] for x in episode_log)  # scalar, not the step dict
mem_score = 1.0 if peak_mem < 36.0 else max(0.0, 1.0 - (peak_mem - 36) / 10)
score = 0.5 * ttft_score + 0.5 * mem_score
```
Task 3 grader — four objectives, including action stability:

```python
T = mean_tps / optimal_tps               # throughput
S = 1.0 - mean_slo_violation_rate        # SLO compliance
C = max(0.0, 1.0 - total_cost / 5.0)     # cost (budget = 5.0)
A = 1.0 - action_variance_score          # stability
score = 0.40*T + 0.30*S + 0.20*C + 0.10*A
```
```python
# Action stability term: variance of step-to-step action changes.
actions = [step['action'] for step in episode_log]
batch_diffs = np.diff([a['batch_size'] for a in actions])
kv_diffs = np.diff([a['kv_budget'] for a in actions])
variance = np.std(batch_diffs)/512 + np.std(kv_diffs)/1.0
action_variance_score = min(1.0, variance / 0.5)   # 0 = stable, 1 = chaotic
```

The grader module that implements all three:

```python
import numpy as np
from typing import List, Dict, Any

class GraderModule:
    """Deterministic grader. Same episode_log → same score, always."""

    BASELINE_TPS = {1: 2800.0, 2: 2100.0, 3: 1600.0}
    OPTIMAL_TPS  = {1: 8200.0, 2: 5800.0, 3: 4200.0}

    def score(self, task_id: int, episode_log: List[Dict[str, Any]]) -> Dict:
        if not episode_log:
            return {"score": 0.0, "breakdown": {}, "feedback": "Empty episode log."}
        graders = {1: self._task1, 2: self._task2, 3: self._task3}
        if task_id not in graders:
            raise ValueError(f"Unknown task_id: {task_id}")
        return graders[task_id](episode_log)

    def _task1(self, log) -> Dict:
        mean_tps = np.mean([s['metrics']['tokens_per_sec'] for s in log])
        score = (mean_tps - self.BASELINE_TPS[1]) / (self.OPTIMAL_TPS[1] - self.BASELINE_TPS[1])
        score = float(np.clip(score, 0.0, 1.0))
        feedback = self._throughput_feedback(mean_tps, 1)
        return {"score": score, "breakdown": {"throughput": score}, "feedback": feedback}

    def _task2(self, log) -> Dict:
        mean_ttft = np.mean([s['metrics']['ttft_p50_ms'] for s in log])
        peak_mem = max(s['metrics']['gpu_memory_gb'] for s in log)
        ttft_score = float(np.clip(1.0 - mean_ttft / 300.0, 0.0, 1.0))
        mem_score = 1.0 if peak_mem < 36.0 else float(np.clip(1.0 - (peak_mem - 36) / 10, 0.0, 1.0))
        score = 0.5 * ttft_score + 0.5 * mem_score
        feedback = (f"TTFT score: {ttft_score:.2f} (mean TTFT {mean_ttft:.0f}ms vs 300ms SLO). "
                    f"Memory score: {mem_score:.2f} (peak {peak_mem:.1f}GB vs 36GB limit).")
        return {"score": score, "breakdown": {"ttft": ttft_score, "memory": mem_score},
                "feedback": feedback}

    def _task3(self, log) -> Dict:
        mean_tps   = np.mean([s['metrics']['tokens_per_sec'] for s in log])
        mean_slo   = np.mean([s['metrics']['slo_violations'] for s in log])
        total_cost = sum(s['metrics']['cost_per_1k'] for s in log)
        actions = [s['action'] for s in log]
        T = float(np.clip(mean_tps / self.OPTIMAL_TPS[3], 0.0, 1.0))
        S = float(np.clip(1.0 - mean_slo / 100.0, 0.0, 1.0))
        C = float(np.clip(1.0 - total_cost / 5.0, 0.0, 1.0))
        A = 1.0 - self._action_variance(actions)
        # Final score is clipped too (see the risk table)
        score = float(np.clip(0.40*T + 0.30*S + 0.20*C + 0.10*A, 0.0, 1.0))
        feedback = self._task3_feedback(T, S, C, A, log)
        return {"score": score,
                "breakdown": {"throughput": T, "slo": S, "cost": C, "stability": A},
                "feedback": feedback}

    def _action_variance(self, actions) -> float:
        batch_vals = [a.get('batch_size', 32) for a in actions]
        kv_vals = [a.get('kv_budget', 1.0) for a in actions]
        variance = np.std(np.diff(batch_vals))/512 + np.std(np.diff(kv_vals))/1.0
        return float(np.clip(variance / 0.5, 0.0, 1.0))

    def _throughput_feedback(self, mean_tps, task_id) -> str:
        pct = (mean_tps - self.BASELINE_TPS[task_id]) / \
              (self.OPTIMAL_TPS[task_id] - self.BASELINE_TPS[task_id]) * 100
        return f"Agent achieved {mean_tps:.0f} TPS ({pct:.0f}% of the way from baseline to optimal)."

    def _task3_feedback(self, T, S, C, A, log) -> str:
        return (f"Throughput {T:.2f}, SLO compliance {S:.2f}, "
                f"cost {C:.2f}, stability {A:.2f} over {len(log)} steps.")
```
```python
from inferencegym.models import ServeAction, QuantTier
from env.inference_env import InferenceEnv
from simulator.trace_sim import TraceSimulator
from grader.grader import GraderModule

# The fixed action that the baseline ALWAYS takes, regardless of observation
BASELINE_ACTION = ServeAction(
    kv_budget      = 1.0,             # no eviction
    spec_length    = 0,               # speculative decoding off
    batch_size     = 32,              # vLLM default
    prefill_disagg = False,           # colocated
    quant_tier     = QuantTier.FP16,  # full precision
)

def run_baseline(task_id: int, seed: int = 0) -> dict:
    """Runs the fixed baseline agent on one task, returns the grader score."""
    sim = TraceSimulator("simulator/data/traces_llama3_8b.parquet", seed=seed)
    env = InferenceEnv(sim, task_id=task_id, seed=seed)
    grader = GraderModule()
    env.reset()
    done = False
    while not done:
        _, _, done, _ = env.step(BASELINE_ACTION)
    result = grader.score(task_id, env._episode_log)
    return {"task_id": task_id, "score": result["score"],
            "breakdown": result["breakdown"],
            "action_config": BASELINE_ACTION.__dict__}

def run_all_baselines() -> dict:
    # Seed=0 guarantees identical results every run
    return {"scores": {f"task{i}": run_baseline(i, seed=0)["score"] for i in [1, 2, 3]},
            "expected_range": {"task1": [0.30, 0.40], "task2": [0.22, 0.32],
                               "task3": [0.18, 0.28]}}
```
README frontmatter: `title: InferenceGym`, `emoji: 🏋️`, `colorFrom: green`, `colorTo: blue`, `sdk: docker`, `pinned: false`. This controls the HF Space landing page.

The Gym wrapper subclasses `gymnasium.Env`. `reset()` calls POST /reset; `step(action)` calls POST /step. `observation_space` is `Box(low=-inf, high=inf, shape=(12,))`; `action_space` is a `Box` over the continuous knobs.

Train with `stable_baselines3.PPO("MlpPolicy", env, verbose=1)` for 50k steps and plot `ep_rew_mean` over time with matplotlib. It should go from ~0.1 at the start to ~0.35+ by 50k steps. If it doesn't: (1) add a VecNormalize wrapper, (2) reduce the learning rate to 1e-4, (3) increase n_steps to 2048, (4) check the reward range is [-1, 1] (it should be, from InferenceEnv). The environment is designed to be learnable — the reward engineering is correct.

```python
# Cell 1: Title markdown
# "# InferenceGym Demo — Meta PyTorch × Scaler Hackathon 2026"

# Cell 2: Install (runs in ~90 seconds on Colab)
!pip install stable-baselines3 gymnasium httpx pandas matplotlib -q

# Cell 3: Connect to the live environment
HF_URL = "https://YOUR_ORG-inferencegym.hf.space"
import httpx
response = httpx.get(f"{HF_URL}/health")
print("Environment status:", response.json())

# Cell 4: Show available tasks
tasks = httpx.get(f"{HF_URL}/tasks").json()
for t in tasks['tasks']:
    print(f"{t['id']}: {t['name']} ({t['difficulty']})")

# Cell 5: Run the baseline agent, show scores
baseline = httpx.get(f"{HF_URL}/baseline").json()
print("Baseline scores (naïve vLLM defaults):", baseline['scores'])

# Cell 6: Manual episode — human in the loop
res = httpx.post(f"{HF_URL}/reset", json={"task_id": 1, "seed": 42}).json()
session_id = res['session_id']; obs = res['observation']
print("Initial observation:", obs)

# Cell 7: Run 10 manual steps with a smart action
episode_log = []
for _ in range(10):
    result = httpx.post(f"{HF_URL}/step", json={
        "session_id": session_id,
        "action": {"kv_budget": 0.6, "batch_size": 128, "spec_length": 0,
                   "prefill_disagg": False, "quant_tier": 0}}).json()
    episode_log.append(result)

# Cell 8: Gym wrapper
import gymnasium as gym
import numpy as np

class InferenceGymEnv(gym.Env):
    # Scalar observation fields, in schema order; priority_distribution is a
    # list, so it is represented by its interactive fraction to keep 12 features.
    FIELDS = ["queue_depth", "mean_prompt_len", "arrival_rate", "kv_cache_occupancy",
              "ttft_p50", "tpot_p50", "slo_violation_rate", "gpu_memory_used_gb",
              "spec_accept_rate", "timestep", "cost_so_far"]

    def __init__(self, base_url, task_id=1):
        self.url = base_url; self.task_id = task_id; self.session_id = None
        self.observation_space = gym.spaces.Box(-np.inf, np.inf, shape=(12,), dtype=np.float32)
        # [kv_budget, reserved (unused), batch_size]
        self.action_space = gym.spaces.Box(
            low=np.array([0.1, 0.0, 1.0], dtype=np.float32),
            high=np.array([1.0, 1.0, 512.0], dtype=np.float32))

    def obs_to_array(self, obs):
        vals = [float(obs[f]) for f in self.FIELDS]
        vals.append(float(obs["priority_distribution"][0]))
        return np.array(vals, dtype=np.float32)

    def reset(self, **kwargs):
        r = httpx.post(f"{self.url}/reset", json={"task_id": self.task_id}).json()
        self.session_id = r['session_id']
        return self.obs_to_array(r['observation']), {}

    def step(self, action):
        act = {"kv_budget": float(action[0]), "spec_length": 0,
               "batch_size": int(action[2]), "prefill_disagg": False, "quant_tier": 0}
        r = httpx.post(f"{self.url}/step",
                       json={"session_id": self.session_id, "action": act}).json()
        return self.obs_to_array(r['observation']), r['reward'], r['done'], False, {}

# Cell 9: Train PPO (takes ~10 minutes on Colab T4)
from stable_baselines3 import PPO
env = InferenceGymEnv(HF_URL, task_id=1)
model = PPO("MlpPolicy", env, verbose=1, learning_rate=3e-4, n_steps=512)
model.learn(total_timesteps=50_000)

# Cell 10: Plot the reward curve (the money shot)
import matplotlib.pyplot as plt
rewards = [ep['r'] for ep in model.ep_info_buffer]
plt.figure(figsize=(12, 4)); plt.plot(rewards, alpha=0.3, label='Episode reward')
plt.axhline(y=0.35, color='r', linestyle='--', label='Baseline score')
plt.title('PPO Agent Learning on InferenceGym Task 1'); plt.legend(); plt.show()
print(f"Final agent score: {np.mean(rewards[-20:]):.3f} vs baseline: 0.35")
```
| Time | Screen | What You Say / Show |
|---|---|---|
| 0:00–0:20 | Slide: problem statement | "LLM inference is where 80% of AI budget is spent. There's no RL environment for optimising it. We built one." |
| 0:20–0:40 | HF Space — /health → /tasks | "This is InferenceGym on HuggingFace Spaces, live right now. 3 tasks, 5 action knobs, fully CPU-only." Hit the endpoints live. |
| 0:40–1:00 | Colab — run baseline | "Naïve vLLM defaults score 0.35 on Task 1. That's your baseline — static config, no optimisation." |
| 1:00–1:30 | Colab — PPO reward curve | "A simple PPO agent trained for 50k steps hits 0.65 — almost double. No GPU, no model, just our trace-driven simulator." Show the plot. |
| 1:30–2:00 | Architecture diagram | "Any company can drop in their own trace data and train an agent for their specific workload. That's the value proposition. Thank you." |
```
inferencegym/
├── models.py                [ALL] — Locked Day 1. ServeAction, ServeObservation, MetricsSnapshot, WorkloadState
│
├── env/
│   ├── inference_env.py     [A] — Core InferenceEnv class. reset(), step(), _compute_reward(), _enforce_action_mask()
│   ├── observation.py       [A] — _build_obs() helper, normalise values to [0,1] for RL agents
│   ├── action.py            [A] — ActionValidator, clamp continuous actions to valid ranges
│   └── reward.py            [A] — RewardComputer, configurable α β γ δ, TASK_CONFIGS dict
│
├── simulator/
│   ├── trace_sim.py         [A] — TraceSimulator: load parquet, interpolate, noise, OOM detection
│   ├── workload.py          [A] — WorkloadGenerator: Poisson, LogNormal, burst injection, queue
│   ├── session_manager.py   [B] — SessionManager: thread-safe LRU cache of InferenceEnv instances
│   └── data/
│       ├── traces_llama3_8b.parquet [C] — lookup table: (batch,kv,spec,plen) → metrics
│       ├── sharegpt_dist.json       [C] — LogNormal params for Task 2 prompt distribution
│       └── workload_configs.json    [C] — Task 1/2/3 workload configuration parameters
│
├── grader/
│   ├── grader.py            [C] — GraderModule: dispatches to per-task graders, returns score+breakdown
│   ├── task1_grader.py      [C] — Throughput normalisation formula
│   ├── task2_grader.py      [C] — TTFT + memory compliance formula
│   └── task3_grader.py      [C] — 4-objective formula including action stability
│
├── agents/
│   ├── baseline.py          [C] — BaselineAgent: fixed BASELINE_ACTION, run_all_baselines()
│   └── ppo_demo.py          [C] — HTTPGymEnv wrapper + PPO training script
│
├── server/
│   ├── app.py               [B] — FastAPI application, all 8 endpoints, startup event
│   ├── schemas.py           [B] — Pydantic request/response models (ResetRequest, StepRequest, etc.)
│   └── middleware.py        [B] — CORS, rate limiting (max 100 req/min per IP), request logging
│
├── tests/
│   ├── test_simulator.py    [A] — 20+ unit tests for TraceSimulator and WorkloadGenerator
│   ├── test_env.py          [A] — Contract tests for step/reset/state, edge cases
│   ├── test_grader.py       [C] — Unit tests for all 3 grader formulas with known expected outputs
│   └── test_api.py          [B] — Integration tests: httpx client hitting the full FastAPI stack
│
├── notebooks/
│   └── InferenceGym_Demo.ipynb [C] — 10-cell Colab demo notebook
│
├── Dockerfile               [B] — Multi-stage, CPU-only, port 7860, <2GB image
├── docker-compose.yml       [B] — Local dev: volume mount source, hot reload
├── requirements.txt         [B] — Pinned CPU-only deps. No torch. No CUDA.
├── README.md                [C] — HF Spaces frontmatter + pitch + quickstart + links
└── ENVIRONMENT.md           [A] — Full technical spec for judges
```
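As an illustration of the contract tests `tests/test_env.py` is responsible for, here is a sketch that asserts the Phase 2 exit criteria (test names and the random-action helper are assumptions):

```python
import numpy as np
from simulator.trace_sim import TraceSimulator
from env.inference_env import InferenceEnv
from inferencegym.models import ServeAction, QuantTier

def random_action(rng):
    # Sample uniformly from the valid action ranges in models.py.
    return ServeAction(
        kv_budget=float(rng.uniform(0.1, 1.0)),
        spec_length=int(rng.choice([0, 1, 2, 4, 8])),
        batch_size=int(rng.integers(1, 513)),
        prefill_disagg=bool(rng.integers(0, 2)),
        quant_tier=QuantTier(int(rng.integers(0, 3))),
    )

def test_episode_contract():
    sim = TraceSimulator("simulator/data/traces_llama3_8b.parquet", seed=0)
    env = InferenceEnv(sim, task_id=1, seed=0)
    env.reset()
    rng = np.random.default_rng(0)
    for t in range(200):
        obs, reward, done, info = env.step(random_action(rng))
        assert -1.0 <= reward <= 1.0    # reward bound from _compute_reward
        assert done == (t == 199)       # episode terminates at step 200

def test_session_ids_unique():
    sim = TraceSimulator("simulator/data/traces_llama3_8b.parquet", seed=0)
    env = InferenceEnv(sim, task_id=1, seed=0)
    env.reset(); first = env.session_id
    env.reset(); assert env.session_id != first
```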
`TraceSimulator`:
- `__init__(trace_path: str, seed: int = 42)` — loads parquet, builds interpolators, sets the rng
- `simulate(action: ServeAction, workload: WorkloadState) → MetricsSnapshot` — the core method
- `reset_seed(seed: int)` — resets the rng for episode reproducibility

`WorkloadGenerator`:
- `__init__(task_id: int, seed: int = 42)` — loads the workload config for this task
- `reset() → WorkloadState` — returns the initial state, resets the internal step counter
- `step(action: ServeAction) → WorkloadState` — advances one step, updates the queue
- `is_burst_active() → bool` — True during burst windows for Task 3

`InferenceEnv`:
- `reset() → ServeObservation` — starts a new episode, returns the initial observation
- `step(action) → (obs, reward, done, info)` — Gym-compatible signature
- `state() → dict` — returns episode metadata for the /state endpoint
- `_episode_log: list` — accumulates step dicts for grader consumption
- `session_id: str` — unique UUID per episode, set on reset()

`GraderModule`:
- `score(task_id: int, episode_log: list) → dict` — returns {score, breakdown, feedback}
- `score` must be a float in [0.0, 1.0]
- `breakdown` must contain one float per scoring component
- `feedback` must be a human-readable string explaining the score

Parquet lookup table schema (`traces_llama3_8b.parquet`):

| Column | Type | Values | Description |
|---|---|---|---|
| `batch_size` | int | 1, 4, 8, 16, 32, 64, 128, 256, 512 | Max concurrent requests served |
| `kv_budget` | float | 0.1, 0.25, 0.5, 0.75, 1.0 | KV cache allocation fraction |
| `spec_length` | int | 0, 1, 2, 4, 8 | Speculative draft tokens (0 = disabled) |
| `quant_tier` | int | 0, 1, 2 | 0=FP16, 1=INT8, 2=INT4 |
| `prompt_len_bucket` | int | 0–7 | Bucket index over [64, 128, 256, 512, 1024, 2048, 4096, 8192] |
| `ttft_p50` | float | >0 | Median time to first token (ms) |
| `ttft_p99` | float | >0 | 99th-percentile TTFT (ms) |
| `tpot` | float | >0 | Time per output token (ms) |
| `tps` | float | >0 | Output tokens per second |
| `gpu_mem_gb` | float | 0–80 | GPU memory footprint (GB) |
| `cost_per_1k` | float | >0 | Relative cost per 1000 tokens (normalised) |
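If Options A and B fall through, the Option C fallback can generate a parquet file matching this schema from the analytic model — a sketch, where every constant is an illustrative placeholder to be calibrated, not a measured value:

```python
# Sketch: build a synthetic lookup table (Option C) matching the schema above.
import itertools
import pandas as pd

BATCH = [1, 4, 8, 16, 32, 64, 128, 256, 512]
KV    = [0.1, 0.25, 0.5, 0.75, 1.0]
SPEC  = [0, 1, 2, 4, 8]
QUANT = [0, 1, 2]
PLEN  = range(8)   # bucket indices 0–7

rows = []
for b, kv, s, q, p in itertools.product(BATCH, KV, SPEC, QUANT, PLEN):
    plen_tokens = 64 * 2 ** p                       # bucket midpoint proxy
    ttft = 30.0 + 1.5 * b + 0.05 * plen_tokens      # base + batch + memory terms (placeholders)
    tps  = 900.0 * b / (1.0 + 0.02 * b) * kv        # saturating throughput, scaled by KV budget
    rows.append({
        "batch_size": b, "kv_budget": kv, "spec_length": s, "quant_tier": q,
        "prompt_len_bucket": p,
        "ttft_p50": ttft, "ttft_p99": 1.8 * ttft,
        "tpot": 12.0 + 0.01 * plen_tokens,
        "tps": tps,
        # Can exceed the 40 GB OOM threshold at large batch × long prompts
        "gpu_mem_gb": 6.0 + 20.0 * kv + 0.03 * b + plen_tokens / 512,
        "cost_per_1k": 0.001,
    })
pd.DataFrame(rows).to_parquet("simulator/data/traces_llama3_8b.parquet")
```

Note that TraceSimulator only interpolates the `quant_tier == 0` rows and applies INT8/INT4 as multiplicative factors, so the synthetic values for the other tiers are never read.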
`simulator/data/workload_configs.json`:

```json
{
"tasks": {
"1": {
"name": "Static Uniform",
"arrival_rate_rps": 10.0,
"arrival_dist": "poisson",
"prompt_len_dist": "uniform",
"prompt_len_min": 64,
"prompt_len_max": 128,
"slo_target_ms": 500.0,
"burst_enabled": false,
"priority_routing": false,
"active_actions": ["kv_budget", "batch_size"]
},
"2": {
"name": "Bursty ShareGPT",
"arrival_rate_rps": 25.0,
"arrival_rate_burst": 80.0,
"burst_period_steps": 30,
"arrival_dist": "poisson_bursty",
"prompt_len_dist": "lognormal",
"prompt_len_mu": 5.2,
"prompt_len_sigma": 1.3,
"prompt_len_clamp_min": 32,
"prompt_len_clamp_max": 8192,
"memory_hard_limit_gb": 36.0,
"slo_target_ms": 300.0,
"burst_enabled": true,
"active_actions": ["kv_budget", "batch_size", "spec_length"]
},
"3": {
"name": "Adversarial Multi-Tenant",
"arrival_rate_rps": 30.0,
"burst_multiplier": 10.0,
"burst_interval_steps": 120,
"burst_duration_steps": 15,
"prompt_len_dist": "bimodal",
"short_request_frac": 0.7,
"short_prompt_max": 128,
"long_prompt_min": 4096,
"long_prompt_max": 8192,
"priority_mix": [0.2, 0.5, 0.3],
"slo_interactive_ms": 200.0,
"slo_batch_ms": 2000.0,
"cost_budget_episode": 5.0,
"memory_hard_limit_gb": 38.0,
"active_actions": ["kv_budget", "batch_size", "spec_length", "prefill_disagg", "quant_tier"]
}
}
}
```

Observation space reference (fields of `ServeObservation`):

| Field | Type | Range | Normalised? | Description |
|---|---|---|---|---|
| `queue_depth` | float | [0, 512] | No | Pending requests in the serving queue |
| `mean_prompt_len` | float | [32, 8192] | No | Mean token count of the current window |
| `arrival_rate` | float | [0, 200] | No | 10-step EMA of requests/second |
| `kv_cache_occupancy` | float | [0.0, 1.0] | Yes | Fraction of KV cache in use |
| `ttft_p50` | float | [0, 5000] ms | No | Median TTFT over the last 20 requests |
| `tpot_p50` | float | [0, 500] ms | No | Median time-per-output-token |
| `slo_violation_rate` | float | [0.0, 1.0] | Yes | Fraction of requests missing the SLO |
| `gpu_memory_used_gb` | float | [0, 80] | No | Simulated GPU memory pressure |
| `spec_accept_rate` | float | [0.0, 1.0] | Yes | Speculative token acceptance rate |
| `priority_distribution` | float[3] | [0, 1] each | Yes | [interactive, batch, best_effort] fractions |
| `timestep` | int | [0, 200] | No | Current episode step |
| `cost_so_far` | float | [0, ∞) | No | Cumulative cost this episode |
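The repo layout gives `env/observation.py` the job of normalising these fields to [0, 1] for RL agents. A minimal sketch using the ranges above (the `normalise` helper name is an assumption; it flattens `priority_distribution` into the vector and squashes the unbounded `cost_so_far`):

```python
import numpy as np
from inferencegym.models import ServeObservation

# (low, high) per scalar field, taken from the ranges in the table above.
_RANGES = {
    "queue_depth": (0, 512), "mean_prompt_len": (32, 8192), "arrival_rate": (0, 200),
    "kv_cache_occupancy": (0, 1), "ttft_p50": (0, 5000), "tpot_p50": (0, 500),
    "slo_violation_rate": (0, 1), "gpu_memory_used_gb": (0, 80),
    "spec_accept_rate": (0, 1), "timestep": (0, 200),
}

def normalise(obs: ServeObservation) -> np.ndarray:
    """Scale each field to [0, 1]; already-normalised fields pass through unchanged."""
    vals = []
    for name, (lo, hi) in _RANGES.items():
        x = float(getattr(obs, name))
        vals.append(np.clip((x - lo) / (hi - lo), 0.0, 1.0))
    # cost_so_far is unbounded, so squash it instead of min-max scaling.
    vals.append(1.0 - np.exp(-obs.cost_so_far))
    return np.asarray(vals + list(obs.priority_distribution), dtype=np.float32)
```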
| Risk | Prob | Mitigation | Owner |
|---|---|---|---|
| Trace data is the wrong shape — published benchmarks don't have the exact columns needed | Medium | Implement Option C (synthetic data) on Day 1 before even trying Option A. This takes 30 minutes and gives you a valid fallback; Option A then becomes an enhancement, not a dependency. | C |
| PPO doesn't converge — the reward curve is flat or decreasing | Low | Task 1 is designed for easy learning. If PPO fails: (1) add a VecNormalize wrapper, (2) lower the learning rate to 1e-4, (3) check the reward is truly in [-1, 1]. If it still fails, use a simple hill-climbing agent — just show any rising curve. | C |
| HuggingFace Spaces OOM — the free tier has 16GB RAM and the simulator might use too much | Low | Load trace data as a numpy array, not a pandas DataFrame, at startup. Target <200MB for the lookup table. Use parquet with snappy compression. Test memory usage locally with psutil before deploying. | B |
| Race condition in the session cache — concurrent requests corrupt session state | Medium | All reads and writes to the `self._sessions` dict are wrapped in `threading.Lock()`. Individual InferenceEnv instances are not thread-safe, but each session is owned by one caller at a time — fine in practice, since `/step` is synchronous and each session is driven by a single RL client making sequential calls. | B |
| Grader gives a score > 1.0 or < 0.0 — formula constants are miscalibrated | Medium | All grader component scores are individually passed through `np.clip(x, 0.0, 1.0)` before the weighted sum, and the final score is clipped too. Calibrate the BASELINE_TPS and OPTIMAL_TPS constants on Day 5 by running the actual baseline agent and verifying scores fall in [0.20, 0.40]. | C |
| Person A is blocked on Day 3 — the simulator isn't done, so Persons B and C can't proceed | Medium | Person A prioritises the interface (`simulate()` returns a valid MetricsSnapshot) over implementation quality. A synthetic linear model with hardcoded constants is enough for Day 3; Persons B and C only need the method signature to work. Real trace data can be plugged in on Day 4. | A |
| Docker image >2GB — stable-baselines3 pulls in the large PyTorch dependency | Medium | Install stable-baselines3[extra] only via a separate requirements-demo.txt that is NOT in the Dockerfile. The server only needs the environment; the PPO demo runs outside the container (in Colab). This keeps the image under 500MB. | B |
| OpenEnv spec compliance fails — `openenv validate` finds schema mismatches | Low | Run `openenv validate` at the end of every day starting Day 3. Validation issues are always about the JSON schema — field names, types, missing fields. Fix them immediately, never defer. Keep a local copy of the openenv spec open while writing endpoint response schemas. | B |
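The Spaces-OOM mitigation above says to test memory usage locally with psutil before deploying; a quick way to do that (a throwaway script, not part of the shipped codebase):

```python
# Sketch: measure resident memory added by loading the simulator.
import os
import psutil
from simulator.trace_sim import TraceSimulator

proc = psutil.Process(os.getpid())
before = proc.memory_info().rss
sim = TraceSimulator("simulator/data/traces_llama3_8b.parquet")
after = proc.memory_info().rss
print(f"simulator resident set: {(after - before) / 1e6:.0f} MB (target < 200 MB)")
```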
Final acceptance checklist:

- `POST /reset` returns a session_id + initial observation dict
- `POST /step` returns observation + reward (float) + done (bool) + info
- `GET /health` returns `{"status": "ok"}`
- `openenv validate --url https://YOUR_SPACE.hf.space` passes with no errors
- The image builds with `docker build -t test .`
- The image is under 2GB (check with `docker image ls`)
- The Space is titled InferenceGym (or your chosen name)