from __future__ import annotations """Simple helpers to curate a tiny VLAC evaluation dataset. The goal is to grab a handful of demonstrations from LIBERO-style HDF5 files, export a few key frames as PNGs, and write a lightweight JSON manifest that the qualitative evaluation scripts can consume. Each record stores the relative paths of the saved images plus an optional reference trajectory. Two dataset layouts are supported: ``task_progress`` One entry per demo with an initial frame followed by a few evenly spaced progress frames. The JSON is a list of dicts with the following fields: { "suite": str, "task": str, "demo_id": str, "frames": [ {"path": str, "progress": float}, # includes the initial frame ... ], "reference": [str, ...] # optional, relative image paths } ``task_done`` One entry per demo containing a "negative" frame (pre-success), the success frame, and an optional reference trajectory. { "suite": str, "task": str, "demo_id": str, "negative": str, "positive": str, "reference": [str, ...] } The script intentionally keeps the logic compact so it is easy to tweak when probing the VLAC service on a toy dataset. """ import json import os from pathlib import Path from typing import Iterable, List, Optional, Sequence import random import h5py import numpy as np from PIL import Image # --------------------------------------------------------------------------- # Configuration (edit to match your local paths) # --------------------------------------------------------------------------- INPUT_FOLDERS: Sequence[str] = ( # "/home/zechen/Data/Robo/LIBERO_Regen/libero_object_regen", "/home/zechen/Data/Robo/LIBERO_Regen/libero_10_regen", ) # MODE: str = "task_done" # "task_done" or "task_progress" # OUTPUT_DIR: str = "toy_vlac_done_dataset_libero10" # DEMO_LIMIT_PER_TASK: int = 20 # how many demos to export from each task file # MAX_REFERENCE_FRAMES: int = 8 MODE: str = "task_progress" # "task_done" or "task_progress" OUTPUT_DIR: str = "toy_vlac_progress_dataset_libero10" DEMO_LIMIT_PER_TASK: int = 10 # how many demos to export from each task file PROGRESS_FRAMES_PER_DEMO: int = 7 # not counting the initial frame MAX_REFERENCE_FRAMES: int = 8 # --------------------------------------------------------------------------- # Utility helpers # --------------------------------------------------------------------------- def list_hdf5_files(folders: Iterable[str]) -> List[Path]: files: List[Path] = [] for folder in folders: path = Path(folder) if not path.is_dir(): print(f"[skip] folder not found: {path}") continue files.extend(sorted(path.glob("*.hdf5"))) return files def load_demo_arrays(demo_group: h5py.Group, demo_name: str): demo = demo_group[demo_name] frames = demo["obs/agentview_rgb"] dones = demo.get("dones") dones_array = np.asarray(dones[:]) if dones is not None else None return frames, dones_array def first_success_index(dones: Optional[np.ndarray], total_frames: int) -> int: if dones is None: return total_frames - 1 indices = np.where(dones == 1)[0] return int(indices[0]) if indices.size > 0 else total_frames - 1 def save_frame(array: np.ndarray, path: Path) -> str: path.parent.mkdir(parents=True, exist_ok=True) Image.fromarray(array).transpose(Image.FLIP_TOP_BOTTOM).save(path) return str(path) def select_reference_name(demo_names: Sequence[str], current_idx: int) -> Optional[str]: if len(demo_names) <= 1: return None return demo_names[(current_idx + 1) % len(demo_names)] def export_reference_frames( demo_group: h5py.Group, reference_demo: Optional[str], images_root: Path, demo_folder: Path, ) -> List[str]: if reference_demo is None: return [] frames, dones = load_demo_arrays(demo_group, reference_demo) total = frames.shape[0] if total < 2: return [] success_frame = first_success_index(dones, total) if success_frame <= 0: return [] available = success_frame + 1 # inclusive of the success frame count = min(MAX_REFERENCE_FRAMES, available) if count < 2: count = 2 indices = np.linspace(0, success_frame, num=count) indices = sorted({int(round(idx)) for idx in indices}) if indices[0] != 0: indices.insert(0, 0) if indices[-1] != success_frame: indices.append(success_frame) indices = indices[:MAX_REFERENCE_FRAMES] rel_paths: List[str] = [] for ref_idx, frame_index in enumerate(indices): rel_path = demo_folder / f"reference_{ref_idx:02d}.png" save_frame(frames[frame_index], images_root / rel_path) rel_paths.append(str(rel_path)) return rel_paths # --------------------------------------------------------------------------- # Dataset creation # --------------------------------------------------------------------------- def build_progress_entries(hdf5_path: Path, images_root: Path) -> List[dict]: entries: List[dict] = [] suite = hdf5_path.parent.name task = hdf5_path.stem.replace("_", " ").replace("demo", "").strip() print(f"[progress] {suite} :: {task}") with h5py.File(hdf5_path, "r") as handle: data_group = handle.get("data") if data_group is None: print(" - skipping (no data group)") return entries demo_names = sorted(data_group.keys()) for demo_idx, demo_name in enumerate(demo_names[:DEMO_LIMIT_PER_TASK]): frames, dones = load_demo_arrays(data_group, demo_name) total = frames.shape[0] success_frame = first_success_index(dones, total) if success_frame < 1: print(f" - skipping demo {demo_name} (success at first frame)") continue demo_folder = Path(f"{hdf5_path.stem}_{demo_name}") ref_paths = export_reference_frames(data_group, select_reference_name(demo_names, demo_idx), images_root, demo_folder) # Save the initial frame frame_records: List[dict] = [] initial_rel = demo_folder / "initial.png" save_frame(frames[0], images_root / initial_rel) frame_records.append({"path": str(initial_rel), "progress": 0.0}) # Evenly spaced progress frames between t=1 and success sample_count = min(PROGRESS_FRAMES_PER_DEMO, success_frame) indices = np.linspace(1, success_frame, num=sample_count, dtype=int) for step_idx, frame_index in enumerate(indices): rel_path = demo_folder / f"frame_{step_idx:02d}.png" save_frame(frames[frame_index], images_root / rel_path) progress = float(frame_index / success_frame) frame_records.append({"path": str(rel_path), "progress": round(progress, 3)}) entries.append( { "suite": suite, "task": task, "demo_id": str(demo_folder), "frames": frame_records, "reference": ref_paths, } ) print(f" - exported demo {demo_name} ({len(frame_records)} frames, {len(ref_paths)} ref)") return entries def build_done_entries(hdf5_path: Path, images_root: Path) -> List[dict]: entries: List[dict] = [] suite = hdf5_path.parent.name task = hdf5_path.stem.replace("_", " ").replace("demo", "").strip() print(f"[done] {suite} :: {task}") with h5py.File(hdf5_path, "r") as handle: data_group = handle.get("data") if data_group is None: print(" - skipping (no data group)") return entries demo_names = sorted(data_group.keys()) for demo_idx, demo_name in enumerate(demo_names[:DEMO_LIMIT_PER_TASK]): frames, dones = load_demo_arrays(data_group, demo_name) total = frames.shape[0] success_frame = first_success_index(dones, total) if success_frame <= 0: print(f" - skipping demo {demo_name} (missing success)") continue # pick a negative frame comfortably before success and with a valid predecessor lower = max(1, success_frame // 4) upper = max(1, success_frame - success_frame // 4) negative_index = random.randint(lower, upper) negative_prev_index = max(0, negative_index - 1) positive_prev_index = max(0, success_frame - 1) demo_folder = Path(f"{hdf5_path.stem}_{demo_name}") ref_paths = export_reference_frames( data_group, select_reference_name(demo_names, demo_idx), images_root, demo_folder, ) initial_rel = demo_folder / "initial.png" save_frame(frames[0], images_root / initial_rel) neg_prev_rel = demo_folder / f"neg_prev_{negative_prev_index:04d}.png" neg_curr_rel = demo_folder / f"neg_curr_{negative_index:04d}.png" pos_prev_rel = demo_folder / f"pos_prev_{positive_prev_index:04d}.png" pos_curr_rel = demo_folder / f"pos_curr_{success_frame:04d}.png" save_frame(frames[negative_prev_index], images_root / neg_prev_rel) save_frame(frames[negative_index], images_root / neg_curr_rel) save_frame(frames[positive_prev_index], images_root / pos_prev_rel) save_frame(frames[success_frame], images_root / pos_curr_rel) samples = [ { "label": 0, "initial": str(initial_rel), "prev": str(neg_prev_rel), "curr": str(neg_curr_rel), }, { "label": 1, "initial": str(initial_rel), "prev": str(pos_prev_rel), "curr": str(pos_curr_rel), }, ] entries.append( { "suite": suite, "task": task, "demo_id": str(demo_folder), "samples": samples, "reference": ref_paths, } ) print( f" - exported demo {demo_name} (samples: {len(samples)}, ref frames: {len(ref_paths)})" ) return entries # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: files = list_hdf5_files(INPUT_FOLDERS) if not files: print("No HDF5 files found. Update INPUT_FOLDERS and try again.") return output_dir = Path(OUTPUT_DIR) images_root = output_dir / "images" images_root.mkdir(parents=True, exist_ok=True) if MODE == "task_progress": all_entries: List[dict] = [] for path in files: all_entries.extend(build_progress_entries(path, images_root)) json_path = output_dir / "dataset_frame_progress.json" elif MODE == "task_done": all_entries = [] for path in files: all_entries.extend(build_done_entries(path, images_root)) json_path = output_dir / "dataset_task_done.json" else: raise ValueError(f"Unsupported MODE: {MODE}") with json_path.open("w", encoding="utf-8") as f: json.dump(all_entries, f, indent=2) print(f"\nSaved {len(all_entries)} entries to {json_path}") print(f"Image root: {images_root}") if __name__ == "__main__": main()