"""HF I/O for the EgoMemReason leaderboard.
Two repos:
- PUBLIC_DATASET Ted412/EgoMemReason-Leaderboard (one JSON per submission)
- PRIVATE_DATASET Ted412/EgoMemReason-Private (annotations_private.json)
Boot path: ensure_private_annotations() downloads the private annotations file
on app start so evaluator.score_submission() can read it from cwd.
"""
import functools
import io
import json
import os
import time
import uuid
from datetime import datetime, timedelta, timezone
from huggingface_hub import HfApi, snapshot_download
# Hard-coded for this challenge. Override via env vars in dev.
PUBLIC_DATASET = os.environ.get("EGOMEM_PUBLIC_DATASET", "Ted412/EgoMemReason-Leaderboard")  # holds one JSON file per submission under submissions/
PRIVATE_DATASET = os.environ.get("EGOMEM_PRIVATE_DATASET", "Ted412/EgoMemReason-Private")  # holds the ground-truth annotations file
ANNOTATIONS_FILENAME = "annotations_private.json"  # expected filename inside PRIVATE_DATASET and in cwd
HF_TOKEN = os.environ.get("HF_TOKEN") # write scope on PUBLIC_DATASET; read scope on PRIVATE_DATASET
def ensure_private_annotations(dest_path=ANNOTATIONS_FILENAME):
    """Make the private annotations file available at *dest_path* and return that path.

    Called once per Space restart. If the file is already present (local dev
    case where you've copied it manually), it is used as-is; otherwise it is
    pulled from PRIVATE_DATASET and symlinked into the working directory.

    Raises:
        RuntimeError: if HF_TOKEN is unset, or if the annotations file is
            missing from the private dataset repo.
    """
    if os.path.exists(dest_path):
        return dest_path
    if not HF_TOKEN:
        raise RuntimeError(
            "HF_TOKEN env var not set; cannot pull private annotations from "
            f"{PRIVATE_DATASET}. Either set HF_TOKEN or place {dest_path} in cwd."
        )
    local_dir = snapshot_download(
        repo_id=PRIVATE_DATASET,
        repo_type="dataset",
        token=HF_TOKEN,
        allow_patterns=[ANNOTATIONS_FILENAME],
    )
    src = os.path.join(local_dir, ANNOTATIONS_FILENAME)
    if not os.path.exists(src):
        raise RuntimeError(
            f"{ANNOTATIONS_FILENAME} not found in {PRIVATE_DATASET}. "
            "Upload it via the HF Files UI of the private dataset repo."
        )
    # Symlink rather than copy — snapshot_download already cached the bytes.
    # EAFP instead of a second exists() check: a concurrent boot could create
    # the link between the check and the symlink call (TOCTOU race).
    try:
        os.symlink(src, dest_path)
    except FileExistsError:
        pass
    return dest_path
def _now_iso():
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
@functools.lru_cache(maxsize=1)
def _cached_submissions(cache_bucket):
"""Pulls all submission JSON files. Bucket is int(time/60) so cache rolls every minute."""
del cache_bucket # only here to invalidate the cache
try:
local_dir = snapshot_download(
repo_id=PUBLIC_DATASET,
repo_type="dataset",
token=HF_TOKEN, # not strictly required for public read but avoids rate-limiting
allow_patterns=["submissions/*.json"],
)
except Exception:
return []
folder = os.path.join(local_dir, "submissions")
if not os.path.isdir(folder):
return []
out = []
for fn in os.listdir(folder):
if not fn.endswith(".json"):
continue
try:
with open(os.path.join(folder, fn)) as f:
out.append(json.load(f))
except Exception:
continue
return out
def list_submissions():
    """Return all submission records, served from a cache at most one minute old."""
    minute_bucket = int(time.time() / 60)
    return _cached_submissions(minute_bucket)
def _invalidate_cache():
    """Drop the cached submission list so the next read refetches immediately (used after writes)."""
    _cached_submissions.cache_clear()
def count_recent(hf_user_id, hours=24):
    """Count submissions by *hf_user_id* within the trailing *hours* window.

    Used for rate limiting. Records with a missing, non-string, or
    unparseable ``submitted_at_utc`` are skipped rather than crashing
    the check.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    n = 0
    for sub in list_submissions():
        if sub.get("hf_user_id") != hf_user_id:
            continue
        ts = sub.get("submitted_at_utc", "")
        if not isinstance(ts, str):
            # Defensive: a null/non-string timestamp (e.g. a hand-edited
            # record) would raise AttributeError on .rstrip below.
            continue
        try:
            # _now_iso() writes naive "...Z" strings: strip the suffix and
            # re-attach UTC so the comparison is aware-vs-aware.
            t = datetime.fromisoformat(ts.rstrip("Z")).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
        if t >= cutoff:
            n += 1
    return n
def _upload_record(record):
    """Serialize *record* as pretty-printed JSON and commit it to the public repo."""
    sid = record["submission_id"]
    body = json.dumps(record, indent=2).encode("utf-8")
    api = HfApi()
    api.upload_file(
        path_or_fileobj=io.BytesIO(body),
        path_in_repo=f"submissions/{sid}.json",
        repo_id=PUBLIC_DATASET,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=f"submission {sid[:8]} from {record['hf_user_id']}",
    )
def append_submission(*, hf_user_id, team_name, method_name, model_size,
                      uses_external_data, uses_video_frames, method_description,
                      project_url, publication_url, metrics):
    """Build a new submission record, persist it, and return its generated id.

    All arguments are keyword-only. Falsy free-text fields are normalized
    to ""; uses_external_data is coerced to bool. New records start
    un-selected — selection is a separate explicit action.

    Raises RuntimeError when HF_TOKEN is not configured.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN not set; cannot persist submission.")
    sid = str(uuid.uuid4())
    record = dict(
        submission_id=sid,
        submitted_at_utc=_now_iso(),
        hf_user_id=hf_user_id,
        team_name=team_name,
        method_name=method_name,
        model_size=model_size or "",
        uses_external_data=bool(uses_external_data),
        uses_video_frames=uses_video_frames,
        method_description=method_description or "",
        project_url=project_url or "",
        publication_url=publication_url or "",
        is_selected=False,
        metrics=metrics,
    )
    _upload_record(record)
    _invalidate_cache()
    return sid
def set_selected(submission_id, requesting_user):
    """Mark `submission_id` as the requesting_user's selected entry.

    Enforces one-selected-per-user by un-selecting any other entry owned by
    the same user.

    Raises:
        ValueError: if submission_id is not found.
        PermissionError: if the submission does not belong to requesting_user.
    """
    # Take ONE snapshot and reuse it: two separate list_submissions() calls
    # can straddle a cache-bucket rollover and disagree with each other.
    subs = list_submissions()
    target = next(
        (s for s in subs if s.get("submission_id") == submission_id), None
    )
    if target is None:
        raise ValueError(f"submission_id not found: {submission_id}")
    if target.get("hf_user_id") != requesting_user:
        raise PermissionError("You can only modify your own submissions.")
    # Un-select any other submission this user previously selected.
    # .get() tolerates legacy records missing is_selected/submission_id keys.
    for sub in subs:
        if (sub.get("hf_user_id") == requesting_user
                and sub.get("is_selected")
                and sub.get("submission_id") != submission_id):
            sub["is_selected"] = False
            _upload_record(sub)
    target["is_selected"] = True
    _upload_record(target)
    _invalidate_cache()