# VLAdaptorBench/code/scripts/recompute_oven_visibility_pregrasp.py
# Provenance (HF Hub page residue): uploaded by lsnu, commit ba3985e (verified),
# "Add iter29 single-pass logging and episode0 debug-aware GIF suite".
from pathlib import Path
import argparse
import json
import sys
from typing import Dict, Optional
import pandas as pd
# Make the project root importable so `rr_label_study` resolves when this
# script is executed directly (not installed as a package).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from rr_label_study.oven_study import (
BimanualTakeTrayOutOfOven,
ReplayCache,
_aggregate_summary,
_annotate_phase_columns,
_analyze_episode,
_derive_templates,
_episode_metrics_from_frames,
_keyframe_subset,
_keypoint_discovery,
_launch_replay_env,
_load_demo,
_load_descriptions,
_pregrasp_progress_and_distance,
_pregrasp_score_and_success,
_frame_metrics,
)
def _recompute_columns(
    episode_dir: Path,
    templates,
    checkpoint_stride: int,
    base_df: pd.DataFrame,
) -> pd.DataFrame:
    """Replay one episode and refresh its derived dense-frame columns.

    Steps a replay environment through every frame shared by the stored demo
    and ``base_df`` (truncated to the shorter of the two), overwriting the
    visibility and pregrasp columns in a copy of ``base_df`` while leaving
    all other columns untouched. The replay environment is always shut down,
    even on error.
    """
    demo = _load_demo(episode_dir)
    total = min(len(demo), len(base_df))
    out_df = base_df.iloc[:total].copy()
    # Denominator for normalized time; guards the single-frame episode case.
    time_denom = max(1, total - 1)
    env = _launch_replay_env()
    try:
        task = env.get_task(BimanualTakeTrayOutOfOven)
        replay = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
        replay.reset()
        for idx in range(total):
            replay.step_to(idx)
            state = replay.current_state()
            visibility, _ = _frame_metrics(episode_dir, demo, state, templates)
            progress, distance = _pregrasp_progress_and_distance(
                state.left_gripper_pose,
                state.tray_pose,
                templates,
            )
            p_pre, y_pre, _ = _pregrasp_score_and_success(task, templates)
            # Scalar columns first; visibility entries are merged afterwards
            # so they win on any key collision (same order as before).
            updates = {
                "frame_index": idx,
                "time_norm": idx / time_denom,
                "door_angle": state.door_angle,
                "right_gripper_open": state.right_gripper_open,
                "left_gripper_open": state.left_gripper_open,
                "pregrasp_progress": progress,
                "pregrasp_distance": distance,
                "p_pre": p_pre,
                "y_pre_raw": float(bool(y_pre)),
                "y_pre": float(bool(y_pre)),
            }
            updates.update(visibility)
            for column, value in updates.items():
                out_df.at[idx, column] = value
            done = idx + 1
            if done % 25 == 0 or done == total:
                print(
                    f"[{episode_dir.name}] recomputed {done}/{total} dense frames",
                    flush=True,
                )
        return out_df
    finally:
        env.shutdown()
def main() -> int:
    """CLI entry point: recompute visibility/pregrasp metrics for one episode.

    Reads a previously exported dense CSV, replays the episode to refresh the
    derived columns, then writes the dense CSV, keyframe CSV, per-episode
    metrics JSON, template record, and an aggregate summary into the output
    directory. Returns 0 on success.
    """
    parser = argparse.ArgumentParser()
    # The four path arguments are mandatory; stride and template source
    # fall back to defaults.
    for required_flag in (
        "--dataset-root",
        "--episode-dir",
        "--input-dense-csv",
        "--output-dir",
    ):
        parser.add_argument(required_flag, required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--template-episode-dir")
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    episode_dir = Path(args.episode_dir)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    base_df = pd.read_csv(args.input_dense_csv)
    demo = _load_demo(episode_dir)
    descriptions = _load_descriptions(episode_dir)

    # Templates may be derived from a different episode than the one replayed.
    template_source = (
        Path(args.template_episode_dir) if args.template_episode_dir else episode_dir
    )
    templates, template_frames = _derive_templates(dataset_root, template_source)
    template_record = {
        "templates": templates.to_json(),
        "template_episode": template_source.name,
        "template_frames": template_frames,
    }
    with output_dir.joinpath("templates.json").open("w", encoding="utf-8") as handle:
        json.dump(template_record, handle, indent=2)

    frame_df = _annotate_phase_columns(
        _recompute_columns(
            episode_dir=episode_dir,
            templates=templates,
            checkpoint_stride=args.checkpoint_stride,
            base_df=base_df,
        )
    )
    # Keep only keyframes that survived the frame-count truncation above.
    keyframes = [k for k in _keypoint_discovery(demo) if k < len(frame_df)]
    key_df = _keyframe_subset(frame_df, keyframes)
    metrics = _episode_metrics_from_frames(
        frame_df=frame_df,
        key_df=key_df,
        episode_name=episode_dir.name,
        description=descriptions[0],
        interventions={},
    )

    frame_df.to_csv(output_dir.joinpath(f"{episode_dir.name}.dense.csv"), index=False)
    key_df.to_csv(output_dir.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False)
    with output_dir.joinpath(f"{episode_dir.name}.metrics.json").open("w", encoding="utf-8") as handle:
        json.dump(metrics, handle, indent=2)

    summary = _aggregate_summary([metrics])
    with output_dir.joinpath("summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)
    print(json.dumps(summary, indent=2))
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer status code to the shell.
    sys.exit(main())