#!/usr/bin/env python3
"""
Baseline Inference Script for CloudOps Optimizer Environment.

Uses OpenAI Client + HTTP calls to the server to run a model against the
environment.

Usage:
    python inference.py

Environment Variables:
    API_BASE_URL: The API endpoint (default: https://router.huggingface.co/v1)
    MODEL_NAME: The model identifier (default: Qwen/Qwen2.5-72B-Instruct)
    HF_TOKEN: Your Hugging Face / API key (required; HUGGING_FACE_TOKEN is
        accepted as a fallback)
    SERVER_URL: The environment server URL (default: http://localhost:7860)

Expected format for STDOUT:
    [START] task=<task_name> env=<benchmark> model=<model_name>
    [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
    [END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...,rn>
"""

import json
import os
import re
import textwrap
import time
from typing import List, Optional

import requests
from openai import OpenAI

API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_TOKEN")
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:7860")

MAX_STEPS = 8
MAX_TOKENS = 256
TEMPERATURE = 0.7
SUCCESS_SCORE_THRESHOLD = 0.5
BENCHMARK = "cloud_ops_env"

# Upper bound (seconds) for a single reset/step HTTP call, so that a stalled
# environment server cannot hang the whole run.
REQUEST_TIMEOUT = 30

# Command used when the model request fails or returns no text.
FALLBACK_COMMAND = "change srv-1 to t3.small"

SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an expert Cloud SRE (Site Reliability Engineer).
    Your goal is to optimize cloud infrastructure to meet the SLA requirements while minimizing costs.

    Available instance types (cost per month, capacity):
    - t3.nano: $3.60, capacity 1.0
    - t3.small: $11.50, capacity 2.0
    - t3.medium: $23.00, capacity 4.0
    - m5.large: $70.00, capacity 8.0
    - m5.xlarge: $140.00, capacity 16.0

    Command format: "change [resource_id] to [instance_type]"
    Example: "change srv-1 to t3.small"

    You must output ONLY the command, nothing else."""
).strip()


def _single_line(text: str) -> str:
    """Collapse every whitespace run (newlines included) into one space."""
    return " ".join(text.split())


def log_start(task: str, env: str, model: str) -> None:
    """Print the [START] record for a task run."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one [STEP] record.

    ``action`` and ``error`` are flattened to a single line so that a
    multi-line model reply or exception message cannot split the record.
    A missing/empty ``error`` is rendered as ``null``.
    """
    error_val = (_single_line(error) if error else "") or "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={_single_line(action)} reward={reward:.2f} "
        f"done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the [END] record with the final score and per-step rewards."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


def reset_env(task: str) -> dict:
    """Reset the environment via HTTP.

    Issues ``GET {SERVER_URL}/reset?task=<task>`` and returns the decoded JSON
    body. Raises ``requests.RequestException`` on connection failure, timeout
    or a non-2xx status.
    """
    resp = requests.get(
        f"{SERVER_URL}/reset", params={"task": task}, timeout=REQUEST_TIMEOUT
    )
    resp.raise_for_status()
    return resp.json()


def step_env(message: str) -> dict:
    """Send action to environment via HTTP.

    Issues ``POST {SERVER_URL}/step`` with ``{"message": message}`` as the JSON
    body and returns the decoded JSON response. Raises
    ``requests.RequestException`` on connection failure, timeout or a non-2xx
    status.
    """
    resp = requests.post(
        f"{SERVER_URL}/step", json={"message": message}, timeout=REQUEST_TIMEOUT
    )
    resp.raise_for_status()
    return resp.json()


def build_user_prompt(obs_data: dict) -> str:
    """Render an observation dict as the user prompt shown to the model.

    Reads the ``inventory``, ``metrics``, ``sla``, ``task_name`` and
    ``difficulty`` keys of ``obs_data``; each of them may be absent. Every
    inventory entry must provide ``id``, ``type``, ``monthly_cost`` and
    ``cpu_usage`` (a missing key raises ``KeyError``).
    """
    inventory = obs_data.get("inventory", [])
    metrics = obs_data.get("metrics", {})
    sla = obs_data.get("sla", {})

    inv_str = "\n".join(
        f" {r['id']}: {r['type']} - ${r['monthly_cost']}/mo, CPU: {r['cpu_usage']:.1f}%"
        for r in inventory
    )

    sections = [
        "Current Infrastructure:",
        inv_str,
        "",
        "Metrics:",
        f"- Latency: {metrics.get('avg_latency_ms', 0):.1f}ms",
        f"- Error Rate: {metrics.get('error_rate', 0):.3f}",
        "",
        "SLA Requirements:",
        f"- Max Latency: {sla.get('max_latency_ms', 0)}ms",
        f"- Max Budget: ${sla.get('max_budget', 0)}/mo",
        "",
        f"Task: {obs_data.get('task_name', 'Optimize')} ({obs_data.get('difficulty', 'easy')})",
        "",
        "Provide your next command:",
    ]
    return "\n".join(sections)


def call_model(client: OpenAI, user_prompt: str, history: List[dict]) -> str:
    """Ask the model for the next command.

    The request consists of the system prompt, the prior ``history`` messages
    and ``user_prompt`` as the final user turn. ``history`` is not modified.

    Returns the first reply line that starts with ``change `` when there is
    one, otherwise the whole stripped reply. If the request fails or the
    reply is empty, ``FALLBACK_COMMAND`` is returned so the episode can
    continue.
    """
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        *history,
        {"role": "user", "content": user_prompt},
    ]

    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
    except Exception as exc:
        # Best-effort boundary: any client/transport failure degrades to the
        # fallback command instead of aborting the task.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return FALLBACK_COMMAND

    # Extract just the command if the model added explanation around it.
    for raw_line in text.splitlines():
        candidate = raw_line.strip()
        if candidate.startswith("change "):
            return candidate

    return text or FALLBACK_COMMAND


TASKS = {
    "easy": {"task_id": "easy_right_sizing", "name": "Right-Sizing", "difficulty": "easy"},
    "medium": {"task_id": "medium_latency_fix", "name": "Latency Fix", "difficulty": "medium"},
    "hard": {"task_id": "hard_balance", "name": "Balance Optimization", "difficulty": "hard"},
}


def run_task(client: OpenAI, task_key: str, verbose: bool = False) -> dict:
    """Run inference on a single task via HTTP.

    Resets the environment for ``task_key`` (a key of ``TASKS``), then for at
    most ``MAX_STEPS`` steps asks the model for a command and submits it,
    emitting [START]/[STEP]/[END] records on STDOUT. ``verbose`` is accepted
    for interface compatibility and is currently unused.

    Returns a dict with ``task_id``, ``task_name``, ``score`` (sum of rewards
    divided by ``MAX_STEPS``, clamped to [0, 1]), ``success``, ``steps`` and
    ``rewards``. If the run aborts with an unexpected error, ``score`` stays
    0.0 and ``success`` stays False.
    """
    task = TASKS[task_key]
    task_name = task["name"]

    history: List[dict] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)

    try:
        result = reset_env(task_key)
        # `or {}` also covers an explicit JSON null from the server.
        obs_data = result.get("observation") or {}
        done = bool(result.get("done", False))

        for step in range(1, MAX_STEPS + 1):
            if done:
                break

            user_prompt = build_user_prompt(obs_data)
            response_text = call_model(client, user_prompt, history)

            # Record BOTH turns: keeping only assistant messages would yield
            # consecutive assistant turns, which strict-alternation chat
            # templates reject, and would hide past observations from the model.
            history.append({"role": "user", "content": user_prompt})
            history.append({"role": "assistant", "content": response_text})

            if len(response_text) > 50:
                action_str = response_text[:50] + "..."
            else:
                action_str = response_text

            step_error: Optional[str] = None
            try:
                result = step_env(response_text)
                reward = float(result.get("reward") or 0.0)
                done = bool(result.get("done", False))
                obs_data = result.get("observation") or {}

                info = result.get("info") or {}
                if info.get("reason") == "crash":
                    done = True
                    reward = 0.0
                    step_error = "system_crash"
            except Exception as exc:
                # A failed or malformed step ends the episode with zero reward;
                # the cause is surfaced through the [STEP] error field.
                step_error = str(exc)
                reward = 0.0
                done = True
                obs_data = {}

            rewards.append(reward)
            steps_taken = step

            log_step(step=step, action=action_str, reward=reward, done=done, error=step_error)

            if done:
                break

        max_reward = MAX_STEPS * 1.0
        score = sum(rewards) / max_reward if max_reward > 0 else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD

    except Exception as exc:
        print(f"[DEBUG] Task execution error: {exc}", flush=True)

    finally:
        # Always emit [END], even when the run aborted part-way.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {
        "task_id": task["task_id"],
        "task_name": task_name,
        "score": score,
        "success": success,
        "steps": steps_taken,
        "rewards": rewards,
    }


def main():
    """Run every task against the server and write ``inference_results.json``.

    Prints an error and returns early when ``HF_TOKEN`` is unset or when the
    server's ``/health`` endpoint is unreachable or does not answer 200.
    """
    print("=" * 60)
    print("CloudOps Optimizer — Baseline Inference")
    print("=" * 60)
    print(f"API URL : {API_BASE_URL}")
    print(f"Model : {MODEL_NAME}")
    print(f"Server : {SERVER_URL}")
    print()

    if not HF_TOKEN:
        print("ERROR: HF_TOKEN not set")
        return

    # Test server connection
    try:
        resp = requests.get(f"{SERVER_URL}/health", timeout=5)
    except requests.RequestException:
        print(f"ERROR: Cannot connect to server at {SERVER_URL}")
        print(" Make sure server is running: python main.py")
        return
    if resp.status_code != 200:
        print(f"ERROR: Server returned {resp.status_code}")
        return
    print("Server connection: OK")

    client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)

    task_keys = ["easy", "medium", "hard"]
    results = []

    for task_key in task_keys:
        task = TASKS[task_key]
        print(f"Running task: {task['name']} ({task['difficulty']})...")
        try:
            r = run_task(client, task_key, verbose=False)
            results.append(r)
            print(f" score={r['score']:.4f} steps={r['steps']}")
        except Exception as exc:
            # Keep going with the remaining tasks; record a zero result.
            print(f" ERROR: {exc}")
            results.append({
                "task_id": task["task_id"],
                "task_name": task["name"],
                "score": 0.0,
                "success": False,
                "steps": 0,
                "rewards": [],
            })

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    # Marker is selected by the difficulty prefix of the task id.
    markers = {"easy": "[E]", "medium": "[M]", "hard": "[H]"}
    total = 0.0
    for r in results:
        marker = markers.get(r["task_id"].split("_")[0], "?")
        print(f"{marker} {r['task_id']:30s} score={r['score']:.4f}")
        total += r["score"]

    avg = total / len(results) if results else 0.0
    print("-" * 40)
    print(f"Average score: {avg:.4f}")
    print()

    output_path = "inference_results.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "model": MODEL_NAME,
                "api_url": API_BASE_URL,
                "server_url": SERVER_URL,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "average_score": avg,
                "results": results,
            },
            f,
            indent=2,
        )
    print(f"Results saved to: {output_path}")


if __name__ == "__main__":
    main()