File size: 12,488 Bytes
31715b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
"""Unit tests for the OpenSleuth env + verifier.

Run with `pytest -q` from the env/ directory.
"""

from __future__ import annotations

import pytest

from opensleuth_env import (
    BLACK_BOX_FUNCTIONS,
    OpenSleuthEnv,
    ProbeAction,
    SubmitAction,
)
from opensleuth_env.env import _bucket_of, NEW_BUCKET_BONUS, NEW_OUTPUT_BONUS, PROBE_STEP_COST
from opensleuth_env.verifier import (
    calculate_complexity_penalty,
    generate_fuzz_inputs,
    get_edge_inputs,
    verify_submission,
    _looks_like_reference_import,
)


# ---------- env transitions ------------------------------------------------


def test_reset_returns_episode_id_and_signature():
    """A fresh reset exposes the episode id, target metadata and zeroed counters."""
    target = "fibonacci"
    first_obs = OpenSleuthEnv().reset(target)

    # Identity of the episode and of the function under investigation.
    assert first_obs.episode_id
    assert first_obs.target_function_name == target
    assert target in first_obs.target_function_signature

    # Nothing has been probed yet.
    assert first_obs.probe_history == []
    assert first_obs.steps_taken == 0

    # Metadata introduced in v0.3.
    assert first_obs.difficulty == "easy"
    assert first_obs.coverage_buckets_seen == 0


def test_unknown_target_raises():
    """Resetting with an unrecognised target name must raise ValueError."""
    sleuth = OpenSleuthEnv()
    bogus_target = "not_a_real_function"
    with pytest.raises(ValueError):
        sleuth.reset(bogus_target)


def test_probe_with_int_input_records_output():
    """Probing fibonacci with 10 records output 55 and pays both first-time bonuses."""
    sleuth = OpenSleuthEnv()
    episode = sleuth.reset("fibonacci").episode_id
    outcome = sleuth.step(episode, ProbeAction(input_repr="10"))

    # A probe never ends the episode on its own here.
    assert outcome.done is False

    latest = outcome.observation.probe_history[-1]
    assert latest.is_error is False
    assert latest.output_repr == "55"

    # The first successful probe earns the new-output and new-bucket bonuses
    # on top of the per-step cost.
    first_probe_reward = NEW_OUTPUT_BONUS + NEW_BUCKET_BONUS + PROBE_STEP_COST
    assert outcome.reward == pytest.approx(first_probe_reward)

    details = outcome.info
    assert details["coverage_bonus"] == pytest.approx(NEW_BUCKET_BONUS)
    assert details["bucket"] == "int:medium"

    # Exactly one bucket and one distinct output have been observed so far.
    assert outcome.observation.coverage_buckets_seen == 1
    assert outcome.observation.seen_outputs_count == 1


def test_probe_with_invalid_literal_returns_parse_error():
    """An unparseable probe input is recorded as a ParseError without ending the episode."""
    sleuth = OpenSleuthEnv()
    episode = sleuth.reset("fibonacci").episode_id
    garbage_probe = ProbeAction(input_repr="not a literal")

    outcome = sleuth.step(episode, garbage_probe)

    assert outcome.done is False
    last_record = outcome.observation.probe_history[-1]
    assert last_record.error_type == "ParseError"


def test_repeated_output_only_pays_intrinsic_once():
    """Repeating an identical probe earns no bonus the second time around."""
    sleuth = OpenSleuthEnv()
    episode = sleuth.reset("fibonacci").episode_id

    # Issue the very same probe twice, in order.
    first, second = [
        sleuth.step(episode, ProbeAction(input_repr="10")) for _ in range(2)
    ]

    assert first.reward > second.reward
    # Same bucket and same output as before: only the per-step cost remains.
    assert second.reward == pytest.approx(PROBE_STEP_COST)


def test_step_limit_terminates_episode():
    """With max_steps=2, the second probe marks the episode as done."""
    sleuth = OpenSleuthEnv()
    episode = sleuth.reset("fibonacci", max_steps=2).episode_id

    sleuth.step(episode, ProbeAction(input_repr="1"))
    final = sleuth.step(episode, ProbeAction(input_repr="2"))

    assert final.done is True


def test_unknown_episode_id_raises():
    """Stepping with an episode id that was never issued raises KeyError."""
    sleuth = OpenSleuthEnv()
    probe = ProbeAction(input_repr="1")
    with pytest.raises(KeyError):
        sleuth.step("does-not-exist", probe)


# ---------- coverage bucketing (CovRL-Fuzz inspired) -----------------------


