Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| # backend/data_loader.py | |
| import json | |
| import os | |
| import contextlib | |
| import io | |
| import logging | |
| import re | |
| import ast | |
| from collections import deque | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional | |
| from urllib.parse import quote | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| from huggingface_hub import snapshot_download | |
| from datetime import datetime | |
| from huggingface_hub.constants import HF_HUB_CACHE | |
| from backend.config import ( | |
| API, | |
| DETAILS_REPO_ID, | |
| REQUESTS_REPO_ID, | |
| RESULTS_REPO_ID, | |
| TASKS, | |
| TASK_SOURCES, | |
| HIDDEN_TASKS, | |
| MODEL_TYPE_TO_EMOJI, | |
| hf_api_token, | |
| ) | |
| from backend.helpers import unify_precision, get_model_size | |
| logger = logging.getLogger(__name__) | |
# Lower-cased filename prefix -> name of the source that declares it.
_SOURCE_BY_PREFIX = dict(
    (prefix.lower(), source)
    for source, cfg in TASK_SOURCES.items()
    for prefix in cfg.get("prefixes", [])
)
# Source name -> task definitions configured for that source.
_TASKS_BY_SOURCE = dict(
    (source, cfg.get("tasks", []))
    for source, cfg in TASK_SOURCES.items()
)
# Hidden tasks are appended to a *copy* of the "results" task list so that
# _parse_result_file extracts them too, while the list stored in the shared
# TASK_SOURCES dict is left unmodified.
_TASKS_BY_SOURCE["results"] = list(_TASKS_BY_SOURCE.get("results", [])) + HIDDEN_TASKS
# Memo for _latest_model_benchmark_score_pct, keyed by (model name, display name).
_RESULT_SCORE_CACHE: Dict[tuple[str, str], Optional[float]] = dict()
| def _extract_task_bases(task_key: Any) -> List[str]: | |
| if isinstance(task_key, list): | |
| bases: List[str] = [] | |
| for item in task_key: | |
| bases.extend(_extract_task_bases(item)) | |
| return bases | |
| if not isinstance(task_key, str): | |
| return [] | |
| key = task_key.strip() | |
| if not key: | |
| return [] | |
| return [key.split(":", 1)[0].split("|", 1)[0].strip()] | |
# Display name -> ordered, de-duplicated list of benchmark base names, built
# at import time from the task keys declared in TASKS.
BENCHMARK_DISPLAY_TO_BASES: Dict[str, List[str]] = {}
for task_key, _, display in TASKS:
    bases = BENCHMARK_DISPLAY_TO_BASES.setdefault(display, [])
    for base in _extract_task_bases(task_key):
        # Skip empty bases and bases already recorded for this display name.
        if base and base not in bases:
            bases.append(base)
| def _extract_base_metric_pairs(task_key: Any, metric_key: Any) -> List[tuple[str, str]]: | |
| pairs: List[tuple[str, str]] = [] | |
| if isinstance(task_key, list): | |
| if isinstance(metric_key, list): | |
| for tk, mk in zip(task_key, metric_key): | |
| if isinstance(mk, tuple): | |
| mk = mk[0] | |
| pairs.extend(_extract_base_metric_pairs(tk, mk)) | |
| return pairs | |
| if not isinstance(task_key, str) or not isinstance(metric_key, str): | |
| return pairs | |
| base = task_key.split(":", 1)[0].split("|", 1)[0].strip() | |
| if base: | |
| pairs.append((base, metric_key)) | |
| return pairs | |
# Benchmark base name -> ordered, de-duplicated metric names seen for it in TASKS.
BENCHMARK_BASE_TO_METRICS: Dict[str, List[str]] = {}
# Display name -> {benchmark base name -> ordered, de-duplicated metric names}.
BENCHMARK_DISPLAY_TO_BASE_METRICS: Dict[str, Dict[str, List[str]]] = {}
for task_key, metric_key, display in TASKS:
    display_bucket = BENCHMARK_DISPLAY_TO_BASE_METRICS.setdefault(display, {})
    for base, metric_name in _extract_base_metric_pairs(task_key, metric_key):
        # Global per-base metric list.
        base_bucket = BENCHMARK_BASE_TO_METRICS.setdefault(base, [])
        if metric_name and metric_name not in base_bucket:
            base_bucket.append(metric_name)
        # Per-display, per-base metric list.
        display_metric_bucket = display_bucket.setdefault(base, [])
        if metric_name and metric_name not in display_metric_bucket:
            display_metric_bucket.append(metric_name)
# File suffixes (lower-case, with dot) treated as detail files.
DETAILS_EXTENSIONS = {".parquet", ".json", ".jsonl"}
| def _norm_key(value: Any) -> str: | |
| return re.sub(r"[^a-z0-9]+", "", str(value or "").strip().lower()) | |
def _canonical_base_key(value: Any) -> str:
    """Return the normalised key for ``value`` with a leading ``qimma`` removed."""
    normalized = _norm_key(value)
    marker = "qimma"
    return normalized[len(marker):] if normalized.startswith(marker) else normalized
| # ----------------------------------------------------------------------------- | |
| # Utilities | |
| # ----------------------------------------------------------------------------- | |
def silent_snapshot_download(**kwargs):
    """Call ``snapshot_download(**kwargs)`` with stdout and stderr discarded.

    Both streams are redirected into throw-away in-memory buffers for the
    duration of the call; the download's return value is passed through.
    """
    with contextlib.ExitStack() as stack:
        stack.enter_context(contextlib.redirect_stdout(io.StringIO()))
        stack.enter_context(contextlib.redirect_stderr(io.StringIO()))
        return snapshot_download(**kwargs)
def _resolve_details_base_path() -> Path:
    """Locate (or create) the local directory holding the details dataset.

    Returns the most recently modified directory under the repo's
    ``snapshots`` folder in the hub cache when one exists; otherwise creates
    and returns a ``manual-snapshot`` directory next to it.
    """
    cache_dir_name = "datasets--" + DETAILS_REPO_ID.replace("/", "--")
    repo_cache_root = Path(HF_HUB_CACHE) / cache_dir_name
    snapshots_root = repo_cache_root / "snapshots"

    snapshot_dirs = []
    if snapshots_root.exists():
        snapshot_dirs = [entry for entry in snapshots_root.iterdir() if entry.is_dir()]
    if snapshot_dirs:
        return max(snapshot_dirs, key=lambda entry: entry.stat().st_mtime)

    fallback_root = repo_cache_root / "manual-snapshot"
    fallback_root.mkdir(parents=True, exist_ok=True)
    return fallback_root
def _download_details_file(relative_path: str, base_path: Path, retries: int = 3) -> bool:
    """Download one file of the details dataset into ``base_path``.

    The file is streamed to a ``.part`` sibling first and then moved over the
    final name, so a failed transfer does not leave a truncated target file.

    Args:
        relative_path: Path of the file inside the dataset repo.
        base_path: Local root under which ``relative_path`` is recreated.
        retries: Maximum number of attempts.

    Returns:
        ``True`` once an attempt succeeds, ``False`` when every attempt failed
        (each failure is logged as a warning).
    """
    # Percent-encode the path but keep "/" separators intact.
    encoded_rel_path = quote(relative_path, safe="/")
    url = f"https://huggingface.co/datasets/{DETAILS_REPO_ID}/resolve/main/{encoded_rel_path}"
    headers = {}
    if hf_api_token:
        headers["Authorization"] = f"Bearer {hf_api_token}"
    target_path = base_path / relative_path
    target_path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = target_path.with_suffix(target_path.suffix + ".part")
    for attempt in range(1, retries + 1):
        try:
            # timeout=(10, 90) is passed as (connect, read) seconds.
            with requests.get(url, stream=True, timeout=(10, 90), headers=headers) as resp:
                resp.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
            if partial_path.exists():
                os.replace(partial_path, target_path)
            elif target_path.exists():
                # The temporary file is gone but the target is present; treat as done.
                return True
            else:
                raise FileNotFoundError(f"Temporary download file missing: {partial_path}")
            return True
        except Exception as e:
            # Best-effort cleanup of the partial file before the next attempt.
            with contextlib.suppress(Exception):
                partial_path.unlink(missing_ok=True)
            logger.warning(
                "Retry %s/%s for details file '%s' failed: %s",
                attempt,
                retries,
                relative_path,
                e,
            )
    return False
