cloud-ops-optimizer / env\core.py
hirann's picture
Upload env\core.py
97e0833 verified
import math
import random
import re
from typing import Any, Dict, Optional, Tuple
from uuid import uuid4
from dataclasses import dataclass, field
from models import (
Observation as ObsModel,
Action as ActModel,
Reward as RewModel,
Resource,
Metrics,
SLA,
)
# Catalogue of the instance types the agent may switch a resource to.
# "cost" is what gets written to a resource's ``monthly_cost``; "capacity" is
# expressed in the same units as a task's ``load`` (utilization = load / capacity).
INSTANCE_DATA = {
    instance_type: {"cost": monthly_cost, "capacity": capacity}
    for instance_type, monthly_cost, capacity in (
        ("t3.nano", 3.6, 1.0),
        ("t3.small", 11.5, 2.0),
        ("t3.medium", 23.0, 4.0),
        ("m5.large", 70.0, 8.0),
        ("m5.xlarge", 140.0, 16.0),
    )
}
@dataclass
class TaskConfig:
    """Static definition of one optimization scenario.

    Instances are stored in ``TASKS`` and are only read by the environment
    when an episode is started.
    """

    # Identifier reported in observations and in the environment state.
    task_id: str
    # Short human-readable title of the task.
    name: str
    # Difficulty label reported in observations (e.g. "easy").
    difficulty: str
    # One-sentence statement of what the agent is expected to achieve.
    description: str
    # One dict of ``Resource`` keyword arguments per starting server.
    initial_resources: list
    # Keyword arguments for ``SLA``: max_latency_ms, max_budget, min_uptime_pct.
    sla: dict
    # Load placed on the cluster, in the same units as instance capacity.
    load: float
# Built-in scenarios, keyed by the short name accepted by
# ``CloudOpsEnvironment.reset(task_id=...)``.
TASKS = {
    # A single, heavily over-provisioned server under a light load.
    "easy": TaskConfig(
        task_id="easy_right_sizing",
        name="Right-Sizing",
        difficulty="easy",
        description="Reduce an overpriced server without breaking the SLA",
        initial_resources=[
            {
                "id": "srv-1",
                "type": "m5.xlarge",
                "cpu_usage": 2.0,
                "mem_usage": 2.0,
                "monthly_cost": 140.0,
            },
        ],
        sla={
            "max_latency_ms": 200.0,
            "max_budget": 30.0,
            "min_uptime_pct": 99.0,
        },
        load=2.0,
    ),
    # A single undersized server that starts out saturated.
    "medium": TaskConfig(
        task_id="medium_latency_fix",
        name="Latency Fix",
        difficulty="medium",
        description="Resolve performance bottleneck while staying under budget",
        initial_resources=[
            {
                "id": "srv-1",
                "type": "t3.nano",
                "cpu_usage": 98.0,
                "mem_usage": 90.0,
                "monthly_cost": 3.6,
            },
        ],
        sla={
            "max_latency_ms": 100.0,
            "max_budget": 60.0,
            "min_uptime_pct": 99.9,
        },
        load=12.0,
    ),
    # Two servers of different sizes sharing a heavy load.
    "hard": TaskConfig(
        task_id="hard_balance",
        name="Balance Optimization",
        difficulty="hard",
        description="Optimize a mixed cluster under tight budget constraints",
        initial_resources=[
            {
                "id": "srv-1",
                "type": "m5.large",
                "cpu_usage": 40.0,
                "mem_usage": 30.0,
                "monthly_cost": 70.0,
            },
            {
                "id": "srv-2",
                "type": "t3.nano",
                "cpu_usage": 90.0,
                "mem_usage": 80.0,
                "monthly_cost": 3.6,
            },
        ],
        sla={
            "max_latency_ms": 150.0,
            "max_budget": 35.0,
            "min_uptime_pct": 99.9,
        },
        load=25.0,
    ),
}
@dataclass
class EpisodeState:
    """Mutable bookkeeping for the episode currently being played."""

    # Scenario this episode was started from.
    task_config: TaskConfig
    # Live ``Resource`` objects; the environment mutates them in place.
    resources: list
    # Load currently applied to the cluster.
    current_load: float
    # Total monthly cost at reset; baseline for the reported cost change.
    initial_cost: float
    # Latency at reset; baseline for the reported latency change.
    initial_latency: float
    # Number of ``step`` calls made so far in this episode.
    steps: int = 0
    # Set once utilization exceeds the crash threshold.
    crashed: bool = False
    # Unique identifier of the episode; a random UUID4 string by default.
    episode_id: str = field(default_factory=lambda: str(uuid4()))