def test_bucket_of_distinguishes_qualitative_input_classes():
    """Each qualitative input class maps onto its own coverage bucket label."""
    # (probe value, expected bucket) pairs. A list of tuples rather than a
    # dict: some values are unhashable and True would collide with 1 as a key.
    expectations = [
        (0, "int:zero"),
        (-1, "int:negative"),
        (5, "int:small"),
        (50, "int:medium"),
        (5000, "int:large"),
        (50_000, "int:huge"),
        ("", "str:empty"),
        ("a", "str:singleton"),
        ([], "list:empty"),
        ((1, 2), "tuple:short"),
        (True, "bool:True"),  # bool isolated from int
        (None, "none"),
    ]
    for value, bucket in expectations:
        assert _bucket_of(value) == bucket


def test_probe_distinct_buckets_each_pay_coverage_bonus():
    """Only the first probe landing in a bucket collects the coverage bonus."""
    sleuth = OpenSleuthEnv()
    episode = sleuth.reset("fibonacci").episode_id

    # 1 -> small (new), 50 -> medium (new), 5 -> small again (already seen).
    probe_texts = ("1", "50", "5")
    expected_bonuses = (NEW_BUCKET_BONUS, NEW_BUCKET_BONUS, 0.0)

    outcomes = [
        sleuth.step(episode, ProbeAction(input_repr=text)) for text in probe_texts
    ]

    for outcome, bonus in zip(outcomes, expected_bonuses):
        assert outcome.info["coverage_bonus"] == pytest.approx(bonus)
    # Three probes, but only two distinct buckets were touched.
    assert outcomes[-1].observation.coverage_buckets_seen == 2


# ---------- verifier -------------------------------------------------------


def test_verifier_perfect_score_on_reference_impl():
    """A faithful fibonacci re-implementation matches every fuzz and edge input."""
    fuzz_count = 30
    fib_spec = BLACK_BOX_FUNCTIONS["fibonacci"]
    submission = (
        "def fibonacci(n):\n"
        "    if not isinstance(n, int) or n <= 0 or n > 90:\n"
        "        raise ValueError('bad')\n"
        "    a, b = 0, 1\n"
        "    for _ in range(n - 1):\n"
        "        a, b = b, a + b\n"
        "    return b\n"
    )
    fuzz_inputs = generate_fuzz_inputs(fib_spec, count=fuzz_count, seed=0)
    edge_inputs = get_edge_inputs(fib_spec)

    verdict = verify_submission(
        submission,
        fib_spec.fn,
        fuzz_inputs,
        target_name="fibonacci",
        edge_inputs=edge_inputs,
    )

    # Every random input and every edge input must agree with the reference.
    assert verdict.matches == fuzz_count + len(edge_inputs)
    assert verdict.execution_reward == pytest.approx(100.0)
    assert verdict.edge_pass_rate == pytest.approx(1.0)
    # No penalties of any kind on a clean, correct submission.
    assert verdict.floor_penalty == 0.0
    assert verdict.reward_hack_penalty == 0.0


def test_verifier_partial_score_on_buggy_impl():
    """An off-by-one fibonacci matches nothing and incurs the floor penalty."""
    fib_spec = BLACK_BOX_FUNCTIONS["fibonacci"]
    # Identical to a correct implementation except for the `+ 1` on the result.
    off_by_one = (
        "def fibonacci(n):\n"
        "    if not isinstance(n, int) or n <= 0 or n > 90:\n"
        "        raise ValueError('bad')\n"
        "    a, b = 0, 1\n"
        "    for _ in range(n - 1):\n"
        "        a, b = b, a + b\n"
        "    return b + 1\n"
    )
    fuzz_inputs = generate_fuzz_inputs(fib_spec, count=30, seed=0)

    verdict = verify_submission(
        off_by_one, fib_spec.fn, fuzz_inputs, target_name="fibonacci"
    )

    assert verdict.execution_reward == pytest.approx(0.0)
    assert verdict.matches == 0
    # A match rate under 50% trips the hard floor.
    assert verdict.floor_penalty == 25.0


def test_verifier_syntax_error_returns_define_error_and_full_penalty():
    """Unparseable code reports a define error, zero reward and both penalties."""
    fib_spec = BLACK_BOX_FUNCTIONS["fibonacci"]
    fuzz_inputs = generate_fuzz_inputs(fib_spec, count=10, seed=0)
    broken_source = "def fib(:\n  pass"

    verdict = verify_submission(
        broken_source, fib_spec.fn, fuzz_inputs, target_name="fibonacci"
    )

    assert verdict.define_error is not None
    assert verdict.execution_reward == 0.0
    # Expected penalty values for a submission that cannot be defined.
    assert verdict.complexity_penalty == 50.0
    assert verdict.floor_penalty == 25.0


