| """Thin HTTP client for the OpenSleuth env Space.""" |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import os |
| import time |
| from typing import Any, Dict, List, Optional |
|
|
| import requests |
|
|
| log = logging.getLogger("opensleuth.client") |
|
|
|
|
class EnvClient:
    """Thin HTTP client for the OpenSleuth env Space.

    Wraps the env's JSON endpoints (``/health``, ``/functions``, ``/tasks``,
    ``/reset``, ``/step``). Calls routed through ``_get`` / ``_post`` are
    retried with exponential backoff; ``health`` and ``list_functions``
    issue a single request and let ``requests`` errors propagate unchanged.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: float = 30.0, retries: int = 3):
        """Configure the client.

        Args:
            base_url: Root URL of the env server. When falsy, the ``ENV_URL``
                environment variable is used, and if that is unset or empty,
                ``http://127.0.0.1:7860``. Trailing slashes are stripped.
            timeout: Per-request timeout in seconds, passed to ``requests``.
            retries: Number of attempts made by the retrying helpers.
        """
        # Chain with ``or`` instead of a ``dict.get`` default so that an
        # ENV_URL that is set but empty still falls back to the local default
        # rather than producing an empty (schemeless) base URL.
        resolved = base_url or os.environ.get("ENV_URL") or "http://127.0.0.1:7860"
        self.base_url = resolved.rstrip("/")
        self.timeout = timeout
        self.retries = retries

    def _request(self, verb: str, send: Any, path: str, **kwargs: Any) -> Dict[str, Any]:
        """Call ``send`` against ``base_url + path`` with retry and backoff.

        Args:
            verb: HTTP verb name, used only in log and error messages.
            send: Callable with the ``requests.get`` / ``requests.post``
                calling convention (URL first, then keyword arguments).
            path: Path appended verbatim to ``self.base_url``.
            **kwargs: Extra keyword arguments forwarded to ``send``
                (for example ``json=`` or ``params=``).

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            RuntimeError: When every attempt failed; the last underlying
                exception is attached as ``__cause__``.
        """
        # Always make at least one attempt, even if retries was set to 0 or
        # a negative number; otherwise no request would ever be sent.
        attempts = max(1, self.retries)
        last_exc: Optional[Exception] = None
        for attempt in range(attempts):
            try:
                r = send(f"{self.base_url}{path}", timeout=self.timeout, **kwargs)
                r.raise_for_status()
                return r.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a response body that fails JSON decoding.
                last_exc = e
                if attempt + 1 >= attempts:
                    # Nothing left to retry: skip the pointless final sleep
                    # and the misleading "retrying" log line.
                    break
                wait = 0.5 * (2 ** attempt)  # 0.5s, 1s, 2s, ...
                log.warning("env %s %s failed (%s); retrying in %.1fs", verb, path, e, wait)
                time.sleep(wait)
        raise RuntimeError(
            f"env {verb} {path} failed after {self.retries} retries: {last_exc}"
        ) from last_exc

    def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` with retries; return the JSON body."""
        return self._request("POST", requests.post, path, json=payload)

    def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET ``path`` with optional query ``params`` and retries; return the JSON body."""
        return self._request("GET", requests.get, path, params=params)

    def health(self) -> Dict[str, Any]:
        """GET ``/health`` once (no retry) and return the decoded JSON body.

        Errors raised by ``requests`` (including ``raise_for_status``)
        propagate to the caller unchanged.
        """
        r = requests.get(f"{self.base_url}/health", timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def list_functions(self) -> List[Dict[str, str]]:
        """Legacy v0.3 endpoint -- only the 9 builtin functions.

        Issues a single GET to ``/functions`` (no retry) and returns the
        ``"functions"`` entry of the JSON body.
        """
        r = requests.get(f"{self.base_url}/functions", timeout=self.timeout)
        r.raise_for_status()
        return r.json()["functions"]

    def list_tasks(
        self,
        source: str = "all",
        difficulty: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """v0.4 catalog endpoint -- builtins + Hub-driven tasks.

        Each item carries: ``name``, ``signature``, ``description``,
        ``difficulty`` (``easy|medium|hard|None``), ``edge_case_count``,
        ``source`` (``builtin|hub``).

        Args:
            source: Sent as the ``source`` query parameter.
            difficulty: Sent as the ``difficulty`` query parameter only when
                truthy.

        Returns:
            The ``"tasks"`` entry of the ``/tasks`` response.
        """
        params: Dict[str, Any] = {"source": source}
        if difficulty:
            params["difficulty"] = difficulty
        return self._get("/tasks", params=params)["tasks"]

    def sample_inputs(self, target_name: str, n: int = 8, seed: int = 0) -> List[str]:
        """Pull ``n`` ready-to-probe input_repr strings from the env's own
        auto-fuzzer. Encapsulates the fuzz logic on the env side so the
        trainer doesn't have to keep its own per-task input pools in sync.

        ``target_name`` is inserted into the URL path as-is; ``n`` and
        ``seed`` are sent as query parameters. Returns a list built from the
        ``"inputs"`` entry of the response.
        """
        resp = self._get(
            f"/tasks/{target_name}/sample_inputs",
            params={"n": n, "seed": seed},
        )
        return list(resp["inputs"])

    def reset(self, target_name: str, seed: int = 0, max_steps: int = 25) -> Dict[str, Any]:
        """POST ``/reset`` with ``target_name``, ``seed`` and ``max_steps``.

        Returns the JSON response; ``score_submission`` reads the
        ``"episode_id"`` key from it.
        """
        payload = {"target_name": target_name, "seed": seed, "max_steps": max_steps}
        return self._post("/reset", payload)

    def step(self, episode_id: str, action: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``/step`` with the given ``episode_id`` and ``action`` dict."""
        return self._post("/step", {"episode_id": episode_id, "action": action})

    def submit(self, episode_id: str, code: str) -> Dict[str, Any]:
        """Send a ``submit`` action carrying ``code`` via ``step``."""
        return self.step(episode_id, {"action_type": "submit", "code": code})

    def probe(self, episode_id: str, input_repr: str) -> Dict[str, Any]:
        """Send a ``probe`` action carrying ``input_repr`` via ``step``."""
        return self.step(episode_id, {"action_type": "probe", "input_repr": input_repr})

    def score_submission(self, target_name: str, code: str, seed: int = 0) -> float:
        """One-shot: open an episode, submit the code, return total reward.

        Resets with ``max_steps=2``, submits ``code`` to the new episode and
        returns the response's ``"reward"`` value converted to ``float``.
        """
        ep = self.reset(target_name=target_name, seed=seed, max_steps=2)
        resp = self.submit(ep["episode_id"], code)
        return float(resp["reward"])
|
|