Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
File size: 4,954 Bytes
be350cb 2c820a4 be350cb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | """
Utilities for logging solver scores to a Hugging Face dataset.
"""
from __future__ import annotations
import json
import re
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from huggingface_hub import HfApi, hf_hub_download
# Matches the "Average normalized score: <value>" line in log text and captures
# the run of digits/dots that follows. The capture is not guaranteed to be a
# valid float (e.g. "1.2.3" matches), so callers must handle a failed conversion.
AVERAGE_RE = re.compile(r"Average normalized score:\s*([0-9.]+)")
# Default name of the JSONL file used by LeaderboardClient when none is given.
DEFAULT_FILENAME = "records.jsonl"
def _hydra_join(*parts: str | None) -> str:
    """Join the usable parts into a slash-separated identifier.

    Each part is converted with ``str``, stripped of surrounding whitespace and
    has its remaining spaces replaced by underscores. Parts that are falsy, or
    that are empty once stripped, are skipped so the result never contains an
    empty segment.

    Args:
        *parts: Candidate segments; ``None`` and empty values are ignored.

    Returns:
        The normalised segments joined by ``"/"``, or ``"default"`` when no
        usable segment was supplied.
    """
    tokens = []
    for part in parts:
        if not part:
            continue
        token = str(part).strip().replace(" ", "_")
        # A whitespace-only part is truthy but normalises to "", which would
        # otherwise yield "a//b" or bypass the "default" fallback entirely.
        if token:
            tokens.append(token)
    return "/".join(tokens) if tokens else "default"
def detect_agent_version(config_path: str = "agent/config_mcp_example.json") -> str:
    """Return a short string identifying the current agent version.

    The identifier is built with ``_hydra_join`` from three segments:
    ``<config parent directory>/<config file stem>/<git short sha>``. The
    parent segment is omitted when ``config_path`` has no parent directory
    name, and the stem falls back to ``"config"`` when it is empty.

    Args:
        config_path: Path to the agent configuration file. Only the path text
            is inspected; the file is never opened.

    Returns:
        The joined identifier. The git segment is ``"unknown"`` when the
        commit cannot be determined (git missing, not a repository, ...).
    """
    try:
        raw = subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"],
            # Keep git's "not a git repository" message off the console; the
            # failure is already reported through the "unknown" fallback.
            stderr=subprocess.DEVNULL,
        )
        commit = raw.decode().strip()
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: git executable not found or not runnable.
        # SubprocessError: git exited with a non-zero status.
        # ValueError: output could not be decoded (UnicodeDecodeError).
        commit = "unknown"
    config_file = Path(config_path)
    config_stem = config_file.stem or "config"
    parent_name = config_file.parent.name or None
    return _hydra_join(parent_name, config_stem, commit)
def parse_average_score(text: str) -> float | None:
    """Pull the 'Average normalized score' value out of Inspect log text.

    Returns the first captured value as a float, or ``None`` when the marker
    is absent or the captured text is not a valid float.
    """
    found = AVERAGE_RE.search(text)
    if found is None:
        return None
    captured = found.group(1)
    try:
        score = float(captured)
    except ValueError:
        # The pattern accepts any run of digits and dots, e.g. "1.2.3".
        return None
    return score
def latest_log_file(
    log_dir: Path, extensions: tuple[str, ...] = (".eval", ".json")
) -> Path | None:
    """Pick the most recently modified log file in ``log_dir``.

    Candidates are the entries directly inside ``log_dir`` that match the glob
    ``*<ext>`` for any of ``extensions``; they are ranked by modification time.

    Returns:
        The path with the largest modification time, or ``None`` when
        ``log_dir`` does not exist or nothing matches.
    """
    if not log_dir.exists():
        return None

    candidates: list[Path] = [
        entry for suffix in extensions for entry in log_dir.glob(f"*{suffix}")
    ]
    if not candidates:
        return None

    def modified_at(entry: Path) -> float:
        return entry.stat().st_mtime

    # Ascending stable sort: the final element is the newest file.
    ordered = sorted(candidates, key=modified_at)
    return ordered[-1]
