#!/usr/bin/env python3 """ Baseline Inference Script for CloudOps Optimizer Environment. Uses OpenAI Client + HTTP calls to the server to run a model against the environment. Usage: python inference.py Environment Variables: API_BASE_URL: The API endpoint (default: https://router.huggingface.co/v1) MODEL_NAME: The model identifier (default: Qwen/Qwen2.5-72B-Instruct) HF_TOKEN: Your Hugging Face / API key (required) SERVER_URL: The environment server URL (default: http://localhost:7860) Expected format for STDOUT: [START] task= env= model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score= rewards= """ import json import os import re import textwrap import time import requests from typing import List, Optional from openai import OpenAI API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_TOKEN") SERVER_URL = os.getenv("SERVER_URL", "http://localhost:7860") MAX_STEPS = 8 MAX_TOKENS = 256 TEMPERATURE = 0.7 SUCCESS_SCORE_THRESHOLD = 0.5 BENCHMARK = "cloud_ops_env" SYSTEM_PROMPT = textwrap.dedent( """ You are an expert Cloud SRE (Site Reliability Engineer). Your goal is to optimize cloud infrastructure to meet the SLA requirements while minimizing costs through ITERATIVE exploration. IMPORTANT: This is a multi-step optimization task. You must make 3-5 changes to find the optimal configuration. Do NOT expect to get high rewards with just 1-2 changes! Strategy: 1. Analyze the current infrastructure and identify overloaded servers 2. Make incremental changes - upgrade one server at a time 3. Evaluate the result and adjust your next action 4. Continue until you find the optimal configuration Available instance types (cost per month, capacity): - t3.nano: $3.60, capacity 1.0 - t3.small: $11.50, capacity 2.0 - t3.medium: $23.00, capacity 4.0 - m5.large: $70.00, capacity 8.0 - m5.xlarge: $140.00, capacity 16.0 Command format: "change [resource_id] to [instance_type]" Example: "change srv-1 to t3.small" You must output ONLY the command, nothing else.""" ).strip() def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: error_val = error if error else "null" done_val = str(done).lower() print( f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", flush=True, ) def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: rewards_str = ",".join(f"{r:.2f}" for r in rewards) print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) def reset_env(task: str) -> dict: """Reset the environment via HTTP.""" resp = requests.get(f"{SERVER_URL}/reset", params={"task": task}) resp.raise_for_status() return resp.json() def step_env(message: str) -> dict: """Send action to environment via HTTP.""" resp = requests.post(f"{SERVER_URL}/step", json={"message": message}) resp.raise_for_status() return resp.json() def build_user_prompt(obs_data: dict, step_num: int) -> str: inventory = obs_data.get("inventory", []) metrics = obs_data.get("metrics", {}) sla = obs_data.get("sla", {}) inv_str = "\n".join([ f" {r['id']}: {r['type']} - ${r['monthly_cost']}/mo, CPU: {r['cpu_usage']:.1f}%" for r in inventory ]) prompt = f"""Current Infrastructure (Step {step_num}): {inv_str} Metrics: - Latency: {metrics.get('avg_latency_ms', 0):.1f}ms - Error Rate: {metrics.get('error_rate', 0):.3f} SLA Requirements: - Max Latency: {sla.get('max_latency_ms', 0)}ms - Max Budget: ${sla.get('max_budget', 0)}/mo Task: {obs_data.get('task_name', 'Optimize')} ({obs_data.get('difficulty', 'easy')}) Remember: Make incremental changes. Evaluate result, then adjust. Aim for 3-5 changes total. Provide your next command:""" return prompt def call_model(client: OpenAI, user_prompt: str, history: List[dict]) -> str: messages = [{"role": "system", "content": SYSTEM_PROMPT}] messages.extend(history) messages.append({"role": "user", "content": user_prompt}) try: completion = client.chat.completions.create( model=MODEL_NAME, messages=messages, temperature=TEMPERATURE, max_tokens=MAX_TOKENS, stream=False, ) text = (completion.choices[0].message.content or "").strip() # Extract just the command if model adds explanation lines = text.split('\n') for line in lines: line = line.strip() if line.startswith('change '): return line return text if text else "change srv-1 to t3.small" except Exception as exc: print(f"[DEBUG] Model request failed: {exc}", flush=True) return "change srv-1 to t3.small" TASKS = { "easy": {"task_id": "easy_right_sizing", "name": "Right-Sizing", "difficulty": "easy"}, "medium": {"task_id": "medium_latency_fix", "name": "Latency Fix", "difficulty": "medium"}, "hard": {"task_id": "hard_balance", "name": "Balance Optimization", "difficulty": "hard"}, } def run_task(client: OpenAI, task_key: str, verbose: bool = False) -> dict: """Run inference on a single task via HTTP.""" task = TASKS[task_key] task_name = task["name"] history: List[dict] = [] rewards: List[float] = [] steps_taken = 0 score = 0.0 success = False error_msg = None log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) try: result = reset_env(task_key) obs_data = result.get("observation", {}) done = result.get("done", False) for step in range(1, MAX_STEPS + 1): if done: break user_prompt = build_user_prompt(obs_data, step) response_text = call_model(client, user_prompt, history) history.append({"role": "assistant", "content": response_text}) action_str = response_text[:50] + "..." if len(response_text) > 50 else response_text try: result = step_env(response_text) reward = result.get("reward", 0.0) done = result.get("done", False) error_msg = None obs_data = result.get("observation", {}) info = result.get("info", {}) if info.get("reason") == "crash": done = True reward = 0.0 error_msg = "system_crash" except Exception as exc: error_msg = str(exc) reward = 0.0 done = True obs_data = {} rewards.append(reward) steps_taken = step log_step(step=step, action=action_str, reward=reward, done=done, error=error_msg) if done: break max_reward = MAX_STEPS * 1.0 score = sum(rewards) / max_reward if max_reward > 0 else 0.0 score = min(max(score, 0.0), 1.0) success = score >= SUCCESS_SCORE_THRESHOLD except Exception as exc: error_msg = str(exc) print(f"[DEBUG] Task execution error: {exc}", flush=True) finally: log_end(success=success, steps=steps_taken, score=score, rewards=rewards) return { "task_id": task["task_id"], "task_name": task_name, "score": score, "success": success, "steps": steps_taken, "rewards": rewards, } def main(): print("=" * 60) print("CloudOps Optimizer — Baseline Inference") print("=" * 60) print(f"API URL : {API_BASE_URL}") print(f"Model : {MODEL_NAME}") print(f"Server : {SERVER_URL}") print() if not HF_TOKEN: print("ERROR: HF_TOKEN not set") return # Test server connection try: resp = requests.get(f"{SERVER_URL}/health", timeout=5) if resp.status_code != 200: print(f"ERROR: Server returned {resp.status_code}") return print("Server connection: OK") except Exception as e: print(f"ERROR: Cannot connect to server at {SERVER_URL}") print(f" Make sure server is running: python main.py") return client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) task_keys = ["easy", "medium", "hard"] results = [] for task_key in task_keys: task = TASKS[task_key] print(f"Running task: {task['name']} ({task['difficulty']})...") try: r = run_task(client, task_key, verbose=False) results.append(r) print(f" score={r['score']:.4f} steps={r['steps']}") except Exception as exc: print(f" ERROR: {exc}") results.append({ "task_id": task["task_id"], "task_name": task["name"], "score": 0.0, "success": False, "steps": 0, "rewards": [], }) print("\n" + "=" * 60) print("SUMMARY") print("=" * 60) total = 0.0 for r in results: marker = {"easy": "[E]", "medium": "[M]", "hard": "[H]"}.get(r["task_id"].split("_")[0], "?") print(f"{marker} {r['task_id']:30s} score={r['score']:.4f}") total += r['score'] avg = total / len(results) if results else 0.0 print("-" * 40) print(f"Average score: {avg:.4f}") print() output_path = "inference_results.json" with open(output_path, "w") as f: json.dump( { "model": MODEL_NAME, "api_url": API_BASE_URL, "server_url": SERVER_URL, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "average_score": avg, "results": results, }, f, indent=2, ) print(f"Results saved to: {output_path}") if __name__ == "__main__": main()