# TTI / Dev / testing / prepare_vlac_test_data.py
# Uploaded via huggingface_hub by JosephBai (commit 857c2e9, verified).
from __future__ import annotations
"""Simple helpers to curate a tiny VLAC evaluation dataset.
The goal is to grab a handful of demonstrations from LIBERO-style HDF5 files,
export a few key frames as PNGs, and write a lightweight JSON manifest that the
qualitative evaluation scripts can consume. Each record stores the relative
paths of the saved images plus an optional reference trajectory.
Two dataset layouts are supported:
``task_progress``
One entry per demo with an initial frame followed by a few evenly spaced
progress frames. The JSON is a list of dicts with the following fields:
{
"suite": str,
"task": str,
"demo_id": str,
"frames": [
{"path": str, "progress": float}, # includes the initial frame
...
],
"reference": [str, ...] # optional, relative image paths
}
``task_done``
One entry per demo containing a "negative" frame (pre-success), the success
frame, and an optional reference trajectory.
{
"suite": str,
"task": str,
"demo_id": str,
"negative": str,
"positive": str,
"reference": [str, ...]
}
The script intentionally keeps the logic compact so it is easy to tweak when
probing the VLAC service on a toy dataset.
"""
import json
import os
from pathlib import Path
from typing import Iterable, List, Optional, Sequence
import random
import h5py
import numpy as np
from PIL import Image
# ---------------------------------------------------------------------------
# Configuration (edit to match your local paths)
# ---------------------------------------------------------------------------
# Folders scanned (non-recursively) for *.hdf5 task demo files.
INPUT_FOLDERS: Sequence[str] = (
    # "/home/zechen/Data/Robo/LIBERO_Regen/libero_object_regen",
    "/home/zechen/Data/Robo/LIBERO_Regen/libero_10_regen",
)
# Alternative configuration for the "task_done" layout, kept for quick switching:
# MODE: str = "task_done" # "task_done" or "task_progress"
# OUTPUT_DIR: str = "toy_vlac_done_dataset_libero10"
# DEMO_LIMIT_PER_TASK: int = 20 # how many demos to export from each task file
# MAX_REFERENCE_FRAMES: int = 8
# Active configuration ("task_progress" layout; see module docstring for schema).
MODE: str = "task_progress" # "task_done" or "task_progress"
# All images and the JSON manifest are written under this directory.
OUTPUT_DIR: str = "toy_vlac_progress_dataset_libero10"
DEMO_LIMIT_PER_TASK: int = 10 # how many demos to export from each task file
PROGRESS_FRAMES_PER_DEMO: int = 7 # not counting the initial frame
# Cap on reference-trajectory frames exported per demo (includes the success frame).
MAX_REFERENCE_FRAMES: int = 8
# ---------------------------------------------------------------------------
# Utility helpers
# ---------------------------------------------------------------------------
def list_hdf5_files(folders: Iterable[str]) -> List[Path]:
    """Collect every ``*.hdf5`` file found directly inside each of *folders*.

    Folders that do not exist are reported and skipped. Matches within a
    folder are sorted so the overall order is deterministic.
    """
    collected: List[Path] = []
    for raw in folders:
        root = Path(raw)
        if root.is_dir():
            collected += sorted(root.glob("*.hdf5"))
        else:
            print(f"[skip] folder not found: {root}")
    return collected
def load_demo_arrays(demo_group: h5py.Group, demo_name: str):
    """Return ``(frames, dones)`` for one demo in *demo_group*.

    ``frames`` is the raw ``obs/agentview_rgb`` dataset (left lazy, not read
    into memory); ``dones`` is materialised as a NumPy array, or ``None`` when
    the demo has no ``dones`` dataset.
    """
    demo = demo_group[demo_name]
    dones_dataset = demo.get("dones")
    if dones_dataset is None:
        dones_array = None
    else:
        dones_array = np.asarray(dones_dataset[:])
    return demo["obs/agentview_rgb"], dones_array
def first_success_index(dones: Optional[np.ndarray], total_frames: int) -> int:
    """Return the index of the first frame flagged done.

    Falls back to the last frame index when *dones* is missing or never fires.
    """
    fallback = total_frames - 1
    if dones is None:
        return fallback
    hits = np.flatnonzero(dones == 1)
    return int(hits[0]) if hits.size else fallback
