# openenv2 / inference.py
# Baseline inference script (upload commit ebf4715, "Upload 10 files", by hissterical).
from __future__ import annotations
import argparse
import json
import os
import random
from dataclasses import dataclass
from typing import Any
import requests
from openai import OpenAI
# Task ids that ``main`` runs, in this order. Under the heuristic policy, each id
# must have a scripted plan in ``choose_heuristic_action`` (otherwise KeyError).
TASKS = ["easy_docker", "medium_k8s", "hard_ml_config"]
@dataclass
class EpisodeResult:
    """Summary of a single episode, as built and returned by ``run_episode``."""

    task_id: str  # task id sent to the environment's /reset endpoint
    final_score: float  # observation["overall_score"] after the last step taken
    done: bool  # value of the environment's "done" flag after the last successful step
    steps: int  # number of steps attempted (one reward is recorded per step)
    rewards: list[float]  # per-step rewards; 0.0 is recorded for steps that failed
def build_openai_client() -> OpenAI:
    """Construct an OpenAI client authenticated via ``OPENAI_API_KEY``.

    Raises:
        RuntimeError: if ``OPENAI_API_KEY`` is unset or set to an empty string.
    """
    key = os.environ.get("OPENAI_API_KEY")
    if key:
        return OpenAI(api_key=key)
    raise RuntimeError("OPENAI_API_KEY is required for OpenAI baseline mode")
def extract_json_object(text: str) -> dict[str, Any]:
    """Extract the first JSON object embedded in a model reply.

    Candidates are tried in this order, and any candidate that fails to parse
    is skipped rather than aborting the search:

    1. Each Markdown code fence whose contents (after an optional ``json``
       language tag) start with ``{`` and end with ``}``.
    2. The span from the first ``{`` to the last ``}`` of the reply.
    3. Each ``{`` position in the reply, decoding the first complete object
       that starts there. This handles replies with several objects or with
       braces in trailing prose, where step 2 produces invalid JSON.

    Args:
        text: Raw model output.

    Returns:
        The first candidate that decodes to a JSON object.

    Raises:
        ValueError: if no candidate decodes to a JSON object.
    """
    text = text.strip()

    def _try_loads(candidate: str) -> dict[str, Any] | None:
        # Parse one candidate; None means "not a JSON object, keep looking".
        try:
            value = json.loads(candidate)
        except json.JSONDecodeError:
            return None
        return value if isinstance(value, dict) else None

    if "```" in text:
        for block in text.split("```"):
            block = block.strip()
            if block[:4].lower() == "json":
                block = block[4:].strip()
            if block.startswith("{") and block.endswith("}"):
                parsed = _try_loads(block)
                if parsed is not None:
                    return parsed

    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        parsed = _try_loads(text[start : end + 1])
        if parsed is not None:
            return parsed

    # Last resort: decode the first complete object starting at any '{',
    # ignoring whatever text follows it.
    decoder = json.JSONDecoder()
    index = start
    while index != -1:
        try:
            value, _ = decoder.raw_decode(text[index:])
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(value, dict):
                return value
        index = text.find("{", index + 1)

    raise ValueError("No JSON object found in model output")
