| """OpenEnv-conformant adapter for OpenSleuthEnv. |
| |
| Wraps the existing multi-episode :class:`OpenSleuthEnv` registry as a |
| single-episode-per-session :class:`openenv.core.env_server.interfaces.Environment` |
| so the canonical OpenEnv HTTP / WebSocket protocol can be served alongside |
| the legacy ``/reset`` + ``/step`` endpoints the in-flight trainer uses. |
| |
| This module is *additive*. It does not touch the legacy server contract; |
| ``server.py`` mounts the OpenEnv-style sub-application at ``/openenv/*`` so the |
| trainer (which talks to the bare ``/reset`` and ``/step``) is unaffected. |
| |
| The adapter conforms to OpenEnv 0.2.x: |
| |
| * ``Environment.reset(seed, episode_id, **kwargs) -> Observation`` |
| * ``Environment.step(action, timeout_s, **kwargs) -> Observation`` |
| * ``Environment.state -> State`` |
| * ``Environment.get_metadata() -> EnvironmentMetadata`` |
| |
| See https://github.com/meta-pytorch/OpenEnv (v0.2.3, BSD-3) for the spec. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Any, List, Literal, Optional |
| from uuid import uuid4 |
|
|
| from pydantic import Field |
|
|
# OpenEnv is an optional dependency. When it cannot be imported, every
# base-class name this module references is bound to ``object`` so the names
# still resolve, and OPENENV_AVAILABLE records which branch was taken.
try:
    from openenv.core.env_server.interfaces import Environment
    from openenv.core.env_server.types import (
        Action as OEAction,
        EnvironmentMetadata,
        Observation as OEObservation,
        State as OEState,
    )
except ImportError:
    OPENENV_AVAILABLE = False
    Environment = object
    EnvironmentMetadata = object
    OEAction = object
    OEObservation = object
    OEState = object
else:
    # Reached only when both imports above succeeded.
    OPENENV_AVAILABLE = True
|
|
| from .env import OpenSleuthEnv |
| from .models import ProbeAction, SubmitAction |
|
|
|
|
if OPENENV_AVAILABLE:
    # Everything below subclasses OpenEnv base types, so it is only defined
    # when the optional ``openenv`` import succeeded.

    class OpenSleuthAction(OEAction):
        """Single OpenEnv action type covering both probe and submit moves.

        ``action_type`` selects the move. ``input_repr`` and ``code`` are both
        optional at the model level; the one matching ``action_type`` is
        required by :meth:`OpenSleuthEnvironment.step`, which raises
        ``ValueError`` when it is missing and otherwise forwards the action
        as a :class:`ProbeAction` or :class:`SubmitAction`.
        """

        action_type: Literal["probe", "submit"] = Field(
            ..., description="Either 'probe' (with input_repr) or 'submit' (with code)."
        )
        input_repr: Optional[str] = Field(
            default=None,
            description="Python literal repr of the probe input. Required when action_type='probe'.",
        )
        code: Optional[str] = Field(
            default=None,
            description="Python source defining the target function. Required when action_type='submit'.",
        )

    class OpenSleuthObservation(OEObservation):
        """Observation returned by :class:`OpenSleuthEnvironment`.

        ``done``, ``reward`` and ``metadata`` come from the OpenEnv base
        class; the fields declared here mirror the attributes of the
        underlying OpenSleuth observation. The per-step info dict is exposed
        both as ``info`` and under ``metadata["info"]``.
        """

        episode_id: str = Field(default="", description="Per-session episode id.")
        target_function_name: str = Field(default="")
        target_function_signature: str = Field(
            default="", description="Public signature + docstring for the target."
        )
        probe_history: List[dict] = Field(
            default_factory=list,
            description="Recent probe records (input_repr, output_repr, is_error, ...).",
        )
        last_error: str = Field(default="", description="Last error string, if any.")
        steps_taken: int = Field(default=0)
        max_steps: int = Field(default=25)
        difficulty: Optional[str] = Field(default=None)
        coverage_buckets_seen: int = Field(default=0)
        seen_outputs_count: int = Field(default=0)
        seen_error_types_count: int = Field(default=0)
        info: dict = Field(
            default_factory=dict,
            description="Structured info from the underlying step (matches the legacy info dict).",
        )

    class OpenSleuthState(OEState):
        """Episode state snapshot returned by ``OpenSleuthEnvironment.state``.

        Adds the target name, the step budget and a finished flag to the
        ``episode_id`` / ``step_count`` values passed to the base class.
        """

        target_function_name: Optional[str] = Field(default=None)
        max_steps: int = Field(default=25)
        finished: bool = Field(default=False)

    class OpenSleuthEnvironment(Environment):
        """OpenEnv-conformant adapter around :class:`OpenSleuthEnv`.

        One adapter instance == one episode (one WebSocket session). Each
        instance owns a private :class:`OpenSleuthEnv` registry and drives at
        most one of its episodes at a time.

        ``SUPPORTS_CONCURRENT_SESSIONS = True`` is safe because each WebSocket
        connection in OpenEnv's :class:`HTTPEnvServer` instantiates its own
        :class:`OpenSleuthEnvironment`, and our underlying registries are
        per-instance.

        Two ids are tracked: the *session* id reported to OpenEnv clients
        (caller-supplied if given) and the *inner* id issued by the wrapped
        registry, which is the only id that registry can be addressed with.
        """

        SUPPORTS_CONCURRENT_SESSIONS = True

        def __init__(self) -> None:
            """Create the private registry and clear all episode bookkeeping."""
            super().__init__()
            self._env = OpenSleuthEnv()
            # Id exposed through observations and ``state``.
            self._episode_id: Optional[str] = None
            # Id returned by ``self._env.reset``; used for ``self._env.step``.
            self._inner_episode_id: Optional[str] = None
            self._target_function_name: Optional[str] = None
            self._max_steps: int = 25
            self._step_count: int = 0
            self._done: bool = False

        def reset(
            self,
            seed: Optional[int] = None,
            episode_id: Optional[str] = None,
            target_name: Optional[str] = None,
            target_code: Optional[str] = None,
            target_function_name: Optional[str] = None,
            max_steps: int = 25,
            edge_cases: Optional[list] = None,
            fuzz_spec: Optional[dict] = None,
            **kwargs: Any,
        ) -> "OpenSleuthObservation":
            """Start a fresh episode and return its initial observation.

            Args:
                seed: Forwarded to the wrapped env; ``None`` is sent as ``0``.
                episode_id: Optional session id to report to the client. When
                    falsy, the id issued by the wrapped env is used instead.
                target_name: Name of the target; defaults to ``"fibonacci"``
                    when neither ``target_name`` nor ``target_code`` is given.
                target_code: Forwarded unchanged to the wrapped env.
                target_function_name: Forwarded unchanged to the wrapped env.
                max_steps: Step budget, forwarded and mirrored in ``state``.
                edge_cases: Forwarded unchanged to the wrapped env.
                fuzz_spec: Forwarded unchanged to the wrapped env.
                **kwargs: Accepted for interface compatibility and ignored.

            Returns:
                The initial observation with ``reward=None``, ``done=False``
                and an empty ``info`` dict.
            """
            if not target_name and not target_code:
                target_name = "fibonacci"
            obs = self._env.reset(
                target_name=target_name,
                seed=seed if seed is not None else 0,
                max_steps=max_steps,
                target_code=target_code,
                target_function_name=target_function_name,
                edge_cases=edge_cases,
                fuzz_spec=fuzz_spec,
            )
            # The caller-supplied id is never handed to the wrapped env, so
            # keep the registry's own id separately for later step() calls.
            self._inner_episode_id = obs.episode_id
            self._episode_id = episode_id or obs.episode_id
            self._target_function_name = obs.target_function_name
            self._max_steps = max_steps
            self._step_count = 0
            self._done = False
            return self._wrap_obs(obs, reward=None, done=False, info={})

        def step(
            self,
            action: "OpenSleuthAction",
            timeout_s: Optional[float] = None,
            **kwargs: Any,
        ) -> "OpenSleuthObservation":
            """Apply one probe or submit action to the current episode.

            If no episode is active (never reset, or closed), a default
            episode is started first via :meth:`reset`.

            Args:
                action: The action to apply.
                timeout_s: Accepted for interface compatibility and ignored.
                **kwargs: Accepted for interface compatibility and ignored.

            Returns:
                The observation, reward, done flag and info of the wrapped
                env's step, repackaged as an :class:`OpenSleuthObservation`.

            Raises:
                ValueError: If the field required by ``action_type`` is
                    missing, or ``action_type`` is not recognised.
                RuntimeError: If no episode exists even after the automatic
                    reset.
            """
            if self._inner_episode_id is None:
                self.reset()

            internal_action = self._to_internal_action(action)

            inner_id = self._inner_episode_id
            if inner_id is None:
                # Explicit check instead of ``assert`` so it survives ``-O``.
                raise RuntimeError("reset() did not establish an episode.")

            # Address the wrapped registry with the id it issued, not with the
            # (possibly caller-chosen) session id.
            resp = self._env.step(inner_id, internal_action)
            self._step_count += 1
            self._done = resp.done
            return self._wrap_obs(
                resp.observation, reward=resp.reward, done=resp.done, info=resp.info
            )

        @staticmethod
        def _to_internal_action(action: "OpenSleuthAction") -> Any:
            """Translate an OpenEnv action into a ProbeAction or SubmitAction."""
            if action.action_type == "probe":
                if action.input_repr is None:
                    raise ValueError(
                        "OpenSleuthAction(action_type='probe') requires input_repr."
                    )
                return ProbeAction(input_repr=action.input_repr)
            if action.action_type == "submit":
                if action.code is None:
                    raise ValueError(
                        "OpenSleuthAction(action_type='submit') requires code."
                    )
                return SubmitAction(code=action.code)
            raise ValueError(f"Unknown action_type: {action.action_type!r}")

        @property
        def state(self) -> "OpenSleuthState":
            """Return a snapshot of the adapter's episode bookkeeping."""
            return OpenSleuthState(
                episode_id=self._episode_id,
                step_count=self._step_count,
                target_function_name=self._target_function_name,
                max_steps=self._max_steps,
                finished=self._done,
            )

        def get_metadata(self) -> "EnvironmentMetadata":
            """Return the static name/description/version record for this env."""
            return EnvironmentMetadata(
                name="OpenSleuth",
                description=(
                    "Algorithmic detective: probe a hidden Python function then submit "
                    "code that reproduces it. Used for GRPO RL training on Qwen-2.5."
                ),
                version="0.4.1",
                author="OpenSleuth team",
                documentation_url=(
                    "https://huggingface.co/spaces/anugrah55/opensleuth-env-gemini-cli"
                ),
            )

        def close(self) -> None:
            """Forget the current episode; the next ``step`` starts a new one."""
            self._episode_id = None
            self._inner_episode_id = None
            self._target_function_name = None
            self._step_count = 0
            self._done = False

        def _wrap_obs(
            self,
            internal_obs: Any,
            *,
            reward: Optional[float],
            done: bool,
            info: dict,
        ) -> "OpenSleuthObservation":
            """Copy a wrapped-env observation into an OpenSleuthObservation.

            Args:
                internal_obs: Observation object produced by the wrapped env.
                reward: Reward to report (``None`` on reset).
                done: Whether the episode has finished.
                info: Info dict, exposed as ``info`` and ``metadata["info"]``.

            Returns:
                The populated :class:`OpenSleuthObservation`.
            """
            return OpenSleuthObservation(
                done=done,
                reward=reward,
                # Report the session id so observations agree with ``state``;
                # fall back to the wrapped env's id if none is recorded.
                episode_id=self._episode_id or internal_obs.episode_id,
                target_function_name=internal_obs.target_function_name,
                target_function_signature=internal_obs.target_function_signature,
                probe_history=[r.model_dump() for r in internal_obs.probe_history],
                last_error=internal_obs.last_error,
                steps_taken=internal_obs.steps_taken,
                max_steps=internal_obs.max_steps,
                difficulty=internal_obs.difficulty,
                coverage_buckets_seen=internal_obs.coverage_buckets_seen,
                seen_outputs_count=internal_obs.seen_outputs_count,
                seen_error_types_count=internal_obs.seen_error_types_count,
                info=info,
                metadata={"info": info},
            )
|
|
|
|
# Public API: the availability flag is always exported; the adapter classes
# are exported only when they were actually defined.
__all__ = ["OPENENV_AVAILABLE"]
if OPENENV_AVAILABLE:
    __all__.extend(
        [
            "OpenSleuthAction",
            "OpenSleuthObservation",
            "OpenSleuthState",
            "OpenSleuthEnvironment",
        ]
    )
|
|