"""OpenSleuth live demo Space. A clickable Gradio app that lets a viewer watch the OpenSleuth agent solve one of the 15 catalog tasks live: pick a black-box function, pick an agent backend, watch the agent probe the env, submit a Python replica, and see the verifier reward streamed back in real time. Backends: * "oracle" — submit the canonical reference implementation * "base Qwen 0.5B" — Qwen/Qwen2.5-0.5B-Instruct, no fine-tuning * "trained Qwen 0.5B" — base + GRPO LoRA from anugrah55/opensleuth-qwen2.5-0.5b-grpo * "trained Qwen 3B" — base + GRPO LoRA from anugrah55/opensleuth-qwen2.5-3b-grpo-v2 (gracefully degraded if adapter repo is empty) Networks: hits the live env Space at https://anugrah55-opensleuth-env-gemini-cli.hf.space for /tasks, /reset, /step (probe + submit), /tasks/{name}/sample_inputs. CPU-basic friendly: model loads are lazy, generations are capped at 192 new tokens, and we fall back gracefully if a model/adapter is unavailable. """ from __future__ import annotations import logging import os import re import threading import time import traceback from dataclasses import dataclass from typing import Any, Dict, Generator, List, Optional, Tuple import gradio as gr import requests from huggingface_hub import HfApi from oracle import ORACLE_SOLUTIONS, get_oracle_code # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- ENV_URL = os.environ.get( "OPENSLEUTH_ENV_URL", "https://anugrah55-opensleuth-env-gemini-cli.hf.space", ).rstrip("/") BASE_MODEL_ID = os.environ.get("BASE_MODEL_ID", "Qwen/Qwen2.5-0.5B-Instruct") ADAPTER_05B_ID = os.environ.get( "ADAPTER_05B_ID", "anugrah55/opensleuth-qwen2.5-0.5b-grpo" ) ADAPTER_3B_ID = os.environ.get( "ADAPTER_3B_ID", "anugrah55/opensleuth-qwen2.5-3b-grpo-v2" ) BASE_MODEL_3B_ID = os.environ.get("BASE_MODEL_3B_ID", "Qwen/Qwen2.5-3B-Instruct") MAX_NEW_TOKENS = int(os.environ.get("MAX_NEW_TOKENS", "192")) N_PROBES = int(os.environ.get("N_PROBES", "6")) HF_TOKEN = os.environ.get("HF_TOKEN") GITHUB_URL = "https://github.com/" HUB_DATASET_URL = "https://huggingface.co/datasets/anugrah55/opensleuth-tasks" ENV_SPACE_URL = "https://huggingface.co/spaces/anugrah55/opensleuth-env-gemini-cli" SYSTEM_PROMPT = ( "You are an algorithmic detective. You are given the public signature of a hidden " "Python function plus several (input, output) examples observed by probing it. " "Your job is to write a Python function that *exactly* reproduces the hidden " "function's behavior on all valid inputs. Match its return values AND its " "exception types on invalid inputs. Keep your implementation as simple and clean " "as possible (it is penalised for being needlessly branchy). Return ONLY the " "function definition wrapped in a single ```python ... ``` code block." ) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") log = logging.getLogger("opensleuth.demo") # --------------------------------------------------------------------------- # Env client (thin) # --------------------------------------------------------------------------- class EnvClient: def __init__(self, base_url: str, timeout: float = 30.0) -> None: self.base_url = base_url.rstrip("/") self.timeout = timeout def _get(self, path: str, **params) -> Dict[str, Any]: r = requests.get(f"{self.base_url}{path}", params=params or None, timeout=self.timeout) r.raise_for_status() return r.json() def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]: r = requests.post(f"{self.base_url}{path}", json=payload, timeout=self.timeout) r.raise_for_status() return r.json() def list_tasks(self) -> List[Dict[str, Any]]: return self._get("/tasks")["tasks"] def sample_inputs(self, name: str, n: int = 6, seed: int = 0) -> List[str]: return list(self._get(f"/tasks/{name}/sample_inputs", n=n, seed=seed)["inputs"]) def reset(self, target_name: str, seed: int = 0, max_steps: int = 25) -> Dict[str, Any]: return self._post( "/reset", {"target_name": target_name, "seed": seed, "max_steps": max_steps}, ) def probe(self, episode_id: str, input_repr: str) -> Dict[str, Any]: return self._post( "/step", { "episode_id": episode_id, "action": {"action_type": "probe", "input_repr": input_repr}, }, ) def submit(self, episode_id: str, code: str) -> Dict[str, Any]: return self._post( "/step", { "episode_id": episode_id, "action": {"action_type": "submit", "code": code}, }, ) CLIENT = EnvClient(ENV_URL) def fetch_tasks() -> List[Dict[str, Any]]: """Pull the live task catalog. Falls back to a hardcoded list if env is unreachable so the dropdown always has something to show.""" try: return CLIENT.list_tasks() except Exception as e: # noqa: BLE001 log.warning("could not fetch /tasks from env (%s); using static fallback", e) return [{"name": n, "signature": "", "description": "", "difficulty": "?", "edge_case_count": 0, "source": "fallback"} for n in sorted(ORACLE_SOLUTIONS)] # --------------------------------------------------------------------------- # Prompt + code extraction (lifted from training/opensleuth_train/prompt.py) # --------------------------------------------------------------------------- _CODE_RE = re.compile(r"```(?:python)?\s*(.*?)```", re.DOTALL | re.IGNORECASE) def build_prompt(target_name: str, signature: str, probes: List[Tuple[str, str, bool]]) -> str: lines = [ f"## Hidden function: {target_name}", "", "### Public signature & docstring", signature.strip() or "(no signature provided)", "", "### Observed probes", ] if not probes: lines.append("(none)") else: for inp, out, is_err in probes: tag = "raises" if is_err else "returns" lines.append(f"- input={inp} -> {tag} {out}") lines += [ "", "### Task", f"Write a Python function named `{target_name}` that reproduces the hidden " "function's behaviour. Return ONLY the function definition in a single " "```python ... ``` code block. Do not add explanations.", ] return "\n".join(lines) def extract_code(completion: str) -> str: m = _CODE_RE.search(completion) if m: return m.group(1).strip() return completion.strip() # --------------------------------------------------------------------------- # Backend registry # --------------------------------------------------------------------------- @dataclass class BackendInfo: key: str label: str kind: str # "oracle" | "hf" (transformers + peft) base_model: Optional[str] = None adapter: Optional[str] = None BACKENDS: Dict[str, BackendInfo] = { "oracle": BackendInfo( key="oracle", label="oracle (reference impl)", kind="oracle", ), "base-0.5b": BackendInfo( key="base-0.5b", label="base Qwen 0.5B (no fine-tune)", kind="hf", base_model=BASE_MODEL_ID, adapter=None, ), "trained-0.5b": BackendInfo( key="trained-0.5b", label="trained Qwen 0.5B (GRPO LoRA)", kind="hf", base_model=BASE_MODEL_ID, adapter=ADAPTER_05B_ID, ), "trained-3b": BackendInfo( key="trained-3b", label="trained Qwen 3B (GRPO LoRA)", kind="hf", base_model=BASE_MODEL_3B_ID, adapter=ADAPTER_3B_ID, ), } BACKEND_CHOICES = [(b.label, b.key) for b in BACKENDS.values()] def _adapter_has_weights(repo_id: str) -> bool: """Hub probe: True iff the adapter repo actually contains adapter weights. We treat repos with only `.gitattributes` (still training, pre-push) as 'not yet trained'.""" try: api = HfApi(token=HF_TOKEN) files = api.list_repo_files(repo_id) except Exception as e: # noqa: BLE001 log.warning("adapter availability probe failed for %s: %s", repo_id, e) return False return any(f.endswith("adapter_model.safetensors") or f.endswith("adapter_model.bin") for f in files) # --------------------------------------------------------------------------- # Lazy HF model cache # --------------------------------------------------------------------------- _MODEL_LOCK = threading.Lock() _LOADED: Dict[str, Tuple[Any, Any]] = {} # cache_key -> (tokenizer, model) def _model_cache_key(base: str, adapter: Optional[str]) -> str: return f"{base}::{adapter or '_base_'}" def _load_hf(base: str, adapter: Optional[str]) -> Tuple[Any, Any]: """Load the (base, optional LoRA) on CPU. Cached across calls.""" key = _model_cache_key(base, adapter) with _MODEL_LOCK: if key in _LOADED: return _LOADED[key] log.info("loading HF model base=%s adapter=%s", base, adapter) import torch # noqa: WPS433 from transformers import AutoModelForCausalLM, AutoTokenizer # noqa: WPS433 tok = AutoTokenizer.from_pretrained(base, trust_remote_code=True, token=HF_TOKEN) model = AutoModelForCausalLM.from_pretrained( base, torch_dtype=torch.float32, device_map={"": "cpu"}, trust_remote_code=True, low_cpu_mem_usage=True, token=HF_TOKEN, ) if adapter: from peft import PeftModel # noqa: WPS433 model = PeftModel.from_pretrained(model, adapter, token=HF_TOKEN) model.eval() _LOADED[key] = (tok, model) log.info("loaded %s in %d cached models", key, len(_LOADED)) return tok, model def _generate_hf(base: str, adapter: Optional[str], prompt: str) -> str: tok, model = _load_hf(base, adapter) import torch # noqa: WPS433 messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt}, ] text = tok.apply_chat_template(messages, add_generation_prompt=True, tokenize=False) inputs = tok(text, return_tensors="pt") with torch.no_grad(): out = model.generate( **inputs, max_new_tokens=MAX_NEW_TOKENS, do_sample=False, temperature=1.0, pad_token_id=tok.eos_token_id, ) return tok.decode(out[0, inputs["input_ids"].shape[1]:], skip_special_tokens=True) # --------------------------------------------------------------------------- # Reward table formatter # --------------------------------------------------------------------------- def _empty_reward_table() -> List[List[Any]]: return [ ["execution_reward", "—"], ["edge_pass_rate", "—"], ["complexity_penalty", "—"], ["reward_hack_penalty", "—"], ["floor_penalty", "—"], ["perfect_bonus", "—"], ["TOTAL reward", "—"], ] def _reward_table_from_info(info: Dict[str, Any], total: float) -> List[List[Any]]: def _fmt(x): if x is None: return "—" if isinstance(x, float): return f"{x:+.2f}" return str(x) edge = info.get("edge_pass_rate") edge_str = f"{edge:.0%}" if isinstance(edge, (int, float)) else "—" return [ ["execution_reward", _fmt(info.get("execution_reward"))], ["edge_pass_rate", edge_str], ["complexity_penalty", _fmt(-(info.get("complexity_penalty") or 0.0))], ["reward_hack_penalty", _fmt(-(info.get("reward_hack_penalty") or 0.0))], ["floor_penalty", _fmt(-(info.get("floor_penalty") or 0.0))], ["perfect_bonus", _fmt(info.get("perfect_bonus"))], ["TOTAL reward", _fmt(total)], ] # --------------------------------------------------------------------------- # Streaming runner # --------------------------------------------------------------------------- def _format_log(lines: List[str]) -> str: return "\n".join(lines) def run_agent( task_name: str, backend_key: str, seed: int = 0, ) -> Generator[Tuple[str, str, List[List[Any]], str], None, None]: """Run one agent rollout end-to-end and stream UI updates. Yields tuples of (log_text, code_markdown, reward_table, status). """ backend = BACKENDS.get(backend_key) if backend is None: yield ("Unknown backend.", "", _empty_reward_table(), "error") return if not task_name: yield ("Pick a task first.", "", _empty_reward_table(), "error") return log_lines: List[str] = [] code_md = "" table = _empty_reward_table() def push(line: str = "", *, status: str = "running") -> Tuple[str, str, List[List[Any]], str]: if line: log_lines.append(line) return _format_log(log_lines), code_md, table, status yield push(f"task={task_name} backend={backend.label} seed={seed}") yield push(f"env={ENV_URL}") # 1. Reset env try: ep = CLIENT.reset(task_name, seed=seed, max_steps=N_PROBES + 5) except Exception as e: # noqa: BLE001 yield push(f"[error] /reset failed: {e}", status="error") return eid = ep["episode_id"] sig = ep.get("target_function_signature", "") yield push(f"\n=== reset ===\nepisode_id={eid}") yield push(f"signature: {sig.splitlines()[0] if sig else '(none)'}") # 2. Sample probe inputs from env's own auto-fuzzer try: inputs = CLIENT.sample_inputs(task_name, n=N_PROBES, seed=seed) except Exception as e: # noqa: BLE001 yield push(f"[warn] sample_inputs failed: {e}; falling back to ['1']*N", status="running") inputs = ["1"] * N_PROBES # 3. Probe loop yield push(f"\n=== probing ({len(inputs)} inputs) ===") history: List[Tuple[str, str, bool]] = [] for i, inp in enumerate(inputs, 1): try: resp = CLIENT.probe(eid, inp) except Exception as e: # noqa: BLE001 yield push(f" probe {i}/{len(inputs)} input={inp} [error] {e}") continue last = resp["observation"]["probe_history"][-1] out = last["output_repr"] is_err = bool(last["is_error"]) history.append((last["input_repr"], out, is_err)) tag = "raises" if is_err else "->" yield push(f" probe {i}/{len(inputs)} input={inp} {tag} {out}") time.sleep(0.05) # tiny delay so the UI feels live, not spammed # 4. Build prompt + generate code prompt = build_prompt(task_name, sig, history) yield push(f"\n=== generating code ({backend.label}) ===") if backend.kind == "oracle": completion = "```python\n" + get_oracle_code(task_name) + "```" code = extract_code(completion) yield push("oracle: pulled canonical reference implementation.") elif backend.kind == "hf": if backend.adapter and not _adapter_has_weights(backend.adapter): yield push( f"[info] adapter {backend.adapter!r} has no weights yet " f"(repo only contains .gitattributes); falling back to base model output.", ) backend = BackendInfo( key=backend.key, label=f"{backend.label} → base fallback", kind="hf", base_model=backend.base_model, adapter=None, ) try: yield push( f"loading {backend.base_model} on CPU" + (f" + LoRA {backend.adapter}" if backend.adapter else "") + " ... (cold-start may take 30-90s the first time)" ) t0 = time.time() completion = _generate_hf(backend.base_model, backend.adapter, prompt) yield push(f"generated in {time.time() - t0:.1f}s ({MAX_NEW_TOKENS} max new tokens)") except Exception as e: # noqa: BLE001 tb = traceback.format_exc(limit=2) yield push(f"[error] generation failed: {type(e).__name__}: {e}\n{tb}", status="error") return code = extract_code(completion) else: yield push(f"[error] unknown backend kind: {backend.kind}", status="error") return if not code.strip(): yield push("[warn] model emitted empty completion; submitting empty stub.") code = f"def {task_name}(*args, **kwargs):\n pass\n" code_md = f"```python\n{code}\n```" yield push("\n=== submitting code to /step ===") # 5. Submit + verifier breakdown try: sub_resp = CLIENT.submit(eid, code) except Exception as e: # noqa: BLE001 yield push(f"[error] /submit failed: {e}", status="error") return info = sub_resp.get("info", {}) or {} total = float(sub_resp.get("reward", 0.0)) table = _reward_table_from_info(info, total) yield push(f"verifier: matches {info.get('matches', 0)}/{info.get('fuzz_count', 0)}") if info.get("define_error"): yield push(f" define_error: {info['define_error']}") by_cat = info.get("matches_by_category") or {} counts = info.get("counts_by_category") or {} for cat in ("edge", "random"): m = by_cat.get(cat) c = counts.get(cat) if m is not None and c is not None: yield push(f" {cat:>6}: {m}/{c}") yield push( f"\nreward breakdown:" f" exec={info.get('execution_reward', 0):.2f}" f" -complexity={info.get('complexity_penalty', 0):.2f}" f" -hack={info.get('reward_hack_penalty', 0):.2f}" f" -floor={info.get('floor_penalty', 0):.2f}" f" +perfect={info.get('perfect_bonus', 0):.2f}" ) final_status = "done" if info.get("execution_reward", 0) >= 99.999: yield push(f"\n*** TOTAL REWARD = {total:+.2f} (PERFECT) ***", status=final_status) else: yield push(f"\nTOTAL REWARD = {total:+.2f}", status=final_status) # --------------------------------------------------------------------------- # UI helpers # --------------------------------------------------------------------------- def _task_label(t: Dict[str, Any]) -> str: diff = t.get("difficulty") or "?" src = t.get("source", "?") sig = t.get("signature") or t["name"] return f"[{diff}/{src}] {sig}" def build_task_choices() -> List[Tuple[str, str]]: tasks = fetch_tasks() tasks_sorted = sorted( tasks, key=lambda t: ( {"easy": 0, "medium": 1, "hard": 2}.get(t.get("difficulty") or "", 9), t["name"], ), ) return [(_task_label(t), t["name"]) for t in tasks_sorted] # --------------------------------------------------------------------------- # Comparison: oracle vs trained adapter on a single task # --------------------------------------------------------------------------- def quick_compare(task_name: str, seed: int = 0) -> str: """Side-by-side: oracle reward vs trained-0.5b reward on the same task. Used by the 'baseline-vs-trained' panel. Runs *non-streaming* and just returns a Markdown summary (we already have streaming for the main panel). Falls back gracefully if either backend fails. """ out_lines = [f"### Reward comparison on `{task_name}` (seed={seed})", ""] rows: List[Tuple[str, str]] = [] for key in ("oracle", "trained-0.5b"): backend = BACKENDS[key] try: ep = CLIENT.reset(task_name, seed=seed, max_steps=2) except Exception as e: # noqa: BLE001 rows.append((backend.label, f"reset failed: {e}")) continue if backend.kind == "oracle": code = get_oracle_code(task_name) else: if backend.adapter and not _adapter_has_weights(backend.adapter): rows.append((backend.label, "adapter not yet trained")) continue try: inputs = CLIENT.sample_inputs(task_name, n=N_PROBES, seed=seed) history = [] for inp in inputs: try: r = CLIENT.probe(ep["episode_id"], inp) last = r["observation"]["probe_history"][-1] history.append((last["input_repr"], last["output_repr"], bool(last["is_error"]))) except Exception: # noqa: BLE001 pass prompt = build_prompt(task_name, ep.get("target_function_signature", ""), history) completion = _generate_hf(backend.base_model, backend.adapter, prompt) code = extract_code(completion) or f"def {task_name}(*a, **k): pass" except Exception as e: # noqa: BLE001 rows.append((backend.label, f"generation failed: {e}")) continue try: sub = CLIENT.submit(ep["episode_id"], code) total = float(sub.get("reward", 0.0)) info = sub.get("info", {}) or {} rows.append( ( backend.label, f"reward={total:+.2f} exec={info.get('execution_reward', 0):.0f}/100" f" matches={info.get('matches', 0)}/{info.get('fuzz_count', 0)}", ) ) except Exception as e: # noqa: BLE001 rows.append((backend.label, f"submit failed: {e}")) out_lines.append("| backend | result |") out_lines.append("| --- | --- |") for label, r in rows: out_lines.append(f"| {label} | {r} |") return "\n".join(out_lines) # --------------------------------------------------------------------------- # UI # --------------------------------------------------------------------------- INTRO_MARKDOWN = """ # OpenSleuth — live agent demo **The Algorithmic Detective:** an LLM agent reverse-engineers an unknown black-box Python function by *probing* it with inputs and then *submitting* a Python replica. The env scores the submission by domain-aware fuzzing against the hidden reference, with edge-case stratification, a complexity penalty, and anti-reward-hacking signals. Pick a task, pick an agent, hit **Run agent**. """.strip() FOOTER_MARKDOWN = f""" --- **Links** · [env Space]({ENV_SPACE_URL}) · [task dataset]({HUB_DATASET_URL}) · [GitHub]({GITHUB_URL}) **Backends:** `oracle` is the known-correct reference impl (always +100). `base Qwen 0.5B` is `Qwen/Qwen2.5-0.5B-Instruct` with no fine-tuning. `trained Qwen 0.5B` is the GRPO LoRA at `{ADAPTER_05B_ID}`. `trained Qwen 3B` is the GRPO LoRA at `{ADAPTER_3B_ID}` (gracefully falls back to "adapter not yet trained" if the repo has no weights). Models run on CPU-basic, so first generation per backend includes a cold-load (~30–90s for 0.5B). Generations are capped at {MAX_NEW_TOKENS} new tokens. """.strip() def build_ui() -> gr.Blocks: with gr.Blocks(title="OpenSleuth — live agent demo", theme=gr.themes.Soft()) as demo: gr.Markdown(INTRO_MARKDOWN) # populated lazily so the Space can boot even if the env is mid-deploy task_choices = gr.State(value=[]) with gr.Row(): task_dd = gr.Dropdown( label="Task (15 black-box functions, easy → hard)", choices=[], value=None, interactive=True, ) backend_dd = gr.Dropdown( label="Agent backend", choices=BACKEND_CHOICES, value="oracle", interactive=True, ) seed_in = gr.Number(label="Seed", value=0, precision=0, scale=0, minimum=0) run_btn = gr.Button("Run agent", variant="primary", scale=0) with gr.Row(): log_box = gr.Textbox( label="Live agent log", value="(idle — pick a task and a backend, then hit Run agent)", lines=22, max_lines=40, interactive=False, show_copy_button=True, ) with gr.Row(): with gr.Column(scale=2): code_md = gr.Markdown(label="Submitted code", value="") with gr.Column(scale=1): reward_tbl = gr.Dataframe( headers=["component", "value"], value=_empty_reward_table(), label="Reward breakdown", interactive=False, wrap=True, ) with gr.Accordion("oracle vs trained-0.5b head-to-head", open=False): with gr.Row(): cmp_btn = gr.Button("Run quick comparison", variant="secondary") cmp_md = gr.Markdown(value="(no comparison run yet)") gr.Markdown(FOOTER_MARKDOWN) # ---- wiring ------------------------------------------------------ def _refresh_tasks(): choices = build_task_choices() default = choices[0][1] if choices else None return gr.Dropdown(choices=choices, value=default), choices demo.load(_refresh_tasks, outputs=[task_dd, task_choices]) run_btn.click( fn=run_agent, inputs=[task_dd, backend_dd, seed_in], outputs=[log_box, code_md, reward_tbl, gr.State()], show_progress="minimal", ) cmp_btn.click( fn=quick_compare, inputs=[task_dd, seed_in], outputs=[cmp_md], show_progress="minimal", ) return demo demo = build_ui().queue(default_concurrency_limit=2) if __name__ == "__main__": demo.launch()