orgOS / client.py
Taniieeee83's picture
added minimal ui and all 4 apps+workflows
2305b9f
"""
Synchronous HTTP client for the OrgOS OpenEnv environment.
Usage
-----
from client import OrgOSEnvClient
from models import OrgOSAction
client = OrgOSEnvClient(base_url="http://localhost:8000")
# Start a new episode (workflow_id "A"/"B"/"C" or None for round-robin)
result = client.reset(workflow_id="A")
print(result.observation.workflow_goal)
# Take a step
action = OrgOSAction(
app="zendesk",
operation="acknowledge_ticket",
args={"ticket_number": "ZD-001"},
)
result = client.step(action)
print(result.observation.current_score, result.reward, result.done)
# Inspect state
state = client.state()
print(state.episode_id, state.workflow_completion)
"""
from typing import Optional
import httpx
from pydantic import BaseModel
from models import OrgOSAction, OrgOSObservation, OrgOSState
class StepResult(BaseModel):
"""Returned by reset() and step()."""
observation: OrgOSObservation
reward: float
done: bool
info: dict = {}
class OrgOSEnvClient:
"""
Thin synchronous wrapper around the OrgOS HTTP API.
All methods raise httpx.HTTPStatusError on non-2xx responses.
"""
def __init__(self, base_url: str = "http://localhost:8000", timeout: float = 30.0):
self.base_url = base_url.rstrip("/")
self._client = httpx.Client(base_url=self.base_url, timeout=timeout)
# ------------------------------------------------------------------
# Core API
# ------------------------------------------------------------------
def reset(self, workflow_id: Optional[str] = None) -> StepResult:
"""
Start a new episode.
Parameters
----------
workflow_id : str | None
"A" = Customer Bug Fix (support role)
"B" = Employee Onboarding (manager role)
"C" = Churn Risk Alert (support role)
None = round-robin (A β†’ B β†’ C β†’ A …)
"""
payload = {"workflow_id": workflow_id} if workflow_id is not None else {}
resp = self._client.post("/reset", json=payload)
resp.raise_for_status()
return StepResult(**resp.json())
def step(self, action: OrgOSAction) -> StepResult:
"""
Take one action in the environment.
Parameters
----------
action : OrgOSAction
app : str – "jira" | "zendesk" | "salesforce" | "workday"
operation : str – app-specific operation name
args : dict – operation arguments
"""
resp = self._client.post("/step", json=action.model_dump())
resp.raise_for_status()
return StepResult(**resp.json())
def state(self) -> OrgOSState:
"""Return current episode metadata without modifying state."""
resp = self._client.get("/state")
resp.raise_for_status()
return OrgOSState(**resp.json())
def health(self) -> dict:
"""Ping the server. Returns {"status": "healthy"} if healthy."""
resp = self._client.get("/health")
resp.raise_for_status()
return resp.json()
def app_schemas(self) -> dict:
"""Return per-app operation catalogue."""
resp = self._client.get("/schema/apps")
resp.raise_for_status()
return resp.json()
# ------------------------------------------------------------------
# Context manager support
# ------------------------------------------------------------------
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
def close(self):
self._client.close()