# opensleuth-demo / app.py
# anugrah55's picture
# Use standard Spaces launch form
# cf454ff verified
"""OpenSleuth live demo Space.
A clickable Gradio app that lets a viewer watch the OpenSleuth agent solve
one of the 15 catalog tasks live: pick a black-box function, pick an agent
backend, watch the agent probe the env, submit a Python replica, and see
the verifier reward streamed back in real time.
Backends:
* "oracle" — submit the canonical reference implementation
* "base Qwen 0.5B" — Qwen/Qwen2.5-0.5B-Instruct, no fine-tuning
* "trained Qwen 0.5B" — base + GRPO LoRA from anugrah55/opensleuth-qwen2.5-0.5b-grpo
* "trained Qwen 3B" — base + GRPO LoRA from anugrah55/opensleuth-qwen2.5-3b-grpo-v2
(gracefully degraded if adapter repo is empty)
Networks: hits the live env Space at https://anugrah55-opensleuth-env-gemini-cli.hf.space
for /tasks, /reset, /step (probe + submit), /tasks/{name}/sample_inputs.
CPU-basic friendly: model loads are lazy, generations are capped at 192
new tokens, and we fall back gracefully if a model/adapter is unavailable.
"""
from __future__ import annotations
import logging
import os
import re
import threading
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, Generator, List, Optional, Tuple
import gradio as gr
import requests
from huggingface_hub import HfApi
from oracle import ORACLE_SOLUTIONS, get_oracle_code
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Base URL of the live environment Space. A trailing slash is stripped so that
# request paths (which all start with "/") can be appended verbatim.
ENV_URL = os.getenv(
    "OPENSLEUTH_ENV_URL",
    "https://anugrah55-opensleuth-env-gemini-cli.hf.space",
).rstrip("/")

# Model and adapter repositories; each can be overridden by the environment
# variable of the same name.
BASE_MODEL_ID = os.getenv("BASE_MODEL_ID", "Qwen/Qwen2.5-0.5B-Instruct")
BASE_MODEL_3B_ID = os.getenv("BASE_MODEL_3B_ID", "Qwen/Qwen2.5-3B-Instruct")
ADAPTER_05B_ID = os.getenv("ADAPTER_05B_ID", "anugrah55/opensleuth-qwen2.5-0.5b-grpo")
ADAPTER_3B_ID = os.getenv("ADAPTER_3B_ID", "anugrah55/opensleuth-qwen2.5-3b-grpo-v2")

# Generation cap and number of probes issued per rollout.
MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "192"))
N_PROBES = int(os.getenv("N_PROBES", "6"))

# Optional Hub token; None when the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")

# Links rendered in the page footer.
GITHUB_URL = "https://github.com/"
HUB_DATASET_URL = "https://huggingface.co/datasets/anugrah55/opensleuth-tasks"
ENV_SPACE_URL = "https://huggingface.co/spaces/anugrah55/opensleuth-env-gemini-cli"

