Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| from fastapi.templating import Jinja2Templates | |
| from fastapi.responses import JSONResponse, HTMLResponse | |
| import pandas as pd | |
| import uvicorn | |
| from contextlib import asynccontextmanager | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| import logging | |
| from collections import OrderedDict | |
| from typing import Any, Dict, Optional, Tuple | |
| from backend.data_loader import ( | |
| download_dataset_snapshots, | |
| load_scoreboard, | |
| load_requests, | |
| build_details_index, | |
| load_benchmark_details, | |
| ) | |
| from backend.submission_handler import submit_model | |
| from backend.config import TASKS, HIDDEN_TASKS, BENCHMARK_METADATA, API, hf_api_token, BENCHMARK_DETAILS_PAGE_SIZE | |
| from fastapi import FastAPI, Request, Form, BackgroundTasks, HTTPException | |
# Logging setup
# Raise the "apscheduler" logger threshold to WARNING so its lower-severity records are dropped.
logging.getLogger("apscheduler").setLevel(logging.WARNING)

# --- Global Cache Variables ---
# Leaderboard rows as a list of record dicts; replaced by update_leaderboard_cache().
GLOBAL_LEADERBOARD_DATA = []
# Maps a queue status to a list of {"name", "user"} dicts; replaced by update_queue_cache().
GLOBAL_QUEUE_DATA = {}
# Whatever build_details_index() returns; handed to load_benchmark_details() on cache misses.
GLOBAL_DETAILS_INDEX = {}
# Detail payloads keyed by (model name, benchmark). Insertion order doubles as recency:
# hits are moved to the end and the oldest entry is evicted from the front.
GLOBAL_BENCHMARK_DETAILS_CACHE: "OrderedDict[Tuple[str, str], Dict[str, Any]]" = OrderedDict()
# Upper bound on the number of payloads kept in GLOBAL_BENCHMARK_DETAILS_CACHE.
BENCHMARK_DETAILS_CACHE_MAX_ITEMS = 32
# Template names that read_page() is willing to render; anything else gets a 404.
ACCEPTED_PAGES = ["about.html", "header.html", "leaderboard.html", "submit.html"]
def refresh():
    """Return the scoreboard exactly as ``load_scoreboard()`` produces it."""
    return load_scoreboard()
| # --- Cache Update Functions --- | |
def update_leaderboard_cache():
    """Rebuild ``GLOBAL_LEADERBOARD_DATA`` from the scoreboard.

    The scoreboard is loaded with ``load_scoreboard()`` and normalised:

    * score columns (from ``TASKS``, ``HIDDEN_TASKS``, plus "Average" and
      "Rank") are coerced to numbers, with unparseable values becoming 0;
    * "Model Size" is rounded to one decimal and "Hub ❤️" is cast to int,
      each falling back to the string "Unknown" when not numeric;
    * empty or missing text metadata becomes "Unknown".

    The "Model Size Filter" and "datetime" columns are left out of the result.
    Any failure is logged and the previous cache contents are kept.
    """
    global GLOBAL_LEADERBOARD_DATA
    try:
        df = load_scoreboard()
        if df.empty:
            GLOBAL_LEADERBOARD_DATA = []
            return

        df = df.drop(columns=["Model Size Filter"], errors="ignore")

        # Keep scores numeric, but show Unknown for missing metadata fields.
        score_cols = [t[2] for t in TASKS] + [t[2] for t in HIDDEN_TASKS] + ["Average", "Rank"]
        for col in score_cols:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

        if "Model Size" in df.columns:
            size_series = pd.to_numeric(df["Model Size"], errors="coerce")
            df["Model Size"] = size_series.apply(
                lambda v: round(float(v), 1) if pd.notna(v) else "Unknown"
            )

        if "Hub ❤️" in df.columns:
            likes_series = pd.to_numeric(df["Hub ❤️"], errors="coerce")
            df["Hub ❤️"] = likes_series.apply(
                lambda v: int(v) if pd.notna(v) else "Unknown"
            )

        for col in ["License", "Revision", "Type", "Full Type", "Precision"]:
            if col in df.columns:
                df[col] = df[col].replace("", pd.NA).fillna("Unknown")

        # errors="ignore" keeps a scoreboard without a "datetime" column from
        # raising KeyError here, which would otherwise leave the cache stale.
        records = df.drop(columns=["datetime"], errors="ignore").to_dict(orient="records")

        # Update global variable
        GLOBAL_LEADERBOARD_DATA = records
    except Exception as e:
        # logging.exception records the traceback as well as the message.
        logging.exception(f"❌ Error updating leaderboard cache: {e}")
