"""Cloud infrastructure optimization environment.

An agent plays a cloud SRE that resizes servers with text commands of the
form ``change <resource_id> to <instance_type>`` and is rewarded for keeping
cost within budget and latency within the SLA.
"""

import math
import random
import re
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple
from uuid import uuid4

from models import (
    Observation as ObsModel,
    Action as ActModel,
    Reward as RewModel,
    Resource,
    Metrics,
    SLA,
)

# Catalogue of instance types. "cost" is copied into Resource.monthly_cost
# when a server is resized; "capacity" is the amount of task load one
# instance can absorb (same units as TaskConfig.load).
INSTANCE_DATA = {
    "t3.nano": {"cost": 3.6, "capacity": 1.0},
    "t3.small": {"cost": 11.5, "capacity": 2.0},
    "t3.medium": {"cost": 23.0, "capacity": 4.0},
    "m5.large": {"cost": 70.0, "capacity": 8.0},
    "m5.xlarge": {"cost": 140.0, "capacity": 16.0},
}

# Compiled once: the only command the environment understands. It is applied
# to the lower-cased action message, hence the lower-case character classes.
_CHANGE_COMMAND_RE = re.compile(r"change\s+([a-z0-9-]+)\s+to\s+([a-z0-9.]+)")


@dataclass
class TaskConfig:
    """Static definition of one scenario (starting cluster, SLA and load)."""

    task_id: str
    name: str
    difficulty: str
    description: str
    initial_resources: list  # list of dicts passed as Resource(**item)
    sla: dict  # keyword arguments for SLA(...)
    load: float  # total load spread over the cluster's capacity


TASKS = {
    "easy": TaskConfig(
        task_id="easy_right_sizing",
        name="Right-Sizing",
        difficulty="easy",
        description="Optimize this 3-server cluster. Start by analyzing load patterns, then iteratively adjust each server. Final reward requires ALL servers properly sized.",
        initial_resources=[
            {"id": "srv-1", "type": "m5.xlarge", "cpu_usage": 10.0, "mem_usage": 8.0, "monthly_cost": 140.0},
            {"id": "srv-2", "type": "m5.xlarge", "cpu_usage": 8.0, "mem_usage": 6.0, "monthly_cost": 140.0},
            {"id": "srv-3", "type": "m5.xlarge", "cpu_usage": 12.0, "mem_usage": 9.0, "monthly_cost": 140.0},
        ],
        sla={"max_latency_ms": 120.0, "max_budget": 100.0, "min_uptime_pct": 99.0},
        load=30.0,
    ),
    "medium": TaskConfig(
        task_id="medium_latency_fix",
        name="Latency Fix",
        difficulty="medium",
        description="Performance bottleneck! This cluster is struggling. Analyze each server's load, then iteratively upgrade undersized servers. Requires 4+ successful changes for max reward.",
        initial_resources=[
            {"id": "srv-1", "type": "t3.small", "cpu_usage": 40.0, "mem_usage": 30.0, "monthly_cost": 11.5},
            {"id": "srv-2", "type": "t3.small", "cpu_usage": 38.0, "mem_usage": 28.0, "monthly_cost": 11.5},
            {"id": "srv-3", "type": "t3.small", "cpu_usage": 42.0, "mem_usage": 32.0, "monthly_cost": 11.5},
        ],
        sla={"max_latency_ms": 100.0, "max_budget": 80.0, "min_uptime_pct": 99.9},
        load=4.5,
    ),
    "hard": TaskConfig(
        task_id="hard_balance",
        name="Balance Optimization",
        difficulty="hard",
        description="Tight budget constraint! Optimize a mixed 5-server cluster. Must achieve optimal cost-efficiency while maintaining performance. Requires 5+ iterative changes, exploring different configurations.",
        initial_resources=[
            {"id": "srv-1", "type": "m5.large", "cpu_usage": 15.0, "mem_usage": 10.0, "monthly_cost": 70.0},
            {"id": "srv-2", "type": "m5.large", "cpu_usage": 12.0, "mem_usage": 8.0, "monthly_cost": 70.0},
            {"id": "srv-3", "type": "t3.small", "cpu_usage": 50.0, "mem_usage": 40.0, "monthly_cost": 11.5},
            {"id": "srv-4", "type": "t3.small", "cpu_usage": 55.0, "mem_usage": 45.0, "monthly_cost": 11.5},
            {"id": "srv-5", "type": "t3.medium", "cpu_usage": 35.0, "mem_usage": 30.0, "monthly_cost": 23.0},
        ],
        sla={"max_latency_ms": 100.0, "max_budget": 80.0, "min_uptime_pct": 99.9},
        load=15.0,
    ),
}

