Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """In-process hourly KPI rollup, owned by the backend Space lifespan. | |
| Replaces an external GitHub Actions cron so the rollup lives next to the data | |
| and reuses the Space's existing HF token — no production secrets on the | |
| public source repo. See ``scripts/build_kpis.py`` for the data-flow diagram | |
| and metric definitions. | |
| Behaviour:: | |
| lifespan startup → start APScheduler with cron("5 * * * *", UTC) | |
| → fire a best-effort 6-hour backfill (fire-and-forget) | |
| each :05 → run ``build_kpis.run_for_hour`` for the just-completed hour | |
| lifespan shutdown → scheduler.shutdown(wait=False) | |
| Environment:: | |
| HF_KPI_WRITE_TOKEN | HF_SESSION_UPLOAD_TOKEN | HF_TOKEN | HF_ADMIN_TOKEN | |
| First one found is used. Least-privilege first. | |
| KPI_SOURCE_REPO default smolagents/ml-intern-sessions | |
| KPI_TARGET_REPO default smolagents/ml-intern-kpis | |
| ML_INTERN_KPIS_DISABLED if truthy, the scheduler is not started | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import importlib.util | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
| logger = logging.getLogger(__name__) | |
| _PROJECT_ROOT = Path(__file__).resolve().parent.parent | |
| # Hold strong refs to backfill tasks so asyncio doesn't GC them mid-run. | |
| _background_tasks: set[asyncio.Task] = set() | |
| _scheduler = None # AsyncIOScheduler instance (lazy import) | |
| def _resolve_token() -> Optional[str]: | |
| """Pick the first available HF token. Least-privilege first.""" | |
| for var in ( | |
| "HF_KPI_WRITE_TOKEN", | |
| "HF_SESSION_UPLOAD_TOKEN", | |
| "HF_TOKEN", | |
| "HF_ADMIN_TOKEN", | |
| ): | |
| val = os.environ.get(var) | |
| if val: | |
| return val | |
| return None | |
| def _load_build_kpis(): | |
| """Import ``scripts/build_kpis.py`` without putting ``scripts/`` on sys.path.""" | |
| spec = importlib.util.spec_from_file_location( | |
| "build_kpis", _PROJECT_ROOT / "scripts" / "build_kpis.py", | |
| ) | |
| mod = importlib.util.module_from_spec(spec) | |
| assert spec.loader is not None | |
| spec.loader.exec_module(mod) | |
| return mod | |
| async def _run_hour(hour_dt: datetime) -> None: | |
| """Run one hourly rollup off the event loop. Best-effort, never raises.""" | |
| token = _resolve_token() | |
| if not token: | |
| logger.warning("kpis_scheduler: no HF token available, skipping %s", hour_dt) | |
| return | |
| try: | |
| mod = _load_build_kpis() | |
| from huggingface_hub import HfApi | |
| api = HfApi() | |
| source = os.environ.get("KPI_SOURCE_REPO", "smolagents/ml-intern-sessions") | |
| target = os.environ.get("KPI_TARGET_REPO", "smolagents/ml-intern-kpis") | |
| await asyncio.to_thread(mod.run_for_hour, api, source, target, hour_dt, token) | |
| except Exception as e: | |
| logger.warning("kpis_scheduler: rollup for %s failed: %s", hour_dt, e) | |
| async def run_last_completed_hour() -> None: | |
| """The scheduled-at-:05 job. Rolls up the previous whole hour.""" | |
| now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) | |
| await _run_hour(now - timedelta(hours=1)) | |
| async def backfill(hours: int = 6) -> None: | |
| """Catch-up pass for hours the Space was down. Idempotent (overwrites).""" | |
| now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) | |
| for i in range(1, hours + 1): | |
| await _run_hour(now - timedelta(hours=i)) | |
| def start(backfill_hours: int = 6) -> None: | |
| """Called from FastAPI lifespan startup.""" | |
| global _scheduler | |
| if os.environ.get("ML_INTERN_KPIS_DISABLED"): | |
| logger.info("kpis_scheduler: disabled via ML_INTERN_KPIS_DISABLED") | |
| return | |
| if _scheduler is not None: | |
| return | |
| try: | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| from apscheduler.triggers.cron import CronTrigger | |
| except ImportError: | |
| logger.warning("kpis_scheduler: apscheduler not installed, skipping") | |
| return | |
| _scheduler = AsyncIOScheduler(timezone="UTC") | |
| _scheduler.add_job( | |
| run_last_completed_hour, | |
| CronTrigger(minute=5), | |
| id="kpis_hourly", | |
| misfire_grace_time=600, # tolerate a 10-min misfire window | |
| coalesce=True, # collapse multiple missed fires into one | |
| max_instances=1, | |
| replace_existing=True, | |
| ) | |
| _scheduler.start() | |
| logger.info("kpis_scheduler: started (cron '5 * * * *' UTC)") | |
| # Non-blocking backfill. Hold a strong ref until done so asyncio doesn't | |
| # GC the task before it finishes. | |
| try: | |
| task = asyncio.get_running_loop().create_task(backfill(backfill_hours)) | |
| _background_tasks.add(task) | |
| task.add_done_callback(_background_tasks.discard) | |
| except RuntimeError: | |
| # Not in an event loop (tests); skip backfill. | |
| pass | |
| async def shutdown() -> None: | |
| """Called from FastAPI lifespan shutdown.""" | |
| global _scheduler | |
| if _scheduler is None: | |
| return | |
| _scheduler.shutdown(wait=False) | |
| _scheduler = None | |
| logger.info("kpis_scheduler: stopped") | |