def update_queue_cache():
    """Rebuild the per-status submission queue snapshot in ``GLOBAL_QUEUE_DATA``.

    Each of the four statuses maps to a list of ``{"name", "user"}`` dicts.
    The global is only replaced once every status has loaded successfully;
    on any error the failure is logged and the previous snapshot is kept.
    """
    global GLOBAL_QUEUE_DATA
    snapshot = {}
    try:
        for status in ("pending", "running", "finished", "failed"):
            requests_df = load_requests(status)
            if requests_df.empty:
                snapshot[status] = []
                continue
            # Column names vary between request files, so fall back through
            # the known alternatives before giving up with "Unknown".
            snapshot[status] = [
                {
                    "name": entry.get("model", entry.get("model_name", "Unknown")),
                    "user": entry.get("sender", entry.get("revision", "Unknown")),
                }
                for _, entry in requests_df.iterrows()
            ]
        GLOBAL_QUEUE_DATA = snapshot
    except Exception as e:
        logging.error(f"❌ Error updating queue cache: {e}")
def update_details_cache():
    """Rebuild the details index and drop every cached benchmark payload.

    If ``build_details_index()`` fails, the error is logged and both the
    existing index and the payload cache are left untouched.
    """
    global GLOBAL_DETAILS_INDEX
    try:
        fresh_index = build_details_index()
    except Exception as e:
        logging.error(f"❌ Error updating details cache: {e}")
        return
    GLOBAL_DETAILS_INDEX = fresh_index
    # Cached payloads were produced with the previous index, so discard them.
    GLOBAL_BENCHMARK_DETAILS_CACHE.clear()
