"""
Synchronous HTTP client for the OrgOS OpenEnv environment.

Usage
-----
    from client import OrgOSEnvClient
    from models import OrgOSAction

    client = OrgOSEnvClient(base_url="http://localhost:8000")

    # Start a new episode (workflow_id "A"/"B"/"C" or None for round-robin)
    result = client.reset(workflow_id="A")
    print(result.observation.workflow_goal)

    # Take a step
    action = OrgOSAction(
        app="zendesk",
        operation="acknowledge_ticket",
        args={"ticket_number": "ZD-001"},
    )
    result = client.step(action)
    print(result.observation.current_score, result.reward, result.done)

    # Inspect state
    state = client.state()
    print(state.episode_id, state.workflow_completion)
"""
from typing import Optional
import httpx
from pydantic import BaseModel
from models import OrgOSAction, OrgOSObservation, OrgOSState
class StepResult(BaseModel):
    """Payload model for the responses of ``reset()`` and ``step()``.

    Instances are built by unpacking the decoded JSON response body into
    the fields below; ``info`` is the only field with a default.
    """

    # Environment observation, parsed into the project's observation model.
    observation: OrgOSObservation
    # Reward value reported with this result.
    reward: float
    # Done flag reported with this result.
    done: bool
    # Optional extra data; empty when the response carries no "info" key.
    info: dict = {}
class OrgOSEnvClient:
    """
    Thin synchronous wrapper around the OrgOS HTTP API.

    Each public call sends one request through a shared ``httpx.Client``
    and runs ``raise_for_status()`` on the response, so all methods raise
    ``httpx.HTTPStatusError`` on non-2xx responses.

    The underlying connection pool is released by ``close()``; the client
    can also be used as a context manager, which calls ``close()`` on exit.
    """

    def __init__(self, base_url: str = "http://localhost:8000", timeout: float = 30.0):
        """
        Create the client.

        Parameters
        ----------
        base_url : str
            Root URL of the OrgOS server; trailing slashes are stripped
            before it is stored and handed to httpx.
        timeout : float
            Timeout passed unchanged to ``httpx.Client``.
        """
        self.base_url = base_url.rstrip("/")
        self._client = httpx.Client(base_url=self.base_url, timeout=timeout)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _checked_json(resp: httpx.Response):
        """Raise on an HTTP error status, otherwise return the decoded JSON body."""
        resp.raise_for_status()
        return resp.json()

    # ------------------------------------------------------------------
    # Core API
    # ------------------------------------------------------------------
    def reset(self, workflow_id: Optional[str] = None) -> StepResult:
        """
        Start a new episode.

        Parameters
        ----------
        workflow_id : str | None
            "A" = Customer Bug Fix (support role)
            "B" = Employee Onboarding (manager role)
            "C" = Churn Risk Alert (support role)
            None = round-robin (A → B → C → A …)

        Returns
        -------
        StepResult
            Built from the JSON body of ``POST /reset``.
        """
        # The key is omitted entirely (rather than sent as null) when no
        # workflow was requested.
        payload = {}
        if workflow_id is not None:
            payload["workflow_id"] = workflow_id
        resp = self._client.post("/reset", json=payload)
        return StepResult(**self._checked_json(resp))

    def step(self, action: OrgOSAction) -> StepResult:
        """
        Take one action in the environment.

        Parameters
        ----------
        action : OrgOSAction
            app : str – "jira" | "zendesk" | "salesforce" | "workday"
            operation : str – app-specific operation name
            args : dict – operation arguments

        Returns
        -------
        StepResult
            Built from the JSON body of ``POST /step``.
        """
        # mode="json" asks pydantic for JSON-compatible output, so values
        # that are not JSON-native (e.g. enums or datetimes inside the
        # action) are converted before httpx encodes the request body.
        body = action.model_dump(mode="json")
        resp = self._client.post("/step", json=body)
        return StepResult(**self._checked_json(resp))

    def state(self) -> OrgOSState:
        """Return current episode metadata, fetched with ``GET /state``."""
        return OrgOSState(**self._checked_json(self._client.get("/state")))

    def health(self) -> dict:
        """Ping the server via ``GET /health`` and return the decoded JSON body.

        Returns {"status": "healthy"} if healthy.
        """
        return self._checked_json(self._client.get("/health"))

    def app_schemas(self) -> dict:
        """Return the per-app operation catalogue from ``GET /schema/apps``."""
        return self._checked_json(self._client.get("/schema/apps"))

    # ------------------------------------------------------------------
    # Context manager support
    # ------------------------------------------------------------------
    def __enter__(self) -> "OrgOSEnvClient":
        """Return the client itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, *exc_info) -> None:
        """Close the client on exit; exceptions from the block are not suppressed."""
        self.close()

    def close(self) -> None:
        """Close the underlying ``httpx.Client``."""
        self._client.close()
|