def _sync_details_dataset(base_path: Path):
    """Download every remote ``details_*`` file that is missing under ``base_path``.

    Remote candidates are repo files whose suffix is in ``DETAILS_EXTENSIONS``
    and whose file name starts with ``details_``. Progress and the final
    outcome are logged; nothing is returned. If the remote listing fails the
    function logs a warning and gives up.
    """
    try:
        remote_files = []
        for remote_name in API.list_repo_files(repo_id=DETAILS_REPO_ID, repo_type="dataset"):
            remote_path = Path(remote_name)
            if (
                remote_path.suffix.lower() in DETAILS_EXTENSIONS
                and remote_path.name.startswith("details_")
            ):
                remote_files.append(remote_name)
    except Exception as e:
        logger.warning("Could not list files for details repo '%s': %s", DETAILS_REPO_ID, e)
        return

    # Local files are compared using forward-slash relative paths, like the repo listing.
    local_files = set()
    for entry in base_path.rglob("*"):
        if entry.is_file() and entry.suffix.lower() in DETAILS_EXTENSIONS:
            local_files.add(str(entry.relative_to(base_path)).replace(os.sep, "/"))

    total_count = len(remote_files)
    local_count = len(local_files & set(remote_files))
    missing_files = [name for name in remote_files if name not in local_files]
    total_missing = len(missing_files)

    if total_missing == 0:
        logger.info("Details files ready: %s/%s", local_count, total_count)
        return

    logger.info(
        "Details files ready: %s/%s. Downloading %s missing files...",
        local_count,
        total_count,
        total_missing,
    )

    failed_files: List[str] = []
    for position, rel_path in enumerate(missing_files, start=1):
        logger.info("Downloading missing details file %s/%s: %s", position, total_missing, rel_path)
        downloaded = _download_details_file(rel_path, base_path)
        if not downloaded:
            failed_files.append(rel_path)

    if not failed_files:
        logger.info("Details sync complete: downloaded %s/%s missing files.", total_missing, total_missing)
        return

    logger.warning(
        "Details sync incomplete. Downloaded %s/%s missing files. Still missing %s files.",
        total_missing - len(failed_files),
        total_missing,
        len(failed_files),
    )
    for rel_path in failed_files:
        logger.warning("Still missing: %s", rel_path)
def download_datasets():
    """
    Download the requests, results and details dataset snapshots.

    Each snapshot's local path is published through an environment variable
    (EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, EVAL_DETAILS_PATH), in that order.
    No token is passed explicitly to the download call.
    """
    snapshot_specs = (
        ("EVAL_REQUESTS_PATH", REQUESTS_REPO_ID, "*.json"),
        ("EVAL_RESULTS_PATH", RESULTS_REPO_ID, ["*.json", "*.jsonl"]),
        ("EVAL_DETAILS_PATH", DETAILS_REPO_ID, ["*.parquet", "*.json", "*.jsonl"]),
    )
    for env_name, repo_id, patterns in snapshot_specs:
        os.environ[env_name] = silent_snapshot_download(
            repo_id=repo_id,
            repo_type="dataset",
            allow_patterns=patterns,
        )
| # ----------------------------------------------------------------------------- | |
| # Requests | |
| # ----------------------------------------------------------------------------- | |
def load_requests(status: Optional[str] = None) -> pd.DataFrame:
    """Load evaluation request files into a DataFrame.

    Every ``*.json`` file below the directory named by the
    ``EVAL_REQUESTS_PATH`` environment variable is read. Files that cannot be
    read or parsed, and files whose top-level JSON value is not an object, are
    skipped.

    Args:
        status: When given, keep only requests whose ``"status"`` field equals
            it case-insensitively. A missing or null status never matches.

    Returns:
        One row per kept request; an empty DataFrame when the environment
        variable is unset or nothing matched.
    """
    base = os.getenv("EVAL_REQUESTS_PATH")
    if not base:
        return pd.DataFrame()
    wanted = status.lower() if status is not None else None
    rows = []
    for p in Path(base).rglob("*.json"):
        try:
            with open(p, "r", encoding="utf-8") as f:
                d = json.load(f)
        except Exception:
            # Best-effort loader: an unreadable request file must not abort the scan.
            continue
        if not isinstance(d, dict):
            # A list/scalar payload has no "status" and cannot form a row.
            continue
        # `or ""` covers an explicit JSON null, which .get()'s default does not.
        if wanted is None or str(d.get("status") or "").lower() == wanted:
            rows.append(d)
    return pd.DataFrame(rows)
| # ----------------------------------------------------------------------------- | |
| # Results parsing | |
| # ----------------------------------------------------------------------------- | |
def _infer_source_from_filename(path: Path) -> Optional[str]:
    """Guess the source type of a result file from its name.

    A name accepted by ``_parse_result_filename`` uses the source it reports.
    Otherwise the text before the first underscore of the stem is looked up
    (lower-cased) in ``_SOURCE_BY_PREFIX``; a stem without an underscore, or
    an unknown prefix, gives ``None``.
    """
    parsed = _parse_result_filename(path)
    if parsed:
        return parsed.get("source")
    head, separator, _ = path.stem.partition("_")
    if not separator:
        return None
    return _SOURCE_BY_PREFIX.get(head.lower())
