Spaces:
Sleeping
Sleeping
"""Pack batch2 (sports) review data.

Differences vs batch1:
  - One xlsx (Sport1.xlsx) with FIVE sheets (足球/篮球/排球/艺术体操/游泳).
  - Reference images are EMBEDDED inside the xlsx (in xl/media/) and anchored
    to specific cells via xl/drawings/drawingN.xml. So we don't read PNGs
    from per-video folders for ref_time/ref_space/ref_options; we extract
    them from the xlsx.
  - Bbox jpg+json still live in the per-video folder under MultiSports/.
  - TG_Frames/ is flat: one file <video_id>_<tg_answer>.jpg per anno.

Output:
  data/annotations.jsonl              one record per row
  static/anno_assets/<anno_id>/
    ref_time.png                      embedded ref anchored in column B
    ref_space.png                     embedded ref anchored in column D
    ref_options.png                   embedded ref anchored in column F
    ref_extra_col<colidx>_<n>.png     any other embedded image on the row
    frame_<idx>.jpg                   bbox keyframes
    tg_frames.jpg                     TG strip from TG_Frames/

Usage:
  python scripts/pack_batch2.py \
    --batch-root "../batch2/<top-cat-dir>/<inner-cat-dir>" \
    --xlsx-name Sport1.xlsx \
    --videos-subdir MultiSports \
    --tg-frames-subdir TG_Frames \
    --out-data data/annotations.jsonl \
    --out-assets static/anno_assets
"""
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import re | |
| import shutil | |
| import sys | |
| import zipfile | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Optional | |
| from xml.etree import ElementTree as ET | |
# Sheet name -> filesystem-safe sub-category id.
# The sheet names are CJK, so they are spelled with Unicode escapes to keep
# this source file independent of the editor/terminal code page.
_SUBCAT_PAIRS = (
    ("\u8db3\u7403", "football"),  # 足球
    ("\u7bee\u7403", "basketball"),  # 篮球
    ("\u6392\u7403", "volleyball"),  # 排球
    ("\u827a\u672f\u4f53\u64cd", "artistic_gymnastics"),  # 艺术体操
    ("\u6e38\u6cf3", "swimming"),  # 游泳
)
SUBCAT_MAP = dict(_SUBCAT_PAIRS)

