opensleuth-env-gemini-cli / opensleuth_env /scripts /bootstrap_tasks_dataset.py
anugrah55's picture
Level 2 open-ended env: auto-fuzzer + TaskCatalog + Hub-driven catalog + extended /reset
77e65fb verified
"""Bootstrap / refresh the OpenSleuth Hub task catalog.
Idempotently creates ``anugrah55/opensleuth-tasks`` and pushes:
* The 9 builtin BLACK_BOX_FUNCTIONS as rows (so the dataset is non-empty
for testing and so the trainer's curriculum has parity with the
in-process oracle), and
* 6 brand-new tasks (``roman_to_int``, ``levenshtein_distance``,
``flatten_list``, ``merge_sorted``, ``run_length_encode``,
``binary_search``) that aren't in BLACK_BOX_FUNCTIONS, exercising
multi-arg and unannotated cases the auto-fuzzer must handle.
Each row is::
{
"name": str,
"target_function_name": str, # which fn inside source_code
"signature": str,
"description": str,
"difficulty": "easy"|"medium"|"hard",
"source_code": str, # standalone Python; NO oracle imports
"edge_cases_json": str, # JSON list of literal-repr strings
"fuzz_spec_json": str, # JSON dict or "null"
}
Run::
cd env && PYTHONPATH=. ../.venv/bin/python -m opensleuth_env.scripts.bootstrap_tasks_dataset
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from typing import Any, Dict, List, Optional
from opensleuth_env.black_box import BLACK_BOX_FUNCTIONS
# Named logger used by every function in this script.
log = logging.getLogger("opensleuth.bootstrap")
# Configure logging as soon as the module is imported: INFO level, with
# timestamp, level and logger name on every record.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
# Default Hub dataset repo id; main() uses it as the --dataset-id default.
DATASET_ID = "anugrah55/opensleuth-tasks"
# ---------------------------------------------------------------------------
# Oracle source code for the 9 builtins (self-contained -- no opensleuth_*
# imports, so the catalog's static reject filter accepts them).
# ---------------------------------------------------------------------------
# Keyed by builtin task name. Each entry provides:
#   * target_function_name -- the function to look up inside source_code
#   * source_code          -- standalone Python for the oracle; it must be
#                             valid when compiled on its own, so every nested
#                             block is indented with 4 spaces per level
#   * edge_cases           -- literal-repr strings of hand-picked inputs
#   * fuzz_spec            -- generator spec keyed by parameter name
_BUILTIN_SOURCE: Dict[str, Dict[str, Any]] = {
    "fibonacci": {
        "target_function_name": "fibonacci",
        "source_code": (
            "def fibonacci(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool) or n <= 0 or n > 90:\n"
            "        raise ValueError('Input must be a positive integer <= 90.')\n"
            "    a, b = 0, 1\n"
            "    for _ in range(n - 1):\n"
            "        a, b = b, a + b\n"
            "    return b if n > 0 else a\n"
        ),
        "edge_cases": ["1", "2", "3", "10", "89", "90"],
        "fuzz_spec": {"n": {"type": "int", "min": 1, "max": 90}},
    },
    "reverse_string": {
        "target_function_name": "reverse_string",
        "source_code": (
            "def reverse_string(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    return s[::-1]\n"
        ),
        "edge_cases": ['""', '"a"', '"ab"', '"racecar"', '"Hello, World!"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 12}},
    },
    "is_palindrome": {
        "target_function_name": "is_palindrome",
        "source_code": (
            "def is_palindrome(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    cleaned = ''.join(ch.lower() for ch in s if ch.isalnum())\n"
            "    return cleaned == cleaned[::-1]\n"
        ),
        "edge_cases": [
            '""', '"a"', '"ab"', '"abba"',
            "\"A man, a plan, a canal: Panama\"", '"Hello"',
        ],
        "fuzz_spec": {"s": {"type": "str", "max_len": 12}},
    },
    "digit_sum": {
        "target_function_name": "digit_sum",
        "source_code": (
            "def digit_sum(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool):\n"
            "        raise TypeError('Input must be int.')\n"
            "    if n < 0:\n"
            "        raise ValueError('Input must be non-negative.')\n"
            "    return sum(int(c) for c in str(n))\n"
        ),
        "edge_cases": ["0", "1", "9", "10", "99", "100", "9999"],
        "fuzz_spec": {"n": {"type": "int", "min": 0, "max": 10000}},
    },
    "count_vowels": {
        "target_function_name": "count_vowels",
        "source_code": (
            "def count_vowels(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    return sum(1 for c in s.lower() if c in 'aeiou')\n"
        ),
        "edge_cases": ['""', '"bcd"', '"AEIOU"', '"Hello, World!"', '"aaaaa"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 16}},
    },
    "gcd": {
        "target_function_name": "gcd",
        "source_code": (
            "def gcd(pair):\n"
            "    if not isinstance(pair, (list, tuple)) or len(pair) != 2:\n"
            "        raise TypeError('Input must be a 2-element list or tuple.')\n"
            "    a, b = pair\n"
            "    if not all(isinstance(x, int) and not isinstance(x, bool) for x in (a, b)):\n"
            "        raise TypeError('Both elements must be int.')\n"
            "    if a < 0 or b < 0:\n"
            "        raise ValueError('Both elements must be non-negative.')\n"
            "    while b:\n"
            "        a, b = b, a % b\n"
            "    return a\n"
        ),
        "edge_cases": ["(0, 0)", "(0, 7)", "(12, 18)", "(17, 13)", "(100, 75)"],
        "fuzz_spec": {
            "pair": {
                "type": "tuple",
                "elems": [{"type": "int", "min": 0, "max": 1000}, {"type": "int", "min": 0, "max": 1000}],
            }
        },
    },
    "sort_unique": {
        "target_function_name": "sort_unique",
        "source_code": (
            "def sort_unique(xs):\n"
            "    if not isinstance(xs, list):\n"
            "        raise TypeError('Input must be a list.')\n"
            "    if not all(isinstance(x, int) and not isinstance(x, bool) for x in xs):\n"
            "        raise TypeError('All elements must be int.')\n"
            "    return sorted(set(xs))\n"
        ),
        "edge_cases": ["[]", "[1]", "[1, 1, 1]", "[3, 1, 2]", "[-5, 5, 0, -5, 5]"],
        "fuzz_spec": {"xs": {"type": "list", "elem": {"type": "int", "min": -50, "max": 50}, "max_len": 8}},
    },
    "caesar_cipher": {
        "target_function_name": "caesar_cipher",
        "source_code": (
            "def caesar_cipher(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    out = []\n"
            "    for ch in s:\n"
            "        if 'a' <= ch <= 'z':\n"
            "            out.append(chr((ord(ch) - ord('a') + 3) % 26 + ord('a')))\n"
            "        else:\n"
            "            out.append(ch)\n"
            "    return ''.join(out)\n"
        ),
        "edge_cases": ['""', '"abc"', '"xyz"', '"Hello, World!"', '"ABC"', '"hello world"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 16}},
    },
    "is_prime": {
        "target_function_name": "is_prime",
        "source_code": (
            "def is_prime(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool):\n"
            "        raise TypeError('Input must be int.')\n"
            "    if n < 2:\n"
            "        return False\n"
            "    if n < 4:\n"
            "        return True\n"
            "    if n % 2 == 0:\n"
            "        return False\n"
            "    i = 3\n"
            "    while i * i <= n:\n"
            "        if n % i == 0:\n"
            "            return False\n"
            "        i += 2\n"
            "    return True\n"
        ),
        "edge_cases": ["0", "1", "2", "3", "4", "17", "25", "97", "100"],
        "fuzz_spec": {"n": {"type": "int", "min": 0, "max": 200}},
    },
}
# ---------------------------------------------------------------------------
# Six new tasks. These exercise auto-fuzzer features the builtins didn't:
# * multi-arg signatures (binary_search, merge_sorted, levenshtein_distance)
# * Optional / Literal hint coverage (run_length_encode -> list[tuple[str, int]])
# * unannotated containers (flatten_list)
# ---------------------------------------------------------------------------
# One dict per task that is not part of BLACK_BOX_FUNCTIONS. Besides the
# metadata fields (name, signature, description, difficulty) each entry holds
# the standalone oracle in ``source_code``. That text must be valid Python when
# compiled on its own, so nested blocks use 4 spaces per indentation level.
# ``edge_cases`` are literal-repr strings; for multi-argument tasks each
# string is a tuple of the positional arguments.
_NEW_TASK_ROWS: List[Dict[str, Any]] = [
    {
        "name": "roman_to_int",
        "target_function_name": "roman_to_int",
        "signature": "roman_to_int(s: str) -> int",
        "description": (
            "Parse a roman numeral string into its integer value. "
            "Raises ValueError for non-roman characters. Subtraction "
            "rules (IV=4, IX=9, XL=40, ...) are honoured. Empty -> 0."
        ),
        "difficulty": "medium",
        "source_code": (
            "def roman_to_int(s: str) -> int:\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('input must be str')\n"
            "    table = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000}\n"
            "    total = 0\n"
            "    prev = 0\n"
            "    for ch in reversed(s.upper()):\n"
            "        if ch not in table:\n"
            "            raise ValueError(f'invalid roman numeral character: {ch!r}')\n"
            "        v = table[ch]\n"
            "        if v < prev:\n"
            "            total -= v\n"
            "        else:\n"
            "            total += v\n"
            "        prev = v\n"
            "    return total\n"
        ),
        "edge_cases": ['""', '"I"', '"IV"', '"IX"', '"LVIII"', '"MCMXCIV"', '"MMXXIV"'],
        "fuzz_spec": {"s": {"type": "str", "alphabet": "IVXLCDM", "max_len": 8}},
    },
    {
        "name": "levenshtein_distance",
        "target_function_name": "levenshtein_distance",
        "signature": "levenshtein_distance(a: str, b: str) -> int",
        "description": (
            "Classic edit distance between two strings: minimum number of "
            "single-character insertions, deletions, or substitutions to "
            "transform a into b. Both arguments must be str."
        ),
        "difficulty": "hard",
        "source_code": (
            "def levenshtein_distance(a: str, b: str) -> int:\n"
            "    if not isinstance(a, str) or not isinstance(b, str):\n"
            "        raise TypeError('both arguments must be str')\n"
            "    if a == b:\n"
            "        return 0\n"
            "    if not a:\n"
            "        return len(b)\n"
            "    if not b:\n"
            "        return len(a)\n"
            "    prev = list(range(len(b) + 1))\n"
            "    for i, ca in enumerate(a, 1):\n"
            "        cur = [i] + [0] * len(b)\n"
            "        for j, cb in enumerate(b, 1):\n"
            "            ins = cur[j-1] + 1\n"
            "            dele = prev[j] + 1\n"
            "            sub = prev[j-1] + (ca != cb)\n"
            "            cur[j] = min(ins, dele, sub)\n"
            "        prev = cur\n"
            "    return prev[-1]\n"
        ),
        "edge_cases": [
            '("", "")', '("a", "")', '("", "a")', '("kitten", "sitting")',
            '("flaw", "lawn")', '("abc", "abc")',
        ],
        "fuzz_spec": {
            "a": {"type": "str", "alphabet": "abc", "max_len": 6},
            "b": {"type": "str", "alphabet": "abc", "max_len": 6},
        },
    },
    {
        "name": "flatten_list",
        "target_function_name": "flatten_list",
        "signature": "flatten_list(xs: list) -> list",
        "description": (
            "Recursively flatten a nested list of arbitrary depth. Tuples "
            "are also flattened; non-list/tuple atoms (ints, strs, ...) "
            "pass through unchanged."
        ),
        "difficulty": "medium",
        "source_code": (
            "def flatten_list(xs):\n"
            "    if not isinstance(xs, (list, tuple)):\n"
            "        raise TypeError('input must be list or tuple')\n"
            "    out = []\n"
            "    stack = list(xs)\n"
            "    # iterative DFS to avoid recursion limits on adversarial input\n"
            "    rev = []\n"
            "    rev.extend(reversed(stack))\n"
            "    while rev:\n"
            "        x = rev.pop()\n"
            "        if isinstance(x, (list, tuple)):\n"
            "            for y in reversed(x):\n"
            "                rev.append(y)\n"
            "        else:\n"
            "            out.append(x)\n"
            "    return out\n"
        ),
        "edge_cases": [
            "[]", "[1]", "[[1, 2], [3, 4]]",
            "[1, [2, [3, [4, [5]]]]]", "[[], [], 1]",
        ],
        "fuzz_spec": {
            "xs": {
                "type": "list",
                "elem": {"type": "int", "min": -10, "max": 10},
                "max_len": 6,
            }
        },
    },
    {
        "name": "merge_sorted",
        "target_function_name": "merge_sorted",
        "signature": "merge_sorted(a: list[int], b: list[int]) -> list[int]",
        "description": (
            "Merge two pre-sorted lists of ints into a single sorted list. "
            "Both arguments must be lists; elements must be ints (bools "
            "rejected). The classic merge step of merge-sort."
        ),
        "difficulty": "medium",
        "source_code": (
            "def merge_sorted(a, b):\n"
            "    if not isinstance(a, list) or not isinstance(b, list):\n"
            "        raise TypeError('both arguments must be list')\n"
            "    for x in (*a, *b):\n"
            "        if not isinstance(x, int) or isinstance(x, bool):\n"
            "            raise TypeError('elements must be int')\n"
            "    out = []\n"
            "    i = j = 0\n"
            "    while i < len(a) and j < len(b):\n"
            "        if a[i] <= b[j]:\n"
            "            out.append(a[i]); i += 1\n"
            "        else:\n"
            "            out.append(b[j]); j += 1\n"
            "    out.extend(a[i:])\n"
            "    out.extend(b[j:])\n"
            "    return out\n"
        ),
        "edge_cases": [
            "([], [])", "([1, 2, 3], [])", "([], [1, 2, 3])",
            "([1, 3, 5], [2, 4, 6])", "([1, 1], [1, 1])",
        ],
        "fuzz_spec": {
            "a": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 5},
            "b": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 5},
        },
    },
    {
        "name": "run_length_encode",
        "target_function_name": "run_length_encode",
        "signature": "run_length_encode(s: str) -> list[tuple[str, int]]",
        "description": (
            "Run-length encoding: returns a list of (character, count) "
            "tuples for each run of identical characters in s. Empty "
            "input yields an empty list."
        ),
        "difficulty": "easy",
        "source_code": (
            "def run_length_encode(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('input must be str')\n"
            "    if not s:\n"
            "        return []\n"
            "    out = []\n"
            "    cur = s[0]\n"
            "    n = 1\n"
            "    for ch in s[1:]:\n"
            "        if ch == cur:\n"
            "            n += 1\n"
            "        else:\n"
            "            out.append((cur, n))\n"
            "            cur = ch\n"
            "            n = 1\n"
            "    out.append((cur, n))\n"
            "    return out\n"
        ),
        "edge_cases": ['""', '"a"', '"aa"', '"abc"', '"aaabbbccc"', '"aaaaaaaaaa"'],
        "fuzz_spec": {"s": {"type": "str", "alphabet": "ab", "max_len": 12}},
    },
    {
        "name": "binary_search",
        "target_function_name": "binary_search",
        "signature": "binary_search(arr: list[int], target: int) -> int",
        "description": (
            "Return the index of target in the sorted ascending list arr, "
            "or -1 if not present. arr must be a list of ints; target "
            "must be int. The list is assumed sorted."
        ),
        "difficulty": "medium",
        "source_code": (
            "def binary_search(arr, target):\n"
            "    if not isinstance(arr, list):\n"
            "        raise TypeError('arr must be list')\n"
            "    if not isinstance(target, int) or isinstance(target, bool):\n"
            "        raise TypeError('target must be int')\n"
            "    lo, hi = 0, len(arr) - 1\n"
            "    while lo <= hi:\n"
            "        mid = (lo + hi) // 2\n"
            "        v = arr[mid]\n"
            "        if v == target:\n"
            "            return mid\n"
            "        if v < target:\n"
            "            lo = mid + 1\n"
            "        else:\n"
            "            hi = mid - 1\n"
            "    return -1\n"
        ),
        "edge_cases": [
            "([], 3)", "([1], 1)", "([1], 2)",
            "([1, 2, 3, 4, 5], 3)", "([1, 2, 3, 4, 5], 0)",
            "([1, 2, 3, 4, 5], 6)",
        ],
        "fuzz_spec": {
            "arr": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 8},
            "target": {"type": "int", "min": -20, "max": 20},
        },
    },
]
def _builtin_to_row(name: str) -> Dict[str, Any]:
    """Assemble the dataset row for the builtin task called ``name``.

    Signature, description and difficulty are read from the matching
    ``BLACK_BOX_FUNCTIONS`` entry; the standalone source, edge cases and
    fuzz spec come from ``_BUILTIN_SOURCE``. Edge cases and fuzz spec are
    stored JSON-encoded. A ``KeyError`` propagates if ``name`` is missing
    from either table.
    """
    oracle = BLACK_BOX_FUNCTIONS[name]
    standalone = _BUILTIN_SOURCE[name]

    row: Dict[str, Any] = {"name": name}
    row["target_function_name"] = standalone["target_function_name"]
    row["signature"] = oracle.signature
    row["description"] = oracle.description
    row["difficulty"] = oracle.difficulty
    row["source_code"] = standalone["source_code"]
    # The two structured fields are serialised to JSON text for the row.
    row["edge_cases_json"] = json.dumps(standalone["edge_cases"])
    row["fuzz_spec_json"] = json.dumps(standalone["fuzz_spec"])
    return row
def _new_task_to_row(meta: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one task-metadata dict into a dataset row.

    The plain-text fields are copied unchanged; ``edge_cases`` and
    ``fuzz_spec`` are JSON-encoded into ``edge_cases_json`` and
    ``fuzz_spec_json``. Keys of ``meta`` other than the ones read here are
    not carried over. A missing required key raises ``KeyError``.
    """
    copied_fields = (
        "name",
        "target_function_name",
        "signature",
        "description",
        "difficulty",
        "source_code",
    )
    row: Dict[str, Any] = {field: meta[field] for field in copied_fields}
    row["edge_cases_json"] = json.dumps(meta["edge_cases"])
    row["fuzz_spec_json"] = json.dumps(meta["fuzz_spec"])
    return row
