| """Tests for OpenSleuth Level 2: auto-fuzzer + TaskCatalog + open /reset. |
| |
| These tests do *not* require Hub network access. The Hub-availability test |
is opportunistic: it asserts ``>=15`` total tasks if the dataset loads, but
skips (with an explanatory reason) if the Hub is offline / the env is sandboxed.
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import random |
| import typing |
| from typing import Optional, Literal |
|
|
| import pytest |
| from fastapi.testclient import TestClient |
|
|
| from opensleuth_env import ( |
| BLACK_BOX_FUNCTIONS, |
| OpenSleuthEnv, |
| ProbeAction, |
| SubmitAction, |
| TaskCatalog, |
| TaskResolutionError, |
| auto_fuzz, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
class TestAutoFuzzerTypes:
    """Annotation-driven fuzzing: every generated input matches its hint."""

    def _rng(self, seed: int = 0) -> random.Random:
        # Fresh deterministic RNG per test so each test is reproducible.
        return random.Random(seed)

    def test_int_inputs_are_ints(self):
        def fn(n: int) -> int:
            return n

        outs = auto_fuzz(fn, 50, self._rng())
        assert len(outs) == 50
        for item in outs:
            assert isinstance(item, tuple) and len(item) == 1
            # bool subclasses int, so an int slot must never receive a bool.
            assert isinstance(item[0], int) and not isinstance(item[0], bool)

    def test_str_inputs_are_strs(self):
        def fn(s: str) -> int:
            return len(s)

        for (value,) in auto_fuzz(fn, 30, self._rng()):
            assert isinstance(value, str)

    def test_list_int_inputs_are_lists_of_ints(self):
        def fn(xs: list[int]) -> int:
            return sum(xs)

        for (xs,) in auto_fuzz(fn, 30, self._rng()):
            assert isinstance(xs, list)
            assert all(isinstance(x, int) for x in xs)

    def test_homogeneous_tuple_inputs(self):
        def fn(xs: tuple[int, ...]) -> int:
            return sum(xs)

        for (xs,) in auto_fuzz(fn, 30, self._rng()):
            assert isinstance(xs, tuple)
            assert all(isinstance(x, int) for x in xs)

    def test_heterogeneous_tuple_inputs(self):
        def fn(t: tuple[int, str]) -> int:
            return len(t[1])

        for (pair,) in auto_fuzz(fn, 30, self._rng()):
            assert isinstance(pair, tuple) and len(pair) == 2
            assert isinstance(pair[0], int)
            assert isinstance(pair[1], str)

    def test_optional_inputs_sometimes_None(self):
        def fn(x: Optional[int]) -> int:
            return 0

        # 200 draws gives both branches of Optional a chance to appear.
        outs = auto_fuzz(fn, 200, self._rng(seed=42))
        seen_none = any(v is None for (v,) in outs)
        seen_int = any(isinstance(v, int) and not isinstance(v, bool) for (v,) in outs)
        assert seen_none, "Optional[int] should occasionally yield None"
        assert seen_int, "Optional[int] should also yield ints"

    def test_literal_inputs_only_pick_listed_values(self):
        def fn(mode: Literal["a", "b", "c"]) -> int:
            return 0

        for (mode,) in auto_fuzz(fn, 50, self._rng()):
            assert mode in ("a", "b", "c")

    def test_dict_str_int_inputs(self):
        def fn(d: dict[str, int]) -> int:
            return len(d)

        for (mapping,) in auto_fuzz(fn, 20, self._rng()):
            assert isinstance(mapping, dict)
            for key, value in mapping.items():
                assert isinstance(key, str)
                assert isinstance(value, int)

    def test_multi_arg_returns_full_tuples(self):
        def fn(a: int, b: str) -> int:
            return 0

        # Multi-parameter targets yield one tuple per call, in order.
        for args in auto_fuzz(fn, 20, self._rng()):
            assert isinstance(args, tuple)
            assert len(args) == 2
            assert isinstance(args[0], int)
            assert isinstance(args[1], str)

    def test_unannotated_param_falls_back_to_int(self):
        def fn(x):
            return x

        for (value,) in auto_fuzz(fn, 30, self._rng()):
            assert isinstance(value, int)
|
|
|
|
class TestAutoFuzzerSpecOverride:
    """`fuzz_spec` entries override the annotation-derived generators."""

    def test_int_min_max_overrides_default_range(self):
        def f(n: int) -> int:
            return n

        spec = {"n": {"type": "int", "min": 1, "max": 5}}
        for (n,) in auto_fuzz(f, 100, random.Random(0), fuzz_spec=spec):
            assert 1 <= n <= 5, f"expected n in [1, 5], got {n}"

    def test_str_alphabet_override(self):
        def f(s: str) -> int:
            return len(s)

        spec = {"s": {"type": "str", "alphabet": "ab", "max_len": 4}}
        for (s,) in auto_fuzz(f, 100, random.Random(0), fuzz_spec=spec):
            assert len(s) <= 4
            for ch in s:
                assert ch in "ab", f"unexpected char {ch!r} in {s!r}"

    def test_list_elem_override(self):
        def f(xs: list[int]) -> int:
            return sum(xs)

        # Both the container bound and the element range are constrained.
        spec = {"xs": {"type": "list", "elem": {"type": "int", "min": 0, "max": 3}, "max_len": 4}}
        for (xs,) in auto_fuzz(f, 80, random.Random(0), fuzz_spec=spec):
            assert len(xs) <= 4
            for v in xs:
                assert 0 <= v <= 3

    def test_tuple_elems_override(self):
        # `t` is unannotated: the spec alone drives generation here.
        def f(t):
            return t

        spec = {
            "t": {
                "type": "tuple",
                "elems": [
                    {"type": "int", "min": 0, "max": 1},
                    {"type": "str", "alphabet": "x", "max_len": 2},
                ],
            }
        }
        for (t,) in auto_fuzz(f, 30, random.Random(0), fuzz_spec=spec):
            assert isinstance(t, tuple) and len(t) == 2
            assert 0 <= t[0] <= 1
            for ch in t[1]:
                assert ch == "x"
|
|
|
|
| |
| |
| |
|
|
|
|
class TestTaskCatalog:
    """Resolution of builtin and caller-supplied targets (Hub disabled)."""

    def test_resolves_builtin_by_name(self):
        spec = TaskCatalog(enable_hub=False).resolve(target_name="fibonacci")
        assert spec.name == "fibonacci"
        # Builtins resolve to the shared registry object itself, not a copy.
        assert spec is BLACK_BOX_FUNCTIONS["fibonacci"]
        assert spec.unpack_args is False
        assert spec.source == "builtin"

    def test_resolves_caller_supplied_target_code(self):
        code = "def add(a: int, b: int) -> int:\n return a + b\n"
        spec = TaskCatalog(enable_hub=False).resolve(
            target_code=code, target_function_name="add"
        )
        assert spec.name == "add"
        # Two parameters, so the env must splat the fuzzed tuple on call.
        assert spec.unpack_args is True
        assert spec.source == "user"

        rng = random.Random(0)
        for args in spec.fuzzer(rng, 10):
            assert isinstance(args, tuple) and len(args) == 2
            assert spec.fn(*args) == args[0] + args[1]

    def test_caller_supplied_unary_uses_unwrapped_call(self):
        spec = TaskCatalog(enable_hub=False).resolve(
            target_code="def square(n: int) -> int:\n return n * n\n",
            target_function_name="square",
        )
        # Single-parameter targets receive bare values, not 1-tuples.
        assert spec.unpack_args is False
        for x in spec.fuzzer(random.Random(0), 5):
            assert isinstance(x, int)
            assert spec.fn(x) == x * x

    def test_resolve_with_no_source_raises(self):
        catalog = TaskCatalog(enable_hub=False)
        with pytest.raises(TaskResolutionError):
            catalog.resolve()

    def test_resolve_unknown_name_raises(self):
        catalog = TaskCatalog(enable_hub=False)
        with pytest.raises(TaskResolutionError):
            catalog.resolve(target_name="this_does_not_exist")

    def test_target_code_without_function_name_raises(self):
        catalog = TaskCatalog(enable_hub=False)
        with pytest.raises(TaskResolutionError):
            catalog.resolve(target_code="def foo(): return 1\n")

    def test_rejects_oracle_import(self):
        catalog = TaskCatalog(enable_hub=False)
        # Importing the env package itself must be rejected...
        bad = (
            "import opensleuth_env\n"
            "def f(x): return x\n"
        )
        with pytest.raises(TaskResolutionError):
            catalog.resolve(target_code=bad, target_function_name="f")

        # ...and so must a from-import of an internal oracle helper.
        bad2 = (
            "from opensleuth_env.black_box import _fibonacci\n"
            "def f(x): return _fibonacci(x)\n"
        )
        with pytest.raises(TaskResolutionError):
            catalog.resolve(target_code=bad2, target_function_name="f")

    def test_target_code_using_open_is_blocked_at_call_time(self):
        """`open` is absent from the safe-builtins whitelist: resolution
        succeeds (the name is only looked up at call time via NameError),
        but invoking the function must fail safely."""
        spec = TaskCatalog(enable_hub=False).resolve(
            target_code=(
                "def f(x):\n"
                " open('/tmp/x', 'w')\n"
                " return 0\n"
            ),
            target_function_name="f",
        )
        with pytest.raises(NameError):
            spec.fn(0)

    def test_caller_supplied_edge_cases_are_parsed(self):
        spec = TaskCatalog(enable_hub=False).resolve(
            target_code="def neg(n: int) -> int:\n return -n\n",
            target_function_name="neg",
            edge_cases=["0", "1", "-1", "100"],
        )
        # Edge cases arrive as repr strings and come back as parsed values.
        assert spec.edge_cases == [0, 1, -1, 100]

    def test_caller_supplied_fuzz_spec_is_used(self):
        spec = TaskCatalog(enable_hub=False).resolve(
            target_code="def f(n: int) -> int:\n return n\n",
            target_function_name="f",
            fuzz_spec={"n": {"type": "int", "min": 7, "max": 9}},
        )
        for x in spec.fuzzer(random.Random(0), 50):
            assert 7 <= x <= 9

    def test_list_builtin_returns_nine_entries(self):
        entries = TaskCatalog(enable_hub=False).list_builtin()
        assert len(entries) == 9
        for entry in entries:
            assert entry["source"] == "builtin"
            for key in ("name", "signature", "difficulty"):
                assert key in entry
|
|
|
|
| |
| |
| |
|
|
|
|
class TestEnvOpenEnded:
    """End-to-end episodes through OpenSleuthEnv, legacy and open-ended."""

    def test_legacy_reset_by_target_name_unchanged(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(target_name="fibonacci")
        assert obs.target_function_name == "fibonacci"
        assert obs.difficulty == "easy"
        assert obs.steps_taken == 0

        # fib(10) == 55: probing still works through the legacy path.
        probe = env.step(obs.episode_id, ProbeAction(input_repr="10"))
        assert probe.observation.probe_history[-1].output_repr == "55"

    def test_env_caller_supplied_unary_full_loop(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def square(n: int) -> int:\n return n * n\n",
            target_function_name="square",
        )
        assert obs.target_function_name == "square"

        probe = env.step(obs.episode_id, ProbeAction(input_repr="5"))
        assert probe.observation.probe_history[-1].output_repr == "25"

        # A correct reimplementation ends the episode with full marks.
        final = env.step(
            obs.episode_id,
            SubmitAction(code="def square(n):\n return n * n\n"),
        )
        assert final.done is True
        assert final.info["execution_reward"] == pytest.approx(100.0)
        assert final.reward > 140.0

    def test_env_caller_supplied_multi_arg_full_loop(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def add(a: int, b: int) -> int:\n return a + b\n",
            target_function_name="add",
            edge_cases=["(0, 0)", "(1, -1)", "(100, 0)"],
        )
        assert obs.target_function_name == "add"

        # Multi-arg probes are supplied as a tuple repr.
        probe = env.step(obs.episode_id, ProbeAction(input_repr="(2, 3)"))
        assert probe.observation.probe_history[-1].output_repr == "5"

        final = env.step(
            obs.episode_id,
            SubmitAction(code="def add(a, b):\n return a + b\n"),
        )
        assert final.done is True
        assert final.info["execution_reward"] == pytest.approx(100.0)
        assert final.reward > 140.0

    def test_env_caller_supplied_buggy_submission_scored_negative(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def add(a: int, b: int) -> int:\n return a + b\n",
            target_function_name="add",
        )
        # Subtraction instead of addition: most fuzz cases must mismatch.
        final = env.step(
            obs.episode_id,
            SubmitAction(code="def add(a, b):\n return a - b\n"),
        )
        assert final.done is True
        assert final.info["execution_reward"] < 50.0
        assert final.reward < 0.0

    def test_env_caller_supplied_oracle_import_rejected(self):
        env = OpenSleuthEnv()
        with pytest.raises(ValueError):
            env.reset(
                target_code="import opensleuth_env\ndef f(x): return x\n",
                target_function_name="f",
            )
|
|
|
|
| |
| |
| |
|
|
|
|
@pytest.fixture(scope="module")
def http_client():
    """Module-scoped FastAPI ``TestClient`` wrapping the server app.

    Yields a live client; the ``with`` block runs the app's startup and
    shutdown events around the module's HTTP tests.
    """
    # NOTE(review): local import presumably defers server/app construction
    # until an HTTP test actually runs — confirm against server module.
    from server import app

    with TestClient(app) as client:
        yield client
|
|
|
|
class TestHttpLayer:
    """HTTP contract tests for /tasks, /reset, /step and /functions."""

    def test_tasks_endpoint_lists_at_least_nine_builtin(self, http_client):
        resp = http_client.get("/tasks?source=builtin")
        assert resp.status_code == 200
        payload = resp.json()
        assert payload["count"] >= 9
        listed = {t["name"] for t in payload["tasks"]}
        # Every registered builtin must appear in the listing.
        for name in BLACK_BOX_FUNCTIONS:
            assert name in listed

    def test_tasks_all_includes_at_least_builtins(self, http_client):
        resp = http_client.get("/tasks?source=all")
        assert resp.status_code == 200
        body = resp.json()

        # Builtins are always present regardless of Hub reachability.
        assert body["count"] >= 9
        hub = body["hub"]
        if not hub.get("enabled", False) or hub.get("error"):
            pytest.skip(f"hub not reachable: {body['hub']}")

        # Opportunistic: only asserted when the Hub dataset actually loaded.
        assert body["count"] >= 15

    def test_sample_inputs_returns_n_repr_strings_for_builtin(self, http_client):
        url = "/tasks/fibonacci/sample_inputs?n=5&seed=7"
        resp = http_client.get(url)
        assert resp.status_code == 200, resp.text
        body = resp.json()
        assert body["name"] == "fibonacci"
        assert body["n"] == 5
        assert body["seed"] == 7
        inputs = body["inputs"]
        assert isinstance(inputs, list)
        assert len(inputs) == 5

        # Each sample must be a repr string parseable as a Python literal.
        import ast
        for item in inputs:
            assert isinstance(item, str)
            ast.literal_eval(item)

        # Same seed, same samples: the endpoint must be deterministic.
        assert http_client.get(url).json()["inputs"] == inputs

    def test_sample_inputs_unknown_target_404s(self, http_client):
        resp = http_client.get("/tasks/__nope__/sample_inputs?n=2&seed=0")
        assert resp.status_code == 404

    def test_obviously_wrong_submission_scores_low_under_thread_pool(self, http_client):
        """Regression: TestClient uses a worker thread, exercising the
        same `signal.signal` -> ValueError path that uvicorn workers hit
        in production. Before the verifier fix, this returned 100/100 for
        any defined function (incl. ``def fibonacci(n): return n``).
        After the fix, an obviously-wrong submission should score near
        zero and trigger the floor penalty.
        """
        episode = http_client.post("/reset", json={
            "target_name": "fibonacci", "seed": 42, "max_steps": 2,
        }).json()
        resp = http_client.post("/step", json={
            "episode_id": episode["episode_id"],
            "action": {"action_type": "submit", "code": "def fibonacci(n):\n return n\n"},
        })
        assert resp.status_code == 200, resp.text
        info = resp.json()["info"]

        assert info["execution_reward"] < 20.0, info
        assert info["matches"] < info["fuzz_count"] // 4, info
        assert info["floor_penalty"] == 25.0, info
        assert info["perfect_bonus"] == 0.0, info

    def test_reset_legacy_target_name_still_works(self, http_client):
        resp = http_client.post("/reset", json={
            "target_name": "fibonacci", "seed": 0, "max_steps": 10,
        })
        assert resp.status_code == 200
        body = resp.json()
        assert body["target_function_name"] == "fibonacci"
        assert "fibonacci" in body["target_function_signature"]

    def test_reset_caller_supplied_target_code(self, http_client):
        reset = http_client.post("/reset", json={
            "target_code": "def add(a: int, b: int) -> int:\n return a + b\n",
            "target_function_name": "add",
            "edge_cases": ["(0, 0)", "(1, -1)"],
            "max_steps": 5,
        })
        assert reset.status_code == 200, reset.text
        body = reset.json()
        assert body["target_function_name"] == "add"
        eid = body["episode_id"]

        # Probe: add(7, 8) == 15.
        probe = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "probe", "input_repr": "(7, 8)"},
        })
        assert probe.status_code == 200, probe.text
        assert probe.json()["observation"]["probe_history"][-1]["output_repr"] == "15"

        # Submit a correct reimplementation and collect the full reward.
        final = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "submit", "code": "def add(a, b):\n return a + b\n"},
        })
        assert final.status_code == 200, final.text
        result = final.json()
        assert result["done"] is True
        assert result["info"]["execution_reward"] == pytest.approx(100.0)
        assert result["reward"] > 140.0

    def test_reset_with_neither_target_returns_400(self, http_client):
        resp = http_client.post("/reset", json={"seed": 0})
        assert resp.status_code == 400

    def test_reset_with_target_code_only_no_function_name_returns_400(self, http_client):
        resp = http_client.post("/reset", json={
            "target_code": "def f(): return 1\n",
        })
        assert resp.status_code == 400

    def test_functions_endpoint_unchanged_for_trainer(self, http_client):
        resp = http_client.get("/functions")
        assert resp.status_code == 200
        body = resp.json()
        assert "functions" in body
        listed = [entry["name"] for entry in body["functions"]]
        for name in BLACK_BOX_FUNCTIONS:
            assert name in listed

        # Trainer-facing schema: these keys must be present on every entry.
        for entry in body["functions"]:
            for key in ("name", "signature", "description", "difficulty", "edge_case_count"):
                assert key in entry
|