cloud-ops-optimizer / inference.py
hirann's picture
Enhance: Multi-step iterative tasks
739386c verified
#!/usr/bin/env python3
"""
Baseline Inference Script for CloudOps Optimizer Environment.
Uses OpenAI Client + HTTP calls to the server to run a model against the environment.
Usage:
python inference.py
Environment Variables:
API_BASE_URL: The API endpoint (default: https://router.huggingface.co/v1)
MODEL_NAME: The model identifier (default: Qwen/Qwen2.5-72B-Instruct)
HF_TOKEN: Your Hugging Face / API key (required)
SERVER_URL: The environment server URL (default: http://localhost:7860)
Expected format for STDOUT:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
"""
import json
import os
import re
import textwrap
import time
import requests
from typing import List, Optional
from openai import OpenAI
# --- Endpoints and credentials (each one can be overridden via the environment) ---
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# HF_TOKEN takes precedence; HUGGING_FACE_TOKEN is consulted only when
# HF_TOKEN is unset or empty.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_TOKEN")
SERVER_URL = os.environ.get("SERVER_URL", "http://localhost:7860")

# --- Episode and sampling settings ---
MAX_STEPS = 8  # upper bound on environment steps per task
MAX_TOKENS = 256  # completion length cap passed to the chat API
TEMPERATURE = 0.7  # sampling temperature passed to the chat API
SUCCESS_SCORE_THRESHOLD = 0.5  # a task counts as a success at or above this score
BENCHMARK = "cloud_ops_env"  # value reported as env=... on the [START] line
# System message for the model: describes the SRE role, the iterative
# strategy, the instance catalogue and the exact command syntax to reply with.
SYSTEM_PROMPT = "\n".join(
    (
        "You are an expert Cloud SRE (Site Reliability Engineer). Your goal is to optimize cloud infrastructure",
        "to meet the SLA requirements while minimizing costs through ITERATIVE exploration.",
        "IMPORTANT: This is a multi-step optimization task. You must make 3-5 changes to find the optimal configuration.",
        "Do NOT expect to get high rewards with just 1-2 changes!",
        "Strategy:",
        "1. Analyze the current infrastructure and identify overloaded servers",
        "2. Make incremental changes - upgrade one server at a time",
        "3. Evaluate the result and adjust your next action",
        "4. Continue until you find the optimal configuration",
        "Available instance types (cost per month, capacity):",
        "- t3.nano: $3.60, capacity 1.0",
        "- t3.small: $11.50, capacity 2.0",
        "- t3.medium: $23.00, capacity 4.0",
        "- m5.large: $70.00, capacity 8.0",
        "- m5.xlarge: $140.00, capacity 16.0",
        'Command format: "change [resource_id] to [instance_type]"',
        'Example: "change srv-1 to t3.small"',
        "You must output ONLY the command, nothing else.",
    )
)
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` record that opens an episode on stdout.

    Args:
        task: Task name shown as ``task=...``.
        env: Benchmark name shown as ``env=...``.
        model: Model identifier shown as ``model=...``.
    """
    fields = (("task", task), ("env", env), ("model", model))
    body = " ".join(f"{key}={value}" for key, value in fields)
    # Flushed immediately so the record is visible even if the run dies later.
    print("[START] " + body, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record on stdout, always as a single line.

    Args:
        step: 1-based step number.
        action: Action text sent to the environment (may be raw model output).
        reward: Reward for this step, printed with two decimals.
        done: Whether the episode ended on this step; printed as ``true``/``false``.
        error: Error description, or ``None``/empty to print ``null``.
    """
    # The action can be an unparsed, multi-line model reply and the error can be
    # a multi-line exception message. Collapse every whitespace run (newlines
    # included) so neither can split the record across several stdout lines.
    action_val = " ".join(str(action).split())
    error_val = " ".join(str(error).split()) if error else ""
    if not error_val:
        error_val = "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_val} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` record that closes an episode on stdout.

    Args:
        success: Printed lower-cased as ``true``/``false``.
        steps: Number of steps taken.
        score: Final score, printed with three decimals.
        rewards: Per-step rewards, printed comma-separated with two decimals each.
    """
    formatted_rewards = [format(value, ".2f") for value in rewards]
    success_val = str(success).lower()
    line = "[END] success={} steps={} score={:.3f} rewards={}".format(
        success_val, steps, score, ",".join(formatted_rewards)
    )
    print(line, flush=True)
def reset_env(task: str, timeout: float = 30.0) -> dict:
    """Reset the environment via HTTP and return the server's JSON reply.

    Args:
        task: Value sent as the ``task`` query parameter of ``GET /reset``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole run.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx status (via ``raise_for_status``).
    """
    resp = requests.get(f"{SERVER_URL}/reset", params={"task": task}, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def step_env(message: str, timeout: float = 30.0) -> dict:
    """Send one action to the environment via HTTP and return the JSON reply.

    Args:
        message: Action text, posted to ``/step`` as ``{"message": message}``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole run.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx status (via ``raise_for_status``).
    """
    resp = requests.post(f"{SERVER_URL}/step", json={"message": message}, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def build_user_prompt(obs_data: dict, step_num: int) -> str:
    """Render an observation as the user message for the model.

    Args:
        obs_data: Observation mapping. The keys read are ``inventory``,
            ``metrics``, ``sla``, ``task_name`` and ``difficulty``; each falls
            back to a default when absent. Every inventory entry must provide
            ``id``, ``type``, ``monthly_cost`` and ``cpu_usage``.
        step_num: Step number shown in the prompt header.

    Returns:
        The prompt text, one section per line.

    Raises:
        KeyError: If an inventory entry lacks one of the required keys.
    """
    metrics = obs_data.get("metrics", {})
    sla = obs_data.get("sla", {})

    # One line per resource: id, instance type, monthly cost and CPU load.
    inventory_lines = []
    for resource in obs_data.get("inventory", []):
        inventory_lines.append(
            f" {resource['id']}: {resource['type']} - ${resource['monthly_cost']}/mo, CPU: {resource['cpu_usage']:.1f}%"
        )

    sections = [
        f"Current Infrastructure (Step {step_num}):",
        "\n".join(inventory_lines),
        "Metrics:",
        f"- Latency: {metrics.get('avg_latency_ms', 0):.1f}ms",
        f"- Error Rate: {metrics.get('error_rate', 0):.3f}",
        "SLA Requirements:",
        f"- Max Latency: {sla.get('max_latency_ms', 0)}ms",
        f"- Max Budget: ${sla.get('max_budget', 0)}/mo",
        f"Task: {obs_data.get('task_name', 'Optimize')} ({obs_data.get('difficulty', 'easy')})",
        "Remember: Make incremental changes. Evaluate result, then adjust. Aim for 3-5 changes total.",
        "Provide your next command:",
    ]
    return "\n".join(sections)
# Action used when the model request fails or yields no text at all.
_FALLBACK_COMMAND = "change srv-1 to t3.small"

# "change <resource_id> to <instance_type>", tolerant of letter case and spacing.
_COMMAND_RE = re.compile(r"\bchange\s+(\S+)\s+to\s+(\S+)", re.IGNORECASE)

# Decoration models commonly wrap around a command or its tokens
# (quotes, backticks, markdown emphasis, brackets, sentence punctuation).
_TRIM_CHARS = "`'\"*[]().,;:!?"


def _normalise_command(match: "re.Match") -> Optional[str]:
    """Rebuild a canonical command from a ``_COMMAND_RE`` match, or return None."""
    resource = match.group(1).strip(_TRIM_CHARS)
    instance = match.group(2).strip(_TRIM_CHARS)
    if not resource or not instance:
        return None
    return f"change {resource} to {instance}"


def _extract_command(text: str) -> Optional[str]:
    """Return the first command found in *text* in canonical form, or None."""
    # Prefer a line that starts with the command (ignoring wrapping decoration),
    # so an explanation on an earlier line cannot shadow the real answer.
    for raw_line in text.splitlines():
        match = _COMMAND_RE.match(raw_line.strip().lstrip(_TRIM_CHARS + " "))
        if match:
            command = _normalise_command(match)
            if command:
                return command
    # Otherwise accept a command embedded anywhere in the reply.
    match = _COMMAND_RE.search(text)
    return _normalise_command(match) if match else None


def call_model(client: OpenAI, user_prompt: str, history: List[dict]) -> str:
    """Ask the model for the next action and return it as a command string.

    The request is the system prompt, then *history*, then *user_prompt*.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        user_prompt: Prompt describing the current observation.
        history: Earlier chat messages inserted between the system prompt and
            *user_prompt*; not modified here.

    Returns:
        The command extracted from the reply as ``change <id> to <type>``.
        Matching ignores letter case and surrounding quotes, backticks or
        punctuation. If no command can be extracted the stripped reply is
        returned unchanged. If the reply is empty or the request fails, a
        fixed fallback command is returned.
    """
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(history)
    messages.append({"role": "user", "content": user_prompt})
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
    except Exception as exc:
        # Deliberate best effort: a failed request must not abort the episode.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return _FALLBACK_COMMAND

    command = _extract_command(text)
    if command is not None:
        return command
    return text or _FALLBACK_COMMAND
# Task registry keyed by difficulty level. Each entry carries the task
# identifier, a display name, and the difficulty repeated for convenience.
TASKS = {
    level: {"task_id": task_id, "name": title, "difficulty": level}
    for level, task_id, title in (
        ("easy", "easy_right_sizing", "Right-Sizing"),
        ("medium", "medium_latency_fix", "Latency Fix"),
        ("hard", "hard_balance", "Balance Optimization"),
    )
}
def run_task(client: OpenAI, task_key: str, verbose: bool = False) -> dict:
    """Run inference on a single task via HTTP and return its result summary.

    Resets the environment, then repeats prompt -> model -> step for at most
    ``MAX_STEPS`` steps or until the episode is done. Emits the ``[START]``,
    ``[STEP]`` and ``[END]`` records along the way.

    Args:
        client: OpenAI-compatible client passed through to ``call_model``.
        task_key: Key into ``TASKS``; also the value sent to ``reset_env``.
        verbose: Currently unused; kept so existing callers keep working.

    Returns:
        A dict with ``task_id``, ``task_name``, ``score``, ``success``,
        ``steps`` and ``rewards``.

    Raises:
        KeyError: If *task_key* is not in ``TASKS``. Failures during the
            episode itself are caught and reported, never raised.
    """
    task = TASKS[task_key]
    task_name = task["name"]
    history: List[dict] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = reset_env(task_key)
        # "or {}" also covers a server that sends an explicit null.
        obs_data = result.get("observation") or {}
        done = bool(result.get("done", False))

        for step in range(1, MAX_STEPS + 1):
            if done:
                break
            user_prompt = build_user_prompt(obs_data, step)
            response_text = call_model(client, user_prompt, history)
            # Record both sides of the exchange. Storing only the assistant
            # reply would give later requests consecutive assistant turns
            # with none of the observations that prompted them.
            history.append({"role": "user", "content": user_prompt})
            history.append({"role": "assistant", "content": response_text})
            # Keep the logged action short; the full text is still sent.
            action_str = response_text if len(response_text) <= 50 else response_text[:50] + "..."

            error_msg = None
            try:
                result = step_env(response_text)
                # A null or missing reward counts as 0.0 so that formatting
                # and summing below cannot fail on None.
                reward = float(result.get("reward") or 0.0)
                done = bool(result.get("done", False))
                obs_data = result.get("observation") or {}
                info = result.get("info") or {}
                if info.get("reason") == "crash":
                    done = True
                    reward = 0.0
                    error_msg = "system_crash"
            except Exception as exc:
                # A failed step ends the episode with zero reward for this step.
                error_msg = str(exc)
                reward = 0.0
                done = True
                obs_data = {}

            rewards.append(reward)
            steps_taken = step
            log_step(step=step, action=action_str, reward=reward, done=done, error=error_msg)
            if done:
                break

        # The reward sum is normalised by the step budget, not by the steps
        # actually taken, and then clamped to [0, 1].
        max_reward = MAX_STEPS * 1.0
        score = sum(rewards) / max_reward if max_reward > 0 else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    except Exception as exc:
        print(f"[DEBUG] Task execution error: {exc}", flush=True)
    finally:
        # Always close the episode record, even after a failure.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {
        "task_id": task["task_id"],
        "task_name": task_name,
        "score": score,
        "success": success,
        "steps": steps_taken,
        "rewards": rewards,
    }
def main() -> int:
    """Run every task against the environment server and save a summary.

    Prints the configuration, checks that ``HF_TOKEN`` is set and that the
    server's ``/health`` endpoint answers 200, runs the easy, medium and hard
    tasks, prints a score summary and writes ``inference_results.json``.

    Returns:
        Process exit status: 0 when the run completed, 1 when a precondition
        failed (missing token, unreachable or unhealthy server).
    """
    rule = "=" * 60
    print(rule)
    print("CloudOps Optimizer — Baseline Inference")
    print(rule)
    print(f"API URL : {API_BASE_URL}")
    print(f"Model : {MODEL_NAME}")
    print(f"Server : {SERVER_URL}")
    print()

    if not HF_TOKEN:
        print("ERROR: HF_TOKEN not set")
        return 1

    # Test server connection before spending any model calls.
    try:
        resp = requests.get(f"{SERVER_URL}/health", timeout=5)
    except Exception as exc:
        # Include the cause so a refused connection, a timeout and a bad URL
        # can be told apart.
        print(f"ERROR: Cannot connect to server at {SERVER_URL}: {exc}")
        print(" Make sure server is running: python main.py")
        return 1
    if resp.status_code != 200:
        print(f"ERROR: Server returned {resp.status_code}")
        return 1
    print("Server connection: OK")

    client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    task_keys = ["easy", "medium", "hard"]
    results = []
    for task_key in task_keys:
        task = TASKS[task_key]
        print(f"Running task: {task['name']} ({task['difficulty']})...")
        try:
            r = run_task(client, task_key, verbose=False)
            results.append(r)
            print(f" score={r['score']:.4f} steps={r['steps']}")
        except Exception as exc:
            # Record a zero-score entry so the summary still lists the task.
            print(f" ERROR: {exc}")
            results.append({
                "task_id": task["task_id"],
                "task_name": task["name"],
                "score": 0.0,
                "success": False,
                "steps": 0,
                "rewards": [],
            })

    print("\n" + rule)
    print("SUMMARY")
    print(rule)
    # The difficulty tag is the prefix of the task id, e.g. "easy_right_sizing".
    markers = {"easy": "[E]", "medium": "[M]", "hard": "[H]"}
    total = 0.0
    for r in results:
        marker = markers.get(r["task_id"].split("_")[0], "?")
        print(f"{marker} {r['task_id']:30s} score={r['score']:.4f}")
        total += r["score"]
    avg = total / len(results) if results else 0.0
    print("-" * 40)
    print(f"Average score: {avg:.4f}")
    print()

    output_path = "inference_results.json"
    # Explicit encoding so the file does not depend on the platform default.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "model": MODEL_NAME,
                "api_url": API_BASE_URL,
                "server_url": SERVER_URL,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "average_score": avg,
                "results": results,
            },
            f,
            indent=2,
        )
    print(f"Results saved to: {output_path}")
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so callers can detect a failed run.
    raise SystemExit(main())