"""FastAPI application for the SupportDesk environment."""
from __future__ import annotations
import os
from typing import Any
import uvicorn
from fastapi import Body, HTTPException
from fastapi.routing import APIRoute
try:
from openenv.core.env_server import http_server as openenv_http_server
except ImportError:
try:
from openenv_core.env_server import http_server as openenv_http_server
except Exception as e: # pragma: no cover
raise ImportError(
"openenv is required for the web interface. Install dependencies with '\n uv sync\n'"
) from e
from models import SupportDeskAction, SupportDeskObservation, SupportDeskState
from server.supportdesk_environment import SupportDeskEnvironment
from tasks import TASKS
# Point OpenEnv's default /state route at the full typed SupportDesk state model.
# NOTE(review): presumably this has to happen before create_app() runs — confirm.
openenv_http_server.State = SupportDeskState
create_app = openenv_http_server.create_app

# Build the application object (web interface and README integration included).
app = create_app(
    SupportDeskEnvironment,
    SupportDeskAction,
    SupportDeskObservation,
    env_name="supportdesk_env",
    max_concurrent_envs=1,  # increase this number to allow more concurrent WebSocket sessions
)
# Maps each task id to the "module:ClassName" path of its grader; the value is
# echoed verbatim in the "grader" field of the /tasks catalog.
TASK_GRADER_PATHS = dict(
    billing_refund_easy="graders:BillingRefundEasyGrader",
    account_takeover_medium="graders:AccountTakeoverMediumGrader",
    api_incident_hard="graders:ApiIncidentHardGrader",
    regulated_export_exception_hard="graders:RegulatedExportExceptionHardGrader",
)
def _replace_route(path: str, methods: set[str]) -> None:
    """Drop generated routes for ``path`` so a score-aware handler can take over.

    A route is removed only when it is an ``APIRoute``, its path equals
    ``path``, and every method in ``methods`` is among the route's methods.
    All other routes are kept in their original order.

    Args:
        path: Exact route path to match, e.g. ``"/reset"``.
        methods: HTTP methods that the route must all support to be removed.
    """

    def _should_drop(candidate) -> bool:
        # Non-APIRoute entries are never touched.
        if not isinstance(candidate, APIRoute):
            return False
        if candidate.path != path:
            return False
        # A route with no declared methods is treated as having an empty set.
        return methods.issubset(set(candidate.methods or set()))

    surviving_routes = []
    for candidate in app.router.routes:
        if _should_drop(candidate):
            continue
        surviving_routes.append(candidate)
    app.router.routes = surviving_routes
def _score_response(env: SupportDeskEnvironment, observation: SupportDeskObservation) -> dict[str, Any]:
    """Build the response body: the OpenEnv fields plus a top-level ``score``.

    Args:
        env: Environment whose ``state.current_score`` supplies ``"score"``.
        observation: Observation to serialize; also supplies ``reward`` and ``done``.

    Returns:
        A dict with the keys ``"observation"``, ``"reward"``, ``"done"`` and
        ``"score"``, in that order.
    """
    body: dict[str, Any] = {}
    body["observation"] = observation.model_dump()
    body["reward"] = observation.reward
    body["done"] = observation.done
    # The score is read from the environment state, not from the observation.
    body["score"] = env.state.current_score
    return body
# Remove the generated POST /reset and POST /step routes so the handlers
# registered below on the same paths are the ones that serve those requests.
_replace_route("/reset", {"POST"})
_replace_route("/step", {"POST"})
@app.post("/reset")
async def reset_with_score(
    request: openenv_http_server.ResetRequest = Body(default_factory=openenv_http_server.ResetRequest),
) -> dict[str, Any]:
    """Reset the environment and return the observation with a top-level score.

    Args:
        request: Reset options; when the body is omitted a default
            ``ResetRequest`` is used. Only explicitly set fields are
            forwarded to ``reset()``.

    Returns:
        The dict produced by ``_score_response`` for the initial observation.
    """
    environment = SupportDeskEnvironment()
    try:
        # exclude_unset: forward only what the caller actually supplied.
        reset_options = request.model_dump(exclude_unset=True)
        initial_observation = environment.reset(**reset_options)
        return _score_response(environment, initial_observation)
    finally:
        # Close the per-request environment on success and on failure alike.
        environment.close()
@app.post("/step")
async def step_with_score(request: openenv_http_server.StepRequest) -> dict[str, Any]:
    """Run one environment step and return the observation with a top-level score.

    Args:
        request: Step request; ``request.action`` is deserialized into a
            ``SupportDeskAction`` and every other explicitly set field is
            forwarded to ``step()`` as a keyword argument.

    Returns:
        The dict produced by ``_score_response`` for the resulting observation.

    Raises:
        HTTPException: 422 with the validation errors when the action cannot
            be deserialized.
    """
    raw_action = request.action
    try:
        parsed_action = openenv_http_server.deserialize_action(raw_action, SupportDeskAction)
    except openenv_http_server.ValidationError as validation_error:
        raise HTTPException(status_code=422, detail=validation_error.errors()) from validation_error

    environment = SupportDeskEnvironment()
    try:
        # The action was handled above; pass along only the remaining set fields.
        step_options = request.model_dump(exclude_unset=True, exclude={"action"})
        step_observation = environment.step(parsed_action, **step_options)
        return _score_response(environment, step_observation)
    finally:
        # Close the per-request environment on success and on failure alike.
        environment.close()
