space-deploy
switch to batch2 (sports, 100 annotations from 5 sub-categories)
4ead231
"""Review-only FastAPI app for OmniSTG annotation feedback.
Design:
- One public URL, anyone can open it. We auto-mint an anonymous reviewer id
on first visit and store it in a cookie. Returning visitors keep the same
id and resume.
- Every annotation is shown read-only with: video link + 3 reference images +
bbox-overlaid keyframes + the 3 questions (TG/SG/QA) and the existing
answers.
- Reviewer fills 3 score fields (1=red/delete, 2=yellow/needs_fix, 3=green/keep)
+ 3 ref_type tags + 1 qa_type + optional comment.
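- On submit we serialize the answer into a single JSON file and push it to a
HF Dataset (REVIEW_DATASET_REPO); when HF_TOKEN/REVIEW_DATASET_REPO are not
set, the file is written under data/local_reviews/ instead. Each submission
is its own file, so we never have to lock or merge.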
Env vars (set in HF Space):
HF_TOKEN write token for the review dataset
REVIEW_DATASET_REPO e.g. VCLab-PolyU/omnistg-reviews
ANNO_PER_USER_TARGET default 0 (no per-user cap; currently not read by this module)
ANNO_REVIEWS_TARGET default 3 (an annotation stops being served once it has N reviews)
"""
from __future__ import annotations
import json
import os
import random
import secrets
import sys
import time
from io import BytesIO
from pathlib import Path
from threading import Lock
# Put this file's own directory at the front of sys.path so sibling modules
# can be imported regardless of how (or from where) uvicorn is invoked.
_SCRIPTS_DIR = Path(__file__).resolve().parent
if not any(entry == str(_SCRIPTS_DIR) for entry in sys.path):
    sys.path[:0] = [str(_SCRIPTS_DIR)]
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# Project layout: this file lives one directory below BASE.
BASE = Path(__file__).resolve().parent.parent
STATIC = BASE / "static"
TEMPLATES = BASE / "templates"
DATA = BASE / "data"
REVIEW_DATASET_REPO = os.environ.get("REVIEW_DATASET_REPO", "")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# `or "3"` also covers a variable that is set but empty (e.g. a cleared Space
# secret), which int() would reject and crash the app at import time.
TARGET_REVIEWS_PER_ANNO = int(os.environ.get("ANNO_REVIEWS_TARGET", "").strip() or "3")
# Jinja2 templates are loaded from <BASE>/templates.
templates = Jinja2Templates(directory=str(TEMPLATES))
app = FastAPI(title="OmniSTG Review")
# Files in <BASE>/static are served under the /static URL prefix.
app.mount("/static", StaticFiles(directory=str(STATIC)), name="static")
# ---------- in-memory state ----------
#
# Three pieces of state live in memory:
#   ANNOTATIONS   list of all annotations loaded from data/annotations.jsonl
#   REVIEW_COUNT  { anno_id: int } how many submissions we've forwarded so far
#                 (best-effort; rebuilt from the HF Dataset on startup so it
#                 survives restarts)
#   USER_DONE     { user_id: set[anno_id] } what each user has already reviewed
#                 (rebuilt from the HF Dataset on startup)
# _state_lock is held by pick_next (reads) and review_submit (updates) while
# they touch REVIEW_COUNT / USER_DONE.
ANNOTATIONS: list[dict] = []
REVIEW_COUNT: dict[str, int] = {}
USER_DONE: dict[str, set[str]] = {}
_state_lock = Lock()
def load_annotations(path: Path | None = None) -> None:
    """Read the annotations JSONL file into the module-level ANNOTATIONS list.

    Args:
        path: JSONL file to read. Defaults to ``DATA / "annotations.jsonl"``.

    If the file is missing, ANNOTATIONS becomes an empty list. Blank lines are
    skipped. Lines that are not valid JSON, or that are not a JSON object with
    an "anno_id" key, are logged and skipped. The rest of the app indexes
    ``a["anno_id"]`` on every annotation and would otherwise raise KeyError on
    each request.
    """
    global ANNOTATIONS
    f = DATA / "annotations.jsonl" if path is None else Path(path)
    if not f.exists():
        print(f"[startup] {f} missing — no annotations to serve")
        ANNOTATIONS = []
        return
    items = []
    # utf-8-sig transparently drops a leading BOM, which json.loads rejects.
    with f.open(encoding="utf-8-sig") as fh:
        for lineno, line in enumerate(fh, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"[startup] bad jsonl line: {e}")
                continue
            if not isinstance(item, dict) or "anno_id" not in item:
                print(f"[startup] skipping line {lineno}: not an object with 'anno_id'")
                continue
            items.append(item)
    ANNOTATIONS = items
    print(f"[startup] loaded {len(items)} annotations")
def restore_review_state() -> None:
    """Pull existing reviews from the HF Dataset to rebuild REVIEW_COUNT and USER_DONE.

    Each review file lives at ``reviews/<reviewer_id>/<anno_id>__<ts>.json`` in
    the dataset repo (the layout push_review writes). We list the repo and
    update the counters from the paths alone; file contents are not downloaded.

    Each (reviewer, annotation) pair is counted once. If a reviewer submitted
    the same annotation twice (double-click, browser back + resubmit), both
    files exist, but they represent one independent review. USER_DONE is a set
    either way, so REVIEW_COUNT must not count the duplicate either.

    Best-effort: failures are printed and startup continues with whatever was
    restored so far.
    """
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        print("[startup] HF_TOKEN/REVIEW_DATASET_REPO not set — skipping state restore")
        return
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=HF_TOKEN)
        try:
            files = api.list_repo_files(repo_id=REVIEW_DATASET_REPO, repo_type="dataset")
        except Exception as e:
            print(f"[startup] can't list review repo (probably empty): {e}")
            return
        n = 0
        for path in files:
            # reviews/<reviewer_id>/<anno_id>__<ts>.json
            parts = path.split("/")
            if len(parts) != 3 or parts[0] != "reviews" or not parts[2].endswith(".json"):
                continue
            reviewer_id = parts[1]
            stem = parts[2][:-5]  # drop .json
            anno_id = stem.rsplit("__", 1)[0]
            n += 1
            done = USER_DONE.setdefault(reviewer_id, set())
            if anno_id in done:
                continue  # repeat submission by the same reviewer: count once
            done.add(anno_id)
            REVIEW_COUNT[anno_id] = REVIEW_COUNT.get(anno_id, 0) + 1
        print(f"[startup] restored {n} prior reviews from {REVIEW_DATASET_REPO}")
    except Exception as e:
        print(f"[startup] restore_review_state failed: {e}")
@app.on_event("startup")
def _on_startup():
    """Load annotations, rebuild review counters, then ensure the review repo exists.

    Creating the dataset repo up front (private, exist_ok) means the first
    upload from push_review() does not 404. A create_repo failure is printed,
    not raised.
    """
    load_annotations()
    restore_review_state()
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        return
    try:
        from huggingface_hub import HfApi
        HfApi(token=HF_TOKEN).create_repo(
            repo_id=REVIEW_DATASET_REPO,
            repo_type="dataset",
            exist_ok=True,
            private=True,
        )
    except Exception as e:
        print(f"[startup] create_repo failed: {e}")
    else:
        print(f"[startup] review dataset ready: {REVIEW_DATASET_REPO}")
# ---------- anonymous identity ----------
COOKIE = "rid"  # name of the cookie holding the anonymous reviewer id
def _set_rid_cookie(resp, rid: str) -> None:
    """Attach the reviewer-id cookie to *resp*, valid site-wide for 180 days.

    SameSite=None together with Secure lets the cookie survive inside the HF
    Spaces iframe wrapper. The cookie is not HttpOnly, so it is readable from
    page JavaScript.
    """
    seconds_per_day = 86400
    resp.set_cookie(
        COOKIE,
        rid,
        path="/",
        max_age=180 * seconds_per_day,
        secure=True,
        samesite="none",
        httponly=False,
    )
def _is_valid_rid(rid: str) -> bool:
    """Return True if *rid* is safe to use as a reviewer id / single path component.

    Reviewer ids come from a client-controlled cookie and end up inside file
    and repo paths (``reviews/<rid>/...``, ``data/local_reviews/<rid>/``), so
    only ASCII letters, digits, ``_`` and ``-`` are accepted, up to 64 chars.
    Ids minted by _ensure_rid ("anon_" + token_urlsafe) always pass.
    """
    if not rid or len(rid) > 64:
        return False
    return rid.isascii() and rid.replace("_", "").replace("-", "").isalnum()
def _ensure_rid(request: Request) -> tuple[str, RedirectResponse | None]:
    """Return (rid, redirect-or-None) for this request.

    If the request carries a valid reviewer cookie, return it with no
    redirect. Otherwise (missing, or a tampered value such as one containing
    "/" or ".."), mint a fresh id and return a 303 redirect to the same URL
    that sets the cookie, so the next request round-trips it.
    """
    rid = request.cookies.get(COOKIE)
    if rid and _is_valid_rid(rid):
        return rid, None
    rid = "anon_" + secrets.token_urlsafe(6)[:8]
    # Redirect to the same path so the new cookie goes round-trip.
    resp = RedirectResponse(str(request.url), status_code=303)
    _set_rid_cookie(resp, rid)
    return rid, resp
# ---------- task picking ----------
def pick_next(rid: str) -> dict | None:
    """Choose the next annotation to show reviewer *rid*.

    Eligible annotations are those *rid* has not reviewed yet and that still
    have fewer than TARGET_REVIEWS_PER_ANNO reviews. Among them, the ones with
    the lowest review count win, and ties are broken with random.choice.
    Returns None when nothing is eligible.
    """
    with _state_lock:
        already_seen = USER_DONE.get(rid, set())
        eligible = []
        for anno in ANNOTATIONS:
            key = anno["anno_id"]
            if key in already_seen:
                continue
            count = REVIEW_COUNT.get(key, 0)
            if count < TARGET_REVIEWS_PER_ANNO:
                eligible.append((count, anno))
        if not eligible:
            return None
        fewest = min(count for count, _ in eligible)
        return random.choice([anno for count, anno in eligible if count == fewest])
def push_review(payload: dict) -> bool:
    """Persist one review as its own JSON file.

    If HF_TOKEN and REVIEW_DATASET_REPO are set, the file is uploaded to the HF
    Dataset at ``reviews/<reviewer_id>/<anno_id>__<ts>.json``. Otherwise it is
    written locally to ``DATA/local_reviews/<reviewer_id>/<anno_id>__<ts>.json``.
    Each submission gets its own file (millisecond timestamp in the name), so
    concurrent writes never collide.

    Args:
        payload: review dict. Must contain "reviewer_id" and "anno_id".

    Returns:
        True on success. False if either id is unsafe as a path component, the
        payload cannot be serialized, or the local write / upload fails.
    """
    rid = str(payload["reviewer_id"])
    anno_id = str(payload["anno_id"])
    # reviewer_id originates from a client cookie: never let it escape the
    # reviews directory (e.g. "../x") or add extra path levels.
    for part in (rid, anno_id):
        if not part or part in (".", "..") or any(ch in part for ch in "/\\\x00"):
            print(f"[push_review] refusing unsafe path component: {part!r}")
            return False
    try:
        body = json.dumps(payload, ensure_ascii=False, indent=2)
    except (TypeError, ValueError) as e:
        print(f"[push_review] failed: {e}")
        return False
    ts = int(time.time() * 1000)
    filename = f"{anno_id}__{ts}.json"
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        # Locally we still want to persist. Drop into data/local_reviews/.
        try:
            out = DATA / "local_reviews" / rid
            out.mkdir(parents=True, exist_ok=True)
            (out / filename).write_text(body, encoding="utf-8")
        except OSError as e:
            print(f"[push_review] local write failed: {e}")
            return False
        return True
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=BytesIO(body.encode("utf-8")),
            path_in_repo=f"reviews/{rid}/{filename}",
            repo_id=REVIEW_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"review {anno_id} by {rid}",
        )
        return True
    except Exception as e:
        print(f"[push_review] failed: {e}")
        return False
# ---------- routes ----------
@app.get("/healthz")
def healthz():
    """Liveness probe: always ok, plus the number of loaded annotations."""
    status = {"ok": True}
    status["annos"] = len(ANNOTATIONS)
    return status
@app.get("/", response_class=HTMLResponse)
def root(request: Request):
    """Landing URL: make sure the visitor has a reviewer cookie, then go to /review."""
    _, redirect = _ensure_rid(request)
    return redirect if redirect is not None else RedirectResponse("/review", status_code=303)
@app.get("/review", response_class=HTMLResponse)
def review_page(request: Request):
    """Render the next annotation for this reviewer together with progress counters.

    Template context:
      anno             next annotation from pick_next(), or None when nothing
                       is left for this reviewer
      done_by_me       how many of the currently loaded annotations this
                       reviewer has reviewed
      total            number of loaded annotations
      fully_done       loaded annotations that reached TARGET_REVIEWS_PER_ANNO
      target_per_anno  TARGET_REVIEWS_PER_ANNO
    """
    rid, redirect = _ensure_rid(request)
    if redirect is not None:
        return redirect
    anno = pick_next(rid)
    # restore_review_state() rebuilds USER_DONE from every review file in the
    # dataset, including reviews of annotations that are no longer loaded
    # (e.g. an earlier batch). Intersect so the counter is comparable to `total`.
    current_ids = {a["anno_id"] for a in ANNOTATIONS}
    done_by_me = len(USER_DONE.get(rid, set()) & current_ids)
    total = len(ANNOTATIONS)
    fully_done = sum(
        1 for a in ANNOTATIONS
        if REVIEW_COUNT.get(a["anno_id"], 0) >= TARGET_REVIEWS_PER_ANNO
    )
    return templates.TemplateResponse(
        "review.html",
        {
            "request": request,
            "rid": rid,
            "anno": anno,
            "done_by_me": done_by_me,
            "total": total,
            "fully_done": fully_done,
            "target_per_anno": TARGET_REVIEWS_PER_ANNO,
        },
    )
@app.post("/review/submit")
def review_submit(
    request: Request,
    anno_id: str = Form(...),
    tg_score: int = Form(...),
    tg_ref_type: str = Form(""),
    sg_score: int = Form(...),
    sg_ref_type: str = Form(""),
    qa_score: int = Form(...),
    qa_ref_type: str = Form(""),
    qa_type: str = Form(""),
    comment: str = Form(""),
):
    """Validate and persist one review, then redirect to the next task.

    Raises:
        HTTPException(400): unknown anno_id, a score outside {1, 2, 3}, or any
            score of 1/2 without a comment of at least 5 non-blank characters.
        HTTPException(500): push_review() could not persist the review.
    """
    rid, redirect = _ensure_rid(request)
    if redirect is not None:
        return redirect
    # Reject obviously bogus inputs
    if not any(a["anno_id"] == anno_id for a in ANNOTATIONS):
        raise HTTPException(400, "unknown anno_id")
    if any(s not in (1, 2, 3) for s in (tg_score, sg_score, qa_score)):
        raise HTTPException(400, "scores must be 1, 2, or 3")
    # If any sub-score is 1 or 2, the reviewer must justify it in the comment.
    # (The Chinese half of the message says the same: a 1 or 2 score needs a
    # reason in the comment, at least 5 characters.)
    lowest = min(tg_score, sg_score, qa_score)
    if lowest <= 2 and len((comment or "").strip()) < 5:
        raise HTTPException(
            400,
            "1 分或 2 分必须在备注里写明原因(至少 5 个字符)"
            " — please include a comment explaining why you gave a 1/2 score",
        )
    payload = {
        "reviewer_id": rid,
        "anno_id": anno_id,
        "ts": int(time.time()),
        "tg": {"score": tg_score, "ref_type": tg_ref_type},
        "sg": {"score": sg_score, "ref_type": sg_ref_type},
        "qa": {"score": qa_score, "ref_type": qa_ref_type, "qa_type": qa_type},
        "comment": comment,
    }
    if not push_review(payload):
        raise HTTPException(500, "failed to persist review; please retry")
    with _state_lock:
        done = USER_DONE.setdefault(rid, set())
        # A resubmission of the same annotation (double-click, browser back)
        # is still persisted above as its own file, but it is not another
        # independent review, so it must not bump REVIEW_COUNT again.
        if anno_id not in done:
            done.add(anno_id)
            REVIEW_COUNT[anno_id] = REVIEW_COUNT.get(anno_id, 0) + 1
    return RedirectResponse("/review", status_code=303)
@app.get("/stats")
def stats(request: Request):
    """Public JSON summary of review progress.

    reviews_total sums REVIEW_COUNT over every annotation id it holds, and
    unique_reviewers is the number of reviewer ids in USER_DONE. Both include
    whatever restore_review_state() rebuilt from the dataset.
    """
    target = TARGET_REVIEWS_PER_ANNO
    completed = [
        a for a in ANNOTATIONS
        if REVIEW_COUNT.get(a["anno_id"], 0) >= target
    ]
    return {
        "annos_total": len(ANNOTATIONS),
        "annos_fully_done": len(completed),
        "reviews_total": sum(REVIEW_COUNT.values()),
        "unique_reviewers": len(USER_DONE),
        "target_per_anno": target,
    }