File size: 3,676 Bytes
4719066
2305b9f
4719066
 
 
2305b9f
 
4719066
2305b9f
4719066
2305b9f
 
 
4719066
 
2305b9f
 
 
 
4719066
 
 
 
 
 
2305b9f
4719066
 
 
 
 
 
2305b9f
4719066
 
 
 
2305b9f
4719066
 
 
 
 
2305b9f
4719066
2305b9f
4719066
 
 
 
 
2305b9f
4719066
 
 
 
 
 
2305b9f
4719066
 
 
 
 
2305b9f
 
 
 
 
4719066
2305b9f
4719066
 
 
 
2305b9f
4719066
2305b9f
4719066
 
 
2305b9f
 
 
 
4719066
 
 
 
 
2305b9f
4719066
 
 
2305b9f
4719066
 
2305b9f
4719066
 
 
 
2305b9f
 
 
 
 
 
4719066
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Synchronous HTTP client for the OrgOS OpenEnv environment.

Usage
-----
    from client import OrgOSEnvClient
    from models import OrgOSAction

    client = OrgOSEnvClient(base_url="http://localhost:8000")

    # Start a new episode (workflow_id "A"/"B"/"C" or None for round-robin)
    result = client.reset(workflow_id="A")
    print(result.observation.workflow_goal)

    # Take a step
    action = OrgOSAction(
        app="zendesk",
        operation="acknowledge_ticket",
        args={"ticket_number": "ZD-001"},
    )
    result = client.step(action)
    print(result.observation.current_score, result.reward, result.done)

    # Inspect state
    state = client.state()
    print(state.episode_id, state.workflow_completion)
"""

from typing import Optional
import httpx
from pydantic import BaseModel

from models import OrgOSAction, OrgOSObservation, OrgOSState


class StepResult(BaseModel):
    """Returned by reset() and step()."""
    observation: OrgOSObservation
    reward: float
    done: bool
    info: dict = {}


class OrgOSEnvClient:
    """
    Thin synchronous wrapper around the OrgOS HTTP API.

    All methods raise httpx.HTTPStatusError on non-2xx responses.
    """

    def __init__(self, base_url: str = "http://localhost:8000", timeout: float = 30.0):
        self.base_url  = base_url.rstrip("/")
        self._client   = httpx.Client(base_url=self.base_url, timeout=timeout)

    # ------------------------------------------------------------------
    # Core API
    # ------------------------------------------------------------------

    def reset(self, workflow_id: Optional[str] = None) -> StepResult:
        """
        Start a new episode.

        Parameters
        ----------
        workflow_id : str | None
            "A" = Customer Bug Fix  (support role)
            "B" = Employee Onboarding  (manager role)
            "C" = Churn Risk Alert  (support role)
            None = round-robin (A → B → C → A …)
        """
        payload = {"workflow_id": workflow_id} if workflow_id is not None else {}
        resp    = self._client.post("/reset", json=payload)
        resp.raise_for_status()
        return StepResult(**resp.json())

    def step(self, action: OrgOSAction) -> StepResult:
        """
        Take one action in the environment.

        Parameters
        ----------
        action : OrgOSAction
            app       : str   – "jira" | "zendesk" | "salesforce" | "workday"
            operation : str   – app-specific operation name
            args      : dict  – operation arguments
        """
        resp = self._client.post("/step", json=action.model_dump())
        resp.raise_for_status()
        return StepResult(**resp.json())

    def state(self) -> OrgOSState:
        """Return current episode metadata without modifying state."""
        resp = self._client.get("/state")
        resp.raise_for_status()
        return OrgOSState(**resp.json())

    def health(self) -> dict:
        """Ping the server. Returns {"status": "healthy"} if healthy."""
        resp = self._client.get("/health")
        resp.raise_for_status()
        return resp.json()

    def app_schemas(self) -> dict:
        """Return per-app operation catalogue."""
        resp = self._client.get("/schema/apps")
        resp.raise_for_status()
        return resp.json()

    # ------------------------------------------------------------------
    # Context manager support
    # ------------------------------------------------------------------

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def close(self):
        self._client.close()