class CloudOpsEnvironment:
    """Cloud Infrastructure Optimization Environment.

    The agent acts as a Cloud SRE optimizing cost and performance. Each
    episode loads one entry of ``TASKS``; the agent then sends text commands
    of the form ``change <resource_id> to <instance_type>`` and is scored on
    how well the cluster's monthly cost and latency respect the task's SLA.
    """

    # Grammar of the only supported command. Messages are lower-cased before
    # matching, hence the lower-case character classes.
    _CHANGE_COMMAND = re.compile(r"change\s+([a-z0-9-]+)\s+to\s+([a-z0-9.]+)")

    def __init__(self, max_steps: int = 12):
        """Create an environment with no active episode.

        Args:
            max_steps: Number of steps after which an episode is reported
                as done even if the reward target was not reached.
        """
        self._max_steps = max_steps
        self._ep: Optional[EpisodeState] = None

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> ObsModel:
        """Start a new episode and return its first observation.

        Args:
            seed: When given, seeds the module-level ``random`` generator so
                that the random task choice is reproducible.
            episode_id: Identifier for the episode; a fresh UUID4 string is
                generated when omitted.
            task_id: Key into ``TASKS`` ("easy", "medium" or "hard"). A
                random task is chosen when omitted; an unknown key falls
                back to "easy".
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            The observation describing the freshly initialised cluster.
        """
        if seed is not None:
            random.seed(seed)

        task_key = task_id or random.choice(["easy", "medium", "hard"])
        if task_key not in TASKS:
            task_key = "easy"
        task = TASKS[task_key]

        # Fresh Resource objects per episode: they are mutated in place, so
        # the shared task definition must not be reused directly.
        resources = [Resource(**r) for r in task.initial_resources]
        initial_cost = sum(r.monthly_cost for r in resources)
        initial_latency, _, _ = self._calculate_metrics(task.load, resources)

        self._ep = EpisodeState(
            task_config=task,
            resources=resources,
            current_load=task.load,
            initial_cost=initial_cost,
            initial_latency=initial_latency,
            steps=0,
            crashed=False,
            episode_id=episode_id or str(uuid4()),
        )
        return self._build_observation("Environment ready. Analyze and optimize.")

    def step(self, action: ActModel, **kwargs: Any) -> Tuple[ObsModel, RewModel, bool, Dict]:
        """Apply one agent command and report the outcome.

        Args:
            action: Agent action; only its ``message`` text is interpreted.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            ``(observation, reward, done, info)``. ``done`` is true when the
            system crashed, the reward reached 0.98, the step limit was hit,
            or no episode is active. ``info`` carries a ``"reason"`` key for
            the crash and not-reset cases and is empty otherwise.
        """
        if self._ep is None:
            # Keep the 4-tuple contract even without an episode so callers
            # that unpack the result do not fail.
            obs = self._error_obs("Environment not reset")
            reward = RewModel(value=0.0, reason="Environment not reset")
            return obs, reward, True, {"reason": "not_reset"}

        self._ep.steps += 1
        msg = action.message.lower()
        message = self._parse_and_execute(msg)

        latency, error_rate, utilization = self._calculate_metrics(
            self._ep.current_load,
            self._ep.resources,
        )

        # More than 10% over total capacity is treated as an outage that
        # ends the episode with zero reward.
        if utilization > 1.1:
            self._ep.crashed = True
            obs = self._build_observation("SYSTEM CRASH: Resource exhaustion!")
            reward = RewModel(value=0.0, reason="System crashed due to resource exhaustion")
            return obs, reward, True, {"reason": "crash"}

        reward = self._calculate_reward(latency, error_rate)
        done = reward.value >= 0.98 or self._ep.steps >= self._max_steps
        obs = self._build_observation(message)
        return obs, reward, done, {}

    def _parse_and_execute(self, msg: str) -> str:
        """Interpret a lower-cased command and apply it to the inventory.

        Args:
            msg: Lower-cased agent message.

        Returns:
            A human-readable confirmation or error message.
        """
        match = self._CHANGE_COMMAND.search(msg)
        if match is None:
            if any(word in msg for word in ("resize", "scale", "upgrade", "downgrade")):
                return "Use format: 'change [resource_id] to [instance_type]'"
            return "Command not recognized. Use 'change [resource_id] to [instance_type]'"

        res_id, new_type = match.groups()
        # The type pattern allows dots, so a sentence-ending period would be
        # captured too ("... to t3.medium."); no valid type ends with a dot.
        new_type = new_type.rstrip(".")
        if new_type not in INSTANCE_DATA:
            return f"Error: Unknown instance type '{new_type}'. Available: {', '.join(INSTANCE_DATA.keys())}"

        for r in self._ep.resources:
            if r.id == res_id:
                r.type = new_type
                r.monthly_cost = INSTANCE_DATA[new_type]["cost"]
                return f"Changed {res_id} to {new_type}"
        return f"Error: Resource '{res_id}' not found"

    def _calculate_metrics(self, load: float, resources: list) -> Tuple[float, float, float]:
        """Derive performance metrics from the load and the cluster capacity.

        Args:
            load: Load applied to the cluster.
            resources: Resources whose ``type`` is a key of ``INSTANCE_DATA``.

        Returns:
            ``(latency, error_rate, utilization)``.
        """
        total_cap = sum(INSTANCE_DATA[r.type]["capacity"] for r in resources)
        # The small epsilon avoids a ZeroDivisionError when capacity is zero.
        utilization = load / (total_cap + 1e-6)
        # Exponential in utilization; equals 100 at exactly full utilization.
        latency = 50 * (1 + math.exp(utilization * 2 - 2))
        # Errors start at 90% utilization and grow linearly from there.
        error_rate = 0.0 if utilization < 0.9 else (utilization - 0.9) * 2.0
        return latency, error_rate, utilization

    def _calculate_reward(self, latency: float, error_rate: float) -> RewModel:
        """Score the current cluster against the task's SLA.

        Half of the score comes from monthly cost versus ``max_budget`` and
        half from latency versus ``max_latency_ms``. Each half is awarded in
        full while the limit is respected and shrinks as
        ``1 / (1 + relative overshoot)`` once it is exceeded.

        Args:
            latency: Current latency as returned by ``_calculate_metrics``.
            error_rate: Current error rate; accepted for interface
                compatibility and not part of the score.

        Returns:
            A reward with ``value`` clamped to ``[0, 1]`` plus the cost and
            latency changes, in percent, relative to the start of the episode.
        """
        total_cost = sum(r.monthly_cost for r in self._ep.resources)
        sla = self._ep.task_config.sla
        # Cost is judged against the budget and latency against the latency
        # limit; the two SLA entries must not be mixed up.
        budget = sla["max_budget"]
        max_latency = sla["max_latency_ms"]

        cost_ratio = total_cost / budget
        cost_reward = 0.5 * (1.0 / (1.0 + max(0, cost_ratio - 1)))
        lat_ratio = latency / max_latency
        perf_reward = 0.5 * (1.0 / (1.0 + max(0, lat_ratio - 1)))
        total_reward = cost_reward + perf_reward

        initial_latency = self._ep.initial_latency
        initial_cost = self._ep.initial_cost
        cost_change = ((total_cost - initial_cost) / initial_cost) * 100 if initial_cost > 0 else 0
        lat_change = ((latency - initial_latency) / initial_latency) * 100 if initial_latency > 0 else 0

        return RewModel(
            value=min(1.0, max(0.0, total_reward)),
            reason=f"Cost: ${total_cost:.1f}/mo, Latency: {latency:.1f}ms",
            cost_change_pct=cost_change,
            latency_change_pct=lat_change,
        )

    def _build_observation(self, message: str) -> ObsModel:
        """Assemble the observation for the current episode state.

        Also refreshes ``cpu_usage`` and ``mem_usage`` on every resource.

        Args:
            message: Text echoed back to the agent.

        Returns:
            The current observation, or an error observation when no episode
            is active.
        """
        if self._ep is None:
            return self._error_obs()

        latency, error_rate, _ = self._calculate_metrics(
            self._ep.current_load,
            self._ep.resources,
        )

        # NOTE(review): every instance is charged with the full cluster load
        # rather than a share of it - confirm this is intended.
        for r in self._ep.resources:
            r.cpu_usage = min(100.0, self._ep.current_load / INSTANCE_DATA[r.type]["capacity"] * 100)
            r.mem_usage = min(100.0, r.cpu_usage * 0.9)

        metrics = Metrics(
            avg_latency_ms=latency,
            error_rate=error_rate,
            throughput_rps=100.0,
        )
        sla = SLA(**self._ep.task_config.sla)
        return ObsModel(
            inventory=self._ep.resources,
            metrics=metrics,
            sla=sla,
            echoed_message=message,
            task_id=self._ep.task_config.task_id,
            task_name=self._ep.task_config.name,
            difficulty=self._ep.task_config.difficulty,
            step=self._ep.steps,
        )

    def _error_obs(self, message: str = "Error: Environment not initialized") -> ObsModel:
        """Return an empty observation that only carries an error message.

        Args:
            message: Text placed in ``echoed_message``.
        """
        return ObsModel(
            inventory=[],
            metrics=Metrics(avg_latency_ms=0, error_rate=0, throughput_rps=0),
            sla=SLA(max_latency_ms=0, max_budget=0, min_uptime_pct=0),
            echoed_message=message,
        )

    @property
    def state(self) -> Dict[str, Any]:
        """Summary of the active episode, or an empty dict when there is none."""
        if self._ep is None:
            return {}
        return {
            "episode_id": self._ep.episode_id,
            "task_id": self._ep.task_config.task_id,
            "steps": self._ep.steps,
            "crashed": self._ep.crashed,
        }
# Alias so the environment class can also be referenced by the generic name ``Environment``.
Environment = CloudOpsEnvironment