cloud-ops-optimizer / inference.py
hirann's picture
Upload folder using huggingface_hub
dc42cb3 verified
raw
history blame
9.95 kB
#!/usr/bin/env python3
"""
Baseline Inference Script for CloudOps Optimizer Environment.
Uses OpenAI Client + HTTP calls to the server to run a model against the environment.
Usage:
python inference.py
Environment Variables:
API_BASE_URL: The API endpoint (default: https://router.huggingface.co/v1)
MODEL_NAME: The model identifier (default: Qwen/Qwen2.5-72B-Instruct)
HF_TOKEN: Your Hugging Face / API key (required; HUGGING_FACE_TOKEN is read as a fallback)
SERVER_URL: The environment server URL (default: http://localhost:7860)
Expected format for STDOUT:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
"""
import json
import os
import re
import textwrap
import time
import requests
from typing import List, Optional
from openai import OpenAI
# --- Endpoints and credentials (read from the environment at import time) ---
# Chat-completions endpoint and model id; the defaults apply only when the
# variable is absent from the environment.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# API key: HF_TOKEN is preferred; HUGGING_FACE_TOKEN is consulted when
# HF_TOKEN is missing or empty. Stays None when neither is set.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_TOKEN")
# Base URL of the environment server that exposes /reset, /step and /health.
SERVER_URL = os.environ.get("SERVER_URL", "http://localhost:7860")

