"""Review-only FastAPI app for OmniSTG annotation feedback.

How it works:
  - There is a single public URL that anyone may open. A first-time visitor
    is given an anonymous reviewer id, stored in a cookie; returning visitors
    keep that id and pick up where they left off.
  - Each annotation is displayed read-only, together with its video link,
    3 reference images, keyframes with bbox overlays, and the 3 questions
    (TG/SG/QA) plus their existing answers.
  - The reviewer enters 3 scores (1=red/delete, 2=yellow/needs_fix,
    3=green/keep), 3 ref_type tags, 1 qa_type and an optional comment.
  - Each submission is serialized to its own JSON file and pushed to a HF
    Dataset (REVIEW_DATASET_REPO), so no locking or merging is ever needed.

Environment variables (configured on the HF Space):
  HF_TOKEN               token with write access
  REVIEW_DATASET_REPO    e.g. VCLab-PolyU/omnistg-reviews
  ANNO_PER_USER_TARGET   optional per-reviewer cap, default 0 (no cap)
  ANNO_REVIEWS_TARGET    default 3 (an annotation needs at most N reviews)
"""

from __future__ import annotations

import json
import os
import random
import secrets
import sys
import time
from io import BytesIO
from pathlib import Path
from threading import Lock

# Put this script's own directory on sys.path so sibling modules import
# cleanly however uvicorn happens to be launched.
_SCRIPTS_DIR = Path(__file__).resolve().parent
if not str(_SCRIPTS_DIR) in sys.path:
    sys.path.insert(0, str(_SCRIPTS_DIR))

from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# Project layout: <BASE>/{static,templates,data}, one level above this script.
BASE = _SCRIPTS_DIR.parent
STATIC = BASE / "static"
TEMPLATES = BASE / "templates"
DATA = BASE / "data"

# Deployment configuration (see module docstring).
REVIEW_DATASET_REPO = os.getenv("REVIEW_DATASET_REPO", "")
HF_TOKEN = os.getenv("HF_TOKEN", "")
TARGET_REVIEWS_PER_ANNO = int(os.getenv("ANNO_REVIEWS_TARGET", "3"))

app = FastAPI(title="OmniSTG Review")
app.mount("/static", StaticFiles(directory=str(STATIC)), name="static")
templates = Jinja2Templates(directory=str(TEMPLATES))
# ---------- in-memory state ----------
#
# Process-local state:
#   ANNOTATIONS    list of all annotations loaded from data/annotations.jsonl
#   REVIEW_COUNT   { anno_id: int } number of DISTINCT reviewers that have
#                  submitted each annotation
#   USER_DONE      { user_id: set[anno_id] } what each user has already reviewed
# REVIEW_COUNT and USER_DONE are replayed on startup from the HF Dataset (or
# from data/local_reviews/ when no dataset is configured), so they survive
# restarts. After startup they are only mutated under _state_lock.

ANNOTATIONS: list[dict] = []
REVIEW_COUNT: dict[str, int] = {}
USER_DONE: dict[str, set[str]] = {}
_state_lock = Lock()

# anno_ids present in ANNOTATIONS; lets review_submit validate in O(1).
_ANNO_IDS: set[str] = set()

# Max annotations one reviewer is served (env ANNO_PER_USER_TARGET); 0 = no cap.
PER_USER_TARGET = int(os.environ.get("ANNO_PER_USER_TARGET", "0"))


def load_annotations() -> None:
    """Read data/annotations.jsonl into ANNOTATIONS (and _ANNO_IDS).

    Blank lines are ignored. Lines that are not valid JSON, or that do not
    decode to an object with an ``anno_id`` key, are reported and skipped,
    because every route indexes annotations by ``anno_id``.
    """
    global ANNOTATIONS, _ANNO_IDS
    f = DATA / "annotations.jsonl"
    if not f.exists():
        print(f"[startup] {f} missing — no annotations to serve")
        ANNOTATIONS = []
        _ANNO_IDS = set()
        return
    items = []
    with f.open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except ValueError as e:  # json.JSONDecodeError is a ValueError
                print(f"[startup] bad jsonl line: {e}")
                continue
            if not isinstance(item, dict) or "anno_id" not in item:
                print("[startup] skipping jsonl line without an anno_id")
                continue
            items.append(item)
    ANNOTATIONS = items
    _ANNO_IDS = {a["anno_id"] for a in items}
    print(f"[startup] loaded {len(items)} annotations")


