| """ |
| Prepare an expert demonstration for each task in LIBERO-10. |
| We use the prior: the demonstration that has the shortest frames sequence is regarded as the expert demonstration. |
| We select this expert demonstration for each task and save the full image sequences as PNGs in a folder. |
| |
| Usage: |
| python prepare_expert_demo.py --libero_task_suite libero_10 --libero_raw_data_dir /path/to/libero_10 --output_dir /path/to/output |
| |
| Example: |
| python prepare_expert_demo.py --libero_task_suite libero_10 --libero_raw_data_dir /path/to/libero_10 --output_dir /path/to/output |
| |
| """ |
|
|
| import argparse |
| import json |
| import shutil |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
| import h5py |
| import numpy as np |
| from PIL import Image |
| from tqdm import tqdm |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser(description="Select and export expert LIBERO demonstrations") |
| parser.add_argument( |
| "--libero_task_suite", |
| required=True, |
| help="Name of the LIBERO task suite (used for bookkeeping in the output)", |
| ) |
| parser.add_argument( |
| "--libero_raw_data_dir", |
| required=True, |
| type=Path, |
| help="Directory containing the raw LIBERO HDF5 demo files", |
| ) |
| parser.add_argument( |
| "--output_dir", |
| required=True, |
| type=Path, |
| help="Directory where expert demo frames will be exported", |
| ) |
| parser.add_argument( |
| "--overwrite", |
| action="store_true", |
| help="Overwrite per-task export folders if they already exist", |
| ) |
| return parser.parse_args() |
|
|
|
|
| def list_hdf5_files(root: Path) -> List[Path]: |
| if not root.is_dir(): |
| raise FileNotFoundError(f"LIBERO raw data directory not found: {root}") |
| return sorted(p for p in root.glob("*.hdf5") if p.is_file()) |
|
|
|
|
| def has_agentview_frames(demo_group: h5py.Group) -> bool: |
| obs_group = demo_group.get("obs") |
| return obs_group is not None and "agentview_rgb" in obs_group |
|
|
|
|
| def demo_statistics(demo_group: h5py.Group) -> Tuple[int, bool, Optional[int]]: |
| frames_ds = demo_group["obs/agentview_rgb"] |
| total_frames = int(frames_ds.shape[0]) |
| dones_ds = demo_group.get("dones") |
| if dones_ds is None: |
| return total_frames, True, None |
|
|
| dones = np.asarray(dones_ds[:]) |
| success_indices = np.where(dones == 1)[0] |
| if success_indices.size == 0: |
| return total_frames, False, None |
| return total_frames, True, int(success_indices[0]) |
|
|
|
|
| def select_expert_demo(data_group: h5py.Group) -> Optional[Tuple[str, Dict[str, int]]]: |
| best_name: Optional[str] = None |
| best_total: Optional[int] = None |
| best_success_index: Optional[int] = None |
|
|
| for demo_name in sorted(data_group.keys()): |
| demo_group = data_group[demo_name] |
| if not has_agentview_frames(demo_group): |
| continue |
|
|
| total_frames, is_successful, success_index = demo_statistics(demo_group) |
| if not is_successful or total_frames == 0: |
| continue |
|
|
| candidate_success = success_index if success_index is not None else total_frames - 1 |
|
|
| if best_name is None: |
| best_name = demo_name |
| best_total = total_frames |
| best_success_index = candidate_success |
| continue |
|
|
| assert best_total is not None |
| assert best_success_index is not None |
|
|
| if total_frames < best_total or ( |
| total_frames == best_total and candidate_success < best_success_index |
| ): |
| best_name = demo_name |
| best_total = total_frames |
| best_success_index = candidate_success |
|
|
| if best_name is None or best_total is None or best_success_index is None: |
| return None |
|
|
| return best_name, { |
| "frame_count": best_total, |
| "success_index": best_success_index, |
| } |
|
|
|
|
| def pretty_task_name(hdf5_path: Path) -> str: |
| return hdf5_path.stem.replace("_", " ").replace("demo", "").strip() |
|
|
|
|
| def save_frame(array: np.ndarray, file_path: Path) -> None: |
| file_path.parent.mkdir(parents=True, exist_ok=True) |
| Image.fromarray(array).transpose(Image.FLIP_TOP_BOTTOM).save(file_path) |
|
|
|
|
| def export_demo_frames( |
| demo_group: h5py.Group, output_dir: Path, output_root: Path |
| ) -> List[str]: |
| frames_ds = demo_group["obs/agentview_rgb"] |
| relative_paths: List[str] = [] |
|
|
| for frame_idx in range(frames_ds.shape[0]): |
| array = np.asarray(frames_ds[frame_idx]) |
| frame_path = output_dir / f"frame_{frame_idx:04d}.png" |
| save_frame(array, frame_path) |
| relative_paths.append(str(frame_path.relative_to(output_root))) |
|
|
| return relative_paths |
|
|
|
|
| def ensure_clean_directory(path: Path, overwrite: bool) -> bool: |
| if path.exists(): |
| if not overwrite: |
| return False |
| shutil.rmtree(path) |
| path.mkdir(parents=True, exist_ok=True) |
| return True |
|
|
|
|
| def main() -> None: |
| args = parse_args() |
| raw_dir = args.libero_raw_data_dir.expanduser() |
| output_root = args.output_dir.expanduser() |
| suite_dir = output_root / args.libero_task_suite |
| suite_dir.mkdir(parents=True, exist_ok=True) |
|
|
| hdf5_files = list_hdf5_files(raw_dir) |
| if not hdf5_files: |
| print("No HDF5 files found. Check --libero_raw_data_dir.") |
| return |
|
|
| manifest: List[Dict[str, object]] = [] |
|
|
| for hdf5_path in tqdm(hdf5_files, desc="Tasks"): |
| with h5py.File(hdf5_path, "r") as handle: |
| data_group = handle.get("data") |
| if data_group is None: |
| print(f"[skip] No data group in {hdf5_path.name}") |
| continue |
|
|
| selection = select_expert_demo(data_group) |
| if selection is None: |
| print(f"[skip] No successful demos in {hdf5_path.name}") |
| continue |
|
|
| demo_name, stats = selection |
| task_dir = suite_dir / hdf5_path.stem |
|
|
| if not ensure_clean_directory(task_dir, args.overwrite): |
| print(f"[skip] {task_dir} exists. Use --overwrite to regenerate.") |
| continue |
|
|
| demo_group = data_group[demo_name] |
| frame_paths = export_demo_frames(demo_group, task_dir, output_root) |
|
|
| task_metadata = { |
| "suite": args.libero_task_suite, |
| "task_file": hdf5_path.name, |
| "task_name": pretty_task_name(hdf5_path), |
| "demo_name": demo_name, |
| "frame_count": len(frame_paths), |
| "success_index": stats["success_index"], |
| "frame_paths": frame_paths, |
| } |
|
|
| manifest.append(task_metadata) |
|
|
| metadata_path = task_dir / "metadata.json" |
| with metadata_path.open("w", encoding="utf-8") as fout: |
| json.dump(task_metadata, fout, indent=2) |
|
|
| if not manifest: |
| print("No expert demonstrations were exported.") |
| return |
|
|
| manifest_path = suite_dir / "expert_manifest.json" |
| with manifest_path.open("w", encoding="utf-8") as fout: |
| json.dump(manifest, fout, indent=2) |
|
|
| print(f"\nSaved {len(manifest)} expert demos to {suite_dir}") |
| print(f"Manifest: {manifest_path}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|