| """Verifier: scores a submitted Python source against a hidden reference |
| function by domain-aware fuzzing, with sandboxed execution and a complexity |
| penalty. |
| |
| Reward design (v0.3, paper-driven update): |
| |
| * ``execution_reward`` in ``[0, 100]`` is the fraction of fuzz inputs whose |
| outputs match the reference, scaled to 100. Inputs are drawn from two |
| categories that are scored separately so the trainer can see *which* |
| regime the agent fails on (Masud et al., 2026 §P3 "reward granularity"): |
| |
| - ``"edge"`` -- spec-defined must-pass cases (anti-deception, paper |
| §C1 of Ibrahim et al., 2024). |
| - ``"random"`` -- the original sampler. |
| |
| * ``complexity_penalty`` in ``[0, 50]`` is a bounded log-scaled cyclomatic |
| complexity, or 50 on syntax error. |
| |
| * ``reward_hack_penalty`` is a soft anti-hacking signal that fires when the |
| submission is a "constant function" (single distinct output / single |
| exception type) while the reference is genuinely diverse, OR the agent |
| attempts to import the reference module (we block this at sandbox-level |
| too, but we surface the attempt so the trainer can punish it). |
| |
| * ``floor_penalty`` adds a hard ``-25`` floor for sub-50% submissions |
| (Vul-R2 style; Wen et al. 2025 in Masud et al. 2026 §3.4.2). This stops |
| agents from learning that emitting *any* syntactically-valid function |
| pays positive reward. |
| |
| The headline ``total_reward`` returned in ``info`` is the *recommended* |
| total the env should hand back; the env is free to add a perfect-bonus on |
| top. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import ast |
| import math |
| import multiprocessing as mp |
| import random |
| import signal |
| from dataclasses import dataclass, field |
| from typing import Any, Callable, Dict, List, Optional |
|
|
|
|
| |
|
|
|
|
| class _CCVisitor(ast.NodeVisitor): |
| def __init__(self): |
| self.cc = 1 |
|
|
| def _bump(self, node): |
| self.cc += 1 |
| self.generic_visit(node) |
|
|
| visit_If = _bump |
| visit_For = _bump |
| visit_While = _bump |
| visit_AsyncFor = _bump |
| visit_ExceptHandler = _bump |
| visit_With = _bump |
| visit_IfExp = _bump |
|
|
| def visit_BoolOp(self, node): |
| self.cc += max(0, len(node.values) - 1) |
| self.generic_visit(node) |
|
|
|
|
def calculate_complexity_penalty(code: str) -> float:
    """Return a bounded, log-scaled cyclomatic-complexity penalty.

    Unparsable code gets the maximum penalty of 50.0; otherwise the
    penalty is ``log2(cc)`` capped at 50 (cc >= 1, so the result is
    non-negative).
    """
    try:
        parsed = ast.parse(code)
    except SyntaxError:
        return 50.0

    visitor = _CCVisitor()
    visitor.visit(parsed)
    return min(math.log2(visitor.cc), 50.0)
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
# Whitelist of builtin names exposed to sandboxed submissions.  Notable
# omissions: open, input, eval, exec, compile, globals, locals, vars,
# setattr, delattr and __import__ -- anything granting filesystem/I-O
# access or a namespace escape.  ``__build_class__`` is included because
# the ``class`` statement needs it under a restricted ``__builtins__``.
_SAFE_BUILTINS_NAMES = (
    "abs all any ascii bin bool bytes bytearray callable chr complex dict "
    "divmod enumerate filter float format frozenset getattr hasattr hash "
    "hex id int isinstance issubclass iter len list map max min next object "
    "oct ord pow print property range repr reversed round set slice sorted "
    "str sum tuple type zip True False None NotImplemented Ellipsis "
    "ArithmeticError AssertionError AttributeError BaseException "
    "BufferError BytesWarning DeprecationWarning EOFError Exception "
    "FloatingPointError IndexError KeyError LookupError MemoryError "
    "NameError NotImplementedError OverflowError RecursionError "
    "ReferenceError RuntimeError StopAsyncIteration StopIteration "
    "SyntaxError TypeError UnboundLocalError UnicodeError ValueError "
    "ZeroDivisionError __build_class__"
).split()
|
|
|
|
def _make_safe_builtins() -> Dict[str, Any]:
    """Resolve the whitelisted builtin names into a name -> object mapping.

    Names missing from this interpreter's ``builtins`` (version skew) are
    silently skipped rather than raising.
    """
    import builtins as _builtins

    return {
        name: getattr(_builtins, name)
        for name in _SAFE_BUILTINS_NAMES
        if hasattr(_builtins, name)
    }
|
|
|
|
# Materialised once at import time; every sandbox shares these references.
_SAFE_BUILTINS = _make_safe_builtins()
# Modules pre-imported into every sandbox namespace.  Since ``__import__``
# is not whitelisted, these are the only stdlib modules a submission can use.
_PREIMPORTED_MODULES = ("math", "string", "itertools", "functools", "collections", "re")
|
|
|
|
def _make_safe_globals() -> Dict[str, Any]:
    """Build a fresh globals dict for sandboxed ``exec``.

    Contains the whitelisted builtins plus the pre-imported stdlib
    modules; a new dict per call so submissions cannot poison each other.
    """
    sandbox: Dict[str, Any] = {
        "__builtins__": _SAFE_BUILTINS,
        "__name__": "__opensleuth_submission__",
    }
    sandbox.update((name, __import__(name)) for name in _PREIMPORTED_MODULES)
    return sandbox
|
|
|
|
def _exec_target_in_sandbox(code: str, target_name: str, queue: mp.Queue) -> None:
    """Run inside a child process so we can hard-kill on timeout.

    Executes ``code`` in a sandboxed namespace and reports on ``queue``:
    ``("ok", None)`` when a callable named ``target_name`` gets defined,
    else ``("err", message)``.

    Bug fix: ``exec`` previously received separate globals/locals dicts.
    Functions defined by the submission capture the *globals* dict, while
    top-level names land in the *locals* dict -- so a target that called a
    sibling helper defined cleanly here but raised ``NameError`` at call
    time.  Executing into a single namespace restores normal module
    semantics for multi-function submissions.
    """
    try:
        sandbox = _make_safe_globals()
        exec(code, sandbox)
        fn = sandbox.get(target_name)
        if not callable(fn):
            queue.put(("err", f"No callable named {target_name!r} defined."))
            return
        queue.put(("ok", None))
    except Exception as e:
        # Report, never raise: the parent only reads the queue.
        queue.put(("err", f"{type(e).__name__}: {e}"))
|
|
|
|
def _can_define(code: str, target_name: str, timeout_s: float) -> Optional[str]:
    """Return None if the submitted code defines the target callable, else an
    error string.  Uses a child process with a wall-clock timeout.

    Robustness fix: the previous ``q.empty()`` check followed by a blocking
    ``q.get()`` raced the queue's feeder thread -- a result still being
    flushed from the child could be reported as "no result", and if the
    child died without putting anything the blocking ``get()`` would hang
    the verifier forever.  A single ``get`` with a short timeout handles
    both cases deterministically.
    """
    from queue import Empty  # mp.Queue.get raises the stdlib queue.Empty

    # Prefer fork (cheap, inherits already-imported module state) unless the
    # interpreter is already committed to the spawn start method.
    ctx = mp.get_context("fork") if mp.get_start_method(allow_none=True) != "spawn" else mp.get_context("spawn")
    q: mp.Queue = ctx.Queue()
    p = ctx.Process(target=_exec_target_in_sandbox, args=(code, target_name, q))
    p.start()
    p.join(timeout=timeout_s)
    if p.is_alive():
        # Escalate: polite terminate first, hard kill if it ignores us.
        p.terminate()
        p.join(1.0)
        if p.is_alive():
            p.kill()
        return f"Definition timed out after {timeout_s}s."
    try:
        status, payload = q.get(timeout=1.0)
    except Empty:
        return "Sandbox produced no result."
    return None if status == "ok" else payload
|
|
|
|
| |
| |
| |
| |
|
|
class _CallTimeout(Exception):
    """Internal sentinel raised by the SIGALRM handler in
    ``_call_with_timeout`` so a wall-clock timeout can be told apart from
    the callee's own exceptions in ``_safe_call``."""
    pass
