opensleuth-env-gemini-cli / tests / test_open_env.py
anugrah55's picture
verifier: fix SIGALRM-in-worker-thread bug that scored every well-formed submission 100/100 under uvicorn (fall back to no-timeout call when signal.signal raises). Trainer was training on a saturated reward landscape; this restores real per-submission scoring.
e7fc062 verified
"""Tests for OpenSleuth Level 2: auto-fuzzer + TaskCatalog + open /reset.
These tests do *not* require Hub network access. The Hub-availability test
is opportunistic: it asserts ``>=15`` total tasks if the dataset loads, but
silently passes (with a marker) if the Hub is offline / the env is sandboxed.
"""
from __future__ import annotations
import os
import random
import typing
from typing import Optional, Literal
import pytest
from fastapi.testclient import TestClient
from opensleuth_env import (
BLACK_BOX_FUNCTIONS,
OpenSleuthEnv,
ProbeAction,
SubmitAction,
TaskCatalog,
TaskResolutionError,
auto_fuzz,
)
# ---------------------------------------------------------------------------
# Auto-fuzzer
# ---------------------------------------------------------------------------
class TestAutoFuzzerTypes:
    """``auto_fuzz`` must honor each supported parameter annotation."""

    def _rng(self, seed: int = 0) -> random.Random:
        # Deterministic per-test RNG so failures reproduce exactly.
        return random.Random(seed)

    def test_int_inputs_are_ints(self):
        def f(n: int) -> int:
            return n

        outs = auto_fuzz(f, 50, self._rng())
        assert len(outs) == 50
        for t in outs:
            assert isinstance(t, tuple) and len(t) == 1
            # bool is a subclass of int -- the fuzzer must never emit it.
            assert isinstance(t[0], int)
            assert not isinstance(t[0], bool)

    def test_str_inputs_are_strs(self):
        def f(s: str) -> int:
            return len(s)

        for t in auto_fuzz(f, 30, self._rng()):
            assert isinstance(t[0], str)

    def test_list_int_inputs_are_lists_of_ints(self):
        def f(xs: list[int]) -> int:
            return sum(xs)

        for (xs,) in auto_fuzz(f, 30, self._rng()):
            assert isinstance(xs, list)
            for x in xs:
                assert isinstance(x, int)

    def test_homogeneous_tuple_inputs(self):
        def f(xs: tuple[int, ...]) -> int:
            return sum(xs)

        for (xs,) in auto_fuzz(f, 30, self._rng()):
            assert isinstance(xs, tuple)
            for x in xs:
                assert isinstance(x, int)

    def test_heterogeneous_tuple_inputs(self):
        def f(t: tuple[int, str]) -> int:
            return len(t[1])

        for (pair,) in auto_fuzz(f, 30, self._rng()):
            assert isinstance(pair, tuple) and len(pair) == 2
            assert isinstance(pair[0], int)
            assert isinstance(pair[1], str)

    def test_optional_inputs_sometimes_None(self):
        def f(x: Optional[int]) -> int:
            return 0

        values = [t[0] for t in auto_fuzz(f, 200, self._rng(seed=42))]
        seen_none = any(v is None for v in values)
        seen_int = any(isinstance(v, int) and not isinstance(v, bool) for v in values)
        assert seen_none, "Optional[int] should occasionally yield None"
        assert seen_int, "Optional[int] should also yield ints"

    def test_literal_inputs_only_pick_listed_values(self):
        def f(mode: Literal["a", "b", "c"]) -> int:
            return 0

        for (m,) in auto_fuzz(f, 50, self._rng()):
            assert m in ("a", "b", "c")

    def test_dict_str_int_inputs(self):
        def f(d: dict[str, int]) -> int:
            return len(d)

        for (d,) in auto_fuzz(f, 20, self._rng()):
            assert isinstance(d, dict)
            for key, value in d.items():
                assert isinstance(key, str)
                assert isinstance(value, int)

    def test_multi_arg_returns_full_tuples(self):
        def f(a: int, b: str) -> int:
            return 0

        for t in auto_fuzz(f, 20, self._rng()):
            assert isinstance(t, tuple) and len(t) == 2
            assert isinstance(t[0], int)
            assert isinstance(t[1], str)

    def test_unannotated_param_falls_back_to_int(self):
        def f(x):  # deliberately left unannotated
            return x

        for (x,) in auto_fuzz(f, 30, self._rng()):
            assert isinstance(x, int)
