# Author: hirann
# Commit 3d164bb (verified): Fix: state() method returns dict not property
import math
import random
import re
from typing import Any, Dict, Optional, Tuple
from uuid import uuid4
from dataclasses import dataclass, field
from models import (
Observation as ObsModel,
Action as ActModel,
Reward as RewModel,
Resource,
Metrics,
SLA,
)
# Catalogue of instance types the agent may switch to.
#   cost     -> value written to a resource's ``monthly_cost`` on a change
#   capacity -> units of load the instance absorbs; summed across the fleet
#               and used as the divisor of the task load in _calculate_metrics
INSTANCE_DATA = {
    type_name: {"cost": monthly_cost, "capacity": capacity_units}
    for type_name, monthly_cost, capacity_units in (
        ("t3.nano", 3.6, 1.0),
        ("t3.small", 11.5, 2.0),
        ("t3.medium", 23.0, 4.0),
        ("m5.large", 70.0, 8.0),
        ("m5.xlarge", 140.0, 16.0),
    )
}
@dataclass
class TaskConfig:
    """Static, read-only description of one optimisation scenario.

    Instances live in the module-level ``TASKS`` table; an episode copies the
    resource specs out of here when it is reset.
    """

    task_id: str  # identifier echoed back in observations and state()
    name: str  # human-readable scenario title
    difficulty: str  # difficulty label ("easy" / "medium" / "hard" in TASKS)
    description: str  # briefing text for the agent
    initial_resources: list  # server spec dicts, each expanded as Resource(**spec)
    sla: Dict[str, float]  # max_latency_ms, max_budget, min_uptime_pct
    load: float  # total demand; divided by fleet capacity to get utilisation