def save_frame(array: np.ndarray, path: Path) -> str:
    """Save *array* as an image at *path* (creating parent dirs) and return str(path)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    image = Image.fromarray(array)
    # NOTE(review): the vertical flip looks intentional (frames presumably
    # stored upside down by the renderer) -- confirm against the source data.
    image.transpose(Image.FLIP_TOP_BOTTOM).save(path)
    return str(path)
def select_reference_name(demo_names: Sequence[str], current_idx: int) -> Optional[str]:
    """Pick the next demo (cyclically) to serve as a reference trajectory.

    Returns ``None`` when there is no *other* demo to reference.
    """
    count = len(demo_names)
    if count < 2:
        return None
    return demo_names[(current_idx + 1) % count]
def export_reference_frames(
    demo_group: h5py.Group,
    reference_demo: Optional[str],
    images_root: Path,
    demo_folder: Path,
) -> List[str]:
    """Export up to MAX_REFERENCE_FRAMES evenly spaced frames of *reference_demo*.

    Samples run from the first frame through the success frame (inclusive) and
    are written under ``images_root / demo_folder`` as ``reference_NN.png``.
    Returns the relative paths, or an empty list when there is no usable
    reference (no demo, too few frames, or success at frame 0).
    """
    if reference_demo is None:
        return []
    frames, dones = load_demo_arrays(demo_group, reference_demo)
    total = frames.shape[0]
    if total < 2:
        return []
    success_frame = first_success_index(dones, total)
    if success_frame <= 0:
        return []
    # Sample count: capped by MAX_REFERENCE_FRAMES, never fewer than two
    # (the initial frame plus the success frame).
    count = max(2, min(MAX_REFERENCE_FRAMES, success_frame + 1))
    sampled = np.linspace(0, success_frame, num=count)
    picks = sorted({int(round(v)) for v in sampled})
    # Guarantee both endpoints survive rounding/dedup, then re-apply the cap.
    if picks[0] != 0:
        picks.insert(0, 0)
    if picks[-1] != success_frame:
        picks.append(success_frame)
    picks = picks[:MAX_REFERENCE_FRAMES]
    rel_paths: List[str] = []
    for ordinal, frame_index in enumerate(picks):
        rel = demo_folder / f"reference_{ordinal:02d}.png"
        save_frame(frames[frame_index], images_root / rel)
        rel_paths.append(str(rel))
    return rel_paths
# ---------------------------------------------------------------------------
# Dataset creation
# ---------------------------------------------------------------------------
def build_progress_entries(hdf5_path: Path, images_root: Path) -> List[dict]:
    """Build ``task_progress`` records for up to DEMO_LIMIT_PER_TASK demos.

    Each record holds the initial frame (progress 0.0) followed by evenly
    spaced frames up to the success frame, where a frame's progress is
    ``frame_index / success_frame`` rounded to three decimals. Demos whose
    success fires on the very first frame are skipped.
    """
    entries: List[dict] = []
    suite = hdf5_path.parent.name
    task = hdf5_path.stem.replace("_", " ").replace("demo", "").strip()
    print(f"[progress] {suite} :: {task}")
    with h5py.File(hdf5_path, "r") as handle:
        data_group = handle.get("data")
        if data_group is None:
            print(" - skipping (no data group)")
            return entries
        demo_names = sorted(data_group.keys())
        for demo_idx, demo_name in enumerate(demo_names[:DEMO_LIMIT_PER_TASK]):
            frames, dones = load_demo_arrays(data_group, demo_name)
            success_frame = first_success_index(dones, frames.shape[0])
            if success_frame < 1:
                print(f" - skipping demo {demo_name} (success at first frame)")
                continue
            demo_folder = Path(f"{hdf5_path.stem}_{demo_name}")
            ref_paths = export_reference_frames(
                data_group,
                select_reference_name(demo_names, demo_idx),
                images_root,
                demo_folder,
            )
            # The initial frame always leads the record with progress 0.0.
            initial_rel = demo_folder / "initial.png"
            save_frame(frames[0], images_root / initial_rel)
            frame_records: List[dict] = [{"path": str(initial_rel), "progress": 0.0}]
            # Evenly spaced samples between t=1 and the success frame.
            sample_count = min(PROGRESS_FRAMES_PER_DEMO, success_frame)
            picks = np.linspace(1, success_frame, num=sample_count, dtype=int)
            for step_idx, frame_index in enumerate(picks):
                rel_path = demo_folder / f"frame_{step_idx:02d}.png"
                save_frame(frames[frame_index], images_root / rel_path)
                progress = float(frame_index / success_frame)
                frame_records.append({"path": str(rel_path), "progress": round(progress, 3)})
            entries.append(
                {
                    "suite": suite,
                    "task": task,
                    "demo_id": str(demo_folder),
                    "frames": frame_records,
                    "reference": ref_paths,
                }
            )
            print(f" - exported demo {demo_name} ({len(frame_records)} frames, {len(ref_paths)} ref)")
    return entries
def build_done_entries(hdf5_path: Path, images_root: Path) -> List[dict]:
    """Build ``task_done`` records (one negative + one positive pair per demo).

    The negative pair is drawn from the middle half of the trajectory so it is
    comfortably before success and has a valid predecessor; the positive pair
    is the success frame and its predecessor. Demos with no usable success
    frame are skipped.
    """
    entries: List[dict] = []
    suite = hdf5_path.parent.name
    task = hdf5_path.stem.replace("_", " ").replace("demo", "").strip()
    print(f"[done] {suite} :: {task}")
    with h5py.File(hdf5_path, "r") as handle:
        data_group = handle.get("data")
        if data_group is None:
            print(" - skipping (no data group)")
            return entries
        demo_names = sorted(data_group.keys())
        for demo_idx, demo_name in enumerate(demo_names[:DEMO_LIMIT_PER_TASK]):
            frames, dones = load_demo_arrays(data_group, demo_name)
            success_frame = first_success_index(dones, frames.shape[0])
            if success_frame <= 0:
                print(f" - skipping demo {demo_name} (missing success)")
                continue
            # Sample the negative frame from roughly the middle half
            # [success/4, success - success/4] of the trajectory.
            quarter = success_frame // 4
            negative_index = random.randint(max(1, quarter), max(1, success_frame - quarter))
            negative_prev_index = max(0, negative_index - 1)
            positive_prev_index = max(0, success_frame - 1)
            demo_folder = Path(f"{hdf5_path.stem}_{demo_name}")
            ref_paths = export_reference_frames(
                data_group,
                select_reference_name(demo_names, demo_idx),
                images_root,
                demo_folder,
            )
            initial_rel = demo_folder / "initial.png"
            neg_prev_rel = demo_folder / f"neg_prev_{negative_prev_index:04d}.png"
            neg_curr_rel = demo_folder / f"neg_curr_{negative_index:04d}.png"
            pos_prev_rel = demo_folder / f"pos_prev_{positive_prev_index:04d}.png"
            pos_curr_rel = demo_folder / f"pos_curr_{success_frame:04d}.png"
            for frame_idx, rel in (
                (0, initial_rel),
                (negative_prev_index, neg_prev_rel),
                (negative_index, neg_curr_rel),
                (positive_prev_index, pos_prev_rel),
                (success_frame, pos_curr_rel),
            ):
                save_frame(frames[frame_idx], images_root / rel)
            samples = [
                {
                    "label": 0,
                    "initial": str(initial_rel),
                    "prev": str(neg_prev_rel),
                    "curr": str(neg_curr_rel),
                },
                {
                    "label": 1,
                    "initial": str(initial_rel),
                    "prev": str(pos_prev_rel),
                    "curr": str(pos_curr_rel),
                },
            ]
            entries.append(
                {
                    "suite": suite,
                    "task": task,
                    "demo_id": str(demo_folder),
                    "samples": samples,
                    "reference": ref_paths,
                }
            )
            print(
                f" - exported demo {demo_name} (samples: {len(samples)}, ref frames: {len(ref_paths)})"
            )
    return entries
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Export the configured toy dataset and write its JSON manifest.

    Dispatches on MODE to one of the two dataset builders, runs it over every
    discovered HDF5 file, and dumps the combined entries to a JSON file inside
    OUTPUT_DIR. Raises ValueError for an unrecognised MODE.
    """
    files = list_hdf5_files(INPUT_FOLDERS)
    if not files:
        print("No HDF5 files found. Update INPUT_FOLDERS and try again.")
        return
    output_dir = Path(OUTPUT_DIR)
    images_root = output_dir / "images"
    images_root.mkdir(parents=True, exist_ok=True)
    # Mode -> (builder function, manifest filename).
    builders = {
        "task_progress": (build_progress_entries, "dataset_frame_progress.json"),
        "task_done": (build_done_entries, "dataset_task_done.json"),
    }
    if MODE not in builders:
        raise ValueError(f"Unsupported MODE: {MODE}")
    builder, json_name = builders[MODE]
    all_entries: List[dict] = []
    for path in files:
        all_entries.extend(builder(path, images_root))
    json_path = output_dir / json_name
    with json_path.open("w", encoding="utf-8") as f:
        json.dump(all_entries, f, indent=2)
    print(f"\nSaved {len(all_entries)} entries to {json_path}")
    print(f"Image root: {images_root}")