| """TaskCatalog: resolve a target function from one of three sources. |
| |
| OpenSleuth Level 2 makes the env open-ended. Where v0.3 only knew about the |
| 9 hand-written ``BLACK_BOX_FUNCTIONS``, the catalog accepts targets from: |
| |
| 1. **Caller-supplied** -- per-/reset payload, the most specific source. |
| The caller passes ``target_code`` + ``target_function_name`` (and |
| optionally ``edge_cases`` / ``fuzz_spec``) and we compile the source |
| in the same hardened sandbox the verifier uses for submissions. |
| |
| 2. **Hub dataset** -- ``anugrah55/opensleuth-tasks`` on Hugging Face Hub. |
| Each row carries ``{name, signature, description, difficulty, |
| source_code, edge_cases_json, fuzz_spec_json}``. Loaded lazily on |
| first reset and cached in-process. |
| |
| 3. **Builtin registry** -- the original 9 ``BLACK_BOX_FUNCTIONS``. Kept |
| as the safety-net so the in-flight trainer keeps working unchanged. |
| |
Resolution priority: caller-supplied wins, then builtin by name, then Hub.
This makes "trainer asks for fibonacci" still resolve to the builtin
fibonacci even when the Hub copy exists, *unless* the caller explicitly
overrides via ``target_code``.
| |
| Sandbox: caller-supplied / Hub source code is executed via the same |
| ``_make_safe_globals`` whitelist as agent submissions (no ``__import__``, |
| ``open``, ``eval``, ...). On top we statically reject any source that |
| imports ``opensleuth_*`` to prevent oracle-cheesing. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import ast |
| import inspect |
| import json |
| import logging |
| import threading |
| from typing import Any, Callable, Dict, List, Optional |
|
|
| from .auto_fuzzer import auto_fuzz, make_fuzzer |
| from .black_box import BLACK_BOX_FUNCTIONS, FunctionSpec |
| from .verifier import _make_safe_globals |
|
|
| log = logging.getLogger("opensleuth.task_catalog") |
|
|
| HUB_DATASET_ID = "anugrah55/opensleuth-tasks" |
|
|
|
|
class TaskResolutionError(ValueError):
    """Signals that a /reset payload could not be resolved into a FunctionSpec."""
|
|
|
|
| |
| |
| |
|
|
|
|
| _FORBIDDEN_PREFIXES = ("opensleuth", "opensleuth_env") |
|
|
|
|
| def _statically_reject_oracle_import(code: str) -> Optional[str]: |
| """Return an error string if the source statically imports the env's own |
| oracle module (which would let the agent / Hub author cheese the |
| verifier). The hardened sandbox already blocks ``__import__``, but we |
| fail fast and surface a clear error. |
| """ |
| try: |
| tree = ast.parse(code) |
| except SyntaxError as e: |
| return f"target_code is not valid Python: {e}" |
| for node in ast.walk(tree): |
| if isinstance(node, ast.Import): |
| for alias in node.names: |
| if any(alias.name.startswith(p) for p in _FORBIDDEN_PREFIXES): |
| return ( |
| f"target_code is not allowed to import {alias.name!r} " |
| "(oracle import)." |
| ) |
| elif isinstance(node, ast.ImportFrom): |
| mod = node.module or "" |
| if any(mod.startswith(p) for p in _FORBIDDEN_PREFIXES): |
| return ( |
| f"target_code is not allowed to import from {mod!r} " |
| "(oracle import)." |
| ) |
| return None |
|
|
|
|
def _compile_target_in_sandbox(code: str, function_name: str) -> Callable[..., Any]:
    """Execute ``code`` under the verifier's restricted globals and return
    the named callable.

    Bug fix: the source is now exec'd with a *single* namespace.  The old
    ``exec(code, safe_globals, local_scope)`` form put top-level definitions
    into ``local_scope`` while each defined function's ``__globals__``
    pointed at ``safe_globals`` — so any target that called a sibling
    helper (or read a module-level constant from its own source) raised
    ``NameError`` at call time.  With one namespace, definitions land in
    ``safe_globals`` and are visible from inside every defined function.

    Raises:
        TaskResolutionError: on oracle imports, definition-time errors, or
            when ``function_name`` is missing / not callable — so /reset
            can return a clean 400.
    """
    err = _statically_reject_oracle_import(code)
    if err:
        raise TaskResolutionError(err)
    safe_globals = _make_safe_globals()
    try:
        # Single namespace: definitions and their __globals__ are the same
        # dict, so sibling helpers resolve at call time.
        exec(code, safe_globals)
    except Exception as e:
        raise TaskResolutionError(
            f"target_code raised at definition time: {type(e).__name__}: {e}"
        ) from e
    fn = safe_globals.get(function_name)
    if not callable(fn):
        raise TaskResolutionError(
            f"target_code does not define a callable named {function_name!r}."
        )
    return fn
|
|
|
|
| def _arity_of(fn: Callable[..., Any]) -> int: |
| """Number of positional / positional-or-keyword params on ``fn``.""" |
| try: |
| sig = inspect.signature(fn) |
| except (TypeError, ValueError): |
| return 1 |
| n = 0 |
| for p in sig.parameters.values(): |
| if p.kind in ( |
| inspect.Parameter.POSITIONAL_ONLY, |
| inspect.Parameter.POSITIONAL_OR_KEYWORD, |
| ): |
| n += 1 |
| return max(n, 1) |
|
|
|
|
| def _signature_string(fn: Callable[..., Any], name: str) -> str: |
| try: |
| sig = inspect.signature(fn) |
| return f"{name}{sig}" |
| except (TypeError, ValueError): |
| return f"{name}(...)" |
|
|
|
|
| def _description_of(fn: Callable[..., Any]) -> str: |
| return inspect.getdoc(fn) or "" |
|
|
|
|
| def _parse_edge_cases(edge_cases: Optional[List[Any]]) -> List[Any]: |
| """Edge cases arrive as a list of strings (Python literal reprs) when |
| coming from the API or from the Hub's ``edge_cases_json`` column. Each |
| string is parsed via ``ast.literal_eval``. Already-parsed values |
| (e.g. ints from the bootstrap script) are passed through unchanged. |
| """ |
| if not edge_cases: |
| return [] |
| parsed: List[Any] = [] |
| for raw in edge_cases: |
| if isinstance(raw, str): |
| try: |
| parsed.append(ast.literal_eval(raw)) |
| except (ValueError, SyntaxError) as e: |
| raise TaskResolutionError( |
| f"edge_cases entry {raw!r} is not a Python literal: {e}" |
| ) from e |
| else: |
| parsed.append(raw) |
| return parsed |
|
|
|
|
| def _flatten_unary_edges(arity: int, edges: List[Any]) -> List[Any]: |
| """For unary fns we accept either ``[5, 10]`` or ``[(5,), (10,)]`` and |
| normalise to flat values; for multi-arg fns we require tuples and pass |
| them through.""" |
| if arity == 1: |
| out = [] |
| for e in edges: |
| if isinstance(e, tuple) and len(e) == 1: |
| out.append(e[0]) |
| else: |
| out.append(e) |
| return out |
| out = [] |
| for e in edges: |
| if not isinstance(e, tuple): |
| raise TaskResolutionError( |
| f"edge_cases for a {arity}-arg target must be tuples, " |
| f"got {type(e).__name__}: {e!r}" |
| ) |
| out.append(e) |
| return out |
|
|
|
|
def _spec_from_callable(
    name: str,
    fn: Callable[..., Any],
    *,
    description: Optional[str] = None,
    signature: Optional[str] = None,
    difficulty: str = "medium",
    edge_cases: Optional[List[Any]] = None,
    fuzz_spec: Optional[Dict[str, Dict[str, Any]]] = None,
    source: str = "user",
) -> FunctionSpec:
    """Assemble a FunctionSpec around a plain Python callable.

    Arity is auto-detected from ``inspect.signature`` and drives
    ``unpack_args``: unary targets are called with a single value (like the
    existing builtins), while N-arg targets flow through the
    tuple-unpacking path in env / verifier.  The fuzzer delegates to
    ``auto_fuzz``; for unary targets its 1-tuples are flattened to bare
    values.

    Args:
        name: Public task name, also used for signature rendering.
        fn: The target callable to wrap.
        description / signature / difficulty: Optional metadata overrides;
            defaults are derived from ``fn`` itself.
        edge_cases: Literal-repr strings or already-parsed values.
        fuzz_spec: Optional per-parameter hints forwarded to ``auto_fuzz``.
        source: Provenance tag (e.g. "user", "hub").
    """
    arity = _arity_of(fn)
    unpack = arity > 1
    parsed_edges = _flatten_unary_edges(arity, _parse_edge_cases(edge_cases))

    if unpack:
        # Multi-arg target: auto_fuzz already emits argument tuples.
        def _fuzzer(rng, n):
            return auto_fuzz(fn, n, rng, fuzz_spec=fuzz_spec)
    else:
        # Unary target: flatten auto_fuzz's 1-tuples so the env's
        # single-argument call path receives bare values.
        def _fuzzer(rng, n):
            samples = auto_fuzz(fn, n, rng, fuzz_spec=fuzz_spec)
            return [
                s[0] if isinstance(s, tuple) and len(s) == 1 else s
                for s in samples
            ]

    return FunctionSpec(
        name=name,
        fn=fn,
        signature=signature or _signature_string(fn, name),
        description=description or _description_of(fn),
        fuzzer=_fuzzer,
        difficulty=difficulty,
        edge_cases=parsed_edges,
        unpack_args=unpack,
        source=source,
    )
|
|
|
|
| |
| |
| |
|
|
|
|
class _HubCache:
    """Lazily loads the Hub dataset into ``{name: FunctionSpec}``. Thread-
    safe initialisation; subsequent reads are lock-free."""

    def __init__(self, dataset_id: str):
        # Hub dataset repo id, e.g. "anugrah55/opensleuth-tasks".
        self.dataset_id = dataset_id
        # Guards the one-time load in ensure_loaded().
        self._lock = threading.Lock()
        # True once a load has been *attempted* (success or failure alike).
        self._loaded: bool = False
        # name -> FunctionSpec for every row that compiled and specced cleanly.
        self._specs: Dict[str, FunctionSpec] = {}
        # Raw dataset rows, kept for listing / debugging callers of rows().
        self._raw_rows: List[Dict[str, Any]] = []
        # Human-readable load failure, surfaced via load_error / hub_status.
        self._load_error: Optional[str] = None

    @property
    def loaded(self) -> bool:
        # True after the first load attempt, even if it failed (see ensure_loaded).
        return self._loaded

    @property
    def load_error(self) -> Optional[str]:
        # None on success or before the first load attempt.
        return self._load_error

    def _row_to_spec(self, row: Dict[str, Any]) -> Optional[FunctionSpec]:
        """Compile one dataset row into a FunctionSpec.

        Returns ``None`` (with a warning logged, never an exception) for
        malformed rows so one bad Hub entry can't break the whole load.
        """
        name = row.get("name")
        code = row.get("source_code")
        if not name or not code:
            return None
        # The callable inside source_code may be named differently from the task.
        fn_name = row.get("target_function_name") or name
        try:
            fn = _compile_target_in_sandbox(code, fn_name)
        except TaskResolutionError as e:
            log.warning("hub task %r failed to compile: %s", name, e)
            return None
        edge_cases_raw = row.get("edge_cases_json") or "[]"
        fuzz_spec_raw = row.get("fuzz_spec_json") or "null"
        # Tolerate malformed JSON columns: degrade to "no edge cases" /
        # "no fuzz spec" rather than dropping the whole row.
        try:
            edge_cases = json.loads(edge_cases_raw) if isinstance(edge_cases_raw, str) else edge_cases_raw
        except json.JSONDecodeError:
            edge_cases = []
        try:
            fuzz_spec = json.loads(fuzz_spec_raw) if isinstance(fuzz_spec_raw, str) else fuzz_spec_raw
        except json.JSONDecodeError:
            fuzz_spec = None
        try:
            return _spec_from_callable(
                name=name,
                fn=fn,
                description=row.get("description") or _description_of(fn),
                signature=row.get("signature") or _signature_string(fn, name),
                difficulty=row.get("difficulty") or "medium",
                edge_cases=edge_cases,
                fuzz_spec=fuzz_spec,
                source="hub",
            )
        except TaskResolutionError as e:
            log.warning("hub task %r could not be specced: %s", name, e)
            return None

    def ensure_loaded(self) -> None:
        """Attempt the dataset load exactly once (double-checked locking).

        Failures are swallowed by design: ``_load_error`` records why, and
        ``_loaded`` still flips in ``finally`` so the load is never retried
        for this process's lifetime — a flaky network degrades the env to
        builtin tasks instead of blocking every /reset.
        """
        if self._loaded:
            return
        with self._lock:
            # Re-check under the lock: another thread may have finished the
            # load while we were waiting.
            if self._loaded:
                return
            try:
                # Imported lazily so `datasets` remains an optional dependency.
                from datasets import load_dataset

                ds = load_dataset(self.dataset_id, split="train")
                rows = list(ds)
                specs: Dict[str, FunctionSpec] = {}
                for row in rows:
                    spec = self._row_to_spec(row)
                    if spec is not None:
                        specs[spec.name] = spec
                # Assign complete containers before _loaded flips below, so
                # lock-free readers never observe a half-built cache.
                self._specs = specs
                self._raw_rows = rows
                log.info(
                    "loaded %d task(s) from %s (%d row(s) total)",
                    len(specs),
                    self.dataset_id,
                    len(rows),
                )
            except Exception as e:
                # Best-effort: missing `datasets`, auth failure, or network
                # error — record and move on with builtin tasks only.
                self._load_error = f"{type(e).__name__}: {e}"
                log.warning("hub dataset %s unavailable: %s", self.dataset_id, self._load_error)
            finally:
                # Mark attempted either way so subsequent reads stay lock-free.
                self._loaded = True

    def specs(self) -> Dict[str, FunctionSpec]:
        # Triggers the one-time load on first call.
        self.ensure_loaded()
        return self._specs

    def rows(self) -> List[Dict[str, Any]]:
        # Triggers the one-time load on first call.
        self.ensure_loaded()
        return self._raw_rows
|
|
|
|
| |
| |
| |
|
|
|
|
class TaskCatalog:
    """Resolves /reset payloads to FunctionSpecs from caller / Hub / builtin."""

    def __init__(
        self,
        hub_dataset_id: str = HUB_DATASET_ID,
        *,
        enable_hub: bool = True,
    ) -> None:
        self.hub_dataset_id = hub_dataset_id
        self.enable_hub = enable_hub
        # Hub access is optional; with it disabled only builtins resolve.
        self._hub: Optional[_HubCache] = _HubCache(hub_dataset_id) if enable_hub else None

    def resolve(
        self,
        target_name: Optional[str] = None,
        target_code: Optional[str] = None,
        target_function_name: Optional[str] = None,
        edge_cases: Optional[List[Any]] = None,
        fuzz_spec: Optional[Dict[str, Dict[str, Any]]] = None,
    ) -> FunctionSpec:
        """Turn a /reset payload into a FunctionSpec.

        Priority: caller-supplied ``target_code`` first, then the builtin
        registry by name, then the Hub dataset by name.

        Raises:
            TaskResolutionError: when the payload is incomplete or no
                source knows ``target_name``.
        """
        # 1. Caller-supplied source is the most specific and always wins.
        if target_code is not None:
            if not target_function_name:
                raise TaskResolutionError(
                    "target_code requires target_function_name to identify "
                    "which callable in the source to use."
                )
            compiled = _compile_target_in_sandbox(target_code, target_function_name)
            return _spec_from_callable(
                name=target_function_name,
                fn=compiled,
                edge_cases=edge_cases,
                fuzz_spec=fuzz_spec,
                source="user",
            )

        if not target_name:
            raise TaskResolutionError(
                "Either target_name or (target_code + target_function_name) must be set."
            )

        # 2. Builtins win over the Hub so the in-flight trainer's task
        #    names keep resolving to the same functions.
        builtin = BLACK_BOX_FUNCTIONS.get(target_name)
        if builtin is not None:
            return builtin

        # 3. Hub dataset (loaded lazily on first use).
        if self._hub is not None:
            hub_spec = self._hub.specs().get(target_name)
            if hub_spec is not None:
                return hub_spec

        available = self.list_known_names()
        raise TaskResolutionError(
            f"Unknown target function: {target_name!r}. Available: {sorted(available)[:25]}"
        )

    def list_known_names(self) -> List[str]:
        """All resolvable task names (builtin + Hub), sorted."""
        names = set(BLACK_BOX_FUNCTIONS)
        if self._hub is not None:
            try:
                names.update(self._hub.specs())
            except Exception:
                # Best-effort: a Hub failure must not break name listing.
                pass
        return sorted(names)

    @staticmethod
    def _summary(spec: FunctionSpec, source: str) -> Dict[str, Any]:
        """One catalog-listing entry for a spec, tagged with its provenance."""
        return {
            "name": spec.name,
            "signature": spec.signature,
            "description": spec.description,
            "difficulty": spec.difficulty,
            "edge_case_count": len(spec.edge_cases or []),
            "source": source,
        }

    def list_builtin(self) -> List[Dict[str, Any]]:
        """Listing entries for the builtin registry."""
        return [self._summary(spec, "builtin") for spec in BLACK_BOX_FUNCTIONS.values()]

    def list_hub(self) -> List[Dict[str, Any]]:
        """Listing entries for Hub tasks, minus any shadowed by a builtin."""
        if self._hub is None:
            return []
        return [
            self._summary(spec, "hub")
            for spec in self._hub.specs().values()
            # Builtins shadow same-named Hub tasks in resolve(), so hide
            # the Hub copy from listings as well.
            if spec.name not in BLACK_BOX_FUNCTIONS
        ]

    def list_all(self) -> List[Dict[str, Any]]:
        """Every listing entry, builtins first."""
        return self.list_builtin() + self.list_hub()

    def hub_status(self) -> Dict[str, Any]:
        """Operational snapshot of the Hub cache for health endpoints."""
        if self._hub is None:
            return {"enabled": False}
        return {
            "enabled": True,
            "dataset_id": self.hub_dataset_id,
            "loaded": self._hub.loaded,
            "task_count": len(self._hub.specs()) if self._hub.loaded else None,
            "error": self._hub.load_error,
        }
|
|