"""Bootstrap / refresh the OpenSleuth Hub task catalog.

Idempotently creates ``anugrah55/opensleuth-tasks`` and pushes:

* The 9 builtin BLACK_BOX_FUNCTIONS as rows (so the dataset is non-empty
  for testing and so the trainer's curriculum has parity with the
  in-process oracle), and
* 6 brand-new tasks (``roman_to_int``, ``levenshtein_distance``,
  ``flatten_list``, ``merge_sorted``, ``run_length_encode``,
  ``binary_search``) that aren't in BLACK_BOX_FUNCTIONS, exercising
  multi-arg and unannotated cases the auto-fuzzer must handle.

Each row is::

    {
      "name":                 str,
      "target_function_name": str,   # which fn inside source_code
      "signature":            str,
      "description":          str,
      "difficulty":           "easy"|"medium"|"hard",
      "source_code":          str,   # standalone Python; NO oracle imports
      "edge_cases_json":      str,   # JSON list of literal-repr strings
      "fuzz_spec_json":       str,   # JSON dict or "null"
    }

Run::

    cd env && PYTHONPATH=. ../.venv/bin/python -m opensleuth_env.scripts.bootstrap_tasks_dataset
"""

from __future__ import annotations

import argparse
import json
import logging
import sys
from typing import Any, Dict, List, Optional

from opensleuth_env.black_box import BLACK_BOX_FUNCTIONS

# Module logger. ``basicConfig`` runs at import time, so INFO records are
# formatted and emitted as soon as this module is loaded.
log = logging.getLogger("opensleuth.bootstrap")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")

# Default Hub dataset repo id (used as the ``--dataset-id`` default).
DATASET_ID = "anugrah55/opensleuth-tasks"


# ---------------------------------------------------------------------------
# Oracle source code for the 9 builtins (self-contained -- no opensleuth_*
# imports, so the catalog's static reject filter accepts them).
# ---------------------------------------------------------------------------

