# backend/data_loader.py import json import os import contextlib import io import logging import re import ast from collections import deque from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Dict, List, Any, Optional from urllib.parse import quote import numpy as np import pandas as pd import requests from huggingface_hub import snapshot_download from datetime import datetime from huggingface_hub.constants import HF_HUB_CACHE from backend.config import ( API, DETAILS_REPO_ID, REQUESTS_REPO_ID, RESULTS_REPO_ID, TASKS, TASK_SOURCES, HIDDEN_TASKS, MODEL_TYPE_TO_EMOJI, hf_api_token, ) from backend.helpers import unify_precision, get_model_size logger = logging.getLogger(__name__) _SOURCE_BY_PREFIX = { prefix.lower(): source for source, cfg in TASK_SOURCES.items() for prefix in cfg.get("prefixes", []) } _TASKS_BY_SOURCE = { source: cfg.get("tasks", []) for source, cfg in TASK_SOURCES.items() } # Wire hidden tasks into the "results" source so _parse_result_file extracts # them alongside normal tasks without touching the shared TASK_SOURCES dict. _TASKS_BY_SOURCE["results"] = list(_TASKS_BY_SOURCE.get("results", [])) + HIDDEN_TASKS _RESULT_SCORE_CACHE: Dict[tuple[str, str], Optional[float]] = {} def _extract_task_bases(task_key: Any) -> List[str]: if isinstance(task_key, list): bases: List[str] = [] for item in task_key: bases.extend(_extract_task_bases(item)) return bases if not isinstance(task_key, str): return [] key = task_key.strip() if not key: return [] return [key.split(":", 1)[0].split("|", 1)[0].strip()] BENCHMARK_DISPLAY_TO_BASES: Dict[str, List[str]] = {} for task_key, _, display in TASKS: bases = BENCHMARK_DISPLAY_TO_BASES.setdefault(display, []) for base in _extract_task_bases(task_key): if base and base not in bases: bases.append(base) def _extract_base_metric_pairs(task_key: Any, metric_key: Any) -> List[tuple[str, str]]: pairs: List[tuple[str, str]] = [] if isinstance(task_key, list): if isinstance(metric_key, list): for tk, mk in zip(task_key, metric_key): if isinstance(mk, tuple): mk = mk[0] pairs.extend(_extract_base_metric_pairs(tk, mk)) return pairs if not isinstance(task_key, str) or not isinstance(metric_key, str): return pairs base = task_key.split(":", 1)[0].split("|", 1)[0].strip() if base: pairs.append((base, metric_key)) return pairs BENCHMARK_BASE_TO_METRICS: Dict[str, List[str]] = {} BENCHMARK_DISPLAY_TO_BASE_METRICS: Dict[str, Dict[str, List[str]]] = {} for task_key, metric_key, display in TASKS: display_bucket = BENCHMARK_DISPLAY_TO_BASE_METRICS.setdefault(display, {}) for base, metric_name in _extract_base_metric_pairs(task_key, metric_key): base_bucket = BENCHMARK_BASE_TO_METRICS.setdefault(base, []) if metric_name and metric_name not in base_bucket: base_bucket.append(metric_name) display_metric_bucket = display_bucket.setdefault(base, []) if metric_name and metric_name not in display_metric_bucket: display_metric_bucket.append(metric_name) DETAILS_EXTENSIONS = {".parquet", ".json", ".jsonl"} def _norm_key(value: Any) -> str: return re.sub(r"[^a-z0-9]+", "", str(value or "").strip().lower()) def _canonical_base_key(value: Any) -> str: n = _norm_key(value) if n.startswith("qimma"): return n[len("qimma"):] return n # ----------------------------------------------------------------------------- # Utilities # ----------------------------------------------------------------------------- def silent_snapshot_download(**kwargs): with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): return snapshot_download(**kwargs) def _resolve_details_base_path() -> Path: repo_cache_root = Path(HF_HUB_CACHE) / f"datasets--{DETAILS_REPO_ID.replace('/', '--')}" snapshots_root = repo_cache_root / "snapshots" if snapshots_root.exists(): candidates = [p for p in snapshots_root.iterdir() if p.is_dir()] if candidates: return max(candidates, key=lambda p: p.stat().st_mtime) manual_root = repo_cache_root / "manual-snapshot" manual_root.mkdir(parents=True, exist_ok=True) return manual_root def _download_details_file(relative_path: str, base_path: Path, retries: int = 3) -> bool: encoded_rel_path = quote(relative_path, safe="/") url = f"https://huggingface.co/datasets/{DETAILS_REPO_ID}/resolve/main/{encoded_rel_path}" headers = {} if hf_api_token: headers["Authorization"] = f"Bearer {hf_api_token}" target_path = base_path / relative_path target_path.parent.mkdir(parents=True, exist_ok=True) partial_path = target_path.with_suffix(target_path.suffix + ".part") for attempt in range(1, retries + 1): try: with requests.get(url, stream=True, timeout=(10, 90), headers=headers) as resp: resp.raise_for_status() with open(partial_path, "wb") as f: for chunk in resp.iter_content(chunk_size=1024 * 1024): if chunk: f.write(chunk) if partial_path.exists(): os.replace(partial_path, target_path) elif target_path.exists(): return True else: raise FileNotFoundError(f"Temporary download file missing: {partial_path}") return True except Exception as e: with contextlib.suppress(Exception): partial_path.unlink(missing_ok=True) logger.warning( "Retry %s/%s for details file '%s' failed: %s", attempt, retries, relative_path, e, ) return False def _sync_details_dataset(base_path: Path): try: remote_files = [ f for f in API.list_repo_files(repo_id=DETAILS_REPO_ID, repo_type="dataset") if Path(f).suffix.lower() in DETAILS_EXTENSIONS and Path(f).name.startswith("details_") ] except Exception as e: logger.warning("Could not list files for details repo '%s': %s", DETAILS_REPO_ID, e) return local_files = { str(p.relative_to(base_path)).replace(os.sep, "/") for p in base_path.rglob("*") if p.is_file() and p.suffix.lower() in DETAILS_EXTENSIONS } remote_set = set(remote_files) ready_local = local_files & remote_set missing_files = [f for f in remote_files if f not in local_files] total_count = len(remote_files) local_count = len(ready_local) if not missing_files: logger.info("Details files ready: %s/%s", local_count, total_count) return logger.info( "Details files ready: %s/%s. Downloading %s missing files...", local_count, total_count, len(missing_files), ) failed_files: List[str] = [] total_missing = len(missing_files) for idx, rel_path in enumerate(missing_files, start=1): logger.info("Downloading missing details file %s/%s: %s", idx, total_missing, rel_path) if not _download_details_file(rel_path, base_path): failed_files.append(rel_path) if failed_files: logger.warning( "Details sync incomplete. Downloaded %s/%s missing files. Still missing %s files.", total_missing - len(failed_files), total_missing, len(failed_files), ) for rel_path in failed_files: logger.warning("Still missing: %s", rel_path) else: logger.info("Details sync complete: downloaded %s/%s missing files.", total_missing, total_missing) def download_datasets(): """ Download requests + results datasets (read-only, anonymous). """ req_path = silent_snapshot_download( repo_id=REQUESTS_REPO_ID, repo_type="dataset", allow_patterns="*.json", ) os.environ["EVAL_REQUESTS_PATH"] = req_path res_path = silent_snapshot_download( repo_id=RESULTS_REPO_ID, repo_type="dataset", allow_patterns=["*.json", "*.jsonl"], ) os.environ["EVAL_RESULTS_PATH"] = res_path details_path = silent_snapshot_download( repo_id=DETAILS_REPO_ID, repo_type="dataset", allow_patterns=["*.parquet", "*.json", "*.jsonl"], ) os.environ["EVAL_DETAILS_PATH"] = details_path # ----------------------------------------------------------------------------- # Requests # ----------------------------------------------------------------------------- def load_requests(status: Optional[str] = None) -> pd.DataFrame: base = os.getenv("EVAL_REQUESTS_PATH") if not base: return pd.DataFrame() rows = [] for p in Path(base).rglob("*.json"): try: with open(p, "r", encoding="utf-8") as f: d = json.load(f) except Exception: continue if status is None or d.get("status", "").lower() == status.lower(): rows.append(d) return pd.DataFrame(rows) # ----------------------------------------------------------------------------- # Results parsing # ----------------------------------------------------------------------------- def _infer_source_from_filename(path: Path) -> Optional[str]: parsed = _parse_result_filename(path) if parsed: return parsed.get("source") stem = path.stem if "_" not in stem: return None prefix = stem.split("_", 1)[0].lower() return _SOURCE_BY_PREFIX.get(prefix) def _parse_result_filename(path: Path) -> Optional[Dict[str, Any]]: stem = path.stem if not stem.startswith("results_"): return None try: _, dt_str = stem.rsplit("_", 1) parsed_dt = datetime.strptime(dt_str, "%Y-%m-%dT%H-%M-%S.%f") except Exception: return None name_part = stem[len("results_"):].rsplit("_", 1)[0].strip() if not name_part: return {"source": "results", "datetime": parsed_dt, "name_part": ""} base_hint = name_part.split("|", 1)[0].strip() canon = _canonical_base_key(base_hint) if canon in {"evalplus", "humaneval", "mbpp"}: source = "code" elif canon in {"fannorflop", "fannflop"}: source = "fannflop" else: source = "results" # Ignore redundant single-benchmark mbpp result shards. if canon == "mbpp": return {"source": "ignore", "datetime": parsed_dt, "name_part": name_part} return {"source": source, "datetime": parsed_dt, "name_part": name_part} def _load_json_payload_any(path: Path) -> Any: if path.suffix.lower() != ".jsonl": with open(path, "r", encoding="utf-8") as f: return json.load(f) text = path.read_text(encoding="utf-8", errors="ignore").strip() if not text: return {} with contextlib.suppress(Exception): return json.loads(text) rows: List[Any] = [] for line in text.splitlines(): line = line.strip() if not line: continue with contextlib.suppress(Exception): rows.append(json.loads(line)) return rows def _parse_result_file(path: Path) -> Optional[Dict[str, Any]]: try: raw = _load_json_payload_any(path) except Exception: return None parsed_name = _parse_result_filename(path) source_type = parsed_name["source"] if parsed_name else _infer_source_from_filename(path) if source_type in {None, "ignore"}: return None data = raw if isinstance(raw, list): data = next((x for x in raw if isinstance(x, dict) and ("results" in x or "model_name" in x)), None) if data is None and raw and isinstance(raw[0], dict): data = raw[0] if not isinstance(data, dict): return None cfg = data.get("config_general", {}) results = data.get("results", {}) if not isinstance(results, dict): return None model = cfg.get("model_name") or data.get("model_name", "UNK") precision = unify_precision(cfg.get("model_dtype", "UNK")) parsed_dt = parsed_name["datetime"] if parsed_name else None if parsed_dt is None: return None row = { "Model Name": model, "Precision": precision, "datetime": parsed_dt, "Source Type": source_type, } for task_key, metric_key, display in _TASKS_BY_SOURCE.get(source_type, []): if isinstance(task_key, list): weight_total = 0 metric_total = 0 for t, (m, w) in zip(task_key, metric_key): val = results.get(t, {}).get(m, 0) metric_total += (val * w) weight_total += w val = metric_total / weight_total if weight_total > 0 else np.nan else: val = np.nan if task_key in results and metric_key in results[task_key]: val = results.get(task_key, {}).get(metric_key) if val is None: logger.warning( "Missing metric value for task '%s' in model '%s'", task_key, model, ) row[display] = val return row def _latest_model_benchmark_score_pct(model_name: str, benchmark_display: str) -> Optional[float]: cache_key = (model_name, benchmark_display) if cache_key in _RESULT_SCORE_CACHE: return _RESULT_SCORE_CACHE[cache_key] base = os.getenv("EVAL_RESULTS_PATH") if not base: _RESULT_SCORE_CACHE[cache_key] = None return None latest_dt: Optional[datetime] = None latest_val: Optional[float] = None for p in Path(base).rglob("*"): if not p.is_file() or p.suffix.lower() not in {".json", ".jsonl"}: continue row = _parse_result_file(p) if not row: continue if str(row.get("Model Name", "")).strip() != str(model_name).strip(): continue raw_val = _to_float_scalar(row.get(benchmark_display)) if raw_val is None: continue row_dt = row.get("datetime") if not isinstance(row_dt, datetime): continue if latest_dt is None or row_dt > latest_dt: latest_dt = row_dt latest_val = raw_val * 100.0 _RESULT_SCORE_CACHE[cache_key] = latest_val return latest_val def _parse_details_filename(path: Path) -> Optional[Dict[str, Any]]: stem = path.stem if "_" not in stem: return None details_part, dt_str = stem.rsplit("_", 1) if not details_part.startswith("details_"): return None try: parsed_dt = datetime.strptime(dt_str, "%Y-%m-%dT%H-%M-%S.%f") except Exception: return None task_full = details_part[len("details_"):].strip() if not task_full: return None benchmark_base = task_full.split(":", 1)[0].split("|", 1)[0].strip() if ":" in task_full: subtask = task_full.split(":", 1)[1].strip() else: subtask = "overall" subtask = re.sub(r"\|\d+$", "", subtask).strip() or "overall" return { "benchmark_base": benchmark_base, "subtask": subtask, "datetime": parsed_dt, "task_full": task_full, } def build_details_index() -> Dict[str, Dict[str, Dict[str, Dict[str, Any]]]]: """ Build an index of latest detail file paths per model/benchmark/subtask. """ details_base = os.getenv("EVAL_DETAILS_PATH") if not details_base: return {} base_path = Path(details_base) if not base_path.exists(): return {} index: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]] = {} for p in base_path.rglob("*"): if not p.is_file() or p.suffix.lower() not in DETAILS_EXTENSIONS: continue parsed = _parse_details_filename(p) if not parsed: continue try: rel_parts = p.relative_to(base_path).parts except Exception: continue if len(rel_parts) < 2: continue model_name = "/".join(rel_parts[:-1]).strip("/") if not model_name: continue benchmark_base = parsed["benchmark_base"] subtask = parsed["subtask"] dt = parsed["datetime"] model_bucket = index.setdefault(model_name, {}) bench_bucket = model_bucket.setdefault(benchmark_base, {}) current = bench_bucket.get(subtask) if current is None or dt > current["datetime"]: bench_bucket[subtask] = { "path": str(p), "datetime": dt, "task_full": parsed["task_full"], } return index def _as_list(value: Any) -> List[Any]: if value is None: return [] if isinstance(value, list): return value if isinstance(value, tuple): return list(value) if isinstance(value, np.ndarray): return value.tolist() return [value] def _as_dict(value: Any) -> Dict[str, Any]: if isinstance(value, dict): return value if isinstance(value, (bytes, bytearray)): try: value = value.decode("utf-8", errors="ignore") except Exception: return {} if isinstance(value, str): s = value.strip() if not s: return {} try: parsed = json.loads(s) return parsed if isinstance(parsed, dict) else {} except Exception: try: parsed = ast.literal_eval(s) return parsed if isinstance(parsed, dict) else {} except Exception: return {} if isinstance(value, list): # Some parquet backends can expose map-like structs as list of pairs. try: if all(isinstance(item, (list, tuple)) and len(item) == 2 for item in value): return {str(k): v for k, v in value} except Exception: return {} return {} def _py_scalar(value: Any) -> Any: if isinstance(value, np.ndarray): if value.ndim == 0: return _py_scalar(value.item()) if value.size == 1: return _py_scalar(value.reshape(-1)[0]) return [_py_scalar(v) for v in value.tolist()] if isinstance(value, np.generic): return value.item() return value def _decode_structured_string(value: Any) -> Any: value = _py_scalar(value) if not isinstance(value, str): return value s = value.strip() if not s: return value looks_structured = ( (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")) ) if not looks_structured: return value for parser in (json.loads, ast.literal_eval): with contextlib.suppress(Exception): parsed = parser(s) if isinstance(parsed, (dict, list)): return _json_safe(parsed) return value def _json_safe(value: Any) -> Any: value = _py_scalar(value) if isinstance(value, dict): return {str(k): _json_safe(v) for k, v in value.items()} if isinstance(value, list): return [_json_safe(v) for v in value] if isinstance(value, tuple): return [_json_safe(v) for v in value] return value def _to_float_scalar(value: Any) -> Optional[float]: value = _py_scalar(value) if isinstance(value, (int, float, np.integer, np.floating)): return float(value) return None def _normalize_indices(value: Any) -> List[int]: indices: List[int] = [] for item in _as_list(value): item = _py_scalar(item) if isinstance(item, (int, np.integer)): indices.append(int(item)) return indices def _format_answer(values: List[Any]) -> Any: if not values: return None clean = [str(_py_scalar(v)) for v in values] if len(clean) == 1: return clean[0] return ", ".join(clean) def _norm_answer(value: Any) -> str: value = _py_scalar(value) if value is None: return "" return str(value).strip() def _is_primitive_answer(value: Any) -> bool: value = _py_scalar(value) return value is not None and isinstance(value, (str, int, float, bool, np.integer, np.floating)) def _pick_metric( metric: Dict[str, Any], benchmark_base: str, preferred_metrics: Optional[List[str]] = None, ) -> tuple[Optional[str], Optional[float]]: if not isinstance(metric, dict) or not metric: return None, None preferred = preferred_metrics or BENCHMARK_BASE_TO_METRICS.get(benchmark_base, []) if not preferred: canon_base = _canonical_base_key(benchmark_base) for base_key, names in BENCHMARK_BASE_TO_METRICS.items(): if _canonical_base_key(base_key) == canon_base: preferred = names break for name in preferred: if name in metric: val = _to_float_scalar(metric.get(name)) if val is not None: return name, val # Fallback for known detail formats. for name in ["normalized_score_norm", "BERTScore-F", "acc", "accuracy"]: if name in metric: val = _to_float_scalar(metric.get(name)) if val is not None: return name, val for name, raw_val in metric.items(): val = _to_float_scalar(raw_val) if val is not None: return str(name), val return None, None def _is_binary_metric_name(metric_name: Optional[str]) -> bool: if not metric_name: return False n = metric_name.lower() return ( n.startswith("acc") or "accuracy" in n or "score_norm" in n or n.endswith("_status") or n in {"exact_match", "fann_or_flop", "fannorflop", "eval_plus"} ) def _is_choice_metric_name(metric_name: Optional[str]) -> bool: if not metric_name: return False n = metric_name.lower() return ( n.startswith("acc") or "mc_prob" in n or "score_norm" in n or n.endswith("_status") or n in {"exact_match", "fann_or_flop", "fannorflop", "eval_plus"} ) def _extract_predicted_answer(model_response: Dict[str, Any], choices: List[Any]) -> Any: logprobs = model_response.get("logprobs") if logprobs is not None and choices: values = _as_list(logprobs) try: idx = int(np.argmax(np.asarray(values, dtype=float))) if 0 <= idx < len(choices): return choices[idx] except Exception: pass text_post_processed = _as_list(model_response.get("text_post_processed")) if text_post_processed: return text_post_processed[0] text = _as_list(model_response.get("text")) if text: return text[0] return None def _first_non_empty(values: Any) -> Optional[str]: for v in _as_list(values): if v is None: continue s = str(v).strip() if s: return s return None def _structured_record_to_row( record: Dict[str, Any], subtask: str, benchmark_base: str, preferred_metrics: Optional[List[str]] = None, ) -> Dict[str, Any]: doc = _as_dict(record.get("doc")) metric = _as_dict(record.get("metric")) model_response = _as_dict(record.get("model_response")) choices = _as_list(doc.get("choices")) choices = [_py_scalar(c) for c in choices] gold_indices = _normalize_indices(doc.get("gold_index")) gold_values: List[Any] = [] for idx in gold_indices: if 0 <= idx < len(choices): gold_values.append(choices[idx]) gold_answer = _format_answer(gold_values) metric_name, metric_value = _pick_metric(metric, benchmark_base, preferred_metrics) model_response_dict = model_response if isinstance(model_response, dict) else {} predicted_answer = _extract_predicted_answer(model_response_dict, choices) output_text = _first_non_empty(model_response_dict.get("text_post_processed")) if output_text is None: output_text = _first_non_empty(model_response_dict.get("text")) if output_text is None and predicted_answer is not None: output_text = str(predicted_answer) is_correct = None if metric_value is not None and _is_binary_metric_name(metric_name) and metric_value in (0.0, 1.0): is_correct = bool(metric_value) else: binary_score = _to_float_scalar(metric.get("normalized_score_norm")) if binary_score is not None and binary_score in (0.0, 1.0): is_correct = bool(binary_score) # For multi-gold classification (e.g. Mizan), accept prediction if it matches any gold option. pred_norm = _norm_answer(predicted_answer) choice_norms = {_norm_answer(c) for c in choices if _norm_answer(c)} gold_norms = {_norm_answer(g) for g in gold_values if _norm_answer(g)} if _is_choice_metric_name(metric_name) and pred_norm and pred_norm in choice_norms and gold_norms: is_correct = pred_norm in gold_norms predicted_answer = _py_scalar(predicted_answer) if isinstance(predicted_answer, list): predicted_answer = _format_answer(predicted_answer) prompt = ( doc.get("query") or doc.get("original_query") or doc.get("instruction") or model_response_dict.get("input") or "" ) return _json_safe({ "subtask": subtask, "question_id": _py_scalar(doc.get("id")), "task_name": _py_scalar(doc.get("task_name")), "prompt": prompt, "input_prompt": model_response_dict.get("input"), "output": output_text, "choices": [str(c) for c in choices], "gold_answer": _py_scalar(gold_answer), "predicted_answer": _py_scalar(predicted_answer), "is_correct": is_correct, "metric_name": metric_name, "metric": metric_value, }) def _read_detail_parquet( path: str, subtask: str, benchmark_base: str, preferred_metrics: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: try: df = pd.read_parquet(path) except Exception as e: logger.warning("Could not read details parquet '%s': %s", path, e) return [] records = df.to_dict(orient="records") if not records: return [] sample = records[0] if isinstance(records[0], dict) else {} has_structured_fields = isinstance(sample, dict) and any( key in sample for key in ("doc", "metric", "model_response") ) if has_structured_fields: return [ _structured_record_to_row(record, subtask, benchmark_base, preferred_metrics) for record in records if isinstance(record, dict) ] # Simple row format (e.g. fannorflop parquet). rows: List[Dict[str, Any]] = [] for rec in records: if not isinstance(rec, dict): continue metric_key = next( ( k for k in ("BertScore", "bert_score", "f1", "score", "metric") if k in rec and _to_float_scalar(rec.get(k)) is not None ), None, ) metric_value = rec.get(metric_key) if metric_key else None output = ( rec.get("extracted_response") or rec.get("response") or rec.get("extracted_json") or rec.get("raw_response") ) predicted = rec.get("predicted_answer") or output gold_raw = rec.get("gold_answer") gold_display = gold_raw if gold_raw not in (None, "") else ( rec.get("gold_verse_explanations") if rec.get("gold_verse_explanations") not in (None, "") else rec.get("verse_explanations") ) is_correct = None # Only enable binary correct/wrong mode for explicit gold_answer labels. binary_mode = _is_primitive_answer(gold_raw) and _is_primitive_answer(predicted) if binary_mode: gold_norm = _norm_answer(gold_raw) pred_norm = _norm_answer(predicted) if gold_norm and pred_norm: is_correct = (gold_norm == pred_norm) metric_key = "fannorflop" metric_value = 1.0 if is_correct else 0.0 rows.append(_make_simple_row( subtask=subtask, question_id=rec.get("id") or rec.get("question_id"), task_name=benchmark_base, prompt=rec.get("prompt"), output=output, gold_answer=gold_display, predicted_answer=predicted, metric_name=metric_key, metric_value=metric_value, is_correct=is_correct, )) return rows def _load_json_payload(path: str) -> Any: p = Path(path) if p.suffix.lower() == ".jsonl": text = p.read_text(encoding="utf-8", errors="ignore").strip() if not text: return [] try: return json.loads(text) except Exception: rows: List[Any] = [] for line in text.splitlines(): line = line.strip() if not line: continue with contextlib.suppress(Exception): rows.append(json.loads(line)) return rows with open(p, "r", encoding="utf-8") as f: return json.load(f) def _make_simple_row( *, subtask: str, question_id: Any, task_name: Any, prompt: Any, output: Any, gold_answer: Any, predicted_answer: Any, metric_name: Any, metric_value: Any, is_correct: Any, summary_accuracy_override: Any = None, ) -> Dict[str, Any]: row = { "subtask": subtask, "question_id": _py_scalar(question_id), "task_name": _py_scalar(task_name), "prompt": _decode_structured_string(prompt or ""), "input_prompt": None, "output": _decode_structured_string(output), "choices": [], "gold_answer": _decode_structured_string(gold_answer), "predicted_answer": _decode_structured_string(predicted_answer), "is_correct": is_correct, "metric_name": metric_name, "metric": _to_float_scalar(metric_value), } if summary_accuracy_override is not None: row["_summary_accuracy_override"] = _to_float_scalar(summary_accuracy_override) return _json_safe(row) def _read_detail_fannorflop_rows(records: List[Any], subtask: str, benchmark_base: str) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] for rec in records: if not isinstance(rec, dict): continue metric_key = None for k in ("BertScore", "bert_score", "score", "f1"): if k in rec: metric_key = k break metric_value = rec.get(metric_key) if metric_key else None output = rec.get("extracted_response") or rec.get("response") predicted = rec.get("predicted_answer") or output gold = rec.get("gold_answer") if gold in (None, ""): gold = rec.get("gold_verse_explanations") if gold in (None, ""): gold = rec.get("verse_explanations") is_correct = None # Only enable binary mode when explicit gold_answer exists. binary_mode = _is_primitive_answer(gold) and _is_primitive_answer(predicted) if binary_mode and gold not in (None, "") and predicted not in (None, ""): is_correct = (_norm_answer(gold) == _norm_answer(predicted)) metric_key = "fannorflop" metric_value = 1.0 if is_correct else 0.0 rows.append(_make_simple_row( subtask=subtask, question_id=rec.get("id"), task_name=benchmark_base, prompt=rec.get("prompt"), output=output, gold_answer=gold, predicted_answer=predicted, metric_name=metric_key, metric_value=metric_value, is_correct=is_correct, )) return rows def _read_detail_code_eval_json(data: Dict[str, Any], subtask: str, benchmark_base: str) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] eval_map = data.get("eval") if not isinstance(eval_map, dict): return rows summary_override = None pass_at_k = data.get("pass_at_k") if isinstance(pass_at_k, dict): plus = pass_at_k.get("plus") if isinstance(plus, dict): pass_at_1 = _to_float_scalar(plus.get("pass@1")) if pass_at_1 is not None: summary_override = pass_at_1 * 100.0 for task_id, entries in eval_map.items(): for rec in _as_list(entries): if not isinstance(rec, dict): continue plus_status = str(rec.get("plus_status", "")).strip().lower() plus_status_text = plus_status if plus_status in {"pass", "fail"} else "" is_correct = None metric_value = None if plus_status in {"pass", "fail"}: is_correct = (plus_status == "pass") metric_value = 1.0 if is_correct else 0.0 output = rec.get("solution") or rec.get("completion") rows.append(_make_simple_row( subtask=subtask, question_id=rec.get("task_id") or task_id, task_name=task_id, prompt="", output=output, gold_answer=rec.get("gold_answer") or "", predicted_answer=plus_status_text or rec.get("predicted_answer") or "", metric_name="eval_plus", metric_value=metric_value, is_correct=is_correct, summary_accuracy_override=summary_override, )) return rows def _read_detail_json_any( path: str, subtask: str, benchmark_base: str, preferred_metrics: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: try: data = _load_json_payload(path) except Exception as e: logger.warning("Could not read details json/jsonl '%s': %s", path, e) return [] base_norm = _canonical_base_key(benchmark_base) if base_norm == "fannorflop": if isinstance(data, list): return _read_detail_fannorflop_rows(data, subtask, benchmark_base) if isinstance(data, dict) and isinstance(data.get("rows"), list): return _read_detail_fannorflop_rows(data["rows"], subtask, benchmark_base) if isinstance(data, dict) and isinstance(data.get("eval"), dict): return _read_detail_code_eval_json(data, subtask, benchmark_base) if isinstance(data, list): rows: List[Dict[str, Any]] = [] for rec in data: if not isinstance(rec, dict): continue if any(k in rec for k in ("doc", "metric", "model_response")): rows.append(_structured_record_to_row(rec, subtask, benchmark_base, preferred_metrics)) if rows: return rows if data and isinstance(data[0], dict): return _read_detail_fannorflop_rows(data, subtask, benchmark_base) return [] def _read_detail_file( path: str, subtask: str, benchmark_base: str, preferred_metrics: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: ext = Path(path).suffix.lower() if ext == ".parquet": return _read_detail_parquet(path, subtask, benchmark_base, preferred_metrics) if ext in {".json", ".jsonl"}: return _read_detail_json_any(path, subtask, benchmark_base, preferred_metrics) return [] def load_benchmark_details( model_name: str, benchmark_display: str, details_index: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]], max_rows: int = 250, ) -> Dict[str, Any]: """ Load per-question benchmark details for a model from indexed parquet files. """ model_bucket = details_index.get(model_name, {}) if not model_bucket: target_model = model_name.strip().lower() for indexed_model, bucket in details_index.items(): if indexed_model.strip().lower() == target_model: model_bucket = bucket break benchmark_bases = BENCHMARK_DISPLAY_TO_BASES.get(benchmark_display, []) if not benchmark_bases: benchmark_bases = [benchmark_display] selected_entries: List[tuple[str, str, Dict[str, Any], List[str]]] = [] for base in benchmark_bases: subtasks = model_bucket.get(base, {}) selected_base = base if not subtasks: base_l = _canonical_base_key(base) for indexed_base, bucket in model_bucket.items(): if _canonical_base_key(indexed_base) == base_l: selected_base = indexed_base subtasks = bucket break display_metric_bucket = BENCHMARK_DISPLAY_TO_BASE_METRICS.get(benchmark_display, {}) preferred_metrics = display_metric_bucket.get(selected_base) if preferred_metrics is None: # Key-normalized fallback. for k, v in display_metric_bucket.items(): if _canonical_base_key(k) == _canonical_base_key(selected_base): preferred_metrics = v break preferred_metrics = preferred_metrics or BENCHMARK_BASE_TO_METRICS.get(selected_base, []) if not preferred_metrics: canon_base = _canonical_base_key(selected_base) for k, v in BENCHMARK_BASE_TO_METRICS.items(): if _canonical_base_key(k) == canon_base: preferred_metrics = v break for subtask, info in subtasks.items(): selected_entries.append((selected_base, subtask, info, preferred_metrics)) if not selected_entries: return {"benchmark": benchmark_display, "subtasks": [], "rows": []} selected_entries.sort(key=lambda x: x[1].lower()) rows_by_subtask: List[List[Dict[str, Any]]] = [] subtasks_summary: List[Dict[str, Any]] = [] for base, subtask, info, preferred_metrics in selected_entries: display_subtask = benchmark_display if subtask == "overall" else subtask rows = _read_detail_file(info["path"], display_subtask, base, preferred_metrics) rows_by_subtask.append(rows) scored_rows = [r for r in rows if r.get("metric") is not None] metric_name = next((str(r.get("metric_name")) for r in scored_rows if r.get("metric_name")), None) use_metric_mode = metric_name is not None and not _is_binary_metric_name(metric_name) summary_override = next( (_to_float_scalar(r.get("_summary_accuracy_override")) for r in rows if r.get("_summary_accuracy_override") is not None), None, ) if use_metric_mode: correct = None scored = len(scored_rows) avg_metric = (sum(float(r["metric"]) for r in scored_rows) / scored) if scored > 0 else None accuracy = round(avg_metric * 100, 2) if avg_metric is not None else None summary_mode = "metric" else: binary_rows = [r for r in rows if isinstance(r.get("is_correct"), bool)] correct = sum(1 for r in binary_rows if r["is_correct"]) scored = len(binary_rows) accuracy = round((correct / scored) * 100, 2) if scored > 0 else None if summary_override is not None: accuracy = round(summary_override, 2) if scored > 0: correct = int(round((accuracy / 100.0) * scored)) summary_mode = "binary" # FannOrFlop details parquet may have per-row BertScore=0 while official score lives in results f1. if _canonical_base_key(base) == "fannorflop": outside_score = _latest_model_benchmark_score_pct(model_name, benchmark_display) if outside_score is not None: accuracy = round(outside_score, 2) summary_mode = "metric" correct = None subtasks_summary.append({ "subtask": display_subtask, "total": len(rows), "scored": scored, "correct": correct, "accuracy": accuracy, "mode": summary_mode, }) total_rows = sum(len(rows) for rows in rows_by_subtask) if max_rows > 0 and total_rows > max_rows: queues = [deque(rows) for rows in rows_by_subtask] all_rows: List[Dict[str, Any]] = [] while len(all_rows) < max_rows: progressed = False for q in queues: if not q: continue all_rows.append(q.popleft()) progressed = True if len(all_rows) >= max_rows: break if not progressed: break else: all_rows = [row for rows in rows_by_subtask for row in rows] for row in all_rows: if isinstance(row, dict): row.pop("_summary_accuracy_override", None) return { "benchmark": benchmark_display, "subtasks": subtasks_summary, "rows": all_rows, } # Manual size overrides (in billions) for models where HF API returns no safetensors metadata. _MODEL_SIZE_OVERRIDES: Dict[str, float] = { "Qwen/Qwen2.5-14B-Instruct": 14.0, "Qwen/Qwen2.5-32B-Instruct": 32.0, "Qwen/Qwen3-30B-A3B-Instruct-2507": 30.0, "Qwen/Qwen3-235B-A22B-Instruct-2507": 235.0, "google/gemma-3-270m-it": 0.27, "google/gemma-3-1b-it": 1.0, "google/gemma-3-1b-pt": 1.0, "google/gemma-3-4b-it": 4.0, "google/gemma-3-12b-it": 12.0, "google/gemma-3-27b-pt": 27.0, "microsoft/Phi-4-mini-instruct": 3.8, } def _fetch_hf_metadata(model_name: str) -> Dict[str, Any]: try: info = API.model_info(repo_id=model_name, token=hf_api_token) except Exception as e: logger.warning("Could not fetch HF metadata for '%s': %s", model_name, e) return {} card_data = getattr(info, "card_data", None) if isinstance(card_data, dict): license_name = card_data.get("license") else: license_name = getattr(card_data, "license", None) model_size = get_model_size(model_info=info) if model_size == 0: safetensors = getattr(info, "safetensors", None) if not safetensors or not safetensors.get("total"): model_size = _MODEL_SIZE_OVERRIDES.get(model_name) return { "License": license_name, "Revision": getattr(info, "sha", None), "Model Size": model_size, "Hub ❤️": getattr(info, "likes", None), } def load_scoreboard() -> pd.DataFrame: """ Main entrypoint used by the Space UI. """ download_datasets() result_base = os.getenv("EVAL_RESULTS_PATH") if not result_base: return pd.DataFrame() rows = [] for p in Path(result_base).rglob("*"): if not p.is_file() or p.suffix.lower() not in {".json", ".jsonl"}: continue row = _parse_result_file(p) if row: rows.append(row) if not rows: return pd.DataFrame() df = pd.DataFrame(rows) df["datetime"] = pd.to_datetime(df["datetime"]) # Keep latest file per (model, source), then merge source metrics per model. df = df.sort_values("datetime", ascending=False) df = df.drop_duplicates(subset=["Model Name", "Source Type"], keep="first") task_cols = [t[2] for t in TASKS] hidden_cols = [t[2] for t in HIDDEN_TASKS] all_score_cols = task_cols + hidden_cols for col in all_score_cols: if col not in df.columns: df[col] = np.nan def first_non_null(values): for v in values: if pd.notna(v): return v return np.nan def first_valid_precision(values): for v in values: if isinstance(v, str) and v.strip() and v not in {"Missing", "UNK"}: return v for v in values: if pd.notna(v): return v return "UNK" agg_map = { "datetime": "max", "Precision": first_valid_precision, } agg_map.update({col: first_non_null for col in all_score_cols}) df = df.groupby("Model Name", as_index=False).agg(agg_map) # numeric — hidden_cols converted but excluded from Average for col in all_score_cols: df[col] = (pd.to_numeric(df[col], errors="coerce") * 100).round(2) df["Average"] = df[task_cols].mean(axis=1).round(2) # metadata from Hugging Face API (fetched in parallel for speed) model_names = df["Model Name"].dropna().unique().tolist() hf_meta: Dict[str, Dict[str, Any]] = {} if model_names: max_workers = min(12, len(model_names)) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_model = { executor.submit(_fetch_hf_metadata, model_name): model_name for model_name in model_names } for future in as_completed(future_to_model): model_name = future_to_model[future] hf_meta[model_name] = future.result() or {} df["License"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("License")) df["Revision"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Revision")) df["Model Size"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Model Size")) df["Hub ❤️"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Hub ❤️")) df["Type"] = None df["Full Type"] = None # Merge metadata from requests repo (all statuses), not just finished. req_meta = load_requests(None) if not req_meta.empty: if "model" not in req_meta.columns and "model_name" in req_meta.columns: req_meta["model"] = req_meta["model_name"] if "model" not in req_meta.columns: req_meta = pd.DataFrame() if not req_meta.empty: if "precision" in req_meta.columns: req_meta["precision"] = req_meta["precision"].apply(unify_precision) else: req_meta["precision"] = None has_precision_values = req_meta["precision"].apply( lambda v: isinstance(v, str) and v.strip() and v not in {"Missing", "UNK"} ).any() meta = ( req_meta.groupby(["model", "precision"]).last().reset_index() if has_precision_values else pd.DataFrame() ) meta_by_model = req_meta.groupby(["model"]).last().reset_index() def is_missing(v: Any) -> bool: return v is None or (isinstance(v, str) and not v.strip()) or pd.isna(v) def enrich(row): m = pd.DataFrame() if has_precision_values and not meta.empty: m = meta[ (meta["model"] == row["Model Name"]) & (meta["precision"] == row["Precision"]) ] if m.empty: m = meta_by_model[meta_by_model["model"] == row["Model Name"]] if not m.empty: m = m.iloc[0] if is_missing(row.get("License")): row["License"] = m.get("license") if is_missing(row.get("Revision")): row["Revision"] = m.get("revision") model_type_raw = m.get("model_type", "Missing") row["Type"] = MODEL_TYPE_TO_EMOJI.get( model_type_raw, model_type_raw ) row["Full Type"] = model_type_raw return row df = df.apply(enrich, axis=1) df = df.sort_values("Average", ascending=False).reset_index(drop=True) df.insert(0, "Rank", range(1, len(df) + 1)) return df download_dataset_snapshots = download_datasets