Spaces:
Running
Running
| """OpenEnv-friendly HTTP client for the PolyGuard environment.""" | |
| from __future__ import annotations | |
| from typing import Any | |
| import requests | |
| class PolyGuardOpenEnvClient: | |
| def __init__(self, base_url: str = "http://127.0.0.1:8100") -> None: | |
| self.base_url = base_url.rstrip("/") | |
| def reset(self, **kwargs: Any) -> dict[str, Any]: | |
| response = requests.post(f"{self.base_url}/reset", json=kwargs, timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |
| def step(self, action: dict[str, Any]) -> dict[str, Any]: | |
| response = requests.post(f"{self.base_url}/step", json=action, timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |
| def state(self) -> dict[str, Any]: | |
| response = requests.get(f"{self.base_url}/state", timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |
| def metadata(self) -> dict[str, Any]: | |
| response = requests.get(f"{self.base_url}/metadata", timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |
| def schema(self) -> dict[str, Any]: | |
| response = requests.get(f"{self.base_url}/schema", timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |
| def mcp(self, payload: dict[str, Any]) -> dict[str, Any]: | |
| response = requests.post(f"{self.base_url}/mcp", json=payload, timeout=30) | |
| response.raise_for_status() | |
| return response.json() | |