def test_verifier_missing_target_returns_error():
    """Code that never defines the target function yields a define error."""
    fib_spec = BLACK_BOX_FUNCTIONS["fibonacci"]
    fuzz_inputs = generate_fuzz_inputs(fib_spec, count=10, seed=0)
    # Valid Python, but the function is named `other`, not `fibonacci`.
    wrong_name_source = "def other(x): return x"

    verdict = verify_submission(
        wrong_name_source, fib_spec.fn, fuzz_inputs, target_name="fibonacci"
    )

    assert verdict.define_error is not None
    assert verdict.execution_reward == 0.0


def test_complexity_penalty_low_for_simple_fn():
    """A one-line identity function is penalised by less than 1.0."""
    penalty = calculate_complexity_penalty("def f(x): return x\n")
    assert penalty < 1.0


def test_complexity_penalty_high_for_branchy_fn():
    """A function made of 100 consecutive `if` branches is penalised above 5.0."""
    # One `if x == i: return i` statement per value, indented as a function body.
    branches = [f"if x == {i}: return {i}" for i in range(100)]
    body = "\n    ".join(branches)
    code = f"def f(x):\n    {body}\n    return -1\n"

    penalty = calculate_complexity_penalty(code)

    assert penalty > 5.0


# ---------- anti-reward-hacking --------------------------------------------


def test_sandbox_blocks_import_of_reference_module():
    """Regression guard against importing the reference implementation.

    An agent used to be able to submit::

        from opensleuth_env.black_box import _fibonacci
        def fibonacci(n): return _fibonacci(n)

    and collect a perfect score without solving anything. The hardened
    sandbox must deny this shortcut and the static detector must flag it.
    """
    fib_spec = BLACK_BOX_FUNCTIONS["fibonacci"]
    import_hack = (
        "def fibonacci(n):\n"
        "    from opensleuth_env.black_box import _fibonacci\n"
        "    return _fibonacci(n)\n"
    )
    fuzz_inputs = generate_fuzz_inputs(fib_spec, count=10, seed=0)

    verdict = verify_submission(
        import_hack, fib_spec.fn, fuzz_inputs, target_name="fibonacci"
    )

    # Whether the failure happens at definition time (no __import__) or on
    # each call, the submission must not end up with a passing score.
    assert verdict.execution_reward < 50.0
    # The static detector noticed the import attempt.
    assert verdict.reward_hack_penalty >= 25.0


def test_static_detector_flags_opensleuth_import():
    """The static detector flags an `opensleuth_env` import and nothing else here."""
    clean_source = "def f(x): return x\n"
    tainted_source = "import opensleuth_env\ndef f(x): return x\n"

    assert _looks_like_reference_import(tainted_source) is True
    assert _looks_like_reference_import(clean_source) is False


def test_constant_function_collapse_is_penalised():
    """A submission that returns one constant for every input is penalised.

    This must hold even when a few inputs would coincidentally match
    (for `digit_sum`, `lambda x: 0` is only right for x=0).
    """
    ds_spec = BLACK_BOX_FUNCTIONS["digit_sum"]
    always_zero = "def digit_sum(n):\n    return 0\n"
    distinct_inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 999]

    verdict = verify_submission(
        always_zero, ds_spec.fn, distinct_inputs, target_name="digit_sum"
    )

    # The submission yields a single output signature (0) across all of the
    # distinct inputs, whereas the reference yields many different ones.
    assert verdict.reward_hack_penalty >= 15.0


def test_sandbox_blocks_open_and_eval():
    """A submission that calls `open` must not reach a passing execution reward."""
    # NOTE(review): despite the test name, only `open` is exercised below;
    # `eval` is not covered by this test.
    fib_spec = BLACK_BOX_FUNCTIONS["fibonacci"]
    file_writer = (
        "def fibonacci(n):\n"
        "    open('/tmp/x', 'w')\n"
        "    return 0\n"
    )
    fuzz_inputs = generate_fuzz_inputs(fib_spec, count=5, seed=0)

    verdict = verify_submission(
        file_writer, fib_spec.fn, fuzz_inputs, target_name="fibonacci"
    )

    # Two acceptable outcomes: a NameError on `open` at every call (so all
    # inputs mismatch), or a failure at definition time. Both mean low reward.
    assert verdict.execution_reward < 50.0


# ---------- stratified scoring (edge vs random) ----------------------------