# System message sent with every model generation request.
SYSTEM_PROMPT = (
    "You are an algorithmic detective. You are given the public signature of a hidden "
    "Python function plus several (input, output) examples observed by probing it. "
    "Your job is to write a Python function that *exactly* reproduces the hidden "
    "function's behavior on all valid inputs. Match its return values AND its "
    "exception types on invalid inputs. Keep your implementation as simple and clean "
    "as possible (it is penalised for being needlessly branchy). Return ONLY the "
    "function definition wrapped in a single ```python ... ``` code block."
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("opensleuth.demo")
# ---------------------------------------------------------------------------
# Env client (thin)
# ---------------------------------------------------------------------------
class EnvClient:
    """Minimal HTTP wrapper around the OpenSleuth environment endpoints.

    Every call is a blocking ``requests`` round-trip that raises on a non-2xx
    status (via ``raise_for_status``) and returns the decoded JSON body.
    """

    def __init__(self, base_url: str, timeout: float = 30.0) -> None:
        # Trailing slashes are dropped because every path passed in starts with "/".
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def _get(self, path: str, **params) -> Dict[str, Any]:
        """GET ``path`` with ``params`` as the query string; return the JSON body."""
        query = params if params else None
        response = requests.get(self.base_url + path, params=query, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path``; return the JSON body."""
        response = requests.post(self.base_url + path, json=payload, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def list_tasks(self) -> List[Dict[str, Any]]:
        """Return the ``tasks`` list from ``/tasks``."""
        catalog = self._get("/tasks")
        return catalog["tasks"]

    def sample_inputs(self, name: str, n: int = 6, seed: int = 0) -> List[str]:
        """Return the ``inputs`` list from ``/tasks/{name}/sample_inputs``."""
        body = self._get(f"/tasks/{name}/sample_inputs", n=n, seed=seed)
        return list(body["inputs"])

    def reset(self, target_name: str, seed: int = 0, max_steps: int = 25) -> Dict[str, Any]:
        """Start an episode through ``/reset`` and return the response."""
        payload = {"target_name": target_name, "seed": seed, "max_steps": max_steps}
        return self._post("/reset", payload)

    def probe(self, episode_id: str, input_repr: str) -> Dict[str, Any]:
        """Send a ``probe`` action for ``input_repr`` to ``/step``."""
        action = {"action_type": "probe", "input_repr": input_repr}
        return self._post("/step", {"episode_id": episode_id, "action": action})

    def submit(self, episode_id: str, code: str) -> Dict[str, Any]:
        """Send a ``submit`` action carrying ``code`` to ``/step``."""
        action = {"action_type": "submit", "code": code}
        return self._post("/step", {"episode_id": episode_id, "action": action})
# Module-wide client bound to the configured environment URL.
CLIENT = EnvClient(ENV_URL)
def fetch_tasks() -> List[Dict[str, Any]]:
    """Return the live task catalog, or a static stand-in when the env fails.

    On any error from ``CLIENT.list_tasks()`` a warning is logged and one
    placeholder entry per name in ``ORACLE_SOLUTIONS`` (sorted) is returned,
    so the task dropdown always has something to show.
    """
    try:
        catalog = CLIENT.list_tasks()
    except Exception as exc:  # noqa: BLE001
        log.warning("could not fetch /tasks from env (%s); using static fallback", exc)
    else:
        return catalog

    placeholders: List[Dict[str, Any]] = []
    for task_name in sorted(ORACLE_SOLUTIONS):
        placeholders.append(
            {
                "name": task_name,
                "signature": "",
                "description": "",
                "difficulty": "?",
                "edge_case_count": 0,
                "source": "fallback",
            }
        )
    return placeholders
# ---------------------------------------------------------------------------
# Prompt + code extraction (lifted from training/opensleuth_train/prompt.py)
# ---------------------------------------------------------------------------
# Matches the first fenced block (optionally tagged "python", any case) and
# captures its contents; DOTALL lets the capture span multiple lines.
_CODE_RE = re.compile(r"```(?:python)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)
def build_prompt(target_name: str, signature: str, probes: List[Tuple[str, str, bool]]) -> str:
    """Render the user prompt describing the hidden function and its probes.

    Args:
        target_name: Name of the hidden function.
        signature: Public signature/docstring text; blank text is replaced by
            a "(no signature provided)" placeholder.
        probes: ``(input_repr, output_repr, is_error)`` triples, one per probe.

    Returns:
        A Markdown-style prompt whose sections are joined with newlines.
    """
    probe_lines = [
        f"- input={probe_in} -> {'raises' if raised else 'returns'} {probe_out}"
        for probe_in, probe_out, raised in probes
    ] or ["(none)"]

    task_text = (
        f"Write a Python function named `{target_name}` that reproduces the hidden "
        "function's behaviour. Return ONLY the function definition in a single "
        "```python ... ``` code block. Do not add explanations."
    )

    sections = [
        f"## Hidden function: {target_name}",
        "",
        "### Public signature & docstring",
        signature.strip() or "(no signature provided)",
        "",
        "### Observed probes",
        *probe_lines,
        "",
        "### Task",
        task_text,
    ]
    return "\n".join(sections)
def extract_code(completion: str) -> str:
    """Pull the Python source out of a model completion.

    Resolution order:
      1. the contents of the first complete fenced block (per ``_CODE_RE``);
      2. if a fence opens but never closes (generation stopped before the
         closing fence, e.g. at the new-token cap), everything after the
         opening fence and its optional ``python`` tag;
      3. otherwise the whole completion.

    Args:
        completion: Raw text produced by the backend.

    Returns:
        The extracted code with surrounding whitespace stripped.
    """
    fenced = _CODE_RE.search(completion)
    if fenced:
        return fenced.group(1).strip()

    # No closed fence. Without this branch a truncated completion would be
    # returned with its "```python" line intact, which is not valid Python.
    _, fence, tail = completion.partition("```")
    if fence:
        if tail[:6].lower() == "python":
            tail = tail[6:]
        return tail.strip()

    return completion.strip()
# ---------------------------------------------------------------------------
# Backend registry
# ---------------------------------------------------------------------------
@dataclass
class BackendInfo:
    """Description of one selectable agent backend."""

    # Registry key; also the value submitted by the backend dropdown.
    key: str
    # Human-readable name shown in the UI and in the run log.
    label: str
    # "oracle" | "hf" (transformers + peft)
    kind: str
    # Hub id of the base model; left as None for the oracle backend.
    base_model: Optional[str] = None
    # Hub id of the LoRA adapter; None means the bare base model.
    adapter: Optional[str] = None
# Registry of selectable backends, keyed by each entry's own ``key`` and kept
# in declaration order (which is also the dropdown order).
BACKENDS: Dict[str, BackendInfo] = {
    info.key: info
    for info in (
        BackendInfo(
            key="oracle",
            label="oracle (reference impl)",
            kind="oracle",
        ),
        BackendInfo(
            key="base-0.5b",
            label="base Qwen 0.5B (no fine-tune)",
            kind="hf",
            base_model=BASE_MODEL_ID,
            adapter=None,
        ),
        BackendInfo(
            key="trained-0.5b",
            label="trained Qwen 0.5B (GRPO LoRA)",
            kind="hf",
            base_model=BASE_MODEL_ID,
            adapter=ADAPTER_05B_ID,
        ),
        BackendInfo(
            key="trained-3b",
            label="trained Qwen 3B (GRPO LoRA)",
            kind="hf",
            base_model=BASE_MODEL_3B_ID,
            adapter=ADAPTER_3B_ID,
        ),
    )
}

# (label, key) pairs in the shape the backend dropdown expects.
BACKEND_CHOICES = [(info.label, key) for key, info in BACKENDS.items()]
def _adapter_has_weights(repo_id: str) -> bool:
    """Report whether the adapter repo on the Hub contains adapter weights.

    Returns True iff some file in the repo ends with
    ``adapter_model.safetensors`` or ``adapter_model.bin``. A repo without such
    a file is treated as 'not yet trained'. If the listing call itself fails,
    a warning is logged and False is returned.
    """
    try:
        repo_files = HfApi(token=HF_TOKEN).list_repo_files(repo_id)
    except Exception as exc:  # noqa: BLE001
        log.warning("adapter availability probe failed for %s: %s", repo_id, exc)
        return False

    weight_suffixes = ("adapter_model.safetensors", "adapter_model.bin")
    return any(path.endswith(weight_suffixes) for path in repo_files)
# ---------------------------------------------------------------------------
# Lazy HF model cache
# ---------------------------------------------------------------------------
# Held by _load_hf for the whole lookup-and-load sequence.
_MODEL_LOCK = threading.Lock()
# Already-loaded models, filled in by _load_hf.
_LOADED: Dict[str, Tuple[Any, Any]] = {}  # cache_key -> (tokenizer, model)
def _model_cache_key(base: str, adapter: Optional[str]) -> str:
return f"{base}::{adapter or '_base_'}"
def _load_hf(base: str, adapter: Optional[str]) -> Tuple[Any, Any]:
    """Return the ``(tokenizer, model)`` pair for ``base`` plus optional LoRA.

    The first request for a base/adapter combination loads the weights on CPU
    in float32 and stores the pair in ``_LOADED``; later requests return the
    stored pair. The whole lookup-and-load sequence runs under ``_MODEL_LOCK``.
    """
    cache_key = _model_cache_key(base, adapter)
    with _MODEL_LOCK:
        cached = _LOADED.get(cache_key)
        if cached is not None:
            return cached

        log.info("loading HF model base=%s adapter=%s", base, adapter)
        # Heavy dependencies are imported only once a model is actually needed.
        import torch  # noqa: WPS433
        from transformers import AutoModelForCausalLM, AutoTokenizer  # noqa: WPS433

        tokenizer = AutoTokenizer.from_pretrained(base, trust_remote_code=True, token=HF_TOKEN)
        load_options = {
            "torch_dtype": torch.float32,
            "device_map": {"": "cpu"},
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
            "token": HF_TOKEN,
        }
        network = AutoModelForCausalLM.from_pretrained(base, **load_options)
        if adapter:
            from peft import PeftModel  # noqa: WPS433

            network = PeftModel.from_pretrained(network, adapter, token=HF_TOKEN)
        network.eval()

        pair = (tokenizer, network)
        _LOADED[cache_key] = pair
        log.info("loaded %s in %d cached models", cache_key, len(_LOADED))
        return pair
def _generate_hf(base: str, adapter: Optional[str], prompt: str) -> str:
    """Generate a completion for ``prompt`` with the given base model/adapter.

    The prompt is sent as the user turn after ``SYSTEM_PROMPT``, rendered with
    the tokenizer's chat template. Decoding is greedy (``do_sample=False``) and
    capped at ``MAX_NEW_TOKENS``. Only the newly generated tokens are decoded
    and returned.
    """
    tokenizer, model = _load_hf(base, adapter)
    import torch  # noqa: WPS433

    chat = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    rendered = tokenizer.apply_chat_template(chat, add_generation_prompt=True, tokenize=False)
    encoded = tokenizer(rendered, return_tensors="pt")
    prompt_len = encoded["input_ids"].shape[1]

    with torch.no_grad():
        sequences = model.generate(
            **encoded,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=False,
            temperature=1.0,
            pad_token_id=tokenizer.eos_token_id,
        )

    # Slice off the prompt so only the model's own continuation is decoded.
    new_tokens = sequences[0, prompt_len:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)
# ---------------------------------------------------------------------------
# Reward table formatter
# ---------------------------------------------------------------------------
def _empty_reward_table() -> List[List[Any]]:
return [
["execution_reward", "—"],
["edge_pass_rate", "—"],
["complexity_penalty", "—"],
["reward_hack_penalty", "—"],
["floor_penalty", "—"],
["perfect_bonus", "—"],
["TOTAL reward", "—"],
]
def _reward_table_from_info(info: Dict[str, Any], total: float) -> List[List[Any]]:
def _fmt(x):
if x is None:
return "—"
if isinstance(x, float):
return f"{x:+.2f}"
return str(x)
edge = info.get("edge_pass_rate")
edge_str = f"{edge:.0%}" if isinstance(edge, (int, float)) else "—"
return [
["execution_reward", _fmt(info.get("execution_reward"))],
["edge_pass_rate", edge_str],
["complexity_penalty", _fmt(-(info.get("complexity_penalty") or 0.0))],
["reward_hack_penalty", _fmt(-(info.get("reward_hack_penalty") or 0.0))],
["floor_penalty", _fmt(-(info.get("floor_penalty") or 0.0))],
["perfect_bonus", _fmt(info.get("perfect_bonus"))],
["TOTAL reward", _fmt(total)],
]
# ---------------------------------------------------------------------------
# Streaming runner
# ---------------------------------------------------------------------------
def _format_log(lines: List[str]) -> str:
return "\n".join(lines)
def run_agent(
    task_name: str,
    backend_key: str,
    seed: int = 0,
) -> Generator[Tuple[str, str, List[List[Any]], str], None, None]:
    """Run one agent rollout end-to-end and stream UI updates.

    The rollout resets an episode on the env, probes it with inputs sampled by
    the env, obtains candidate code from the selected backend, submits it and
    reports the verifier's reward breakdown. Failures of the env or of the
    backend are reported as log lines with status "error" instead of raising
    out of the generator.

    Args:
        task_name: Name of the task to solve; a falsy value aborts with a hint.
        backend_key: Key into ``BACKENDS``; an unknown key aborts with an error.
        seed: Seed forwarded to ``/reset`` and to ``sample_inputs``. A falsy
            value (including ``None``) is treated as 0.

    Yields:
        Tuples of (log_text, code_markdown, reward_table, status), where
        status is "running", "done" or "error".
    """
    backend = BACKENDS.get(backend_key)
    if backend is None:
        yield ("Unknown backend.", "", _empty_reward_table(), "error")
        return
    if not task_name:
        yield ("Pick a task first.", "", _empty_reward_table(), "error")
        return
    seed = int(seed or 0)

    log_lines: List[str] = []
    code_md = ""
    table = _empty_reward_table()

    def push(line: str = "", *, status: str = "running") -> Tuple[str, str, List[List[Any]], str]:
        # Reads code_md/table at call time, so later rebinding below is picked up.
        if line:
            log_lines.append(line)
        return _format_log(log_lines), code_md, table, status

    yield push(f"task={task_name} backend={backend.label} seed={seed}")
    yield push(f"env={ENV_URL}")

    # 1. Reset env
    try:
        ep = CLIENT.reset(task_name, seed=seed, max_steps=N_PROBES + 5)
        eid = ep["episode_id"]
    except Exception as e:  # noqa: BLE001
        yield push(f"[error] /reset failed: {e}", status="error")
        return
    sig = ep.get("target_function_signature") or ""
    yield push(f"\n=== reset ===\nepisode_id={eid}")
    yield push(f"signature: {sig.splitlines()[0] if sig else '(none)'}")

    # 2. Sample probe inputs from env's own auto-fuzzer
    try:
        inputs = CLIENT.sample_inputs(task_name, n=N_PROBES, seed=seed)
    except Exception as e:  # noqa: BLE001
        yield push(f"[warn] sample_inputs failed: {e}; falling back to ['1']*N", status="running")
        inputs = ["1"] * N_PROBES

    # 3. Probe loop
    yield push(f"\n=== probing ({len(inputs)} inputs) ===")
    history: List[Tuple[str, str, bool]] = []
    for i, inp in enumerate(inputs, 1):
        try:
            resp = CLIENT.probe(eid, inp)
            # Parsed inside the try so a malformed response skips this probe
            # instead of aborting the whole rollout.
            last = resp["observation"]["probe_history"][-1]
            observed = (last["input_repr"], last["output_repr"], bool(last["is_error"]))
        except Exception as e:  # noqa: BLE001
            yield push(f" probe {i}/{len(inputs)} input={inp} [error] {e}")
            continue
        history.append(observed)
        tag = "raises" if observed[2] else "->"
        yield push(f" probe {i}/{len(inputs)} input={inp} {tag} {observed[1]}")
        time.sleep(0.05)  # tiny delay so the UI feels live, not spammed

    # 4. Build prompt + generate code
    prompt = build_prompt(task_name, sig, history)
    yield push(f"\n=== generating code ({backend.label}) ===")
    if backend.kind == "oracle":
        try:
            reference = get_oracle_code(task_name)
        except Exception as e:  # noqa: BLE001
            # The live catalog may list a task the local oracle does not know.
            yield push(
                f"[error] no oracle solution for {task_name!r}: {type(e).__name__}: {e}",
                status="error",
            )
            return
        completion = "```python\n" + reference + "```"
        code = extract_code(completion)
        yield push("oracle: pulled canonical reference implementation.")
    elif backend.kind == "hf":
        if backend.adapter and not _adapter_has_weights(backend.adapter):
            yield push(
                f"[info] adapter {backend.adapter!r} has no weights yet "
                f"(repo only contains .gitattributes); falling back to base model output.",
            )
            backend = BackendInfo(
                key=backend.key,
                label=f"{backend.label} → base fallback",
                kind="hf",
                base_model=backend.base_model,
                adapter=None,
            )
        try:
            yield push(
                f"loading {backend.base_model} on CPU"
                + (f" + LoRA {backend.adapter}" if backend.adapter else "")
                + " ... (cold-start may take 30-90s the first time)"
            )
            t0 = time.time()
            completion = _generate_hf(backend.base_model, backend.adapter, prompt)
            yield push(f"generated in {time.time() - t0:.1f}s ({MAX_NEW_TOKENS} max new tokens)")
        except Exception as e:  # noqa: BLE001
            tb = traceback.format_exc(limit=2)
            yield push(f"[error] generation failed: {type(e).__name__}: {e}\n{tb}", status="error")
            return
        code = extract_code(completion)
    else:
        yield push(f"[error] unknown backend kind: {backend.kind}", status="error")
        return

    if not code.strip():
        yield push("[warn] model emitted empty completion; submitting empty stub.")
        code = f"def {task_name}(*args, **kwargs):\n pass\n"
    code_md = f"```python\n{code}\n```"
    yield push("\n=== submitting code to /step ===")

    # 5. Submit + verifier breakdown
    try:
        sub_resp = CLIENT.submit(eid, code)
    except Exception as e:  # noqa: BLE001
        yield push(f"[error] /submit failed: {e}", status="error")
        return
    info = sub_resp.get("info", {}) or {}
    total = float(sub_resp.get("reward") or 0.0)
    table = _reward_table_from_info(info, total)

    def metric(key: str) -> float:
        # A key present with a None value must not reach a numeric format
        # spec or a comparison; treat anything non-numeric as 0.0.
        value = info.get(key)
        return value if isinstance(value, (int, float)) else 0.0

    yield push(f"verifier: matches {info.get('matches', 0)}/{info.get('fuzz_count', 0)}")
    if info.get("define_error"):
        yield push(f" define_error: {info['define_error']}")
    by_cat = info.get("matches_by_category") or {}
    counts = info.get("counts_by_category") or {}
    for cat in ("edge", "random"):
        m = by_cat.get(cat)
        c = counts.get(cat)
        if m is not None and c is not None:
            yield push(f" {cat:>6}: {m}/{c}")
    yield push(
        f"\nreward breakdown:"
        f" exec={metric('execution_reward'):.2f}"
        f" -complexity={metric('complexity_penalty'):.2f}"
        f" -hack={metric('reward_hack_penalty'):.2f}"
        f" -floor={metric('floor_penalty'):.2f}"
        f" +perfect={metric('perfect_bonus'):.2f}"
    )
    final_status = "done"
    if metric("execution_reward") >= 99.999:
        yield push(f"\n*** TOTAL REWARD = {total:+.2f} (PERFECT) ***", status=final_status)
    else:
        yield push(f"\nTOTAL REWARD = {total:+.2f}", status=final_status)
# ---------------------------------------------------------------------------
# UI helpers
# ---------------------------------------------------------------------------
def _task_label(t: Dict[str, Any]) -> str:
diff = t.get("difficulty") or "?"
src = t.get("source", "?")
sig = t.get("signature") or t["name"]
return f"[{diff}/{src}] {sig}"
def build_task_choices() -> List[Tuple[str, str]]:
    """Return ``(label, task_name)`` dropdown choices for the task catalog.

    Tasks are ordered easy, medium, hard, then any other difficulty, with
    ties broken by task name.
    """
    difficulty_rank = {"easy": 0, "medium": 1, "hard": 2}

    def sort_key(task: Dict[str, Any]):
        rank = difficulty_rank.get(task.get("difficulty") or "", 9)
        return rank, task["name"]

    ordered = sorted(fetch_tasks(), key=sort_key)
    return [(_task_label(task), task["name"]) for task in ordered]
# ---------------------------------------------------------------------------
# Comparison: oracle vs trained adapter on a single task
# ---------------------------------------------------------------------------
def quick_compare(task_name: str, seed: int = 0) -> str:
    """Side-by-side: oracle reward vs trained-0.5b reward on the same task.

    Used by the 'baseline-vs-trained' panel. Runs *non-streaming* and returns
    a Markdown summary with one table row per backend. A failure in any stage
    for one backend becomes that backend's row instead of raising.

    Args:
        task_name: Task to compare on; a falsy value returns a short hint.
        seed: Seed forwarded to ``/reset`` and to ``sample_inputs``. A falsy
            value (including ``None``) is treated as 0.

    Returns:
        Markdown text: a heading followed by a two-column table.
    """
    if not task_name:
        return "Pick a task first."
    seed = int(seed or 0)

    def cell(text: Any) -> str:
        # Pipes and newlines would break the Markdown table row.
        return str(text).replace("|", "\\|").replace("\n", " ")

    out_lines = [f"### Reward comparison on `{task_name}` (seed={seed})", ""]
    rows: List[Tuple[str, str]] = []
    for key in ("oracle", "trained-0.5b"):
        backend = BACKENDS[key]
        try:
            # Same step budget as run_agent: the model path spends N_PROBES
            # probe steps before its submit step.
            ep = CLIENT.reset(task_name, seed=seed, max_steps=N_PROBES + 5)
            eid = ep["episode_id"]
        except Exception as e:  # noqa: BLE001
            rows.append((backend.label, f"reset failed: {e}"))
            continue

        if backend.kind == "oracle":
            try:
                code = get_oracle_code(task_name)
            except Exception as e:  # noqa: BLE001
                rows.append((backend.label, f"no oracle solution: {type(e).__name__}: {e}"))
                continue
        else:
            if backend.adapter and not _adapter_has_weights(backend.adapter):
                rows.append((backend.label, "adapter not yet trained"))
                continue
            try:
                inputs = CLIENT.sample_inputs(task_name, n=N_PROBES, seed=seed)
                history = []
                for inp in inputs:
                    try:
                        r = CLIENT.probe(eid, inp)
                        last = r["observation"]["probe_history"][-1]
                        history.append(
                            (last["input_repr"], last["output_repr"], bool(last["is_error"]))
                        )
                    except Exception as e:  # noqa: BLE001
                        # Best-effort: a failed probe is skipped, but leaves a trace.
                        log.warning("quick_compare: probe %r failed: %s", inp, e)
                prompt = build_prompt(task_name, ep.get("target_function_signature") or "", history)
                completion = _generate_hf(backend.base_model, backend.adapter, prompt)
                code = extract_code(completion) or f"def {task_name}(*a, **k): pass"
            except Exception as e:  # noqa: BLE001
                rows.append((backend.label, f"generation failed: {e}"))
                continue

        try:
            sub = CLIENT.submit(eid, code)
        except Exception as e:  # noqa: BLE001
            rows.append((backend.label, f"submit failed: {e}"))
            continue

        info = sub.get("info", {}) or {}
        total = float(sub.get("reward") or 0.0)
        exec_reward = info.get("execution_reward")
        if not isinstance(exec_reward, (int, float)):
            # Missing or None: keep formatting from raising after a good submit.
            exec_reward = 0.0
        rows.append(
            (
                backend.label,
                f"reward={total:+.2f} exec={exec_reward:.0f}/100"
                f" matches={info.get('matches', 0)}/{info.get('fuzz_count', 0)}",
            )
        )

    out_lines.append("| backend | result |")
    out_lines.append("| --- | --- |")
    for label, r in rows:
        out_lines.append(f"| {label} | {cell(r)} |")
    return "\n".join(out_lines)
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
# Markdown rendered at the top of the page.
INTRO_MARKDOWN = """
# OpenSleuth — live agent demo
**The Algorithmic Detective:** an LLM agent reverse-engineers an unknown
black-box Python function by *probing* it with inputs and then *submitting*
a Python replica. The env scores the submission by domain-aware fuzzing
against the hidden reference, with edge-case stratification, a complexity
penalty, and anti-reward-hacking signals.
Pick a task, pick an agent, hit **Run agent**.
""".strip()
# Markdown rendered at the bottom of the page; interpolates the link URLs,
# adapter ids and the generation cap configured above.
FOOTER_MARKDOWN = f"""
---
**Links** ·
[env Space]({ENV_SPACE_URL}) ·
[task dataset]({HUB_DATASET_URL}) ·
[GitHub]({GITHUB_URL})
**Backends:** `oracle` is the known-correct reference impl (always +100).
`base Qwen 0.5B` is `Qwen/Qwen2.5-0.5B-Instruct` with no fine-tuning.
`trained Qwen 0.5B` is the GRPO LoRA at `{ADAPTER_05B_ID}`.
`trained Qwen 3B` is the GRPO LoRA at `{ADAPTER_3B_ID}` (gracefully
falls back to "adapter not yet trained" if the repo has no weights).
Models run on CPU-basic, so first generation per backend includes a cold-load
(~30–90s for 0.5B). Generations are capped at {MAX_NEW_TOKENS} new tokens.
""".strip()
def build_ui() -> gr.Blocks:
    """Assemble the Gradio Blocks app and wire its event handlers.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet queued or launched).
    """
    # NOTE(review): the nesting of the layout containers below is reconstructed
    # from component order; in particular, confirm whether `cmp_md` belongs
    # inside the accordion's Row or directly under the Accordion.
    with gr.Blocks(title="OpenSleuth — live agent demo", theme=gr.themes.Soft()) as demo:
        gr.Markdown(INTRO_MARKDOWN)
        # populated lazily so the Space can boot even if the env is mid-deploy
        task_choices = gr.State(value=[])
        # Controls: task, backend, seed and the run button.
        with gr.Row():
            task_dd = gr.Dropdown(
                label="Task (15 black-box functions, easy → hard)",
                choices=[],
                value=None,
                interactive=True,
            )
            backend_dd = gr.Dropdown(
                label="Agent backend",
                choices=BACKEND_CHOICES,
                value="oracle",
                interactive=True,
            )
            seed_in = gr.Number(label="Seed", value=0, precision=0, scale=0, minimum=0)
            run_btn = gr.Button("Run agent", variant="primary", scale=0)
        # Streaming log of the rollout.
        with gr.Row():
            log_box = gr.Textbox(
                label="Live agent log",
                value="(idle — pick a task and a backend, then hit Run agent)",
                lines=22,
                max_lines=40,
                interactive=False,
                show_copy_button=True,
            )
        # Submitted code next to the reward breakdown.
        with gr.Row():
            with gr.Column(scale=2):
                code_md = gr.Markdown(label="Submitted code", value="")
            with gr.Column(scale=1):
                reward_tbl = gr.Dataframe(
                    headers=["component", "value"],
                    value=_empty_reward_table(),
                    label="Reward breakdown",
                    interactive=False,
                    wrap=True,
                )
        with gr.Accordion("oracle vs trained-0.5b head-to-head", open=False):
            with gr.Row():
                cmp_btn = gr.Button("Run quick comparison", variant="secondary")
            cmp_md = gr.Markdown(value="(no comparison run yet)")
        gr.Markdown(FOOTER_MARKDOWN)

        # ---- wiring ------------------------------------------------------
        def _refresh_tasks():
            # Fetch the catalog on page load and preselect the first entry.
            choices = build_task_choices()
            default = choices[0][1] if choices else None
            return gr.Dropdown(choices=choices, value=default), choices

        demo.load(_refresh_tasks, outputs=[task_dd, task_choices])
        run_btn.click(
            fn=run_agent,
            inputs=[task_dd, backend_dd, seed_in],
            # run_agent yields four values; the fourth (status) goes to a
            # State created here that no other component references.
            outputs=[log_box, code_md, reward_tbl, gr.State()],
            show_progress="minimal",
        )
        cmp_btn.click(
            fn=quick_compare,
            inputs=[task_dd, seed_in],
            outputs=[cmp_md],
            show_progress="minimal",
        )
    return demo
# Module-level app object, built at import time with queuing enabled.
demo = build_ui().queue(default_concurrency_limit=2)

if __name__ == "__main__":
    demo.launch()