Spaces:
Sleeping
Sleeping
| """CloudOps Optimizer Environment Client. | |
| Provides the async EnvClient for connecting to the server. | |
| """ | |
| from typing import Any, Dict, Optional | |
| try: | |
| from openenv.core.env_client import EnvClient | |
| from openenv.core.client_types import StepResult | |
| except ImportError: | |
| from openenv.core.env_client import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from models import Observation as ObsModel, Action as ActModel | |
class CloudOpsClient(EnvClient[ActModel, ObsModel, Dict[str, Any]]):
    """Async client for the CloudOps Optimizer Environment.

    Supplies the payload conversion methods (`_step_payload`,
    `_parse_result`, `_parse_state`) on top of ``EnvClient``; transport and
    session handling are presumably provided by the base class (not visible
    in this file).
    """

    def _step_payload(self, action: ActModel) -> Dict[str, Any]:
        """Serialize ``action`` into a request payload via ``model_dump()``."""
        return action.model_dump()

    def _parse_result(self, payload: Dict[str, Any]) -> "StepResult[ObsModel]":
        """Convert a step response payload into a ``StepResult``.

        The observation is read from ``payload["observation"]`` when that key
        exists; otherwise the whole payload is treated as the observation.
        ``reward``, ``done`` and ``info`` default to ``0.0``, ``False`` and
        ``{}`` respectively when absent.

        If the observation cannot be validated, the failure is logged (with
        traceback) and a placeholder observation is returned instead of
        raising, so a single malformed response does not abort the caller.
        """
        obs_data = payload.get("observation", payload)
        reward = payload.get("reward", 0.0)
        done = payload.get("done", False)
        info = payload.get("info", {})

        try:
            observation = ObsModel.model_validate(obs_data)
        except Exception:
            # Deliberately broad: the fallback is best-effort. But never
            # swallow the error silently -- without a log line a schema
            # mismatch is indistinguishable from a genuinely empty state.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to parse observation; returning placeholder observation",
                exc_info=True,
            )
            # NOTE(review): metrics/sla are looked up on the top-level
            # payload, not inside payload["observation"] -- confirm this is
            # the intended source when the observation is nested.
            observation = ObsModel(
                inventory=[],
                metrics=payload.get(
                    "metrics",
                    {"avg_latency_ms": 0, "error_rate": 0, "throughput_rps": 0},
                ),
                sla=payload.get(
                    "sla",
                    {"max_latency_ms": 0, "max_budget": 0, "min_uptime_pct": 0},
                ),
                echoed_message="Error parsing observation",
            )

        # NOTE(review): confirm that the installed StepResult accepts an
        # ``info`` keyword -- TODO verify against openenv.core.client_types.
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
            info=info,
        )

    def _parse_state(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Return the state payload unchanged (state is a plain dict)."""
        return payload
def get_client(base_url: str = "http://localhost:7860") -> CloudOpsClient:
    """Build a ``CloudOpsClient`` that targets ``base_url``.

    Args:
        base_url: Address of the environment server; defaults to
            ``http://localhost:7860``.

    Returns:
        A newly constructed ``CloudOpsClient``.
    """
    client = CloudOpsClient(base_url=base_url)
    return client
# Public names exported by a star import of this module.
__all__ = [
    "CloudOpsClient",
    "get_client",
]