# Uploaded by anugrah55 (commit 31715b5, verified).
# Commit message: OpenEnv 0.2.3 conformance: mount /openenv sub-app, add adapter + tests + example client
"""OpenEnv-conformant adapter for OpenSleuthEnv.
Wraps the existing multi-episode :class:`OpenSleuthEnv` registry as a
single-episode-per-session :class:`openenv.core.env_server.interfaces.Environment`
so the canonical OpenEnv HTTP / WebSocket protocol can be served alongside
the legacy ``/reset`` + ``/step`` endpoints the in-flight trainer uses.
This module is *additive*. It does not touch the legacy server contract;
``server.py`` mounts the OpenEnv-style sub-application at ``/openenv/*`` so the
trainer (which talks to the bare ``/reset`` and ``/step``) is unaffected.
The adapter conforms to OpenEnv 0.2.x:
* ``Environment.reset(seed, episode_id, **kwargs) -> Observation``
* ``Environment.step(action, timeout_s, **kwargs) -> Observation``
* ``Environment.state -> State``
* ``Environment.get_metadata() -> EnvironmentMetadata``
See https://github.com/meta-pytorch/OpenEnv (v0.2.3, BSD-3) for the spec.
"""
from __future__ import annotations
from typing import Any, List, Literal, Optional
from uuid import uuid4
from pydantic import Field
try:
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import (
Action as OEAction,
EnvironmentMetadata,
Observation as OEObservation,
State as OEState,
)
OPENENV_AVAILABLE = True
except ImportError: # pragma: no cover - openenv is required at runtime in the Space
OPENENV_AVAILABLE = False
OEAction = object # type: ignore[assignment, misc]
OEObservation = object # type: ignore[assignment, misc]
OEState = object # type: ignore[assignment, misc]
Environment = object # type: ignore[assignment, misc]
EnvironmentMetadata = object # type: ignore[assignment, misc]
from .env import OpenSleuthEnv
from .models import ProbeAction, SubmitAction
if OPENENV_AVAILABLE:
class OpenSleuthAction(OEAction):
    """Single action model accepted by the OpenSleuth OpenEnv adapter.

    OpenEnv wants one concrete ``Action`` subclass per environment, so the
    probe and submit moves share this model and are told apart by the
    ``action_type`` discriminator. ``OpenSleuthEnvironment.step`` turns an
    instance into the original :class:`ProbeAction` / :class:`SubmitAction`
    before handing it to the wrapped environment.

    The model itself does not enforce that the payload matching
    ``action_type`` is present; that check happens in ``step``.
    """

    # Discriminator selecting which optional payload field below is used.
    action_type: Literal["probe", "submit"] = Field(
        ...,
        description="Either 'probe' (with input_repr) or 'submit' (with code).",
    )

    # Payload for a probe move.
    input_repr: Optional[str] = Field(
        None,
        description="Python literal repr of the probe input. Required when action_type='probe'.",
    )

    # Payload for a submit move.
    code: Optional[str] = Field(
        None,
        description="Python source defining the target function. Required when action_type='submit'.",
    )
class OpenSleuthObservation(OEObservation):
    """Observation returned by the OpenSleuth OpenEnv adapter.

    ``done``, ``reward`` and ``metadata`` are not declared here; they come
    from OpenEnv's ``Observation`` base class. The fields below carry the
    OpenSleuth-specific view for the agent (target signature, probe
    history, counters), and ``info`` repeats the structured info of the
    underlying step for backwards compatibility with the trainer.
    """

    # --- episode / target identification ---------------------------------
    episode_id: str = Field("", description="Per-session episode id.")
    target_function_name: str = Field("")
    target_function_signature: str = Field(
        "",
        description="Public signature + docstring for the target.",
    )

    # --- probing progress -------------------------------------------------
    probe_history: List[dict] = Field(
        default_factory=list,
        description="Recent probe records (input_repr, output_repr, is_error, ...).",
    )
    last_error: str = Field("", description="Last error string, if any.")
    steps_taken: int = Field(0)
    max_steps: int = Field(25)
    difficulty: Optional[str] = Field(None)

    # --- exploration counters ---------------------------------------------
    coverage_buckets_seen: int = Field(0)
    seen_outputs_count: int = Field(0)
    seen_error_types_count: int = Field(0)

    # --- legacy structured info -------------------------------------------
    info: dict = Field(
        default_factory=dict,
        description="Structured info from the underlying step (matches the legacy info dict).",
    )