|
|
|
|
| def _call_with_timeout(fn: Callable, arg: Any, timeout_s: float, *, unpack: bool = False): |
| """Call ``fn(arg)`` (or ``fn(*arg)`` if ``unpack``) with a wall-clock |
| timeout when possible. |
| |
| SIGALRM only works in the main thread of the main interpreter. When |
| the verifier runs inside a uvicorn worker thread (FastAPI request |
| handler), ``signal.signal`` raises ``ValueError`` and -- prior to this |
| fix -- both ref and submission calls would short-circuit through the |
| ``except Exception`` branch in ``_safe_call`` with the SAME ValueError, |
| which ``_outputs_equivalent`` then read as a "match", silently |
| awarding 100/100 to *every* submission regardless of correctness. |
| |
| Fix: per-call probe. If SIGALRM isn't installable from the current |
| thread, fall back to a direct call with no in-thread timeout. The |
| definition timeout is still enforced by the multiprocessing-based |
| ``_can_define`` ahead of fuzz scoring, so a malformed submission |
| that hangs at import time is caught there. For the OpenSleuth |
| trainer/eval use case (cooperative, not adversarial), letting a |
| pathological while-True submission stall a single request worker |
| is an acceptable trade-off relative to the current "all submissions |
| are perfect" failure mode. |
| """ |
| def _do_call(): |
| if unpack: |
| if not isinstance(arg, tuple): |
| |
| |
| |
| |
| a = (arg,) |
| else: |
| a = arg |
| return fn(*a) |
| return fn(arg) |
|
|
| def _handler(signum, frame): |
| raise _CallTimeout() |
|
|
| try: |
| old = signal.signal(signal.SIGALRM, _handler) |
| except (ValueError, OSError): |
| |
| |
| return _do_call() |
|
|
| signal.setitimer(signal.ITIMER_REAL, timeout_s) |
| try: |
| return _do_call() |
| finally: |
| signal.setitimer(signal.ITIMER_REAL, 0) |
| signal.signal(signal.SIGALRM, old) |
|
|
|
|
def _safe_call(fn: Callable, arg: Any, timeout_s: float, *, unpack: bool = False):
    """Invoke ``fn`` defensively and normalise the outcome.

    Returns a ``(kind, value)`` pair where ``kind`` is ``'val'`` (with the
    return value), ``'timeout'``, or ``'err'`` (with the exception rendered
    as ``"TypeName: message"``).  With ``unpack=True`` the input is treated
    as an args tuple and splatted -- this is how multi-parameter
    auto-fuzzer-driven targets are scored.
    """
    try:
        result = _call_with_timeout(fn, arg, timeout_s, unpack=unpack)
    except _CallTimeout:
        return ("timeout", f"timed out after {timeout_s}s")
    except Exception as exc:
        return ("err", f"{type(exc).__name__}: {exc}")
    return ("val", result)