def test_edge_cases_are_always_evaluated():
    """Edge inputs are scored separately, so a single edge failure is visible."""
    rev_spec = BLACK_BOX_FUNCTIONS["reverse_string"]
    # Correct for every non-empty string, deliberately wrong for the empty one.
    almost_right = (
        "def reverse_string(s):\n"
        "    if s == '':\n"
        "        return 'OOPS'\n"
        "    return s[::-1]\n"
    )
    fuzz_inputs = generate_fuzz_inputs(rev_spec, count=20, seed=0)
    edge_inputs = get_edge_inputs(rev_spec)
    # Precondition: the empty string is among the declared edge cases.
    assert "" in edge_inputs

    verdict = verify_submission(
        almost_right,
        rev_spec.fn,
        fuzz_inputs,
        target_name="reverse_string",
        edge_inputs=edge_inputs,
    )

    per_category = verdict.matches_by_category
    # Every edge case passes except the empty string.
    assert per_category["edge"] == len(edge_inputs) - 1
    assert verdict.edge_pass_rate < 1.0
    # Random inputs almost never include the empty string.
    assert per_category["random"] >= 18


# ---------- end-to-end submission via env ----------------------------------


def test_env_submit_reference_implementation_gives_high_reward():
    """Submitting a correct reverse_string ends the episode with a top reward."""
    sleuth = OpenSleuthEnv(fuzz_count=20)
    episode = sleuth.reset("reverse_string").episode_id
    submission = SubmitAction(code="def reverse_string(s):\n    return s[::-1]\n")

    outcome = sleuth.step(episode, submission)

    assert outcome.done is True
    # 100 execution reward, minus a tiny complexity penalty, plus the 50
    # perfect-score bonus.
    assert outcome.reward > 140.0

    breakdown = outcome.info
    assert breakdown["execution_reward"] == pytest.approx(100.0)
    assert breakdown["edge_pass_rate"] == pytest.approx(1.0)
    assert breakdown["floor_penalty"] == 0.0
    assert breakdown["reward_hack_penalty"] == 0.0
    assert breakdown["perfect_bonus"] == 50.0


def test_env_submit_buggy_function_lands_clearly_negative():
    """A wrong submission must receive a clearly negative total reward.

    This keeps the trainer's GRPO advantage from rewarding the strategy of
    'just emit any function'.
    """
    sleuth = OpenSleuthEnv(fuzz_count=10)
    episode = sleuth.reset("digit_sum").episode_id
    always_minus_one = SubmitAction(code="def digit_sum(n):\n    return -1\n")

    outcome = sleuth.step(episode, always_minus_one)

    assert outcome.done is True
    assert outcome.info["execution_reward"] < 50.0
    assert outcome.reward < 0.0
    assert outcome.info["floor_penalty"] == 25.0


def test_env_submit_import_hack_scores_clearly_negative():
    """Submitting the reference-import hack through the env yields a negative reward."""
    sleuth = OpenSleuthEnv(fuzz_count=10)
    episode = sleuth.reset("fibonacci").episode_id
    import_hack = (
        "def fibonacci(n):\n"
        "    from opensleuth_env.black_box import _fibonacci\n"
        "    return _fibonacci(n)\n"
    )

    outcome = sleuth.step(episode, SubmitAction(code=import_hack))

    assert outcome.done is True
    assert outcome.reward < 0.0
    # The reward-hack penalty is reported in the info dict.
    assert outcome.info["reward_hack_penalty"] >= 25.0


# ---------- spec metadata --------------------------------------------------


def test_all_specs_have_difficulty_and_edge_cases():
    """Every registered spec declares a valid difficulty and at least 3 edge cases.

    Checks, for each entry of ``BLACK_BOX_FUNCTIONS``:

    * ``difficulty`` is one of ``"easy"``, ``"medium"`` or ``"hard"``;
    * ``edge_cases`` is a ``list``;
    * ``edge_cases`` holds three or more entries.

    Each assertion carries a message naming the offending spec so a failure
    points straight at the registry entry that needs fixing.
    """
    # Guard against a vacuous pass: an empty registry would skip the loop
    # entirely and make this test succeed without checking anything.
    assert BLACK_BOX_FUNCTIONS, "BLACK_BOX_FUNCTIONS registry is empty"

    valid = {"easy", "medium", "hard"}
    for name, spec in BLACK_BOX_FUNCTIONS.items():
        assert spec.difficulty in valid, f"{name} has invalid difficulty {spec.difficulty!r}"
        assert isinstance(spec.edge_cases, list), (
            f"{name} edge_cases should be a list, got {type(spec.edge_cases).__name__}"
        )
        assert len(spec.edge_cases) >= 3, f"{name} should declare >=3 edge cases for robust scoring"