# --- Episode and sampling settings ---
MAX_STEPS = 8  # upper bound on actions per task; also the score normaliser
MAX_TOKENS = 256  # completion length cap passed to the model
TEMPERATURE = 0.7  # sampling temperature passed to the model
SUCCESS_SCORE_THRESHOLD = 0.5  # a task counts as solved at or above this score
BENCHMARK = "cloud_ops_env"  # value of the env= field in the [START] record
# System message sent as the first chat message of every model request.
# It lists the instance types with their monthly cost and capacity, and
# fixes the command grammar ("change <resource_id> to <instance_type>").
# The literal is dedented and stripped, so the surrounding newlines and any
# common leading indentation never reach the model.
SYSTEM_PROMPT = textwrap.dedent(
"""
You are an expert Cloud SRE (Site Reliability Engineer). Your goal is to optimize cloud infrastructure
to meet the SLA requirements while minimizing costs.
Available instance types (cost per month, capacity):
- t3.nano: $3.60, capacity 1.0
- t3.small: $11.50, capacity 2.0
- t3.medium: $23.00, capacity 4.0
- m5.large: $70.00, capacity 8.0
- m5.xlarge: $140.00, capacity 16.0
Command format: "change [resource_id] to [instance_type]"
Example: "change srv-1 to t3.small"
You must output ONLY the command, nothing else."""
).strip()
def log_start(task: str, env: str, model: str) -> None:
    """Write the ``[START]`` record that opens a task run to stdout.

    The record is a single line of space-separated ``key=value`` fields
    (``task``, ``env``, ``model``) and is flushed immediately.
    """
    fields = (("task", task), ("env", env), ("model", model))
    record = " ".join(f"{key}={value}" for key, value in fields)
    print(f"[START] {record}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Write one ``[STEP]`` record to stdout as a single flushed line.

    Args:
        step: 1-based index of the step within the episode.
        action: Action text sent to the environment (possibly truncated).
        reward: Reward obtained for the step; printed with two decimals.
        done: Whether the episode ended on this step; printed lower-case.
        error: Error description, or a falsy value to print ``null``.

    The record format is one line per step. Model output and exception
    messages can contain newlines or tabs, so every whitespace run in
    ``action`` and ``error`` is collapsed to a single space before printing.
    """
    action_val = " ".join(str(action).split())
    # A falsy or whitespace-only error is reported as the literal "null".
    error_val = " ".join(str(error).split()) if error else ""
    if not error_val:
        error_val = "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_val} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Write the closing ``[END]`` record of a task run to stdout.

    ``success`` is printed lower-case, ``score`` with three decimals, and
    ``rewards`` as a comma-separated list with two decimals each (empty when
    no step was taken). The line is flushed immediately.
    """
    formatted_rewards = [format(value, ".2f") for value in rewards]
    parts = [
        "[END]",
        f"success={str(success).lower()}",
        f"steps={steps}",
        f"score={score:.3f}",
        "rewards=" + ",".join(formatted_rewards),
    ]
    print(" ".join(parts), flush=True)
def reset_env(task: str, timeout: float = 30.0) -> dict:
    """Reset the environment for ``task`` via ``GET {SERVER_URL}/reset``.

    Args:
        task: Task key, sent as the ``task`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole run.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    resp = requests.get(f"{SERVER_URL}/reset", params={"task": task}, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def step_env(message: str, timeout: float = 30.0) -> dict:
    """Send one action to the environment via ``POST {SERVER_URL}/step``.

    Args:
        message: Action text, sent as ``{"message": message}`` in the JSON body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole run.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    resp = requests.post(f"{SERVER_URL}/step", json={"message": message}, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def build_user_prompt(obs_data: dict) -> str:
    """Render an observation dict as the user-turn prompt for the model.

    Reads the optional keys ``inventory``, ``metrics``, ``sla``,
    ``task_name`` and ``difficulty``; missing top-level keys fall back to
    empty or zero defaults. Every inventory entry must provide ``id``,
    ``type``, ``monthly_cost`` and a numeric ``cpu_usage``, otherwise the
    lookup or the number formatting raises.
    """
    sla = obs_data.get("sla", {})
    metrics = obs_data.get("metrics", {})

    # One line per resource; an empty inventory yields an empty section.
    resource_lines = []
    for resource in obs_data.get("inventory", []):
        resource_lines.append(
            f" {resource['id']}: {resource['type']} - ${resource['monthly_cost']}/mo, "
            f"CPU: {resource['cpu_usage']:.1f}%"
        )

    task_name = obs_data.get("task_name", "Optimize")
    difficulty = obs_data.get("difficulty", "easy")

    sections = [
        "Current Infrastructure:",
        "\n".join(resource_lines),
        "Metrics:",
        f"- Latency: {metrics.get('avg_latency_ms', 0):.1f}ms",
        f"- Error Rate: {metrics.get('error_rate', 0):.3f}",
        "SLA Requirements:",
        f"- Max Latency: {sla.get('max_latency_ms', 0)}ms",
        f"- Max Budget: ${sla.get('max_budget', 0)}/mo",
        f"Task: {task_name} ({difficulty})",
        "Provide your next command:",
    ]
    return "\n".join(sections)
def call_model(client: OpenAI, user_prompt: str, history: List[dict]) -> str:
    """Ask the model for the next command and return it as plain text.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        user_prompt: Prompt describing the current observation.
        history: Earlier chat messages, inserted between the system prompt
            and ``user_prompt``. The list is not modified.

    Returns:
        The command to send to the environment. Resolution order:

        1. the first reply line that starts with ``change `` (returned as is);
        2. otherwise a ``change <id> to <type>`` phrase found anywhere in the
           reply, matched case-insensitively, with surrounding quotes,
           backticks, brackets and trailing punctuation removed;
        3. otherwise the raw reply text;
        4. ``change srv-1 to t3.small`` when the reply is empty or the
           request fails (the failure is printed as a ``[DEBUG]`` line).
    """
    fallback = "change srv-1 to t3.small"
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        *history,
        {"role": "user", "content": user_prompt},
    ]
    # Only the remote call is guarded: it is the only step expected to fail,
    # and a failed request must not abort the episode.
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
    except Exception as exc:
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return fallback

    if not text:
        return fallback

    # Preferred case: the model put the bare command on a line of its own.
    for line in text.split("\n"):
        candidate = line.strip()
        if candidate.startswith("change "):
            return candidate

    # Models often wrap or prefix the command ("Command: `Change srv-1 to
    # t3.small`."), which the prefix test above misses; recover it here.
    embedded = re.search(r"\bchange\s+(\S+)\s+to\s+(\S+)", text, flags=re.IGNORECASE)
    if embedded:
        wrappers = "`\"'*.,;:!?()[]"
        resource_id = embedded.group(1).strip(wrappers)
        instance_type = embedded.group(2).strip(wrappers)
        if resource_id and instance_type:
            return f"change {resource_id} to {instance_type}"

    return text
# Task registry keyed by the short task key used on the command path
# ("easy" / "medium" / "hard"). Each entry carries the task identifier used
# in result records, a display name, and the difficulty label.
TASKS = dict(
    easy=dict(task_id="easy_right_sizing", name="Right-Sizing", difficulty="easy"),
    medium=dict(task_id="medium_latency_fix", name="Latency Fix", difficulty="medium"),
    hard=dict(task_id="hard_balance", name="Balance Optimization", difficulty="hard"),
)
def run_task(client: OpenAI, task_key: str, verbose: bool = False) -> dict:
    """Run one task episode against the environment server and score it.

    Emits a ``[START]`` record, up to ``MAX_STEPS`` ``[STEP]`` records and a
    final ``[END]`` record on stdout. The ``[END]`` record is always printed,
    even when the episode fails part-way.

    Args:
        client: OpenAI-compatible client used to query the model.
        task_key: Key into ``TASKS``; an unknown key raises ``KeyError``
            before anything is logged.
        verbose: Currently unused; accepted for interface compatibility.

    Returns:
        A dict with ``task_id``, ``task_name``, ``score`` (sum of rewards
        divided by ``MAX_STEPS``, clamped to [0, 1]), ``success``
        (``score >= SUCCESS_SCORE_THRESHOLD``), ``steps`` and ``rewards``.
    """
    task = TASKS[task_key]
    task_name = task["name"]
    history: List[dict] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = reset_env(task_key)
        # "or {}" also covers an explicit JSON null, which .get() would pass through.
        obs_data = result.get("observation") or {}
        done = bool(result.get("done", False))

        for step in range(1, MAX_STEPS + 1):
            if done:
                break

            user_prompt = build_user_prompt(obs_data)
            response_text = call_model(client, user_prompt, history)
            # Record both sides of the exchange so later requests carry an
            # alternating user/assistant transcript rather than a run of
            # assistant-only messages.
            history.append({"role": "user", "content": user_prompt})
            history.append({"role": "assistant", "content": response_text})

            # Keep the logged action short; the full text still goes to the server.
            action_str = response_text[:50] + "..." if len(response_text) > 50 else response_text

            try:
                result = step_env(response_text)
                # A null reward would break the "{reward:.2f}" formatting in log_step.
                reward = float(result.get("reward") or 0.0)
                done = bool(result.get("done", False))
                obs_data = result.get("observation") or {}
                error_msg = None
                info = result.get("info") or {}
                if info.get("reason") == "crash":
                    # A crashed system ends the episode with no reward for this step.
                    done = True
                    reward = 0.0
                    error_msg = "system_crash"
            except Exception as exc:
                # A failed step ends the episode but is still logged and scored.
                error_msg = str(exc)
                reward = 0.0
                done = True
                obs_data = {}

            rewards.append(reward)
            steps_taken = step
            log_step(step=step, action=action_str, reward=reward, done=done, error=error_msg)

        # Normalise by the step budget, not by the steps actually taken.
        max_reward = MAX_STEPS * 1.0
        score = sum(rewards) / max_reward if max_reward > 0 else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as exc:
        print(f"[DEBUG] Task execution error: {exc}", flush=True)
    finally:
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    # Returning after (not inside) the finally block lets non-Exception
    # signals such as KeyboardInterrupt propagate to the caller.
    return {
        "task_id": task["task_id"],
        "task_name": task_name,
        "score": score,
        "success": success,
        "steps": steps_taken,
        "rewards": rewards,
    }
def main():
    """Run the baseline on every task, print a summary and save the results.

    Steps:
        1. Print the configuration banner.
        2. Abort early when no API token is configured or the environment
           server does not answer ``GET /health`` with status 200.
        3. Run the easy, medium and hard tasks in order; a task that raises
           is recorded with a zero score instead of stopping the run.
        4. Print per-task and average scores and write them to
           ``inference_results.json`` in the current working directory.
    """
    print("=" * 60)
    print("CloudOps Optimizer — Baseline Inference")
    print("=" * 60)
    print(f"API URL : {API_BASE_URL}")
    print(f"Model : {MODEL_NAME}")
    print(f"Server : {SERVER_URL}")
    print()

    if not HF_TOKEN:
        print("ERROR: HF_TOKEN not set")
        return

    # Test server connection before spending any model calls.
    try:
        resp = requests.get(f"{SERVER_URL}/health", timeout=5)
    except requests.RequestException as exc:
        print(f"ERROR: Cannot connect to server at {SERVER_URL}")
        # Show the underlying cause (refused, DNS, timeout, ...) so the
        # failure can be diagnosed.
        print(f" Reason: {exc}")
        print(" Make sure server is running: python main.py")
        return
    if resp.status_code != 200:
        print(f"ERROR: Server returned {resp.status_code}")
        return
    print("Server connection: OK")

    client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)

    results = []
    for task_key in ["easy", "medium", "hard"]:
        task = TASKS[task_key]
        print(f"Running task: {task['name']} ({task['difficulty']})...")
        try:
            r = run_task(client, task_key, verbose=False)
            results.append(r)
            print(f" score={r['score']:.4f} steps={r['steps']}")
        except Exception as exc:
            # Keep going: one broken task must not lose the other results.
            print(f" ERROR: {exc}")
            results.append({
                "task_id": task["task_id"],
                "task_name": task["name"],
                "score": 0.0,
                "success": False,
                "steps": 0,
                "rewards": [],
            })

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    # The difficulty marker is derived from the task_id prefix ("easy_...", ...).
    markers = {"easy": "[E]", "medium": "[M]", "hard": "[H]"}
    total = 0.0
    for r in results:
        marker = markers.get(r["task_id"].split("_")[0], "?")
        print(f"{marker} {r['task_id']:30s} score={r['score']:.4f}")
        total += r["score"]
    avg = total / len(results) if results else 0.0
    print("-" * 40)
    print(f"Average score: {avg:.4f}")
    print()

    output_path = "inference_results.json"
    # Explicit encoding keeps the file identical across platforms whose
    # default text encoding is not UTF-8.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "model": MODEL_NAME,
                "api_url": API_BASE_URL,
                "server_url": SERVER_URL,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "average_score": avg,
                "results": results,
            },
            f,
            indent=2,
        )
    print(f"Results saved to: {output_path}")
# Run the baseline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()