Spaces:
Running
Running
| """ | |
| Synchronous HTTP client for the OrgOS OpenEnv environment. | |
| Usage | |
| ----- | |
| from client import OrgOSEnvClient | |
| from models import OrgOSAction | |
| client = OrgOSEnvClient(base_url="http://localhost:8000") | |
| # Start a new episode (workflow_id "A"/"B"/"C" or None for round-robin) | |
| result = client.reset(workflow_id="A") | |
| print(result.observation.workflow_goal) | |
| # Take a step | |
| action = OrgOSAction( | |
| app="zendesk", | |
| operation="acknowledge_ticket", | |
| args={"ticket_number": "ZD-001"}, | |
| ) | |
| result = client.step(action) | |
| print(result.observation.current_score, result.reward, result.done) | |
| # Inspect state | |
| state = client.state() | |
| print(state.episode_id, state.workflow_completion) | |
| """ | |
| from typing import Optional | |
| import httpx | |
| from pydantic import BaseModel | |
| from models import OrgOSAction, OrgOSObservation, OrgOSState | |
class StepResult(BaseModel):
    """Parsed response body shared by reset() and step()."""

    # Observation payload, validated as an OrgOSObservation.
    observation: OrgOSObservation
    # Populated from the "reward" key of the response.
    reward: float
    # Populated from the "done" key of the response.
    done: bool
    # Optional extra data; defaults to an empty dict when the key is absent.
    info: dict = {}
class OrgOSEnvClient:
    """
    Thin synchronous wrapper around the OrgOS HTTP API.

    Every request method calls ``raise_for_status()`` on the response, so a
    non-2xx reply surfaces as ``httpx.HTTPStatusError``.  The instance owns an
    ``httpx.Client``; release it with ``close()`` or by using the instance as
    a context manager.
    """

    def __init__(self, base_url: str = "http://localhost:8000", timeout: float = 30.0):
        # Strip trailing slashes so the stored base URL has one canonical form.
        self.base_url = base_url.rstrip("/")
        self._client = httpx.Client(base_url=self.base_url, timeout=timeout)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _request_json(self, method: str, path: str, payload: Optional[dict] = None) -> dict:
        """Send one request, raise on an error status, return the decoded JSON body."""
        # ``json=None`` sends no request body, which is what the GET endpoints need.
        resp = self._client.request(method, path, json=payload)
        resp.raise_for_status()
        return resp.json()

    # ------------------------------------------------------------------
    # Core API
    # ------------------------------------------------------------------

    def reset(self, workflow_id: Optional[str] = None) -> StepResult:
        """
        Start a new episode.

        Parameters
        ----------
        workflow_id : str | None
            "A" = Customer Bug Fix (support role)
            "B" = Employee Onboarding (manager role)
            "C" = Churn Risk Alert (support role)
            None = round-robin (A → B → C → A …)

        Returns
        -------
        StepResult
            Built from the JSON body of the ``POST /reset`` response.
        """
        # An empty JSON object (not a null workflow_id) is sent when no
        # workflow is requested.
        payload = {} if workflow_id is None else {"workflow_id": workflow_id}
        return StepResult(**self._request_json("POST", "/reset", payload))

    def step(self, action: OrgOSAction) -> StepResult:
        """
        Take one action in the environment.

        Parameters
        ----------
        action : OrgOSAction
            app : str — "jira" | "zendesk" | "salesforce" | "workday"
            operation : str — app-specific operation name
            args : dict — operation arguments

        Returns
        -------
        StepResult
            Built from the JSON body of the ``POST /step`` response.
        """
        # mode="json" makes pydantic emit only JSON-native values, so the
        # payload can always be encoded by the HTTP layer.
        payload = action.model_dump(mode="json")
        return StepResult(**self._request_json("POST", "/step", payload))

    def state(self) -> OrgOSState:
        """Return current episode metadata (``GET /state``) as an OrgOSState."""
        return OrgOSState(**self._request_json("GET", "/state"))

    def health(self) -> dict:
        """Ping the server. Returns {"status": "healthy"} if healthy."""
        return self._request_json("GET", "/health")

    def app_schemas(self) -> dict:
        """Return per-app operation catalogue."""
        return self._request_json("GET", "/schema/apps")

    # ------------------------------------------------------------------
    # Context manager support
    # ------------------------------------------------------------------

    def __enter__(self) -> "OrgOSEnvClient":
        return self

    def __exit__(self, *_) -> None:
        # Always close; a None return lets any in-flight exception propagate.
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self._client.close()