class TestAutoFuzzerSpecOverride:
    """Explicit ``fuzz_spec`` entries must override annotation defaults."""

    def test_int_min_max_overrides_default_range(self):
        def f(n: int) -> int:
            return n

        spec = {"n": {"type": "int", "min": 1, "max": 5}}
        for (n,) in auto_fuzz(f, 100, random.Random(0), fuzz_spec=spec):
            assert 1 <= n <= 5, f"expected n in [1, 5], got {n}"

    def test_str_alphabet_override(self):
        def f(s: str) -> int:
            return len(s)

        spec = {"s": {"type": "str", "alphabet": "ab", "max_len": 4}}
        for (s,) in auto_fuzz(f, 100, random.Random(0), fuzz_spec=spec):
            assert len(s) <= 4
            for ch in s:
                assert ch in "ab", f"unexpected char {ch!r} in {s!r}"

    def test_list_elem_override(self):
        def f(xs: list[int]) -> int:
            return sum(xs)

        spec = {
            "xs": {
                "type": "list",
                "elem": {"type": "int", "min": 0, "max": 3},
                "max_len": 4,
            }
        }
        for (xs,) in auto_fuzz(f, 80, random.Random(0), fuzz_spec=spec):
            assert len(xs) <= 4
            assert all(0 <= v <= 3 for v in xs)

    def test_tuple_elems_override(self):
        def f(t):
            return t

        spec = {
            "t": {
                "type": "tuple",
                "elems": [
                    {"type": "int", "min": 0, "max": 1},
                    {"type": "str", "alphabet": "x", "max_len": 2},
                ],
            }
        }
        for (t,) in auto_fuzz(f, 30, random.Random(0), fuzz_spec=spec):
            assert isinstance(t, tuple) and len(t) == 2
            assert 0 <= t[0] <= 1
            assert all(ch == "x" for ch in t[1])
