Spaces:
Sleeping
Sleeping
"""Python client for the Forge + Arena environment.

Usage::

    from client import ForgeArenaEnv, OverseerInspectAction

    with ForgeArenaEnv(base_url="https://amogh-kal1-forge-arena.hf.space/api") as env:
        obs = env.reset()
        print(obs.task_description)
        result = env.step(OverseerInspectAction(
            detection=True,
            explanation="The Worker omitted the mandatory disclaimer.",
            correction="...",
            confidence=0.85,
        ))
        # ``step`` returns a StepResult; the episode scores live on its
        # ``observation`` (a ForgeEpisodeResult once the episode is done).
        print(result.observation.composite_reward)
"""
| from __future__ import annotations | |
| from typing import Dict | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from .models import ( | |
| AnyForgeAction, | |
| ForgeEpisodeResult, | |
| ForgeObservation, | |
| OverseerInspectAction, | |
| OverseerProbeAction, | |
| ) | |
class ForgeArenaEnv(EnvClient[AnyForgeAction, ForgeObservation, State]):
    """Client for the Forge + Arena oversight environment.

    Maintains a persistent connection to the environment server and exposes the
    standard openenv ``reset()`` / ``step()`` interface.

    Args:
        base_url: Base URL of a running Forge + Arena server.
            Local: ``http://localhost:8000``
            Space: ``https://amogh-kal1-forge-arena.hf.space/api``

    Example::

        with ForgeArenaEnv(base_url="http://localhost:8000") as env:
            obs = env.reset()
            result = env.step(OverseerInspectAction(
                detection=False, confidence=0.4,
            ))
    """

    def _step_payload(self, action: AnyForgeAction) -> Dict:
        """Serialise the action to the JSON body expected by POST /step.

        Args:
            action: The action to send; it must provide ``model_dump()``.

        Returns:
            The mapping produced by ``action.model_dump()``.
        """
        return action.model_dump()

    def _parse_result(
        self, payload: Dict
    ) -> StepResult[ForgeObservation | ForgeEpisodeResult]:
        """Parse the server response into a typed StepResult.

        Observation fields are read from ``payload["observation"]`` when that
        key holds a value; when it is missing or ``None`` the fields are read
        from the top level of ``payload`` instead. Absent fields fall back to
        neutral defaults (empty string, ``False``, ``0.0`` or ``None``).

        Args:
            payload: Decoded response body from the server.

        Returns:
            A ``StepResult`` whose ``observation`` is a ``ForgeEpisodeResult``
            when the payload's ``done`` flag is truthy and a
            ``ForgeObservation`` otherwise. ``reward`` and ``done`` are taken
            from the top level of ``payload``.
        """
        done = payload.get("done", False)

        # An explicit ``"observation": null`` must behave like a missing key;
        # otherwise the ``.get`` calls below would fail on ``None``.
        obs_data = payload.get("observation")
        if obs_data is None:
            obs_data = payload

        observation: ForgeObservation | ForgeEpisodeResult
        if done:
            # Terminal step: the observation also carries the ground truth,
            # the overseer's submitted verdict and the per-component scores.
            observation = ForgeEpisodeResult(
                episode_id=obs_data.get("episode_id", ""),
                domain=obs_data.get("domain", ""),
                worker_cot=obs_data.get("worker_cot", ""),
                worker_output=obs_data.get("worker_output", ""),
                done=True,
                corruption_present=obs_data.get("corruption_present", False),
                corruption_type=obs_data.get("corruption_type"),
                ground_truth_output=obs_data.get("ground_truth_output", ""),
                overseer_detection=obs_data.get("overseer_detection", False),
                overseer_explanation=obs_data.get("overseer_explanation", ""),
                overseer_correction=obs_data.get("overseer_correction", ""),
                overseer_confidence=obs_data.get("overseer_confidence", 0.0),
                composite_reward=obs_data.get("composite_reward", 0.0),
                detection_score=obs_data.get("detection_score", 0.0),
                explanation_score=obs_data.get("explanation_score", 0.0),
                correction_score=obs_data.get("correction_score", 0.0),
                calibration_score=obs_data.get("calibration_score", 0.0),
            )
        else:
            observation = ForgeObservation(
                episode_id=obs_data.get("episode_id", ""),
                task_description=obs_data.get("task_description", ""),
                source_material=obs_data.get("source_material", ""),
                domain=obs_data.get("domain", ""),
                worker_cot=obs_data.get("worker_cot", ""),
                worker_output=obs_data.get("worker_output", ""),
                done=False,
                phase=obs_data.get("phase", "overseer_inspecting"),
            )

        return StepResult(
            observation=observation,
            reward=payload.get("reward"),
            done=done,
        )

    def _parse_state(self, payload: Dict) -> State:
        """Parse the server state response.

        Args:
            payload: Decoded response body describing the environment state.

        Returns:
            A ``State`` built from ``episode_id`` (``None`` when absent) and
            ``step_count`` (``0`` when absent).
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )