Spaces:
Sleeping
Sleeping
| import math | |
| import random | |
| import re | |
| from typing import Any, Dict, Optional, Tuple | |
| from uuid import uuid4 | |
| from dataclasses import dataclass, field | |
| from models import ( | |
| Observation as ObsModel, | |
| Action as ActModel, | |
| Reward as RewModel, | |
| Resource, | |
| Metrics, | |
| SLA, | |
| ) | |
# Catalogue of instance types the agent may switch a resource to.
# Each row is (instance type, monthly cost, capacity); capacity is expressed
# in the same units as a task's ``load``.
_INSTANCE_ROWS = (
    ("t3.nano", 3.6, 1.0),
    ("t3.small", 11.5, 2.0),
    ("t3.medium", 23.0, 4.0),
    ("m5.large", 70.0, 8.0),
    ("m5.xlarge", 140.0, 16.0),
)

# Lookup table: instance type -> {"cost": ..., "capacity": ...}.
INSTANCE_DATA = {
    type_name: {"cost": monthly_cost, "capacity": capacity}
    for type_name, monthly_cost, capacity in _INSTANCE_ROWS
}
@dataclass
class TaskConfig:
    """Static description of one optimization task.

    Declared as a dataclass so that instances can be built with keyword
    arguments; without the decorator the class has no ``__init__`` that
    accepts these fields.
    """

    task_id: str  # identifier reported in observations and in state()
    name: str  # short human-readable task name
    difficulty: str  # difficulty label, e.g. "easy" / "medium" / "hard"
    description: str  # one-line statement of the task goal
    initial_resources: list  # list of dicts, each expanded into a Resource on reset
    sla: dict  # keys: "max_latency_ms", "max_budget", "min_uptime_pct"
    load: float  # load applied to the cluster for the whole episode
# Built-in task catalogue, keyed by difficulty. Each entry supplies the
# starting inventory, the SLA limits and the constant load for an episode.
TASKS = {
    # One oversized server under light load.
    "easy": TaskConfig(
        task_id="easy_right_sizing",
        name="Right-Sizing",
        difficulty="easy",
        description="Reduce an overpriced server without breaking the SLA",
        initial_resources=[
            {
                "id": "srv-1",
                "type": "m5.xlarge",
                "cpu_usage": 2.0,
                "mem_usage": 2.0,
                "monthly_cost": 140.0,
            },
        ],
        sla={
            "max_latency_ms": 200.0,
            "max_budget": 30.0,
            "min_uptime_pct": 99.0,
        },
        load=2.0,
    ),
    # One undersized server under heavy load.
    "medium": TaskConfig(
        task_id="medium_latency_fix",
        name="Latency Fix",
        difficulty="medium",
        description="Resolve performance bottleneck while staying under budget",
        initial_resources=[
            {
                "id": "srv-1",
                "type": "t3.nano",
                "cpu_usage": 98.0,
                "mem_usage": 90.0,
                "monthly_cost": 3.6,
            },
        ],
        sla={
            "max_latency_ms": 100.0,
            "max_budget": 60.0,
            "min_uptime_pct": 99.9,
        },
        load=12.0,
    ),
    # Two mismatched servers with a tight budget.
    "hard": TaskConfig(
        task_id="hard_balance",
        name="Balance Optimization",
        difficulty="hard",
        description="Optimize a mixed cluster under tight budget constraints",
        initial_resources=[
            {
                "id": "srv-1",
                "type": "m5.large",
                "cpu_usage": 40.0,
                "mem_usage": 30.0,
                "monthly_cost": 70.0,
            },
            {
                "id": "srv-2",
                "type": "t3.nano",
                "cpu_usage": 90.0,
                "mem_usage": 80.0,
                "monthly_cost": 3.6,
            },
        ],
        sla={
            "max_latency_ms": 150.0,
            "max_budget": 35.0,
            "min_uptime_pct": 99.9,
        },
        load=25.0,
    ),
}
@dataclass
class EpisodeState:
    """Mutable state of the episode currently being played.

    Declared as a dataclass: the class is constructed with keyword arguments
    and relies on ``field(default_factory=...)``, both of which only work
    when the dataclass machinery generates ``__init__``.
    """

    # Quoted so the annotation is not resolved at class-creation time.
    task_config: "TaskConfig"  # task this episode was started from
    resources: list  # live inventory; entries are mutated by agent actions
    current_load: float  # load used when computing metrics
    initial_cost: float  # total monthly cost at reset, baseline for % change
    initial_latency: float  # latency at reset, baseline for % change
    steps: int = 0  # number of step() calls made so far
    crashed: bool = False  # set once utilization exceeds the crash threshold
    # A fresh UUID string per instance unless the caller supplies one.
    episode_id: str = field(default_factory=lambda: str(uuid4()))
