import math import random import re from typing import Any, Dict, Optional, Tuple from uuid import uuid4 from dataclasses import dataclass, field from models import ( Observation as ObsModel, Action as ActModel, Reward as RewModel, Resource, Metrics, SLA, ) INSTANCE_DATA = { "t3.nano": {"cost": 3.6, "capacity": 1.0}, "t3.small": {"cost": 11.5, "capacity": 2.0}, "t3.medium": {"cost": 23.0, "capacity": 4.0}, "m5.large": {"cost": 70.0, "capacity": 8.0}, "m5.xlarge": {"cost": 140.0,"capacity": 16.0}, } @dataclass class TaskConfig: task_id: str name: str difficulty: str description: str initial_resources: list sla: dict load: float TASKS = { "easy": TaskConfig( task_id="easy_right_sizing", name="Right-Sizing", difficulty="easy", description="Reduce an overpriced server without breaking the SLA", initial_resources=[ {"id": "srv-1", "type": "m5.xlarge", "cpu_usage": 2.0, "mem_usage": 2.0, "monthly_cost": 140.0} ], sla={"max_latency_ms": 200.0, "max_budget": 30.0, "min_uptime_pct": 99.0}, load=2.0 ), "medium": TaskConfig( task_id="medium_latency_fix", name="Latency Fix", difficulty="medium", description="Resolve performance bottleneck while staying under budget", initial_resources=[ {"id": "srv-1", "type": "t3.nano", "cpu_usage": 98.0, "mem_usage": 90.0, "monthly_cost": 3.6} ], sla={"max_latency_ms": 100.0, "max_budget": 60.0, "min_uptime_pct": 99.9}, load=12.0 ), "hard": TaskConfig( task_id="hard_balance", name="Balance Optimization", difficulty="hard", description="Optimize a mixed cluster under tight budget constraints", initial_resources=[ {"id": "srv-1", "type": "m5.large", "cpu_usage": 40.0, "mem_usage": 30.0, "monthly_cost": 70.0}, {"id": "srv-2", "type": "t3.nano", "cpu_usage": 90.0, "mem_usage": 80.0, "monthly_cost": 3.6} ], sla={"max_latency_ms": 150.0, "max_budget": 35.0, "min_uptime_pct": 99.9}, load=25.0 ), } @dataclass class EpisodeState: task_config: TaskConfig resources: list current_load: float initial_cost: float initial_latency: float steps: int = 0 crashed: bool = False episode_id: str = field(default_factory=lambda: str(uuid4())) class CloudOpsEnvironment: """Cloud Infrastructure Optimization Environment. The agent acts as a Cloud SRE optimizing cost and performance. """ def __init__(self, max_steps: int = 12): self._max_steps = max_steps self._ep: Optional[EpisodeState] = None def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, task_id: Optional[str] = None, **kwargs: Any, ) -> ObsModel: if seed is not None: random.seed(seed) task_key = task_id or random.choice(["easy", "medium", "hard"]) if task_key not in TASKS: task_key = "easy" task = TASKS[task_key] resources = [ Resource(**r) for r in task.initial_resources ] initial_cost = sum(r.monthly_cost for r in resources) initial_latency, _, _ = self._calculate_metrics(task.load, resources) self._ep = EpisodeState( task_config=task, resources=resources, current_load=task.load, initial_cost=initial_cost, initial_latency=initial_latency, steps=0, crashed=False, episode_id=episode_id or str(uuid4()), ) return self._build_observation("Environment ready. Analyze and optimize.") def step(self, action: ActModel, **kwargs: Any) -> Tuple[ObsModel, RewModel, bool, Dict]: if self._ep is None: return self._error_obs("Environment not reset") self._ep.steps += 1 msg = action.message.lower() message = self._parse_and_execute(msg) latency, error_rate, utilization = self._calculate_metrics( self._ep.current_load, self._ep.resources ) if utilization > 1.1: self._ep.crashed = True obs = self._build_observation("SYSTEM CRASH: Resource exhaustion!") reward = RewModel(value=0.0, reason="System crashed due to resource exhaustion") return obs, reward, True, {"reason": "crash"} reward = self._calculate_reward(latency, error_rate) done = ( reward.value >= 0.98 or self._ep.steps >= self._max_steps ) obs = self._build_observation(message) return obs, reward, done, {} def _parse_and_execute(self, msg: str) -> str: match = re.search(r"change\s+([a-z0-9-]+)\s+to\s+([a-z0-9.]+)", msg) if match: res_id, new_type = match.groups() if new_type not in INSTANCE_DATA: return f"Error: Unknown instance type '{new_type}'. Available: {', '.join(INSTANCE_DATA.keys())}" for r in self._ep.resources: if r.id == res_id: r.type = new_type r.monthly_cost = INSTANCE_DATA[new_type]["cost"] return f"Changed {res_id} to {new_type}" return f"Error: Resource '{res_id}' not found" if "resize" in msg or "scale" in msg or "upgrade" in msg or "downgrade" in msg: return "Use format: 'change [resource_id] to [instance_type]'" return "Command not recognized. Use 'change [resource_id] to [instance_type]'" def _calculate_metrics(self, load: float, resources: list) -> Tuple[float, float, float]: total_cap = sum(INSTANCE_DATA[r.type]["capacity"] for r in resources) utilization = load / (total_cap + 1e-6) latency = 50 * (1 + math.exp(utilization * 2 - 2)) error_rate = 0.0 if utilization < 0.9 else (utilization - 0.9) * 2.0 return latency, error_rate, utilization def _calculate_reward(self, latency: float, error_rate: float) -> RewModel: total_cost = sum(r.monthly_cost for r in self._ep.resources) budget = self._ep.task_config.sla["max_latency_ms"] cost_ratio = total_cost / budget cost_reward = 0.5 * (1.0 / (1.0 + max(0, cost_ratio - 1))) lat_ratio = latency / budget perf_reward = 0.5 * (1.0 / (1.0 + max(0, lat_ratio - 1))) total_reward = cost_reward + perf_reward initial_latency = self._ep.initial_latency initial_cost = self._ep.initial_cost cost_change = ((total_cost - initial_cost) / initial_cost) * 100 if initial_cost > 0 else 0 lat_change = ((latency - initial_latency) / initial_latency) * 100 if initial_latency > 0 else 0 return RewModel( value=min(1.0, max(0.0, total_reward)), reason=f"Cost: ${total_cost:.1f}/mo, Latency: {latency:.1f}ms", cost_change_pct=cost_change, latency_change_pct=lat_change, ) def _build_observation(self, message: str) -> ObsModel: if self._ep is None: return self._error_obs() latency, error_rate, _ = self._calculate_metrics( self._ep.current_load, self._ep.resources ) for r in self._ep.resources: r.cpu_usage = min(100.0, self._ep.current_load / INSTANCE_DATA[r.type]["capacity"] * 100) r.mem_usage = min(100.0, r.cpu_usage * 0.9) metrics = Metrics( avg_latency_ms=latency, error_rate=error_rate, throughput_rps=100.0 ) sla = SLA(**self._ep.task_config.sla) return ObsModel( inventory=self._ep.resources, metrics=metrics, sla=sla, echoed_message=message, task_id=self._ep.task_config.task_id, task_name=self._ep.task_config.name, difficulty=self._ep.task_config.difficulty, step=self._ep.steps, ) def _error_obs(self, message: str = "Error: Environment not initialized") -> ObsModel: return ObsModel( inventory=[], metrics=Metrics(avg_latency_ms=0, error_rate=0, throughput_rps=0), sla=SLA(max_latency_ms=0, max_budget=0, min_uptime_pct=0), echoed_message=message, ) @property def state(self) -> Dict[str, Any]: if self._ep is None: return {} return { "episode_id": self._ep.episode_id, "task_id": self._ep.task_config.task_id, "steps": self._ep.steps, "crashed": self._ep.crashed, } Environment = CloudOpsEnvironment