# File: leaderboard/backend/data_loader.py
# Author (from page avatar caption): Alyafeai
# Last commit: fix(ui): render structured benchmark details correctly
# Commit hash: 7749d9c
# backend/data_loader.py
import json
import os
import contextlib
import io
import logging
import re
import ast
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Any, Optional
from urllib.parse import quote
import numpy as np
import pandas as pd
import requests
from huggingface_hub import snapshot_download
from datetime import datetime
from huggingface_hub.constants import HF_HUB_CACHE
from backend.config import (
API,
DETAILS_REPO_ID,
REQUESTS_REPO_ID,
RESULTS_REPO_ID,
TASKS,
TASK_SOURCES,
HIDDEN_TASKS,
MODEL_TYPE_TO_EMOJI,
hf_api_token,
)
from backend.helpers import unify_precision, get_model_size
logger = logging.getLogger(__name__)

# Lower-cased filename prefix -> source name, flattened from the optional
# "prefixes" list of every entry in TASK_SOURCES.
_SOURCE_BY_PREFIX = dict(
    (prefix.lower(), source_name)
    for source_name, source_cfg in TASK_SOURCES.items()
    for prefix in source_cfg.get("prefixes", [])
)

# Source name -> the task list declared for that source (empty when absent).
_TASKS_BY_SOURCE = dict(
    (source_name, source_cfg.get("tasks", []))
    for source_name, source_cfg in TASK_SOURCES.items()
)

# Wire hidden tasks into the "results" source so _parse_result_file extracts
# them alongside normal tasks without touching the shared TASK_SOURCES dict.
_TASKS_BY_SOURCE["results"] = [*_TASKS_BY_SOURCE.get("results", [])] + HIDDEN_TASKS

# Memo used by _latest_model_benchmark_score_pct, keyed by
# (model name, benchmark display name).
_RESULT_SCORE_CACHE: Dict[tuple[str, str], Optional[float]] = {}
def _extract_task_bases(task_key: Any) -> List[str]:
    """Return the base name(s) encoded in a task key.

    A string key yields a one-element list holding the text before the first
    ``:`` and, within that, before the first ``|`` (whitespace-trimmed).
    A list is flattened recursively, preserving order. Blank strings and any
    other type yield an empty list.
    """
    if isinstance(task_key, list):
        return [base for item in task_key for base in _extract_task_bases(item)]
    if not isinstance(task_key, str):
        return []
    stripped = task_key.strip()
    if not stripped:
        return []
    head = stripped.partition(":")[0].partition("|")[0]
    return [head.strip()]
# Display name -> ordered, de-duplicated list of the task bases that feed it.
BENCHMARK_DISPLAY_TO_BASES: Dict[str, List[str]] = {}
for task_key, _, display in TASKS:
    if display not in BENCHMARK_DISPLAY_TO_BASES:
        BENCHMARK_DISPLAY_TO_BASES[display] = []
    bases = BENCHMARK_DISPLAY_TO_BASES[display]
    for base in _extract_task_bases(task_key):
        # Skip empty bases and ones already recorded for this display name.
        if not base or base in bases:
            continue
        bases.append(base)
def _extract_base_metric_pairs(task_key: Any, metric_key: Any) -> List[tuple[str, str]]:
    """Pair each task base with the metric name configured for it.

    For list inputs the two lists are walked in lockstep; a tuple metric entry
    contributes only its first element. For string inputs the base is the text
    before the first ``:`` / ``|`` of ``task_key``. Anything else, or an empty
    base, yields no pairs.
    """
    if isinstance(task_key, list):
        collected: List[tuple[str, str]] = []
        if not isinstance(metric_key, list):
            return collected
        for sub_task, sub_metric in zip(task_key, metric_key):
            name = sub_metric[0] if isinstance(sub_metric, tuple) else sub_metric
            collected += _extract_base_metric_pairs(sub_task, name)
        return collected
    if not (isinstance(task_key, str) and isinstance(metric_key, str)):
        return []
    base = task_key.partition(":")[0].partition("|")[0].strip()
    return [(base, metric_key)] if base else []
# Task base -> ordered, de-duplicated metric names configured for it.
BENCHMARK_BASE_TO_METRICS: Dict[str, List[str]] = {}
# Display name -> {task base -> metric names}, scoped to a single benchmark.
BENCHMARK_DISPLAY_TO_BASE_METRICS: Dict[str, Dict[str, List[str]]] = {}
for task_key, metric_key, display in TASKS:
    display_bucket = BENCHMARK_DISPLAY_TO_BASE_METRICS.setdefault(display, {})
    for base, metric_name in _extract_base_metric_pairs(task_key, metric_key):
        # Both buckets are created even when the metric name is empty.
        base_bucket = BENCHMARK_BASE_TO_METRICS.setdefault(base, [])
        display_metric_bucket = display_bucket.setdefault(base, [])
        if not metric_name:
            continue
        if metric_name not in base_bucket:
            base_bucket.append(metric_name)
        if metric_name not in display_metric_bucket:
            display_metric_bucket.append(metric_name)

# File suffixes recognised as per-question details files.
DETAILS_EXTENSIONS = {".parquet", ".json", ".jsonl"}
def _norm_key(value: Any) -> str:
    """Lower-case ``value`` and drop every character outside ``a-z0-9``.

    Falsy inputs (``None``, ``0``, ``""``) normalise to the empty string.
    """
    lowered = str(value or "").strip().lower()
    return re.sub(r"[^a-z0-9]+", "", lowered)
def _canonical_base_key(value: Any) -> str:
    """Return the normalised key for ``value`` without a leading ``qimma``."""
    # Only a single leading occurrence of the prefix is removed.
    return _norm_key(value).removeprefix("qimma")
# -----------------------------------------------------------------------------
# Utilities
# -----------------------------------------------------------------------------
def silent_snapshot_download(**kwargs):
    """Call ``snapshot_download(**kwargs)`` with stdout and stderr discarded.

    Both streams are redirected to throwaway in-memory buffers for the
    duration of the call; the download's return value is passed through.
    """
    stdout_sink, stderr_sink = io.StringIO(), io.StringIO()
    with contextlib.redirect_stdout(stdout_sink):
        with contextlib.redirect_stderr(stderr_sink):
            result = snapshot_download(**kwargs)
    return result
def _resolve_details_base_path() -> Path:
    """Locate the local directory that holds the details dataset files.

    Returns the most recently modified sub-directory of the dataset's
    ``snapshots`` folder inside the Hugging Face hub cache. When no snapshot
    directory exists, a ``manual-snapshot`` folder is created next to it and
    returned instead.
    """
    cache_dir = Path(HF_HUB_CACHE) / f"datasets--{DETAILS_REPO_ID.replace('/', '--')}"
    snapshots_dir = cache_dir / "snapshots"
    if snapshots_dir.exists():
        newest = max(
            (entry for entry in snapshots_dir.iterdir() if entry.is_dir()),
            key=lambda entry: entry.stat().st_mtime,
            default=None,
        )
        if newest is not None:
            return newest

    fallback_dir = cache_dir / "manual-snapshot"
    fallback_dir.mkdir(parents=True, exist_ok=True)
    return fallback_dir
