File size: 6,121 Bytes
1bf5b23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""HF I/O for the EgoMemReason leaderboard.

Two repos:
  - PUBLIC_DATASET  Ted412/EgoMemReason-Leaderboard  (one JSON per submission)
  - PRIVATE_DATASET Ted412/EgoMemReason-Private     (annotations_private.json)

Boot path: ensure_private_annotations() downloads the private annotations file
on app start so evaluator.score_submission() can read it from cwd.
"""

import functools
import io
import json
import os
import shutil
import time
import uuid
from datetime import datetime, timedelta, timezone

from huggingface_hub import HfApi, snapshot_download

# Hard-coded for this challenge. Override via env vars in dev.
PUBLIC_DATASET = os.environ.get("EGOMEM_PUBLIC_DATASET", "Ted412/EgoMemReason-Leaderboard")
PRIVATE_DATASET = os.environ.get("EGOMEM_PRIVATE_DATASET", "Ted412/EgoMemReason-Private")
ANNOTATIONS_FILENAME = "annotations_private.json"

HF_TOKEN = os.environ.get("HF_TOKEN")  # write scope on PUBLIC_DATASET; read scope on PRIVATE_DATASET


def ensure_private_annotations(dest_path=ANNOTATIONS_FILENAME):
    """Download annotations_private.json from the private dataset on app boot.

    Only called once per Space restart. If the file is already present (local
    dev case where you've copied it manually), do nothing.

    Args:
        dest_path: Filesystem path where the annotations file must exist
            after this call (default: annotations_private.json in cwd).

    Returns:
        dest_path, once the file is in place.

    Raises:
        RuntimeError: If HF_TOKEN is not set, or the file is missing from
            the private dataset repo.
    """
    if os.path.exists(dest_path):
        return dest_path
    if not HF_TOKEN:
        raise RuntimeError(
            "HF_TOKEN env var not set; cannot pull private annotations from "
            f"{PRIVATE_DATASET}. Either set HF_TOKEN or place {dest_path} in cwd."
        )
    local_dir = snapshot_download(
        repo_id=PRIVATE_DATASET,
        repo_type="dataset",
        token=HF_TOKEN,
        allow_patterns=[ANNOTATIONS_FILENAME],
    )
    src = os.path.join(local_dir, ANNOTATIONS_FILENAME)
    if not os.path.exists(src):
        raise RuntimeError(
            f"{ANNOTATIONS_FILENAME} not found in {PRIVATE_DATASET}. "
            "Upload it via the HF Files UI of the private dataset repo."
        )
    # Symlink rather than copy — snapshot_download already cached it.
    # Fall back to a real copy where symlinks are unavailable (e.g.
    # Windows without developer mode raises OSError).
    if not os.path.exists(dest_path):
        try:
            os.symlink(src, dest_path)
        except OSError:
            shutil.copyfile(src, dest_path)
    return dest_path


def _now_iso():
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@functools.lru_cache(maxsize=1)
def _cached_submissions(cache_bucket):
    """Pulls all submission JSON files. Bucket is int(time/60) so cache rolls every minute."""
    del cache_bucket  # only here to invalidate the cache
    try:
        local_dir = snapshot_download(
            repo_id=PUBLIC_DATASET,
            repo_type="dataset",
            token=HF_TOKEN,  # not strictly required for public read but avoids rate-limiting
            allow_patterns=["submissions/*.json"],
        )
    except Exception:
        return []
    folder = os.path.join(local_dir, "submissions")
    if not os.path.isdir(folder):
        return []
    out = []
    for fn in os.listdir(folder):
        if not fn.endswith(".json"):
            continue
        try:
            with open(os.path.join(folder, fn)) as f:
                out.append(json.load(f))
        except Exception:
            continue
    return out


def list_submissions():
    """Return all submission records from the public repo (≤ ~1 min stale)."""
    minute_bucket = int(time.time() / 60)
    return _cached_submissions(minute_bucket)


def _invalidate_cache():
    # Drop the memoized submission list so the next read re-pulls from HF;
    # called right after every write so new/changed records show immediately.
    _cached_submissions.cache_clear()


def count_recent(hf_user_id, hours=24):
    """Count submissions by `hf_user_id` in the trailing `hours` window.

    Used for rate limiting. Records with a missing or malformed
    `submitted_at_utc` are skipped rather than counted.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    n = 0
    for sub in list_submissions():
        if sub.get("hf_user_id") != hf_user_id:
            continue
        ts = sub.get("submitted_at_utc", "")
        try:
            # Map a trailing 'Z' to an explicit offset: fromisoformat() only
            # accepts 'Z' on 3.11+, and parsing the offset (instead of
            # relabeling with replace(tzinfo=...)) keeps timestamps that
            # carry a non-UTC offset at the correct instant. str() guards
            # against non-string values in hand-edited records.
            t = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
        except ValueError:
            continue
        if t.tzinfo is None:
            # Naive timestamps written by this module (_now_iso) are UTC.
            t = t.replace(tzinfo=timezone.utc)
        if t >= cutoff:
            n += 1
    return n


def _upload_record(record):
    """Serialize `record` and push it to submissions/<id>.json in the public repo."""
    sid = record["submission_id"]
    body = json.dumps(record, indent=2).encode("utf-8")
    api = HfApi()
    api.upload_file(
        path_or_fileobj=io.BytesIO(body),
        path_in_repo=f"submissions/{sid}.json",
        repo_id=PUBLIC_DATASET,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=f"submission {sid[:8]} from {record['hf_user_id']}",
    )


def append_submission(*, hf_user_id, team_name, method_name, model_size,
                      uses_external_data, uses_video_frames, method_description,
                      project_url, publication_url, metrics):
    """Persist a brand-new submission record and return its generated id.

    All parameters are keyword-only. Falsy free-text fields (`model_size`,
    `method_description`, `project_url`, `publication_url`) are stored as "".

    Raises:
        RuntimeError: If HF_TOKEN is missing — nothing is uploaded.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN not set; cannot persist submission.")
    submission_id = str(uuid.uuid4())
    record = dict(
        submission_id=submission_id,
        submitted_at_utc=_now_iso(),
        hf_user_id=hf_user_id,
        team_name=team_name,
        method_name=method_name,
        model_size=model_size or "",
        uses_external_data=bool(uses_external_data),
        uses_video_frames=uses_video_frames,
        method_description=method_description or "",
        project_url=project_url or "",
        publication_url=publication_url or "",
        is_selected=False,  # promoted later via set_selected()
        metrics=metrics,
    )
    _upload_record(record)
    _invalidate_cache()
    return submission_id


def set_selected(submission_id, requesting_user):
    """Mark `submission_id` as the requesting_user's selected entry.

    Enforces one-selected-per-user. Raises PermissionError if the submission
    does not belong to requesting_user, ValueError if it does not exist.
    """
    # Take a single snapshot: a cache-bucket rollover between two
    # list_submissions() calls could otherwise make the lookup and the
    # un-select pass disagree about which records exist.
    subs = list_submissions()
    target = next(
        (s for s in subs if s.get("submission_id") == submission_id), None
    )
    if target is None:
        raise ValueError(f"submission_id not found: {submission_id}")
    if target["hf_user_id"] != requesting_user:
        raise PermissionError("You can only modify your own submissions.")

    # Un-select any other submission this user previously selected.
    # .get() guards against legacy records written without "is_selected".
    for sub in subs:
        if (sub.get("hf_user_id") == requesting_user
                and sub.get("is_selected")
                and sub.get("submission_id") != submission_id):
            sub["is_selected"] = False
            _upload_record(sub)

    target["is_selected"] = True
    _upload_record(target)
    _invalidate_cache()