# Observations report TaskConfig.task_id (e.g. "medium_latency_fix"), so map
# those canonical ids back to the TASKS keys that reset() works with.
_TASK_KEY_BY_ID = {config.task_id: key for key, config in TASKS.items()}


@dataclass
class EpisodeState:
    """Mutable state of the episode currently being played."""

    task_config: TaskConfig
    resources: list
    current_load: float
    initial_cost: float
    initial_latency: float
    steps: int = 0
    crashed: bool = False
    changes_made: int = 0
    last_action_success: bool = False
    exploration_history: list = field(default_factory=list)
    episode_id: str = field(default_factory=lambda: str(uuid4()))


class CloudOpsEnvironment:
    """Cloud Infrastructure Optimization Environment.

    The agent acts as a Cloud SRE optimizing cost and performance.
    """

    def __init__(self, max_steps: int = 12):
        """Create an environment whose episodes end after ``max_steps`` steps."""
        self._max_steps = max_steps
        self._ep: Optional[EpisodeState] = None

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> ObsModel:
        """Start a new episode and return its first observation.

        Args:
            seed: When given, seeds the global ``random`` module before a task
                is chosen.
            episode_id: Identifier to store on the episode; a fresh UUID is
                generated when omitted.
            task_id: Either a ``TASKS`` key ("easy", "medium", "hard") or the
                matching ``TaskConfig.task_id`` (e.g. "medium_latency_fix").
                A random task is picked when omitted; an unrecognized value
                falls back to "easy".
            **kwargs: Accepted and ignored.

        Returns:
            The observation for the freshly initialized cluster.
        """
        if seed is not None:
            random.seed(seed)

        task_key = task_id or random.choice(["easy", "medium", "hard"])
        # Accept the canonical task id that observations expose, instead of
        # silently downgrading it to the "easy" task.
        task_key = _TASK_KEY_BY_ID.get(task_key, task_key)
        if task_key not in TASKS:
            task_key = "easy"
        task = TASKS[task_key]

        # Fresh Resource objects per episode so the task template is untouched.
        resources = [Resource(**r) for r in task.initial_resources]
        initial_cost = sum(r.monthly_cost for r in resources)
        initial_latency, _, _ = self._calculate_metrics(task.load, resources)

        self._ep = EpisodeState(
            task_config=task,
            resources=resources,
            current_load=task.load,
            initial_cost=initial_cost,
            initial_latency=initial_latency,
            steps=0,
            crashed=False,
            changes_made=0,
            last_action_success=False,
            exploration_history=[],
            episode_id=episode_id or str(uuid4()),
        )
        return self._build_observation("Environment ready. Analyze and optimize.")

    def step(self, action: ActModel, **kwargs: Any) -> Tuple[ObsModel, RewModel, bool, Dict]:
        """Apply one agent action.

        Args:
            action: Action whose ``message`` holds the text command.
            **kwargs: Accepted and ignored.

        Returns:
            ``(observation, reward, done, info)``. ``done`` is True when the
            step limit is reached, when the cluster crashes (utilization above
            1.5), when at least three changes were made and the reward is at
            least 0.95, or when the environment has not been reset.
        """
        if self._ep is None:
            # Keep the 4-tuple contract even without an episode so callers
            # that unpack the result do not fail.
            return (
                self._error_obs("Environment not reset"),
                RewModel(value=0.0, reason="Environment not reset"),
                True,
                {"reason": "not_reset"},
            )

        self._ep.steps += 1
        msg = action.message.lower()

        # Snapshot the pre-action state; the reward scores the improvement.
        prev_cost = sum(r.monthly_cost for r in self._ep.resources)
        prev_latency, _, _ = self._calculate_metrics(self._ep.current_load, self._ep.resources)

        message = self._parse_and_execute(msg)
        self._ep.last_action_success = message.startswith("Changed")

        new_cost = sum(r.monthly_cost for r in self._ep.resources)
        latency, error_rate, utilization = self._calculate_metrics(
            self._ep.current_load, self._ep.resources
        )

        if utilization > 1.5:
            self._ep.crashed = True
            obs = self._build_observation("SYSTEM CRASH: Resource exhaustion!")
            reward = RewModel(value=0.0, reason="System crashed due to resource exhaustion")
            return obs, reward, True, {"reason": "crash"}

        self._ep.exploration_history.append({
            "step": self._ep.steps,
            "action": msg[:50],
            "cost": new_cost,
            "latency": latency,
        })

        reward = self._calculate_iterative_reward(
            latency, error_rate, new_cost, prev_cost, prev_latency, utilization
        )
        done = (
            self._ep.steps >= self._max_steps
            or (self._ep.changes_made >= 3 and reward.value >= 0.95)
        )
        obs = self._build_observation(message)
        return obs, reward, done, {"changes_made": self._ep.changes_made}

    def _parse_and_execute(self, msg: str) -> str:
        """Execute a ``change <id> to <type>`` command found in ``msg``.

        Args:
            msg: Lower-cased action text.

        Returns:
            A message starting with "Changed" on success, otherwise an error
            or usage hint.
        """
        match = _CHANGE_COMMAND_RE.search(msg)
        if match:
            res_id, raw_type = match.groups()
            # The type pattern allows '.', so a sentence-ending period would
            # be captured ("t3.small."). No instance type ends with '.', so
            # drop trailing periods; keep the raw text if nothing remains.
            new_type = raw_type.rstrip(".") or raw_type
            if new_type not in INSTANCE_DATA:
                return f"Error: Unknown instance type '{new_type}'. Available: {', '.join(INSTANCE_DATA.keys())}"
            for r in self._ep.resources:
                if r.id == res_id:
                    old_type = r.type
                    r.type = new_type
                    r.monthly_cost = INSTANCE_DATA[new_type]["cost"]
                    self._ep.changes_made += 1
                    self._ep.last_action_success = True
                    return f"Changed {res_id} from {old_type} to {new_type} (change #{self._ep.changes_made})"
            return f"Error: Resource '{res_id}' not found"

        if "resize" in msg or "scale" in msg or "upgrade" in msg or "downgrade" in msg:
            return "Use format: 'change [resource_id] to [instance_type]'"
        return "Command not recognized. Use 'change [resource_id] to [instance_type]'"

    def _calculate_metrics(self, load: float, resources: list) -> Tuple[float, float, float]:
        """Derive cluster metrics from load and total capacity.

        Args:
            load: Total load placed on the cluster.
            resources: Resources whose ``type`` is a key of ``INSTANCE_DATA``.

        Returns:
            ``(latency, error_rate, utilization)`` where utilization is
            ``load / total capacity`` (0 when there is no capacity), latency is
            ``30 + 70 * utilization ** 2`` and error_rate is
            ``max(0, (utilization - 0.85) * 2)``.
        """
        total_cap = sum(INSTANCE_DATA[r.type]["capacity"] for r in resources)
        avg_utilization = load / total_cap if total_cap > 0 else 0
        latency = 30 + 70 * (avg_utilization ** 2)
        error_rate = max(0, (avg_utilization - 0.85) * 2)
        # Utilization is returned unclamped on purpose: step() detects a crash
        # by checking for values above 1.5.
        return latency, error_rate, avg_utilization

    def _calculate_iterative_reward(
        self,
        latency: float,
        error_rate: float,
        new_cost: float,
        prev_cost: float,
        prev_latency: float,
        utilization: float,
    ) -> RewModel:
        """Score the current state of the episode.

        Args:
            latency: Latency after the action.
            error_rate: Error rate after the action.
            new_cost: Total monthly cost after the action.
            prev_cost: Total monthly cost before the action.
            prev_latency: Latency before the action.
            utilization: Accepted for interface compatibility; not used.

        Returns:
            A reward whose value is clamped to [0, 1], with cost and latency
            changes reported as percentages relative to the episode start.
        """
        task = self._ep.task_config
        budget = task.sla["max_budget"]
        max_latency = task.sla["max_latency_ms"]

        # Relative step-over-step improvements (epsilon avoids dividing by 0).
        cost_improvement = (prev_cost - new_cost) / (prev_cost + 1e-6)
        latency_improvement = (prev_latency - latency) / (prev_latency + 1e-6)

        change_bonus = min(self._ep.changes_made * 0.06, 0.3)

        # Each SLA term is worth 0.3 when met and decays once exceeded.
        cost_ratio = new_cost / budget
        cost_reward = 0.3 * (1.0 / (1.0 + max(0, cost_ratio - 1)))
        lat_ratio = latency / max_latency
        perf_reward = 0.3 * (1.0 / (1.0 + max(0, lat_ratio - 1)))

        improvement_bonus = 0.0
        if cost_improvement > 0:
            improvement_bonus += min(cost_improvement * 0.15, 0.1)
        if latency_improvement > 0:
            improvement_bonus += min(latency_improvement * 0.15, 0.1)

        base_reward = cost_reward + perf_reward
        total_reward = min(1.0, base_reward + change_bonus + improvement_bonus)

        if error_rate > 0.2:
            total_reward *= (1.0 - error_rate)

        # The exploration bonus is only granted when the last command changed
        # a resource.
        exploration_bonus = min(self._ep.steps * 0.03, 0.15)
        if self._ep.last_action_success:
            total_reward = min(1.0, total_reward + exploration_bonus)

        initial_latency = self._ep.initial_latency
        initial_cost = self._ep.initial_cost
        cost_change = ((new_cost - initial_cost) / initial_cost) * 100 if initial_cost > 0 else 0
        lat_change = ((latency - initial_latency) / initial_latency) * 100 if initial_latency > 0 else 0

        return RewModel(
            value=min(1.0, max(0.0, total_reward)),
            reason=f"Changes: {self._ep.changes_made}, Cost: ${new_cost:.1f}, Latency: {latency:.1f}ms",
            cost_change_pct=cost_change,
            latency_change_pct=lat_change,
        )

    def _build_observation(self, message: str) -> ObsModel:
        """Build the observation for the current episode.

        Also refreshes ``cpu_usage`` and ``mem_usage`` on every resource from
        the current load before packaging the inventory.

        Args:
            message: Text echoed back to the agent.

        Returns:
            The observation, or the error observation when no episode exists.
        """
        if self._ep is None:
            return self._error_obs()

        latency, error_rate, _ = self._calculate_metrics(
            self._ep.current_load, self._ep.resources
        )

        total_cap = sum(INSTANCE_DATA[r.type]["capacity"] for r in self._ep.resources)
        for r in self._ep.resources:
            cap = INSTANCE_DATA[r.type]["capacity"]
            # Load is shared in proportion to capacity.
            share = cap / total_cap if total_cap > 0 else 0
            r.cpu_usage = min(100.0, self._ep.current_load * share / cap * 100)
            r.mem_usage = min(100.0, r.cpu_usage * 0.85)

        metrics = Metrics(
            avg_latency_ms=latency,
            error_rate=error_rate,
            throughput_rps=100.0,
        )
        sla = SLA(**self._ep.task_config.sla)

        return ObsModel(
            inventory=self._ep.resources,
            metrics=metrics,
            sla=sla,
            echoed_message=message,
            task_id=self._ep.task_config.task_id,
            task_name=self._ep.task_config.name,
            difficulty=self._ep.task_config.difficulty,
            step=self._ep.steps,
        )

    def _error_obs(self, message: str = "Error: Environment not initialized") -> ObsModel:
        """Return an empty observation that carries only ``message``."""
        return ObsModel(
            inventory=[],
            metrics=Metrics(avg_latency_ms=0, error_rate=0, throughput_rps=0),
            sla=SLA(max_latency_ms=0, max_budget=0, min_uptime_pct=0),
            echoed_message=message,
        )

    def state(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the episode.

        Returns:
            An empty dict when no episode exists; otherwise episode metadata,
            per-resource details, current metrics, the SLA and the exploration
            history.
        """
        if self._ep is None:
            return {}

        latency, error_rate, utilization = self._calculate_metrics(
            self._ep.current_load, self._ep.resources
        )
        total_cost = sum(r.monthly_cost for r in self._ep.resources)

        return {
            "episode_id": self._ep.episode_id,
            "task_id": self._ep.task_config.task_id,
            "task_name": self._ep.task_config.name,
            "difficulty": self._ep.task_config.difficulty,
            "steps": self._ep.steps,
            "changes_made": self._ep.changes_made,
            "crashed": self._ep.crashed,
            "resources": [
                {
                    "id": r.id,
                    "type": r.type,
                    "monthly_cost": r.monthly_cost,
                    "cpu_usage": r.cpu_usage,
                    "mem_usage": r.mem_usage,
                }
                for r in self._ep.resources
            ],
            "metrics": {
                "total_cost": total_cost,
                "latency_ms": latency,
                "error_rate": error_rate,
                "utilization": utilization,
            },
            # Shallow copies: the SLA dict belongs to the module-level task
            # config and the history list to the live episode, so callers
            # must not be able to mutate them through this snapshot.
            "sla": dict(self._ep.task_config.sla),
            "exploration_history": list(self._ep.exploration_history),
        }


Environment = CloudOpsEnvironment