| from __future__ import annotations |
|
|
| """Simple helpers to curate a tiny VLAC evaluation dataset. |
| |
| The goal is to grab a handful of demonstrations from LIBERO-style HDF5 files, |
| export a few key frames as PNGs, and write a lightweight JSON manifest that the |
| qualitative evaluation scripts can consume. Each record stores the relative |
| paths of the saved images plus an optional reference trajectory. |
| |
| Two dataset layouts are supported: |
| |
| ``task_progress`` |
| One entry per demo with an initial frame followed by a few evenly spaced |
| progress frames. The JSON is a list of dicts with the following fields: |
| |
| { |
| "suite": str, |
| "task": str, |
| "demo_id": str, |
| "frames": [ |
| {"path": str, "progress": float}, # includes the initial frame |
| ... |
| ], |
| "reference": [str, ...] # optional, relative image paths |
| } |
| |
``task_done``
    One entry per demo containing two samples — a "negative" pre-success
    frame pair (label 0) and a "positive" success frame pair (label 1) —
    plus an optional reference trajectory.

    {
        "suite": str,
        "task": str,
        "demo_id": str,
        "samples": [
            {"label": int, "initial": str, "prev": str, "curr": str},
            ...
        ],
        "reference": [str, ...]
    }
| |
| The script intentionally keeps the logic compact so it is easy to tweak when |
| probing the VLAC service on a toy dataset. |
| """ |
|
|
| import json |
| import os |
| from pathlib import Path |
| from typing import Iterable, List, Optional, Sequence |
|
|
| import random |
| import h5py |
| import numpy as np |
| from PIL import Image |
|
|
| |
| |
| |
|
|
# Folders scanned (non-recursively) for ``*.hdf5`` demonstration files.
INPUT_FOLDERS: Sequence[str] = (

    "/home/zechen/Data/Robo/LIBERO_Regen/libero_10_regen",
)


# Which dataset layout to export: "task_progress" or "task_done"
# (see the module docstring for the JSON schema of each).
MODE: str = "task_progress"
# Root directory for the JSON manifest and the exported PNG frames.
OUTPUT_DIR: str = "toy_vlac_progress_dataset_libero10"
# Cap on the number of demos exported per HDF5 file.
DEMO_LIMIT_PER_TASK: int = 10
# Number of evenly spaced progress frames per demo (initial frame excluded).
PROGRESS_FRAMES_PER_DEMO: int = 7
# Upper bound on reference-trajectory frames exported per demo.
MAX_REFERENCE_FRAMES: int = 8
|
|
| |
| |
| |
|
|
|
|
def list_hdf5_files(folders: Iterable[str]) -> List[Path]:
    """Collect every ``*.hdf5`` file found directly inside *folders*.

    Folders are visited in the given order; files within each folder are
    appended in sorted order. Missing folders are reported and skipped.
    """
    collected: List[Path] = []
    for entry in folders:
        root = Path(entry)
        if root.is_dir():
            collected.extend(sorted(root.glob("*.hdf5")))
        else:
            print(f"[skip] folder not found: {root}")
    return collected
|
|
|
|
def load_demo_arrays(demo_group: "h5py.Group", demo_name: str):
    """Return the agentview frame dataset and the optional dones array.

    The frames come back as the raw HDF5 dataset (not materialized); the
    ``dones`` signal, when the demo has one, is copied into a NumPy array.
    """
    demo = demo_group[demo_name]
    dones = demo.get("dones")
    if dones is None:
        dones_copy = None
    else:
        dones_copy = np.asarray(dones[:])
    return demo["obs/agentview_rgb"], dones_copy
|
|
|
|
def first_success_index(dones: Optional[np.ndarray], total_frames: int) -> int:
    """Index of the first frame whose done flag equals 1.

    Falls back to the final frame index when no dones signal is available
    or no entry is flagged.
    """
    if dones is not None:
        hits = np.flatnonzero(dones == 1)
        if hits.size:
            return int(hits[0])
    return total_frames - 1