def _record_review(reviewer_id: str, anno_id: str) -> bool:
    """Mark ``anno_id`` as reviewed by ``reviewer_id``.

    REVIEW_COUNT counts distinct reviewers, so a repeated (reviewer, anno)
    pair, e.g. from a double-clicked submit button, is not counted twice.
    Once the app is serving, callers must hold _state_lock.

    Returns:
        True if the pair was new, False if it had already been recorded.
    """
    done = USER_DONE.setdefault(reviewer_id, set())
    if anno_id in done:
        return False
    done.add(anno_id)
    REVIEW_COUNT[anno_id] = REVIEW_COUNT.get(anno_id, 0) + 1
    return True


def _anno_id_from_filename(name: str) -> str | None:
    """Return the anno_id encoded in a ``<anno_id>__<ts>.json`` file name.

    Returns None for names that do not end in ``.json``.
    """
    if not name.endswith(".json"):
        return None
    return name[: -len(".json")].rsplit("__", 1)[0]


def _restore_local_reviews() -> int:
    """Replay data/local_reviews/<reviewer_id>/<anno_id>__<ts>.json files.

    These are the files push_review writes when no HF dataset is configured.

    Returns:
        The number of review files replayed.
    """
    root = DATA / "local_reviews"
    if not root.is_dir():
        return 0
    n = 0
    for path in root.glob("*/*.json"):
        anno_id = _anno_id_from_filename(path.name)
        if anno_id is None:
            continue
        _record_review(path.parent.name, anno_id)
        n += 1
    return n


def restore_review_state() -> None:
    """Rebuild REVIEW_COUNT and USER_DONE from previously persisted reviews.

    With HF_TOKEN and REVIEW_DATASET_REPO set, the dataset repo is listed and
    every ``reviews/<reviewer_id>/<anno_id>__<ts>.json`` file is replayed.
    Without them, the local fallback directory data/local_reviews/ is
    replayed instead. Failures are logged and never abort startup.
    """
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        print("[startup] HF_TOKEN/REVIEW_DATASET_REPO not set — restoring from local reviews only")
        try:
            n = _restore_local_reviews()
        except OSError as e:
            print(f"[startup] local review restore failed: {e}")
            return
        print(f"[startup] restored {n} prior reviews from {DATA / 'local_reviews'}")
        return
    try:
        from huggingface_hub import HfApi

        api = HfApi(token=HF_TOKEN)
        try:
            files = api.list_repo_files(repo_id=REVIEW_DATASET_REPO, repo_type="dataset")
        except Exception as e:
            print(f"[startup] can't list review repo (probably empty): {e}")
            return
        n = 0
        for path in files:
            # Expected layout: reviews/<reviewer_id>/<anno_id>__<ts>.json
            parts = path.split("/")
            if len(parts) != 3 or parts[0] != "reviews":
                continue
            anno_id = _anno_id_from_filename(parts[2])
            if anno_id is None:
                continue
            _record_review(parts[1], anno_id)
            n += 1
        print(f"[startup] restored {n} prior reviews from {REVIEW_DATASET_REPO}")
    except Exception as e:
        # Best-effort: a broken restore must not keep the app from starting.
        print(f"[startup] restore_review_state failed: {e}")


@app.on_event("startup")
def _on_startup():
    """Load annotations, rebuild review state, and ensure the dataset repo exists."""
    load_annotations()
    restore_review_state()
    # Make sure the dataset repo exists so the first push doesn't 404.
    if HF_TOKEN and REVIEW_DATASET_REPO:
        try:
            from huggingface_hub import HfApi

            HfApi(token=HF_TOKEN).create_repo(
                repo_id=REVIEW_DATASET_REPO,
                repo_type="dataset",
                exist_ok=True,
                private=True,
            )
            print(f"[startup] review dataset ready: {REVIEW_DATASET_REPO}")
        except Exception as e:
            print(f"[startup] create_repo failed: {e}")


# ---------- anonymous identity ----------

COOKIE = "rid"  # reviewer id cookie

# Query-string marker appended to the one-time redirect that plants the
# cookie. A request that carries the marker but still has no cookie means the
# browser did not store it (e.g. third-party cookies blocked inside the HF
# iframe). Redirecting again would loop forever, so we don't.
_COOKIE_PROBE = "ck"

_RID_MAX_LEN = 64


def _is_valid_rid(rid: str) -> bool:
    """Return True if ``rid`` is a safe reviewer id.

    The cookie is client-controlled and is used as a path component in both
    data/local_reviews/<rid>/ and reviews/<rid>/ in the dataset. Only ASCII
    letters, digits, '_' and '-' are accepted, which rules out '/' and '..'.
    """
    return (
        0 < len(rid) <= _RID_MAX_LEN
        and rid.isascii()
        and all(c.isalnum() or c in "_-" for c in rid)
    )