|
|
|
|
| |
|
|
|
|
@dataclass
class VerificationResult:
    """Structured verdict produced by ``verify_submission``."""
    # Fraction of matching fuzz inputs, scaled to [0, 100].
    execution_reward: float
    # Bounded log2 cyclomatic complexity in [0, 50]; 50 on syntax error.
    complexity_penalty: float
    # None on success, else why the submission failed to define the target.
    define_error: Optional[str]
    # Total matching inputs across both categories.
    matches: int
    # Total inputs scored (edge + random).
    fuzz_count: int

    # Per-regime breakdown: "edge" (spec must-pass) and "random" (sampler).
    matches_by_category: Dict[str, int] = field(default_factory=dict)
    counts_by_category: Dict[str, int] = field(default_factory=dict)
    # edge matches / edge count; 0.0 when no edge inputs were supplied.
    edge_pass_rate: float = 0.0
    # Soft anti-hacking signal (reference import and/or constant collapse).
    reward_hack_penalty: float = 0.0
    # 25-point hard floor applied when execution_reward < 50 (or on
    # definition failure); the env subtracts it from the total.
    floor_penalty: float = 0.0
|
|
|
|
| def _detect_constant_collapse( |
| sub_outputs: List[Any], ref_outputs: List[Any], min_inputs: int = 6 |
| ) -> bool: |
| """Return True if the submission collapsed to a single output / error type |
| while the reference produced genuine diversity. This catches the |
| 'always return 0' / 'always raise' reward-hacking pattern. |
| """ |
| if len(sub_outputs) < min_inputs: |
| return False |
|
|
| def _signature(call_result): |
| kind, val = call_result |
| if kind == "val": |
| try: |
| return ("val", repr(val)) |
| except Exception: |
| return ("val", id(val)) |
| if kind == "err": |
| return ("err", val.split(":", 1)[0]) |
| return ("timeout", "") |
|
|
| sub_sig = {_signature(o) for o in sub_outputs} |
| ref_sig = {_signature(o) for o in ref_outputs} |
| return len(sub_sig) == 1 and len(ref_sig) >= 3 |
|
|
|
|
| def _looks_like_reference_import(code: str) -> bool: |
| """Static check for the most obvious reward-hacking pattern: importing |
| the reference function out of opensleuth_env. The sandbox already blocks |
| actual imports, but flagging them lets the env feed back a clear penalty |
| instead of a silent zero. |
| """ |
| try: |
| tree = ast.parse(code) |
| except SyntaxError: |
| return False |
| for node in ast.walk(tree): |
| if isinstance(node, ast.Import): |
| for alias in node.names: |
| if alias.name.startswith("opensleuth"): |
| return True |
| elif isinstance(node, ast.ImportFrom): |
| if node.module and node.module.startswith("opensleuth"): |
| return True |
| return False |
|
|
|
|
def verify_submission(
    submitted_code: str,
    target_function: Callable[..., Any],
    fuzz_inputs: List[Any],
    *,
    target_name: Optional[str] = None,
    define_timeout_s: float = 5.0,
    call_timeout_s: float = 1.0,
    edge_inputs: Optional[List[Any]] = None,
    unpack_args: bool = False,
) -> VerificationResult:
    """Score ``submitted_code`` against ``target_function`` over the supplied
    ``fuzz_inputs`` (random regime) and ``edge_inputs`` (must-pass regime).

    The agent is expected to define a top-level function with the same name
    as ``target_function`` (overridable via ``target_name``).  Definition is
    first proven in a killable child process (``define_timeout_s``), then
    each input is scored with per-call timeouts (``call_timeout_s``).  With
    ``unpack_args=True`` inputs are args tuples splatted into the callables.

    Bug fix: the submission is now exec'd into a SINGLE namespace.  The old
    code passed separate globals/locals dicts to ``exec``; defined functions
    capture the globals dict while their sibling helpers landed in locals,
    so any multi-function submission raised ``NameError`` on every call and
    scored as all-errors despite being correct.
    """
    name = target_name or target_function.__name__
    edge_inputs = list(edge_inputs or [])

    # Static anti-hacking probe runs unconditionally so the attempt is
    # surfaced even when the submission also fails to define the target.
    hack_penalty = 25.0 if _looks_like_reference_import(submitted_code) else 0.0

    define_err = _can_define(submitted_code, name, define_timeout_s)
    complexity = calculate_complexity_penalty(submitted_code)
    if define_err is not None:
        # No scoring possible: zero reward, full floor penalty.
        total = len(fuzz_inputs) + len(edge_inputs)
        return VerificationResult(
            execution_reward=0.0,
            complexity_penalty=complexity,
            define_error=define_err,
            matches=0,
            fuzz_count=total,
            matches_by_category={"edge": 0, "random": 0},
            counts_by_category={"edge": len(edge_inputs), "random": len(fuzz_inputs)},
            edge_pass_rate=0.0,
            reward_hack_penalty=hack_penalty,
            floor_penalty=25.0,
        )

    # Re-exec in-process: the sandboxed child already proved the code
    # terminates and defines a callable named ``name``.  Single namespace --
    # see the docstring's bug-fix note.
    sandbox = _make_safe_globals()
    exec(submitted_code, sandbox)
    submitted_fn = sandbox.get(name)

    matches_by_cat: Dict[str, int] = {"edge": 0, "random": 0}
    counts_by_cat: Dict[str, int] = {"edge": len(edge_inputs), "random": len(fuzz_inputs)}

    # Raw (kind, value) results, retained for the constant-collapse detector.
    sub_results: List[Any] = []
    ref_results: List[Any] = []

    def _score(inputs: List[Any], category: str) -> None:
        # Score one input regime, accumulating per-category match counts.
        for inp in inputs:
            ref = _safe_call(target_function, inp, call_timeout_s, unpack=unpack_args)
            sub = _safe_call(submitted_fn, inp, call_timeout_s, unpack=unpack_args)
            sub_results.append(sub)
            ref_results.append(ref)
            if _outputs_equivalent(ref, sub):
                matches_by_cat[category] += 1

    _score(edge_inputs, "edge")
    _score(fuzz_inputs, "random")

    matches = matches_by_cat["edge"] + matches_by_cat["random"]
    fuzz_count = len(fuzz_inputs) + len(edge_inputs) or 1  # avoid div-by-zero
    exec_reward = 100.0 * (matches / fuzz_count)
    edge_pass_rate = (
        matches_by_cat["edge"] / counts_by_cat["edge"] if counts_by_cat["edge"] else 0.0
    )

    # Soft anti-hacking: constant output against a genuinely diverse reference.
    if _detect_constant_collapse(sub_results, ref_results):
        hack_penalty += 15.0

    # Vul-R2-style hard floor for sub-50% submissions.
    floor_penalty = 25.0 if exec_reward < 50.0 else 0.0

    return VerificationResult(
        execution_reward=exec_reward,
        complexity_penalty=complexity,
        define_error=None,
        matches=matches,
        fuzz_count=fuzz_count,
        matches_by_category=matches_by_cat,
        counts_by_category=counts_by_cat,
        edge_pass_rate=edge_pass_rate,
        reward_hack_penalty=hack_penalty,
        floor_penalty=floor_penalty,
    )