|
|
|
|
def save_frame(array: np.ndarray, path: Path) -> str:
    """Flip *array* vertically and write it to *path* as an image file.

    Parent directories are created on demand; the path is returned as a
    string. The vertical flip presumably compensates for upside-down
    agentview renders in the HDF5 files — confirm against the simulator.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    image = Image.fromarray(array)
    image.transpose(Image.FLIP_TOP_BOTTOM).save(path)
    return str(path)
|
|
|
|
def select_reference_name(demo_names: Sequence[str], current_idx: int) -> Optional[str]:
    """Pick the next demo (cyclically) to serve as a reference trajectory.

    Returns ``None`` when there is no other demo to borrow from.
    """
    count = len(demo_names)
    if count < 2:
        return None
    return demo_names[(current_idx + 1) % count]
|
|
|
|
def export_reference_frames(
    demo_group: "h5py.Group",
    reference_demo: Optional[str],
    images_root: Path,
    demo_folder: Path,
) -> List[str]:
    """Export evenly spaced frames of *reference_demo* up to its success frame.

    Frames are written as ``reference_NN.png`` under ``images_root / demo_folder``.
    Returns the relative PNG paths, or an empty list when no usable reference
    demo exists (missing demo, too few frames, or success at frame 0).
    """
    if reference_demo is None:
        return []

    frames, dones = load_demo_arrays(demo_group, reference_demo)
    total = frames.shape[0]
    if total < 2:
        return []

    success_frame = first_success_index(dones, total)
    if success_frame <= 0:
        return []

    # Sample at most MAX_REFERENCE_FRAMES indices, but never fewer than two
    # so both the start and the success frame are represented.
    count = max(2, min(MAX_REFERENCE_FRAMES, success_frame + 1))
    picked = sorted({int(round(v)) for v in np.linspace(0, success_frame, num=count)})

    # Rounding/dedup could in principle drop an endpoint; restore them.
    if picked[0] != 0:
        picked.insert(0, 0)
    if picked[-1] != success_frame:
        picked.append(success_frame)
    picked = picked[:MAX_REFERENCE_FRAMES]

    rel_paths: List[str] = []
    for order, frame_index in enumerate(picked):
        rel = demo_folder / f"reference_{order:02d}.png"
        save_frame(frames[frame_index], images_root / rel)
        rel_paths.append(str(rel))
    return rel_paths
|
|
|
|
| |
| |
| |
|
|
|
|
def build_progress_entries(hdf5_path: Path, images_root: Path) -> List[dict]:
    """Export per-demo progress frames for one HDF5 file.

    For each demo (up to DEMO_LIMIT_PER_TASK) the initial frame plus up to
    PROGRESS_FRAMES_PER_DEMO evenly spaced frames are written under
    *images_root*, and one ``task_progress`` manifest dict per demo is
    returned. Demos that succeed at the very first frame are skipped.
    """
    entries: List[dict] = []
    suite = hdf5_path.parent.name
    task = hdf5_path.stem.replace("_", " ").replace("demo", "").strip()
    print(f"[progress] {suite} :: {task}")

    with h5py.File(hdf5_path, "r") as handle:
        data_group = handle.get("data")
        if data_group is None:
            print(" - skipping (no data group)")
            return entries

        demo_names = sorted(data_group.keys())
        for demo_idx, demo_name in enumerate(demo_names[:DEMO_LIMIT_PER_TASK]):
            frames, dones = load_demo_arrays(data_group, demo_name)
            total = frames.shape[0]
            success_frame = first_success_index(dones, total)
            if success_frame < 1:
                print(f" - skipping demo {demo_name} (success at first frame)")
                continue

            demo_folder = Path(f"{hdf5_path.stem}_{demo_name}")
            ref_paths = export_reference_frames(
                data_group,
                select_reference_name(demo_names, demo_idx),
                images_root,
                demo_folder,
            )

            # The initial frame anchors the trajectory at progress 0.0.
            initial_rel = demo_folder / "initial.png"
            save_frame(frames[0], images_root / initial_rel)
            frame_records: List[dict] = [{"path": str(initial_rel), "progress": 0.0}]

            # Evenly spaced frames from step 1 through the success frame,
            # each labelled with its fractional progress toward success.
            sample_count = min(PROGRESS_FRAMES_PER_DEMO, success_frame)
            sampled = np.linspace(1, success_frame, num=sample_count, dtype=int)
            for step_idx, frame_index in enumerate(sampled):
                rel_path = demo_folder / f"frame_{step_idx:02d}.png"
                save_frame(frames[frame_index], images_root / rel_path)
                frame_records.append(
                    {
                        "path": str(rel_path),
                        "progress": round(float(frame_index / success_frame), 3),
                    }
                )

            entries.append(
                {
                    "suite": suite,
                    "task": task,
                    "demo_id": str(demo_folder),
                    "frames": frame_records,
                    "reference": ref_paths,
                }
            )
            print(f" - exported demo {demo_name} ({len(frame_records)} frames, {len(ref_paths)} ref)")
    return entries
|
|
|
|
def build_done_entries(hdf5_path: Path, images_root: Path) -> List[dict]:
    """Export success/failure frame pairs for one HDF5 file.

    Each demo (up to DEMO_LIMIT_PER_TASK) yields a "negative" sample (a
    random mid-trajectory frame, label 0) and a "positive" sample (the
    success frame, label 1), each paired with its preceding frame and the
    shared initial frame. Demos without a usable success frame are skipped.
    """
    entries: List[dict] = []
    suite = hdf5_path.parent.name
    task = hdf5_path.stem.replace("_", " ").replace("demo", "").strip()
    print(f"[done] {suite} :: {task}")

    with h5py.File(hdf5_path, "r") as handle:
        data_group = handle.get("data")
        if data_group is None:
            print(" - skipping (no data group)")
            return entries

        demo_names = sorted(data_group.keys())
        for demo_idx, demo_name in enumerate(demo_names[:DEMO_LIMIT_PER_TASK]):
            frames, dones = load_demo_arrays(data_group, demo_name)
            success_frame = first_success_index(dones, frames.shape[0])
            if success_frame <= 0:
                print(f" - skipping demo {demo_name} (missing success)")
                continue

            # Draw the negative frame from the middle half of the trajectory
            # so it is clearly pre-success but not trivially early.
            lower = max(1, success_frame // 4)
            upper = max(1, success_frame - success_frame // 4)
            negative_index = random.randint(lower, upper)

            negative_prev_index = max(0, negative_index - 1)
            positive_prev_index = max(0, success_frame - 1)

            demo_folder = Path(f"{hdf5_path.stem}_{demo_name}")
            ref_paths = export_reference_frames(
                data_group,
                select_reference_name(demo_names, demo_idx),
                images_root,
                demo_folder,
            )

            initial_rel = demo_folder / "initial.png"
            save_frame(frames[0], images_root / initial_rel)

            neg_prev_rel = demo_folder / f"neg_prev_{negative_prev_index:04d}.png"
            neg_curr_rel = demo_folder / f"neg_curr_{negative_index:04d}.png"
            pos_prev_rel = demo_folder / f"pos_prev_{positive_prev_index:04d}.png"
            pos_curr_rel = demo_folder / f"pos_curr_{success_frame:04d}.png"

            for frame_index, rel in (
                (negative_prev_index, neg_prev_rel),
                (negative_index, neg_curr_rel),
                (positive_prev_index, pos_prev_rel),
                (success_frame, pos_curr_rel),
            ):
                save_frame(frames[frame_index], images_root / rel)

            samples = [
                {
                    "label": 0,
                    "initial": str(initial_rel),
                    "prev": str(neg_prev_rel),
                    "curr": str(neg_curr_rel),
                },
                {
                    "label": 1,
                    "initial": str(initial_rel),
                    "prev": str(pos_prev_rel),
                    "curr": str(pos_curr_rel),
                },
            ]

            entries.append(
                {
                    "suite": suite,
                    "task": task,
                    "demo_id": str(demo_folder),
                    "samples": samples,
                    "reference": ref_paths,
                }
            )
            print(
                f" - exported demo {demo_name} (samples: {len(samples)}, ref frames: {len(ref_paths)})"
            )
    return entries
|
|
|
|
| |
| |
| |
|
|
|
|
def main() -> None:
    """Curate the toy dataset: export key frames and write the JSON manifest."""
    hdf5_files = list_hdf5_files(INPUT_FOLDERS)
    if not hdf5_files:
        print("No HDF5 files found. Update INPUT_FOLDERS and try again.")
        return

    output_dir = Path(OUTPUT_DIR)
    images_root = output_dir / "images"
    images_root.mkdir(parents=True, exist_ok=True)

    # Each supported mode maps to its builder function and manifest filename.
    builders = {
        "task_progress": (build_progress_entries, "dataset_frame_progress.json"),
        "task_done": (build_done_entries, "dataset_task_done.json"),
    }
    if MODE not in builders:
        raise ValueError(f"Unsupported MODE: {MODE}")
    build_fn, manifest_name = builders[MODE]

    all_entries: List[dict] = []
    for hdf5_path in hdf5_files:
        all_entries.extend(build_fn(hdf5_path, images_root))

    json_path = output_dir / manifest_name
    with json_path.open("w", encoding="utf-8") as f:
        json.dump(all_entries, f, indent=2)

    print(f"\nSaved {len(all_entries)} entries to {json_path}")
    print(f"Image root: {images_root}")
|
|
|
|
# Entry point when the module is run directly as a curation script.
if __name__ == "__main__":
    main()
|
|
|
|