def build_rows() -> List[Dict[str, Any]]:
    """Return every catalog row: builtin tasks first, then the new tasks.

    Builtin rows follow the iteration order of ``BLACK_BOX_FUNCTIONS``;
    new-task rows follow the order of ``_NEW_TASK_ROWS``.
    """
    rows: List[Dict[str, Any]] = [
        _builtin_to_row(builtin_name) for builtin_name in BLACK_BOX_FUNCTIONS
    ]
    rows.extend(_new_task_to_row(task_meta) for task_meta in _NEW_TASK_ROWS)
    return rows
def push_to_hub(rows: List[Dict[str, Any]], dataset_id: str, *, private: bool = False) -> str:
    """Upload ``rows`` to the Hub dataset repo ``dataset_id``.

    The repo is created first if needed (``exist_ok=True``), then the rows
    are pushed as the ``train`` split.

    Args:
        rows: Row dicts to turn into a ``datasets.Dataset``.
        dataset_id: Hub repo id of the dataset.
        private: Forwarded to both the repo creation and the push.

    Returns:
        The ``https://huggingface.co/datasets/...`` URL for ``dataset_id``.
    """
    # Function-scope imports: these packages are only loaded when a push
    # is actually requested.
    from datasets import Dataset
    from huggingface_hub import HfApi

    HfApi().create_repo(
        repo_id=dataset_id,
        repo_type="dataset",
        exist_ok=True,
        private=private,
    )

    dataset = Dataset.from_list(rows)
    log.info("pushing %d row(s) to %s", len(rows), dataset_id)
    dataset.push_to_hub(dataset_id, split="train", private=private)

    hub_url = f"https://huggingface.co/datasets/{dataset_id}"
    return hub_url
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: build the rows, log them, optionally push.

    Args:
        argv: Argument list handed to ``argparse``; ``None`` makes argparse
            read the process arguments.

    Returns:
        Always ``0``.
    """
    parser = argparse.ArgumentParser(description="Bootstrap the OpenSleuth Hub task catalog.")
    parser.add_argument("--dataset-id", default=DATASET_ID)
    parser.add_argument("--dry-run", action="store_true", help="Print row count, don't push.")
    parser.add_argument("--private", action="store_true", help="Create as private dataset.")
    opts = parser.parse_args(argv)

    rows = build_rows()
    builtin_count = len(BLACK_BOX_FUNCTIONS)
    new_count = len(_NEW_TASK_ROWS)
    log.info("built %d row(s) (%d builtin + %d new)", len(rows), builtin_count, new_count)

    # One summary line per row: name, difficulty and number of edge cases.
    for row in rows:
        task_name = row["name"]
        difficulty = row["difficulty"]
        edge_count = len(json.loads(row["edge_cases_json"]))
        log.info(" %-22s difficulty=%-6s edges=%-2d", task_name, difficulty, edge_count)

    if not args_dry_run(opts):
        url = push_to_hub(rows, opts.dataset_id, private=opts.private)
        log.info("dataset live at %s", url)
    else:
        log.info("--dry-run: not pushing")
    return 0


def args_dry_run(opts: argparse.Namespace) -> bool:
    """Return whether ``--dry-run`` was passed on the command line."""
    return opts.dry_run
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())