# Scenario catalogue keyed by the short name that CloudOpsEnvironment.reset()
# accepts as ``task_id``. Each server dict is expanded into Resource(**item);
# its cpu_usage/mem_usage figures are overwritten with load-derived values
# every time an observation is built (see _build_observation).
TASKS = {
    # 3 x m5.xlarge: $420/month against a $100 budget; load 30 on 48
    # capacity units (utilisation 0.625), i.e. well over budget at low load.
    "easy": TaskConfig(
        task_id="easy_right_sizing",
        name="Right-Sizing",
        difficulty="easy",
        description="Optimize this 3-server cluster. Start by analyzing load patterns, then iteratively adjust each server. Final reward requires ALL servers properly sized.",
        initial_resources=[
            {"id": "srv-1", "type": "m5.xlarge", "cpu_usage": 10.0, "mem_usage": 8.0, "monthly_cost": 140.0},
            {"id": "srv-2", "type": "m5.xlarge", "cpu_usage": 8.0, "mem_usage": 6.0, "monthly_cost": 140.0},
            {"id": "srv-3", "type": "m5.xlarge", "cpu_usage": 12.0, "mem_usage": 9.0, "monthly_cost": 140.0},
        ],
        sla={"max_latency_ms": 120.0, "max_budget": 100.0, "min_uptime_pct": 99.0},
        load=30.0
    ),
    # 3 x t3.small: $34.50/month against an $80 budget; load 4.5 on 6
    # capacity units (utilisation 0.75), leaving budget headroom to upgrade.
    "medium": TaskConfig(
        task_id="medium_latency_fix",
        name="Latency Fix",
        difficulty="medium",
        description="Performance bottleneck! This cluster is struggling. Analyze each server's load, then iteratively upgrade undersized servers. Requires 4+ successful changes for max reward.",
        initial_resources=[
            {"id": "srv-1", "type": "t3.small", "cpu_usage": 40.0, "mem_usage": 30.0, "monthly_cost": 11.5},
            {"id": "srv-2", "type": "t3.small", "cpu_usage": 38.0, "mem_usage": 28.0, "monthly_cost": 11.5},
            {"id": "srv-3", "type": "t3.small", "cpu_usage": 42.0, "mem_usage": 32.0, "monthly_cost": 11.5},
        ],
        sla={"max_latency_ms": 100.0, "max_budget": 80.0, "min_uptime_pct": 99.9},
        load=4.5
    ),
    # Mixed fleet: $186/month against an $80 budget; load 15 on 24 capacity
    # units (utilisation 0.625). Cutting cost must not push utilisation past
    # the 1.5 crash threshold used in step().
    "hard": TaskConfig(
        task_id="hard_balance",
        name="Balance Optimization",
        difficulty="hard",
        description="Tight budget constraint! Optimize a mixed 5-server cluster. Must achieve optimal cost-efficiency while maintaining performance. Requires 5+ iterative changes, exploring different configurations.",
        initial_resources=[
            {"id": "srv-1", "type": "m5.large", "cpu_usage": 15.0, "mem_usage": 10.0, "monthly_cost": 70.0},
            {"id": "srv-2", "type": "m5.large", "cpu_usage": 12.0, "mem_usage": 8.0, "monthly_cost": 70.0},
            {"id": "srv-3", "type": "t3.small", "cpu_usage": 50.0, "mem_usage": 40.0, "monthly_cost": 11.5},
            {"id": "srv-4", "type": "t3.small", "cpu_usage": 55.0, "mem_usage": 45.0, "monthly_cost": 11.5},
            {"id": "srv-5", "type": "t3.medium", "cpu_usage": 35.0, "mem_usage": 30.0, "monthly_cost": 23.0},
        ],
        sla={"max_latency_ms": 100.0, "max_budget": 80.0, "min_uptime_pct": 99.9},
        load=15.0
    ),
}
@dataclass
class EpisodeState:
    """Mutable bookkeeping for one episode of the cloud-ops environment.

    Created by ``reset`` and updated in place by every ``step``.
    """

    # --- fixed at reset -------------------------------------------------
    task_config: TaskConfig  # scenario this episode was started from
    resources: list  # live Resource objects, mutated by "change" commands
    current_load: float  # demand the fleet must absorb
    initial_cost: float  # fleet monthly cost at reset (baseline for % change)
    initial_latency: float  # modelled latency at reset (baseline for % change)

    # --- progress counters ----------------------------------------------
    steps: int = 0
    crashed: bool = False
    changes_made: int = 0
    last_action_success: bool = False

    # --- history / identity ---------------------------------------------
    exploration_history: list = field(default_factory=list)
    episode_id: str = field(default_factory=lambda: str(uuid4()))