@dataclass
class LeaderboardClient:
    """Append JSONL rows to a file stored in a Hugging Face dataset repo.

    Every append is a download / modify / upload cycle: the current file is
    fetched, one line is added locally and the result is uploaded to the same
    path. There is no locking, so two clients appending at the same time can
    overwrite each other's row.
    """

    # Dataset repository id passed to the Hub download and upload calls.
    repo_id: str
    # Access token used for both the download and the upload.
    token: str
    # Path of the JSONL file inside the repository.
    filename: str = DEFAULT_FILENAME

    def append_record(self, record: dict[str, Any]) -> None:
        """Append ``record`` as one JSON line to the remote file.

        Args:
            record: JSON-serialisable mapping written as a single line.

        Raises:
            Exception: Any error raised while downloading the existing file
                (other than the file simply not existing yet), serialising
                ``record`` or uploading the result. Nothing is uploaded when
                the download fails, so existing rows are never replaced by a
                partial file.
        """
        # TemporaryDirectory removes the whole scratch tree on exit, including
        # when the upload raises or when ``filename`` contains subdirectories.
        with tempfile.TemporaryDirectory(prefix="leaderboard_") as scratch:
            local_file = Path(scratch) / self.filename
            self._download_existing(local_file)
            with local_file.open("a", encoding="utf-8") as fh:
                fh.write(json.dumps(record) + "\n")
            HfApi(token=self.token).upload_file(
                path_or_fileobj=str(local_file),
                path_in_repo=self.filename,
                repo_id=self.repo_id,
                repo_type="dataset",
            )

    def _download_existing(self, destination: Path) -> None:
        """Copy the current remote file to ``destination``.

        An empty file is created when the repository does not contain
        ``filename`` yet. Every other failure propagates: treating it as
        "no existing data" would make the following upload overwrite the
        remote history with a single row.
        """
        # Local import keeps the module's top-level imports unchanged.
        from huggingface_hub.utils import (
            EntryNotFoundError,
            LocalEntryNotFoundError,
        )

        destination.parent.mkdir(parents=True, exist_ok=True)
        try:
            downloaded = hf_hub_download(
                repo_id=self.repo_id,
                filename=self.filename,
                repo_type="dataset",
                token=self.token,
            )
        except LocalEntryNotFoundError:
            # Subclass of EntryNotFoundError raised when the Hub is unreachable
            # and nothing is cached: the remote file may well exist.
            raise
        except EntryNotFoundError:
            # The repository is reachable but has no such file: first record.
            destination.write_text("", encoding="utf-8")
            return
        shutil.copy(Path(downloaded), destination)
def build_record(
    solver_name: str,
    solver_kwargs: dict[str, Any],
    dataset_name: str,
    dataset_split: str,
    limit: int | None,
    score: float,
    command: list[str],
    log_path: Path | None,
    criterion_checks: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Build the JSON-serialisable row stored in the leaderboard dataset.

    The row carries a UTC ISO-8601 timestamp, the run parameters as given, a
    ``solver_version`` string, an optional ``log_artifact`` path and the list
    of ``criterion_checks`` (empty when none are supplied).

    ``solver_version`` is resolved as follows:
      * ``hf_agent`` solver: derived from the git commit and the
        ``config_path`` entry of ``solver_kwargs``.
      * otherwise, from ``solver_kwargs["version"]``: a string is used
        verbatim, a list/tuple is joined, a dict is joined as ``key=value``
        pairs, and anything else falls back to ``<solver_name>/default``.
    """
    entry: dict[str, Any] = dict(
        timestamp=datetime.now(timezone.utc).isoformat(),
        solver=solver_name,
        solver_kwargs=solver_kwargs,
        dataset_name=dataset_name,
        dataset_split=dataset_split,
        limit=limit,
        score=score,
        command=command,
    )

    if solver_name == "hf_agent":
        config_path = solver_kwargs.get("config_path", "agent/config_mcp_example.json")
        version = detect_agent_version(config_path)
    else:
        declared = solver_kwargs.get("version")
        if isinstance(declared, str):
            version = declared
        elif isinstance(declared, dict):
            pairs = [f"{k}={v}" for k, v in declared.items()]
            version = _hydra_join(*pairs)
        elif isinstance(declared, (list, tuple)):
            version = _hydra_join(*declared)
        else:
            version = _hydra_join(solver_name, "default")
    entry["solver_version"] = version

    if log_path:
        entry["log_artifact"] = str(log_path)
    entry["criterion_checks"] = criterion_checks if criterion_checks else []
    return entry
|