|
|
|
|
| def _outputs_equivalent(ref, sub) -> bool: |
| """Ref and sub are (kind, value) tuples from `_safe_call`. They count as |
| equivalent if both raised the same exception type, or both returned values |
| that are == equal.""" |
| rkind, rval = ref |
| skind, sval = sub |
| if rkind == "val" and skind == "val": |
| try: |
| return rval == sval |
| except Exception: |
| return False |
| if rkind == "err" and skind == "err": |
| return rval.split(":", 1)[0] == sval.split(":", 1)[0] |
| if rkind == "timeout" and skind == "timeout": |
| return True |
| return False |
|
|
|
|
def generate_fuzz_inputs(
    spec, count: int = 100, seed: Optional[int] = None
) -> List[Any]:
    """Public helper: draw ``count`` fuzz inputs from a FunctionSpec's
    sampler, seeding the RNG when ``seed`` is given so runs reproduce."""
    return spec.fuzzer(random.Random(seed), count)
|
|
|
|
def get_edge_inputs(spec) -> List[Any]:
    """Return a fresh list of the spec's must-pass edge inputs.

    Specs predating the v0.3 schema (no ``edge_cases`` attribute, or one
    set to a falsy value) yield an empty list.
    """
    cases = getattr(spec, "edge_cases", []) or []
    return list(cases)
|
|