| def _parse_result_filename(path: Path) -> Optional[Dict[str, Any]]: | |
| stem = path.stem | |
| if not stem.startswith("results_"): | |
| return None | |
| try: | |
| _, dt_str = stem.rsplit("_", 1) | |
| parsed_dt = datetime.strptime(dt_str, "%Y-%m-%dT%H-%M-%S.%f") | |
| except Exception: | |
| return None | |
| name_part = stem[len("results_"):].rsplit("_", 1)[0].strip() | |
| if not name_part: | |
| return {"source": "results", "datetime": parsed_dt, "name_part": ""} | |
| base_hint = name_part.split("|", 1)[0].strip() | |
| canon = _canonical_base_key(base_hint) | |
| if canon in {"evalplus", "humaneval", "mbpp"}: | |
| source = "code" | |
| elif canon in {"fannorflop", "fannflop"}: | |
| source = "fannflop" | |
| else: | |
| source = "results" | |
| # Ignore redundant single-benchmark mbpp result shards. | |
| if canon == "mbpp": | |
| return {"source": "ignore", "datetime": parsed_dt, "name_part": name_part} | |
| return {"source": source, "datetime": parsed_dt, "name_part": name_part} | |
| def _load_json_payload_any(path: Path) -> Any: | |
| if path.suffix.lower() != ".jsonl": | |
| with open(path, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| text = path.read_text(encoding="utf-8", errors="ignore").strip() | |
| if not text: | |
| return {} | |
| with contextlib.suppress(Exception): | |
| return json.loads(text) | |
| rows: List[Any] = [] | |
| for line in text.splitlines(): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| with contextlib.suppress(Exception): | |
| rows.append(json.loads(line)) | |
| return rows | |
def _parse_result_file(path: Path) -> Optional[Dict[str, Any]]:
    """Turn one result file into a flat leaderboard row.

    The file name is validated first (via ``_parse_result_filename``) so that
    files which would be rejected anyway are never read from disk.

    Returns:
        ``None`` when the name is not a parsable ``results_*`` name, the name
        is marked ``"ignore"``, the file cannot be loaded, or the payload has
        no usable dict with a dict-valued ``"results"`` entry. Otherwise a
        dict with ``"Model Name"``, ``"Precision"``, ``"datetime"``,
        ``"Source Type"`` and one entry per task configured for the source
        (``np.nan`` for tasks the file does not report).
    """
    parsed_name = _parse_result_filename(path)
    if not parsed_name:
        return None
    source_type = parsed_name["source"]
    parsed_dt = parsed_name["datetime"]
    if source_type in {None, "ignore"} or parsed_dt is None:
        return None
    try:
        raw = _load_json_payload_any(path)
    except Exception:
        return None
    data = raw
    if isinstance(raw, list):
        # JSONL payload: prefer the first record that looks like a result
        # document, else fall back to the first record if it is a dict.
        data = next(
            (x for x in raw if isinstance(x, dict) and ("results" in x or "model_name" in x)),
            None,
        )
        if data is None and raw and isinstance(raw[0], dict):
            data = raw[0]
    if not isinstance(data, dict):
        return None
    cfg = data.get("config_general")
    if not isinstance(cfg, dict):
        # Covers both a missing key and an explicit null / non-object value.
        cfg = {}
    results = data.get("results", {})
    if not isinstance(results, dict):
        return None
    model = cfg.get("model_name") or data.get("model_name", "UNK")
    precision = unify_precision(cfg.get("model_dtype", "UNK"))
    row = {
        "Model Name": model,
        "Precision": precision,
        "datetime": parsed_dt,
        "Source Type": source_type,
    }
    for task_key, metric_key, display in _TASKS_BY_SOURCE.get(source_type, []):
        if isinstance(task_key, list):
            # Weighted average over sub-tasks; metric_key holds (metric, weight) pairs.
            weight_total = 0
            metric_total = 0
            for t, (m, w) in zip(task_key, metric_key):
                entry = results.get(t)
                val = entry.get(m, 0) if isinstance(entry, dict) else 0
                if not isinstance(val, (int, float)):
                    # Null / non-numeric values count as 0, like a missing metric.
                    val = 0
                metric_total += (val * w)
                weight_total += w
            val = metric_total / weight_total if weight_total > 0 else np.nan
        else:
            val = np.nan
            entry = results.get(task_key)
            if isinstance(entry, dict) and metric_key in entry:
                val = entry.get(metric_key)
                if val is None:
                    logger.warning(
                        "Missing metric value for task '%s' in model '%s'",
                        task_key,
                        model,
                    )
        row[display] = val
    return row
def _latest_model_benchmark_score_pct(model_name: str, benchmark_display: str) -> Optional[float]:
    """Return the newest recorded score for a model/benchmark, times 100.

    All ``.json`` / ``.jsonl`` files below ``EVAL_RESULTS_PATH`` are parsed
    with ``_parse_result_file``; among rows for ``model_name`` that actually
    carry a numeric value for ``benchmark_display``, the one with the latest
    file-name datetime wins. The answer (including ``None``) is memoised in
    ``_RESULT_SCORE_CACHE``.

    Returns:
        The score multiplied by 100, or ``None`` when the environment variable
        is unset or no file reports the benchmark for the model.
    """
    cache_key = (model_name, benchmark_display)
    if cache_key in _RESULT_SCORE_CACHE:
        return _RESULT_SCORE_CACHE[cache_key]
    base = os.getenv("EVAL_RESULTS_PATH")
    if not base:
        _RESULT_SCORE_CACHE[cache_key] = None
        return None
    wanted_model = str(model_name).strip()
    latest_dt: Optional[datetime] = None
    latest_val: Optional[float] = None
    for p in Path(base).rglob("*"):
        if not p.is_file() or p.suffix.lower() not in {".json", ".jsonl"}:
            continue
        row = _parse_result_file(p)
        if not row:
            continue
        if str(row.get("Model Name", "")).strip() != wanted_model:
            continue
        raw_val = _to_float_scalar(row.get(benchmark_display))
        # _parse_result_file stores NaN for benchmarks a file does not report.
        # Such rows must not shadow an older file that has a real score.
        if raw_val is None or np.isnan(raw_val):
            continue
        row_dt = row.get("datetime")
        if not isinstance(row_dt, datetime):
            continue
        if latest_dt is None or row_dt > latest_dt:
            latest_dt = row_dt
            latest_val = raw_val * 100.0
    _RESULT_SCORE_CACHE[cache_key] = latest_val
    return latest_val
| def _parse_details_filename(path: Path) -> Optional[Dict[str, Any]]: | |
| stem = path.stem | |
| if "_" not in stem: | |
| return None | |
| details_part, dt_str = stem.rsplit("_", 1) | |
| if not details_part.startswith("details_"): | |
| return None | |
| try: | |
| parsed_dt = datetime.strptime(dt_str, "%Y-%m-%dT%H-%M-%S.%f") | |
| except Exception: | |
| return None | |
| task_full = details_part[len("details_"):].strip() | |
| if not task_full: | |
| return None | |
| benchmark_base = task_full.split(":", 1)[0].split("|", 1)[0].strip() | |
| if ":" in task_full: | |
| subtask = task_full.split(":", 1)[1].strip() | |
| else: | |
| subtask = "overall" | |
| subtask = re.sub(r"\|\d+$", "", subtask).strip() or "overall" | |
| return { | |
| "benchmark_base": benchmark_base, | |
| "subtask": subtask, | |
| "datetime": parsed_dt, | |
| "task_full": task_full, | |
| } | |
def build_details_index() -> Dict[str, Dict[str, Dict[str, Dict[str, Any]]]]:
    """
    Index the newest detail file for every model / benchmark / subtask.

    Scans the directory named by EVAL_DETAILS_PATH. The model name is the
    file's parent directory path relative to that root, joined with "/".
    Returns ``{model: {benchmark_base: {subtask: {"path", "datetime",
    "task_full"}}}}``, or ``{}`` when the variable is unset or the directory
    does not exist.
    """
    details_base = os.getenv("EVAL_DETAILS_PATH")
    base_path = Path(details_base) if details_base else None
    if base_path is None or not base_path.exists():
        return {}

    index: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]] = {}
    for candidate in base_path.rglob("*"):
        if not (candidate.is_file() and candidate.suffix.lower() in DETAILS_EXTENSIONS):
            continue
        parsed = _parse_details_filename(candidate)
        if not parsed:
            continue
        try:
            rel_parts = candidate.relative_to(base_path).parts
        except Exception:
            continue
        # Files sitting directly in the root have no model directory.
        if len(rel_parts) < 2:
            continue
        model_name = "/".join(rel_parts[:-1]).strip("/")
        if not model_name:
            continue

        subtask = parsed["subtask"]
        stamp = parsed["datetime"]
        bench_bucket = index.setdefault(model_name, {}).setdefault(parsed["benchmark_base"], {})
        previous = bench_bucket.get(subtask)
        if previous is not None and not (stamp > previous["datetime"]):
            continue
        bench_bucket[subtask] = {
            "path": str(candidate),
            "datetime": stamp,
            "task_full": parsed["task_full"],
        }
    return index