def _download_details_file(relative_path: str, base_path: Path, retries: int = 3) -> bool:
    """Download one details file over HTTP into ``base_path``.

    The body is streamed to ``<target>.part`` and moved over the final path
    only after the transfer finished, so an interrupted download never leaves
    a truncated file under the real name.

    Args:
        relative_path: Path of the file inside the details dataset repo.
        base_path: Local directory that mirrors the repo layout.
        retries: Maximum number of attempts.

    Returns:
        True once the file is in place, False when every attempt failed
        (each failure is logged as a warning).
    """
    encoded_rel_path = quote(relative_path, safe="/")
    url = f"https://huggingface.co/datasets/{DETAILS_REPO_ID}/resolve/main/{encoded_rel_path}"
    headers = {"Authorization": f"Bearer {hf_api_token}"} if hf_api_token else {}

    target_path = base_path / relative_path
    target_path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = target_path.with_suffix(target_path.suffix + ".part")

    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            with requests.get(url, stream=True, timeout=(10, 90), headers=headers) as resp:
                resp.raise_for_status()
                with open(partial_path, "wb") as sink:
                    for chunk in resp.iter_content(chunk_size=1024 * 1024):
                        if not chunk:
                            continue
                        sink.write(chunk)
            if partial_path.exists():
                os.replace(partial_path, target_path)
                return True
            if target_path.exists():
                # The temporary file is gone but the final file is present.
                return True
            raise FileNotFoundError(f"Temporary download file missing: {partial_path}")
        except Exception as e:
            # Best-effort cleanup of the partial file before the next attempt.
            with contextlib.suppress(Exception):
                partial_path.unlink(missing_ok=True)
            logger.warning(
                "Retry %s/%s for details file '%s' failed: %s",
                attempt,
                retries,
                relative_path,
                e,
            )
    return False
def _sync_details_dataset(base_path: Path):
    """Download every remote details file that is missing under ``base_path``.

    Remote candidates are repo files named ``details_*`` with a suffix in
    DETAILS_EXTENSIONS. Files are fetched one at a time; progress and any
    files that could not be fetched are reported through the module logger.
    A failure to list the repo is logged and ends the sync early.
    """
    try:
        remote_files = [
            name
            for name in API.list_repo_files(repo_id=DETAILS_REPO_ID, repo_type="dataset")
            if Path(name).suffix.lower() in DETAILS_EXTENSIONS
            and Path(name).name.startswith("details_")
        ]
    except Exception as e:
        logger.warning("Could not list files for details repo '%s': %s", DETAILS_REPO_ID, e)
        return

    # Local paths are compared in repo-style (forward slash) form.
    local_files = set()
    for entry in base_path.rglob("*"):
        if entry.is_file() and entry.suffix.lower() in DETAILS_EXTENSIONS:
            local_files.add(entry.relative_to(base_path).as_posix())

    total_count = len(remote_files)
    local_count = len(local_files & set(remote_files))
    missing_files = [name for name in remote_files if name not in local_files]
    if not missing_files:
        logger.info("Details files ready: %s/%s", local_count, total_count)
        return

    total_missing = len(missing_files)
    logger.info(
        "Details files ready: %s/%s. Downloading %s missing files...",
        local_count,
        total_count,
        total_missing,
    )

    failed_files: List[str] = []
    for position, rel_path in enumerate(missing_files, start=1):
        logger.info("Downloading missing details file %s/%s: %s", position, total_missing, rel_path)
        downloaded = _download_details_file(rel_path, base_path)
        if not downloaded:
            failed_files.append(rel_path)

    if not failed_files:
        logger.info("Details sync complete: downloaded %s/%s missing files.", total_missing, total_missing)
        return

    logger.warning(
        "Details sync incomplete. Downloaded %s/%s missing files. Still missing %s files.",
        total_missing - len(failed_files),
        total_missing,
        len(failed_files),
    )
    for rel_path in failed_files:
        logger.warning("Still missing: %s", rel_path)
def download_datasets():
    """
    Download the requests, results and details datasets.

    Each snapshot is fetched quietly and its local path is published through
    an environment variable (EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH,
    EVAL_DETAILS_PATH) for the loaders in this module to pick up.
    No token is passed explicitly here.
    """
    plan = (
        ("EVAL_REQUESTS_PATH", REQUESTS_REPO_ID, "*.json"),
        ("EVAL_RESULTS_PATH", RESULTS_REPO_ID, ["*.json", "*.jsonl"]),
        ("EVAL_DETAILS_PATH", DETAILS_REPO_ID, ["*.parquet", "*.json", "*.jsonl"]),
    )
    for env_var, repo_id, patterns in plan:
        os.environ[env_var] = silent_snapshot_download(
            repo_id=repo_id,
            repo_type="dataset",
            allow_patterns=patterns,
        )
# -----------------------------------------------------------------------------
# Requests
# -----------------------------------------------------------------------------
def load_requests(status: Optional[str] = None) -> pd.DataFrame:
    """Load evaluation request files into a DataFrame.

    Every ``*.json`` file below the directory named by the
    ``EVAL_REQUESTS_PATH`` environment variable is read. Files that cannot be
    read or decoded, and files whose top-level JSON value is not an object,
    are skipped so a single malformed request cannot abort the whole load.

    Args:
        status: When given, keep only requests whose ``"status"`` field
            matches it case-insensitively. A missing or null status never
            matches. ``None`` keeps every request.

    Returns:
        One row per request; an empty DataFrame when the environment variable
        is unset or nothing matched.
    """
    base = os.getenv("EVAL_REQUESTS_PATH")
    if not base:
        return pd.DataFrame()

    wanted = status.lower() if status is not None else None
    rows = []
    for p in Path(base).rglob("*.json"):
        try:
            with open(p, "r", encoding="utf-8") as f:
                payload = json.load(f)
        except (OSError, ValueError):
            # Unreadable, badly encoded or invalid JSON: best-effort skip.
            continue
        if not isinstance(payload, dict):
            # A request must be a JSON object; lists/scalars have no status.
            continue
        if wanted is not None:
            # ``"status": null`` (or a non-string status) must not crash.
            current = str(payload.get("status") or "").lower()
            if current != wanted:
                continue
        rows.append(payload)
    return pd.DataFrame(rows)
# -----------------------------------------------------------------------------
# Results parsing
# -----------------------------------------------------------------------------
def _infer_source_from_filename(path: Path) -> Optional[str]:
    """Guess which source a result file belongs to from its name.

    A name understood by ``_parse_result_filename`` wins. Otherwise the text
    before the first underscore of the stem is looked up (lower-cased) in
    ``_SOURCE_BY_PREFIX``; stems without an underscore give ``None``.
    """
    parsed = _parse_result_filename(path)
    if parsed:
        return parsed.get("source")

    prefix, separator, _rest = path.stem.partition("_")
    if not separator:
        return None
    return _SOURCE_BY_PREFIX.get(prefix.lower())
def _parse_result_filename(path: Path) -> Optional[Dict[str, Any]]:
    """Decode a ``results_<name>_<timestamp>`` file name.

    Returns ``None`` unless the stem starts with ``results_`` and ends with a
    ``%Y-%m-%dT%H-%M-%S.%f`` timestamp after its last underscore. Otherwise
    returns a dict with keys ``source``, ``datetime`` and ``name_part``, where
    ``source`` is one of ``"results"``, ``"code"``, ``"fannflop"`` or
    ``"ignore"`` depending on the benchmark hint in the name.
    """
    stem = path.stem
    if not stem.startswith("results_"):
        return None

    timestamp_text = stem.rpartition("_")[2]
    try:
        parsed_dt = datetime.strptime(timestamp_text, "%Y-%m-%dT%H-%M-%S.%f")
    except Exception:
        return None

    name_part = stem[len("results_"):].rsplit("_", 1)[0].strip()
    if not name_part:
        return {"source": "results", "datetime": parsed_dt, "name_part": ""}

    canon = _canonical_base_key(name_part.split("|", 1)[0].strip())
    if canon == "mbpp":
        # Ignore redundant single-benchmark mbpp result shards.
        source = "ignore"
    elif canon in {"evalplus", "humaneval"}:
        source = "code"
    elif canon in {"fannorflop", "fannflop"}:
        source = "fannflop"
    else:
        source = "results"
    return {"source": source, "datetime": parsed_dt, "name_part": name_part}
def _load_json_payload_any(path: Path) -> Any:
    """Read a ``.json`` or ``.jsonl`` file and return the decoded payload.

    Non-``.jsonl`` files are decoded as one JSON document (errors propagate).
    A ``.jsonl`` file is first tried as a single document; failing that, each
    non-blank line is decoded on its own and undecodable lines are dropped.
    An empty ``.jsonl`` file yields ``{}``.
    """
    if path.suffix.lower() != ".jsonl":
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    text = path.read_text(encoding="utf-8", errors="ignore").strip()
    if not text:
        return {}

    try:
        return json.loads(text)
    except Exception:
        pass  # Not a single document: fall back to line-by-line decoding.

    decoded_lines: List[Any] = []
    for raw_line in text.splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        try:
            decoded_lines.append(json.loads(candidate))
        except Exception:
            continue
    return decoded_lines