# ---------------------------------------------------------------------------
# TaskCatalog
# ---------------------------------------------------------------------------
class TestTaskCatalog:
    """Resolution of builtin, caller-supplied, and invalid task specs."""

    @staticmethod
    def _catalog() -> TaskCatalog:
        # Hub access disabled: these tests must pass fully offline.
        return TaskCatalog(enable_hub=False)

    def test_resolves_builtin_by_name(self):
        spec = self._catalog().resolve(target_name="fibonacci")
        assert spec.name == "fibonacci"
        assert spec is BLACK_BOX_FUNCTIONS["fibonacci"]
        assert spec.unpack_args is False
        assert spec.source == "builtin"

    def test_resolves_caller_supplied_target_code(self):
        code = "def add(a: int, b: int) -> int:\n return a + b\n"
        spec = self._catalog().resolve(target_code=code, target_function_name="add")
        assert spec.name == "add"
        # Two positional params -> fuzzed tuples are unpacked at call time.
        assert spec.unpack_args is True
        assert spec.source == "user"
        # The wrapped fuzzer must produce calls that succeed end-to-end.
        for args in spec.fuzzer(random.Random(0), 10):
            assert isinstance(args, tuple) and len(args) == 2
            a, b = args
            assert spec.fn(a, b) == a + b

    def test_caller_supplied_unary_uses_unwrapped_call(self):
        spec = self._catalog().resolve(
            target_code="def square(n: int) -> int:\n return n * n\n",
            target_function_name="square",
        )
        assert spec.unpack_args is False
        for x in spec.fuzzer(random.Random(0), 5):
            assert isinstance(x, int)
            assert spec.fn(x) == x * x

    def test_resolve_with_no_source_raises(self):
        with pytest.raises(TaskResolutionError):
            self._catalog().resolve()

    def test_resolve_unknown_name_raises(self):
        with pytest.raises(TaskResolutionError):
            self._catalog().resolve(target_name="this_does_not_exist")

    def test_target_code_without_function_name_raises(self):
        with pytest.raises(TaskResolutionError):
            self._catalog().resolve(target_code="def foo(): return 1\n")

    def test_rejects_oracle_import(self):
        cat = self._catalog()
        # Both direct and from-imports of the oracle package are forbidden.
        oracle_importers = (
            "import opensleuth_env\ndef f(x): return x\n",
            "from opensleuth_env.black_box import _fibonacci\ndef f(x): return _fibonacci(x)\n",
        )
        for bad_code in oracle_importers:
            with pytest.raises(TaskResolutionError):
                cat.resolve(target_code=bad_code, target_function_name="f")

    def test_target_code_using_open_is_blocked_at_call_time(self):
        """`open` is not in the safe-builtins whitelist. The catalog will
        compile the function (since `open` is only resolved at call-time
        via NameError), but invoking it must fail safely."""
        code = "def f(x):\n open('/tmp/x', 'w')\n return 0\n"
        spec = self._catalog().resolve(target_code=code, target_function_name="f")
        with pytest.raises(NameError):
            spec.fn(0)

    def test_caller_supplied_edge_cases_are_parsed(self):
        spec = self._catalog().resolve(
            target_code="def neg(n: int) -> int:\n return -n\n",
            target_function_name="neg",
            edge_cases=["0", "1", "-1", "100"],
        )
        # repr strings are literal-eval'ed into real values.
        assert spec.edge_cases == [0, 1, -1, 100]

    def test_caller_supplied_fuzz_spec_is_used(self):
        spec = self._catalog().resolve(
            target_code="def f(n: int) -> int:\n return n\n",
            target_function_name="f",
            fuzz_spec={"n": {"type": "int", "min": 7, "max": 9}},
        )
        for x in spec.fuzzer(random.Random(0), 50):
            assert 7 <= x <= 9

    def test_list_builtin_returns_nine_entries(self):
        entries = self._catalog().list_builtin()
        assert len(entries) == 9
        for entry in entries:
            assert entry["source"] == "builtin"
            for key in ("name", "signature", "difficulty"):
                assert key in entry