| def _as_list(value: Any) -> List[Any]: | |
| if value is None: | |
| return [] | |
| if isinstance(value, list): | |
| return value | |
| if isinstance(value, tuple): | |
| return list(value) | |
| if isinstance(value, np.ndarray): | |
| return value.tolist() | |
| return [value] | |
| def _as_dict(value: Any) -> Dict[str, Any]: | |
| if isinstance(value, dict): | |
| return value | |
| if isinstance(value, (bytes, bytearray)): | |
| try: | |
| value = value.decode("utf-8", errors="ignore") | |
| except Exception: | |
| return {} | |
| if isinstance(value, str): | |
| s = value.strip() | |
| if not s: | |
| return {} | |
| try: | |
| parsed = json.loads(s) | |
| return parsed if isinstance(parsed, dict) else {} | |
| except Exception: | |
| try: | |
| parsed = ast.literal_eval(s) | |
| return parsed if isinstance(parsed, dict) else {} | |
| except Exception: | |
| return {} | |
| if isinstance(value, list): | |
| # Some parquet backends can expose map-like structs as list of pairs. | |
| try: | |
| if all(isinstance(item, (list, tuple)) and len(item) == 2 for item in value): | |
| return {str(k): v for k, v in value} | |
| except Exception: | |
| return {} | |
| return {} | |
| def _py_scalar(value: Any) -> Any: | |
| if isinstance(value, np.ndarray): | |
| if value.ndim == 0: | |
| return _py_scalar(value.item()) | |
| if value.size == 1: | |
| return _py_scalar(value.reshape(-1)[0]) | |
| return [_py_scalar(v) for v in value.tolist()] | |
| if isinstance(value, np.generic): | |
| return value.item() | |
| return value | |
def _decode_structured_string(value: Any) -> Any:
    """Decode a string that holds a JSON / Python dict or list literal.

    Non-strings (after NumPy unwrapping), blank strings and strings that are
    not wrapped in ``{...}`` or ``[...]`` are returned as they are. Otherwise
    JSON parsing is tried, then literal evaluation; the first dict/list result
    is returned in JSON-safe form. When nothing decodes, the original string
    is returned.
    """
    value = _py_scalar(value)
    if not isinstance(value, str):
        return value
    text = value.strip()
    if not text:
        return value

    is_object = text.startswith("{") and text.endswith("}")
    is_array = text.startswith("[") and text.endswith("]")
    if not (is_object or is_array):
        return value

    for parser in (json.loads, ast.literal_eval):
        try:
            parsed = parser(text)
            if isinstance(parsed, (dict, list)):
                return _json_safe(parsed)
        except Exception:
            continue
    return value
def _json_safe(value: Any) -> Any:
    """Recursively convert ``value`` into plain dicts, lists and scalars.

    NumPy values are unwrapped, dict keys are converted to ``str`` and tuples
    are turned into lists.
    """
    plain = _py_scalar(value)
    if isinstance(plain, dict):
        return {str(key): _json_safe(item) for key, item in plain.items()}
    if isinstance(plain, (list, tuple)):
        return [_json_safe(item) for item in plain]
    return plain
def _to_float_scalar(value: Any) -> Optional[float]:
    """Return ``value`` as a ``float`` when it is numeric, else ``None``."""
    scalar = _py_scalar(value)
    numeric_types = (int, float, np.integer, np.floating)
    return float(scalar) if isinstance(scalar, numeric_types) else None
def _normalize_indices(value: Any) -> List[int]:
    """Collect the integer entries of ``value`` (scalar or sequence) as ``int``s.

    Non-integer entries are silently dropped.
    """
    scalars = (_py_scalar(item) for item in _as_list(value))
    return [int(scalar) for scalar in scalars if isinstance(scalar, (int, np.integer))]
def _format_answer(values: List[Any]) -> Any:
    """Render answer values as one string, comma-separated when several.

    Returns ``None`` for an empty input.
    """
    if not values:
        return None
    return ", ".join(str(_py_scalar(item)) for item in values)
def _norm_answer(value: Any) -> str:
    """Return the stripped string form of ``value`` (``""`` for ``None``)."""
    scalar = _py_scalar(value)
    return "" if scalar is None else str(scalar).strip()
def _is_primitive_answer(value: Any) -> bool:
    """Tell whether ``value`` is a non-``None`` string, number or bool."""
    scalar = _py_scalar(value)
    if scalar is None:
        return False
    return isinstance(scalar, (str, int, float, bool, np.integer, np.floating))