def _parse_result_file(path: Path) -> Optional[Dict[str, Any]]:
    """Turn one results file into a flat leaderboard row.

    The file name is validated before the file is opened: a name that
    ``_parse_result_filename`` rejects (or marks as ``"ignore"``) can never
    produce a row, so such files are skipped without any I/O or JSON decoding.

    Args:
        path: Path of a ``results_*`` JSON/JSONL file.

    Returns:
        A dict with ``Model Name``, ``Precision``, ``datetime``,
        ``Source Type`` and one entry per task display name configured for
        the file's source, or ``None`` when the file is not a usable result.
    """
    parsed_name = _parse_result_filename(path)
    if not parsed_name:
        return None
    source_type = parsed_name["source"]
    parsed_dt = parsed_name["datetime"]
    if source_type in {None, "ignore"} or parsed_dt is None:
        return None

    try:
        raw = _load_json_payload_any(path)
    except Exception:
        return None

    data = raw
    if isinstance(raw, list):
        # JSONL payload: prefer the first record that looks like a result.
        data = next(
            (x for x in raw if isinstance(x, dict) and ("results" in x or "model_name" in x)),
            None,
        )
        if data is None and raw and isinstance(raw[0], dict):
            data = raw[0]
    if not isinstance(data, dict):
        return None

    # ``"config_general": null`` (or any non-object) is treated as absent.
    cfg = data.get("config_general")
    if not isinstance(cfg, dict):
        cfg = {}
    results = data.get("results", {})
    if not isinstance(results, dict):
        return None

    model = cfg.get("model_name") or data.get("model_name", "UNK")
    precision = unify_precision(cfg.get("model_dtype", "UNK"))
    row = {
        "Model Name": model,
        "Precision": precision,
        "datetime": parsed_dt,
        "Source Type": source_type,
    }

    for task_key, metric_key, display in _TASKS_BY_SOURCE.get(source_type, []):
        if isinstance(task_key, list):
            # Weighted average over sub-tasks; a missing sub-score counts as 0.
            weight_total = 0
            metric_total = 0
            for t, (m, w) in zip(task_key, metric_key):
                task_scores = results.get(t)
                val = task_scores.get(m, 0) if isinstance(task_scores, dict) else 0
                if val is None:
                    # A null score would raise on multiplication; treat it
                    # like a missing one and make the gap visible in the log.
                    logger.warning(
                        "Missing metric value for task '%s' in model '%s'",
                        t,
                        model,
                    )
                    val = 0
                metric_total += (val * w)
                weight_total += w
            val = metric_total / weight_total if weight_total > 0 else np.nan
        else:
            val = np.nan
            task_scores = results.get(task_key)
            # Non-dict task entries are treated as "metric not reported".
            if isinstance(task_scores, dict) and metric_key in task_scores:
                val = task_scores[metric_key]
                if val is None:
                    logger.warning(
                        "Missing metric value for task '%s' in model '%s'",
                        task_key,
                        model,
                    )
        row[display] = val
    return row
def _latest_model_benchmark_score_pct(model_name: str, benchmark_display: str) -> Optional[float]:
    """Return the newest stored score of a model on a benchmark, times 100.

    All ``.json``/``.jsonl`` files below ``EVAL_RESULTS_PATH`` are parsed; the
    row with the latest filename timestamp that belongs to ``model_name`` and
    carries a numeric value for ``benchmark_display`` wins. The outcome
    (including ``None``) is memoised in ``_RESULT_SCORE_CACHE``.
    """
    cache_key = (model_name, benchmark_display)
    try:
        return _RESULT_SCORE_CACHE[cache_key]
    except KeyError:
        pass

    newest_dt: Optional[datetime] = None
    newest_pct: Optional[float] = None
    results_root = os.getenv("EVAL_RESULTS_PATH")
    if results_root:
        wanted_model = str(model_name).strip()
        for candidate in Path(results_root).rglob("*"):
            if not candidate.is_file():
                continue
            if candidate.suffix.lower() not in {".json", ".jsonl"}:
                continue
            row = _parse_result_file(candidate)
            if not row:
                continue
            if str(row.get("Model Name", "")).strip() != wanted_model:
                continue
            score = _to_float_scalar(row.get(benchmark_display))
            if score is None:
                continue
            stamp = row.get("datetime")
            if not isinstance(stamp, datetime):
                continue
            if newest_dt is None or stamp > newest_dt:
                newest_dt, newest_pct = stamp, score * 100.0

    _RESULT_SCORE_CACHE[cache_key] = newest_pct
    return newest_pct
def _parse_details_filename(path: Path) -> Optional[Dict[str, Any]]:
    """Decode a ``details_<task>_<timestamp>`` file name.

    Returns ``None`` when the stem does not follow that layout or the trailing
    ``%Y-%m-%dT%H-%M-%S.%f`` timestamp does not parse. Otherwise returns a
    dict with ``benchmark_base`` (text before the first ``:`` / ``|`` of the
    task), ``subtask`` (text after the first ``:`` with a trailing ``|<n>``
    removed, or ``"overall"``), ``datetime`` and ``task_full``.
    """
    details_part, separator, timestamp_text = path.stem.rpartition("_")
    if not separator or not details_part.startswith("details_"):
        return None
    try:
        parsed_dt = datetime.strptime(timestamp_text, "%Y-%m-%dT%H-%M-%S.%f")
    except Exception:
        return None

    task_full = details_part[len("details_"):].strip()
    if not task_full:
        return None

    head, colon, tail = task_full.partition(":")
    subtask = tail.strip() if colon else "overall"
    # Drop a trailing "|<digits>" marker; an empty remainder means "overall".
    subtask = re.sub(r"\|\d+$", "", subtask).strip() or "overall"
    return {
        "benchmark_base": head.partition("|")[0].strip(),
        "subtask": subtask,
        "datetime": parsed_dt,
        "task_full": task_full,
    }
def build_details_index() -> Dict[str, Dict[str, Dict[str, Dict[str, Any]]]]:
    """
    Build an index of latest detail file paths per model/benchmark/subtask.

    The directory named by ``EVAL_DETAILS_PATH`` is scanned recursively. The
    model name is the file's parent path relative to that directory, so files
    sitting directly in the root are skipped. For each
    model/benchmark/subtask only the file with the newest filename timestamp
    is kept, stored as ``{"path", "datetime", "task_full"}``.
    """
    details_base = os.getenv("EVAL_DETAILS_PATH")
    if not details_base:
        return {}
    base_path = Path(details_base)
    if not base_path.exists():
        return {}

    index: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]] = {}
    for candidate in base_path.rglob("*"):
        if not (candidate.is_file() and candidate.suffix.lower() in DETAILS_EXTENSIONS):
            continue
        parsed = _parse_details_filename(candidate)
        if not parsed:
            continue
        try:
            rel_parts = candidate.relative_to(base_path).parts
        except Exception:
            continue
        if len(rel_parts) < 2:
            continue
        model_name = "/".join(rel_parts[:-1]).strip("/")
        if not model_name:
            continue

        stamp = parsed["datetime"]
        latest_by_subtask = index.setdefault(model_name, {}).setdefault(parsed["benchmark_base"], {})
        existing = latest_by_subtask.get(parsed["subtask"])
        if existing is None or stamp > existing["datetime"]:
            latest_by_subtask[parsed["subtask"]] = {
                "path": str(candidate),
                "datetime": stamp,
                "task_full": parsed["task_full"],
            }
    return index
def _as_list(value: Any) -> List[Any]:
    """Coerce ``value`` to a list.

    ``None`` becomes ``[]``, a list is returned as-is (same object), tuples
    and NumPy arrays are converted, and anything else is wrapped in a
    one-element list.
    """
    if isinstance(value, list):
        return value
    if value is None:
        return []
    if isinstance(value, np.ndarray):
        return value.tolist()
    return list(value) if isinstance(value, tuple) else [value]
