from __future__ import annotations

import argparse
import json
import os
import re
from pathlib import Path
from typing import Any, Callable, Protocol

from openai import OpenAI

from llmserve_env.client import LLMServeEnv
from llmserve_env.models import (
    EpisodeLog,
    QuantizationTier,
    ServeAction,
    ServeObservation,
    default_action,
)
from llmserve_env.task_catalog import get_task_catalog, get_task_config
from server.baseline_agent import HeuristicPolicy
from server.grader import GraderEngine
from server.llmserve_environment import LLMServeEnvironment

DEFAULT_BASE_URL = "http://localhost:7860"
DEFAULT_MODEL = "gpt-4.1-mini"
DEFAULT_SEED = 42

SYSTEM_PROMPT = """
You are controlling an LLM serving environment.
Return exactly one JSON object with these keys:
- batch_cap: integer 1..512
- kv_budget_fraction: float 0.1..1.0
- speculation_depth: integer 0..8
- quantization_tier: one of FP16, INT8, INT4
- prefill_decode_split: boolean
- priority_routing: boolean
Do not include markdown or extra text.
""".strip()


class BaselineEnvironment(Protocol):
    """Minimal interface shared by the in-process and HTTP runners."""

    def reset(self, task_id: str, seed: int | None = None) -> ServeObservation: ...

    def step(
        self, action: dict[str, Any] | ServeAction
    ) -> tuple[ServeObservation, float, bool, dict[str, Any]]: ...

    def grade(self, log: EpisodeLog | None = None) -> dict[str, Any]: ...


class LocalBaselineRunner:
    """Runs episodes against an in-process LLMServeEnvironment and grades them."""

    def __init__(self, seed: int = DEFAULT_SEED, mode: str = "sim") -> None:
        self.env = LLMServeEnvironment(seed=seed, mode=mode)
        self.grader = GraderEngine()

    def reset(self, task_id: str, seed: int | None = None) -> ServeObservation:
        return self.env.reset(task_id=task_id, seed=seed)

    def step(
        self, action: dict[str, Any] | ServeAction
    ) -> tuple[ServeObservation, float, bool, dict[str, Any]]:
        if isinstance(action, dict):
            action = ServeAction.model_validate(action)
        observation = self.env.step(action)
        return (
            observation,
            float(observation.reward or 0.0),
            bool(observation.done),
            dict(observation.metadata),
        )

    def grade(self, log: EpisodeLog | None = None) -> dict[str, Any]:
        episode_log = log or self.env.export_episode_log()
        return self.grader.grade(episode_log)


def create_remote_runner(base_url: str | None = None) -> LLMServeEnv:
    return LLMServeEnv.from_url(base_url or os.getenv("LLMSERVE_BASE_URL", DEFAULT_BASE_URL))


def create_local_runner(seed: int = DEFAULT_SEED, mode: str = "sim") -> LocalBaselineRunner:
    return LocalBaselineRunner(seed=seed, mode=mode)


def run_deterministic_baseline(
    task_id: str,
    seed: int = DEFAULT_SEED,
    runner: BaselineEnvironment | None = None,
) -> dict[str, Any]:
    """Roll out the heuristic policy for one task and return its grade."""
    environment = runner or create_local_runner(seed=seed)
    policy = HeuristicPolicy()
    policy.reset()
    observation = environment.reset(task_id=task_id, seed=seed)
    max_steps = int(get_task_config(task_id)["max_steps"])
    steps = 0
    while not observation.done and steps < max_steps:
        action = policy.act(observation, task_id)
        observation, _, _, _ = environment.step(action)
        steps += 1
    grader_result = environment.grade()
    return {
        "task_id": task_id,
        "seed": seed,
        "steps": steps,
        "grader": grader_result,
    }
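# Example (illustrative, not part of the public API): running the deterministic
# baseline in-process. The task id "latency_slo_basic" is a placeholder --
# real ids come from get_task_catalog().
#
#     result = run_deterministic_baseline(task_id="latency_slo_basic", seed=42)
#     print(result["grader"]["score"], result["steps"])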
def run_openai_baseline(
    task_id: str,
    seed: int = DEFAULT_SEED,
    api_key: str | None = None,
    base_url: str | None = None,
    model: str = DEFAULT_MODEL,
    runner: BaselineEnvironment | None = None,
) -> dict[str, Any]:
    """Roll out an OpenAI-driven policy for one task and return its grade."""
    resolved_key = api_key or os.getenv("OPENAI_API_KEY")
    if not resolved_key:
        raise RuntimeError("OPENAI_API_KEY is required for OpenAI baseline inference.")
    client = OpenAI(api_key=resolved_key, max_retries=2, timeout=30.0)
    environment = runner or create_remote_runner(base_url=base_url)
    observation = environment.reset(task_id=task_id, seed=seed)
    max_steps = int(get_task_config(task_id)["max_steps"])
    steps = 0
    while not observation.done and steps < max_steps:
        action = _action_from_model(client, model, task_id, observation)
        observation, _, _, _ = environment.step(action)
        steps += 1
    grader_result = environment.grade()
    return {
        "task_id": task_id,
        "seed": seed,
        "model": model,
        "steps": steps,
        "grader": grader_result,
    }


def run_baseline_suite(
    mode: str = "deterministic",
    task_ids: list[str] | None = None,
    seed: int = DEFAULT_SEED,
    model: str = DEFAULT_MODEL,
    api_key: str | None = None,
    base_url: str | None = None,
    runner_factory: Callable[[], BaselineEnvironment] | None = None,
) -> dict[str, Any]:
    """Run the chosen baseline over every task (or a given subset) and summarize."""
    resolved_task_ids = task_ids or [task["id"] for task in get_task_catalog()]
    results: dict[str, Any] = {}
    for task_id in resolved_task_ids:
        # A fresh runner per task keeps episode state from leaking across tasks.
        runner = runner_factory() if runner_factory is not None else None
        if mode == "openai":
            results[task_id] = run_openai_baseline(
                task_id=task_id,
                seed=seed,
                api_key=api_key,
                base_url=base_url,
                model=model,
                runner=runner,
            )
        elif mode == "deterministic":
            results[task_id] = run_deterministic_baseline(
                task_id=task_id,
                seed=seed,
                runner=runner,
            )
        else:
            raise ValueError(f"Unsupported baseline mode: {mode}")
    payload: dict[str, Any] = {
        "mode": mode,
        "seed": seed,
        "baseline": results,
        "summary": _summarize_results(results),
    }
    if mode == "openai":
        payload["model"] = model
        payload["runtime_target"] = (
            "in_process_environment"
            if runner_factory is not None
            else base_url or os.getenv("LLMSERVE_BASE_URL", DEFAULT_BASE_URL)
        )
    return payload


def _summarize_results(results: dict[str, Any]) -> dict[str, Any]:
    scores = [float(result["grader"]["score"]) for result in results.values()]
    mean_score = round(sum(scores) / len(scores), 4) if scores else 0.0
    return {
        "task_count": len(results),
        "mean_score": mean_score,
        "scores": {
            task_id: float(result["grader"]["score"]) for task_id, result in results.items()
        },
        "heuristic_baselines": {
            task_id: float(result["grader"].get("heuristic_baseline", 0.0))
            for task_id, result in results.items()
        },
        "ppo_baselines": {
            task_id: float(result["grader"].get("ppo_baseline", 0.0))
            for task_id, result in results.items()
        },
    }


def _action_from_model(client: OpenAI, model: str, task_id: str, observation: Any) -> ServeAction:
    """Ask the model for the next action; fall back to a safe default on bad output."""
    user_prompt = json.dumps(
        {
            "task_id": task_id,
            "observation": observation.model_dump(mode="json"),
        }
    )
    response = client.chat.completions.create(
        model=model,
        temperature=0,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},
    )
    raw = response.choices[0].message.content or "{}"
    payload = _parse_model_payload(raw)
    if payload is None:
        return default_action()
    # Fill any keys the model omitted with conservative defaults.
    payload.setdefault("batch_cap", 32)
    payload.setdefault("kv_budget_fraction", 1.0)
    payload.setdefault("speculation_depth", 0)
    payload.setdefault("quantization_tier", QuantizationTier.FP16.value)
    payload.setdefault("prefill_decode_split", False)
    payload.setdefault("priority_routing", False)
    try:
        return ServeAction.model_validate(payload)
    except Exception:
        return default_action()


def _parse_model_payload(raw: str) -> dict[str, Any] | None:
    """Extract a JSON object from a model reply, tolerating fences and chatter."""
    candidate = raw.strip()
    if candidate.startswith("```"):
        candidate = re.sub(
            r"^```(?:json)?\s*|\s*```$",
            "",
            candidate,
            flags=re.IGNORECASE | re.DOTALL,
        ).strip()
    start = candidate.find("{")
    end = candidate.rfind("}")
    if start != -1 and end != -1 and end > start:
        candidate = candidate[start : end + 1]
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None
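# Example (illustrative): _parse_model_payload accepts bare JSON, fenced JSON,
# and replies with leading chatter; each of these returns {"batch_cap": 64}:
#
#     _parse_model_payload('{"batch_cap": 64}')
#     _parse_model_payload('```json\n{"batch_cap": 64}\n```')
#     _parse_model_payload('Sure! {"batch_cap": 64}')
#
# Anything that is not a JSON object (a list, plain text) yields None, which
# _action_from_model converts into default_action().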
def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Run deterministic or OpenAI baseline inference for LLMServeEnv."
    )
    parser.add_argument("--mode", choices=["deterministic", "openai"], default="deterministic")
    parser.add_argument(
        "--runtime",
        choices=["in-process", "http"],
        default="in-process",
        help="How to execute the environment during baseline inference.",
    )
    parser.add_argument(
        "--task-id",
        action="append",
        dest="task_ids",
        help="Task ID to run. Repeat for multiple tasks.",
    )
    parser.add_argument("--seed", type=int, default=DEFAULT_SEED)
    parser.add_argument("--model", default=os.getenv("OPENAI_MODEL", DEFAULT_MODEL))
    parser.add_argument("--base-url", default=os.getenv("LLMSERVE_BASE_URL", DEFAULT_BASE_URL))
    parser.add_argument("--api-key", default=None)
    parser.add_argument("--output", default=None, help="Optional path to write the JSON result.")
    return parser


def main(argv: list[str] | None = None) -> int:
    args = build_arg_parser().parse_args(argv)
    if args.mode == "openai":
        runner_factory = None
        base_url = args.base_url
        if args.runtime == "in-process":
            runner_factory = lambda: create_local_runner(seed=args.seed)
            base_url = None
        payload = run_baseline_suite(
            mode="openai",
            task_ids=args.task_ids,
            seed=args.seed,
            model=args.model,
            api_key=args.api_key,
            base_url=base_url,
            runner_factory=runner_factory,
        )
    else:
        payload = run_baseline_suite(
            mode="deterministic",
            task_ids=args.task_ids,
            seed=args.seed,
            runner_factory=lambda: create_local_runner(seed=args.seed),
        )
    rendered = json.dumps(payload, indent=2, sort_keys=True)
    if args.output:
        Path(args.output).write_text(rendered + "\n", encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
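# Example invocations (illustrative; substitute this module's actual path):
#
#     python baselines.py --mode deterministic --output deterministic.json
#     python baselines.py --mode openai --runtime in-process --seed 42
#     OPENAI_API_KEY=sk-... python baselines.py --mode openai --runtime http \
#         --base-url http://localhost:7860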