def _new_rid() -> str:
    """Mint a fresh anonymous reviewer id (``anon_`` + 8 url-safe chars)."""
    return "anon_" + secrets.token_urlsafe(6)[:8]


def _set_rid_cookie(resp, rid: str) -> None:
    """Attach the reviewer-id cookie (180-day lifetime) to ``resp``."""
    # SameSite=None+Secure so the cookie survives HF iframe wrappers.
    resp.set_cookie(
        COOKIE,
        rid,
        max_age=180 * 86400,
        samesite="none",
        secure=True,
        httponly=False,
        path="/",
    )


def _attach_rid_cookie(request: Request, resp, rid: str) -> None:
    """Set the rid cookie on ``resp`` unless the request already carried ``rid``."""
    if request.cookies.get(COOKIE) != rid:
        _set_rid_cookie(resp, rid)


def _ensure_rid(request: Request) -> tuple[str, RedirectResponse | None]:
    """Return (rid, redirect-or-None).

    A valid ``rid`` cookie is reused as-is. Otherwise a fresh id is minted.
    For a first GET, we return a redirect back to the same path and query,
    with the probe marker appended, that also sets the cookie.

    No redirect is issued when:
      * the request is not a GET, because a 303 would turn a POST into a GET
        and drop the submitted form; or
      * the probe marker is already present, because the cookie evidently
        did not stick and another redirect would loop forever.

    In those cases the caller gets (new_rid, None) and should attach the
    cookie to its own response via _attach_rid_cookie.
    """
    rid = request.cookies.get(COOKIE)
    if rid and _is_valid_rid(rid):
        return rid, None
    rid = _new_rid()
    if request.method != "GET" or _COOKIE_PROBE in request.query_params:
        return rid, None
    from urllib.parse import urlencode

    query = list(request.query_params.multi_items())
    query.append((_COOKIE_PROBE, "1"))
    # Relative target (path + query). Behind the HF proxy request.url may
    # report http://, and an absolute http:// redirect from an https page can
    # be blocked as mixed content.
    target = f"{request.url.path}?{urlencode(query)}"
    resp = RedirectResponse(target, status_code=303)
    _set_rid_cookie(resp, rid)
    return rid, resp


# ---------- task picking ----------

def pick_next(rid: str) -> dict | None:
    """Pick the least-reviewed annotation this user hasn't reviewed yet.

    Annotations that already have TARGET_REVIEWS_PER_ANNO reviews are skipped.
    Ties are broken uniformly at random.

    Returns:
        The chosen annotation, or None if nothing is left for this user. That
        includes the case where the user has reached PER_USER_TARGET (> 0).
    """
    with _state_lock:
        done_by_me = USER_DONE.get(rid, set())
        if PER_USER_TARGET > 0 and len(done_by_me) >= PER_USER_TARGET:
            return None
        candidates: list[dict] = []
        min_count: int | None = None
        for a in ANNOTATIONS:
            aid = a["anno_id"]
            if aid in done_by_me:
                continue
            c = REVIEW_COUNT.get(aid, 0)
            if c >= TARGET_REVIEWS_PER_ANNO:
                continue
            if min_count is None or c < min_count:
                min_count = c
                candidates = [a]
            elif c == min_count:
                candidates.append(a)
    if not candidates:
        return None
    return random.choice(candidates)


def push_review(payload: dict) -> bool:
    """Persist one review as its own JSON file.

    Each submission is written to ``reviews/<reviewer_id>/<anno_id>__<ts_ms>.json``
    in the HF Dataset, so concurrent writes never collide. When HF_TOKEN or
    REVIEW_DATASET_REPO is not set, it is written to
    ``data/local_reviews/<reviewer_id>/<anno_id>__<ts_ms>.json`` instead.

    Returns:
        True on success. False if the reviewer id is unsafe or the write or
        upload failed; failures are logged, never raised.
    """
    if not _is_valid_rid(str(payload["reviewer_id"])):
        print(f"[push_review] refusing unsafe reviewer_id: {payload['reviewer_id']!r}")
        return False
    ts = int(time.time() * 1000)
    filename = f"{payload['anno_id']}__{ts}.json"
    body = json.dumps(payload, ensure_ascii=False, indent=2)

    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        # Locally we still want to persist. Drop into data/local_reviews/.
        try:
            out = DATA / "local_reviews" / payload["reviewer_id"]
            out.mkdir(parents=True, exist_ok=True)
            (out / filename).write_text(body, encoding="utf-8")
        except OSError as e:
            print(f"[push_review] local write failed: {e}")
            return False
        return True

    try:
        from huggingface_hub import HfApi

        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=BytesIO(body.encode("utf-8")),
            path_in_repo=f"reviews/{payload['reviewer_id']}/{filename}",
            repo_id=REVIEW_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"review {payload['anno_id']} by {payload['reviewer_id']}",
        )
        return True
    except Exception as e:
        print(f"[push_review] failed: {e}")
        return False