# Maps builtin task name -> standalone oracle metadata. Each ``source_code``
# value is a complete, executable Python module defining the function named
# by ``target_function_name``; ``edge_cases`` are literal-repr strings and
# ``fuzz_spec`` describes how to generate random arguments per parameter.
_BUILTIN_SOURCE: Dict[str, Dict[str, Any]] = {
    "fibonacci": {
        "target_function_name": "fibonacci",
        "source_code": (
            "def fibonacci(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool) or n <= 0 or n > 90:\n"
            "        raise ValueError('Input must be a positive integer <= 90.')\n"
            "    a, b = 0, 1\n"
            "    for _ in range(n - 1):\n"
            "        a, b = b, a + b\n"
            "    return b if n > 0 else a\n"
        ),
        "edge_cases": ["1", "2", "3", "10", "89", "90"],
        "fuzz_spec": {"n": {"type": "int", "min": 1, "max": 90}},
    },
    "reverse_string": {
        "target_function_name": "reverse_string",
        "source_code": (
            "def reverse_string(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    return s[::-1]\n"
        ),
        "edge_cases": ['""', '"a"', '"ab"', '"racecar"', '"Hello, World!"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 12}},
    },
    "is_palindrome": {
        "target_function_name": "is_palindrome",
        "source_code": (
            "def is_palindrome(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    cleaned = ''.join(ch.lower() for ch in s if ch.isalnum())\n"
            "    return cleaned == cleaned[::-1]\n"
        ),
        "edge_cases": [
            '""',
            '"a"',
            '"ab"',
            '"abba"',
            "\"A man, a plan, a canal: Panama\"",
            '"Hello"',
        ],
        "fuzz_spec": {"s": {"type": "str", "max_len": 12}},
    },
    "digit_sum": {
        "target_function_name": "digit_sum",
        "source_code": (
            "def digit_sum(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool):\n"
            "        raise TypeError('Input must be int.')\n"
            "    if n < 0:\n"
            "        raise ValueError('Input must be non-negative.')\n"
            "    return sum(int(c) for c in str(n))\n"
        ),
        "edge_cases": ["0", "1", "9", "10", "99", "100", "9999"],
        "fuzz_spec": {"n": {"type": "int", "min": 0, "max": 10000}},
    },
    "count_vowels": {
        "target_function_name": "count_vowels",
        "source_code": (
            "def count_vowels(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    return sum(1 for c in s.lower() if c in 'aeiou')\n"
        ),
        "edge_cases": ['""', '"bcd"', '"AEIOU"', '"Hello, World!"', '"aaaaa"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 16}},
    },
    "gcd": {
        "target_function_name": "gcd",
        "source_code": (
            "def gcd(pair):\n"
            "    if not isinstance(pair, (list, tuple)) or len(pair) != 2:\n"
            "        raise TypeError('Input must be a 2-element list or tuple.')\n"
            "    a, b = pair\n"
            "    if not all(isinstance(x, int) and not isinstance(x, bool) for x in (a, b)):\n"
            "        raise TypeError('Both elements must be int.')\n"
            "    if a < 0 or b < 0:\n"
            "        raise ValueError('Both elements must be non-negative.')\n"
            "    while b:\n"
            "        a, b = b, a % b\n"
            "    return a\n"
        ),
        "edge_cases": ["(0, 0)", "(0, 7)", "(12, 18)", "(17, 13)", "(100, 75)"],
        "fuzz_spec": {
            "pair": {
                "type": "tuple",
                "elems": [{"type": "int", "min": 0, "max": 1000}, {"type": "int", "min": 0, "max": 1000}],
            }
        },
    },
    "sort_unique": {
        "target_function_name": "sort_unique",
        "source_code": (
            "def sort_unique(xs):\n"
            "    if not isinstance(xs, list):\n"
            "        raise TypeError('Input must be a list.')\n"
            "    if not all(isinstance(x, int) and not isinstance(x, bool) for x in xs):\n"
            "        raise TypeError('All elements must be int.')\n"
            "    return sorted(set(xs))\n"
        ),
        "edge_cases": ["[]", "[1]", "[1, 1, 1]", "[3, 1, 2]", "[-5, 5, 0, -5, 5]"],
        "fuzz_spec": {"xs": {"type": "list", "elem": {"type": "int", "min": -50, "max": 50}, "max_len": 8}},
    },
    "caesar_cipher": {
        "target_function_name": "caesar_cipher",
        "source_code": (
            "def caesar_cipher(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    out = []\n"
            "    for ch in s:\n"
            "        if 'a' <= ch <= 'z':\n"
            "            out.append(chr((ord(ch) - ord('a') + 3) % 26 + ord('a')))\n"
            "        else:\n"
            "            out.append(ch)\n"
            "    return ''.join(out)\n"
        ),
        "edge_cases": ['""', '"abc"', '"xyz"', '"Hello, World!"', '"ABC"', '"hello world"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 16}},
    },
    "is_prime": {
        "target_function_name": "is_prime",
        "source_code": (
            "def is_prime(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool):\n"
            "        raise TypeError('Input must be int.')\n"
            "    if n < 2:\n"
            "        return False\n"
            "    if n < 4:\n"
            "        return True\n"
            "    if n % 2 == 0:\n"
            "        return False\n"
            "    i = 3\n"
            "    while i * i <= n:\n"
            "        if n % i == 0:\n"
            "            return False\n"
            "        i += 2\n"
            "    return True\n"
        ),
        "edge_cases": ["0", "1", "2", "3", "4", "17", "25", "97", "100"],
        "fuzz_spec": {"n": {"type": "int", "min": 0, "max": 200}},
    },
}


# ---------------------------------------------------------------------------
# Six new tasks. These exercise auto-fuzzer features the builtins didn't:
#   * multi-arg signatures (binary_search, merge_sorted, levenshtein_distance)
#   * Optional / Literal hint coverage (run_length_encode -> list[tuple[str, int]])
#   * unannotated containers (flatten_list)
# ---------------------------------------------------------------------------
_NEW_TASK_ROWS: List[Dict[str, Any]] = [
    {
        "name": "roman_to_int",
        "target_function_name": "roman_to_int",
        "signature": "roman_to_int(s: str) -> int",
        "description": (
            "Parse a roman numeral string into its integer value. "
            "Raises ValueError for non-roman characters. Subtraction "
            "rules (IV=4, IX=9, XL=40, ...) are honoured. Empty -> 0."
        ),
        "difficulty": "medium",
        "source_code": (
            "def roman_to_int(s: str) -> int:\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('input must be str')\n"
            "    table = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000}\n"
            "    total = 0\n"
            "    prev = 0\n"
            "    for ch in reversed(s.upper()):\n"
            "        if ch not in table:\n"
            "            raise ValueError(f'invalid roman numeral character: {ch!r}')\n"
            "        v = table[ch]\n"
            "        if v < prev:\n"
            "            total -= v\n"
            "        else:\n"
            "            total += v\n"
            "        prev = v\n"
            "    return total\n"
        ),
        "edge_cases": ['""', '"I"', '"IV"', '"IX"', '"LVIII"', '"MCMXCIV"', '"MMXXIV"'],
        "fuzz_spec": {"s": {"type": "str", "alphabet": "IVXLCDM", "max_len": 8}},
    },
    {
        "name": "levenshtein_distance",
        "target_function_name": "levenshtein_distance",
        "signature": "levenshtein_distance(a: str, b: str) -> int",
        "description": (
            "Classic edit distance between two strings: minimum number of "
            "single-character insertions, deletions, or substitutions to "
            "transform a into b. Both arguments must be str."
        ),
        "difficulty": "hard",
        "source_code": (
            "def levenshtein_distance(a: str, b: str) -> int:\n"
            "    if not isinstance(a, str) or not isinstance(b, str):\n"
            "        raise TypeError('both arguments must be str')\n"
            "    if a == b:\n"
            "        return 0\n"
            "    if not a:\n"
            "        return len(b)\n"
            "    if not b:\n"
            "        return len(a)\n"
            "    prev = list(range(len(b) + 1))\n"
            "    for i, ca in enumerate(a, 1):\n"
            "        cur = [i] + [0] * len(b)\n"
            "        for j, cb in enumerate(b, 1):\n"
            "            ins = cur[j-1] + 1\n"
            "            dele = prev[j] + 1\n"
            "            sub = prev[j-1] + (ca != cb)\n"
            "            cur[j] = min(ins, dele, sub)\n"
            "        prev = cur\n"
            "    return prev[-1]\n"
        ),
        "edge_cases": [
            '("", "")',
            '("a", "")',
            '("", "a")',
            '("kitten", "sitting")',
            '("flaw", "lawn")',
            '("abc", "abc")',
        ],
        "fuzz_spec": {
            "a": {"type": "str", "alphabet": "abc", "max_len": 6},
            "b": {"type": "str", "alphabet": "abc", "max_len": 6},
        },
    },
    {
        "name": "flatten_list",
        "target_function_name": "flatten_list",
        "signature": "flatten_list(xs: list) -> list",
        "description": (
            "Recursively flatten a nested list of arbitrary depth. Tuples "
            "are also flattened; non-list/tuple atoms (ints, strs, ...) "
            "pass through unchanged."
        ),
        "difficulty": "medium",
        "source_code": (
            "def flatten_list(xs):\n"
            "    if not isinstance(xs, (list, tuple)):\n"
            "        raise TypeError('input must be list or tuple')\n"
            "    out = []\n"
            "    stack = list(xs)\n"
            "    # iterative DFS to avoid recursion limits on adversarial input\n"
            "    rev = []\n"
            "    rev.extend(reversed(stack))\n"
            "    while rev:\n"
            "        x = rev.pop()\n"
            "        if isinstance(x, (list, tuple)):\n"
            "            for y in reversed(x):\n"
            "                rev.append(y)\n"
            "        else:\n"
            "            out.append(x)\n"
            "    return out\n"
        ),
        "edge_cases": [
            "[]",
            "[1]",
            "[[1, 2], [3, 4]]",
            "[1, [2, [3, [4, [5]]]]]",
            "[[], [], 1]",
        ],
        "fuzz_spec": {
            "xs": {
                "type": "list",
                "elem": {"type": "int", "min": -10, "max": 10},
                "max_len": 6,
            }
        },
    },
    {
        "name": "merge_sorted",
        "target_function_name": "merge_sorted",
        "signature": "merge_sorted(a: list[int], b: list[int]) -> list[int]",
        "description": (
            "Merge two pre-sorted lists of ints into a single sorted list. "
            "Both arguments must be lists; elements must be ints (bools "
            "rejected). The classic merge step of merge-sort."
        ),
        "difficulty": "medium",
        "source_code": (
            "def merge_sorted(a, b):\n"
            "    if not isinstance(a, list) or not isinstance(b, list):\n"
            "        raise TypeError('both arguments must be list')\n"
            "    for x in (*a, *b):\n"
            "        if not isinstance(x, int) or isinstance(x, bool):\n"
            "            raise TypeError('elements must be int')\n"
            "    out = []\n"
            "    i = j = 0\n"
            "    while i < len(a) and j < len(b):\n"
            "        if a[i] <= b[j]:\n"
            "            out.append(a[i]); i += 1\n"
            "        else:\n"
            "            out.append(b[j]); j += 1\n"
            "    out.extend(a[i:])\n"
            "    out.extend(b[j:])\n"
            "    return out\n"
        ),
        "edge_cases": [
            "([], [])",
            "([1, 2, 3], [])",
            "([], [1, 2, 3])",
            "([1, 3, 5], [2, 4, 6])",
            "([1, 1], [1, 1])",
        ],
        "fuzz_spec": {
            "a": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 5},
            "b": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 5},
        },
    },
    {
        "name": "run_length_encode",
        "target_function_name": "run_length_encode",
        "signature": "run_length_encode(s: str) -> list[tuple[str, int]]",
        "description": (
            "Run-length encoding: returns a list of (character, count) "
            "tuples for each run of identical characters in s. Empty "
            "input yields an empty list."
        ),
        "difficulty": "easy",
        "source_code": (
            "def run_length_encode(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('input must be str')\n"
            "    if not s:\n"
            "        return []\n"
            "    out = []\n"
            "    cur = s[0]\n"
            "    n = 1\n"
            "    for ch in s[1:]:\n"
            "        if ch == cur:\n"
            "            n += 1\n"
            "        else:\n"
            "            out.append((cur, n))\n"
            "            cur = ch\n"
            "            n = 1\n"
            "    out.append((cur, n))\n"
            "    return out\n"
        ),
        "edge_cases": ['""', '"a"', '"aa"', '"abc"', '"aaabbbccc"', '"aaaaaaaaaa"'],
        "fuzz_spec": {"s": {"type": "str", "alphabet": "ab", "max_len": 12}},
    },
    {
        "name": "binary_search",
        "target_function_name": "binary_search",
        "signature": "binary_search(arr: list[int], target: int) -> int",
        "description": (
            "Return the index of target in the sorted ascending list arr, "
            "or -1 if not present. arr must be a list of ints; target "
            "must be int. The list is assumed sorted."
        ),
        "difficulty": "medium",
        "source_code": (
            "def binary_search(arr, target):\n"
            "    if not isinstance(arr, list):\n"
            "        raise TypeError('arr must be list')\n"
            "    if not isinstance(target, int) or isinstance(target, bool):\n"
            "        raise TypeError('target must be int')\n"
            "    lo, hi = 0, len(arr) - 1\n"
            "    while lo <= hi:\n"
            "        mid = (lo + hi) // 2\n"
            "        v = arr[mid]\n"
            "        if v == target:\n"
            "            return mid\n"
            "        if v < target:\n"
            "            lo = mid + 1\n"
            "        else:\n"
            "            hi = mid - 1\n"
            "    return -1\n"
        ),
        "edge_cases": [
            "([], 3)",
            "([1], 1)",
            "([1], 2)",
            "([1, 2, 3, 4, 5], 3)",
            "([1, 2, 3, 4, 5], 0)",
            "([1, 2, 3, 4, 5], 6)",
        ],
        "fuzz_spec": {
            "arr": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 8},
            "target": {"type": "int", "min": -20, "max": 20},
        },
    },
]


