"""OpenEnv-conformant adapter for OpenSleuthEnv. Wraps the existing multi-episode :class:`OpenSleuthEnv` registry as a single-episode-per-session :class:`openenv.core.env_server.interfaces.Environment` so the canonical OpenEnv HTTP / WebSocket protocol can be served alongside the legacy ``/reset`` + ``/step`` endpoints the in-flight trainer uses. This module is *additive*. It does not touch the legacy server contract; ``server.py`` mounts the OpenEnv-style sub-application at ``/openenv/*`` so the trainer (which talks to the bare ``/reset`` and ``/step``) is unaffected. The adapter conforms to OpenEnv 0.2.x: * ``Environment.reset(seed, episode_id, **kwargs) -> Observation`` * ``Environment.step(action, timeout_s, **kwargs) -> Observation`` * ``Environment.state -> State`` * ``Environment.get_metadata() -> EnvironmentMetadata`` See https://github.com/meta-pytorch/OpenEnv (v0.2.3, BSD-3) for the spec. """ from __future__ import annotations from typing import Any, List, Literal, Optional from uuid import uuid4 from pydantic import Field try: from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import ( Action as OEAction, EnvironmentMetadata, Observation as OEObservation, State as OEState, ) OPENENV_AVAILABLE = True except ImportError: # pragma: no cover - openenv is required at runtime in the Space OPENENV_AVAILABLE = False OEAction = object # type: ignore[assignment, misc] OEObservation = object # type: ignore[assignment, misc] OEState = object # type: ignore[assignment, misc] Environment = object # type: ignore[assignment, misc] EnvironmentMetadata = object # type: ignore[assignment, misc] from .env import OpenSleuthEnv from .models import ProbeAction, SubmitAction if OPENENV_AVAILABLE: class OpenSleuthAction(OEAction): """Unified OpenEnv-style action. The OpenEnv spec wants a single concrete Action subclass per environment; we encode the probe / submit choice via the ``action_type`` discriminator field. Internally we still translate to the original :class:`ProbeAction` / :class:`SubmitAction` so the legacy reward shaping is preserved bit-for-bit. """ action_type: Literal["probe", "submit"] = Field( ..., description="Either 'probe' (with input_repr) or 'submit' (with code)." ) input_repr: Optional[str] = Field( default=None, description="Python literal repr of the probe input. Required when action_type='probe'.", ) code: Optional[str] = Field( default=None, description="Python source defining the target function. Required when action_type='submit'.", ) class OpenSleuthObservation(OEObservation): """OpenEnv observation wrapper. OpenEnv's ``Observation`` base class supplies ``done``, ``reward``, and ``metadata``. We add OpenSleuth-specific fields for the agent (target signature, probe history, etc.). Trainer-facing structured info is also surfaced via ``info`` for backwards compat. """ episode_id: str = Field(default="", description="Per-session episode id.") target_function_name: str = Field(default="") target_function_signature: str = Field( default="", description="Public signature + docstring for the target." ) probe_history: List[dict] = Field( default_factory=list, description="Recent probe records (input_repr, output_repr, is_error, ...).", ) last_error: str = Field(default="", description="Last error string, if any.") steps_taken: int = Field(default=0) max_steps: int = Field(default=25) difficulty: Optional[str] = Field(default=None) coverage_buckets_seen: int = Field(default=0) seen_outputs_count: int = Field(default=0) seen_error_types_count: int = Field(default=0) info: dict = Field( default_factory=dict, description="Structured info from the underlying step (matches the legacy info dict).", ) class OpenSleuthState(OEState): """OpenEnv-style episode state.""" target_function_name: Optional[str] = Field(default=None) max_steps: int = Field(default=25) finished: bool = Field(default=False) class OpenSleuthEnvironment(Environment): """OpenEnv-conformant adapter around :class:`OpenSleuthEnv`. One adapter instance == one episode (one WebSocket session). Inside, we keep a single :class:`OpenSleuthEnv` registry but only ever populate a single episode at a time. ``SUPPORTS_CONCURRENT_SESSIONS = True`` is safe because each WebSocket connection in OpenEnv's :class:`HTTPEnvServer` instantiates its own :class:`OpenSleuthEnvironment`, and our underlying registries are per-instance. """ SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: super().__init__() self._env = OpenSleuthEnv() self._episode_id: Optional[str] = None self._target_function_name: Optional[str] = None self._max_steps: int = 25 self._step_count: int = 0 self._done: bool = False def reset( # type: ignore[override] self, seed: Optional[int] = None, episode_id: Optional[str] = None, target_name: Optional[str] = None, target_code: Optional[str] = None, target_function_name: Optional[str] = None, max_steps: int = 25, edge_cases: Optional[list] = None, fuzz_spec: Optional[dict] = None, **kwargs: Any, ) -> "OpenSleuthObservation": # Default to a builtin so a bare reset() still produces a valid # episode (per OpenEnv spec, reset() with no args must work). if not target_name and not target_code: target_name = "fibonacci" obs = self._env.reset( target_name=target_name, seed=seed if seed is not None else 0, max_steps=max_steps, target_code=target_code, target_function_name=target_function_name, edge_cases=edge_cases, fuzz_spec=fuzz_spec, ) self._episode_id = episode_id or obs.episode_id self._target_function_name = obs.target_function_name self._max_steps = max_steps self._step_count = 0 self._done = False return self._wrap_obs(obs, reward=None, done=False, info={}) def step( # type: ignore[override] self, action: "OpenSleuthAction", timeout_s: Optional[float] = None, **kwargs: Any, ) -> "OpenSleuthObservation": if self._episode_id is None: # Auto-reset on first step with the default target so HTTP /step # smoke tests don't 500 just because /reset wasn't called first. self.reset() internal_action: Any if action.action_type == "probe": if action.input_repr is None: raise ValueError( "OpenSleuthAction(action_type='probe') requires input_repr." ) internal_action = ProbeAction(input_repr=action.input_repr) elif action.action_type == "submit": if action.code is None: raise ValueError( "OpenSleuthAction(action_type='submit') requires code." ) internal_action = SubmitAction(code=action.code) else: # pragma: no cover - Pydantic Literal already constrains this raise ValueError(f"Unknown action_type: {action.action_type!r}") assert self._episode_id is not None resp = self._env.step(self._episode_id, internal_action) self._step_count += 1 self._done = resp.done return self._wrap_obs( resp.observation, reward=resp.reward, done=resp.done, info=resp.info ) @property def state(self) -> "OpenSleuthState": # type: ignore[override] return OpenSleuthState( episode_id=self._episode_id, step_count=self._step_count, target_function_name=self._target_function_name, max_steps=self._max_steps, finished=self._done, ) def get_metadata(self) -> "EnvironmentMetadata": # type: ignore[override] return EnvironmentMetadata( name="OpenSleuth", description=( "Algorithmic detective: probe a hidden Python function then submit " "code that reproduces it. Used for GRPO RL training on Qwen-2.5." ), version="0.4.1", author="OpenSleuth team", documentation_url=( "https://huggingface.co/spaces/anugrah55/opensleuth-env-gemini-cli" ), ) def close(self) -> None: # type: ignore[override] self._episode_id = None self._target_function_name = None self._step_count = 0 self._done = False def _wrap_obs( self, internal_obs: Any, *, reward: Optional[float], done: bool, info: dict, ) -> "OpenSleuthObservation": return OpenSleuthObservation( done=done, reward=reward, episode_id=internal_obs.episode_id, target_function_name=internal_obs.target_function_name, target_function_signature=internal_obs.target_function_signature, probe_history=[r.model_dump() for r in internal_obs.probe_history], last_error=internal_obs.last_error, steps_taken=internal_obs.steps_taken, max_steps=internal_obs.max_steps, difficulty=internal_obs.difficulty, coverage_buckets_seen=internal_obs.coverage_buckets_seen, seen_outputs_count=internal_obs.seen_outputs_count, seen_error_types_count=internal_obs.seen_error_types_count, info=info, metadata={"info": info}, ) __all__ = ["OPENENV_AVAILABLE"] if OPENENV_AVAILABLE: __all__ += [ "OpenSleuthAction", "OpenSleuthObservation", "OpenSleuthState", "OpenSleuthEnvironment", ]