Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Utilities for logging solver scores to a Hugging Face dataset. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| from huggingface_hub import HfApi, hf_hub_download | |
# Matches Inspect's summary line, e.g. "Average normalized score: 0.42";
# group 1 captures the numeric value (digits and dots only).
AVERAGE_RE = re.compile(r"Average normalized score:\s*([0-9.]+)")
# Name of the JSONL file inside the leaderboard dataset repo.
DEFAULT_FILENAME = "records.jsonl"
| def _hydra_join(*parts: str | None) -> str: | |
| tokens = [str(part).strip().replace(" ", "_") for part in parts if part] | |
| return "/".join(tokens) if tokens else "default" | |
def detect_agent_version(config_path: str = "agent/config_mcp_example.json") -> str:
    """Return a short identifier for the current agent version.

    Combines the config file's parent directory, its stem, and the git
    short SHA of HEAD via _hydra_join (e.g. "agent/config_mcp_example/ab12cd3").
    """
    try:
        raw = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"])
        commit = raw.decode().strip()
    except Exception:
        # Not a git checkout, or git unavailable — use a placeholder.
        commit = "unknown"
    cfg = Path(config_path)
    stem = cfg.stem or "config"
    parent = cfg.parent.name or None
    return _hydra_join(parent, stem, commit)
def parse_average_score(text: str) -> float | None:
    """Extracts the 'Average normalized score' value from Inspect logs.

    Returns None when the line is absent or the captured token is not a
    valid float (the regex's character class can match e.g. a lone '.').
    """
    found = AVERAGE_RE.search(text)
    if found is None:
        return None
    try:
        return float(found.group(1))
    except ValueError:
        return None
| def latest_log_file( | |
| log_dir: Path, extensions: tuple[str, ...] = (".eval", ".json") | |
| ) -> Path | None: | |
| """Returns the most recent log file in log_dir matching the provided extensions.""" | |
| if not log_dir.exists(): | |
| return None | |
| files: list[Path] = [] | |
| for ext in extensions: | |
| files.extend(log_dir.glob(f"*{ext}")) | |
| if not files: | |
| return None | |
| files.sort(key=lambda path: path.stat().st_mtime) | |
| return files[-1] | |
@dataclass
class LeaderboardClient:
    """Simple helper to append JSONL rows to a HF dataset.

    Attributes:
        repo_id: Target dataset repository id (e.g. "user/leaderboard").
        token: Hugging Face access token used for both download and upload.
        filename: JSONL file inside the repo; defaults to DEFAULT_FILENAME.
    """

    # BUG FIX: the class declared bare annotated fields but was missing the
    # @dataclass decorator (dataclass is imported at the top of the file and
    # was otherwise unused), so LeaderboardClient(repo_id, token) raised
    # TypeError. The decorator restores the intended constructor.
    repo_id: str
    token: str
    filename: str = DEFAULT_FILENAME

    def append_record(self, record: dict[str, Any]) -> None:
        """Download the current JSONL file, append *record*, and re-upload it.

        The record is serialised with json.dumps and written as one line.
        Scratch files are created in a private temp directory which is
        removed even when the upload raises.
        """
        tmp_dir = Path(tempfile.mkdtemp(prefix="leaderboard_"))
        local_file = tmp_dir / self.filename
        try:
            self._download_existing(local_file)
            if not local_file.exists():
                local_file.write_text("", encoding="utf-8")
            with local_file.open("a", encoding="utf-8") as fh:
                fh.write(json.dumps(record) + "\n")
            HfApi(token=self.token).upload_file(
                path_or_fileobj=str(local_file),
                path_in_repo=self.filename,
                repo_id=self.repo_id,
                repo_type="dataset",
            )
        finally:
            # Best-effort cleanup; previously the temp dir leaked whenever
            # upload_file raised, because unlink/rmdir ran after it.
            shutil.rmtree(tmp_dir, ignore_errors=True)

    def _download_existing(self, destination: Path) -> None:
        """Copy the repo's current JSONL file to *destination*.

        On any failure (repo or file not yet created, auth/network error)
        an empty file is written instead so append_record can proceed.
        """
        destination.parent.mkdir(parents=True, exist_ok=True)
        try:
            downloaded = hf_hub_download(
                repo_id=self.repo_id,
                filename=self.filename,
                repo_type="dataset",
                token=self.token,
            )
            shutil.copy(Path(downloaded), destination)
        except Exception:
            # Deliberate best-effort: a missing remote file is the normal
            # first-run case, so start from an empty records file.
            destination.write_text("", encoding="utf-8")
def build_record(
    solver_name: str,
    solver_kwargs: dict[str, Any],
    dataset_name: str,
    dataset_split: str,
    limit: int | None,
    score: float,
    command: list[str],
    log_path: Path | None,
    criterion_checks: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Assembles a JSON-serialisable record for the leaderboard dataset."""

    def _resolve_version() -> str:
        # The hf_agent solver derives its version from the agent config
        # path plus the current git state; everything else reads an
        # optional "version" entry from solver_kwargs.
        if solver_name == "hf_agent":
            return detect_agent_version(
                solver_kwargs.get("config_path", "agent/config_mcp_example.json")
            )
        spec = solver_kwargs.get("version")
        if isinstance(spec, (list, tuple)):
            return _hydra_join(*spec)
        if isinstance(spec, dict):
            return _hydra_join(*(f"{k}={v}" for k, v in spec.items()))
        if isinstance(spec, str):
            return spec
        return _hydra_join(solver_name, "default")

    record: dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "solver": solver_name,
        "solver_kwargs": solver_kwargs,
        "dataset_name": dataset_name,
        "dataset_split": dataset_split,
        "limit": limit,
        "score": score,
        "command": command,
        "solver_version": _resolve_version(),
    }
    if log_path:
        record["log_artifact"] = str(log_path)
    record["criterion_checks"] = criterion_checks or []
    return record