# XML namespace prefixes used in ElementTree path expressions.
_OOXML = "http://schemas.openxmlformats.org"
NS = {
    "x": f"{_OOXML}/spreadsheetml/2006/main",
    "r": f"{_OOXML}/officeDocument/2006/relationships",
    "xdr": f"{_OOXML}/drawingml/2006/spreadsheetDrawing",
    "a": f"{_OOXML}/drawingml/2006/main",
    "rel": f"{_OOXML}/package/2006/relationships",
}
| # ----------------------- xlsx parsing ----------------------- | |
class Xlsx:
    """Minimal read-only xlsx reader on top of zipfile + ElementTree.

    No openpyxl required. Only what the packer needs is implemented: shared
    and inline strings, the workbook's sheet list, cell values as text, and
    the picture anchors of a sheet's drawing parts.

    Can be used as a context manager; otherwise call close() when done.
    """

    # Anchor elements that carry an <xdr:from> marker, in scan order: every
    # one-cell anchor of a drawing is reported before its two-cell anchors.
    _ANCHOR_TAGS = ("oneCellAnchor", "twoCellAnchor")
    # A1-style cell reference: column letters followed by the row number.
    _CELL_REF = re.compile(r"^([A-Z]+)(\d+)$")

    def __init__(self, xlsx_path: Path) -> None:
        """Open the archive and load shared strings and workbook metadata.

        Raises whatever zipfile / ElementTree raise for a missing or
        malformed workbook; the archive is closed again in that case.
        """
        self.zf = zipfile.ZipFile(xlsx_path)
        try:
            self._strings: list[str] = []
            self._load_shared_strings()
            self._workbook = ET.fromstring(self.zf.read("xl/workbook.xml"))
            self._wb_rels = self._read_rels("xl/_rels/workbook.xml.rels")
        except Exception:
            # Do not leak the open zip handle when the workbook is unreadable.
            self.zf.close()
            raise

    def __enter__(self) -> "Xlsx":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying zip archive."""
        self.zf.close()

    # shared strings ----
    @staticmethod
    def _rich_text(node) -> str:
        """Return the visible text of an <si> / <is> string item.

        A string item holds either a plain <t> or a list of rich-text runs
        (<r><t>..</t></r>). Phonetic runs (<rPh>) also contain <t> elements
        but are reading hints, not cell content, so they are left out.
        """
        t_tag = "{%s}t" % NS["x"]
        r_tag = "{%s}r" % NS["x"]
        parts: list[str] = []
        for child in node:
            if child.tag == t_tag:
                parts.append(child.text or "")
            elif child.tag == r_tag:
                parts.extend((t.text or "") for t in child.findall("x:t", NS))
        return "".join(parts)

    def _load_shared_strings(self) -> None:
        """Fill self._strings from xl/sharedStrings.xml (absent part = none)."""
        try:
            data = self.zf.read("xl/sharedStrings.xml")
        except KeyError:
            return
        root = ET.fromstring(data)
        for si in root.findall("x:si", NS):
            self._strings.append(self._rich_text(si))

    def s(self, idx: int) -> str:
        """Return shared string *idx*, or "" when the index is out of range."""
        if 0 <= idx < len(self._strings):
            return self._strings[idx]
        return ""

    # rels ----
    def _read_rels(self, path: str) -> dict[str, str]:
        """Return {relationship Id: Target} for a .rels part ({} if missing).

        Relationships marked TargetMode="External" (e.g. hyperlinks) point
        outside the archive and are skipped, as are entries lacking an Id
        or a Target.
        """
        try:
            data = self.zf.read(path)
        except KeyError:
            return {}
        out: dict[str, str] = {}
        for rel in ET.fromstring(data).findall("rel:Relationship", NS):
            rel_id = rel.get("Id")
            target = rel.get("Target")
            if not rel_id or not target:
                continue
            if rel.get("TargetMode") == "External":
                continue
            out[rel_id] = target
        return out

    @staticmethod
    def _rels_path(part_path: str) -> str:
        """Return the .rels part path that belongs to *part_path*.

        "xl/worksheets/sheet1.xml" -> "xl/worksheets/_rels/sheet1.xml.rels".
        """
        directory, _, name = part_path.rpartition("/")
        prefix = directory + "/" if directory else ""
        return f"{prefix}_rels/{name}.rels"

    # sheets ----
    def sheets(self) -> list[dict]:
        """Return [{name, sheetId, rid, target}, ...] in workbook order.

        "target" is the zip member path of the sheet part. Workbook
        relationship targets are normally relative to xl/ (e.g.
        "worksheets/sheet1.xml"), but an absolute form
        ("/xl/worksheets/sheet1.xml") is resolved correctly as well.
        """
        out = []
        for s in self._workbook.findall(".//x:sheets/x:sheet", NS):
            rid = s.get(f"{{{NS['r']}}}id")
            target = self._wb_rels.get(rid, "")
            out.append({
                "name": s.get("name", "").strip(),
                "sheetId": s.get("sheetId"),
                "rid": rid,
                "target": self._resolve_target("xl/", target),
            })
        return out

    # one sheet ----
    def _cell_text(self, c) -> Optional[str]:
        """Return the text of one <c> element, or None when it has no value."""
        cell_type = c.get("t", "")
        if cell_type == "inlineStr":
            # Inline strings keep their text in <is>, not in <v>.
            inline = c.find("x:is", NS)
            return self._rich_text(inline) if inline is not None else None
        v = c.find("x:v", NS)
        if v is None or v.text is None:
            return None
        if cell_type == "s":
            # <v> is an index into the shared-strings table.
            return self.s(int(v.text))
        return v.text

    def _anchor_images(self, drawing_root, drawing_dir: str,
                       drawing_rels: dict[str, str]) -> list[dict]:
        """Collect the picture anchors of one parsed drawing part."""
        found: list[dict] = []
        for tag in self._ANCHOR_TAGS:
            for anc in drawing_root.iter("{%s}%s" % (NS["xdr"], tag)):
                fr = anc.find("xdr:from", NS)
                if fr is None:
                    continue
                col = int(fr.findtext("xdr:col", "0", NS) or 0)
                row = int(fr.findtext("xdr:row", "0", NS) or 0)
                blip = anc.find(".//a:blip", NS)
                if blip is None:
                    continue  # anchor holds a shape/chart, not a picture
                embed = blip.get(f"{{{NS['r']}}}embed")
                image_target = drawing_rels.get(embed, "")
                if not image_target:
                    continue
                found.append({
                    "row": row,
                    "col": col,
                    "image_path": self._resolve_target(drawing_dir, image_target),
                })
        return found

    def read_sheet(self, sheet_target: str) -> tuple[list[dict], list[dict]]:
        """Return (rows, image_anchors) for a single sheet.

        rows is [{"row": 1-indexed int, "cells": {col_letter: str}}, ...];
        rows without any valued cell are omitted. Cells that carry no "r"
        reference are skipped.
        image_anchors is [{"row": 0-idx, "col": 0-idx, "image_path": str}, ...]
        where image_path is the zip member path of the embedded picture.
        """
        # Parse from bytes so the XML declaration / BOM decides the encoding.
        root = ET.fromstring(self.zf.read(sheet_target))
        rows: list[dict] = []
        prev_rnum = 0
        for row in root.findall("x:sheetData/x:row", NS):
            # The "r" attribute is optional; a row without it follows the
            # previous one.
            r_attr = row.get("r")
            rnum = int(r_attr) if r_attr else prev_rnum + 1
            prev_rnum = rnum
            cells: dict[str, str] = {}
            for c in row.findall("x:c", NS):
                m = self._CELL_REF.match(c.get("r", ""))
                if not m:
                    continue
                text = self._cell_text(c)
                if text is None:
                    continue
                cells[m.group(1)] = text
            if cells:
                rows.append({"row": rnum, "cells": cells})

        # ---- drawing ----
        # Relationship targets are relative to the part that declares them,
        # so drawings resolve against the sheet's own directory and pictures
        # against the drawing's directory.
        sheet_dir = sheet_target.rpartition("/")[0] + "/"
        sheet_rels = self._read_rels(self._rels_path(sheet_target))
        anchors: list[dict] = []
        for target in sheet_rels.values():
            if not target.endswith(".xml") or "drawings/" not in target:
                continue
            drawing_path = self._resolve_target(sheet_dir, target)
            drawing_root = ET.fromstring(self.zf.read(drawing_path))
            drawing_rels = self._read_rels(self._rels_path(drawing_path))
            drawing_dir = drawing_path.rpartition("/")[0] + "/"
            anchors.extend(
                self._anchor_images(drawing_root, drawing_dir, drawing_rels)
            )
        return rows, anchors

    def _resolve_target(self, base_dir: str, target: str) -> str:
        """Resolve a relationship target relative to the file declaring it.

        A target starting with "/" is already rooted at the package, so
        base_dir is ignored for it. "..", "." and empty segments are
        collapsed; the result never has a leading slash.
        """
        if target.startswith("/"):
            base_dir = ""
        out: list[str] = []
        for p in (base_dir + target).split("/"):
            if p == "..":
                if out:
                    out.pop()
            elif p in ("", "."):
                continue
            else:
                out.append(p)
        return "/".join(out)

    def extract_image(self, archive_path: str, dst_path: Path) -> None:
        """Copy zip member *archive_path* to *dst_path*, creating parent dirs."""
        dst_path.parent.mkdir(parents=True, exist_ok=True)
        with self.zf.open(archive_path) as src, dst_path.open("wb") as dst:
            shutil.copyfileobj(src, dst)
| # ----------------------- bbox + tg frames ----------------------- | |
def _rect_box(shape) -> Optional[dict]:
    """Turn one labelme-style shape into a normalised box, or None to skip it."""
    if not isinstance(shape, dict) or shape.get("shape_type") != "rectangle":
        return None
    pts = shape.get("points") or []
    if len(pts) < 2:
        return None
    try:
        (x1, y1), (x2, y2) = pts[0], pts[1]
        # The two corners may come in any order; store top-left/bottom-right.
        if x1 > x2:
            x1, x2 = x2, x1
        if y1 > y2:
            y1, y2 = y2, y1
        return {
            "label": str(shape.get("label", "?")),
            "x1": round(x1, 2), "y1": round(y1, 2),
            "x2": round(x2, 2), "y2": round(y2, 2),
        }
    except (TypeError, ValueError):
        # Malformed point data (wrong arity / non-numeric): drop this shape
        # only, keep the rest of the frame.
        return None


def _find_frame_jpg(video_dir: Path, js: Path, image_name) -> Optional[Path]:
    """Locate the image that belongs to annotation file *js*, if present."""
    candidates: list[Path] = []
    if isinstance(image_name, str) and image_name:
        candidates.append(video_dir / image_name)
        # imagePath may carry a directory prefix (possibly with Windows
        # separators) from the machine it was labelled on; retry with just
        # the file name.
        candidates.append(video_dir / image_name.replace("\\", "/").rsplit("/", 1)[-1])
    # Last resort: same stem as the json, next to it.
    candidates.append(video_dir / js.with_suffix(".jpg").name)
    return next((c for c in candidates if c.is_file()), None)


def load_bboxes(video_dir: Path) -> list[dict]:
    """Read every *.json in *video_dir* and return parsed bbox metadata.

    Each returned dict has:
      frame_idx  int parsed from a trailing "_<digits>.json", else None
      jpg_path   Path of the matching image, or None if none was found
      image_w / image_h  "imageWidth" / "imageHeight" from the json
      boxes      [{"label", "x1", "y1", "x2", "y2"}, ...] for rectangle
                 shapes only, coordinates rounded to 2 decimals

    Files that cannot be read or parsed, or whose top level is not a JSON
    object, are skipped. The result is sorted by frame_idx (None sorts as 0).
    """
    out = []
    for js in sorted(video_dir.glob("*.json")):
        try:
            doc = json.loads(js.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable file, bad encoding or invalid JSON: best-effort skip.
            continue
        if not isinstance(doc, dict):
            continue
        m = re.search(r"_(\d+)\.json$", js.name)
        frame_idx = int(m.group(1)) if m else None
        boxes = []
        for shape in doc.get("shapes") or []:
            box = _rect_box(shape)
            if box is not None:
                boxes.append(box)
        out.append({
            "frame_idx": frame_idx,
            "jpg_path": _find_frame_jpg(video_dir, js, doc.get("imagePath")),
            "image_w": doc.get("imageWidth"),
            "image_h": doc.get("imageHeight"),
            "boxes": boxes,
        })
    out.sort(key=lambda x: x["frame_idx"] if x["frame_idx"] is not None else 0)
    return out
def index_tg_frames(tg_root: Path) -> dict[str, Path]:
    """Map the stem of every *.jpg found under *tg_root* to its path.

    File names follow <video_id>_<answer>.jpg, so the keys are full stems;
    matching a stem to a video id by prefix is left to the lookup step.
    A non-existent *tg_root* gives an empty index.
    """
    index: dict[str, Path] = {}
    if tg_root.exists():
        # Searched recursively; a stem seen again replaces the earlier entry.
        for jpg in tg_root.rglob("*.jpg"):
            index[jpg.stem] = jpg
    return index
def resolve_tg_frame(tg_index: dict[str, Path], video_id: str) -> Optional[Path]:
    """Find the TG frame strip whose stem starts with '<video_id>_'.

    Several stems can share that prefix (for instance when another video id
    itself begins with '<video_id>_'). The shortest stem wins; stems of
    equal length are ordered lexicographically so the choice does not depend
    on the order in which the index was built. Returns None when nothing
    matches.
    """
    needle = video_id + "_"
    matches = [p for stem, p in tg_index.items() if stem.startswith(needle)]
    if not matches:
        return None
    return min(matches, key=lambda p: (len(p.stem), p.stem))
| # ----------------------- main ----------------------- | |
# 0-based xdr:col of an anchored picture -> asset slot (B=1, D=3, F=5).
_COL_SLOT = {1: "ref_time", 3: "ref_space", 5: "ref_options"}

# One option line: a letter A-E (ASCII or full-width U+FF21..U+FF25), at most
# one separator (whitespace, ".", ":", U+3001, or the full-width "." / ":"),
# then the option text.
_OPTION_LINE_RE = re.compile(
    r"^([A-E\uFF21-\uFF25])[\s\.\:\u3001\uFF0E\uFF1A]?\s*(.+)$"
)


def _normalize_video_id(raw_video_id: str) -> str:
    """Map the column-A text of a data row to the on-disk folder name."""
    # Some labelers leave a stray trailing "=".
    video_id = raw_video_id.rstrip("=")
    # A leading-dash id sometimes arrives with the dash wrapped in full-width
    # parentheses (U+FF08 .. U+FF09), with either an ASCII or a full-width
    # dash inside; both collapse back to a plain "-".
    video_id = video_id.replace("\uFF08\u002D\uFF09", "-")
    video_id = video_id.replace("\uFF08\uFF0D\uFF09", "-")
    return video_id


def _parse_qa_options(options_blob: str) -> dict[str, str]:
    """Split a multi-line options cell into {"A": text, "B": text, ...}."""
    qa_options: dict[str, str] = {}
    for line in re.split(r"[\r\n]+", options_blob):
        line = line.strip()
        if not line:
            continue
        m = _OPTION_LINE_RE.match(line)
        if not m:
            continue
        key = m.group(1)
        if "\uFF21" <= key <= "\uFF25":
            # Fold a full-width letter onto its ASCII counterpart.
            key = chr(ord(key) - 0xFF21 + ord("A"))
        qa_options[key] = m.group(2).strip()
    return qa_options


def _extract_row_refs(xlsx, row_anchors: list[dict], asset_dir: Path) -> dict[str, str]:
    """Write a row's embedded pictures to *asset_dir*; return {slot: filename}.

    The first picture anchored in a slot column becomes "<slot>.png"; any
    further picture (unknown column, or slot already taken) is kept under
    "ref_extra_col<col>_<i>.png" and is not listed in the result.
    NOTE(review): files get a .png name whatever the embedded format is.
    """
    saved: dict[str, str] = {}
    for i, anchor in enumerate(row_anchors):
        slot = _COL_SLOT.get(anchor["col"])
        if slot and slot not in saved:
            dst = asset_dir / f"{slot}.png"
            saved[slot] = dst.name
        else:
            dst = asset_dir / f"ref_extra_col{anchor['col']}_{i}.png"
        xlsx.extract_image(anchor["image_path"], dst)
    return saved


def _copy_bbox_frames(video_dir: Path, asset_dir: Path) -> list[dict]:
    """Copy bbox keyframes into *asset_dir* and return their metadata."""
    frames_meta: list[dict] = []
    for bb in load_bboxes(video_dir):
        idx = bb["frame_idx"] if bb["frame_idx"] is not None else 0
        dst_jpg = asset_dir / f"frame_{idx}.jpg"
        if bb["jpg_path"]:
            shutil.copyfile(bb["jpg_path"], dst_jpg)
        # The image name is recorded even when no jpg was found to copy.
        frames_meta.append({
            "frame_idx": bb["frame_idx"],
            "image": dst_jpg.name,
            "image_w": bb["image_w"],
            "image_h": bb["image_h"],
            "boxes": bb["boxes"],
        })
    return frames_meta


def main() -> None:
    """CLI entry point: pack one batch2 xlsx plus its assets.

    Reads the xlsx under --batch-root, writes one JSON line per data row to
    --out-data and the per-annotation assets (embedded refs, bbox frames,
    TG strip) under --out-assets/<anno_id>/, then prints summary counts.
    Exits with status 1 when the xlsx does not exist.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--batch-root", required=True,
                    help="dir containing the xlsx + MultiSports/ + TG_Frames/")
    ap.add_argument("--xlsx-name", default="Sport1.xlsx")
    ap.add_argument("--videos-subdir", default="MultiSports")
    ap.add_argument("--tg-frames-subdir", default="TG_Frames")
    ap.add_argument("--out-data", required=True)
    ap.add_argument("--out-assets", required=True)
    args = ap.parse_args()

    batch_root = Path(args.batch_root).resolve()
    xlsx_path = batch_root / args.xlsx_name
    videos_root = batch_root / args.videos_subdir
    tg_root = batch_root / args.tg_frames_subdir
    if not xlsx_path.exists():
        print(f"[fatal] xlsx not found: {xlsx_path}", file=sys.stderr)
        sys.exit(1)

    out_data = Path(args.out_data).resolve()
    out_assets = Path(args.out_assets).resolve()
    out_data.parent.mkdir(parents=True, exist_ok=True)
    out_assets.mkdir(parents=True, exist_ok=True)

    tg_index = index_tg_frames(tg_root)
    print(f"[tg] indexed {len(tg_index)} TG strips from {tg_root}")

    annotations: list[dict] = []
    # Rows whose video folder is missing; they are still emitted, just
    # without frames.
    skipped: list[dict] = []

    print(f"[xlsx] opening {xlsx_path}")
    xlsx = Xlsx(xlsx_path)
    try:
        sheets = xlsx.sheets()
        print(f"[xlsx] {len(sheets)} sheets:")
        for s in sheets:
            print(f" - {s['name']!r} (sheetId={s['sheetId']}, target={s['target']})")

        for sh in sheets:
            sub_id = SUBCAT_MAP.get(sh["name"])
            if sub_id is None:
                print(f"[warn] unknown sheet name {sh['name']!r}, skipping")
                continue
            rows, anchors = xlsx.read_sheet(sh["target"])

            # Group anchors by 0-based row; within a row, smaller column
            # first (matches B/D/F order).
            anchors_by_row: dict[int, list[dict]] = defaultdict(list)
            for a in anchors:
                anchors_by_row[a["row"]].append(a)
            for row_list in anchors_by_row.values():
                row_list.sort(key=lambda a: a["col"])
            print(f"[sheet:{sub_id}] {len(rows)} rows, {len(anchors)} image anchors")

            for r in rows:
                if r["row"] == 1:
                    continue  # header
                cells = r["cells"]
                raw_video_id = (cells.get("A") or "").strip()
                if not raw_video_id:
                    continue
                # Group separator rows: column A is just a number.
                if re.fullmatch(r"\d+", raw_video_id):
                    continue
                # Sub-category header rows: column A holds a CJK label with
                # no ASCII letter, digit or underscore.
                if not re.search(r"[A-Za-z0-9_]", raw_video_id):
                    continue

                video_id = _normalize_video_id(raw_video_id)
                # NOTE(review): two rows with the same video id on one sheet
                # share an anno_id and asset dir - confirm ids are unique.
                anno_id = f"sport__{sub_id}__{video_id}"
                asset_dir = out_assets / anno_id
                asset_dir.mkdir(parents=True, exist_ok=True)

                # ---- embedded images from xlsx ----
                # xdr:row is 0-based, so 1-indexed sheet row N is anchor
                # row N-1.
                saved = _extract_row_refs(
                    xlsx, anchors_by_row.get(r["row"] - 1, []), asset_dir
                )

                # ---- bbox frames from <videos-subdir>/<video_id>/ ----
                video_dir = videos_root / video_id
                frames_meta: list[dict] = []
                if video_dir.exists():
                    frames_meta = _copy_bbox_frames(video_dir, asset_dir)
                else:
                    skipped.append({
                        "anno_id": anno_id,
                        "reason": "video dir missing",
                        "video_id": video_id,
                    })

                # ---- TG strip ----
                tg_jpg = resolve_tg_frame(tg_index, video_id)
                tg_name = None
                if tg_jpg is not None:
                    dst = asset_dir / "tg_frames.jpg"
                    shutil.copyfile(tg_jpg, dst)
                    tg_name = dst.name

                annotations.append({
                    "anno_id": anno_id,
                    "category": "sports",
                    "subcategory": sub_id,
                    "subcategory_zh": sh["name"],
                    "video_id": video_id,
                    "tg_question": cells.get("B") or "",
                    "tg_answer": cells.get("C") or "",
                    "sg_question": cells.get("D") or "",
                    "sg_answer": cells.get("E") or "",  # batch2 actually fills this
                    "qa_question": cells.get("F") or "",
                    "qa_options": _parse_qa_options(cells.get("G") or ""),
                    "qa_answer": cells.get("H") or "",
                    "extra_col_i": cells.get("I") or "",  # diagnostic
                    "ref_time": saved.get("ref_time"),
                    "ref_space": saved.get("ref_space"),
                    "ref_options": saved.get("ref_options"),
                    "tg_frames": tg_name,
                    "frames": frames_meta,
                    "xlsx_row": r["row"],
                    # Record the workbook actually read, not a fixed name.
                    "xlsx_source": args.xlsx_name,
                })
    finally:
        xlsx.close()

    # ---- write jsonl, ASCII-safe ----
    # One line per record; an empty run yields an empty file.
    out_data.write_text(
        "".join(json.dumps(a, ensure_ascii=True) + "\n" for a in annotations),
        encoding="utf-8",
    )

    # ---- stats ----
    print()
    print(f"TOTAL annotations: {len(annotations)}")
    print(f" skipped (video dir missing): {len(skipped)}")
    by_sub: dict[str, int] = defaultdict(int)
    for a in annotations:
        by_sub[a["subcategory"]] += 1
    for k, n in sorted(by_sub.items(), key=lambda kv: -kv[1]):
        print(f" {k}: {n}")
    print()

    def count_field(field: str) -> int:
        """Count annotations whose *field* is truthy."""
        return sum(1 for a in annotations if a.get(field))

    for f in ("ref_time", "ref_space", "ref_options", "tg_frames", "sg_answer"):
        print(f" {f}: {count_field(f)} / {len(annotations)}")
    print()
    print(f"wrote {out_data}")
    print(f"wrote {out_assets}/ ({len(annotations)} anno dirs)")
    if skipped:
        print()
        print("=== skipped (first 10) ===")
        for s in skipped[:10]:
            print(f" {s['anno_id']}: {s['reason']}")


if __name__ == "__main__":
    main()