@app.get("/tasks")
def list_tasks() -> dict[str, Any]:
    """Return the task catalog used by the UI, debugging, and pre-submit checks.

    Returns:
        A dict with static ``"environment"`` metadata, ``"total_tasks"`` (the
        size of ``TASKS``) and ``"tasks"``, one entry per value in ``TASKS``.

    Raises:
        KeyError: If a task id has no entry in ``TASK_GRADER_PATHS``.
    """

    def _catalog_entry(task) -> dict[str, Any]:
        # One catalog row: task metadata, its grader path, and ticket context.
        entry: dict[str, Any] = {
            "task_id": task.task_id,
            "grader": TASK_GRADER_PATHS[task.task_id],
            "title": task.title,
            "difficulty": task.difficulty,
            "objective": task.objective,
            "max_steps": task.max_steps,
            "gold_issue_type": task.gold_issue_type,
            "gold_queue": task.gold_queue,
            "gold_priority": task.gold_priority,
        }
        ticket = task.ticket
        entry["ticket_context"] = {
            "customer_tier": ticket.customer_tier,
            "region": ticket.region,
            "affected_users": ticket.affected_users,
            "sla_minutes_remaining": ticket.sla_minutes_remaining,
        }
        return entry

    catalog: dict[str, Any] = {
        "environment": {
            "name": "supportdesk_env",
            "version": "0.1.0",
            "grader_type": "deterministic",
            "score_range": [0.0, 1.0],
        },
    }
    catalog["total_tasks"] = len(TASKS)
    catalog["tasks"] = [_catalog_entry(task) for task in TASKS.values()]
    return catalog
@app.get("/episodes/{episode_id}/state", response_model=SupportDeskState)
def get_episode_state(episode_id: str) -> SupportDeskState:
    """Return the state of the episode named in the URL.

    Args:
        episode_id: Identifier passed to ``SupportDeskEnvironment.state_for_episode``.

    Raises:
        HTTPException: 404 carrying the lookup's message when
            ``state_for_episode`` raises ``ValueError``.
    """
    try:
        episode_state = SupportDeskEnvironment.state_for_episode(episode_id)
    except ValueError as lookup_error:
        raise HTTPException(status_code=404, detail=str(lookup_error)) from lookup_error
    return episode_state
@app.post("/episodes/{episode_id}/step")
def step_episode(
    episode_id: str,
    payload: dict[str, Any] = Body(...),
) -> dict[str, Any]:
    """Step the episode named in the URL, without sticky request context.

    Args:
        episode_id: Episode to step; forwarded to ``env.step`` and used to
            look up the score afterwards.
        payload: JSON body. Must contain an ``"action"`` object; an optional
            ``"timeout_s"`` value is forwarded to ``env.step`` unchanged.

    Returns:
        A dict with ``"observation"``, ``"reward"``, ``"done"`` and the
        episode's current ``"score"``.

    Raises:
        HTTPException: 422 when ``"action"`` is missing, is not an object, or
            fails ``SupportDeskAction`` validation; 404 when stepping or the
            score lookup raises ``ValueError`` for this episode.
    """
    action_payload = payload.get("action")
    if not isinstance(action_payload, dict):
        raise HTTPException(status_code=422, detail="Request body must include an 'action' object.")
    timeout_s = payload.get("timeout_s")

    # Validate on its own: pydantic's ValidationError is a ValueError subclass,
    # so sharing the handler below would report a malformed action as 404.
    try:
        action = SupportDeskAction.model_validate(action_payload)
    except ValueError as exc:
        errors = getattr(exc, "errors", None)
        detail = errors() if callable(errors) else str(exc)
        raise HTTPException(status_code=422, detail=detail) from exc

    env = SupportDeskEnvironment()
    try:
        observation = env.step(action, timeout_s=timeout_s, episode_id=episode_id)
        # Read the score inside the try so a failed lookup is a 404, not a 500.
        score = SupportDeskEnvironment.state_for_episode(episode_id).current_score
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    finally:
        # Mirror /reset and /step: always close the per-request environment.
        env.close()

    return {
        "observation": observation.model_dump(),
        "reward": observation.reward,
        "done": observation.done,
        "score": score,
    }
def main(host: str = "0.0.0.0", port: int = 8000, argv: list[str] | None = None) -> None:
    """Entry point for direct execution via ``uv run`` or ``python -m``.

    This function enables running the server without Docker:

        uv run --project . server
        uv run --project . server --port 8001
        python -m server.app

    ``--host`` and ``--port`` given on the command line override the
    ``host`` and ``port`` arguments; unrecognized arguments are ignored.

    Args:
        host: Host address to bind to (default: "0.0.0.0").
        port: Port number to listen on (default: 8000).
        argv: Command-line arguments to parse. ``None`` (the default) means
            ``sys.argv[1:]``; pass ``[]`` to skip command-line overrides.

    For production deployments, consider using uvicorn directly with
    multiple workers:

        uvicorn server.app:app --workers 4
    """
    import argparse
    import sys

    # allow_abbrev=False: a prefix such as --p must not be read as --port.
    parser = argparse.ArgumentParser(
        description="Run the SupportDesk environment server.",
        allow_abbrev=False,
    )
    parser.add_argument("--host", default=host, help="Host address to bind to.")
    parser.add_argument("--port", type=int, default=port, help="Port number to listen on.")
    # parse_known_args: tolerate flags that belong to a wrapping launcher.
    cli_args = sys.argv[1:] if argv is None else list(argv)
    options, _unrecognized = parser.parse_known_args(cli_args)

    uvicorn.run("server.app:app", host=options.host, port=options.port)


if __name__ == '__main__':
    main()
|