def _pick_metric(
    metric: Dict[str, Any],
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> tuple[Optional[str], Optional[float]]:
    """Choose the metric name/value to display for one detail record.

    Candidates are tried in this order, and the first one present in
    ``metric`` with a numeric value wins:

    1. ``preferred_metrics``, or the metrics configured for ``benchmark_base``
       (matched exactly, then by canonical key);
    2. a fixed list of known detail metric names;
    3. any entry of ``metric`` in its own order.

    Returns ``(None, None)`` when ``metric`` is empty, not a dict, or holds no
    numeric value.
    """
    if not isinstance(metric, dict) or not metric:
        return None, None

    preferred = preferred_metrics or BENCHMARK_BASE_TO_METRICS.get(benchmark_base, [])
    if not preferred:
        canon_base = _canonical_base_key(benchmark_base)
        preferred = next(
            (
                names
                for base_key, names in BENCHMARK_BASE_TO_METRICS.items()
                if _canonical_base_key(base_key) == canon_base
            ),
            preferred,
        )

    # Fallback for known detail formats.
    known_fallbacks = ["normalized_score_norm", "BERTScore-F", "acc", "accuracy"]
    for name in [*preferred, *known_fallbacks]:
        if name not in metric:
            continue
        val = _to_float_scalar(metric.get(name))
        if val is not None:
            return name, val

    for name, raw_val in metric.items():
        val = _to_float_scalar(raw_val)
        if val is not None:
            return str(name), val
    return None, None
| def _is_binary_metric_name(metric_name: Optional[str]) -> bool: | |
| if not metric_name: | |
| return False | |
| n = metric_name.lower() | |
| return ( | |
| n.startswith("acc") | |
| or "accuracy" in n | |
| or "score_norm" in n | |
| or n.endswith("_status") | |
| or n in {"exact_match", "fann_or_flop", "fannorflop", "eval_plus"} | |
| ) | |
| def _is_choice_metric_name(metric_name: Optional[str]) -> bool: | |
| if not metric_name: | |
| return False | |
| n = metric_name.lower() | |
| return ( | |
| n.startswith("acc") | |
| or "mc_prob" in n | |
| or "score_norm" in n | |
| or n.endswith("_status") | |
| or n in {"exact_match", "fann_or_flop", "fannorflop", "eval_plus"} | |
| ) | |
def _extract_predicted_answer(model_response: Dict[str, Any], choices: List[Any]) -> Any:
    """Derive the model's answer from a response record.

    With both ``logprobs`` and ``choices`` available, the choice at the
    arg-max log-probability is returned (when that index is valid). Otherwise
    the first ``text_post_processed`` entry, then the first ``text`` entry, is
    used. Returns ``None`` when nothing is available.
    """
    logprobs = model_response.get("logprobs")
    if logprobs is not None and choices:
        scores = _as_list(logprobs)
        try:
            best = int(np.argmax(np.asarray(scores, dtype=float)))
            if 0 <= best < len(choices):
                return choices[best]
        except Exception:
            # Unusable log-probabilities: fall through to the text fields.
            pass

    for field in ("text_post_processed", "text"):
        entries = _as_list(model_response.get(field))
        if entries:
            return entries[0]
    return None
def _first_non_empty(values: Any) -> Optional[str]:
    """Return the first entry whose stripped string form is non-empty, else ``None``."""
    stripped = (str(item).strip() for item in _as_list(values) if item is not None)
    return next((text for text in stripped if text), None)
def _structured_record_to_row(
    record: Dict[str, Any],
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Convert one structured detail record into a JSON-safe display row.

    The record is read through its ``doc``, ``metric`` and ``model_response``
    fields, each coerced to a dict with ``_as_dict``.

    Args:
        record: Raw detail record.
        subtask: Subtask label copied into the row.
        benchmark_base: Benchmark base name, used to choose the metric.
        preferred_metrics: Optional metric names to try first.

    Returns:
        A dict with keys ``subtask``, ``question_id``, ``task_name``,
        ``prompt``, ``input_prompt``, ``output``, ``choices``, ``gold_answer``,
        ``predicted_answer``, ``is_correct``, ``metric_name`` and ``metric``.
    """
    doc = _as_dict(record.get("doc"))
    metric = _as_dict(record.get("metric"))
    model_response = _as_dict(record.get("model_response"))
    choices = _as_list(doc.get("choices"))
    choices = [_py_scalar(c) for c in choices]
    # Resolve gold indices to the corresponding choice values, ignoring out-of-range ones.
    gold_indices = _normalize_indices(doc.get("gold_index"))
    gold_values: List[Any] = []
    for idx in gold_indices:
        if 0 <= idx < len(choices):
            gold_values.append(choices[idx])
    gold_answer = _format_answer(gold_values)
    metric_name, metric_value = _pick_metric(metric, benchmark_base, preferred_metrics)
    model_response_dict = model_response if isinstance(model_response, dict) else {}
    predicted_answer = _extract_predicted_answer(model_response_dict, choices)
    # Output text: post-processed text, then raw text, then the predicted answer.
    output_text = _first_non_empty(model_response_dict.get("text_post_processed"))
    if output_text is None:
        output_text = _first_non_empty(model_response_dict.get("text"))
    if output_text is None and predicted_answer is not None:
        output_text = str(predicted_answer)
    # Correctness is only derived from a metric whose value is exactly 0 or 1.
    is_correct = None
    if metric_value is not None and _is_binary_metric_name(metric_name) and metric_value in (0.0, 1.0):
        is_correct = bool(metric_value)
    else:
        binary_score = _to_float_scalar(metric.get("normalized_score_norm"))
        if binary_score is not None and binary_score in (0.0, 1.0):
            is_correct = bool(binary_score)
    # For multi-gold classification (e.g. Mizan), accept prediction if it matches any gold option.
    pred_norm = _norm_answer(predicted_answer)
    choice_norms = {_norm_answer(c) for c in choices if _norm_answer(c)}
    gold_norms = {_norm_answer(g) for g in gold_values if _norm_answer(g)}
    # This overrides the metric-based verdict when the prediction is one of the listed choices.
    if _is_choice_metric_name(metric_name) and pred_norm and pred_norm in choice_norms and gold_norms:
        is_correct = pred_norm in gold_norms
    predicted_answer = _py_scalar(predicted_answer)
    if isinstance(predicted_answer, list):
        predicted_answer = _format_answer(predicted_answer)
    # First non-empty prompt-like field wins.
    prompt = (
        doc.get("query")
        or doc.get("original_query")
        or doc.get("instruction")
        or model_response_dict.get("input")
        or ""
    )
    return _json_safe({
        "subtask": subtask,
        "question_id": _py_scalar(doc.get("id")),
        "task_name": _py_scalar(doc.get("task_name")),
        "prompt": prompt,
        "input_prompt": model_response_dict.get("input"),
        "output": output_text,
        "choices": [str(c) for c in choices],
        "gold_answer": _py_scalar(gold_answer),
        "predicted_answer": _py_scalar(predicted_answer),
        "is_correct": is_correct,
        "metric_name": metric_name,
        "metric": metric_value,
    })
def _read_detail_parquet(
    path: str,
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Read a details parquet file and return one display row per record.

    Two layouts are handled: a structured layout (records carrying ``doc``,
    ``metric`` or ``model_response``), delegated to
    ``_structured_record_to_row``, and a flat layout whose columns are read
    directly.

    Returns:
        The rows, or ``[]`` when the file cannot be read (a warning is logged)
        or is empty.
    """
    try:
        df = pd.read_parquet(path)
    except Exception as e:
        logger.warning("Could not read details parquet '%s': %s", path, e)
        return []
    records = df.to_dict(orient="records")
    if not records:
        return []
    # The layout is decided from the first record only.
    sample = records[0] if isinstance(records[0], dict) else {}
    has_structured_fields = isinstance(sample, dict) and any(
        key in sample for key in ("doc", "metric", "model_response")
    )
    if has_structured_fields:
        return [
            _structured_record_to_row(record, subtask, benchmark_base, preferred_metrics)
            for record in records
            if isinstance(record, dict)
        ]
    # Simple row format (e.g. fannorflop parquet).
    rows: List[Dict[str, Any]] = []
    for rec in records:
        if not isinstance(rec, dict):
            continue
        # First candidate column that is present and numeric.
        metric_key = next(
            (
                k
                for k in ("BertScore", "bert_score", "f1", "score", "metric")
                if k in rec and _to_float_scalar(rec.get(k)) is not None
            ),
            None,
        )
        metric_value = rec.get(metric_key) if metric_key else None
        output = (
            rec.get("extracted_response")
            or rec.get("response")
            or rec.get("extracted_json")
            or rec.get("raw_response")
        )
        predicted = rec.get("predicted_answer") or output
        gold_raw = rec.get("gold_answer")
        # Displayed gold falls back to the verse-explanation columns when gold_answer is empty.
        gold_display = gold_raw if gold_raw not in (None, "") else (
            rec.get("gold_verse_explanations")
            if rec.get("gold_verse_explanations") not in (None, "")
            else rec.get("verse_explanations")
        )
        is_correct = None
        # Only enable binary correct/wrong mode for explicit gold_answer labels.
        binary_mode = _is_primitive_answer(gold_raw) and _is_primitive_answer(predicted)
        if binary_mode:
            gold_norm = _norm_answer(gold_raw)
            pred_norm = _norm_answer(predicted)
            if gold_norm and pred_norm:
                is_correct = (gold_norm == pred_norm)
                # In binary mode the reported metric becomes the 0/1 match result.
                metric_key = "fannorflop"
                metric_value = 1.0 if is_correct else 0.0
        rows.append(_make_simple_row(
            subtask=subtask,
            question_id=rec.get("id") or rec.get("question_id"),
            task_name=benchmark_base,
            prompt=rec.get("prompt"),
            output=output,
            gold_answer=gold_display,
            predicted_answer=predicted,
            metric_name=metric_key,
            metric_value=metric_value,
            is_correct=is_correct,
        ))
    return rows
| def _load_json_payload(path: str) -> Any: | |
| p = Path(path) | |
| if p.suffix.lower() == ".jsonl": | |
| text = p.read_text(encoding="utf-8", errors="ignore").strip() | |
| if not text: | |
| return [] | |
| try: | |
| return json.loads(text) | |
| except Exception: | |
| rows: List[Any] = [] | |
| for line in text.splitlines(): | |
| line = line.strip() | |
| if not line: | |
| continue | |
| with contextlib.suppress(Exception): | |
| rows.append(json.loads(line)) | |
| return rows | |
| with open(p, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
def _make_simple_row(
    *,
    subtask: str,
    question_id: Any,
    task_name: Any,
    prompt: Any,
    output: Any,
    gold_answer: Any,
    predicted_answer: Any,
    metric_name: Any,
    metric_value: Any,
    is_correct: Any,
    summary_accuracy_override: Any = None,
) -> Dict[str, Any]:
    """Assemble a JSON-safe display row for flat (non-structured) detail records.

    Text-like fields are passed through ``_decode_structured_string`` so that
    embedded JSON/dict strings become real structures; ``metric`` is coerced
    to float. ``input_prompt`` is always ``None`` and ``choices`` always empty.
    The ``_summary_accuracy_override`` key is added only when an override is
    supplied.
    """
    decode = _decode_structured_string
    row = dict(
        subtask=subtask,
        question_id=_py_scalar(question_id),
        task_name=_py_scalar(task_name),
        prompt=decode(prompt or ""),
        input_prompt=None,
        output=decode(output),
        choices=[],
        gold_answer=decode(gold_answer),
        predicted_answer=decode(predicted_answer),
        is_correct=is_correct,
        metric_name=metric_name,
        metric=_to_float_scalar(metric_value),
    )
    if summary_accuracy_override is not None:
        row["_summary_accuracy_override"] = _to_float_scalar(summary_accuracy_override)
    return _json_safe(row)
def _read_detail_fannorflop_rows(records: List[Any], subtask: str, benchmark_base: str) -> List[Dict[str, Any]]:
    """Convert flat fannorflop JSON records into display rows.

    Args:
        records: Decoded records; non-dict entries are skipped.
        subtask: Subtask label copied into every row.
        benchmark_base: Used as each row's ``task_name``.

    Returns:
        One row per dict record. The metric is the first of ``BertScore``,
        ``bert_score``, ``score``, ``f1`` present in the record. Records with
        an explicit primitive ``gold_answer`` and a primitive prediction are
        instead scored as a binary match under the metric name ``fannorflop``.
    """
    rows: List[Dict[str, Any]] = []
    for rec in records:
        if not isinstance(rec, dict):
            continue
        metric_key = None
        for k in ("BertScore", "bert_score", "score", "f1"):
            if k in rec:
                metric_key = k
                break
        metric_value = rec.get(metric_key) if metric_key else None
        output = rec.get("extracted_response") or rec.get("response")
        predicted = rec.get("predicted_answer") or output
        gold_raw = rec.get("gold_answer")
        # The displayed gold may fall back to the explanation fields, but the
        # binary decision below must only ever look at the explicit label.
        gold_display = gold_raw
        if gold_display in (None, ""):
            gold_display = rec.get("gold_verse_explanations")
        if gold_display in (None, ""):
            gold_display = rec.get("verse_explanations")
        is_correct = None
        # Only enable binary mode when explicit gold_answer exists. Comparing a
        # fallback explanation text with the response would mark nearly every
        # record wrong and replace its real score with 0.0.
        binary_mode = _is_primitive_answer(gold_raw) and _is_primitive_answer(predicted)
        if binary_mode and gold_raw not in (None, "") and predicted not in (None, ""):
            is_correct = (_norm_answer(gold_raw) == _norm_answer(predicted))
            metric_key = "fannorflop"
            metric_value = 1.0 if is_correct else 0.0
        rows.append(_make_simple_row(
            subtask=subtask,
            question_id=rec.get("id"),
            task_name=benchmark_base,
            prompt=rec.get("prompt"),
            output=output,
            gold_answer=gold_display,
            predicted_answer=predicted,
            metric_name=metric_key,
            metric_value=metric_value,
            is_correct=is_correct,
        ))
    return rows
def _read_detail_code_eval_json(data: Dict[str, Any], subtask: str, benchmark_base: str) -> List[Dict[str, Any]]:
    """Convert a code-evaluation JSON payload into display rows.

    ``data["eval"]`` maps task ids to one or more attempt records; each dict
    record becomes a row scored from its ``plus_status`` (``pass`` / ``fail``,
    case-insensitive; any other status leaves the row unscored). When
    ``data["pass_at_k"]["plus"]["pass@1"]`` is numeric, its value times 100 is
    attached to every row as the summary accuracy override.

    ``benchmark_base`` is accepted for signature parity and is not used.
    Returns ``[]`` when ``data["eval"]`` is not a dict.
    """
    eval_map = data.get("eval")
    if not isinstance(eval_map, dict):
        return []

    pass_at_k = data.get("pass_at_k")
    plus_section = pass_at_k.get("plus") if isinstance(pass_at_k, dict) else None
    pass_at_1 = _to_float_scalar(plus_section.get("pass@1")) if isinstance(plus_section, dict) else None
    summary_override = pass_at_1 * 100.0 if pass_at_1 is not None else None

    rows: List[Dict[str, Any]] = []
    for task_id, entries in eval_map.items():
        for rec in _as_list(entries):
            if not isinstance(rec, dict):
                continue
            plus_status = str(rec.get("plus_status", "")).strip().lower()
            if plus_status in {"pass", "fail"}:
                plus_status_text = plus_status
                is_correct = (plus_status == "pass")
                metric_value = 1.0 if is_correct else 0.0
            else:
                plus_status_text = ""
                is_correct = None
                metric_value = None
            rows.append(_make_simple_row(
                subtask=subtask,
                question_id=rec.get("task_id") or task_id,
                task_name=task_id,
                prompt="",
                output=rec.get("solution") or rec.get("completion"),
                gold_answer=rec.get("gold_answer") or "",
                predicted_answer=plus_status_text or rec.get("predicted_answer") or "",
                metric_name="eval_plus",
                metric_value=metric_value,
                is_correct=is_correct,
                summary_accuracy_override=summary_override,
            ))
    return rows
def _read_detail_json_any(
    path: str,
    subtask: str,
    benchmark_base: str,
    preferred_metrics: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """
    Read a details JSON/JSONL file and dispatch on the payload's shape.

    Dispatch order:
      1. FannOrFlop base: a list, or a dict with a ``rows`` list, goes to the
         flat-record reader.
      2. A dict with an ``eval`` dict goes to the code-eval reader.
      3. A list whose dict records carry ``doc``/``metric``/``model_response``
         is converted record by record as structured rows.
      4. Any other list that starts with a dict falls back to the flat-record
         reader.

    Returns an empty list when the file cannot be loaded (a warning is logged)
    or when no shape matches.
    """
    try:
        payload = _load_json_payload(path)
    except Exception as e:
        logger.warning("Could not read details json/jsonl '%s': %s", path, e)
        return []

    payload_is_list = isinstance(payload, list)
    payload_is_dict = isinstance(payload, dict)

    if _canonical_base_key(benchmark_base) == "fannorflop":
        if payload_is_list:
            return _read_detail_fannorflop_rows(payload, subtask, benchmark_base)
        if payload_is_dict and isinstance(payload.get("rows"), list):
            return _read_detail_fannorflop_rows(payload["rows"], subtask, benchmark_base)

    if payload_is_dict and isinstance(payload.get("eval"), dict):
        return _read_detail_code_eval_json(payload, subtask, benchmark_base)

    if not payload_is_list:
        return []

    structured_markers = ("doc", "metric", "model_response")
    structured_rows: List[Dict[str, Any]] = [
        _structured_record_to_row(record, subtask, benchmark_base, preferred_metrics)
        for record in payload
        if isinstance(record, dict) and any(marker in record for marker in structured_markers)
    ]
    if structured_rows:
        return structured_rows

    # No structured records: treat a list of plain dicts as flat records.
    if payload and isinstance(payload[0], dict):
        return _read_detail_fannorflop_rows(payload, subtask, benchmark_base)
    return []
| def _read_detail_file( | |
| path: str, | |
| subtask: str, | |
| benchmark_base: str, | |
| preferred_metrics: Optional[List[str]] = None, | |
| ) -> List[Dict[str, Any]]: | |
| ext = Path(path).suffix.lower() | |
| if ext == ".parquet": | |
| return _read_detail_parquet(path, subtask, benchmark_base, preferred_metrics) | |
| if ext in {".json", ".jsonl"}: | |
| return _read_detail_json_any(path, subtask, benchmark_base, preferred_metrics) | |
| return [] | |
def _details_preferred_metrics(benchmark_display: str, selected_base: str) -> List[str]:
    """
    Resolve the metric preference list for one benchmark base.

    Lookup order: display-specific table by exact base, display-specific table
    by canonical base key, global base table by exact base, global base table
    by canonical base key. The last falsy value is returned when nothing matches.
    """
    per_display = BENCHMARK_DISPLAY_TO_BASE_METRICS.get(benchmark_display, {})
    chosen = per_display.get(selected_base)
    if chosen is None:
        # Key-normalized fallback.
        chosen = next(
            (
                metrics
                for key, metrics in per_display.items()
                if _canonical_base_key(key) == _canonical_base_key(selected_base)
            ),
            None,
        )
    chosen = chosen or BENCHMARK_BASE_TO_METRICS.get(selected_base, [])
    if not chosen:
        canon_base = _canonical_base_key(selected_base)
        chosen = next(
            (
                metrics
                for key, metrics in BENCHMARK_BASE_TO_METRICS.items()
                if _canonical_base_key(key) == canon_base
            ),
            chosen,
        )
    return chosen


def _details_subtask_summary(
    rows: List[Dict[str, Any]],
    display_subtask: str,
    base: str,
    model_name: str,
    benchmark_display: str,
) -> Dict[str, Any]:
    """
    Compute the summary entry (counts, accuracy, mode) for one subtask's rows.

    "metric" mode averages the per-row ``metric`` values; "binary" mode counts
    rows whose ``is_correct`` is a bool and honours a per-file accuracy
    override when one is attached to the rows.
    """
    rows_with_metric = [r for r in rows if r.get("metric") is not None]
    metric_name = next(
        (str(r.get("metric_name")) for r in rows_with_metric if r.get("metric_name")),
        None,
    )
    metric_mode = metric_name is not None and not _is_binary_metric_name(metric_name)
    override_pct = next(
        (
            _to_float_scalar(r.get("_summary_accuracy_override"))
            for r in rows
            if r.get("_summary_accuracy_override") is not None
        ),
        None,
    )

    if metric_mode:
        mode = "metric"
        n_correct = None
        n_scored = len(rows_with_metric)
        mean_metric = (
            (sum(float(r["metric"]) for r in rows_with_metric) / n_scored) if n_scored > 0 else None
        )
        accuracy = round(mean_metric * 100, 2) if mean_metric is not None else None
    else:
        mode = "binary"
        judged = [r for r in rows if isinstance(r.get("is_correct"), bool)]
        n_correct = sum(1 for r in judged if r["is_correct"])
        n_scored = len(judged)
        accuracy = round((n_correct / n_scored) * 100, 2) if n_scored > 0 else None
        if override_pct is not None:
            # The override replaces the counted accuracy; the correct count
            # is re-derived from it so the two stay consistent.
            accuracy = round(override_pct, 2)
            if n_scored > 0:
                n_correct = int(round((accuracy / 100.0) * n_scored))

    # FannOrFlop details parquet may have per-row BertScore=0 while official score lives in results f1.
    if _canonical_base_key(base) == "fannorflop":
        outside_score = _latest_model_benchmark_score_pct(model_name, benchmark_display)
        if outside_score is not None:
            accuracy = round(outside_score, 2)
            mode = "metric"
            n_correct = None

    return {
        "subtask": display_subtask,
        "total": len(rows),
        "scored": n_scored,
        "correct": n_correct,
        "accuracy": accuracy,
        "mode": mode,
    }


def _details_take_round_robin(rows_by_subtask: List[List[Dict[str, Any]]], max_rows: int) -> List[Dict[str, Any]]:
    """Take up to ``max_rows`` rows, one per subtask in turn, so every subtask is represented."""
    pending = [deque(rows) for rows in rows_by_subtask]
    picked: List[Dict[str, Any]] = []
    while len(picked) < max_rows:
        took_any = False
        for queue in pending:
            if not queue:
                continue
            picked.append(queue.popleft())
            took_any = True
            if len(picked) >= max_rows:
                break
        if not took_any:
            break
    return picked


def load_benchmark_details(
    model_name: str,
    benchmark_display: str,
    details_index: Dict[str, Dict[str, Dict[str, Dict[str, Any]]]],
    max_rows: int = 250,
) -> Dict[str, Any]:
    """
    Load per-question benchmark details for a model from indexed detail files.

    Args:
        model_name: Model to look up in ``details_index``; an exact key match
            is tried first, then a case-insensitive, whitespace-trimmed match.
        benchmark_display: Display name of the benchmark. It is mapped to one
            or more benchmark bases, or used as the base itself when unmapped.
        details_index: ``model -> base -> subtask -> info`` mapping, where each
            ``info`` provides the detail file location under ``"path"``.
        max_rows: Cap on the number of returned rows. When the cap is exceeded,
            rows are taken round-robin across subtasks. A value <= 0 disables
            the cap.

    Returns:
        A dict with ``benchmark`` (the display name), ``subtasks`` (one summary
        per subtask, computed over all of its rows before any truncation) and
        ``rows`` (the possibly truncated detail rows).
    """
    model_bucket = details_index.get(model_name, {})
    if not model_bucket:
        wanted_model = model_name.strip().lower()
        for indexed_model, bucket in details_index.items():
            if indexed_model.strip().lower() == wanted_model:
                model_bucket = bucket
                break

    benchmark_bases = BENCHMARK_DISPLAY_TO_BASES.get(benchmark_display, []) or [benchmark_display]

    selected_entries: List[tuple[str, str, Dict[str, Any], List[str]]] = []
    for base in benchmark_bases:
        subtasks = model_bucket.get(base, {})
        selected_base = base
        if not subtasks:
            # Fall back to matching indexed bases by canonical key.
            canon_base = _canonical_base_key(base)
            for indexed_base, bucket in model_bucket.items():
                if _canonical_base_key(indexed_base) == canon_base:
                    selected_base = indexed_base
                    subtasks = bucket
                    break
        preferred_metrics = _details_preferred_metrics(benchmark_display, selected_base)
        selected_entries.extend(
            (selected_base, subtask, info, preferred_metrics)
            for subtask, info in subtasks.items()
        )

    if not selected_entries:
        return {"benchmark": benchmark_display, "subtasks": [], "rows": []}

    selected_entries.sort(key=lambda entry: entry[1].lower())

    rows_by_subtask: List[List[Dict[str, Any]]] = []
    subtasks_summary: List[Dict[str, Any]] = []
    for base, subtask, info, preferred_metrics in selected_entries:
        display_subtask = benchmark_display if subtask == "overall" else subtask
        rows = _read_detail_file(info["path"], display_subtask, base, preferred_metrics)
        rows_by_subtask.append(rows)
        subtasks_summary.append(
            _details_subtask_summary(rows, display_subtask, base, model_name, benchmark_display)
        )

    total_rows = sum(len(rows) for rows in rows_by_subtask)
    if max_rows > 0 and total_rows > max_rows:
        all_rows = _details_take_round_robin(rows_by_subtask, max_rows)
    else:
        all_rows = [row for rows in rows_by_subtask for row in rows]

    # The override is internal bookkeeping for the summaries; strip it from the output.
    for row in all_rows:
        if isinstance(row, dict):
            row.pop("_summary_accuracy_override", None)

    return {
        "benchmark": benchmark_display,
        "subtasks": subtasks_summary,
        "rows": all_rows,
    }
| # Manual size overrides (in billions) for models where HF API returns no safetensors metadata. | |
| _MODEL_SIZE_OVERRIDES: Dict[str, float] = { | |
| "Qwen/Qwen2.5-14B-Instruct": 14.0, | |
| "Qwen/Qwen2.5-32B-Instruct": 32.0, | |
| "Qwen/Qwen3-30B-A3B-Instruct-2507": 30.0, | |
| "Qwen/Qwen3-235B-A22B-Instruct-2507": 235.0, | |
| "google/gemma-3-270m-it": 0.27, | |
| "google/gemma-3-1b-it": 1.0, | |
| "google/gemma-3-1b-pt": 1.0, | |
| "google/gemma-3-4b-it": 4.0, | |
| "google/gemma-3-12b-it": 12.0, | |
| "google/gemma-3-27b-pt": 27.0, | |
| "microsoft/Phi-4-mini-instruct": 3.8, | |
| } | |
def _fetch_hf_metadata(model_name: str) -> Dict[str, Any]:
    """
    Fetch license, revision, size and like count for a model from the HF Hub.

    Args:
        model_name: Hub repo id of the model.

    Returns:
        A dict with the keys ``"License"``, ``"Revision"``, ``"Model Size"``
        and ``"Hub ❤️"``, or an empty dict when the Hub lookup fails (the
        failure is logged as a warning, never raised).

    The model size comes from ``get_model_size``. When that reports 0 and the
    Hub exposes no safetensors parameter total, the manual override table is
    consulted instead (yielding ``None`` for models that are not listed).
    """
    try:
        info = API.model_info(repo_id=model_name, token=hf_api_token)
    except Exception as e:
        logger.warning("Could not fetch HF metadata for '%s': %s", model_name, e)
        return {}

    # card_data may be a plain dict or an object exposing attributes.
    card_data = getattr(info, "card_data", None)
    if isinstance(card_data, dict):
        license_name = card_data.get("license")
    else:
        license_name = getattr(card_data, "license", None)

    model_size = get_model_size(model_info=info)
    if model_size == 0:
        safetensors = getattr(info, "safetensors", None)
        # Depending on the huggingface_hub version, ``safetensors`` is either
        # an object with a ``total`` attribute or a dict-like payload. Calling
        # ``.get`` unconditionally raises AttributeError on the object form,
        # so read the total from whichever shape is present.
        total_params = getattr(safetensors, "total", None)
        if total_params is None and hasattr(safetensors, "get"):
            total_params = safetensors.get("total")
        if not total_params:
            model_size = _MODEL_SIZE_OVERRIDES.get(model_name)

    return {
        "License": license_name,
        "Revision": getattr(info, "sha", None),
        "Model Size": model_size,
        "Hub ❤️": getattr(info, "likes", None),
    }
def load_scoreboard() -> pd.DataFrame:
    """
    Main entrypoint used by the Space UI.

    Pipeline:
      1. Download the datasets and parse every ``.json``/``.jsonl`` file under
         ``$EVAL_RESULTS_PATH`` into one row per result file.
      2. Keep the latest file per (model, source type) and merge the sources
         into a single row per model, taking the first non-null score per task.
      3. Scale scores to percentages and compute ``Average`` over the visible
         task columns only (hidden task columns are scaled but excluded).
      4. Attach Hub metadata (license, revision, size, likes), fetched in
         parallel, then fill missing license/revision and the model type from
         the requests dataset.
      5. Sort by ``Average`` descending and insert a 1-based ``Rank`` column.

    Returns:
        The scoreboard DataFrame, or an empty DataFrame when
        ``EVAL_RESULTS_PATH`` is unset or no result file could be parsed.
    """
    download_datasets()
    result_base = os.getenv("EVAL_RESULTS_PATH")
    if not result_base:
        return pd.DataFrame()

    rows = []
    for p in Path(result_base).rglob("*"):
        if not p.is_file() or p.suffix.lower() not in {".json", ".jsonl"}:
            continue
        row = _parse_result_file(p)
        if row:
            rows.append(row)
    if not rows:
        return pd.DataFrame()

    df = pd.DataFrame(rows)
    df["datetime"] = pd.to_datetime(df["datetime"])

    # Keep latest file per (model, source), then merge source metrics per model.
    df = df.sort_values("datetime", ascending=False)
    df = df.drop_duplicates(subset=["Model Name", "Source Type"], keep="first")

    task_cols = [t[2] for t in TASKS]
    hidden_cols = [t[2] for t in HIDDEN_TASKS]
    all_score_cols = task_cols + hidden_cols
    for col in all_score_cols:
        if col not in df.columns:
            df[col] = np.nan

    def first_non_null(values):
        """Return the first non-null value of the group, or NaN."""
        for v in values:
            if pd.notna(v):
                return v
        return np.nan

    def first_valid_precision(values):
        """Prefer a real precision label; else any non-null value; else "UNK"."""
        for v in values:
            if isinstance(v, str) and v.strip() and v not in {"Missing", "UNK"}:
                return v
        for v in values:
            if pd.notna(v):
                return v
        return "UNK"

    agg_map = {
        "datetime": "max",
        "Precision": first_valid_precision,
    }
    agg_map.update({col: first_non_null for col in all_score_cols})
    df = df.groupby("Model Name", as_index=False).agg(agg_map)

    # numeric — hidden_cols converted but excluded from Average
    for col in all_score_cols:
        df[col] = (pd.to_numeric(df[col], errors="coerce") * 100).round(2)
    df["Average"] = df[task_cols].mean(axis=1).round(2)

    # metadata from Hugging Face API (fetched in parallel for speed)
    model_names = df["Model Name"].dropna().unique().tolist()
    hf_meta: Dict[str, Dict[str, Any]] = {}
    if model_names:
        max_workers = min(12, len(model_names))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_model = {
                executor.submit(_fetch_hf_metadata, model_name): model_name
                for model_name in model_names
            }
            for future in as_completed(future_to_model):
                model_name = future_to_model[future]
                try:
                    hf_meta[model_name] = future.result() or {}
                except Exception as e:
                    # Hub metadata is best-effort: future.result() re-raises
                    # whatever the worker raised, and one bad model must not
                    # take down the whole scoreboard.
                    logger.warning("HF metadata fetch failed for '%s': %s", model_name, e)
                    hf_meta[model_name] = {}

    df["License"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("License"))
    df["Revision"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Revision"))
    df["Model Size"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Model Size"))
    df["Hub ❤️"] = df["Model Name"].map(lambda name: hf_meta.get(name, {}).get("Hub ❤️"))
    df["Type"] = None
    df["Full Type"] = None

    # Merge metadata from requests repo (all statuses), not just finished.
    req_meta = load_requests(None)
    if not req_meta.empty:
        if "model" not in req_meta.columns and "model_name" in req_meta.columns:
            req_meta["model"] = req_meta["model_name"]
        if "model" not in req_meta.columns:
            # Without a model column the requests cannot be joined; ignore them.
            req_meta = pd.DataFrame()

    if not req_meta.empty:
        if "precision" in req_meta.columns:
            req_meta["precision"] = req_meta["precision"].apply(unify_precision)
        else:
            req_meta["precision"] = None
        has_precision_values = req_meta["precision"].apply(
            lambda v: isinstance(v, str) and v.strip() and v not in {"Missing", "UNK"}
        ).any()
        # Last request per (model, precision) when precisions are usable,
        # plus last request per model as the fallback lookup.
        meta = (
            req_meta.groupby(["model", "precision"]).last().reset_index()
            if has_precision_values
            else pd.DataFrame()
        )
        meta_by_model = req_meta.groupby(["model"]).last().reset_index()

        def is_missing(v: Any) -> bool:
            """True for None, blank strings and NaN-like values."""
            return v is None or (isinstance(v, str) and not v.strip()) or pd.isna(v)

        def enrich(row):
            """Fill license/revision gaps and set the model type from the request metadata."""
            m = pd.DataFrame()
            if has_precision_values and not meta.empty:
                m = meta[
                    (meta["model"] == row["Model Name"]) &
                    (meta["precision"] == row["Precision"])
                ]
            if m.empty:
                m = meta_by_model[meta_by_model["model"] == row["Model Name"]]
            if not m.empty:
                m = m.iloc[0]
                # Hub values win; the request only fills what is missing.
                if is_missing(row.get("License")):
                    row["License"] = m.get("license")
                if is_missing(row.get("Revision")):
                    row["Revision"] = m.get("revision")
                model_type_raw = m.get("model_type", "Missing")
                row["Type"] = MODEL_TYPE_TO_EMOJI.get(
                    model_type_raw, model_type_raw
                )
                row["Full Type"] = model_type_raw
            return row

        df = df.apply(enrich, axis=1)

    df = df.sort_values("Average", ascending=False).reset_index(drop=True)
    df.insert(0, "Rank", range(1, len(df) + 1))
    return df
# Second module-level name bound to the same function object as
# download_datasets (presumably kept for callers that use this name; verify
# before removing).
download_dataset_snapshots = download_datasets