def _count_fully_done() -> int:
    """Number of annotations that have reached TARGET_REVIEWS_PER_ANNO reviews."""
    return sum(
        1 for a in ANNOTATIONS
        if REVIEW_COUNT.get(a["anno_id"], 0) >= TARGET_REVIEWS_PER_ANNO
    )


# ---------- routes ----------

@app.get("/healthz")
def healthz():
    """Liveness probe; also reports how many annotations are loaded."""
    return {"ok": True, "annos": len(ANNOTATIONS)}


@app.get("/", response_class=HTMLResponse)
def root(request: Request):
    """Ensure the visitor has a reviewer id, then send them to /review."""
    rid, redirect = _ensure_rid(request)
    if redirect is not None:
        return redirect
    resp = RedirectResponse("/review", status_code=303)
    _attach_rid_cookie(request, resp, rid)
    return resp


@app.get("/review", response_class=HTMLResponse)
def review_page(request: Request):
    """Render the next annotation to review, plus progress counters."""
    rid, redirect = _ensure_rid(request)
    if redirect is not None:
        return redirect
    anno = pick_next(rid)
    done_by_me = len(USER_DONE.get(rid, set()))
    resp = templates.TemplateResponse(
        "review.html",
        {
            "request": request,
            "rid": rid,
            "anno": anno,
            "done_by_me": done_by_me,
            "total": len(ANNOTATIONS),
            "fully_done": _count_fully_done(),
            "target_per_anno": TARGET_REVIEWS_PER_ANNO,
            "per_user_target": PER_USER_TARGET,
        },
    )
    _attach_rid_cookie(request, resp, rid)
    return resp


@app.post("/review/submit")
def review_submit(
    request: Request,
    anno_id: str = Form(...),
    tg_score: int = Form(...),
    tg_ref_type: str = Form(""),
    sg_score: int = Form(...),
    sg_ref_type: str = Form(""),
    qa_score: int = Form(...),
    qa_ref_type: str = Form(""),
    qa_type: str = Form(""),
    comment: str = Form(""),
):
    """Validate and persist one review, then redirect to the next annotation.

    Raises:
        HTTPException(400): unknown anno_id, a score outside {1, 2, 3}, or a
            1/2 score without a comment of at least 5 characters.
        HTTPException(500): the review could not be persisted.
    """
    rid, redirect = _ensure_rid(request)
    if redirect is not None:  # never happens for POST; kept defensively
        return redirect
    # Reject obviously bogus inputs
    if anno_id not in _ANNO_IDS:
        raise HTTPException(400, "unknown anno_id")
    for s in (tg_score, sg_score, qa_score):
        if s not in (1, 2, 3):
            raise HTTPException(400, "scores must be 1, 2, or 3")
    # If any sub-score is 1 or 2, the reviewer must justify it.
    lowest = min(tg_score, sg_score, qa_score)
    if lowest <= 2 and len((comment or "").strip()) < 5:
        raise HTTPException(
            400,
            "1 分或 2 分必须在备注里写明原因(至少 5 个字符)"
            " — please include a comment explaining why you gave a 1/2 score",
        )
    payload = {
        "reviewer_id": rid,
        "anno_id": anno_id,
        "ts": int(time.time()),
        "tg": {"score": tg_score, "ref_type": tg_ref_type},
        "sg": {"score": sg_score, "ref_type": sg_ref_type},
        "qa": {"score": qa_score, "ref_type": qa_ref_type, "qa_type": qa_type},
        "comment": comment,
    }
    if not push_review(payload):
        raise HTTPException(500, "failed to persist review; please retry")
    with _state_lock:
        # Counts each (reviewer, anno) pair once even if it is resubmitted.
        _record_review(rid, anno_id)
    resp = RedirectResponse("/review", status_code=303)
    _attach_rid_cookie(request, resp, rid)
    return resp


@app.get("/stats")
def stats(request: Request):
    """Lightweight JSON stats endpoint (public).

    ``reviews_total`` counts distinct (reviewer, annotation) pairs.
    """
    return {
        "annos_total": len(ANNOTATIONS),
        "annos_fully_done": _count_fully_done(),
        "reviews_total": sum(REVIEW_COUNT.values()),
        "unique_reviewers": len(USER_DONE),
        "target_per_anno": TARGET_REVIEWS_PER_ANNO,
        "per_user_target": PER_USER_TARGET,
    }