"""Tests for OpenSleuth Level 2: auto-fuzzer + TaskCatalog + open /reset.

These tests do *not* require Hub network access. The Hub-availability test
is opportunistic: it asserts ``>=15`` total tasks if the dataset loads, but
is reported as skipped (via ``pytest.skip``) if the Hub is offline / the env
is sandboxed.
"""

from __future__ import annotations

import ast
import os
import random
import typing
from typing import Optional, Literal

import pytest
from fastapi.testclient import TestClient

from opensleuth_env import (
    BLACK_BOX_FUNCTIONS,
    OpenSleuthEnv,
    ProbeAction,
    SubmitAction,
    TaskCatalog,
    TaskResolutionError,
    auto_fuzz,
)


# ---------------------------------------------------------------------------
# Auto-fuzzer
# ---------------------------------------------------------------------------


class TestAutoFuzzerTypes:
    """``auto_fuzz`` must generate argument tuples matching the annotations."""

    def _rng(self, seed: int = 0) -> random.Random:
        """Return a ``random.Random`` seeded with *seed* for reproducibility."""
        return random.Random(seed)

    def test_int_inputs_are_ints(self):
        def f(n: int) -> int:
            return n

        rng = self._rng()
        outs = auto_fuzz(f, 50, rng)
        assert len(outs) == 50
        assert all(isinstance(t, tuple) and len(t) == 1 for t in outs)
        # ``bool`` is a subclass of ``int``; reject it explicitly.
        assert all(isinstance(t[0], int) and not isinstance(t[0], bool) for t in outs)

    def test_str_inputs_are_strs(self):
        def f(s: str) -> int:
            return len(s)

        outs = auto_fuzz(f, 30, self._rng())
        assert all(isinstance(t[0], str) for t in outs)

    def test_list_int_inputs_are_lists_of_ints(self):
        def f(xs: list[int]) -> int:
            return sum(xs)

        outs = auto_fuzz(f, 30, self._rng())
        for (xs,) in outs:
            assert isinstance(xs, list)
            assert all(isinstance(x, int) for x in xs)

    def test_homogeneous_tuple_inputs(self):
        def f(xs: tuple[int, ...]) -> int:
            return sum(xs)

        outs = auto_fuzz(f, 30, self._rng())
        for (xs,) in outs:
            assert isinstance(xs, tuple)
            assert all(isinstance(x, int) for x in xs)

    def test_heterogeneous_tuple_inputs(self):
        def f(t: tuple[int, str]) -> int:
            return len(t[1])

        outs = auto_fuzz(f, 30, self._rng())
        for (t,) in outs:
            assert isinstance(t, tuple) and len(t) == 2
            assert isinstance(t[0], int)
            assert isinstance(t[1], str)

    def test_optional_inputs_sometimes_None(self):
        def f(x: Optional[int]) -> int:
            return 0

        outs = auto_fuzz(f, 200, self._rng(seed=42))
        seen_none = any(t[0] is None for t in outs)
        seen_int = any(isinstance(t[0], int) and not isinstance(t[0], bool) for t in outs)
        assert seen_none, "Optional[int] should occasionally yield None"
        assert seen_int, "Optional[int] should also yield ints"

    def test_literal_inputs_only_pick_listed_values(self):
        def f(mode: Literal["a", "b", "c"]) -> int:
            return 0

        outs = auto_fuzz(f, 50, self._rng())
        for (m,) in outs:
            assert m in ("a", "b", "c")

    def test_dict_str_int_inputs(self):
        def f(d: dict[str, int]) -> int:
            return len(d)

        outs = auto_fuzz(f, 20, self._rng())
        for (d,) in outs:
            assert isinstance(d, dict)
            for k, v in d.items():
                assert isinstance(k, str)
                assert isinstance(v, int)

    def test_multi_arg_returns_full_tuples(self):
        def f(a: int, b: str) -> int:
            return 0

        outs = auto_fuzz(f, 20, self._rng())
        for t in outs:
            assert isinstance(t, tuple)
            assert len(t) == 2
            assert isinstance(t[0], int)
            assert isinstance(t[1], str)

    def test_unannotated_param_falls_back_to_int(self):
        def f(x):  # no annotation
            return x

        outs = auto_fuzz(f, 30, self._rng())
        for (x,) in outs:
            assert isinstance(x, int)