def _as_dict(value: Any) -> Dict[str, Any]:
    """Coerce ``value`` to a dict, returning ``{}`` when that is not possible.

    Accepts dicts (returned unchanged), JSON or Python-literal text (str or
    UTF-8 bytes) that decodes to a dict, and lists made only of two-item
    pairs, which become ``{str(key): value}``.
    """
    if isinstance(value, dict):
        return value

    if isinstance(value, (bytes, bytearray)):
        try:
            value = value.decode("utf-8", errors="ignore")
        except Exception:
            return {}

    if isinstance(value, str):
        text = value.strip()
        if not text:
            return {}
        # JSON first; the literal parser is consulted only when JSON fails.
        for parse in (json.loads, ast.literal_eval):
            try:
                candidate = parse(text)
            except Exception:
                continue
            return candidate if isinstance(candidate, dict) else {}
        return {}

    if isinstance(value, list):
        # Some parquet backends can expose map-like structs as list of pairs.
        try:
            pairs_only = all(
                isinstance(item, (list, tuple)) and len(item) == 2 for item in value
            )
            if pairs_only:
                return {str(key): val for key, val in value}
        except Exception:
            return {}
    return {}
def _py_scalar(value: Any) -> Any:
    """Unwrap NumPy containers into plain Python values.

    Zero-dimensional and single-element arrays collapse to their one element,
    larger arrays become (nested) lists, NumPy scalars become Python scalars,
    and every other value is returned untouched.
    """
    if isinstance(value, np.generic):
        return value.item()
    if not isinstance(value, np.ndarray):
        return value
    if value.ndim == 0:
        return _py_scalar(value.item())
    if value.size == 1:
        return _py_scalar(value.reshape(-1)[0])
    return [_py_scalar(element) for element in value.tolist()]
def _decode_structured_string(value: Any) -> Any:
    """Decode text that looks like a serialised dict or list.

    Strings wrapped in ``{...}`` or ``[...]`` are parsed as JSON and, failing
    that, as a Python literal; a dict/list result is returned in JSON-safe
    form. Everything else (non-strings, plain text, unparsable text) is
    returned as the unwrapped input value.
    """
    value = _py_scalar(value)
    if not isinstance(value, str):
        return value

    text = value.strip()
    is_object = text.startswith("{") and text.endswith("}")
    is_array = text.startswith("[") and text.endswith("]")
    if not (is_object or is_array):
        return value

    for parse in (json.loads, ast.literal_eval):
        try:
            decoded = parse(text)
            if isinstance(decoded, (dict, list)):
                return _json_safe(decoded)
        except Exception:
            continue
    return value
def _json_safe(value: Any) -> Any:
    """Recursively convert ``value`` into plain JSON-friendly containers.

    NumPy values are unwrapped, dict keys are stringified, and tuples are
    emitted as lists.
    """
    plain = _py_scalar(value)
    if isinstance(plain, dict):
        return {str(key): _json_safe(item) for key, item in plain.items()}
    if isinstance(plain, (list, tuple)):
        return [_json_safe(item) for item in plain]
    return plain
def _to_float_scalar(value: Any) -> Optional[float]:
    """Return ``value`` as a float when it is numeric, otherwise ``None``."""
    scalar = _py_scalar(value)
    numeric_types = (int, float, np.integer, np.floating)
    return float(scalar) if isinstance(scalar, numeric_types) else None
def _normalize_indices(value: Any) -> List[int]:
    """Extract the integer entries of ``value`` as a list of Python ints.

    ``value`` may be a single index or a collection; non-integer entries are
    dropped.
    """
    unwrapped = (_py_scalar(item) for item in _as_list(value))
    return [int(item) for item in unwrapped if isinstance(item, (int, np.integer))]
def _format_answer(values: List[Any]) -> Any:
    """Render answer values as display text.

    Returns ``None`` for an empty input; otherwise the stringified values
    joined with ``", "`` (a single value is returned on its own).
    """
    if not values:
        return None
    # Joining a single item yields that item unchanged.
    return ", ".join(str(_py_scalar(item)) for item in values)
def _norm_answer(value: Any) -> str:
    """Return ``value`` as trimmed text for comparison (``""`` for ``None``)."""
    scalar = _py_scalar(value)
    return "" if scalar is None else str(scalar).strip()
def _is_primitive_answer(value: Any) -> bool:
    """Tell whether ``value`` is a string, number or bool (``None`` is not)."""
    primitive_types = (str, int, float, bool, np.integer, np.floating)
    # ``None`` is not an instance of any of these, so no separate check.
    return isinstance(_py_scalar(value), primitive_types)