def choose_heuristic_action(task_id: str, step: int) -> dict[str, Any]:
    """Return the scripted action for ``task_id`` at episode step ``step``.

    Each task has a fixed, hand-written edit plan, which keeps the baseline
    deterministic and reproducible. When ``step`` runs past the end of a plan,
    the plan's final action is returned again.

    Raises:
        KeyError: if ``task_id`` has no scripted plan.
    """
    scripted_plans: dict[str, list[dict[str, Any]]] = {
        "easy_docker": [
            {"operation": "edit", "path": "services.web.image", "value": "nginx:latest"},
            {"operation": "delete", "path": "services.web.ports.1"},
            {"operation": "edit", "path": "services.web.environment", "value": {"DEBUG": "true", "API_KEY": "placeholder"}},
            {"operation": "edit", "path": "services.db.ports.0", "value": "5432:5432"},
        ],
        "medium_k8s": [
            {"operation": "edit", "path": "metadata.namespace", "value": "default"},
            {"operation": "edit", "path": "spec.replicas", "value": 3},
            {"operation": "edit", "path": "spec.template.spec.containers.0.image", "value": "nginx:latest"},
            {"operation": "edit", "path": "spec.template.spec.containers.0.resources.limits.memory", "value": "512Mi"},
            {"operation": "edit", "path": "spec.template.spec.containers.0.resources.requests.memory", "value": "256Mi"},
            {"operation": "edit", "path": "spec.template.spec.containers.0.resources.requests.cpu", "value": "500m"},
            {"operation": "add", "path": "spec.template.spec.containers.0.ports", "value": [{"containerPort": 80}]},
        ],
        "hard_ml_config": [
            {"operation": "delete", "path": "training.fp16"},
            {"operation": "edit", "path": "training.batch_size", "value": 16},
            {"operation": "edit", "path": "training.gradient_accumulation_steps", "value": 2},
            {"operation": "edit", "path": "training.max_steps", "value": 1000},
            {"operation": "edit", "path": "training.warmup_steps", "value": 100},
            {"operation": "edit", "path": "training.optimizer.type", "value": "adamw"},
            {"operation": "edit", "path": "hardware.gpu_count", "value": 1},
            {"operation": "edit", "path": "data.train_batch_size", "value": 32},
            {"operation": "edit", "path": "logging.log_interval", "value": 10},
        ],
    }
    actions = scripted_plans[task_id]
    last = len(actions) - 1
    # Clamp to the final action once the plan is exhausted.
    return actions[step] if step < last else actions[last]
def choose_openai_action(client: OpenAI, model: str, observation: dict[str, Any]) -> dict[str, Any]:
    """Ask the chat model for the next action and parse it from the reply.

    The user prompt contains the task description, the allowed action schema,
    the current score, the validation errors and the current YAML config, all
    read from ``observation``. Sampling parameters are pinned (temperature 0,
    top_p 1, seed 42) to keep runs as repeatable as possible.

    Raises:
        KeyError: if ``observation`` lacks one of the keys used in the prompt.
        ValueError: propagated from ``extract_json_object`` when the reply
            contains no parsable JSON object.
    """
    schema = '{"operation": "edit|add|delete", "path": "dot.path", "value": any|null}'
    prompt_lines = [
        "Task:",
        f"{observation['task_description']}",
        "",
        "Allowed schema:",
        schema,
        "",
        f"Current score: {observation['overall_score']}",
        f"Validation errors: {observation['validation_errors']}",
        "Current YAML:",
        f"{observation['current_config']}",
        "",  # trailing empty entry yields the final newline after the YAML
    ]
    instructions = (
        "You are an environment-control agent for configuration debugging. "
        "Return exactly one JSON object action."
    )
    conversation = [
        {"role": "system", "content": instructions},
        {"role": "user", "content": "\n".join(prompt_lines)},
    ]
    completion = client.chat.completions.create(
        model=model,
        messages=conversation,
        temperature=0,
        top_p=1,
        seed=42,
    )
    reply = completion.choices[0].message.content
    # The message content can be None (or empty); parse an empty string in that case.
    return extract_json_object(reply if reply else "")