# ---------------------------------------------------------------------------
# End-to-end via OpenSleuthEnv
# ---------------------------------------------------------------------------
class TestEnvOpenEnded:
    """Full reset/probe/submit loops through ``OpenSleuthEnv`` directly."""

    def test_legacy_reset_by_target_name_unchanged(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(target_name="fibonacci")
        assert obs.target_function_name == "fibonacci"
        assert obs.difficulty == "easy"
        assert obs.steps_taken == 0
        # Probing must keep working exactly as in the pre-catalog API.
        resp = env.step(obs.episode_id, ProbeAction(input_repr="10"))
        assert resp.observation.probe_history[-1].output_repr == "55"

    def test_env_caller_supplied_unary_full_loop(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def square(n: int) -> int:\n return n * n\n",
            target_function_name="square",
        )
        assert obs.target_function_name == "square"
        # Probe first ...
        resp = env.step(obs.episode_id, ProbeAction(input_repr="5"))
        assert resp.observation.probe_history[-1].output_repr == "25"
        # ... then submit a perfect implementation.
        resp = env.step(
            obs.episode_id,
            SubmitAction(code="def square(n):\n return n * n\n"),
        )
        assert resp.done is True
        assert resp.info["execution_reward"] == pytest.approx(100.0)
        assert resp.reward > 140.0

    def test_env_caller_supplied_multi_arg_full_loop(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def add(a: int, b: int) -> int:\n return a + b\n",
            target_function_name="add",
            edge_cases=["(0, 0)", "(1, -1)", "(100, 0)"],
        )
        assert obs.target_function_name == "add"
        # Probe with a 2-tuple; the env must unpack it for the target.
        resp = env.step(obs.episode_id, ProbeAction(input_repr="(2, 3)"))
        assert resp.observation.probe_history[-1].output_repr == "5"
        # A perfect submission closes the episode with a full score.
        resp = env.step(
            obs.episode_id,
            SubmitAction(code="def add(a, b):\n return a + b\n"),
        )
        assert resp.done is True
        assert resp.info["execution_reward"] == pytest.approx(100.0)
        assert resp.reward > 140.0

    def test_env_caller_supplied_buggy_submission_scored_negative(self):
        env = OpenSleuthEnv(fuzz_count=10)
        obs = env.reset(
            target_code="def add(a: int, b: int) -> int:\n return a + b\n",
            target_function_name="add",
        )
        # Subtraction instead of addition: most fuzzed inputs mismatch.
        resp = env.step(
            obs.episode_id,
            SubmitAction(code="def add(a, b):\n return a - b\n"),
        )
        assert resp.done is True
        assert resp.info["execution_reward"] < 50.0
        assert resp.reward < 0.0

    def test_env_caller_supplied_oracle_import_rejected(self):
        env = OpenSleuthEnv()
        with pytest.raises(ValueError):
            env.reset(
                target_code="import opensleuth_env\ndef f(x): return x\n",
                target_function_name="f",
            )
# ---------------------------------------------------------------------------
# HTTP layer
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def http_client():
    """Module-scoped FastAPI test client over the server app."""
    # NOTE(review): imported inside the fixture — presumably to defer any
    # app-startup side effects until the fixture actually runs; confirm.
    from server import app

    client = TestClient(app)
    with client:
        yield client
class TestHttpLayer:
    """HTTP-level contract tests driven through the ``http_client`` fixture."""

    def test_tasks_endpoint_lists_at_least_nine_builtin(self, http_client):
        resp = http_client.get("/tasks?source=builtin")
        assert resp.status_code == 200
        body = resp.json()
        assert body["count"] >= 9
        listed = {t["name"] for t in body["tasks"]}
        # Every builtin must appear in the catalog listing.
        assert set(BLACK_BOX_FUNCTIONS) <= listed

    def test_tasks_all_includes_at_least_builtins(self, http_client):
        resp = http_client.get("/tasks?source=all")
        assert resp.status_code == 200
        body = resp.json()
        # Builtins are always present; Hub rows are opportunistic because
        # CI sandboxes may block egress entirely.
        assert body["count"] >= 9
        hub = body["hub"]
        if not hub.get("enabled", False) or hub.get("error"):
            pytest.skip(f"hub not reachable: {body['hub']}")
        # Hub reachable -> the bootstrapped dataset carries 15+ rows.
        assert body["count"] >= 15

    def test_sample_inputs_returns_n_repr_strings_for_builtin(self, http_client):
        import ast

        url = "/tasks/fibonacci/sample_inputs?n=5&seed=7"
        resp = http_client.get(url)
        assert resp.status_code == 200, resp.text
        body = resp.json()
        assert body["name"] == "fibonacci"
        assert body["n"] == 5
        assert body["seed"] == 7
        inputs = body["inputs"]
        assert isinstance(inputs, list)
        assert len(inputs) == 5
        # Each entry must be ast.literal_eval-safe so the trainer can post
        # it straight back to /step as a probe input_repr.
        for s in inputs:
            assert isinstance(s, str)
            ast.literal_eval(s)
        # Same seed -> identical inputs (determinism).
        assert http_client.get(url).json()["inputs"] == inputs

    def test_sample_inputs_unknown_target_404s(self, http_client):
        resp = http_client.get("/tasks/__nope__/sample_inputs?n=2&seed=0")
        assert resp.status_code == 404

    def test_obviously_wrong_submission_scores_low_under_thread_pool(self, http_client):
        """Regression: TestClient uses a worker thread, exercising the
        same `signal.signal` -> ValueError path that uvicorn workers hit
        in production. Before the verifier fix, this returned 100/100 for
        any defined function (incl. ``def fibonacci(n): return n``).
        After the fix, an obviously-wrong submission should score near
        zero and trigger the floor penalty.
        """
        reset_body = http_client.post("/reset", json={
            "target_name": "fibonacci", "seed": 42, "max_steps": 2,
        }).json()
        resp = http_client.post("/step", json={
            "episode_id": reset_body["episode_id"],
            "action": {"action_type": "submit", "code": "def fibonacci(n):\n return n\n"},
        })
        assert resp.status_code == 200, resp.text
        info = resp.json()["info"]
        # ``return n`` matches at most a couple of fixed points (n=1, n=2)
        # out of 100+ random inputs; execution_reward should be tiny.
        assert info["execution_reward"] < 20.0, info
        assert info["matches"] < info["fuzz_count"] // 4, info
        # Floor penalty kicks in; perfect-bonus must NOT fire.
        assert info["floor_penalty"] == 25.0, info
        assert info["perfect_bonus"] == 0.0, info

    def test_reset_legacy_target_name_still_works(self, http_client):
        resp = http_client.post("/reset", json={
            "target_name": "fibonacci", "seed": 0, "max_steps": 10,
        })
        assert resp.status_code == 200
        body = resp.json()
        assert body["target_function_name"] == "fibonacci"
        assert "fibonacci" in body["target_function_signature"]

    def test_reset_caller_supplied_target_code(self, http_client):
        resp = http_client.post("/reset", json={
            "target_code": "def add(a: int, b: int) -> int:\n return a + b\n",
            "target_function_name": "add",
            "edge_cases": ["(0, 0)", "(1, -1)"],
            "max_steps": 5,
        })
        assert resp.status_code == 200, resp.text
        body = resp.json()
        assert body["target_function_name"] == "add"
        eid = body["episode_id"]
        # Probe: verifies tuple-unpacking of the 2-arg target over HTTP.
        resp = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "probe", "input_repr": "(7, 8)"},
        })
        assert resp.status_code == 200, resp.text
        assert resp.json()["observation"]["probe_history"][-1]["output_repr"] == "15"
        # Submit a perfect implementation.
        resp = http_client.post("/step", json={
            "episode_id": eid,
            "action": {"action_type": "submit", "code": "def add(a, b):\n return a + b\n"},
        })
        assert resp.status_code == 200, resp.text
        body = resp.json()
        assert body["done"] is True
        assert body["info"]["execution_reward"] == pytest.approx(100.0)
        assert body["reward"] > 140.0

    def test_reset_with_neither_target_returns_400(self, http_client):
        resp = http_client.post("/reset", json={"seed": 0})
        assert resp.status_code == 400

    def test_reset_with_target_code_only_no_function_name_returns_400(self, http_client):
        resp = http_client.post("/reset", json={
            "target_code": "def f(): return 1\n",
        })
        assert resp.status_code == 400

    def test_functions_endpoint_unchanged_for_trainer(self, http_client):
        resp = http_client.get("/functions")
        assert resp.status_code == 200
        body = resp.json()
        assert "functions" in body
        listed = [entry["name"] for entry in body["functions"]]
        for name in BLACK_BOX_FUNCTIONS:
            assert name in listed
        # All original v0.3 fields must still be present on every entry.
        for entry in body["functions"]:
            for key in ("name", "signature", "description", "difficulty", "edge_case_count"):
                assert key in entry