| def _slice_details_payload( | |
| payload: Dict[str, Any], | |
| cursor: int, | |
| page_size: int, | |
| ) -> Dict[str, Any]: | |
| rows = payload.get("rows", []) if isinstance(payload, dict) else [] | |
| total = len(rows) | |
| start = max(0, int(cursor)) | |
| size = max(1, int(page_size)) | |
| end = min(start + size, total) | |
| page_rows = rows[start:end] | |
| has_more = end < total | |
| next_cursor = end if has_more else None | |
| return { | |
| "benchmark": payload.get("benchmark"), | |
| "subtasks": payload.get("subtasks", []), | |
| "rows": page_rows, | |
| "cursor": start, | |
| "next_cursor": next_cursor, | |
| "has_more": has_more, | |
| "total_rows": total, | |
| "page_size": size, | |
| } | |
| # --- Lifespan & Scheduler --- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: warm the caches, then keep them fresh.

    On startup the dataset snapshots are downloaded and all three caches are
    built once. A background scheduler then re-downloads the snapshots every
    30 minutes and rebuilds each cache every 10 minutes. The scheduler is shut
    down when the application stops.
    """
    # 1. Trigger downloads and cache updates immediately on startup
    download_dataset_snapshots()
    update_leaderboard_cache()
    update_queue_cache()
    update_details_cache()

    # 2. Schedule periodic updates
    scheduler = BackgroundScheduler()
    # Dataset snapshots (every 30 mins)
    scheduler.add_job(download_dataset_snapshots, "interval", minutes=30)
    # Cache updates (every 10 mins)
    scheduler.add_job(update_leaderboard_cache, "interval", minutes=10)
    scheduler.add_job(update_queue_cache, "interval", minutes=10)
    scheduler.add_job(update_details_cache, "interval", minutes=10)
    scheduler.start()

    try:
        yield
    finally:
        # Stop the scheduler even if an exception is raised into the generator
        # at the yield point.
        scheduler.shutdown()
# Application instance; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(lifespan=lifespan)
# Template names passed to TemplateResponse are resolved inside the "frontend" directory.
templates = Jinja2Templates(directory="frontend")
# --- Routes ---
async def read_root(request: Request):
    """Render ``index.html`` with the ``TASKS`` column names and benchmark metadata."""
    context = {
        "request": request,
        "eval_columns": [task[2] for task in TASKS],
        "benchmark_metadata": BENCHMARK_METADATA,
    }
    return templates.TemplateResponse("index.html", context)
async def get_leaderboard_data():
    """Serve the leaderboard rows currently held in the in-memory cache."""
    body = {"data": GLOBAL_LEADERBOARD_DATA}
    return JSONResponse(content=body)
async def get_queue_status():
    """Serve the per-status queue snapshot currently held in the in-memory cache."""
    queue_snapshot = GLOBAL_QUEUE_DATA
    return JSONResponse(content=queue_snapshot)
async def get_model_likes(
    model_name: str = Form(...),
    revision: str = Form(...)
):
    """Return the like and download counts reported for a model revision.

    Args:
        model_name: Repository id passed to ``API.model_info`` as ``repo_id``.
        revision: Revision of that repository to look up.

    Returns:
        ``{"likes": ..., "downloads": ...}`` on success, or
        ``{"error": "<message>"}`` with status 400 if the lookup fails.
    """
    # Local import keeps this handler self-contained; FastAPI is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    try:
        # model_info() is a synchronous call (presumably a network round-trip
        # to the Hub). Running it in the thread pool keeps the event loop free
        # to serve other requests in the meantime.
        info = await run_in_threadpool(
            API.model_info,
            repo_id=model_name,
            revision=revision,
            token=hf_api_token,
        )
        return JSONResponse(content={"likes": info.likes, "downloads": info.downloads})
    except Exception as e:
        logging.error(f"Error fetching likes for {model_name}: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=400)
async def get_benchmark_details(
    model_name: str = Form(...),
    benchmark: str = Form(...),
    cursor: Optional[int] = Form(default=0),
    page_size: Optional[int] = Form(default=None),
):
    """Return one page of per-question details for a model/benchmark pair.

    Full payloads are kept in ``GLOBAL_BENCHMARK_DETAILS_CACHE``, evicting the
    least recently used entry once ``BENCHMARK_DETAILS_CACHE_MAX_ITEMS`` is
    exceeded. Paging is applied to the cached payload on every request.

    Args:
        model_name: Model whose details are requested.
        benchmark: Benchmark display name.
        cursor: Index of the first row to return; invalid values fall back to 0.
        page_size: Rows per page; invalid or missing values fall back to
            ``BENCHMARK_DETAILS_PAGE_SIZE``.

    Returns:
        The paginated payload, or ``{"error": "<message>"}`` with status 400
        if anything fails.
    """
    # Local import keeps this handler self-contained; FastAPI is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    try:
        if not GLOBAL_DETAILS_INDEX:
            # Rebuilding the index is synchronous work; keep it off the event loop.
            await run_in_threadpool(update_details_cache)

        # Normalise once and use the same values for the cache key and for the
        # loader, so a request with stray whitespace cannot store a payload
        # under the clean key that was loaded with a different name.
        model_key = str(model_name or "").strip()
        benchmark_key = str(benchmark or "").strip()
        cache_key = (model_key, benchmark_key)

        payload = GLOBAL_BENCHMARK_DETAILS_CACHE.get(cache_key)
        if payload is None:
            # The loader is synchronous, so it runs in the thread pool too.
            payload = await run_in_threadpool(
                load_benchmark_details,
                model_name=model_key,
                benchmark_display=benchmark_key,
                details_index=GLOBAL_DETAILS_INDEX,
                max_rows=0,
            )
            GLOBAL_BENCHMARK_DETAILS_CACHE[cache_key] = payload
            # Evict from the front (oldest) until the cache is within its bound.
            while len(GLOBAL_BENCHMARK_DETAILS_CACHE) > BENCHMARK_DETAILS_CACHE_MAX_ITEMS:
                GLOBAL_BENCHMARK_DETAILS_CACHE.popitem(last=False)
        else:
            # Mark the entry as most recently used.
            GLOBAL_BENCHMARK_DETAILS_CACHE.move_to_end(cache_key)

        effective_page_size = (
            page_size if isinstance(page_size, int) and page_size > 0 else BENCHMARK_DETAILS_PAGE_SIZE
        )
        response_payload = _slice_details_payload(
            payload=payload,
            cursor=cursor if isinstance(cursor, int) and cursor >= 0 else 0,
            page_size=effective_page_size,
        )
        return JSONResponse(content=response_payload)
    except Exception as e:
        logging.error(f"Error fetching benchmark details for {model_name}/{benchmark}: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=400)
async def handle_submission(
    model_name: str = Form(...),
    model_type: str = Form(...),
):
    """Submit a model through ``submit_model`` and report the outcome as JSON.

    Responds with status 200 when the returned message starts with
    "**Success**". Any other message, or any raised exception, yields
    status 400 with the message as the error text.
    """
    # The precision, revision, weight_type and base_model form fields are
    # currently disabled and therefore not forwarded to submit_model().
    try:
        outcome = submit_model(model_name=model_name, model_type=model_type)
        if not outcome.startswith("**Success**"):
            return JSONResponse(content={"status": "error", "message": outcome}, status_code=400)
        # Refresh the queue right away so the new submission shows up without
        # waiting for the next scheduled update.
        update_queue_cache()
        return JSONResponse(content={"status": "success", "message": outcome}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=400)
| # Dynamic route for pages | |
async def read_page(request: Request, page_name: str):
    """Render a template listed in ``ACCEPTED_PAGES``; raise a 404 for any other name."""
    if page_name in ACCEPTED_PAGES:
        return templates.TemplateResponse(page_name, {"request": request})
    raise HTTPException(status_code=404, detail="Page not found")
if __name__ == "__main__":
    # Direct-execution entry point: serve the `app` object of module "app" on
    # all interfaces at port 7860, with auto-reload on and access logging off.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True, access_log=False)