class OpenSleuthState(OEState):
    """Episode state exposed through ``OpenSleuthEnvironment.state``.

    Extends OpenEnv's ``State`` base model with the three
    OpenSleuth-specific fields declared below.
    """

    # Name of the function the agent is trying to reproduce (None before reset).
    target_function_name: Optional[str] = Field(None)
    # Step budget requested for the current episode.
    max_steps: int = Field(25)
    # True once the underlying environment has reported the episode as done.
    finished: bool = Field(False)
class OpenSleuthEnvironment(Environment):
    """OpenEnv-conformant adapter around :class:`OpenSleuthEnv`.

    One adapter instance == one episode (one WebSocket session). Inside,
    we keep a single :class:`OpenSleuthEnv` registry and only ever drive
    the episode created by the most recent :meth:`reset`.

    ``SUPPORTS_CONCURRENT_SESSIONS = True`` is intended to be safe because
    every adapter instance owns its own :class:`OpenSleuthEnv`.
    NOTE(review): this relies on the OpenEnv server instantiating one
    adapter per session -- confirm against ``HTTPEnvServer``.

    Two episode ids are tracked on purpose:

    * ``_episode_id`` is the *public* id reported to OpenEnv clients. It is
      the caller-supplied ``episode_id`` when one is given to
      :meth:`reset`, otherwise the id generated by the wrapped env.
    * ``_internal_episode_id`` is the id the wrapped env returned from its
      own ``reset`` and is the only value ever passed to ``self._env.step``.

    Keeping them apart matters because the caller-supplied id is never
    forwarded to the wrapped env, so it cannot be used as a lookup key.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Create the wrapped env and start with no active episode."""
        super().__init__()
        self._env = OpenSleuthEnv()
        # Public id surfaced via ``state`` and observations.
        self._episode_id: Optional[str] = None
        # Id under which the wrapped env knows the current episode.
        self._internal_episode_id: Optional[str] = None
        self._target_function_name: Optional[str] = None
        self._max_steps: int = 25
        self._step_count: int = 0
        self._done: bool = False

    def reset(  # type: ignore[override]
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        target_name: Optional[str] = None,
        target_code: Optional[str] = None,
        target_function_name: Optional[str] = None,
        max_steps: int = 25,
        edge_cases: Optional[list] = None,
        fuzz_spec: Optional[dict] = None,
        **kwargs: Any,
    ) -> "OpenSleuthObservation":
        """Start a new episode and return its first observation.

        Args:
            seed: Seed forwarded to the wrapped env; ``None`` is sent as 0.
            episode_id: Optional public id for the episode. When omitted
                (or empty) the id generated by the wrapped env is used.
            target_name: Name of the target to load. Defaults to
                ``"fibonacci"`` when neither ``target_name`` nor
                ``target_code`` is given.
            target_code: Forwarded unchanged to the wrapped env.
            target_function_name: Forwarded unchanged to the wrapped env.
            max_steps: Step budget, forwarded and remembered for ``state``.
            edge_cases: Forwarded unchanged to the wrapped env.
            fuzz_spec: Forwarded unchanged to the wrapped env.
            **kwargs: Accepted for OpenEnv signature compatibility; ignored.

        Returns:
            The initial observation with ``reward=None`` and ``done=False``.
        """
        # Default to a builtin so a bare reset() still produces a valid
        # episode (per OpenEnv spec, reset() with no args must work).
        if not target_name and not target_code:
            target_name = "fibonacci"

        # NOTE(review): an earlier episode is not removed from the wrapped
        # registry here; confirm OpenSleuthEnv evicts stale episodes.
        obs = self._env.reset(
            target_name=target_name,
            seed=seed if seed is not None else 0,
            max_steps=max_steps,
            target_code=target_code,
            target_function_name=target_function_name,
            edge_cases=edge_cases,
            fuzz_spec=fuzz_spec,
        )

        # The wrapped env only knows the id it generated itself, so keep it
        # for step() lookups even when the caller picked a public id.
        self._internal_episode_id = obs.episode_id
        self._episode_id = episode_id or obs.episode_id
        self._target_function_name = obs.target_function_name
        self._max_steps = max_steps
        self._step_count = 0
        self._done = False
        return self._wrap_obs(obs, reward=None, done=False, info={})

    def step(  # type: ignore[override]
        self,
        action: "OpenSleuthAction",
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> "OpenSleuthObservation":
        """Apply one probe or submit action to the current episode.

        Args:
            action: The unified action; translated to the legacy
                :class:`ProbeAction` / :class:`SubmitAction`.
            timeout_s: Accepted for OpenEnv signature compatibility; ignored.
            **kwargs: Accepted for OpenEnv signature compatibility; ignored.

        Returns:
            The observation, reward, done flag and info of the wrapped
            env's step, repackaged as an :class:`OpenSleuthObservation`.

        Raises:
            ValueError: If the payload required by ``action.action_type``
                is missing, or the action type is unknown.
            RuntimeError: If no episode is registered even after the
                automatic reset (not expected in practice).
        """
        if self._internal_episode_id is None:
            # Auto-reset on first step with the default target so HTTP /step
            # smoke tests don't 500 just because /reset wasn't called first.
            self.reset()

        internal_action: Any
        if action.action_type == "probe":
            if action.input_repr is None:
                raise ValueError(
                    "OpenSleuthAction(action_type='probe') requires input_repr."
                )
            internal_action = ProbeAction(input_repr=action.input_repr)
        elif action.action_type == "submit":
            if action.code is None:
                raise ValueError(
                    "OpenSleuthAction(action_type='submit') requires code."
                )
            internal_action = SubmitAction(code=action.code)
        else:  # pragma: no cover - Pydantic Literal already constrains this
            raise ValueError(f"Unknown action_type: {action.action_type!r}")

        registry_key = self._internal_episode_id
        if registry_key is None:  # pragma: no cover - reset() always sets it
            # Explicit check instead of ``assert`` so it survives ``-O``.
            raise RuntimeError("No active episode: reset() did not register one.")

        # Always address the wrapped env by the id it generated, never by
        # the (possibly caller-chosen) public id.
        resp = self._env.step(registry_key, internal_action)
        self._step_count += 1
        self._done = resp.done
        return self._wrap_obs(
            resp.observation, reward=resp.reward, done=resp.done, info=resp.info
        )

    @property
    def state(self) -> "OpenSleuthState":  # type: ignore[override]
        """Return a snapshot of the adapter's bookkeeping for this episode."""
        return OpenSleuthState(
            episode_id=self._episode_id,
            step_count=self._step_count,
            target_function_name=self._target_function_name,
            max_steps=self._max_steps,
            finished=self._done,
        )

    def get_metadata(self) -> "EnvironmentMetadata":  # type: ignore[override]
        """Return the static description of this environment."""
        return EnvironmentMetadata(
            name="OpenSleuth",
            description=(
                "Algorithmic detective: probe a hidden Python function then submit "
                "code that reproduces it. Used for GRPO RL training on Qwen-2.5."
            ),
            version="0.4.1",
            author="OpenSleuth team",
            documentation_url=(
                "https://huggingface.co/spaces/anugrah55/opensleuth-env-gemini-cli"
            ),
        )

    def close(self) -> None:  # type: ignore[override]
        """Forget the current episode; the next step() will auto-reset.

        ``_max_steps`` is deliberately left untouched, matching the
        previous behaviour of this method.
        """
        self._episode_id = None
        self._internal_episode_id = None
        self._target_function_name = None
        self._step_count = 0
        self._done = False

    def _wrap_obs(
        self,
        internal_obs: Any,
        *,
        reward: Optional[float],
        done: bool,
        info: dict,
    ) -> "OpenSleuthObservation":
        """Repackage a wrapped-env observation as an OpenEnv observation.

        Args:
            internal_obs: Observation object produced by the wrapped env.
            reward: Reward to attach (``None`` on reset).
            done: Whether the episode has finished.
            info: Structured info dict; exposed both as ``info`` and under
                ``metadata["info"]``.

        Returns:
            The populated :class:`OpenSleuthObservation`.
        """
        return OpenSleuthObservation(
            done=done,
            reward=reward,
            # Report the public id so observations agree with ``state``;
            # fall back to the wrapped env's id if none is recorded.
            episode_id=self._episode_id or internal_obs.episode_id,
            target_function_name=internal_obs.target_function_name,
            target_function_signature=internal_obs.target_function_signature,
            probe_history=[r.model_dump() for r in internal_obs.probe_history],
            last_error=internal_obs.last_error,
            steps_taken=internal_obs.steps_taken,
            max_steps=internal_obs.max_steps,
            difficulty=internal_obs.difficulty,
            coverage_buckets_seen=internal_obs.coverage_buckets_seen,
            seen_outputs_count=internal_obs.seen_outputs_count,
            seen_error_types_count=internal_obs.seen_error_types_count,
            info=info,
            metadata={"info": info},
        )
# Public API: the availability flag is always exported; the adapter classes
# are only exported when the optional ``openenv`` dependency was importable.
__all__ = ["OPENENV_AVAILABLE"]
if OPENENV_AVAILABLE:
    __all__.extend(
        [
            "OpenSleuthAction",
            "OpenSleuthObservation",
            "OpenSleuthState",
            "OpenSleuthEnvironment",
        ]
    )