# (removed: HF Spaces page scrape residue — "Spaces: Running / Running")
"""HF I/O for the EgoMemReason leaderboard.

Two repos:
- PUBLIC_DATASET  Ted412/EgoMemReason-Leaderboard (one JSON per submission)
- PRIVATE_DATASET Ted412/EgoMemReason-Private (annotations_private.json)

Boot path: ensure_private_annotations() downloads the private annotations file
on app start so evaluator.score_submission() can read it from cwd.
"""
import functools
import io
import json
import os
import shutil
import time
import uuid
from datetime import datetime, timedelta, timezone

from huggingface_hub import HfApi, snapshot_download
# Hard-coded for this challenge. Override via env vars in dev.
PUBLIC_DATASET = os.environ.get("EGOMEM_PUBLIC_DATASET", "Ted412/EgoMemReason-Leaderboard")
PRIVATE_DATASET = os.environ.get("EGOMEM_PRIVATE_DATASET", "Ted412/EgoMemReason-Private")
# File name inside PRIVATE_DATASET holding the held-out ground truth; also the
# local path ensure_private_annotations() materializes it at.
ANNOTATIONS_FILENAME = "annotations_private.json"
HF_TOKEN = os.environ.get("HF_TOKEN")  # write scope on PUBLIC_DATASET; read scope on PRIVATE_DATASET
def ensure_private_annotations(dest_path=ANNOTATIONS_FILENAME):
    """Download annotations_private.json from the private dataset on app boot.

    Only called once per Space restart. If the file is already present (local
    dev case where you've copied it manually), do nothing.

    Args:
        dest_path: where the annotations file must end up (default: cwd name
            that evaluator.score_submission() reads).

    Returns:
        dest_path, once the file exists there.

    Raises:
        RuntimeError: HF_TOKEN is unset, or the private repo does not contain
            the annotations file.
    """
    if os.path.exists(dest_path):
        return dest_path
    if not HF_TOKEN:
        raise RuntimeError(
            "HF_TOKEN env var not set; cannot pull private annotations from "
            f"{PRIVATE_DATASET}. Either set HF_TOKEN or place {dest_path} in cwd."
        )
    local_dir = snapshot_download(
        repo_id=PRIVATE_DATASET,
        repo_type="dataset",
        token=HF_TOKEN,
        allow_patterns=[ANNOTATIONS_FILENAME],
    )
    src = os.path.join(local_dir, ANNOTATIONS_FILENAME)
    if not os.path.exists(src):
        raise RuntimeError(
            f"{ANNOTATIONS_FILENAME} not found in {PRIVATE_DATASET}. "
            "Upload it via the HF Files UI of the private dataset repo."
        )
    # Symlink rather than copy — snapshot_download already cached it.
    # The previous exists()-then-symlink guard was racy (TOCTOU): another
    # worker could create dest_path in between and symlink would raise
    # FileExistsError anyway. Handle that, and fall back to a real copy on
    # platforms/filesystems where symlinks are unavailable.
    try:
        os.symlink(src, dest_path)
    except FileExistsError:
        pass  # concurrent creator won the race; file is in place
    except OSError:
        shutil.copyfile(src, dest_path)  # e.g. Windows without symlink privilege
    return dest_path
| def _now_iso(): | |
| return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| def _cached_submissions(cache_bucket): | |
| """Pulls all submission JSON files. Bucket is int(time/60) so cache rolls every minute.""" | |
| del cache_bucket # only here to invalidate the cache | |
| try: | |
| local_dir = snapshot_download( | |
| repo_id=PUBLIC_DATASET, | |
| repo_type="dataset", | |
| token=HF_TOKEN, # not strictly required for public read but avoids rate-limiting | |
| allow_patterns=["submissions/*.json"], | |
| ) | |
| except Exception: | |
| return [] | |
| folder = os.path.join(local_dir, "submissions") | |
| if not os.path.isdir(folder): | |
| return [] | |
| out = [] | |
| for fn in os.listdir(folder): | |
| if not fn.endswith(".json"): | |
| continue | |
| try: | |
| with open(os.path.join(folder, fn)) as f: | |
| out.append(json.load(f)) | |
| except Exception: | |
| continue | |
| return out | |
def list_submissions():
    """All leaderboard submissions, memoized in one-minute buckets."""
    minute_bucket = int(time.time() / 60)
    return _cached_submissions(minute_bucket)
def _invalidate_cache():
    """Drop the memoized submission list so the next read refetches from HF."""
    _cached_submissions.cache_clear()
def count_recent(hf_user_id, hours=24):
    """Count submissions by ``hf_user_id`` within the trailing ``hours`` window.

    Records with missing or malformed timestamps are skipped. Timestamps are
    assumed to be UTC (they carry a trailing 'Z' written by _now_iso).
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=hours)
    total = 0
    for record in list_submissions():
        if record.get("hf_user_id") != hf_user_id:
            continue
        stamp = record.get("submitted_at_utc", "")
        try:
            parsed = datetime.fromisoformat(stamp.rstrip("Z"))
        except ValueError:
            continue  # unparseable timestamp: don't count it
        if parsed.replace(tzinfo=timezone.utc) >= window_start:
            total += 1
    return total
def _upload_record(record):
    """Serialize ``record`` and commit it as submissions/<id>.json in the public repo."""
    sid = record["submission_id"]
    body = json.dumps(record, indent=2).encode("utf-8")
    HfApi().upload_file(
        path_or_fileobj=io.BytesIO(body),
        path_in_repo=f"submissions/{sid}.json",
        repo_id=PUBLIC_DATASET,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=f"submission {sid[:8]} from {record['hf_user_id']}",
    )
def append_submission(*, hf_user_id, team_name, method_name, model_size,
                      uses_external_data, uses_video_frames, method_description,
                      project_url, publication_url, metrics):
    """Persist a new submission record to the public leaderboard repo.

    Optional free-text fields default to "" when falsy; the new record starts
    un-selected. Returns the generated submission id.

    Raises:
        RuntimeError: HF_TOKEN is unset, so nothing can be written.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN not set; cannot persist submission.")
    sid = str(uuid.uuid4())
    record = dict(
        submission_id=sid,
        submitted_at_utc=_now_iso(),
        hf_user_id=hf_user_id,
        team_name=team_name,
        method_name=method_name,
        model_size=model_size or "",
        uses_external_data=bool(uses_external_data),
        uses_video_frames=uses_video_frames,
        method_description=method_description or "",
        project_url=project_url or "",
        publication_url=publication_url or "",
        is_selected=False,
        metrics=metrics,
    )
    _upload_record(record)
    _invalidate_cache()
    return sid
def set_selected(submission_id, requesting_user):
    """Mark `submission_id` as the requesting_user's selected entry.

    Enforces one-selected-per-user by un-selecting any of the user's other
    selected entries before selecting the target.

    Raises:
        ValueError: no submission with that id exists.
        PermissionError: the submission belongs to a different user.
    """
    target = next(
        (s for s in list_submissions() if s["submission_id"] == submission_id),
        None,
    )
    if target is None:
        raise ValueError(f"submission_id not found: {submission_id}")
    if target["hf_user_id"] != requesting_user:
        raise PermissionError("You can only modify your own submissions.")
    # Un-select any other submission this user previously selected.
    for other in list_submissions():
        mine = other["hf_user_id"] == requesting_user
        if mine and other["is_selected"] and other["submission_id"] != submission_id:
            other["is_selected"] = False
            _upload_record(other)
    target["is_selected"] = True
    _upload_record(target)
    _invalidate_cache()