"""HF I/O for the EgoMemReason leaderboard. Two repos: - PUBLIC_DATASET Ted412/EgoMemReason-Leaderboard (one JSON per submission) - PRIVATE_DATASET Ted412/EgoMemReason-Private (annotations_private.json) Boot path: ensure_private_annotations() downloads the private annotations file on app start so evaluator.score_submission() can read it from cwd. """ import functools import io import json import os import time import uuid from datetime import datetime, timedelta, timezone from huggingface_hub import HfApi, snapshot_download # Hard-coded for this challenge. Override via env vars in dev. PUBLIC_DATASET = os.environ.get("EGOMEM_PUBLIC_DATASET", "Ted412/EgoMemReason-Leaderboard") PRIVATE_DATASET = os.environ.get("EGOMEM_PRIVATE_DATASET", "Ted412/EgoMemReason-Private") ANNOTATIONS_FILENAME = "annotations_private.json" HF_TOKEN = os.environ.get("HF_TOKEN") # write scope on PUBLIC_DATASET; read scope on PRIVATE_DATASET def ensure_private_annotations(dest_path=ANNOTATIONS_FILENAME): """Download annotations_private.json from the private dataset on app boot. Only called once per Space restart. If the file is already present (local dev case where you've copied it manually), do nothing. """ if os.path.exists(dest_path): return dest_path if not HF_TOKEN: raise RuntimeError( "HF_TOKEN env var not set; cannot pull private annotations from " f"{PRIVATE_DATASET}. Either set HF_TOKEN or place {dest_path} in cwd." ) local_dir = snapshot_download( repo_id=PRIVATE_DATASET, repo_type="dataset", token=HF_TOKEN, allow_patterns=[ANNOTATIONS_FILENAME], ) src = os.path.join(local_dir, ANNOTATIONS_FILENAME) if not os.path.exists(src): raise RuntimeError( f"{ANNOTATIONS_FILENAME} not found in {PRIVATE_DATASET}. " "Upload it via the HF Files UI of the private dataset repo." ) # Symlink rather than copy — snapshot_download already cached it. if not os.path.exists(dest_path): os.symlink(src, dest_path) return dest_path def _now_iso(): return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") @functools.lru_cache(maxsize=1) def _cached_submissions(cache_bucket): """Pulls all submission JSON files. Bucket is int(time/60) so cache rolls every minute.""" del cache_bucket # only here to invalidate the cache try: local_dir = snapshot_download( repo_id=PUBLIC_DATASET, repo_type="dataset", token=HF_TOKEN, # not strictly required for public read but avoids rate-limiting allow_patterns=["submissions/*.json"], ) except Exception: return [] folder = os.path.join(local_dir, "submissions") if not os.path.isdir(folder): return [] out = [] for fn in os.listdir(folder): if not fn.endswith(".json"): continue try: with open(os.path.join(folder, fn)) as f: out.append(json.load(f)) except Exception: continue return out def list_submissions(): return _cached_submissions(int(time.time() / 60)) def _invalidate_cache(): _cached_submissions.cache_clear() def count_recent(hf_user_id, hours=24): cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) n = 0 for sub in list_submissions(): if sub.get("hf_user_id") != hf_user_id: continue ts = sub.get("submitted_at_utc", "") try: t = datetime.fromisoformat(ts.rstrip("Z")).replace(tzinfo=timezone.utc) except ValueError: continue if t >= cutoff: n += 1 return n def _upload_record(record): payload = json.dumps(record, indent=2).encode("utf-8") HfApi().upload_file( path_or_fileobj=io.BytesIO(payload), path_in_repo=f"submissions/{record['submission_id']}.json", repo_id=PUBLIC_DATASET, repo_type="dataset", token=HF_TOKEN, commit_message=f"submission {record['submission_id'][:8]} from {record['hf_user_id']}", ) def append_submission(*, hf_user_id, team_name, method_name, model_size, uses_external_data, uses_video_frames, method_description, project_url, publication_url, metrics): if not HF_TOKEN: raise RuntimeError("HF_TOKEN not set; cannot persist submission.") sid = str(uuid.uuid4()) record = { "submission_id": sid, "submitted_at_utc": _now_iso(), "hf_user_id": hf_user_id, "team_name": team_name, "method_name": method_name, "model_size": model_size or "", "uses_external_data": bool(uses_external_data), "uses_video_frames": uses_video_frames, "method_description": method_description or "", "project_url": project_url or "", "publication_url": publication_url or "", "is_selected": False, "metrics": metrics, } _upload_record(record) _invalidate_cache() return sid def set_selected(submission_id, requesting_user): """Mark `submission_id` as the requesting_user's selected entry. Enforces one-selected-per-user. Raises PermissionError if the submission does not belong to requesting_user. """ target = None for sub in list_submissions(): if sub["submission_id"] == submission_id: target = sub break if target is None: raise ValueError(f"submission_id not found: {submission_id}") if target["hf_user_id"] != requesting_user: raise PermissionError("You can only modify your own submissions.") # Un-select any other submission this user previously selected. for sub in list_submissions(): if (sub["hf_user_id"] == requesting_user and sub["is_selected"] and sub["submission_id"] != submission_id): sub["is_selected"] = False _upload_record(sub) target["is_selected"] = True _upload_record(target) _invalidate_cache()