class CloudOpsEnvironment:
    """Cloud Infrastructure Optimization Environment.

    The agent acts as a Cloud SRE optimizing cost and performance. Each
    episode starts from one entry of ``TASKS``; the agent sends text commands
    of the form ``change <resource_id> to <instance_type>`` and is rewarded
    for keeping total monthly cost within the task's budget and latency
    within the task's latency limit.
    """

    # Matches "change <resource_id> to <instance_type>" in a lower-cased message.
    _CHANGE_RE = re.compile(r"change\s+([a-z0-9-]+)\s+to\s+([a-z0-9.]+)")

    def __init__(self, max_steps: int = 12):
        """Create an environment with no active episode.

        Args:
            max_steps: Step count at which an episode is reported as done.
        """
        self._max_steps = max_steps
        self._ep: Optional[EpisodeState] = None

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> ObsModel:
        """Start a new episode and return its first observation.

        Args:
            seed: When given, seeds the module-level ``random`` generator
                before a task is chosen.
            episode_id: Identifier for the episode; a UUID string is
                generated when omitted.
            task_id: Key into ``TASKS``. A random task is picked when
                omitted; an unknown key falls back to ``"easy"``.
            **kwargs: Accepted and ignored.

        Returns:
            The observation describing the freshly initialised episode.
        """
        if seed is not None:
            random.seed(seed)

        task_key = task_id or random.choice(["easy", "medium", "hard"])
        if task_key not in TASKS:
            task_key = "easy"
        task = TASKS[task_key]

        # Build fresh Resource objects so the task template is never mutated.
        resources = [Resource(**r) for r in task.initial_resources]
        initial_cost = sum(r.monthly_cost for r in resources)
        initial_latency, _, _ = self._calculate_metrics(task.load, resources)

        self._ep = EpisodeState(
            task_config=task,
            resources=resources,
            current_load=task.load,
            initial_cost=initial_cost,
            initial_latency=initial_latency,
            steps=0,
            crashed=False,
            episode_id=episode_id or str(uuid4()),
        )
        return self._build_observation("Environment ready. Analyze and optimize.")

    def step(self, action: ActModel, **kwargs: Any) -> Tuple[ObsModel, RewModel, bool, Dict]:
        """Apply one agent action and advance the episode.

        Args:
            action: Agent action; only its ``message`` text is used.
            **kwargs: Accepted and ignored.

        Returns:
            ``(observation, reward, done, info)``. ``done`` is true when the
            system crashed, the reward reached 0.98, or the step limit was
            hit. When called before ``reset`` the same 4-tuple shape is
            returned, with an error observation, zero reward and
            ``done=True``.
        """
        if self._ep is None:
            # Keep the declared return shape so callers that unpack four
            # values do not fail on an un-reset environment.
            reason = "Environment not reset"
            return (
                self._error_obs(reason),
                RewModel(value=0.0, reason=reason),
                True,
                {"reason": "not_reset"},
            )

        self._ep.steps += 1
        msg = action.message.lower()
        message = self._parse_and_execute(msg)

        latency, error_rate, utilization = self._calculate_metrics(
            self._ep.current_load,
            self._ep.resources,
        )

        # More than 110% utilization ends the episode with zero reward.
        if utilization > 1.1:
            self._ep.crashed = True
            obs = self._build_observation("SYSTEM CRASH: Resource exhaustion!")
            reward = RewModel(value=0.0, reason="System crashed due to resource exhaustion")
            return obs, reward, True, {"reason": "crash"}

        reward = self._calculate_reward(latency, error_rate)
        done = reward.value >= 0.98 or self._ep.steps >= self._max_steps
        obs = self._build_observation(message)
        return obs, reward, done, {}

    def _parse_and_execute(self, msg: str) -> str:
        """Interpret a lower-cased agent message and apply it to the inventory.

        Args:
            msg: Lower-cased message text.

        Returns:
            A status or error string that is echoed back in the observation.
        """
        match = self._CHANGE_RE.search(msg)
        if match:
            res_id, new_type = match.groups()
            # The type pattern allows dots, so a sentence-ending period is
            # captured too ("... to t3.small."); no valid type ends in ".".
            new_type = new_type.rstrip(".")
            if new_type not in INSTANCE_DATA:
                return f"Error: Unknown instance type '{new_type}'. Available: {', '.join(INSTANCE_DATA.keys())}"
            for r in self._ep.resources:
                if r.id == res_id:
                    r.type = new_type
                    r.monthly_cost = INSTANCE_DATA[new_type]["cost"]
                    return f"Changed {res_id} to {new_type}"
            return f"Error: Resource '{res_id}' not found"

        # Recognisable intent but wrong syntax: point at the expected format.
        if "resize" in msg or "scale" in msg or "upgrade" in msg or "downgrade" in msg:
            return "Use format: 'change [resource_id] to [instance_type]'"
        return "Command not recognized. Use 'change [resource_id] to [instance_type]'"

    def _calculate_metrics(self, load: float, resources: list) -> Tuple[float, float, float]:
        """Derive latency, error rate and utilization for a load on a cluster.

        Args:
            load: Load applied to the cluster.
            resources: Resources whose ``type`` is a key of ``INSTANCE_DATA``.

        Returns:
            ``(latency, error_rate, utilization)`` where utilization is load
            divided by the summed capacity, latency grows exponentially with
            utilization from a 50 base, and error rate is zero below 0.9
            utilization and rises linearly above it.
        """
        total_cap = sum(INSTANCE_DATA[r.type]["capacity"] for r in resources)
        # The epsilon avoids a division by zero when there is no capacity.
        utilization = load / (total_cap + 1e-6)
        latency = 50 * (1 + math.exp(utilization * 2 - 2))
        error_rate = 0.0 if utilization < 0.9 else (utilization - 0.9) * 2.0
        return latency, error_rate, utilization

    def _calculate_reward(self, latency: float, error_rate: float) -> RewModel:
        """Score the current inventory against the task's SLA.

        Cost and latency each contribute up to 0.5: the full 0.5 while the
        value is within its limit, shrinking as ``0.5 / (1 + overshoot)``
        once the limit is exceeded.

        Args:
            latency: Current latency.
            error_rate: Current error rate; accepted for interface
                compatibility and not used in the score.

        Returns:
            A reward clamped to ``[0, 1]`` with percentage changes of cost
            and latency relative to the start of the episode.
        """
        sla = self._ep.task_config.sla
        total_cost = sum(r.monthly_cost for r in self._ep.resources)

        # Cost is judged against the budget and latency against the latency
        # limit; each uses its own SLA entry.
        cost_ratio = total_cost / sla["max_budget"]
        cost_reward = 0.5 * (1.0 / (1.0 + max(0, cost_ratio - 1)))

        lat_ratio = latency / sla["max_latency_ms"]
        perf_reward = 0.5 * (1.0 / (1.0 + max(0, lat_ratio - 1)))

        total_reward = cost_reward + perf_reward

        initial_latency = self._ep.initial_latency
        initial_cost = self._ep.initial_cost
        cost_change = ((total_cost - initial_cost) / initial_cost) * 100 if initial_cost > 0 else 0
        lat_change = ((latency - initial_latency) / initial_latency) * 100 if initial_latency > 0 else 0

        return RewModel(
            value=min(1.0, max(0.0, total_reward)),
            reason=f"Cost: ${total_cost:.1f}/mo, Latency: {latency:.1f}ms",
            cost_change_pct=cost_change,
            latency_change_pct=lat_change,
        )

    def _build_observation(self, message: str) -> ObsModel:
        """Assemble the observation for the current episode state.

        Also refreshes ``cpu_usage`` and ``mem_usage`` on every resource.

        Args:
            message: Text echoed back to the agent.

        Returns:
            The observation, or an error observation when no episode exists.
        """
        if self._ep is None:
            return self._error_obs()

        latency, error_rate, _ = self._calculate_metrics(
            self._ep.current_load,
            self._ep.resources,
        )

        # NOTE(review): each resource's CPU figure uses the full episode load
        # against that resource's own capacity, while _calculate_metrics
        # spreads the load over the summed capacity. Confirm this is intended.
        for r in self._ep.resources:
            r.cpu_usage = min(100.0, self._ep.current_load / INSTANCE_DATA[r.type]["capacity"] * 100)
            r.mem_usage = min(100.0, r.cpu_usage * 0.9)

        metrics = Metrics(
            avg_latency_ms=latency,
            error_rate=error_rate,
            throughput_rps=100.0,
        )
        sla = SLA(**self._ep.task_config.sla)

        return ObsModel(
            inventory=self._ep.resources,
            metrics=metrics,
            sla=sla,
            echoed_message=message,
            task_id=self._ep.task_config.task_id,
            task_name=self._ep.task_config.name,
            difficulty=self._ep.task_config.difficulty,
            step=self._ep.steps,
        )

    def _error_obs(self, message: str = "Error: Environment not initialized") -> ObsModel:
        """Return a placeholder observation with empty inventory and zeroed values.

        Args:
            message: Text placed in the observation's ``echoed_message``.
        """
        return ObsModel(
            inventory=[],
            metrics=Metrics(avg_latency_ms=0, error_rate=0, throughput_rps=0),
            sla=SLA(max_latency_ms=0, max_budget=0, min_uptime_pct=0),
            echoed_message=message,
        )

    def state(self) -> Dict[str, Any]:
        """Return a summary of the active episode, or ``{}`` when there is none."""
        if self._ep is None:
            return {}
        return {
            "episode_id": self._ep.episode_id,
            "task_id": self._ep.task_config.task_id,
            "steps": self._ep.steps,
            "crashed": self._ep.crashed,
        }


# Generic alias for the environment class.
Environment = CloudOpsEnvironment