def _make_row(
    *,
    name: str,
    target_function_name: str,
    signature: str,
    description: str,
    difficulty: str,
    source_code: str,
    edge_cases: List[str],
    fuzz_spec: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Assemble one catalog row; the single place that defines the schema.

    ``edge_cases`` and ``fuzz_spec`` are JSON-encoded into string columns;
    a ``fuzz_spec`` of ``None`` is stored as the JSON text ``"null"``.
    """
    return {
        "name": name,
        "target_function_name": target_function_name,
        "signature": signature,
        "description": description,
        "difficulty": difficulty,
        "source_code": source_code,
        "edge_cases_json": json.dumps(edge_cases),
        "fuzz_spec_json": json.dumps(fuzz_spec),
    }


def _builtin_to_row(name: str) -> Dict[str, Any]:
    """Build the catalog row for the builtin task called ``name``.

    Signature, description and difficulty are read from the
    ``BLACK_BOX_FUNCTIONS`` entry; the standalone source, edge cases and
    fuzz spec come from ``_BUILTIN_SOURCE``.

    Raises:
        KeyError: if ``name`` is missing from ``BLACK_BOX_FUNCTIONS`` or has
            no entry in ``_BUILTIN_SOURCE``.
    """
    spec = BLACK_BOX_FUNCTIONS[name]
    try:
        src_meta = _BUILTIN_SOURCE[name]
    except KeyError:
        # Same exception type as before, but say what needs to be added.
        raise KeyError(
            f"builtin {name!r} has no standalone source in _BUILTIN_SOURCE"
        ) from None
    return _make_row(
        name=name,
        target_function_name=src_meta["target_function_name"],
        signature=spec.signature,
        description=spec.description,
        difficulty=spec.difficulty,
        source_code=src_meta["source_code"],
        edge_cases=src_meta["edge_cases"],
        fuzz_spec=src_meta["fuzz_spec"],
    )


def _new_task_to_row(meta: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one ``_NEW_TASK_ROWS``-style dict into a catalog row.

    All keys except ``fuzz_spec`` are required (``KeyError`` otherwise).
    A missing ``fuzz_spec`` is serialized as ``"null"``.
    """
    return _make_row(
        name=meta["name"],
        target_function_name=meta["target_function_name"],
        signature=meta["signature"],
        description=meta["description"],
        difficulty=meta["difficulty"],
        source_code=meta["source_code"],
        edge_cases=meta["edge_cases"],
        fuzz_spec=meta.get("fuzz_spec"),
    )


def build_rows() -> List[Dict[str, Any]]:
    """Return every catalog row: builtins first, then the new tasks.

    Builtins follow the iteration order of ``BLACK_BOX_FUNCTIONS``; new
    tasks follow the order of ``_NEW_TASK_ROWS``.
    """
    rows: List[Dict[str, Any]] = [_builtin_to_row(name) for name in BLACK_BOX_FUNCTIONS]
    rows.extend(_new_task_to_row(meta) for meta in _NEW_TASK_ROWS)
    return rows


def push_to_hub(rows: List[Dict[str, Any]], dataset_id: str, *, private: bool = False) -> str:
    """Push the row list to ``dataset_id`` (overwriting any prior contents).

    Args:
        rows: Catalog rows, as produced by ``build_rows``.
        dataset_id: Hub dataset repo id, e.g. ``"user/name"``.
        private: Passed through to both repo creation and the push.

    Returns:
        The hub URL.
    """
    # Imported lazily so --dry-run works without the Hub client libraries.
    from datasets import Dataset
    from huggingface_hub import HfApi

    api = HfApi()
    # exist_ok=True makes repeated runs a no-op at the repo-creation step.
    api.create_repo(
        repo_id=dataset_id,
        repo_type="dataset",
        exist_ok=True,
        private=private,
    )
    ds = Dataset.from_list(rows)
    log.info("pushing %d row(s) to %s", len(rows), dataset_id)
    ds.push_to_hub(dataset_id, split="train", private=private)
    return f"https://huggingface.co/datasets/{dataset_id}"


def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: build the rows, log a summary, optionally push.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Process exit code (always 0 when no exception is raised).
    """
    p = argparse.ArgumentParser(description="Bootstrap the OpenSleuth Hub task catalog.")
    p.add_argument("--dataset-id", default=DATASET_ID)
    p.add_argument("--dry-run", action="store_true", help="Print row count, don't push.")
    p.add_argument("--private", action="store_true", help="Create as private dataset.")
    args = p.parse_args(argv)

    rows = build_rows()
    log.info(
        "built %d row(s) (%d builtin + %d new)",
        len(rows),
        len(BLACK_BOX_FUNCTIONS),
        len(_NEW_TASK_ROWS),
    )
    for r in rows:
        log.info(
            " %-22s difficulty=%-6s edges=%-2d",
            r["name"],
            r["difficulty"],
            len(json.loads(r["edge_cases_json"])),
        )

    if args.dry_run:
        log.info("--dry-run: not pushing")
        return 0

    url = push_to_hub(rows, args.dataset_id, private=args.private)
    log.info("dataset live at %s", url)
    return 0


if __name__ == "__main__":
    sys.exit(main())