def _pick_metric(
    metric: Dict[str, Any],
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> tuple[Optional[str], Optional[float]]:
    """Choose the metric entry to display for one record.

    Candidates are tried in order: the caller's ``preferred_metrics`` (or the
    names configured for ``benchmark_base``, matched exactly and then by
    canonical key), a short list of known fallback names, and finally any
    numeric entry of ``metric``.

    Returns:
        ``(name, value)`` of the first numeric candidate, or ``(None, None)``.
    """
    if not isinstance(metric, dict) or not metric:
        return None, None

    preferred = preferred_metrics or BENCHMARK_BASE_TO_METRICS.get(benchmark_base, [])
    if not preferred:
        canon_base = _canonical_base_key(benchmark_base)
        preferred = next(
            (
                names
                for base_key, names in BENCHMARK_BASE_TO_METRICS.items()
                if _canonical_base_key(base_key) == canon_base
            ),
            preferred,
        )

    # Fallback for known detail formats.
    known_fallbacks = ["normalized_score_norm", "BERTScore-F", "acc", "accuracy"]
    for name in [*preferred, *known_fallbacks]:
        if name not in metric:
            continue
        numeric = _to_float_scalar(metric.get(name))
        if numeric is not None:
            return name, numeric

    for name, raw_value in metric.items():
        numeric = _to_float_scalar(raw_value)
        if numeric is not None:
            return str(name), numeric
    return None, None
def _is_binary_metric_name(metric_name: Optional[str]) -> bool:
    """Tell whether a metric name denotes a per-item right/wrong score.

    Matching is case-insensitive: names starting with ``acc``, containing
    ``accuracy`` or ``score_norm``, ending with ``_status``, or equal to one
    of a few known names.
    """
    if not metric_name:
        return False
    lowered = metric_name.lower()
    if lowered in {"exact_match", "fann_or_flop", "fannorflop", "eval_plus"}:
        return True
    if lowered.startswith("acc") or lowered.endswith("_status"):
        return True
    return any(marker in lowered for marker in ("accuracy", "score_norm"))
def _is_choice_metric_name(metric_name: Optional[str]) -> bool:
    """Tell whether a metric name belongs to a choice-style evaluation.

    Matching is case-insensitive: names starting with ``acc``, containing
    ``mc_prob`` or ``score_norm``, ending with ``_status``, or equal to one of
    a few known names.
    """
    if not metric_name:
        return False
    lowered = metric_name.lower()
    if lowered in {"exact_match", "fann_or_flop", "fannorflop", "eval_plus"}:
        return True
    if lowered.startswith("acc") or lowered.endswith("_status"):
        return True
    return any(marker in lowered for marker in ("mc_prob", "score_norm"))
def _extract_predicted_answer(model_response: Dict[str, Any], choices: List[Any]) -> Any:
    """Derive the model's answer from a response record.

    When ``logprobs`` and ``choices`` are both present, the choice at the
    position of the highest log-probability is returned. Otherwise the first
    entry of ``text_post_processed``, then of ``text``, is used; ``None`` when
    nothing is available.
    """
    if choices and model_response.get("logprobs") is not None:
        scores = _as_list(model_response.get("logprobs"))
        try:
            best = int(np.argmax(np.asarray(scores, dtype=float)))
        except Exception:
            best = -1  # Unusable log-probabilities: fall through to the text.
        if 0 <= best < len(choices):
            return choices[best]

    for field in ("text_post_processed", "text"):
        candidates = _as_list(model_response.get(field))
        if candidates:
            return candidates[0]
    return None
def _first_non_empty(values: Any) -> Optional[str]:
    """Return the first entry of ``values`` that is non-blank as text.

    ``None`` entries are skipped; the winner is returned stringified and
    trimmed. Returns ``None`` when there is no such entry.
    """
    trimmed = (str(item).strip() for item in _as_list(values) if item is not None)
    return next((text for text in trimmed if text), None)
def _structured_record_to_row(
    record: Dict[str, Any],
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Flatten one structured details record into a JSON-safe display row.

    The record is expected to carry ``doc``, ``metric`` and ``model_response``
    entries (each a dict or something ``_as_dict`` can decode). The row holds
    the prompt, the model output, the choices, gold and predicted answers,
    the selected metric and a tri-state ``is_correct`` (``None`` when
    correctness cannot be decided).
    """
    doc = _as_dict(record.get("doc"))
    metric = _as_dict(record.get("metric"))
    response = _as_dict(record.get("model_response"))

    choices = [_py_scalar(choice) for choice in _as_list(doc.get("choices"))]
    gold_values: List[Any] = [
        choices[position]
        for position in _normalize_indices(doc.get("gold_index"))
        if 0 <= position < len(choices)
    ]
    gold_answer = _format_answer(gold_values)

    metric_name, metric_value = _pick_metric(metric, benchmark_base, preferred_metrics)
    predicted_answer = _extract_predicted_answer(response, choices)

    output_text = _first_non_empty(response.get("text_post_processed"))
    if output_text is None:
        output_text = _first_non_empty(response.get("text"))
    if output_text is None and predicted_answer is not None:
        output_text = str(predicted_answer)

    # Correctness comes from the picked metric when it is a 0/1 binary score,
    # otherwise from a 0/1 "normalized_score_norm" entry if there is one.
    picked_is_binary = (
        metric_value is not None
        and _is_binary_metric_name(metric_name)
        and metric_value in (0.0, 1.0)
    )
    if picked_is_binary:
        verdict = metric_value
    else:
        verdict = _to_float_scalar(metric.get("normalized_score_norm"))
    is_correct = bool(verdict) if verdict in (0.0, 1.0) else None

    # For multi-gold classification (e.g. Mizan), accept prediction if it matches any gold option.
    pred_norm = _norm_answer(predicted_answer)
    choice_norms = {_norm_answer(choice) for choice in choices if _norm_answer(choice)}
    gold_norms = {_norm_answer(gold) for gold in gold_values if _norm_answer(gold)}
    if _is_choice_metric_name(metric_name) and pred_norm and pred_norm in choice_norms and gold_norms:
        is_correct = pred_norm in gold_norms

    predicted_answer = _py_scalar(predicted_answer)
    if isinstance(predicted_answer, list):
        predicted_answer = _format_answer(predicted_answer)

    input_prompt = response.get("input")
    prompt = (
        doc.get("query")
        or doc.get("original_query")
        or doc.get("instruction")
        or input_prompt
        or ""
    )

    row = {
        "subtask": subtask,
        "question_id": _py_scalar(doc.get("id")),
        "task_name": _py_scalar(doc.get("task_name")),
        "prompt": prompt,
        "input_prompt": input_prompt,
        "output": output_text,
        "choices": [str(choice) for choice in choices],
        "gold_answer": _py_scalar(gold_answer),
        "predicted_answer": _py_scalar(predicted_answer),
        "is_correct": is_correct,
        "metric_name": metric_name,
        "metric": metric_value,
    }
    return _json_safe(row)
def _read_detail_parquet(
    path: str,
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Read a details parquet file into display rows.

    Two layouts are supported. If the first record has any of ``doc``,
    ``metric`` or ``model_response``, every record goes through
    ``_structured_record_to_row``. Otherwise the file is treated as a flat
    table (e.g. fannorflop) and each record is mapped with
    ``_make_simple_row``.

    Args:
        path: Location of the parquet file.
        subtask: Subtask label stored on every row.
        benchmark_base: Benchmark key; used for metric selection and as the
            ``task_name`` of flat rows.
        preferred_metrics: Metric names to prefer for structured records.

    Returns:
        The rows, or ``[]`` when the file is unreadable (logged) or empty.
    """
    try:
        df = pd.read_parquet(path)
    except Exception as e:
        logger.warning("Could not read details parquet '%s': %s", path, e)
        return []

    records = df.to_dict(orient="records")
    if not records:
        return []

    sample = records[0] if isinstance(records[0], dict) else {}
    has_structured_fields = isinstance(sample, dict) and any(
        key in sample for key in ("doc", "metric", "model_response")
    )
    if has_structured_fields:
        return [
            _structured_record_to_row(record, subtask, benchmark_base, preferred_metrics)
            for record in records
            if isinstance(record, dict)
        ]

    # Simple row format (e.g. fannorflop parquet).
    rows: List[Dict[str, Any]] = []
    for rec in records:
        if not isinstance(rec, dict):
            continue

        # First known score column that actually holds a number.
        metric_key = next(
            (
                k
                for k in ("BertScore", "bert_score", "f1", "score", "metric")
                if k in rec and _to_float_scalar(rec.get(k)) is not None
            ),
            None,
        )
        metric_value = rec.get(metric_key) if metric_key else None

        output = (
            rec.get("extracted_response")
            or rec.get("response")
            or rec.get("extracted_json")
            or rec.get("raw_response")
        )
        predicted = rec.get("predicted_answer") or output

        gold_raw = rec.get("gold_answer")
        gold_display = gold_raw if gold_raw not in (None, "") else (
            rec.get("gold_verse_explanations")
            if rec.get("gold_verse_explanations") not in (None, "")
            else rec.get("verse_explanations")
        )

        is_correct = None
        # Only enable binary correct/wrong mode for explicit gold_answer labels.
        binary_mode = _is_primitive_answer(gold_raw) and _is_primitive_answer(predicted)
        if binary_mode:
            gold_norm = _norm_answer(gold_raw)
            pred_norm = _norm_answer(predicted)
            if gold_norm and pred_norm:
                is_correct = (gold_norm == pred_norm)
                metric_key = "fannorflop"
                metric_value = 1.0 if is_correct else 0.0

        # Fall back to "question_id" only when "id" is truly absent (None or
        # an empty string). A plain ``or`` would also discard a valid id of 0.
        question_id = rec.get("id")
        if question_id is None or (isinstance(question_id, str) and not question_id):
            question_id = rec.get("question_id")

        rows.append(_make_simple_row(
            subtask=subtask,
            question_id=question_id,
            task_name=benchmark_base,
            prompt=rec.get("prompt"),
            output=output,
            gold_answer=gold_display,
            predicted_answer=predicted,
            metric_name=metric_key,
            metric_value=metric_value,
            is_correct=is_correct,
        ))
    return rows
def _load_json_payload(path: str) -> Any:
    """Read a details ``.json`` or ``.jsonl`` file and return its payload.

    A ``.jsonl`` file is first tried as one JSON document; failing that, each
    non-blank line is decoded separately and undecodable lines are dropped.
    An empty ``.jsonl`` file yields ``[]``. Any other suffix is decoded as a
    single JSON document and decoding errors propagate.
    """
    source = Path(path)
    if source.suffix.lower() != ".jsonl":
        with open(source, "r", encoding="utf-8") as handle:
            return json.load(handle)

    text = source.read_text(encoding="utf-8", errors="ignore").strip()
    if not text:
        return []
    try:
        return json.loads(text)
    except Exception:
        pass  # Not a single document: decode line by line instead.

    decoded_lines: List[Any] = []
    for raw_line in text.splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        try:
            decoded_lines.append(json.loads(candidate))
        except Exception:
            continue
    return decoded_lines
def _make_simple_row(
    *,
    subtask: str,
    question_id: Any,
    task_name: Any,
    prompt: Any,
    output: Any,
    gold_answer: Any,
    predicted_answer: Any,
    metric_name: Any,
    metric_value: Any,
    is_correct: Any,
    summary_accuracy_override: Any = None,
) -> Dict[str, Any]:
    """Assemble a JSON-safe display row for flat (non-structured) records.

    Text fields that look like serialised dicts/lists are decoded, the metric
    is coerced to a float (or ``None``), and ``choices`` / ``input_prompt``
    are left empty. When ``summary_accuracy_override`` is given it is stored
    under ``_summary_accuracy_override``.
    """
    decode = _decode_structured_string
    row = {
        "subtask": subtask,
        "question_id": _py_scalar(question_id),
        "task_name": _py_scalar(task_name),
        "prompt": decode(prompt or ""),
        "input_prompt": None,
        "output": decode(output),
        "choices": [],
        "gold_answer": decode(gold_answer),
        "predicted_answer": decode(predicted_answer),
        "is_correct": is_correct,
        "metric_name": metric_name,
        "metric": _to_float_scalar(metric_value),
    }
    if summary_accuracy_override is None:
        return _json_safe(row)
    row["_summary_accuracy_override"] = _to_float_scalar(summary_accuracy_override)
    return _json_safe(row)
def _read_detail_fannorflop_rows(records: List[Any], subtask: str, benchmark_base: str) -> List[Dict[str, Any]]:
    """Map flat fannorflop-style records to display rows.

    Each dict record contributes one row; other entries are skipped. The
    displayed gold answer falls back from ``gold_answer`` to
    ``gold_verse_explanations`` and then ``verse_explanations``.

    Binary correct/wrong scoring is applied only when the record has an
    explicit primitive ``gold_answer`` label, as the parquet reader does.
    Basing that decision on the fallback explanation text would compare free
    text with the prediction and overwrite the record's similarity score
    with 0.0.

    Args:
        records: Decoded JSON/JSONL records.
        subtask: Subtask label stored on every row.
        benchmark_base: Stored as the ``task_name`` of every row.

    Returns:
        One row per dict record.
    """
    rows: List[Dict[str, Any]] = []
    for rec in records:
        if not isinstance(rec, dict):
            continue

        # First known score column present on the record.
        metric_key = None
        for k in ("BertScore", "bert_score", "score", "f1"):
            if k in rec:
                metric_key = k
                break
        metric_value = rec.get(metric_key) if metric_key else None

        output = rec.get("extracted_response") or rec.get("response")
        predicted = rec.get("predicted_answer") or output

        gold_raw = rec.get("gold_answer")
        gold_display = gold_raw
        if gold_display in (None, ""):
            gold_display = rec.get("gold_verse_explanations")
        if gold_display in (None, ""):
            gold_display = rec.get("verse_explanations")

        is_correct = None
        # Only enable binary mode when explicit gold_answer exists.
        if _is_primitive_answer(gold_raw) and _is_primitive_answer(predicted):
            gold_norm = _norm_answer(gold_raw)
            pred_norm = _norm_answer(predicted)
            if gold_norm and pred_norm:
                is_correct = (gold_norm == pred_norm)
                metric_key = "fannorflop"
                metric_value = 1.0 if is_correct else 0.0

        rows.append(_make_simple_row(
            subtask=subtask,
            question_id=rec.get("id"),
            task_name=benchmark_base,
            prompt=rec.get("prompt"),
            output=output,
            gold_answer=gold_display,
            predicted_answer=predicted,
            metric_name=metric_key,
            metric_value=metric_value,
            is_correct=is_correct,
        ))
    return rows
def _read_detail_code_eval_json(data: Dict[str, Any], subtask: str, benchmark_base: str) -> List[Dict[str, Any]]:
    """Map a code-evaluation report to display rows.

    ``data["eval"]`` maps a task id to one or more attempt records. Each dict
    record becomes a row whose correctness follows ``plus_status`` (``pass`` /
    ``fail``; anything else leaves it undecided). When
    ``data["pass_at_k"]["plus"]["pass@1"]`` is numeric, it is attached to
    every row (times 100) as the summary accuracy override.
    """
    eval_map = data.get("eval")
    if not isinstance(eval_map, dict):
        return []

    summary_override = None
    pass_at_k = data.get("pass_at_k")
    plus_scores = pass_at_k.get("plus") if isinstance(pass_at_k, dict) else None
    if isinstance(plus_scores, dict):
        pass_at_1 = _to_float_scalar(plus_scores.get("pass@1"))
        if pass_at_1 is not None:
            summary_override = pass_at_1 * 100.0

    rows: List[Dict[str, Any]] = []
    for task_id, entries in eval_map.items():
        for rec in _as_list(entries):
            if not isinstance(rec, dict):
                continue

            status = str(rec.get("plus_status", "")).strip().lower()
            if status in {"pass", "fail"}:
                status_text = status
                is_correct = status == "pass"
                metric_value = 1.0 if is_correct else 0.0
            else:
                status_text = ""
                is_correct = None
                metric_value = None

            rows.append(_make_simple_row(
                subtask=subtask,
                question_id=rec.get("task_id") or task_id,
                task_name=task_id,
                prompt="",
                output=rec.get("solution") or rec.get("completion"),
                gold_answer=rec.get("gold_answer") or "",
                predicted_answer=status_text or rec.get("predicted_answer") or "",
                metric_name="eval_plus",
                metric_value=metric_value,
                is_correct=is_correct,
                summary_accuracy_override=summary_override,
            ))
    return rows
def _read_detail_json_any(
    path: str,
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Read a details JSON/JSONL file, dispatching on its layout.

    Tried in order: fannorflop payloads (a list, or a dict with a ``rows``
    list) when the benchmark is fannorflop; code-evaluation reports (a dict
    with an ``eval`` dict); lists of structured records; and finally any list
    of dicts, read as flat rows. Unreadable files are logged and yield ``[]``.
    """
    try:
        data = _load_json_payload(path)
    except Exception as e:
        logger.warning("Could not read details json/jsonl '%s': %s", path, e)
        return []

    is_list = isinstance(data, list)
    is_dict = isinstance(data, dict)

    if _canonical_base_key(benchmark_base) == "fannorflop":
        if is_list:
            return _read_detail_fannorflop_rows(data, subtask, benchmark_base)
        if is_dict and isinstance(data.get("rows"), list):
            return _read_detail_fannorflop_rows(data["rows"], subtask, benchmark_base)

    if is_dict and isinstance(data.get("eval"), dict):
        return _read_detail_code_eval_json(data, subtask, benchmark_base)

    if not is_list:
        return []

    structured_rows = [
        _structured_record_to_row(rec, subtask, benchmark_base, preferred_metrics)
        for rec in data
        if isinstance(rec, dict)
        and any(key in rec for key in ("doc", "metric", "model_response"))
    ]
    if structured_rows:
        return structured_rows
    if data and isinstance(data[0], dict):
        return _read_detail_fannorflop_rows(data, subtask, benchmark_base)
    return []
def _read_detail_file(
    path: str,
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Read one details file with the reader matching its suffix.

    ``.parquet`` goes to the parquet reader, ``.json`` / ``.jsonl`` to the
    JSON reader; any other suffix yields ``[]``.
    """
    readers = {
        ".parquet": _read_detail_parquet,
        ".json": _read_detail_json_any,
        ".jsonl": _read_detail_json_any,
    }
    reader = readers.get(Path(path).suffix.lower())
    if reader is None:
        return []
    return reader(path, subtask, benchmark_base, preferred_metrics)
def _interleave_detail_rows(
    rows_by_subtask: List[List[Dict[str, Any]]],
    max_rows: int,
) -> List[Dict[str, Any]]:
    """Pick rows round-robin across subtasks until ``max_rows`` rows are collected."""
    queues = [deque(rows) for rows in rows_by_subtask]
    picked: List[Dict[str, Any]] = []
    while len(picked) < max_rows:
        progressed = False
        for queue in queues:
            if not queue:
                continue
            picked.append(queue.popleft())
            progressed = True
            if len(picked) >= max_rows:
                break
        if not progressed:
            # Every queue is drained; nothing more to take.
            break
    return picked


def load_benchmark_details(
    model_name: str,
    benchmark_display: str,
    details_index: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]],
    max_rows: int = 250,
) -> Dict[str, Any]:
    """
    Load per-question benchmark details for a model from indexed detail files.

    Detail files are read through ``_read_detail_file``, so parquet, JSON and
    JSONL files are all supported.

    Args:
        model_name: Model to look up in ``details_index``. An exact key match is
            tried first, then a case/whitespace-insensitive match.
        benchmark_display: Display name of the benchmark. It is mapped to its
            task bases via ``BENCHMARK_DISPLAY_TO_BASES``; when no mapping
            exists the display name itself is used as the base.
        details_index: Nested mapping ``model -> base -> subtask -> info`` where
            ``info["path"]`` points to the details file.
        max_rows: Maximum number of rows returned. When the total exceeds it,
            rows are taken round-robin across subtasks. Values ``<= 0`` disable
            the cap.

    Returns:
        A dict with keys ``benchmark`` (the display name), ``subtasks`` (one
        summary dict per subtask with ``subtask``, ``total``, ``scored``,
        ``correct``, ``accuracy`` and ``mode``) and ``rows`` (the detail rows).
        Summaries are computed over all rows read, before ``max_rows`` is applied.
    """
    model_bucket = details_index.get(model_name, {})
    if not model_bucket:
        # Fall back to a case/whitespace-insensitive model-name match.
        target_model = model_name.strip().lower()
        for indexed_model, bucket in details_index.items():
            if indexed_model.strip().lower() == target_model:
                model_bucket = bucket
                break

    benchmark_bases = BENCHMARK_DISPLAY_TO_BASES.get(benchmark_display, [])
    if not benchmark_bases:
        benchmark_bases = [benchmark_display]

    # Depends only on the display name, so look it up once for all bases.
    display_metric_bucket = BENCHMARK_DISPLAY_TO_BASE_METRICS.get(benchmark_display, {})

    selected_entries: List[tuple[str, str, Dict[str, Any], List[str]]] = []
    # Two configured bases can resolve to the same indexed base (one exactly,
    # one through the canonical-key fallback); load each (base, subtask) once.
    seen_entries: set[tuple[str, str]] = set()
    for base in benchmark_bases:
        subtasks = model_bucket.get(base, {})
        selected_base = base
        if not subtasks:
            base_l = _canonical_base_key(base)
            for indexed_base, bucket in model_bucket.items():
                if _canonical_base_key(indexed_base) == base_l:
                    selected_base = indexed_base
                    subtasks = bucket
                    break
        if not subtasks:
            # Nothing indexed for this base; no entries would be added anyway.
            continue

        canon_selected = _canonical_base_key(selected_base)
        preferred_metrics = display_metric_bucket.get(selected_base)
        if preferred_metrics is None:
            # Key-normalized fallback.
            for k, v in display_metric_bucket.items():
                if _canonical_base_key(k) == canon_selected:
                    preferred_metrics = v
                    break
        preferred_metrics = preferred_metrics or BENCHMARK_BASE_TO_METRICS.get(selected_base, [])
        if not preferred_metrics:
            for k, v in BENCHMARK_BASE_TO_METRICS.items():
                if _canonical_base_key(k) == canon_selected:
                    preferred_metrics = v
                    break

        for subtask, info in subtasks.items():
            entry_key = (selected_base, subtask)
            if entry_key in seen_entries:
                continue
            seen_entries.add(entry_key)
            selected_entries.append((selected_base, subtask, info, preferred_metrics))

    if not selected_entries:
        return {"benchmark": benchmark_display, "subtasks": [], "rows": []}

    selected_entries.sort(key=lambda x: x[1].lower())
    rows_by_subtask: List[List[Dict[str, Any]]] = []
    subtasks_summary: List[Dict[str, Any]] = []
    for base, subtask, info, preferred_metrics in selected_entries:
        # The synthetic "overall" subtask is shown under the benchmark's name.
        display_subtask = benchmark_display if subtask == "overall" else subtask
        rows = _read_detail_file(info["path"], display_subtask, base, preferred_metrics)
        rows_by_subtask.append(rows)

        scored_rows = [r for r in rows if r.get("metric") is not None]
        metric_name = next(
            (str(r.get("metric_name")) for r in scored_rows if r.get("metric_name")),
            None,
        )
        use_metric_mode = metric_name is not None and not _is_binary_metric_name(metric_name)
        summary_override = next(
            (
                _to_float_scalar(r.get("_summary_accuracy_override"))
                for r in rows
                if r.get("_summary_accuracy_override") is not None
            ),
            None,
        )

        if use_metric_mode:
            # Continuous metric: report the mean metric as a percentage.
            correct = None
            scored = len(scored_rows)
            avg_metric = (sum(float(r["metric"]) for r in scored_rows) / scored) if scored > 0 else None
            accuracy = round(avg_metric * 100, 2) if avg_metric is not None else None
            summary_mode = "metric"
        else:
            # Binary metric: report the share of rows flagged correct.
            binary_rows = [r for r in rows if isinstance(r.get("is_correct"), bool)]
            correct = sum(1 for r in binary_rows if r["is_correct"])
            scored = len(binary_rows)
            accuracy = round((correct / scored) * 100, 2) if scored > 0 else None
            if summary_override is not None:
                # A row-provided override replaces the computed accuracy; the
                # correct count is re-derived so it stays consistent with it.
                accuracy = round(summary_override, 2)
                if scored > 0:
                    correct = int(round((accuracy / 100.0) * scored))
            summary_mode = "binary"

        # FannOrFlop details parquet may have per-row BertScore=0 while official score lives in results f1.
        if _canonical_base_key(base) == "fannorflop":
            outside_score = _latest_model_benchmark_score_pct(model_name, benchmark_display)
            if outside_score is not None:
                accuracy = round(outside_score, 2)
                summary_mode = "metric"
                correct = None

        subtasks_summary.append({
            "subtask": display_subtask,
            "total": len(rows),
            "scored": scored,
            "correct": correct,
            "accuracy": accuracy,
            "mode": summary_mode,
        })

    total_rows = sum(len(rows) for rows in rows_by_subtask)
    if max_rows > 0 and total_rows > max_rows:
        all_rows = _interleave_detail_rows(rows_by_subtask, max_rows)
    else:
        all_rows = [row for rows in rows_by_subtask for row in rows]

    # The override is internal bookkeeping; strip it from the returned rows.
    for row in all_rows:
        if isinstance(row, dict):
            row.pop("_summary_accuracy_override", None)

    return {
        "benchmark": benchmark_display,
        "subtasks": subtasks_summary,
        "rows": all_rows,
    }
# Manual size overrides (in billions) for models where HF API returns no safetensors metadata.
# Keys are the model names passed to _fetch_hf_metadata (used there as the Hub repo_id).
# The table is consulted only when get_model_size() reports 0 and the model info
# carries no safetensors total; models missing from it end up with no size.
# NOTE(review): values presumably count parameters (e.g. 14.0 for a 14B model) - confirm.
_MODEL_SIZE_OVERRIDES: Dict[str, float] = {
    "Qwen/Qwen2.5-14B-Instruct": 14.0,
    "Qwen/Qwen2.5-32B-Instruct": 32.0,
    "Qwen/Qwen3-30B-A3B-Instruct-2507": 30.0,
    "Qwen/Qwen3-235B-A22B-Instruct-2507": 235.0,
    "google/gemma-3-270m-it": 0.27,
    "google/gemma-3-1b-it": 1.0,
    "google/gemma-3-1b-pt": 1.0,
    "google/gemma-3-4b-it": 4.0,
    "google/gemma-3-12b-it": 12.0,
    "google/gemma-3-27b-pt": 27.0,
    "microsoft/Phi-4-mini-instruct": 3.8,
}
def _fetch_hf_metadata(model_name: str) -> Dict[str, Any]:
    """
    Fetch license, revision, size and like count for a model from the HF Hub.

    Args:
        model_name: Hub repo id of the model.

    Returns:
        A dict with keys ``License``, ``Revision``, ``Model Size`` and
        ``Hub ❤️``. ``Model Size`` comes from ``get_model_size``; when that
        reports 0 and the model info has no safetensors total, the value from
        ``_MODEL_SIZE_OVERRIDES`` is used (``None`` if the model is not listed).
        An empty dict is returned when the Hub lookup fails.
    """
    try:
        info = API.model_info(repo_id=model_name, token=hf_api_token)
    except Exception as e:
        # Best effort: a missing/gated/unreachable repo just yields no metadata.
        logger.warning("Could not fetch HF metadata for '%s': %s", model_name, e)
        return {}

    # card_data may be a plain dict or an object exposing attributes.
    card_data = getattr(info, "card_data", None)
    if isinstance(card_data, dict):
        license_name = card_data.get("license")
    else:
        license_name = getattr(card_data, "license", None)

    model_size = get_model_size(model_info=info)
    if model_size == 0:
        # Depending on the huggingface_hub version, `safetensors` is either a
        # dict or a SafeTensorsInfo object; read `total` from whichever form we
        # got instead of assuming a dict (which raised AttributeError on objects).
        safetensors = getattr(info, "safetensors", None)
        if isinstance(safetensors, dict):
            safetensors_total = safetensors.get("total")
        else:
            safetensors_total = getattr(safetensors, "total", None)
        if not safetensors_total:
            model_size = _MODEL_SIZE_OVERRIDES.get(model_name)

    return {
        "License": license_name,
        "Revision": getattr(info, "sha", None),
        "Model Size": model_size,
        "Hub ❤️": getattr(info, "likes", None),
    }
def load_scoreboard() -> pd.DataFrame:
    """
    Main entrypoint used by the Space UI.

    Calls ``download_datasets()``, parses every ``.json``/``.jsonl`` file under
    ``EVAL_RESULTS_PATH`` with ``_parse_result_file``, keeps the latest file per
    (model, source) and merges the sources into one row per model. Scores are
    scaled to percentages (x100, rounded to 2 decimals). ``Average`` is the mean
    of the visible task columns only; hidden task columns are kept but excluded.
    Rows are then enriched with Hub metadata and requests-repo metadata, sorted
    by ``Average`` (descending) and given a 1-based ``Rank`` column.

    Returns:
        The scoreboard DataFrame, or an empty DataFrame when
        ``EVAL_RESULTS_PATH`` is unset or no result file could be parsed.
    """
    download_datasets()
    result_base = os.getenv("EVAL_RESULTS_PATH")
    if not result_base:
        return pd.DataFrame()

    rows = []
    for p in Path(result_base).rglob("*"):
        if not p.is_file() or p.suffix.lower() not in {".json", ".jsonl"}:
            continue
        row = _parse_result_file(p)
        if row:
            rows.append(row)
    if not rows:
        return pd.DataFrame()

    df = pd.DataFrame(rows)
    df["datetime"] = pd.to_datetime(df["datetime"])

    # Keep latest file per (model, source), then merge source metrics per model.
    df = df.sort_values("datetime", ascending=False)
    df = df.drop_duplicates(subset=["Model Name", "Source Type"], keep="first")

    # Several task entries may share one display name. De-duplicate (order
    # preserved) so a column is neither scaled by 100 twice nor counted twice
    # in the Average.
    task_cols = list(dict.fromkeys(t[2] for t in TASKS))
    hidden_cols = list(dict.fromkeys(t[2] for t in HIDDEN_TASKS))
    all_score_cols = list(dict.fromkeys(task_cols + hidden_cols))
    for col in all_score_cols:
        if col not in df.columns:
            df[col] = np.nan

    def first_non_null(values):
        # Rows are sorted newest-first, so this picks the most recent score.
        for v in values:
            if pd.notna(v):
                return v
        return np.nan

    def first_valid_precision(values):
        # Prefer a real precision label; fall back to any non-null value.
        for v in values:
            if isinstance(v, str) and v.strip() and v not in {"Missing", "UNK"}:
                return v
        for v in values:
            if pd.notna(v):
                return v
        return "UNK"

    agg_map = {
        "datetime": "max",
        "Precision": first_valid_precision,
    }
    agg_map.update({col: first_non_null for col in all_score_cols})
    df = df.groupby("Model Name", as_index=False).agg(agg_map)

    # numeric — hidden_cols converted but excluded from Average
    for col in all_score_cols:
        df[col] = (pd.to_numeric(df[col], errors="coerce") * 100).round(2)
    df["Average"] = df[task_cols].mean(axis=1).round(2)

    # metadata from Hugging Face API (fetched in parallel for speed)
    model_names = df["Model Name"].dropna().unique().tolist()
    hf_meta: Dict[str, Dict[str, Any]] = {}
    if model_names:
        max_workers = min(12, len(model_names))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_model = {
                executor.submit(_fetch_hf_metadata, model_name): model_name
                for model_name in model_names
            }
            for future in as_completed(future_to_model):
                model_name = future_to_model[future]
                try:
                    hf_meta[model_name] = future.result() or {}
                except Exception as e:
                    # result() re-raises worker exceptions; one failed lookup
                    # must not take the whole scoreboard down.
                    logger.warning("HF metadata lookup failed for '%s': %s", model_name, e)
                    hf_meta[model_name] = {}

    df["License"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("License"))
    df["Revision"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Revision"))
    df["Model Size"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Model Size"))
    df["Hub ❤️"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Hub ❤️"))
    df["Type"] = None
    df["Full Type"] = None

    # Merge metadata from requests repo (all statuses), not just finished.
    req_meta = load_requests(None)
    if not req_meta.empty:
        if "model" not in req_meta.columns and "model_name" in req_meta.columns:
            req_meta["model"] = req_meta["model_name"]
        if "model" not in req_meta.columns:
            # Without a model column the requests cannot be joined at all.
            req_meta = pd.DataFrame()

    if not req_meta.empty:
        if "precision" in req_meta.columns:
            req_meta["precision"] = req_meta["precision"].apply(unify_precision)
        else:
            req_meta["precision"] = None
        has_precision_values = req_meta["precision"].apply(
            lambda v: isinstance(v, str) and v.strip() and v not in {"Missing", "UNK"}
        ).any()
        # Prefer a (model, precision) match; fall back to a model-only match.
        meta = (
            req_meta.groupby(["model", "precision"]).last().reset_index()
            if has_precision_values
            else pd.DataFrame()
        )
        meta_by_model = req_meta.groupby(["model"]).last().reset_index()

        def is_missing(v: Any) -> bool:
            return v is None or (isinstance(v, str) and not v.strip()) or pd.isna(v)

        def enrich(row):
            m = pd.DataFrame()
            if has_precision_values and not meta.empty:
                m = meta[
                    (meta["model"] == row["Model Name"]) &
                    (meta["precision"] == row["Precision"])
                ]
            if m.empty:
                m = meta_by_model[meta_by_model["model"] == row["Model Name"]]
            if not m.empty:
                m = m.iloc[0]
                # Hub metadata wins; request metadata only fills the gaps.
                if is_missing(row.get("License")):
                    row["License"] = m.get("license")
                if is_missing(row.get("Revision")):
                    row["Revision"] = m.get("revision")
                model_type_raw = m.get("model_type", "Missing")
                row["Type"] = MODEL_TYPE_TO_EMOJI.get(
                    model_type_raw, model_type_raw
                )
                row["Full Type"] = model_type_raw
            return row

        df = df.apply(enrich, axis=1)

    df = df.sort_values("Average", ascending=False).reset_index(drop=True)
    df.insert(0, "Rank", range(1, len(df) + 1))
    return df
# Alias: exposes download_datasets under the name download_dataset_snapshots as well.
# NOTE(review): presumably kept for callers that still use the older name - confirm.
download_dataset_snapshots = download_datasets