class CloudOpsEnvironment:
    """Cloud Infrastructure Optimization Environment.

    The agent acts as a Cloud SRE optimizing cost and performance. ``reset``
    loads one scenario from ``TASKS``. The agent then sends text commands of
    the form ``"change <resource_id> to <instance_type>"`` through ``step``.
    It is rewarded for keeping fleet cost within the SLA budget and modelled
    latency within the SLA limit.
    """

    # Command grammar accepted by ``step`` (matched against the lower-cased message).
    _CHANGE_RE = re.compile(r"change\s+([a-z0-9-]+)\s+to\s+([a-z0-9.]+)")
    # Words suggesting the agent tried to issue a change with other phrasing.
    _CHANGE_HINT_WORDS = ("resize", "scale", "upgrade", "downgrade")

    def __init__(self, max_steps: int = 12):
        """Create an idle environment; ``reset`` must be called before ``step``.

        Args:
            max_steps: Number of ``step`` calls after which an episode is done.
        """
        self._max_steps = max_steps
        self._ep: Optional[EpisodeState] = None

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> ObsModel:
        """Start a new episode and return its initial observation.

        Args:
            seed: If given, seeds the module-level ``random`` generator. That
                generator picks the task when ``task_id`` is omitted.
            episode_id: Identifier to record; a fresh UUID4 string otherwise.
            task_id: Either a ``TASKS`` key ("easy", "medium", "hard") or a
                scenario's ``task_id`` as reported in observations (e.g.
                "hard_balance"). If it is missing or empty, a random task is
                chosen. Unrecognised values fall back to "easy".
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            The first observation of the episode.
        """
        if seed is not None:
            random.seed(seed)
        task = TASKS[self._resolve_task_key(task_id)]
        # Fresh Resource objects, so episodes never mutate the shared TASKS specs.
        resources = [Resource(**spec) for spec in task.initial_resources]
        initial_cost = sum(r.monthly_cost for r in resources)
        initial_latency, _, _ = self._calculate_metrics(task.load, resources)
        self._ep = EpisodeState(
            task_config=task,
            resources=resources,
            current_load=task.load,
            initial_cost=initial_cost,
            initial_latency=initial_latency,
            steps=0,
            crashed=False,
            changes_made=0,
            last_action_success=False,
            exploration_history=[],
            episode_id=episode_id or str(uuid4()),
        )
        return self._build_observation("Environment ready. Analyze and optimize.")

    @staticmethod
    def _resolve_task_key(task_id: Optional[str]) -> str:
        """Map a caller-supplied task identifier to a key of ``TASKS``."""
        if not task_id:
            return random.choice(["easy", "medium", "hard"])
        if task_id in TASKS:
            return task_id
        # Also accept the long-form id that observations report back to the agent.
        for key, cfg in TASKS.items():
            if cfg.task_id == task_id:
                return key
        return "easy"

    def step(self, action: ActModel, **kwargs: Any) -> Tuple[ObsModel, RewModel, bool, Dict]:
        """Apply one agent command and advance the episode by one step.

        Args:
            action: Agent action; only ``action.message`` is read.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            ``(observation, reward, done, info)``. ``done`` is True in three
            cases: the step budget is exhausted; the cluster crashes (load
            above 1.5x fleet capacity); or at least 3 changes have been made
            with a reward of at least 0.95. Calling before ``reset`` returns
            an error observation, zero reward and ``done=True``.
        """
        if self._ep is None:
            # Keep the declared 4-tuple shape so callers can always unpack it.
            reason = "Environment not reset"
            return (
                self._error_obs(reason),
                RewModel(value=0.0, reason=reason),
                True,
                {"reason": "not_reset"},
            )

        ep = self._ep
        ep.steps += 1
        msg = (action.message or "").lower()
        prev_cost = sum(r.monthly_cost for r in ep.resources)
        prev_latency, _, _ = self._calculate_metrics(ep.current_load, ep.resources)

        message = self._parse_and_execute(msg)
        # Only a real instance-type change counts as a successful action.
        ep.last_action_success = message.startswith("Changed")

        new_cost = sum(r.monthly_cost for r in ep.resources)
        latency, error_rate, utilization = self._calculate_metrics(
            ep.current_load, ep.resources
        )

        if utilization > 1.5:
            ep.crashed = True
            obs = self._build_observation("SYSTEM CRASH: Resource exhaustion!")
            reward = RewModel(value=0.0, reason="System crashed due to resource exhaustion")
            return obs, reward, True, {"reason": "crash"}

        ep.exploration_history.append({
            "step": ep.steps,
            "action": msg[:50],
            "cost": new_cost,
            "latency": latency,
        })
        reward = self._calculate_iterative_reward(
            latency, error_rate, new_cost, prev_cost, prev_latency, utilization
        )
        done = (
            ep.steps >= self._max_steps
            or (ep.changes_made >= 3 and reward.value >= 0.95)
        )
        obs = self._build_observation(message)
        return obs, reward, done, {"changes_made": ep.changes_made}

    def _parse_and_execute(self, msg: str) -> str:
        """Parse a lower-cased command and apply it to the episode's resources.

        Returns:
            A feedback message. It starts with "Changed" only when a resource's
            instance type was actually switched; ``step`` relies on that
            prefix to detect success.
        """
        match = self._CHANGE_RE.search(msg)
        if match is None:
            if any(word in msg for word in self._CHANGE_HINT_WORDS):
                return "Use format: 'change [resource_id] to [instance_type]'"
            return "Command not recognized. Use 'change [resource_id] to [instance_type]'"

        res_id, new_type = match.groups()
        # "... to t3.small." -- a sentence-final period is not part of the type name.
        new_type = new_type.rstrip(".")
        if new_type not in INSTANCE_DATA:
            return f"Error: Unknown instance type '{new_type}'. Available: {', '.join(INSTANCE_DATA)}"

        resource = next((r for r in self._ep.resources if r.id == res_id), None)
        if resource is None:
            return f"Error: Resource '{res_id}' not found"
        if resource.type == new_type:
            # A no-op must not count as a change; otherwise repeating it would
            # farm change_bonus / exploration_bonus and can trigger early ``done``.
            return f"No change: {res_id} is already {new_type}"

        old_type = resource.type
        resource.type = new_type
        resource.monthly_cost = INSTANCE_DATA[new_type]["cost"]
        self._ep.changes_made += 1
        return f"Changed {res_id} from {old_type} to {new_type} (change #{self._ep.changes_made})"

    def _calculate_metrics(self, load: float, resources: list) -> Tuple[float, float, float]:
        """Model fleet performance for a given load.

        Returns:
            ``(latency_ms, error_rate, utilization)``. ``utilization`` is
            load / total capacity and is deliberately *not* clamped, because
            ``step`` compares it against 1.5 to detect a crash. Latency grows
            quadratically with utilization. The error rate is non-zero only
            above 85% utilization.
        """
        total_cap = sum(INSTANCE_DATA[r.type]["capacity"] for r in resources)
        utilization = load / total_cap if total_cap > 0 else 0
        latency = 30 + 70 * (utilization ** 2)
        error_rate = max(0, (utilization - 0.85) * 2)
        return latency, error_rate, utilization

    def _calculate_iterative_reward(
        self,
        latency: float,
        error_rate: float,
        new_cost: float,
        prev_cost: float,
        prev_latency: float,
        utilization: float,
    ) -> RewModel:
        """Score the state reached after one step, clamped to [0, 1].

        Components:
          * cost_reward (<= 0.3) and perf_reward (<= 0.3): full marks while
            within the SLA budget / latency limit, decaying as the ratio
            exceeds 1.
          * change_bonus: 0.06 per change made so far, capped at 0.3.
          * improvement_bonus: up to 0.1 each for cost and latency improving
            versus the previous step.
          * Multiplied by (1 - error_rate) when error_rate exceeds 0.2.
          * exploration_bonus: 0.03 per step, capped at 0.15. It is added
            only when this step's action was a successful change.

        ``utilization`` is accepted for interface compatibility but is unused.
        """
        task = self._ep.task_config
        budget = task.sla["max_budget"]
        max_latency = task.sla["max_latency_ms"]

        cost_improvement = (prev_cost - new_cost) / (prev_cost + 1e-6)
        latency_improvement = (prev_latency - latency) / (prev_latency + 1e-6)
        change_bonus = min(self._ep.changes_made * 0.06, 0.3)

        cost_ratio = new_cost / budget
        cost_reward = 0.3 * (1.0 / (1.0 + max(0, cost_ratio - 1)))
        lat_ratio = latency / max_latency
        perf_reward = 0.3 * (1.0 / (1.0 + max(0, lat_ratio - 1)))

        improvement_bonus = 0.0
        if cost_improvement > 0:
            improvement_bonus += min(cost_improvement * 0.15, 0.1)
        if latency_improvement > 0:
            improvement_bonus += min(latency_improvement * 0.15, 0.1)

        base_reward = cost_reward + perf_reward
        total_reward = min(1.0, base_reward + change_bonus + improvement_bonus)
        if error_rate > 0.2:
            total_reward *= (1.0 - error_rate)

        exploration_bonus = min(self._ep.steps * 0.03, 0.15)
        if self._ep.last_action_success:
            total_reward = min(1.0, total_reward + exploration_bonus)

        # Percentage change relative to the episode's starting fleet.
        initial_latency = self._ep.initial_latency
        initial_cost = self._ep.initial_cost
        cost_change = ((new_cost - initial_cost) / initial_cost) * 100 if initial_cost > 0 else 0
        lat_change = ((latency - initial_latency) / initial_latency) * 100 if initial_latency > 0 else 0

        return RewModel(
            value=min(1.0, max(0.0, total_reward)),
            reason=f"Changes: {self._ep.changes_made}, Cost: ${new_cost:.1f}, Latency: {latency:.1f}ms",
            cost_change_pct=cost_change,
            latency_change_pct=lat_change,
        )

    def _build_observation(self, message: str) -> ObsModel:
        """Build an observation of the current episode carrying ``message``.

        Side effect: refreshes every resource's ``cpu_usage`` / ``mem_usage``
        from the current load before the inventory is reported.
        """
        if self._ep is None:
            return self._error_obs()
        ep = self._ep
        latency, error_rate, utilization = self._calculate_metrics(
            ep.current_load, ep.resources
        )
        # Load is spread in proportion to capacity, so every server sees the
        # fleet-wide utilisation (per-server share and capacity cancel out).
        cpu_pct = min(100.0, utilization * 100)
        for r in ep.resources:
            r.cpu_usage = cpu_pct
            r.mem_usage = min(100.0, cpu_pct * 0.85)

        metrics = Metrics(
            avg_latency_ms=latency,
            error_rate=error_rate,
            throughput_rps=100.0,
        )
        return ObsModel(
            inventory=ep.resources,
            metrics=metrics,
            sla=SLA(**ep.task_config.sla),
            echoed_message=message,
            task_id=ep.task_config.task_id,
            task_name=ep.task_config.name,
            difficulty=ep.task_config.difficulty,
            step=ep.steps,
        )

    def _error_obs(self, message: str = "Error: Environment not initialized") -> ObsModel:
        """Return an empty observation carrying an error ``message``."""
        return ObsModel(
            inventory=[],
            metrics=Metrics(avg_latency_ms=0, error_rate=0, throughput_rps=0),
            sla=SLA(max_latency_ms=0, max_budget=0, min_uptime_pct=0),
            echoed_message=message,
        )

    def state(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the current episode ({} before reset).

        ``sla`` and ``exploration_history`` are copies. Mutating the snapshot
        therefore cannot corrupt the shared ``TASKS`` config or the episode's
        own history.
        """
        if self._ep is None:
            return {}
        ep = self._ep
        latency, error_rate, utilization = self._calculate_metrics(
            ep.current_load, ep.resources
        )
        total_cost = sum(r.monthly_cost for r in ep.resources)
        return {
            "episode_id": ep.episode_id,
            "task_id": ep.task_config.task_id,
            "task_name": ep.task_config.name,
            "difficulty": ep.task_config.difficulty,
            "steps": ep.steps,
            "changes_made": ep.changes_made,
            "crashed": ep.crashed,
            "resources": [
                {
                    "id": r.id,
                    "type": r.type,
                    "monthly_cost": r.monthly_cost,
                    "cpu_usage": r.cpu_usage,
                    "mem_usage": r.mem_usage,
                }
                for r in ep.resources
            ],
            "metrics": {
                "total_cost": total_cost,
                "latency_ms": latency,
                "error_rate": error_rate,
                "utilization": utilization,
            },
            "sla": dict(ep.task_config.sla),
            "exploration_history": [dict(entry) for entry in ep.exploration_history],
        }


# Generic alias for the environment class.
Environment = CloudOpsEnvironment