class TestAutoFuzzerSpecOverride:
    """An explicit ``fuzz_spec`` must constrain the values ``auto_fuzz`` emits."""

    def test_int_min_max_overrides_default_range(self):
        def f(n: int) -> int:
            return n

        outs = auto_fuzz(
            f,
            100,
            random.Random(0),
            fuzz_spec={"n": {"type": "int", "min": 1, "max": 5}},
        )
        for (n,) in outs:
            assert 1 <= n <= 5, f"expected n in [1, 5], got {n}"

    def test_str_alphabet_override(self):
        def f(s: str) -> int:
            return len(s)

        outs = auto_fuzz(
            f,
            100,
            random.Random(0),
            fuzz_spec={"s": {"type": "str", "alphabet": "ab", "max_len": 4}},
        )
        for (s,) in outs:
            assert len(s) <= 4
            for ch in s:
                assert ch in "ab", f"unexpected char {ch!r} in {s!r}"

    def test_list_elem_override(self):
        def f(xs: list[int]) -> int:
            return sum(xs)

        outs = auto_fuzz(
            f,
            80,
            random.Random(0),
            fuzz_spec={
                "xs": {
                    "type": "list",
                    "elem": {"type": "int", "min": 0, "max": 3},
                    "max_len": 4,
                }
            },
        )
        for (xs,) in outs:
            assert len(xs) <= 4
            for v in xs:
                assert 0 <= v <= 3

    def test_tuple_elems_override(self):
        # The parameter is deliberately unannotated: the spec alone must
        # determine the generated shape.
        def f(t):
            return t

        outs = auto_fuzz(
            f,
            30,
            random.Random(0),
            fuzz_spec={"t": {"type": "tuple", "elems": [
                {"type": "int", "min": 0, "max": 1},
                {"type": "str", "alphabet": "x", "max_len": 2},
            ]}},
        )
        for (t,) in outs:
            assert isinstance(t, tuple) and len(t) == 2
            assert 0 <= t[0] <= 1
            for ch in t[1]:
                assert ch == "x"


# ---------------------------------------------------------------------------
# TaskCatalog
# ---------------------------------------------------------------------------


