"""Thin HTTP client for the OpenSleuth env Space.""" from __future__ import annotations import logging import os import time from typing import Any, Dict, List, Optional import requests log = logging.getLogger("opensleuth.client") class EnvClient: def __init__(self, base_url: str | None = None, timeout: float = 30.0, retries: int = 3): self.base_url = (base_url or os.environ.get("ENV_URL", "http://127.0.0.1:7860")).rstrip("/") self.timeout = timeout self.retries = retries def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]: last_exc: Exception | None = None for attempt in range(self.retries): try: r = requests.post(f"{self.base_url}{path}", json=payload, timeout=self.timeout) r.raise_for_status() return r.json() except (requests.RequestException, ValueError) as e: # noqa: PERF203 last_exc = e wait = 0.5 * (2 ** attempt) log.warning("env POST %s failed (%s); retrying in %.1fs", path, e, wait) time.sleep(wait) raise RuntimeError(f"env POST {path} failed after {self.retries} retries: {last_exc}") def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: last_exc: Exception | None = None for attempt in range(self.retries): try: r = requests.get(f"{self.base_url}{path}", params=params, timeout=self.timeout) r.raise_for_status() return r.json() except (requests.RequestException, ValueError) as e: # noqa: PERF203 last_exc = e wait = 0.5 * (2 ** attempt) log.warning("env GET %s failed (%s); retrying in %.1fs", path, e, wait) time.sleep(wait) raise RuntimeError(f"env GET {path} failed after {self.retries} retries: {last_exc}") def health(self) -> Dict[str, Any]: r = requests.get(f"{self.base_url}/health", timeout=self.timeout) r.raise_for_status() return r.json() def list_functions(self) -> list[Dict[str, str]]: """Legacy v0.3 endpoint -- only the 9 builtin functions.""" r = requests.get(f"{self.base_url}/functions", timeout=self.timeout) r.raise_for_status() return r.json()["functions"] def list_tasks( self, source: str = "all", difficulty: Optional[str] = None, ) -> List[Dict[str, Any]]: """v0.4 catalog endpoint -- builtins + Hub-driven tasks. Each item carries: ``name``, ``signature``, ``description``, ``difficulty`` (``easy|medium|hard|None``), ``edge_case_count``, ``source`` (``builtin|hub``). """ params: Dict[str, Any] = {"source": source} if difficulty: params["difficulty"] = difficulty return self._get("/tasks", params=params)["tasks"] def sample_inputs(self, target_name: str, n: int = 8, seed: int = 0) -> List[str]: """Pull ``n`` ready-to-probe input_repr strings from the env's own auto-fuzzer. Encapsulates the fuzz logic on the env side so the trainer doesn't have to keep its own per-task input pools in sync.""" resp = self._get( f"/tasks/{target_name}/sample_inputs", params={"n": n, "seed": seed}, ) return list(resp["inputs"]) def reset(self, target_name: str, seed: int = 0, max_steps: int = 25) -> Dict[str, Any]: return self._post("/reset", {"target_name": target_name, "seed": seed, "max_steps": max_steps}) def step(self, episode_id: str, action: Dict[str, Any]) -> Dict[str, Any]: return self._post("/step", {"episode_id": episode_id, "action": action}) # --- High-level helpers used by the reward function -------------------- def submit(self, episode_id: str, code: str) -> Dict[str, Any]: return self.step(episode_id, {"action_type": "submit", "code": code}) def probe(self, episode_id: str, input_repr: str) -> Dict[str, Any]: return self.step(episode_id, {"action_type": "probe", "input_repr": input_repr}) def score_submission(self, target_name: str, code: str, seed: int = 0) -> float: """One-shot: open an episode, submit the code, return total reward.""" ep = self.reset(target_name=target_name, seed=seed, max_steps=2) resp = self.submit(ep["episode_id"], code) return float(resp["reward"])