File size: 10,868 Bytes
31715b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""OpenEnv-conformant adapter for OpenSleuthEnv.

Wraps the existing multi-episode :class:`OpenSleuthEnv` registry as a
single-episode-per-session :class:`openenv.core.env_server.interfaces.Environment`
so the canonical OpenEnv HTTP / WebSocket protocol can be served alongside
the legacy ``/reset`` + ``/step`` endpoints the in-flight trainer uses.

This module is *additive*. It does not touch the legacy server contract;
``server.py`` mounts the OpenEnv-style sub-application at ``/openenv/*`` so the
trainer (which talks to the bare ``/reset`` and ``/step``) is unaffected.

The adapter conforms to OpenEnv 0.2.x:

* ``Environment.reset(seed, episode_id, **kwargs) -> Observation``
* ``Environment.step(action, timeout_s, **kwargs) -> Observation``
* ``Environment.state -> State``
* ``Environment.get_metadata() -> EnvironmentMetadata``

See https://github.com/meta-pytorch/OpenEnv (v0.2.3, BSD-3) for the spec.
"""

from __future__ import annotations

from typing import Any, List, Literal, Optional
from uuid import uuid4

from pydantic import Field

try:
    from openenv.core.env_server.interfaces import Environment
    from openenv.core.env_server.types import (
        Action as OEAction,
        EnvironmentMetadata,
        Observation as OEObservation,
        State as OEState,
    )

    OPENENV_AVAILABLE = True
except ImportError:  # pragma: no cover - openenv is required at runtime in the Space
    OPENENV_AVAILABLE = False
    OEAction = object  # type: ignore[assignment, misc]
    OEObservation = object  # type: ignore[assignment, misc]
    OEState = object  # type: ignore[assignment, misc]
    Environment = object  # type: ignore[assignment, misc]
    EnvironmentMetadata = object  # type: ignore[assignment, misc]

from .env import OpenSleuthEnv
from .models import ProbeAction, SubmitAction


if OPENENV_AVAILABLE:

    class OpenSleuthAction(OEAction):
        """Unified OpenEnv-style action covering both probing and submitting.

        OpenEnv expects one concrete Action subclass per environment, so the
        probe / submit choice is carried by the ``action_type`` discriminator
        rather than by separate classes. The environment adapter converts each
        instance back into the original :class:`ProbeAction` /
        :class:`SubmitAction` before stepping, which keeps the legacy reward
        shaping untouched.
        """

        # Discriminator: decides which of the two optional payloads is used.
        action_type: Literal["probe", "submit"] = Field(
            ...,
            description="Either 'probe' (with input_repr) or 'submit' (with code).",
        )

        # Payload for a probe. Optional at the schema level; the adapter's
        # step() rejects a probe action that leaves it unset.
        input_repr: Optional[str] = Field(
            None,
            description="Python literal repr of the probe input. Required when action_type='probe'.",
        )

        # Payload for a submission. Optional at the schema level; the adapter's
        # step() rejects a submit action that leaves it unset.
        code: Optional[str] = Field(
            None,
            description="Python source defining the target function. Required when action_type='submit'.",
        )

    class OpenSleuthObservation(OEObservation):
        """Observation returned to OpenEnv clients after ``reset`` / ``step``.

        The OpenEnv ``Observation`` base class contributes ``done``,
        ``reward`` and ``metadata``. The fields declared here add the
        OpenSleuth-specific view the agent needs (target signature, probe
        history, progress counters). The structured step info is additionally
        exposed through ``info`` for backwards compatibility with the trainer.
        """

        # --- Episode / target identification -------------------------------
        episode_id: str = Field("", description="Per-session episode id.")
        target_function_name: str = Field("")
        target_function_signature: str = Field(
            "",
            description="Public signature + docstring for the target.",
        )

        # --- Interaction history -------------------------------------------
        probe_history: List[dict] = Field(
            default_factory=list,
            description="Recent probe records (input_repr, output_repr, is_error, ...).",
        )
        last_error: str = Field("", description="Last error string, if any.")

        # --- Progress within the episode -----------------------------------
        steps_taken: int = Field(0)
        max_steps: int = Field(25)
        difficulty: Optional[str] = Field(None)

        # --- Exploration counters copied from the wrapped observation ------
        coverage_buckets_seen: int = Field(0)
        seen_outputs_count: int = Field(0)
        seen_error_types_count: int = Field(0)

        # --- Legacy-compatible info payload --------------------------------
        info: dict = Field(
            default_factory=dict,
            description="Structured info from the underlying step (matches the legacy info dict).",
        )

    class OpenSleuthState(OEState):
        """OpenEnv-style episode state reported by the adapter's ``state``."""

        # Name of the function being investigated; None until an episode starts.
        target_function_name: Optional[str] = Field(None)
        # Step budget configured for the current episode.
        max_steps: int = Field(25)
        # True once the last step reported the episode as done.
        finished: bool = Field(False)

    class OpenSleuthEnvironment(Environment):
        """OpenEnv-conformant adapter around :class:`OpenSleuthEnv`.

        One adapter instance == one episode (one WebSocket session). Inside,
        we keep a single :class:`OpenSleuthEnv` registry but only ever drive
        one episode at a time.

        ``SUPPORTS_CONCURRENT_SESSIONS = True`` is safe because each WebSocket
        connection in OpenEnv's :class:`HTTPEnvServer` instantiates its own
        :class:`OpenSleuthEnvironment`, and our underlying registries are
        per-instance.

        Two episode ids are tracked:

        * ``_episode_id`` is the *public* id: the caller-supplied
          ``episode_id`` if one was given to :meth:`reset`, otherwise the id
          generated by the wrapped env. It is what :attr:`state` and the
          returned observations report.
        * ``_internal_episode_id`` is the id carried by the wrapped env's own
          observation. The wrapped ``reset()`` is never told about the
          caller's id, so this is the only id that may be handed back to
          ``OpenSleuthEnv.step``.
        """

        SUPPORTS_CONCURRENT_SESSIONS = True

        def __init__(self) -> None:
            """Create an adapter with a private registry and no active episode."""
            super().__init__()
            self._env = OpenSleuthEnv()
            self._episode_id: Optional[str] = None
            self._internal_episode_id: Optional[str] = None
            self._target_function_name: Optional[str] = None
            self._max_steps: int = 25
            self._step_count: int = 0
            self._done: bool = False

        def reset(  # type: ignore[override]
            self,
            seed: Optional[int] = None,
            episode_id: Optional[str] = None,
            target_name: Optional[str] = None,
            target_code: Optional[str] = None,
            target_function_name: Optional[str] = None,
            max_steps: int = 25,
            edge_cases: Optional[list] = None,
            fuzz_spec: Optional[dict] = None,
            **kwargs: Any,
        ) -> "OpenSleuthObservation":
            """Start a fresh episode and return its initial observation.

            Args:
                seed: Seed forwarded to the wrapped env; ``None`` becomes ``0``.
                episode_id: Optional caller-chosen public id for the episode.
                    When omitted, the id generated by the wrapped env is used.
                target_name: Name of the target to load. Falls back to
                    ``"fibonacci"`` when neither this nor ``target_code`` is
                    given.
                target_code: Source of a custom target, forwarded unchanged.
                target_function_name: Forwarded unchanged to the wrapped env.
                max_steps: Step budget for the episode.
                edge_cases: Forwarded unchanged to the wrapped env.
                fuzz_spec: Forwarded unchanged to the wrapped env.
                **kwargs: Accepted for OpenEnv signature compatibility and
                    ignored.

            Returns:
                The initial observation, with ``reward=None`` and ``done=False``.
            """
            # Default to a builtin so a bare reset() still produces a valid
            # episode (per OpenEnv spec, reset() with no args must work).
            if not target_name and not target_code:
                target_name = "fibonacci"
            obs = self._env.reset(
                target_name=target_name,
                seed=seed if seed is not None else 0,
                max_steps=max_steps,
                target_code=target_code,
                target_function_name=target_function_name,
                edge_cases=edge_cases,
                fuzz_spec=fuzz_spec,
            )
            # Keep the wrapped env's id separately: a caller-supplied
            # episode_id is unknown to the wrapped registry, so stepping with
            # it would address an episode that was never created there.
            self._internal_episode_id = obs.episode_id
            self._episode_id = episode_id or obs.episode_id
            self._target_function_name = obs.target_function_name
            self._max_steps = max_steps
            self._step_count = 0
            self._done = False
            return self._wrap_obs(obs, reward=None, done=False, info={})

        def step(  # type: ignore[override]
            self,
            action: "OpenSleuthAction",
            timeout_s: Optional[float] = None,
            **kwargs: Any,
        ) -> "OpenSleuthObservation":
            """Apply one probe or submit action to the current episode.

            Args:
                action: The unified action; ``action_type`` selects the branch.
                timeout_s: Accepted for OpenEnv signature compatibility; not
                    used by this adapter.
                **kwargs: Accepted for signature compatibility and ignored.

            Returns:
                The observation, reward, done flag and info from the wrapped
                env, repackaged as an :class:`OpenSleuthObservation`.

            Raises:
                ValueError: If the payload required by ``action_type`` is
                    missing, or ``action_type`` is not recognised.
                RuntimeError: If no episode exists even after the auto-reset.
            """
            if self._internal_episode_id is None:
                # Auto-reset on first step with the default target so HTTP /step
                # smoke tests don't 500 just because /reset wasn't called first.
                self.reset()

            internal_action = self._to_internal_action(action)

            internal_id = self._internal_episode_id
            if internal_id is None:
                # Explicit raise rather than ``assert``: asserts vanish under -O.
                raise RuntimeError("No active episode: reset() did not yield an episode id.")

            resp = self._env.step(internal_id, internal_action)
            self._step_count += 1
            self._done = resp.done
            return self._wrap_obs(
                resp.observation, reward=resp.reward, done=resp.done, info=resp.info
            )

        @staticmethod
        def _to_internal_action(action: "OpenSleuthAction") -> Any:
            """Translate a unified action into the legacy Probe/Submit action."""
            if action.action_type == "probe":
                if action.input_repr is None:
                    raise ValueError(
                        "OpenSleuthAction(action_type='probe') requires input_repr."
                    )
                return ProbeAction(input_repr=action.input_repr)
            if action.action_type == "submit":
                if action.code is None:
                    raise ValueError(
                        "OpenSleuthAction(action_type='submit') requires code."
                    )
                return SubmitAction(code=action.code)
            # pragma: no cover - Pydantic Literal already constrains this
            raise ValueError(f"Unknown action_type: {action.action_type!r}")

        @property
        def state(self) -> "OpenSleuthState":  # type: ignore[override]
            """Snapshot of the adapter's bookkeeping for the current episode."""
            return OpenSleuthState(
                episode_id=self._episode_id,
                step_count=self._step_count,
                target_function_name=self._target_function_name,
                max_steps=self._max_steps,
                finished=self._done,
            )

        def get_metadata(self) -> "EnvironmentMetadata":  # type: ignore[override]
            """Return the static descriptive metadata for this environment."""
            return EnvironmentMetadata(
                name="OpenSleuth",
                description=(
                    "Algorithmic detective: probe a hidden Python function then submit "
                    "code that reproduces it. Used for GRPO RL training on Qwen-2.5."
                ),
                version="0.4.1",
                author="OpenSleuth team",
                documentation_url=(
                    "https://huggingface.co/spaces/anugrah55/opensleuth-env-gemini-cli"
                ),
            )

        def close(self) -> None:  # type: ignore[override]
            """Forget the current episode so the next ``step`` auto-resets.

            Only the adapter's own bookkeeping is cleared; the wrapped
            :class:`OpenSleuthEnv` instance is kept as-is.
            """
            self._episode_id = None
            self._internal_episode_id = None
            self._target_function_name = None
            self._step_count = 0
            self._done = False

        def _wrap_obs(
            self,
            internal_obs: Any,
            *,
            reward: Optional[float],
            done: bool,
            info: dict,
        ) -> "OpenSleuthObservation":
            """Repackage a wrapped-env observation as an OpenEnv observation.

            Args:
                internal_obs: Observation object produced by the wrapped env.
                reward: Reward for the step, or ``None`` for the reset
                    observation.
                done: Whether the episode has finished.
                info: Structured info dict; the same object is exposed both as
                    the ``info`` field and under ``metadata["info"]``.

            Returns:
                The populated :class:`OpenSleuthObservation`.
            """
            return OpenSleuthObservation(
                done=done,
                reward=reward,
                # Report the public id so observations agree with ``state``;
                # fall back to the wrapped id when no public id is set.
                episode_id=self._episode_id or internal_obs.episode_id,
                target_function_name=internal_obs.target_function_name,
                target_function_signature=internal_obs.target_function_signature,
                probe_history=[r.model_dump() for r in internal_obs.probe_history],
                last_error=internal_obs.last_error,
                steps_taken=internal_obs.steps_taken,
                max_steps=internal_obs.max_steps,
                difficulty=internal_obs.difficulty,
                coverage_buckets_seen=internal_obs.coverage_buckets_seen,
                seen_outputs_count=internal_obs.seen_outputs_count,
                seen_error_types_count=internal_obs.seen_error_types_count,
                info=info,
                metadata={"info": info},
            )


# Public API. The availability flag is always exported; the adapter classes
# are only defined (and therefore only exported) when openenv was importable.
__all__ = ["OPENENV_AVAILABLE"]
if OPENENV_AVAILABLE:
    __all__.extend(
        [
            "OpenSleuthAction",
            "OpenSleuthObservation",
            "OpenSleuthState",
            "OpenSleuthEnvironment",
        ]
    )