class TestTaskCatalog:
    """``TaskCatalog.resolve`` / ``list_builtin`` with the Hub disabled."""

    def test_resolves_builtin_by_name(self):
        cat = TaskCatalog(enable_hub=False)
        spec = cat.resolve(target_name="fibonacci")
        assert spec.name == "fibonacci"
        assert spec is BLACK_BOX_FUNCTIONS["fibonacci"]
        assert spec.unpack_args is False
        assert spec.source == "builtin"

    def test_resolves_caller_supplied_target_code(self):
        cat = TaskCatalog(enable_hub=False)
        code = "def add(a: int, b: int) -> int:\n return a + b\n"
        spec = cat.resolve(target_code=code, target_function_name="add")
        assert spec.name == "add"
        assert spec.unpack_args is True  # 2-arg
        assert spec.source == "user"
        # The wrapped fuzzer must produce calls that succeed end-to-end.
        rng = random.Random(0)
        inputs = spec.fuzzer(rng, 10)
        for args in inputs:
            assert isinstance(args, tuple) and len(args) == 2
            assert spec.fn(*args) == args[0] + args[1]

    def test_caller_supplied_unary_uses_unwrapped_call(self):
        cat = TaskCatalog(enable_hub=False)
        code = "def square(n: int) -> int:\n return n * n\n"
        spec = cat.resolve(target_code=code, target_function_name="square")
        assert spec.unpack_args is False
        rng = random.Random(0)
        inputs = spec.fuzzer(rng, 5)
        # Unary targets receive bare values, not 1-tuples.
        for x in inputs:
            assert isinstance(x, int)
            assert spec.fn(x) == x * x

    def test_resolve_with_no_source_raises(self):
        cat = TaskCatalog(enable_hub=False)
        with pytest.raises(TaskResolutionError):
            cat.resolve()

    def test_resolve_unknown_name_raises(self):
        cat = TaskCatalog(enable_hub=False)
        with pytest.raises(TaskResolutionError):
            cat.resolve(target_name="this_does_not_exist")

    def test_target_code_without_function_name_raises(self):
        cat = TaskCatalog(enable_hub=False)
        with pytest.raises(TaskResolutionError):
            cat.resolve(target_code="def foo(): return 1\n")

    def test_rejects_oracle_import(self):
        cat = TaskCatalog(enable_hub=False)
        bad = (
            "import opensleuth_env\n"
            "def f(x): return x\n"
        )
        with pytest.raises(TaskResolutionError):
            cat.resolve(target_code=bad, target_function_name="f")

        bad2 = (
            "from opensleuth_env.black_box import _fibonacci\n"
            "def f(x): return _fibonacci(x)\n"
        )
        with pytest.raises(TaskResolutionError):
            cat.resolve(target_code=bad2, target_function_name="f")

    def test_target_code_using_open_is_blocked_at_call_time(self):
        """`open` is not in the safe-builtins whitelist.

        The catalog will compile the function (since `open` is only resolved
        at call-time via NameError), but invoking it must fail safely.
        """
        cat = TaskCatalog(enable_hub=False)
        code = (
            "def f(x):\n"
            " open('/tmp/x', 'w')\n"
            " return 0\n"
        )
        spec = cat.resolve(target_code=code, target_function_name="f")
        with pytest.raises(NameError):
            spec.fn(0)

    def test_caller_supplied_edge_cases_are_parsed(self):
        cat = TaskCatalog(enable_hub=False)
        spec = cat.resolve(
            target_code="def neg(n: int) -> int:\n return -n\n",
            target_function_name="neg",
            edge_cases=["0", "1", "-1", "100"],
        )
        # Edge cases are supplied as repr strings and come back as values.
        assert spec.edge_cases == [0, 1, -1, 100]

    def test_caller_supplied_fuzz_spec_is_used(self):
        cat = TaskCatalog(enable_hub=False)
        spec = cat.resolve(
            target_code="def f(n: int) -> int:\n return n\n",
            target_function_name="f",
            fuzz_spec={"n": {"type": "int", "min": 7, "max": 9}},
        )
        rng = random.Random(0)
        inputs = spec.fuzzer(rng, 50)
        for x in inputs:
            assert 7 <= x <= 9

    def test_list_builtin_returns_nine_entries(self):
        cat = TaskCatalog(enable_hub=False)
        builtins_list = cat.list_builtin()
        assert len(builtins_list) == 9
        for entry in builtins_list:
            assert entry["source"] == "builtin"
            assert "name" in entry
            assert "signature" in entry
            assert "difficulty" in entry


# ---------------------------------------------------------------------------
# End-to-end via OpenSleuthEnv
# ---------------------------------------------------------------------------


