#!/usr/bin/env python3
"""
Baseline Inference Script for CloudOps Optimizer Environment.

Uses the OpenAI client + HTTP calls to the server to run a model against the environment.

Usage:
    python inference.py

Environment Variables:
    API_BASE_URL: The API endpoint (default: https://router.huggingface.co/v1)
    MODEL_NAME: The model identifier (default: Qwen/Qwen2.5-72B-Instruct)
    HF_TOKEN: Your Hugging Face / API key (required)
    SERVER_URL: The environment server URL (default: http://localhost:7860)

Expected format for STDOUT:
    [START] task=<task_name> env=<benchmark> model=<model_name>
    [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
    [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
"""

import json
import os
import textwrap
import time
from typing import List, Optional

import requests
from openai import OpenAI
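
# --- Configuration ---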
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_TOKEN")
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:7860")

MAX_STEPS = 8  # hard cap on environment steps per task
MAX_TOKENS = 256  # completion budget per model call
TEMPERATURE = 0.7
SUCCESS_SCORE_THRESHOLD = 0.5  # normalized score at or above this counts as success
BENCHMARK = "cloud_ops_env"

SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an expert Cloud SRE (Site Reliability Engineer). Your goal is to optimize cloud infrastructure
    to meet the SLA requirements while minimizing costs through ITERATIVE exploration.

    IMPORTANT: This is a multi-step optimization task. You must make 3-5 changes to find the optimal configuration.
    Do NOT expect to get high rewards with just 1-2 changes!

    Strategy:
    1. Analyze the current infrastructure and identify overloaded servers
    2. Make incremental changes - upgrade one server at a time
    3. Evaluate the result and adjust your next action
    4. Continue until you find the optimal configuration

    Available instance types (cost per month, capacity):
    - t3.nano: $3.60, capacity 1.0
    - t3.small: $11.50, capacity 2.0
    - t3.medium: $23.00, capacity 4.0
    - m5.large: $70.00, capacity 8.0
    - m5.xlarge: $140.00, capacity 16.0

    Command format: "change [resource_id] to [instance_type]"
    Example: "change srv-1 to t3.small"

    You must output ONLY the command, nothing else."""
).strip()
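

# --- STDOUT logging helpers (emit the protocol lines documented in the module docstring) ---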
def log_start(task: str, env: str, model: str) -> None:
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    error_val = error if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)
def reset_env(task: str) -> dict:
    """Reset the environment via HTTP."""
    # timeout so a hung server doesn't stall the run indefinitely
    resp = requests.get(f"{SERVER_URL}/reset", params={"task": task}, timeout=30)
    resp.raise_for_status()
    return resp.json()


def step_env(message: str) -> dict:
    """Send an action to the environment via HTTP."""
    resp = requests.post(f"{SERVER_URL}/step", json={"message": message}, timeout=30)
    resp.raise_for_status()
    return resp.json()


def build_user_prompt(obs_data: dict, step_num: int) -> str:
    inventory = obs_data.get("inventory", [])
    metrics = obs_data.get("metrics", {})
    sla = obs_data.get("sla", {})
    inv_str = "\n".join([
        f"  {r['id']}: {r['type']} - ${r['monthly_cost']}/mo, CPU: {r['cpu_usage']:.1f}%"
        for r in inventory
    ])
    prompt = f"""Current Infrastructure (Step {step_num}):
{inv_str}

Metrics:
- Latency: {metrics.get('avg_latency_ms', 0):.1f}ms
- Error Rate: {metrics.get('error_rate', 0):.3f}

SLA Requirements:
- Max Latency: {sla.get('max_latency_ms', 0)}ms
- Max Budget: ${sla.get('max_budget', 0)}/mo

Task: {obs_data.get('task_name', 'Optimize')} ({obs_data.get('difficulty', 'easy')})

Remember: Make incremental changes. Evaluate result, then adjust. Aim for 3-5 changes total.
Provide your next command:"""
    return prompt


def call_model(client: OpenAI, user_prompt: str, history: List[dict]) -> str:
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(history)
    messages.append({"role": "user", "content": user_prompt})
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
        # Extract just the command if the model adds an explanation
        lines = text.split("\n")
        for line in lines:
            line = line.strip()
            if line.startswith("change "):
                return line
        return text if text else "change srv-1 to t3.small"
    except Exception as exc:
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return "change srv-1 to t3.small"
TASKS = {
    "easy": {"task_id": "easy_right_sizing", "name": "Right-Sizing", "difficulty": "easy"},
    "medium": {"task_id": "medium_latency_fix", "name": "Latency Fix", "difficulty": "medium"},
    "hard": {"task_id": "hard_balance", "name": "Balance Optimization", "difficulty": "hard"},
}


def run_task(client: OpenAI, task_key: str, verbose: bool = False) -> dict:
    """Run inference on a single task via HTTP."""
    task = TASKS[task_key]
    task_name = task["name"]
    history: List[dict] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    error_msg = None

    log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = reset_env(task_key)
        obs_data = result.get("observation", {})
        done = result.get("done", False)
        for step in range(1, MAX_STEPS + 1):
            if done:
                break
            user_prompt = build_user_prompt(obs_data, step)
            response_text = call_model(client, user_prompt, history)
            # Record both sides of the exchange so the model sees a coherent
            # alternating user/assistant conversation on later steps.
            history.append({"role": "user", "content": user_prompt})
            history.append({"role": "assistant", "content": response_text})
            action_str = response_text[:50] + "..." if len(response_text) > 50 else response_text
            try:
                result = step_env(response_text)
                reward = result.get("reward", 0.0)
                done = result.get("done", False)
                error_msg = None
                obs_data = result.get("observation", {})
                info = result.get("info", {})
                if info.get("reason") == "crash":
                    done = True
                    reward = 0.0
                    error_msg = "system_crash"
            except Exception as exc:
                error_msg = str(exc)
                reward = 0.0
                done = True
                obs_data = {}
            rewards.append(reward)
            steps_taken = step
            log_step(step=step, action=action_str, reward=reward, done=done, error=error_msg)
            if done:
                break
        # Normalize the episode score to [0, 1] against the best possible
        # total reward (1.0 per step over MAX_STEPS).
        max_reward = MAX_STEPS * 1.0
        score = sum(rewards) / max_reward if max_reward > 0 else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as exc:
        error_msg = str(exc)
        print(f"[DEBUG] Task execution error: {exc}", flush=True)
    finally:
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
    return {
        "task_id": task["task_id"],
        "task_name": task_name,
        "score": score,
        "success": success,
        "steps": steps_taken,
        "rewards": rewards,
    }
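

# Entry point: verifies credentials and server health, runs all three tasks,
# prints a summary, and writes inference_results.json.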
def main():
    print("=" * 60)
    print("CloudOps Optimizer — Baseline Inference")
    print("=" * 60)
    print(f"API URL : {API_BASE_URL}")
    print(f"Model   : {MODEL_NAME}")
    print(f"Server  : {SERVER_URL}")
    print()

    if not HF_TOKEN:
        print("ERROR: HF_TOKEN not set")
        return

    # Test server connection
    try:
        resp = requests.get(f"{SERVER_URL}/health", timeout=5)
        if resp.status_code != 200:
            print(f"ERROR: Server returned {resp.status_code}")
            return
        print("Server connection: OK")
    except Exception as exc:
        print(f"ERROR: Cannot connect to server at {SERVER_URL} ({exc})")
        print("  Make sure the server is running: python main.py")
        return

    client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    task_keys = ["easy", "medium", "hard"]
    results = []
    for task_key in task_keys:
        task = TASKS[task_key]
        print(f"Running task: {task['name']} ({task['difficulty']})...")
        try:
            r = run_task(client, task_key, verbose=False)
            results.append(r)
            print(f"  score={r['score']:.4f} steps={r['steps']}")
        except Exception as exc:
            print(f"  ERROR: {exc}")
            results.append({
                "task_id": task["task_id"],
                "task_name": task["name"],
                "score": 0.0,
                "success": False,
                "steps": 0,
                "rewards": [],
            })
| print("\n" + "=" * 60) | |
| print("SUMMARY") | |
| print("=" * 60) | |
| total = 0.0 | |
| for r in results: | |
| marker = {"easy": "[E]", "medium": "[M]", "hard": "[H]"}.get(r["task_id"].split("_")[0], "?") | |
| print(f"{marker} {r['task_id']:30s} score={r['score']:.4f}") | |
| total += r['score'] | |
| avg = total / len(results) if results else 0.0 | |
| print("-" * 40) | |
| print(f"Average score: {avg:.4f}") | |
| print() | |
| output_path = "inference_results.json" | |
| with open(output_path, "w") as f: | |
| json.dump( | |
| { | |
| "model": MODEL_NAME, | |
| "api_url": API_BASE_URL, | |
| "server_url": SERVER_URL, | |
| "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "average_score": avg, | |
| "results": results, | |
| }, | |
| f, | |
| indent=2, | |
| ) | |
| print(f"Results saved to: {output_path}") | |
| if __name__ == "__main__": | |
| main() |