Spaces:
Sleeping
Sleeping
| """Review-only FastAPI app for OmniSTG annotation feedback. | |
| Design: | |
| - One public URL, anyone can open it. We auto-mint an anonymous reviewer id | |
| on first visit and store it in a cookie. Returning visitors keep the same | |
| id and resume. | |
| - Every annotation is shown read-only with: video link + 3 reference images + | |
| bbox-overlaid keyframes + the 3 questions (TG/SG/QA) and the existing | |
| answers. | |
- Reviewer fills 3 score fields (1=red/delete, 2=yellow/needs_fix, 3=green/keep)
  + 3 ref_type tags + 1 qa_type + a comment. The comment is optional when all
  three scores are 3, and required (at least 5 characters) when any score is 1 or 2.
- On submit we serialize the answer into a single JSON file and push it to the
  HF Dataset REVIEW_DATASET_REPO (when HF_TOKEN/REVIEW_DATASET_REPO are unset it
  is written under data/local_reviews/ instead). Each submission is its own
  file, so we never have to lock or merge.
| Env vars (set in HF Space): | |
| HF_TOKEN write token | |
| REVIEW_DATASET_REPO e.g. VCLab-PolyU/omnistg-reviews | |
| ANNO_PER_USER_TARGET default 0 (no per-user cap) | |
| ANNO_REVIEWS_TARGET default 3 (each anno needs at most N reviews) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import random | |
| import secrets | |
| import sys | |
| import time | |
| from io import BytesIO | |
| from pathlib import Path | |
| from threading import Lock | |
# Put this file's own directory on sys.path (once, at the front) so sibling
# modules can be imported however uvicorn was invoked.
_SCRIPTS_DIR = Path(__file__).resolve().parent
if not any(entry == str(_SCRIPTS_DIR) for entry in sys.path):
    sys.path.insert(0, str(_SCRIPTS_DIR))
| from fastapi import FastAPI, Form, HTTPException, Request | |
| from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
# Project root is the parent of this file's directory; it holds the static/,
# templates/ and data/ directories.
BASE = Path(__file__).resolve().parent.parent
STATIC, TEMPLATES, DATA = (BASE / sub for sub in ("static", "templates", "data"))

# Deployment configuration, read from the environment (see module docstring).
REVIEW_DATASET_REPO = os.getenv("REVIEW_DATASET_REPO", "")
HF_TOKEN = os.getenv("HF_TOKEN", "")
TARGET_REVIEWS_PER_ANNO = int(os.getenv("ANNO_REVIEWS_TARGET", "3"))

app = FastAPI(title="OmniSTG Review")
app.mount("/static", StaticFiles(directory=str(STATIC)), name="static")
templates = Jinja2Templates(directory=str(TEMPLATES))
# ---------- in-memory state ----------
#
# Three module-level structures hold the app's state:
#   ANNOTATIONS   list of all annotations loaded from data/annotations.jsonl
#   REVIEW_COUNT  { anno_id: int } how many submissions have been persisted
#                 so far (best-effort; rebuilt from the HF Dataset on startup
#                 so it survives restarts)
#   USER_DONE     { user_id: set[anno_id] } what each user has already
#                 reviewed (also rebuilt from the HF Dataset on startup)
# _state_lock guards REVIEW_COUNT / USER_DONE updates made while serving
# requests.
ANNOTATIONS: list[dict] = []
REVIEW_COUNT: dict[str, int] = {}
USER_DONE: dict[str, set[str]] = {}
_state_lock = Lock()
def load_annotations() -> None:
    """Read data/annotations.jsonl into the module-level ANNOTATIONS list.

    Each non-blank line must be a JSON object with a non-empty string
    "anno_id"; the request handlers index every record by that key, so a
    record without one would crash them. Lines that fail to parse or lack a
    usable id are logged with their line number and skipped. A missing file
    yields an empty list.

    The file is decoded as ``utf-8-sig`` so a leading byte-order mark (which
    ``str.strip`` does not remove) doesn't make the first record unparsable.
    """
    global ANNOTATIONS
    f = DATA / "annotations.jsonl"
    if not f.exists():
        print(f"[startup] {f} missing — no annotations to serve")
        ANNOTATIONS = []
        return
    items = []
    with f.open(encoding="utf-8-sig") as fh:
        for lineno, line in enumerate(fh, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except (ValueError, RecursionError) as e:
                print(f"[startup] bad jsonl line {lineno}: {e}")
                continue
            anno_id = rec.get("anno_id") if isinstance(rec, dict) else None
            if not isinstance(anno_id, str) or not anno_id:
                print(f"[startup] jsonl line {lineno}: no string 'anno_id' — skipped")
                continue
            items.append(rec)
    ANNOTATIONS = items
    print(f"[startup] loaded {len(items)} annotations")
def restore_review_state() -> None:
    """Rebuild REVIEW_COUNT and USER_DONE from reviews already in the HF Dataset.

    Each review file lives at ``reviews/<reviewer_id>/<anno_id>__<ts>.json`` in
    REVIEW_DATASET_REPO, so the file listing alone is enough: every matching
    file counts as one review of <anno_id> by <reviewer_id>. Paths with any
    other layout are ignored.

    Tallies are built in local dicts first and merged into the shared state in
    one step under _state_lock. Best-effort: missing config, an unlistable repo
    or any other error is only logged, and the shared state is left unchanged.
    """
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        print("[startup] HF_TOKEN/REVIEW_DATASET_REPO not set — skipping state restore")
        return
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=HF_TOKEN)
        try:
            files = api.list_repo_files(repo_id=REVIEW_DATASET_REPO, repo_type="dataset")
        except Exception as e:
            print(f"[startup] can't list review repo (probably empty): {e}")
            return
        counts: dict[str, int] = {}
        done: dict[str, set[str]] = {}
        n = 0
        for path in files:
            # reviews/<reviewer_id>/<anno_id>__<ts>.json
            parts = path.split("/")
            if len(parts) != 3 or parts[0] != "reviews" or not parts[2].endswith(".json"):
                continue
            reviewer_id = parts[1]
            stem = parts[2][:-5]  # drop ".json"
            # rsplit: anno_ids may themselves contain "__"; the ts never does.
            anno_id = stem.rsplit("__", 1)[0]
            counts[anno_id] = counts.get(anno_id, 0) + 1
            done.setdefault(reviewer_id, set()).add(anno_id)
            n += 1
        with _state_lock:
            for anno_id, c in counts.items():
                REVIEW_COUNT[anno_id] = REVIEW_COUNT.get(anno_id, 0) + c
            for reviewer_id, annos in done.items():
                USER_DONE.setdefault(reviewer_id, set()).update(annos)
        print(f"[startup] restored {n} prior reviews from {REVIEW_DATASET_REPO}")
    except Exception as e:
        print(f"[startup] restore_review_state failed: {e}")
def _on_startup():
    """Load annotations, rebuild review counters, then ensure the dataset repo exists.

    The repo step only runs when HF_TOKEN and REVIEW_DATASET_REPO are both
    set. It creates the repo as a private dataset (``exist_ok=True``) so the
    first upload doesn't 404. A failure there is printed, not raised.
    """
    load_annotations()
    restore_review_state()
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        return
    try:
        from huggingface_hub import HfApi
        hub = HfApi(token=HF_TOKEN)
        hub.create_repo(
            repo_id=REVIEW_DATASET_REPO,
            repo_type="dataset",
            exist_ok=True,
            private=True,
        )
        print(f"[startup] review dataset ready: {REVIEW_DATASET_REPO}")
    except Exception as err:
        print(f"[startup] create_repo failed: {err}")
# ---------- anonymous identity ----------
COOKIE = "rid"  # name of the cookie carrying the anonymous reviewer id


def _set_rid_cookie(resp, rid: str) -> None:
    """Store *rid* in the reviewer-id cookie on *resp*, valid site-wide for 180 days."""
    lifetime_seconds = 180 * 86400
    # SameSite=None together with Secure, so the cookie keeps working when HF
    # wraps the Space in an iframe. Not HttpOnly.
    resp.set_cookie(
        COOKIE,
        rid,
        max_age=lifetime_seconds,
        path="/",
        samesite="none",
        secure=True,
        httponly=False,
    )
# The reviewer id is used as a storage path segment (reviews/<rid>/...), so a
# cookie value is only trusted if it is short and made solely of URL-safe
# characters. Ids minted below ("anon_" + URL-safe base64) always qualify.
_RID_ALLOWED_CHARS = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-"
)
_RID_MAX_LEN = 64


def _ensure_rid(request: Request) -> tuple[str, RedirectResponse | None]:
    """Return the caller's reviewer id, minting a new one when needed.

    Returns ``(rid, None)`` when the request carries a well-formed ``rid``
    cookie. Otherwise (no cookie, or a malformed/tampered value, which would
    be unsafe to use in storage paths) a fresh ``anon_...`` id is minted and
    ``(new_rid, redirect)`` is returned. ``redirect`` is a 303 to the same URL
    that sets the cookie, so the next request carries it; callers return it
    as-is.
    """
    rid = request.cookies.get(COOKIE)
    if rid and len(rid) <= _RID_MAX_LEN and set(rid) <= _RID_ALLOWED_CHARS:
        return rid, None
    # token_urlsafe(6) encodes 6 random bytes as exactly 8 URL-safe characters.
    rid = "anon_" + secrets.token_urlsafe(6)[:8]
    # Redirect to the same URL so the new cookie goes round-trip.
    resp = RedirectResponse(str(request.url), status_code=303)
    _set_rid_cookie(resp, rid)
    return rid, resp
# ---------- task picking ----------
def pick_next(rid: str) -> dict | None:
    """Choose the next annotation to show reviewer *rid*.

    Eligible annotations are those *rid* has not reviewed yet and that have
    fewer than TARGET_REVIEWS_PER_ANNO reviews. Among them, the ones with the
    lowest review count form the pool, and one is drawn at random. Returns
    None when nothing is eligible.
    """
    with _state_lock:
        already_seen = USER_DONE.get(rid, set())
        open_annos = []
        for anno in ANNOTATIONS:
            key = anno["anno_id"]
            if key in already_seen:
                continue
            n_reviews = REVIEW_COUNT.get(key, 0)
            if n_reviews < TARGET_REVIEWS_PER_ANNO:
                open_annos.append((n_reviews, anno))
        if not open_annos:
            return None
        lowest = min(n for n, _ in open_annos)
        pool = [anno for n, anno in open_annos if n == lowest]
    return random.choice(pool)
def _is_safe_path_part(part: str) -> bool:
    """Return True if *part* is usable as a single path component (no traversal)."""
    return bool(part) and part not in (".", "..") and "/" not in part and "\\" not in part


def push_review(payload: dict) -> bool:
    """Persist one review as its own JSON file.

    With HF_TOKEN and REVIEW_DATASET_REPO set, the file is uploaded to the HF
    Dataset at ``reviews/<reviewer_id>/<anno_id>__<ts_ms>.json``. Otherwise it
    is written to ``DATA/local_reviews/<reviewer_id>/<anno_id>__<ts_ms>.json``.
    Each submission gets its own file, so concurrent writes never collide.

    ``reviewer_id`` originates from a client cookie, so both ids are checked
    before they become path components. ``reviewer_id`` must be one plain
    segment. ``anno_id`` may contain "/" but no empty, "." or ".." segments
    and no backslashes.

    Returns True on success. Returns False (after printing the reason) if the
    ids are unsafe or the write/upload failed.
    """
    reviewer_id = str(payload["reviewer_id"])
    anno_id = str(payload["anno_id"])
    if not (
        _is_safe_path_part(reviewer_id)
        and all(_is_safe_path_part(p) for p in anno_id.split("/"))
    ):
        print(f"[push_review] refusing unsafe ids: reviewer_id={reviewer_id!r} anno_id={anno_id!r}")
        return False
    ts = int(time.time() * 1000)
    rel_path = f"{reviewer_id}/{anno_id}__{ts}.json"
    body = json.dumps(payload, ensure_ascii=False, indent=2)
    if not (HF_TOKEN and REVIEW_DATASET_REPO):
        # Locally we still want to persist. Drop into data/local_reviews/.
        target = DATA / "local_reviews" / rel_path
        try:
            # Create target.parent (not just <reviewer_id>/): anno_ids may contain "/".
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(body, encoding="utf-8")
        except OSError as e:
            print(f"[push_review] local write failed: {e}")
            return False
        return True
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=BytesIO(body.encode("utf-8")),
            path_in_repo=f"reviews/{rel_path}",
            repo_id=REVIEW_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"review {anno_id} by {reviewer_id}",
        )
        return True
    except Exception as e:
        print(f"[push_review] failed: {e}")
        return False
# ---------- routes ----------
def healthz():
    """Liveness probe: report that the app is up and how many annotations it serves."""
    annotation_count = len(ANNOTATIONS)
    return dict(ok=True, annos=annotation_count)
def root(request: Request):
    """Landing URL: make sure the visitor has a reviewer id, then send them to /review."""
    _, cookie_redirect = _ensure_rid(request)
    if cookie_redirect is None:
        return RedirectResponse("/review", status_code=303)
    return cookie_redirect
def review_page(request: Request):
    """Render review.html with the next annotation for this reviewer and progress counters.

    ``anno`` is None when pick_next finds nothing left for this reviewer.
    """
    rid, cookie_redirect = _ensure_rid(request)
    if cookie_redirect is not None:
        return cookie_redirect
    next_anno = pick_next(rid)
    target = TARGET_REVIEWS_PER_ANNO
    completed = 0
    for item in ANNOTATIONS:
        if REVIEW_COUNT.get(item["anno_id"], 0) >= target:
            completed += 1
    context = {
        "request": request,
        "rid": rid,
        "anno": next_anno,
        "done_by_me": len(USER_DONE.get(rid, set())),
        "total": len(ANNOTATIONS),
        "fully_done": completed,
        "target_per_anno": target,
    }
    return templates.TemplateResponse("review.html", context)
def review_submit(
    request: Request,
    anno_id: str = Form(...),
    tg_score: int = Form(...),
    tg_ref_type: str = Form(""),
    sg_score: int = Form(...),
    sg_ref_type: str = Form(""),
    qa_score: int = Form(...),
    qa_ref_type: str = Form(""),
    qa_type: str = Form(""),
    comment: str = Form(""),
):
    """Validate one review form, persist it, and redirect to the next item.

    Raises HTTPException(400) in three cases: an unknown anno_id, a score
    outside 1..3, or a 1/2 score without a comment of at least 5 characters
    (after trimming surrounding whitespace). Raises HTTPException(500) when
    push_review reports failure.

    If this reviewer already has a review recorded for anno_id (double-click,
    browser back button, replayed POST), nothing is stored again. The request
    just redirects to /review, so one person cannot inflate an annotation's
    REVIEW_COUNT. The anno_id is reserved in USER_DONE under _state_lock
    before the upload, so concurrent duplicate submits cannot both get
    through. The reservation is undone if persisting fails, so the reviewer
    can retry. REVIEW_COUNT is only incremented after a successful push.
    """
    rid, redirect = _ensure_rid(request)
    if redirect is not None:
        return redirect
    # Reject obviously bogus inputs
    if not any(a["anno_id"] == anno_id for a in ANNOTATIONS):
        raise HTTPException(400, "unknown anno_id")
    for s in (tg_score, sg_score, qa_score):
        if s not in (1, 2, 3):
            raise HTTPException(400, "scores must be 1, 2, or 3")
    # If any sub-score is 1 or 2, the reviewer must justify it.
    lowest = min(tg_score, sg_score, qa_score)
    if lowest <= 2 and len((comment or "").strip()) < 5:
        raise HTTPException(
            400,
            "1 分或 2 分必须在备注里写明原因(至少 5 个字符)"
            " — please include a comment explaining why you gave a 1/2 score",
        )
    # Atomically check for a prior review by this reviewer and reserve the slot.
    with _state_lock:
        mine = USER_DONE.setdefault(rid, set())
        duplicate = anno_id in mine
        mine.add(anno_id)
    if duplicate:
        return RedirectResponse("/review", status_code=303)
    payload = {
        "reviewer_id": rid,
        "anno_id": anno_id,
        "ts": int(time.time()),
        "tg": {"score": tg_score, "ref_type": tg_ref_type},
        "sg": {"score": sg_score, "ref_type": sg_ref_type},
        "qa": {"score": qa_score, "ref_type": qa_ref_type, "qa_type": qa_type},
        "comment": comment,
    }
    ok = False
    try:
        ok = push_review(payload)
    finally:
        with _state_lock:
            if ok:
                REVIEW_COUNT[anno_id] = REVIEW_COUNT.get(anno_id, 0) + 1
            else:
                # Undo the reservation so this item can be retried; drop an
                # empty entry so it doesn't count as a reviewer in stats.
                mine = USER_DONE.get(rid)
                if mine is not None:
                    mine.discard(anno_id)
                    if not mine:
                        del USER_DONE[rid]
    if not ok:
        raise HTTPException(500, "failed to persist review; please retry")
    return RedirectResponse("/review", status_code=303)
def stats(request: Request):
    """Return public progress counters as JSON (*request* is not used)."""
    target = TARGET_REVIEWS_PER_ANNO
    per_anno = [REVIEW_COUNT.get(item["anno_id"], 0) for item in ANNOTATIONS]
    summary = {}
    summary["annos_total"] = len(ANNOTATIONS)
    summary["annos_fully_done"] = sum(count >= target for count in per_anno)
    summary["reviews_total"] = sum(REVIEW_COUNT.values())
    summary["unique_reviewers"] = len(USER_DONE)
    summary["target_per_anno"] = target
    return summary