def run_episode(
    api_base_url: str,
    task_id: str,
    max_steps: int,
    policy: str,
    model: str,
    openai_client: OpenAI | None,
) -> EpisodeResult:
    """Play one episode of ``task_id`` against the environment HTTP server.

    Calls ``POST {api_base_url}/reset`` once. Then, up to ``max_steps`` times,
    picks an action with the chosen policy and sends it to
    ``POST {api_base_url}/step``, stopping early once the server reports
    ``done``. Progress is printed as ``[START]`` / ``[STEP]`` / ``[END]`` lines.

    A step is logged as ``action=invalid`` and recorded with reward 0.0 when:

    - no action can be produced (the model reply contained no parsable JSON
      object), or
    - the server rejects the action with a non-200 response.

    In both cases the episode continues instead of aborting the whole run.

    Args:
        api_base_url: Base URL of the environment server.
        task_id: Task to reset the environment to.
        max_steps: Maximum number of steps to attempt.
        policy: ``"heuristic"`` for the scripted plan; any other value uses the
            OpenAI model.
        model: Chat model name used by the OpenAI policy.
        openai_client: Client for the OpenAI policy; required unless
            ``policy == "heuristic"``.

    Returns:
        An ``EpisodeResult`` with the last observed score and per-step rewards.

    Raises:
        ValueError: if a non-heuristic policy is requested without a client.
        requests.HTTPError: if the ``/reset`` request returns an error status.
    """
    if policy != "heuristic" and openai_client is None:
        # Explicit check instead of ``assert``, which is stripped under ``python -O``.
        raise ValueError("openai_client is required when policy is not 'heuristic'")

    reset_resp = requests.post(f"{api_base_url}/reset", json={"task_id": task_id}, timeout=30)
    reset_resp.raise_for_status()
    observation = reset_resp.json()["observation"]
    rewards: list[float] = []
    done = False
    print(f"[START] task={task_id} policy={policy}")
    for step in range(max_steps):
        if done:
            break
        if policy == "heuristic":
            action = choose_heuristic_action(task_id, step)
        else:
            try:
                action = choose_openai_action(openai_client, model, observation)
            except ValueError:
                # No parsable JSON object in the model reply (json.JSONDecodeError
                # is a ValueError subclass). Score the step as invalid instead of
                # letting one bad reply crash every remaining task.
                action = None
        step_resp = (
            requests.post(f"{api_base_url}/step", json=action, timeout=30)
            if action is not None
            else None
        )
        if step_resp is None or step_resp.status_code != 200:
            rewards.append(0.0)
            print(f"[STEP] task={task_id} step={step} action=invalid reward=0.00 done=false")
            continue
        payload = step_resp.json()
        observation = payload["observation"]
        reward = payload["reward"]
        done = payload["done"]
        reward_value = float(reward["value"])
        rewards.append(reward_value)
        print(
            f"[STEP] task={task_id} step={step} action={action.get('operation')}:{action.get('path')} "
            f"reward={reward_value:.3f} score={observation['overall_score']:.3f} done={str(done).lower()}"
        )
    result = EpisodeResult(
        task_id=task_id,
        final_score=float(observation["overall_score"]),
        done=done,
        # Exactly one reward is appended per attempted step, so this never exceeds max_steps.
        steps=len(rewards),
        rewards=rewards,
    )
    reward_text = ",".join(f"{v:.3f}" for v in rewards)
    print(
        f"[END] task={task_id} score={result.final_score:.3f} "
        f"steps={result.steps} done={str(result.done).lower()} rewards={reward_text}"
    )
    return result
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for a baseline run.

    ``--api-base-url`` and ``--model`` default to the ``API_BASE_URL`` and
    ``OPENAI_MODEL`` environment variables, respectively, when they are set.
    """
    default_url = os.getenv("API_BASE_URL", "http://localhost:8000")
    default_model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    cli = argparse.ArgumentParser(description="Baseline inference for ConfigDebuggerEnv")
    cli.add_argument("--api-base-url", default=default_url)
    cli.add_argument("--max-steps", type=int, default=12)
    cli.add_argument("--policy", choices=["heuristic", "openai"], default="heuristic")
    cli.add_argument("--model", default=default_model)
    cli.add_argument("--seed", type=int, default=42)
    return cli.parse_args()
def main() -> None:
    """Run the baseline over every task in ``TASKS`` and print a score summary."""
    args = parse_args()
    # Seeds Python's global RNG; nothing visible in this file draws from
    # ``random``, so this only matters if imported code does.
    random.seed(args.seed)
    client = build_openai_client() if args.policy == "openai" else None
    results = [
        run_episode(
            api_base_url=args.api_base_url,
            task_id=task_id,
            max_steps=args.max_steps,
            policy=args.policy,
            model=args.model,
            openai_client=client,
        )
        for task_id in TASKS
    ]
    average = sum(res.final_score for res in results) / len(results)
    print("\n=== BASELINE SUMMARY ===")
    for res in results:
        print(
            f"{res.task_id}: final_score={res.final_score:.3f} steps={res.steps} done={str(res.done).lower()}"
        )
    print(f"average_score={average:.3f}")
# Script entry point: run the baseline over all tasks when executed directly.
if __name__ == "__main__":
    main()