| """Bootstrap / refresh the OpenSleuth Hub task catalog. |
| |
| Idempotently creates ``anugrah55/opensleuth-tasks`` and pushes: |
| |
| * The 9 builtin BLACK_BOX_FUNCTIONS as rows (so the dataset is non-empty |
| for testing and so the trainer's curriculum has parity with the |
| in-process oracle), and |
| * 6 brand-new tasks (``roman_to_int``, ``levenshtein_distance``, |
| ``flatten_list``, ``merge_sorted``, ``run_length_encode``, |
| ``binary_search``) that aren't in BLACK_BOX_FUNCTIONS, exercising |
| multi-arg and unannotated cases the auto-fuzzer must handle. |
| |
| Each row is:: |
| |
| { |
| "name": str, |
| "target_function_name": str, # which fn inside source_code |
| "signature": str, |
| "description": str, |
| "difficulty": "easy"|"medium"|"hard", |
| "source_code": str, # standalone Python; NO oracle imports |
| "edge_cases_json": str, # JSON list of literal-repr strings |
| "fuzz_spec_json": str, # JSON dict or "null" |
| } |
| |
| Run:: |
| |
| cd env && PYTHONPATH=. ../.venv/bin/python -m opensleuth_env.scripts.bootstrap_tasks_dataset |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import logging |
| import sys |
| from typing import Any, Dict, List, Optional |
|
|
| from opensleuth_env.black_box import BLACK_BOX_FUNCTIONS |
|
|
# Script-wide logger; every message this module emits goes through it.
log = logging.getLogger("opensleuth.bootstrap")
# Configure root logging at import time so running the module as a script
# prints timestamped INFO lines without any further setup.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Default Hub dataset repo; can be overridden on the CLI with --dataset-id.
DATASET_ID = "anugrah55/opensleuth-tasks"
|
|
|
|
| |
| |
| |
| |
|
|
|
|
# Per-builtin row metadata, keyed by task name.
#
# Each entry provides:
#   target_function_name -- the function defined inside ``source_code``
#   source_code          -- standalone Python source for the task; it is
#                           stored as text, so the indentation *inside* the
#                           string literals must itself be valid Python
#                           (4 spaces per level)
#   edge_cases           -- literal-repr strings of hand-picked inputs
#   fuzz_spec            -- per-parameter generator description
#
# Signature, description and difficulty are not stored here; they are read
# from BLACK_BOX_FUNCTIONS when the row is built.
_BUILTIN_SOURCE: Dict[str, Dict[str, Any]] = {
    "fibonacci": {
        "target_function_name": "fibonacci",
        "source_code": (
            "def fibonacci(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool) or n <= 0 or n > 90:\n"
            "        raise ValueError('Input must be a positive integer <= 90.')\n"
            "    a, b = 0, 1\n"
            "    for _ in range(n - 1):\n"
            "        a, b = b, a + b\n"
            "    return b if n > 0 else a\n"
        ),
        "edge_cases": ["1", "2", "3", "10", "89", "90"],
        "fuzz_spec": {"n": {"type": "int", "min": 1, "max": 90}},
    },
    "reverse_string": {
        "target_function_name": "reverse_string",
        "source_code": (
            "def reverse_string(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    return s[::-1]\n"
        ),
        "edge_cases": ['""', '"a"', '"ab"', '"racecar"', '"Hello, World!"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 12}},
    },
    "is_palindrome": {
        "target_function_name": "is_palindrome",
        "source_code": (
            "def is_palindrome(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    cleaned = ''.join(ch.lower() for ch in s if ch.isalnum())\n"
            "    return cleaned == cleaned[::-1]\n"
        ),
        "edge_cases": [
            '""', '"a"', '"ab"', '"abba"',
            "\"A man, a plan, a canal: Panama\"", '"Hello"',
        ],
        "fuzz_spec": {"s": {"type": "str", "max_len": 12}},
    },
    "digit_sum": {
        "target_function_name": "digit_sum",
        "source_code": (
            "def digit_sum(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool):\n"
            "        raise TypeError('Input must be int.')\n"
            "    if n < 0:\n"
            "        raise ValueError('Input must be non-negative.')\n"
            "    return sum(int(c) for c in str(n))\n"
        ),
        "edge_cases": ["0", "1", "9", "10", "99", "100", "9999"],
        "fuzz_spec": {"n": {"type": "int", "min": 0, "max": 10000}},
    },
    "count_vowels": {
        "target_function_name": "count_vowels",
        "source_code": (
            "def count_vowels(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    return sum(1 for c in s.lower() if c in 'aeiou')\n"
        ),
        "edge_cases": ['""', '"bcd"', '"AEIOU"', '"Hello, World!"', '"aaaaa"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 16}},
    },
    "gcd": {
        "target_function_name": "gcd",
        # Takes a single 2-element pair rather than two positional arguments.
        "source_code": (
            "def gcd(pair):\n"
            "    if not isinstance(pair, (list, tuple)) or len(pair) != 2:\n"
            "        raise TypeError('Input must be a 2-element list or tuple.')\n"
            "    a, b = pair\n"
            "    if not all(isinstance(x, int) and not isinstance(x, bool) for x in (a, b)):\n"
            "        raise TypeError('Both elements must be int.')\n"
            "    if a < 0 or b < 0:\n"
            "        raise ValueError('Both elements must be non-negative.')\n"
            "    while b:\n"
            "        a, b = b, a % b\n"
            "    return a\n"
        ),
        "edge_cases": ["(0, 0)", "(0, 7)", "(12, 18)", "(17, 13)", "(100, 75)"],
        "fuzz_spec": {
            "pair": {
                "type": "tuple",
                "elems": [{"type": "int", "min": 0, "max": 1000}, {"type": "int", "min": 0, "max": 1000}],
            }
        },
    },
    "sort_unique": {
        "target_function_name": "sort_unique",
        "source_code": (
            "def sort_unique(xs):\n"
            "    if not isinstance(xs, list):\n"
            "        raise TypeError('Input must be a list.')\n"
            "    if not all(isinstance(x, int) and not isinstance(x, bool) for x in xs):\n"
            "        raise TypeError('All elements must be int.')\n"
            "    return sorted(set(xs))\n"
        ),
        "edge_cases": ["[]", "[1]", "[1, 1, 1]", "[3, 1, 2]", "[-5, 5, 0, -5, 5]"],
        "fuzz_spec": {"xs": {"type": "list", "elem": {"type": "int", "min": -50, "max": 50}, "max_len": 8}},
    },
    "caesar_cipher": {
        "target_function_name": "caesar_cipher",
        # Shifts only lowercase a-z by 3; every other character passes through.
        "source_code": (
            "def caesar_cipher(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('Input must be a string.')\n"
            "    out = []\n"
            "    for ch in s:\n"
            "        if 'a' <= ch <= 'z':\n"
            "            out.append(chr((ord(ch) - ord('a') + 3) % 26 + ord('a')))\n"
            "        else:\n"
            "            out.append(ch)\n"
            "    return ''.join(out)\n"
        ),
        "edge_cases": ['""', '"abc"', '"xyz"', '"Hello, World!"', '"ABC"', '"hello world"'],
        "fuzz_spec": {"s": {"type": "str", "max_len": 16}},
    },
    "is_prime": {
        "target_function_name": "is_prime",
        "source_code": (
            "def is_prime(n):\n"
            "    if not isinstance(n, int) or isinstance(n, bool):\n"
            "        raise TypeError('Input must be int.')\n"
            "    if n < 2:\n"
            "        return False\n"
            "    if n < 4:\n"
            "        return True\n"
            "    if n % 2 == 0:\n"
            "        return False\n"
            "    i = 3\n"
            "    while i * i <= n:\n"
            "        if n % i == 0:\n"
            "            return False\n"
            "        i += 2\n"
            "    return True\n"
        ),
        "edge_cases": ["0", "1", "2", "3", "4", "17", "25", "97", "100"],
        "fuzz_spec": {"n": {"type": "int", "min": 0, "max": 200}},
    },
}
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
|
|
# Tasks that exist only in the Hub catalog (no BLACK_BOX_FUNCTIONS entry),
# so each dict carries the complete row metadata itself: name,
# target_function_name, signature, description, difficulty, source_code,
# edge_cases and fuzz_spec.
#
# ``source_code`` is stored as text, so the indentation *inside* the string
# literals must itself be valid Python (4 spaces per level).
_NEW_TASK_ROWS: List[Dict[str, Any]] = [
    {
        "name": "roman_to_int",
        "target_function_name": "roman_to_int",
        "signature": "roman_to_int(s: str) -> int",
        "description": (
            "Parse a roman numeral string into its integer value. "
            "Raises ValueError for non-roman characters. Subtraction "
            "rules (IV=4, IX=9, XL=40, ...) are honoured. Empty -> 0."
        ),
        "difficulty": "medium",
        "source_code": (
            "def roman_to_int(s: str) -> int:\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('input must be str')\n"
            "    table = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000}\n"
            "    total = 0\n"
            "    prev = 0\n"
            "    for ch in reversed(s.upper()):\n"
            "        if ch not in table:\n"
            "            raise ValueError(f'invalid roman numeral character: {ch!r}')\n"
            "        v = table[ch]\n"
            "        if v < prev:\n"
            "            total -= v\n"
            "        else:\n"
            "            total += v\n"
            # NOTE(review): `prev = v` is placed at loop level (updated for
            # every character). Both placements agree on canonical numerals
            # but differ on non-canonical input such as "VIX" -- confirm
            # against the intended oracle behaviour.
            "        prev = v\n"
            "    return total\n"
        ),
        "edge_cases": ['""', '"I"', '"IV"', '"IX"', '"LVIII"', '"MCMXCIV"', '"MMXXIV"'],
        "fuzz_spec": {"s": {"type": "str", "alphabet": "IVXLCDM", "max_len": 8}},
    },
    {
        "name": "levenshtein_distance",
        "target_function_name": "levenshtein_distance",
        "signature": "levenshtein_distance(a: str, b: str) -> int",
        "description": (
            "Classic edit distance between two strings: minimum number of "
            "single-character insertions, deletions, or substitutions to "
            "transform a into b. Both arguments must be str."
        ),
        "difficulty": "hard",
        "source_code": (
            "def levenshtein_distance(a: str, b: str) -> int:\n"
            "    if not isinstance(a, str) or not isinstance(b, str):\n"
            "        raise TypeError('both arguments must be str')\n"
            "    if a == b:\n"
            "        return 0\n"
            "    if not a:\n"
            "        return len(b)\n"
            "    if not b:\n"
            "        return len(a)\n"
            "    prev = list(range(len(b) + 1))\n"
            "    for i, ca in enumerate(a, 1):\n"
            "        cur = [i] + [0] * len(b)\n"
            "        for j, cb in enumerate(b, 1):\n"
            "            ins = cur[j-1] + 1\n"
            "            dele = prev[j] + 1\n"
            "            sub = prev[j-1] + (ca != cb)\n"
            "            cur[j] = min(ins, dele, sub)\n"
            "        prev = cur\n"
            "    return prev[-1]\n"
        ),
        "edge_cases": [
            '("", "")', '("a", "")', '("", "a")', '("kitten", "sitting")',
            '("flaw", "lawn")', '("abc", "abc")',
        ],
        "fuzz_spec": {
            "a": {"type": "str", "alphabet": "abc", "max_len": 6},
            "b": {"type": "str", "alphabet": "abc", "max_len": 6},
        },
    },
    {
        "name": "flatten_list",
        "target_function_name": "flatten_list",
        "signature": "flatten_list(xs: list) -> list",
        "description": (
            "Recursively flatten a nested list of arbitrary depth. Tuples "
            "are also flattened; non-list/tuple atoms (ints, strs, ...) "
            "pass through unchanged."
        ),
        "difficulty": "medium",
        "source_code": (
            "def flatten_list(xs):\n"
            "    if not isinstance(xs, (list, tuple)):\n"
            "        raise TypeError('input must be list or tuple')\n"
            "    out = []\n"
            "    stack = list(xs)\n"
            "    # iterative DFS to avoid recursion limits on adversarial input\n"
            "    rev = []\n"
            "    rev.extend(reversed(stack))\n"
            "    while rev:\n"
            "        x = rev.pop()\n"
            "        if isinstance(x, (list, tuple)):\n"
            "            for y in reversed(x):\n"
            "                rev.append(y)\n"
            "        else:\n"
            "            out.append(x)\n"
            "    return out\n"
        ),
        "edge_cases": [
            "[]", "[1]", "[[1, 2], [3, 4]]",
            "[1, [2, [3, [4, [5]]]]]", "[[], [], 1]",
        ],
        "fuzz_spec": {
            "xs": {
                "type": "list",
                "elem": {"type": "int", "min": -10, "max": 10},
                "max_len": 6,
            }
        },
    },
    {
        "name": "merge_sorted",
        "target_function_name": "merge_sorted",
        "signature": "merge_sorted(a: list[int], b: list[int]) -> list[int]",
        "description": (
            "Merge two pre-sorted lists of ints into a single sorted list. "
            "Both arguments must be lists; elements must be ints (bools "
            "rejected). The classic merge step of merge-sort."
        ),
        "difficulty": "medium",
        "source_code": (
            "def merge_sorted(a, b):\n"
            "    if not isinstance(a, list) or not isinstance(b, list):\n"
            "        raise TypeError('both arguments must be list')\n"
            "    for x in (*a, *b):\n"
            "        if not isinstance(x, int) or isinstance(x, bool):\n"
            "            raise TypeError('elements must be int')\n"
            "    out = []\n"
            "    i = j = 0\n"
            "    while i < len(a) and j < len(b):\n"
            "        if a[i] <= b[j]:\n"
            "            out.append(a[i]); i += 1\n"
            "        else:\n"
            "            out.append(b[j]); j += 1\n"
            "    out.extend(a[i:])\n"
            "    out.extend(b[j:])\n"
            "    return out\n"
        ),
        "edge_cases": [
            "([], [])", "([1, 2, 3], [])", "([], [1, 2, 3])",
            "([1, 3, 5], [2, 4, 6])", "([1, 1], [1, 1])",
        ],
        "fuzz_spec": {
            "a": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 5},
            "b": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 5},
        },
    },
    {
        "name": "run_length_encode",
        "target_function_name": "run_length_encode",
        "signature": "run_length_encode(s: str) -> list[tuple[str, int]]",
        "description": (
            "Run-length encoding: returns a list of (character, count) "
            "tuples for each run of identical characters in s. Empty "
            "input yields an empty list."
        ),
        "difficulty": "easy",
        "source_code": (
            "def run_length_encode(s):\n"
            "    if not isinstance(s, str):\n"
            "        raise TypeError('input must be str')\n"
            "    if not s:\n"
            "        return []\n"
            "    out = []\n"
            "    cur = s[0]\n"
            "    n = 1\n"
            "    for ch in s[1:]:\n"
            "        if ch == cur:\n"
            "            n += 1\n"
            "        else:\n"
            "            out.append((cur, n))\n"
            "            cur = ch\n"
            "            n = 1\n"
            "    out.append((cur, n))\n"
            "    return out\n"
        ),
        "edge_cases": ['""', '"a"', '"aa"', '"abc"', '"aaabbbccc"', '"aaaaaaaaaa"'],
        "fuzz_spec": {"s": {"type": "str", "alphabet": "ab", "max_len": 12}},
    },
    {
        "name": "binary_search",
        "target_function_name": "binary_search",
        "signature": "binary_search(arr: list[int], target: int) -> int",
        "description": (
            "Return the index of target in the sorted ascending list arr, "
            "or -1 if not present. arr must be a list of ints; target "
            "must be int. The list is assumed sorted."
        ),
        "difficulty": "medium",
        "source_code": (
            "def binary_search(arr, target):\n"
            "    if not isinstance(arr, list):\n"
            "        raise TypeError('arr must be list')\n"
            "    if not isinstance(target, int) or isinstance(target, bool):\n"
            "        raise TypeError('target must be int')\n"
            "    lo, hi = 0, len(arr) - 1\n"
            "    while lo <= hi:\n"
            "        mid = (lo + hi) // 2\n"
            "        v = arr[mid]\n"
            "        if v == target:\n"
            "            return mid\n"
            "        if v < target:\n"
            "            lo = mid + 1\n"
            "        else:\n"
            "            hi = mid - 1\n"
            "    return -1\n"
        ),
        "edge_cases": [
            "([], 3)", "([1], 1)", "([1], 2)",
            "([1, 2, 3, 4, 5], 3)", "([1, 2, 3, 4, 5], 0)",
            "([1, 2, 3, 4, 5], 6)",
        ],
        "fuzz_spec": {
            "arr": {"type": "list", "elem": {"type": "int", "min": -20, "max": 20}, "max_len": 8},
            "target": {"type": "int", "min": -20, "max": 20},
        },
    },
]
|
|
|
|
def _builtin_to_row(name: str) -> Dict[str, Any]:
    """Build the dataset row for the builtin task called ``name``.

    Signature, description and difficulty are read from the
    ``BLACK_BOX_FUNCTIONS`` entry; the target function name, source code,
    edge cases and fuzz spec come from ``_BUILTIN_SOURCE``. Edge cases and
    the fuzz spec are stored JSON-encoded.

    Raises:
        KeyError: if ``name`` is missing from either mapping.
    """
    oracle_spec = BLACK_BOX_FUNCTIONS[name]
    local_meta = _BUILTIN_SOURCE[name]

    row: Dict[str, Any] = {"name": name}
    row["target_function_name"] = local_meta["target_function_name"]
    row["signature"] = oracle_spec.signature
    row["description"] = oracle_spec.description
    row["difficulty"] = oracle_spec.difficulty
    row["source_code"] = local_meta["source_code"]
    # Nested structures are flattened to JSON strings for the row schema.
    row["edge_cases_json"] = json.dumps(local_meta["edge_cases"])
    row["fuzz_spec_json"] = json.dumps(local_meta["fuzz_spec"])
    return row
|
|
|
|
def _new_task_to_row(meta: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one self-contained task description into a dataset row.

    The six scalar fields are copied through unchanged; ``edge_cases`` and
    ``fuzz_spec`` are JSON-encoded into ``edge_cases_json`` and
    ``fuzz_spec_json``.

    Raises:
        KeyError: if ``meta`` lacks any of the expected keys.
    """
    copied_fields = (
        "name",
        "target_function_name",
        "signature",
        "description",
        "difficulty",
        "source_code",
    )
    row: Dict[str, Any] = {field: meta[field] for field in copied_fields}
    row["edge_cases_json"] = json.dumps(meta["edge_cases"])
    row["fuzz_spec_json"] = json.dumps(meta["fuzz_spec"])
    return row
|
|
|
|
def build_rows() -> List[Dict[str, Any]]:
    """Return every catalog row: builtin tasks first, then the new tasks.

    Builtin rows follow the iteration order of ``BLACK_BOX_FUNCTIONS``;
    new-task rows follow the order of ``_NEW_TASK_ROWS``.
    """
    builtin_rows = [_builtin_to_row(task_name) for task_name in BLACK_BOX_FUNCTIONS]
    extra_rows = [_new_task_to_row(task_meta) for task_meta in _NEW_TASK_ROWS]
    return builtin_rows + extra_rows
|
|
|
|
def push_to_hub(rows: List[Dict[str, Any]], dataset_id: str, *, private: bool = False) -> str:
    """Upload ``rows`` as the ``train`` split of the Hub dataset ``dataset_id``.

    The dataset repo is created first if it does not already exist
    (``exist_ok=True``), then the rows are pushed.

    Args:
        rows: Row dicts to upload.
        dataset_id: Hub repo id of the dataset.
        private: Passed through to both the repo creation and the push.

    Returns:
        The ``https://huggingface.co/datasets/...`` URL for ``dataset_id``.
    """
    # Imported lazily so that --dry-run works without these packages installed.
    from datasets import Dataset
    from huggingface_hub import HfApi

    HfApi().create_repo(
        repo_id=dataset_id,
        repo_type="dataset",
        exist_ok=True,
        private=private,
    )

    dataset = Dataset.from_list(rows)
    log.info("pushing %d row(s) to %s", len(rows), dataset_id)
    dataset.push_to_hub(dataset_id, split="train", private=private)

    return f"https://huggingface.co/datasets/{dataset_id}"
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: build the rows and optionally push them.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` after a dry run or a completed push.
    """
    parser = argparse.ArgumentParser(description="Bootstrap the OpenSleuth Hub task catalog.")
    parser.add_argument("--dataset-id", default=DATASET_ID)
    parser.add_argument("--dry-run", action="store_true", help="Print row count, don't push.")
    parser.add_argument("--private", action="store_true", help="Create as private dataset.")
    opts = parser.parse_args(argv)

    rows = build_rows()
    log.info(
        "built %d row(s) (%d builtin + %d new)",
        len(rows),
        len(BLACK_BOX_FUNCTIONS),
        len(_NEW_TASK_ROWS),
    )
    # One summary line per task so a dry run shows exactly what would be pushed.
    for row in rows:
        log.info(
            " %-22s difficulty=%-6s edges=%-2d",
            row["name"],
            row["difficulty"],
            len(json.loads(row["edge_cases_json"])),
        )

    if not opts.dry_run:
        hub_url = push_to_hub(rows, opts.dataset_id, private=opts.private)
        log.info("dataset live at %s", hub_url)
    else:
        log.info("--dry-run: not pushing")
    return 0
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|