Spaces:
Sleeping
Sleeping
| """Pull all review feedback from the HF Dataset and produce: | |
| output/reviews_all.jsonl all individual reviews, one per line | |
| output/per_anno.jsonl aggregated per annotation (avg score, ref_type / qa_type | |
| vote counts, all comments) | |
| output/feedback_life.xlsx copy of the original 视频问答.xlsx | |
| output/feedback_art.xlsx with extra columns + yellow/red row colors | |
| (1=red row, 2=yellow row, 3=plain) | |
| Run locally (after the campaign): | |
| set HF_TOKEN=hf_xxx | |
| set REVIEW_DATASET_REPO=VCLab-PolyU/omnistg-reviews | |
| python scripts/export_reviews.py --src-root ../720多模态视频/720多模态视频 \ | |
| --out-dir output/exported | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import shutil | |
| import statistics | |
| import sys | |
| import zipfile | |
| from collections import Counter, defaultdict | |
| from io import BytesIO | |
| from pathlib import Path | |
| from xml.etree import ElementTree as ET | |
| XL_NS = {"x": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"} | |
| def fetch_reviews(out_jsonl: Path) -> list[dict]: | |
| repo = os.environ.get("REVIEW_DATASET_REPO", "") | |
| token = os.environ.get("HF_TOKEN", "") | |
| out_jsonl.parent.mkdir(parents=True, exist_ok=True) | |
| reviews: list[dict] = [] | |
| if not repo or not token: | |
| # Fallback: scan local data/local_reviews/ | |
| local_dir = Path(__file__).resolve().parent.parent / "data" / "local_reviews" | |
| if local_dir.exists(): | |
| for p in local_dir.rglob("*.json"): | |
| with p.open(encoding="utf-8") as f: | |
| reviews.append(json.load(f)) | |
| print(f"[fetch] loaded {len(reviews)} local reviews from {local_dir}") | |
| else: | |
| from huggingface_hub import HfApi, hf_hub_download | |
| api = HfApi(token=token) | |
| files = [ | |
| f for f in api.list_repo_files(repo_id=repo, repo_type="dataset") | |
| if f.startswith("reviews/") and f.endswith(".json") | |
| ] | |
| print(f"[fetch] {len(files)} review files in {repo}") | |
| for path in files: | |
| local = hf_hub_download( | |
| repo_id=repo, repo_type="dataset", filename=path, token=token | |
| ) | |
| with open(local, encoding="utf-8") as f: | |
| reviews.append(json.load(f)) | |
| with out_jsonl.open("w", encoding="utf-8") as f: | |
| for r in reviews: | |
| f.write(json.dumps(r, ensure_ascii=False) + "\n") | |
| print(f"[fetch] wrote {out_jsonl} ({len(reviews)} reviews)") | |
| return reviews | |
| def aggregate(reviews: list[dict], out_jsonl: Path) -> dict[str, dict]: | |
| """Group reviews by anno_id and compute aggregates.""" | |
| grouped: dict[str, list[dict]] = defaultdict(list) | |
| for r in reviews: | |
| grouped[r["anno_id"]].append(r) | |
| agg: dict[str, dict] = {} | |
| for anno_id, rs in grouped.items(): | |
| def stats(field): | |
| vals = [r[field]["score"] for r in rs if r.get(field)] | |
| return { | |
| "n": len(vals), | |
| "avg": round(statistics.mean(vals), 3) if vals else None, | |
| "min": min(vals) if vals else None, | |
| "votes": dict(Counter(vals)), | |
| } | |
| def votes(field, sub): | |
| c = Counter(r[field].get(sub) for r in rs if r.get(field) and r[field].get(sub)) | |
| return dict(c) | |
| agg[anno_id] = { | |
| "anno_id": anno_id, | |
| "n_reviews": len(rs), | |
| "tg_score": stats("tg"), | |
| "sg_score": stats("sg"), | |
| "qa_score": stats("qa"), | |
| "tg_ref_type_votes": votes("tg", "ref_type"), | |
| "sg_ref_type_votes": votes("sg", "ref_type"), | |
| "qa_ref_type_votes": votes("qa", "ref_type"), | |
| "qa_type_votes": votes("qa", "qa_type"), | |
| "comments": [r.get("comment", "") for r in rs if r.get("comment")], | |
| } | |
| with out_jsonl.open("w", encoding="utf-8") as f: | |
| for v in agg.values(): | |
| f.write(json.dumps(v, ensure_ascii=False) + "\n") | |
| print(f"[agg] wrote {out_jsonl} ({len(agg)} annos with reviews)") | |
| return agg | |
| # ---------- xlsx writer (no openpyxl needed, manipulate the zip directly) ---------- | |
| def build_feedback_xlsx(src_xlsx: Path, dst_xlsx: Path, agg_lookup: dict[str, dict], | |
| video_id_to_anno_id: dict[str, str]) -> None: | |
| """Copy the source xlsx and recolor each video-row + append review columns. | |
| Color rule: | |
| - row min_score == 1 -> red fill | |
| - row min_score == 2 -> yellow fill | |
| - else -> default | |
| Where "row min_score" = min(tg_score.min, sg_score.min, qa_score.min). | |
| Appended columns (I..R): | |
| I n_reviews | |
| J tg_avg | |
| K sg_avg | |
| L qa_avg | |
| M tg_ref_type (top vote) | |
| N sg_ref_type (top vote) | |
| O qa_ref_type (top vote) | |
| P qa_type (top vote) | |
| Q min_score (worst sub-score across TG/SG/QA) | |
| R comments (joined) | |
| """ | |
| if not src_xlsx.exists(): | |
| print(f"[xlsx] missing {src_xlsx}, skipping") | |
| return | |
| dst_xlsx.parent.mkdir(parents=True, exist_ok=True) | |
| # Unzip source xlsx into a temp dir. | |
| tmp = dst_xlsx.parent / (dst_xlsx.stem + "_unzipped_tmp") | |
| if tmp.exists(): | |
| shutil.rmtree(tmp) | |
| tmp.mkdir(parents=True) | |
| with zipfile.ZipFile(src_xlsx) as zf: | |
| zf.extractall(tmp) | |
| # 1) Read sharedStrings to get a string array, plus a writer that lets us add new strings. | |
| ss_path = tmp / "xl" / "sharedStrings.xml" | |
| ss_tree = ET.parse(ss_path) | |
| ss_root = ss_tree.getroot() | |
| strings: list[str] = [] | |
| for si in ss_root.findall("x:si", XL_NS): | |
| # Could be <t>...</t> directly or rich text <r><t>...</t></r>... | |
| ts = si.findall(".//x:t", XL_NS) | |
| strings.append("".join(t.text or "" for t in ts)) | |
| # Build an index for fast add-or-get | |
| str_index: dict[str, int] = {s: i for i, s in enumerate(strings)} | |
| def intern(s: str) -> int: | |
| if s not in str_index: | |
| str_index[s] = len(strings) | |
| strings.append(s) | |
| return str_index[s] | |
| # 2) Read styles.xml; we need to add 2 new fills (red + yellow) and 2 new | |
| # cellXfs entries that reference them. We then know their style indexes. | |
| styles_path = tmp / "xl" / "styles.xml" | |
| styles_tree = ET.parse(styles_path) | |
| styles_root = styles_tree.getroot() | |
| def add_fill(rgb: str) -> int: | |
| fills = styles_root.find("x:fills", XL_NS) | |
| new_fill = ET.SubElement(fills, f"{{{XL_NS['x']}}}fill") | |
| pf = ET.SubElement(new_fill, f"{{{XL_NS['x']}}}patternFill") | |
| pf.set("patternType", "solid") | |
| fg = ET.SubElement(pf, f"{{{XL_NS['x']}}}fgColor") | |
| fg.set("rgb", rgb) | |
| bg = ET.SubElement(pf, f"{{{XL_NS['x']}}}bgColor") | |
| bg.set("indexed", "64") | |
| idx = int(fills.attrib.get("count", str(len(fills) - 1))) | |
| fills.set("count", str(idx + 1)) | |
| return idx | |
| red_fill_idx = add_fill("FFFFC7CE") # light red | |
| yellow_fill_idx = add_fill("FFFFEB9C") # light yellow | |
| def add_xf(fill_idx: int) -> int: | |
| cellxfs = styles_root.find("x:cellXfs", XL_NS) | |
| # base on first xf so we don't break formatting | |
| base = cellxfs.find("x:xf", XL_NS) | |
| new_xf = ET.SubElement(cellxfs, f"{{{XL_NS['x']}}}xf") | |
| # Copy minimal attribs from base | |
| for k in ("numFmtId", "fontId", "borderId", "xfId"): | |
| if base is not None and k in base.attrib: | |
| new_xf.set(k, base.attrib[k]) | |
| new_xf.set("fillId", str(fill_idx)) | |
| new_xf.set("applyFill", "1") | |
| idx = int(cellxfs.attrib.get("count", str(len(cellxfs) - 1))) | |
| cellxfs.set("count", str(idx + 1)) | |
| return idx | |
| red_xf = add_xf(red_fill_idx) | |
| yellow_xf = add_xf(yellow_fill_idx) | |
| # 3) Walk sheet1.xml: for every row whose A cell is a known video_id, set | |
| # the row's style to red/yellow/none and append review columns I..R. | |
| sheet_path = tmp / "xl" / "worksheets" / "sheet1.xml" | |
| sheet_tree = ET.parse(sheet_path) | |
| sheet_root = sheet_tree.getroot() | |
| sheet_data = sheet_root.find("x:sheetData", XL_NS) | |
| def cell_val(cell) -> str | None: | |
| t = cell.attrib.get("t", "") | |
| v = cell.find("x:v", XL_NS) | |
| if v is None or v.text is None: | |
| return None | |
| if t == "s": | |
| try: | |
| return strings[int(v.text)] | |
| except Exception: | |
| return None | |
| return v.text | |
| def make_cell(ref: str, text: str | None, style: int | None = None) -> ET.Element: | |
| c = ET.Element(f"{{{XL_NS['x']}}}c", {"r": ref, "t": "s"}) | |
| if style is not None: | |
| c.set("s", str(style)) | |
| v = ET.SubElement(c, f"{{{XL_NS['x']}}}v") | |
| v.text = str(intern(text or "")) | |
| return c | |
| def set_cell_style(cell, style_idx: int) -> None: | |
| cell.set("s", str(style_idx)) | |
| HDR_CELLS = ("I", "J", "K", "L", "M", "N", "O", "P", "Q", "R") | |
| HDR_LABELS = ( | |
| "n_reviews", "tg_avg", "sg_avg", "qa_avg", | |
| "tg_ref_type", "sg_ref_type", "qa_ref_type", "qa_type", | |
| "min_score", "comments", | |
| ) | |
| rows = list(sheet_data.findall("x:row", XL_NS)) | |
| for row in rows: | |
| row_num = int(row.attrib["r"]) | |
| cells = {c.attrib.get("r", "")[0]: c for c in row.findall("x:c", XL_NS)} | |
| a_cell = cells.get("A") | |
| if a_cell is None: | |
| continue | |
| a_val = cell_val(a_cell) | |
| if a_val is None: | |
| continue | |
| # Header row: append the 10 new column headers and continue. | |
| if row_num == 1: | |
| for col, label in zip(HDR_CELLS, HDR_LABELS): | |
| row.append(make_cell(f"{col}{row_num}", label)) | |
| continue | |
| anno_id = video_id_to_anno_id.get(a_val) | |
| if not anno_id: | |
| continue | |
| agg = agg_lookup.get(anno_id) | |
| if not agg: | |
| continue | |
| def top_vote(d: dict) -> str: | |
| if not d: | |
| return "" | |
| return max(d.items(), key=lambda kv: kv[1])[0] | |
| tg_min = agg["tg_score"]["min"] | |
| sg_min = agg["sg_score"]["min"] | |
| qa_min = agg["qa_score"]["min"] | |
| mins = [m for m in (tg_min, sg_min, qa_min) if m is not None] | |
| row_min = min(mins) if mins else None | |
| # Recolor cells: pick the style based on row_min. | |
| style_for_row = None | |
| if row_min == 1: | |
| style_for_row = red_xf | |
| elif row_min == 2: | |
| style_for_row = yellow_xf | |
| if style_for_row is not None: | |
| for c in row.findall("x:c", XL_NS): | |
| set_cell_style(c, style_for_row) | |
| # Append the 10 review cells. | |
| new_values = [ | |
| str(agg["n_reviews"]), | |
| str(agg["tg_score"]["avg"] if agg["tg_score"]["avg"] is not None else ""), | |
| str(agg["sg_score"]["avg"] if agg["sg_score"]["avg"] is not None else ""), | |
| str(agg["qa_score"]["avg"] if agg["qa_score"]["avg"] is not None else ""), | |
| top_vote(agg["tg_ref_type_votes"]), | |
| top_vote(agg["sg_ref_type_votes"]), | |
| top_vote(agg["qa_ref_type_votes"]), | |
| top_vote(agg["qa_type_votes"]), | |
| str(row_min if row_min is not None else ""), | |
| " | ".join(agg["comments"][:10]), | |
| ] | |
| for col, val in zip(HDR_CELLS, new_values): | |
| row.append(make_cell(f"{col}{row_num}", val, style_for_row)) | |
| # 4) Write everything back into a new xlsx. | |
| # Re-serialize sharedStrings (we may have appended new entries). | |
| new_ss = ET.Element(f"{{{XL_NS['x']}}}sst", { | |
| "count": str(len(strings)), "uniqueCount": str(len(set(strings))), | |
| }) | |
| for s in strings: | |
| si = ET.SubElement(new_ss, f"{{{XL_NS['x']}}}si") | |
| t = ET.SubElement(si, f"{{{XL_NS['x']}}}t") | |
| # Preserve leading/trailing whitespace. | |
| t.set("{http://www.w3.org/XML/1998/namespace}space", "preserve") | |
| t.text = s | |
| ET.register_namespace("", XL_NS["x"]) | |
| ET.ElementTree(new_ss).write(ss_path, xml_declaration=True, encoding="UTF-8", method="xml") | |
| styles_tree.write(styles_path, xml_declaration=True, encoding="UTF-8", method="xml") | |
| sheet_tree.write(sheet_path, xml_declaration=True, encoding="UTF-8", method="xml") | |
| # Repack as zip. | |
| if dst_xlsx.exists(): | |
| dst_xlsx.unlink() | |
| with zipfile.ZipFile(dst_xlsx, "w", zipfile.ZIP_DEFLATED) as zf: | |
| for root_dir, _, files in os.walk(tmp): | |
| for fn in files: | |
| full = Path(root_dir) / fn | |
| rel = full.relative_to(tmp) | |
| zf.write(full, rel.as_posix()) | |
| shutil.rmtree(tmp) | |
| print(f"[xlsx] wrote {dst_xlsx}") | |
| def build_video_id_to_anno_id(annotations_jsonl: Path) -> dict[str, str]: | |
| out: dict[str, str] = {} | |
| with annotations_jsonl.open(encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| r = json.loads(line) | |
| out[r["video_id"]] = r["anno_id"] | |
| return out | |
| def main() -> None: | |
| ap = argparse.ArgumentParser() | |
| ap.add_argument("--src-root", required=True, | |
| help="root that contains <category>/视频问答.xlsx") | |
| ap.add_argument("--out-dir", default="output/exported") | |
| ap.add_argument("--annotations", default="data/annotations.jsonl") | |
| args = ap.parse_args() | |
| out = Path(args.out_dir) | |
| out.mkdir(parents=True, exist_ok=True) | |
| reviews = fetch_reviews(out / "reviews_all.jsonl") | |
| if not reviews: | |
| print("[done] no reviews to aggregate; exiting early") | |
| return | |
| agg = aggregate(reviews, out / "per_anno.jsonl") | |
| vid2anno = build_video_id_to_anno_id(Path(args.annotations)) | |
| src_root = Path(args.src_root) | |
| XLSX_NAME = "\u89c6\u9891\u95ee\u7b54.xlsx" # 视频问答.xlsx | |
| name_map = { | |
| "\u751f\u6d3b\u65b9\u5f0f\u4e0e\u65e5\u5e38": "feedback_life.xlsx", # 生活方式与日常 | |
| "\u827a\u672f\u8868\u6f14\u6587\u5316": "feedback_art.xlsx", # 艺术表演文化 | |
| } | |
| for cat_dir in src_root.iterdir(): | |
| if not cat_dir.is_dir(): | |
| continue | |
| src_xlsx = cat_dir / XLSX_NAME | |
| if not src_xlsx.exists(): | |
| continue | |
| out_name = name_map.get(cat_dir.name, f"feedback_{cat_dir.name}.xlsx") | |
| build_feedback_xlsx(src_xlsx, out / out_name, agg, vid2anno) | |
| print("[done]") | |
| if __name__ == "__main__": | |
| main() | |