from __future__ import annotations import argparse import json import os import random from dataclasses import dataclass from typing import Any import requests from openai import OpenAI TASKS = ["easy_docker", "medium_k8s", "hard_ml_config"] @dataclass class EpisodeResult: task_id: str final_score: float done: bool steps: int rewards: list[float] def build_openai_client() -> OpenAI: api_key = os.getenv("OPENAI_API_KEY") if not api_key: raise RuntimeError("OPENAI_API_KEY is required for OpenAI baseline mode") return OpenAI(api_key=api_key) def extract_json_object(text: str) -> dict[str, Any]: text = text.strip() if "```" in text: blocks = text.split("```") for block in blocks: block = block.strip() if block.startswith("json"): block = block[4:].strip() if block.startswith("{") and block.endswith("}"): return json.loads(block) start = text.find("{") end = text.rfind("}") if start != -1 and end != -1 and end > start: return json.loads(text[start : end + 1]) raise ValueError("No JSON object found in model output") def choose_heuristic_action(task_id: str, step: int) -> dict[str, Any]: # Deterministic policy for reproducible baseline. easy_plan = [ {"operation": "edit", "path": "services.web.image", "value": "nginx:latest"}, {"operation": "delete", "path": "services.web.ports.1"}, {"operation": "edit", "path": "services.web.environment", "value": {"DEBUG": "true", "API_KEY": "placeholder"}}, {"operation": "edit", "path": "services.db.ports.0", "value": "5432:5432"}, ] medium_plan = [ {"operation": "edit", "path": "metadata.namespace", "value": "default"}, {"operation": "edit", "path": "spec.replicas", "value": 3}, {"operation": "edit", "path": "spec.template.spec.containers.0.image", "value": "nginx:latest"}, {"operation": "edit", "path": "spec.template.spec.containers.0.resources.limits.memory", "value": "512Mi"}, {"operation": "edit", "path": "spec.template.spec.containers.0.resources.requests.memory", "value": "256Mi"}, {"operation": "edit", "path": "spec.template.spec.containers.0.resources.requests.cpu", "value": "500m"}, {"operation": "add", "path": "spec.template.spec.containers.0.ports", "value": [{"containerPort": 80}]}, ] hard_plan = [ {"operation": "delete", "path": "training.fp16"}, {"operation": "edit", "path": "training.batch_size", "value": 16}, {"operation": "edit", "path": "training.gradient_accumulation_steps", "value": 2}, {"operation": "edit", "path": "training.max_steps", "value": 1000}, {"operation": "edit", "path": "training.warmup_steps", "value": 100}, {"operation": "edit", "path": "training.optimizer.type", "value": "adamw"}, {"operation": "edit", "path": "hardware.gpu_count", "value": 1}, {"operation": "edit", "path": "data.train_batch_size", "value": 32}, {"operation": "edit", "path": "logging.log_interval", "value": 10}, ] plans = { "easy_docker": easy_plan, "medium_k8s": medium_plan, "hard_ml_config": hard_plan, } plan = plans[task_id] return plan[min(step, len(plan) - 1)] def choose_openai_action(client: OpenAI, model: str, observation: dict[str, Any]) -> dict[str, Any]: system_prompt = ( "You are an environment-control agent for configuration debugging. " "Return exactly one JSON object action." ) user_prompt = ( "Task:\n" f"{observation['task_description']}\n\n" "Allowed schema:\n" "{\"operation\": \"edit|add|delete\", \"path\": \"dot.path\", \"value\": any|null}\n\n" f"Current score: {observation['overall_score']}\n" f"Validation errors: {observation['validation_errors']}\n" f"Current YAML:\n{observation['current_config']}\n" ) response = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], temperature=0, top_p=1, seed=42, ) content = response.choices[0].message.content or "" return extract_json_object(content) def run_episode( api_base_url: str, task_id: str, max_steps: int, policy: str, model: str, openai_client: OpenAI | None, ) -> EpisodeResult: reset_resp = requests.post(f"{api_base_url}/reset", json={"task_id": task_id}, timeout=30) reset_resp.raise_for_status() observation = reset_resp.json()["observation"] rewards: list[float] = [] done = False print(f"[START] task={task_id} policy={policy}") for step in range(max_steps): if done: break if policy == "heuristic": action = choose_heuristic_action(task_id, step) else: assert openai_client is not None action = choose_openai_action(openai_client, model, observation) step_resp = requests.post(f"{api_base_url}/step", json=action, timeout=30) if step_resp.status_code != 200: rewards.append(0.0) print(f"[STEP] task={task_id} step={step} action=invalid reward=0.00 done=false") continue payload = step_resp.json() observation = payload["observation"] reward = payload["reward"] done = payload["done"] reward_value = float(reward["value"]) rewards.append(reward_value) print( f"[STEP] task={task_id} step={step} action={action.get('operation')}:{action.get('path')} " f"reward={reward_value:.3f} score={observation['overall_score']:.3f} done={str(done).lower()}" ) result = EpisodeResult( task_id=task_id, final_score=float(observation["overall_score"]), done=done, steps=min(max_steps, len(rewards)), rewards=rewards, ) reward_text = ",".join(f"{v:.3f}" for v in rewards) print( f"[END] task={task_id} score={result.final_score:.3f} " f"steps={result.steps} done={str(result.done).lower()} rewards={reward_text}" ) return result def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Baseline inference for ConfigDebuggerEnv") parser.add_argument("--api-base-url", default=os.getenv("API_BASE_URL", "http://localhost:8000")) parser.add_argument("--max-steps", type=int, default=12) parser.add_argument("--policy", choices=["heuristic", "openai"], default="heuristic") parser.add_argument("--model", default=os.getenv("OPENAI_MODEL", "gpt-4o-mini")) parser.add_argument("--seed", type=int, default=42) return parser.parse_args() def main() -> None: args = parse_args() random.seed(args.seed) openai_client: OpenAI | None = None if args.policy == "openai": openai_client = build_openai_client() results: list[EpisodeResult] = [] for task_id in TASKS: results.append( run_episode( api_base_url=args.api_base_url, task_id=task_id, max_steps=args.max_steps, policy=args.policy, model=args.model, openai_client=openai_client, ) ) avg = sum(r.final_score for r in results) / len(results) print("\n=== BASELINE SUMMARY ===") for result in results: print( f"{result.task_id}: final_score={result.final_score:.3f} steps={result.steps} done={str(result.done).lower()}" ) print(f"average_score={avg:.3f}") if __name__ == "__main__": main()