class TestEnvOpenEnded:
    """Reset / probe / submit loops driven directly through ``OpenSleuthEnv``."""

    def test_legacy_reset_by_target_name_unchanged(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(target_name="fibonacci")
        assert obs.target_function_name == "fibonacci"
        assert obs.difficulty == "easy"
        assert obs.steps_taken == 0
        # Probe via the same path as before.
        resp = env.step(obs.episode_id, ProbeAction(input_repr="10"))
        assert resp.observation.probe_history[-1].output_repr == "55"

    def test_env_caller_supplied_unary_full_loop(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def square(n: int) -> int:\n return n * n\n",
            target_function_name="square",
        )
        assert obs.target_function_name == "square"
        # Probe.
        resp = env.step(obs.episode_id, ProbeAction(input_repr="5"))
        assert resp.observation.probe_history[-1].output_repr == "25"
        # Submit a perfect implementation.
        code = "def square(n):\n return n * n\n"
        resp = env.step(obs.episode_id, SubmitAction(code=code))
        assert resp.done is True
        assert resp.info["execution_reward"] == pytest.approx(100.0)
        assert resp.reward > 140.0

    def test_env_caller_supplied_multi_arg_full_loop(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def add(a: int, b: int) -> int:\n return a + b\n",
            target_function_name="add",
            edge_cases=["(0, 0)", "(1, -1)", "(100, 0)"],
        )
        assert obs.target_function_name == "add"
        # Probe with a 2-tuple.
        resp = env.step(obs.episode_id, ProbeAction(input_repr="(2, 3)"))
        assert resp.observation.probe_history[-1].output_repr == "5"
        # Submit a perfect implementation.
        code = "def add(a, b):\n return a + b\n"
        resp = env.step(obs.episode_id, SubmitAction(code=code))
        assert resp.done is True
        assert resp.info["execution_reward"] == pytest.approx(100.0)
        assert resp.reward > 140.0

    def test_env_caller_supplied_buggy_submission_scored_negative(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def add(a: int, b: int) -> int:\n return a + b\n",
            target_function_name="add",
        )
        bad = "def add(a, b):\n return a - b\n"
        resp = env.step(obs.episode_id, SubmitAction(code=bad))
        assert resp.done is True
        assert resp.info["execution_reward"] < 50.0
        assert resp.reward < 0.0

    def test_env_caller_supplied_oracle_import_rejected(self):
        env = OpenSleuthEnv()
        with pytest.raises(ValueError):
            env.reset(
                target_code="import opensleuth_env\ndef f(x): return x\n",
                target_function_name="f",
            )


# ---------------------------------------------------------------------------
# HTTP layer
# ---------------------------------------------------------------------------


@pytest.fixture(scope="module")
def http_client():
    """Yield a module-scoped ``TestClient`` bound to the ``server`` app.

    ``server`` is imported inside the fixture rather than at module import
    time, and the client is used as a context manager for the whole module.
    """
    from server import app

    with TestClient(app) as client:
        yield client


class TestHttpLayer:
    """HTTP-level tests run against the ``server`` app through ``TestClient``."""

    def test_tasks_endpoint_lists_at_least_nine_builtin(self, http_client):
        r = http_client.get("/tasks?source=builtin")
        assert r.status_code == 200
        body = r.json()
        assert body["count"] >= 9
        names = [t["name"] for t in body["tasks"]]
        for name in BLACK_BOX_FUNCTIONS:
            assert name in names

    def test_tasks_all_includes_at_least_builtins(self, http_client):
        r = http_client.get("/tasks?source=all")
        assert r.status_code == 200
        body = r.json()
        # The builtins are always present. If the Hub is reachable we'd
        # expect 15+, but the test must not fail when the Hub is unavailable
        # (e.g. CI sandboxes block egress), so it is skipped in that case.
        assert body["count"] >= 9
        if not body["hub"].get("enabled", False) or body["hub"].get("error"):
            pytest.skip(f"hub not reachable: {body['hub']}")
        # Hub reachable -> dataset should have 15+ rows after bootstrap.
        assert body["count"] >= 15

    def test_sample_inputs_returns_n_repr_strings_for_builtin(self, http_client):
        r = http_client.get("/tasks/fibonacci/sample_inputs?n=5&seed=7")
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["name"] == "fibonacci"
        assert body["n"] == 5
        assert body["seed"] == 7
        assert isinstance(body["inputs"], list)
        assert len(body["inputs"]) == 5
        # Every returned string must be ast.literal_eval-safe so the trainer
        # can post it straight back to /step as a probe input_repr.
        for s in body["inputs"]:
            assert isinstance(s, str)
            ast.literal_eval(s)
        # Determinism: same seed -> identical inputs.
        r2 = http_client.get("/tasks/fibonacci/sample_inputs?n=5&seed=7")
        assert r2.status_code == 200, r2.text
        assert r2.json()["inputs"] == body["inputs"]

    def test_sample_inputs_unknown_target_404s(self, http_client):
        r = http_client.get("/tasks/__nope__/sample_inputs?n=2&seed=0")
        assert r.status_code == 404

    def test_obviously_wrong_submission_scores_low_under_thread_pool(self, http_client):
        """Regression: TestClient uses a worker thread, exercising the same
        `signal.signal` -> ValueError path that uvicorn workers hit in
        production.

        Before the verifier fix, this returned 100/100 for any defined
        function (incl. ``def fibonacci(n): return n``). After the fix, an
        obviously-wrong submission should score near zero and trigger the
        floor penalty.
        """
        reset = http_client.post("/reset", json={
            "target_name": "fibonacci",
            "seed": 42,
            "max_steps": 2,
        })
        # Fail with the server's message instead of a KeyError further down.
        assert reset.status_code == 200, reset.text
        ep = reset.json()
        eid = ep["episode_id"]
        r = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "submit", "code": "def fibonacci(n):\n return n\n"},
        })
        assert r.status_code == 200, r.text
        body = r.json()
        info = body["info"]
        # ``return n`` only agrees with fibonacci at the sequence's few fixed
        # points (e.g. n=1 and n=5, given fibonacci(10) == 55), so almost no
        # fuzz input should match and execution_reward should be tiny.
        assert info["execution_reward"] < 20.0, info
        assert info["matches"] < info["fuzz_count"] // 4, info
        # Floor penalty should kick in.
        assert info["floor_penalty"] == 25.0, info
        # And the perfect-bonus must NOT fire.
        assert info["perfect_bonus"] == 0.0, info

    def test_reset_legacy_target_name_still_works(self, http_client):
        r = http_client.post("/reset", json={
            "target_name": "fibonacci",
            "seed": 0,
            "max_steps": 10,
        })
        assert r.status_code == 200
        body = r.json()
        assert body["target_function_name"] == "fibonacci"
        assert "fibonacci" in body["target_function_signature"]

    def test_reset_caller_supplied_target_code(self, http_client):
        payload = {
            "target_code": "def add(a: int, b: int) -> int:\n return a + b\n",
            "target_function_name": "add",
            "edge_cases": ["(0, 0)", "(1, -1)"],
            "max_steps": 5,
        }
        r = http_client.post("/reset", json=payload)
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["target_function_name"] == "add"
        eid = body["episode_id"]
        # Probe -> verify wrapping.
        r = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "probe", "input_repr": "(7, 8)"},
        })
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["observation"]["probe_history"][-1]["output_repr"] == "15"
        # Submit perfect.
        r = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "submit", "code": "def add(a, b):\n return a + b\n"},
        })
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["done"] is True
        assert body["info"]["execution_reward"] == pytest.approx(100.0)
        assert body["reward"] > 140.0

    def test_reset_with_neither_target_returns_400(self, http_client):
        r = http_client.post("/reset", json={"seed": 0})
        assert r.status_code == 400

    def test_reset_with_target_code_only_no_function_name_returns_400(self, http_client):
        r = http_client.post("/reset", json={
            "target_code": "def f(): return 1\n",
        })
        assert r.status_code == 400

    def test_functions_endpoint_unchanged_for_trainer(self, http_client):
        r = http_client.get("/functions")
        assert r.status_code == 200
        body = r.json()
        assert "functions" in body
        names = [f["name"] for f in body["functions"]]
        for name in BLACK_BOX_FUNCTIONS:
            assert name in names
        # The original v0.3 fields must all be present.
        for entry in body["functions"]:
            for k in ("name", "signature", "description", "difficulty", "edge_case_count"):
                assert k in entry