VLAdaptorBench / code /scripts /render_oven_metric_frame.py
lsnu's picture
Add fast all-metrics render path
7b8de56 verified
from collections import Counter
from pathlib import Path
import argparse
import json
import math
import os
import pickle
import sys
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
# Directory one level above the folder that contains this script. It is put at
# the front of sys.path (once) before the project-local import that follows.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from rr_label_study.oven_study import (
BimanualTakeTrayOutOfOven,
DEFAULT_PPRE_TAU,
FULL_CAMERA_SET,
MotionTemplates,
ReplayCache,
Shape,
_apply_relative_pose,
_camera_point_cloud,
_camera_file,
_extract_height_threshold,
_extract_sequence_poses,
_frame_metrics,
_launch_replay_env,
_load_demo,
_load_mask,
_pregrasp_corridor_rel_poses,
_project_points,
_sample_full_tray_points,
_sample_grasp_points,
_visibility_projection_details,
)
# Cameras rendered, one row each, by _compose_visibility_focus.
VISIBILITY_CAMERAS = ["front", "wrist_right", "wrist_left"]
# Cameras rendered, one column each, in the top row of _compose_path_quality.
PATH_CAMERAS = ["front", "overhead", "over_shoulder_left"]
# Cameras rendered, one column each, by _compose_all_metrics.
ALL_METRICS_CAMERAS = ["front", "wrist_right", "wrist_left"]
# Pixel size used when bounds-checking projected points and when allocating
# the visibility mask; presumably the side length of the stored RGB frames.
RGB_SIZE = 128
# Side length, in pixels, of every square panel on the output canvases.
PANEL_SIZE = 256
# Height of the text header on the visibility and path-quality canvases.
HEADER_HEIGHT = 72
# Added to PANEL_SIZE to obtain the height of the all-metrics canvas.
FOOTER_HEIGHT = 210
def _font(size: int) -> ImageFont.FreeTypeFont:
    """Return a font of the given pixel size for text overlays.

    The first TrueType file that exists among a fixed list of system paths is
    loaded; when none exists, Pillow's built-in default font is returned
    instead (in which case ``size`` is not applied).
    """
    search_order = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Regular.ttf",
    )
    font_file = next(
        (entry for entry in search_order if Path(entry).exists()),
        None,
    )
    if font_file is None:
        return ImageFont.load_default()
    return ImageFont.truetype(font_file, size=size)
# Shared fonts, resolved once at import time: small (14) for labels and
# subtitles, medium (18) for panel titles and status lines, large (24) for
# headers.
FONT_SM = _font(14)
FONT_MD = _font(18)
FONT_LG = _font(24)
def _load_rgb(episode_dir: Path, camera_name: str, frame_index: int) -> Image.Image:
    """Open the stored RGB frame of one camera and return it in RGB mode.

    The file location is resolved by ``_camera_file`` with kind ``"rgb"``.
    """
    rgb_path = _camera_file(episode_dir, camera_name, "rgb", frame_index)
    frame = Image.open(rgb_path)
    return frame.convert("RGB")
def _scale_image(image: Image.Image, size: int = PANEL_SIZE) -> Image.Image:
    """Return a bilinearly resampled ``size`` x ``size`` copy of ``image``."""
    target = (size, size)
    return image.resize(target, resample=Image.Resampling.BILINEAR)
def _new_panel(background: Tuple[int, int, int] = (0, 0, 0)) -> Image.Image:
    """Create a blank square RGB panel (PANEL_SIZE per side) filled with ``background``."""
    dimensions = (PANEL_SIZE, PANEL_SIZE)
    return Image.new("RGB", dimensions, background)
def _draw_badge(draw: ImageDraw.ImageDraw, x: int, y: int, text: str, fill: Tuple[int, int, int]) -> None:
    """Draw a rounded, filled label with white text whose top-left corner is (x, y).

    The badge width is estimated from the character count (8.5 px per
    character plus 12 px of padding) rather than measured from the font.
    """
    badge_w = 12 + int(len(text) * 8.5)
    badge_h = 24
    outline_box = (x, y, x + badge_w, y + badge_h)
    draw.rounded_rectangle(outline_box, radius=6, fill=fill)
    draw.text((x + 6, y + 4), text, fill=(255, 255, 255), font=FONT_SM)
def _draw_panel_title(image: Image.Image, title: str, subtitle: Optional[str] = None) -> None:
    """Paint a black title strip along the top of ``image`` (in place).

    When ``subtitle`` is non-empty, a second black strip with smaller text is
    painted along the bottom edge as well.
    """
    black = (0, 0, 0)
    pen = ImageDraw.Draw(image)
    pen.rectangle((0, 0, PANEL_SIZE, 30), fill=black)
    pen.text((8, 6), title, fill=(255, 255, 255), font=FONT_MD)
    if not subtitle:
        return
    pen.rectangle((0, PANEL_SIZE - 26, PANEL_SIZE, PANEL_SIZE), fill=black)
    pen.text((8, PANEL_SIZE - 22), subtitle, fill=(230, 230, 230), font=FONT_SM)
def _draw_point(draw: ImageDraw.ImageDraw, xy: Tuple[int, int], color: Tuple[int, int, int], radius: int = 3) -> None:
    """Draw a filled disc of the given ``radius`` centred on ``xy``."""
    cx, cy = xy
    bounds = (cx - radius, cy - radius, cx + radius, cy + radius)
    draw.ellipse(bounds, fill=color)
def _draw_polyline(draw: ImageDraw.ImageDraw, coords: Sequence[Tuple[int, int]], color: Tuple[int, int, int], width: int = 2) -> None:
    """Draw a single-colour line through ``coords``; fewer than two points draws nothing."""
    if len(coords) < 2:
        return
    draw.line(coords, fill=color, width=width)
def _draw_segmented_polyline(
    draw: ImageDraw.ImageDraw,
    coords: Sequence[Tuple[int, int]],
    colors: Sequence[Tuple[int, int, int]],
    width: int = 3,
) -> None:
    """Draw a polyline whose segments are coloured individually.

    The segment ending at ``coords[k]`` uses ``colors[k]``; when ``colors`` is
    shorter than ``coords`` the last colour is reused. Fewer than two points
    draws nothing.
    """
    if len(coords) < 2:
        return
    last_color = len(colors) - 1
    for end_idx, (start, end) in enumerate(zip(coords, coords[1:]), start=1):
        segment_color = colors[min(end_idx, last_color)]
        draw.line((start, end), fill=segment_color, width=width)
def _draw_text_lines(
    draw: ImageDraw.ImageDraw,
    x: int,
    y: int,
    lines: Sequence[str],
    fill: Tuple[int, int, int] = (255, 255, 255),
    font: ImageFont.FreeTypeFont = FONT_SM,
    line_height: int = 18,
) -> None:
    """Draw ``lines`` left-aligned at ``x``, one below the other starting at ``y``.

    Consecutive lines are spaced ``line_height`` pixels apart.
    """
    anchors = ((x, y + line_height * row_no) for row_no in range(len(lines)))
    for anchor, text in zip(anchors, lines):
        draw.text(anchor, text, fill=fill, font=font)
def _bar(draw: ImageDraw.ImageDraw, x: int, y: int, w: int, h: int, value: float, label: str, color: Tuple[int, int, int], threshold: Optional[float] = None) -> None:
    """Draw a labelled horizontal gauge for a value expected to lie in [0, 1].

    Args:
        draw: Drawing context of the target image.
        x, y: Top-left corner of the gauge track; the label is drawn 18 px
            above it.
        w, h: Width and height of the gauge track in pixels.
        value: Metric value. It is printed verbatim (three decimals) in the
            label and clamped to [0, 1] for the fill. NaN fills nothing.
        label: Text printed before the value.
        color: Fill colour of the gauge.
        threshold: Optional value in [0, 1] marked with a white vertical tick.
            ``None`` or NaN draws no tick.
    """

    def _fraction(raw: float) -> float:
        # NaN has to be caught before clamping: min(1.0, nan) evaluates to
        # 1.0, which would paint a missing metric as a completely full bar.
        if math.isnan(raw):
            return 0.0
        return max(0.0, min(1.0, raw))

    draw.text((x, y - 18), f"{label}: {value:.3f}", fill=(255, 255, 255), font=FONT_SM)
    draw.rounded_rectangle((x, y, x + w, y + h), radius=5, outline=(120, 120, 120), fill=(20, 20, 20))
    filled_w = int(_fraction(value) * w)
    if filled_w > 0:
        if filled_w >= 12:
            draw.rounded_rectangle((x, y, x + filled_w, y + h), radius=5, fill=color)
        else:
            # Too narrow for the 5 px corner radius; use a plain rectangle.
            draw.rectangle((x, y, x + filled_w, y + h), fill=color)
    if threshold is not None and not math.isnan(threshold):
        tx = x + int(_fraction(threshold) * w)
        draw.line((tx, y - 2, tx, y + h + 2), fill=(255, 255, 255), width=2)
def _project_coords(
    points_world: np.ndarray,
    point_cloud_world: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
    """Forward to ``_visibility_projection_details`` and return its result unchanged.

    Callers in this file unpack the result as ``(projected, visible)`` lists
    of pixel coordinates.
    """
    projection = _visibility_projection_details(
        points_world,
        point_cloud_world,
        extrinsics,
        intrinsics,
    )
    return projection
def _infer_mask_handle(mask: np.ndarray, coords: Sequence[Tuple[int, int]]) -> Optional[int]:
values = [int(mask[y, x]) for x, y in coords if int(mask[y, x]) != 0]
if not values:
return None
return Counter(values).most_common(1)[0][0]
def _masked_rgb(rgb: Image.Image, binary_mask: np.ndarray) -> Image.Image:
    """Return a copy of ``rgb`` in which every pixel not selected by ``binary_mask`` is black."""
    pixels = np.array(rgb)
    kept = np.zeros_like(pixels)
    kept[binary_mask] = pixels[binary_mask]
    return Image.fromarray(kept, mode="RGB")
def _scaled_coords(coords: Iterable[Tuple[int, int]], scale: float) -> List[Tuple[int, int]]:
return [(int(round(x * scale)), int(round(y * scale))) for x, y in coords]
def _demo_gripper_pose(obs, arm_name: str) -> np.ndarray:
side = getattr(obs, arm_name)
return np.asarray(side.gripper_pose, dtype=np.float64)
def _world_pose_to_coords(
    pose_xyz: Sequence[float],
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Optional[Tuple[int, int]]:
    """Project one world-space position to rounded pixel coordinates.

    Returns ``None`` when the projected depth is not positive or when the
    unrounded pixel falls outside ``[0, RGB_SIZE)`` on either axis.
    """
    single_point = np.asarray([pose_xyz], dtype=np.float64)
    uv, camera_xyz = _project_points(single_point, extrinsics, intrinsics)
    u, v = uv[0]
    behind_camera = camera_xyz[0, 2] <= 0
    inside_frame = 0 <= u < RGB_SIZE and 0 <= v < RGB_SIZE
    if behind_camera or not inside_frame:
        return None
    return int(round(float(u))), int(round(float(v)))
def _compose_visibility_panel(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    tray_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    per_camera_vis: float,
) -> Image.Image:
    """Build the three-panel visibility row for one camera.

    Left to right the row contains:

    * the RGB scene with projected tray points (blue), projected grasp points
      (magenta) and depth-visible grasp points (green);
    * an "x-ray" view on black with the same tray/grasp projections plus the
      left (yellow) and right (violet) gripper positions when they project
      into the frame;
    * the RGB frame masked to the pixels reported visible for tray or grasp
      points, with grasp points overlaid.

    ``per_camera_vis`` is only printed in the scene subtitle.
    """
    zoom = PANEL_SIZE / RGB_SIZE

    def plot(pen: ImageDraw.ImageDraw, pixels, color, radius: int) -> None:
        # Draw each source-resolution pixel at panel resolution.
        for xy in _scaled_coords(pixels, zoom):
            _draw_point(pen, xy, color, radius=radius)

    rgb = _load_rgb(episode_dir, camera_name, frame_index)
    point_cloud, extrinsics, intrinsics = _camera_point_cloud(
        episode_dir, demo, frame_index, camera_name
    )
    tray_points = _sample_full_tray_points(tray_pose)
    grasp_points = _sample_grasp_points(templates, tray_pose)
    tray_proj, tray_visible = _project_coords(tray_points, point_cloud, extrinsics, intrinsics)
    grasp_proj, grasp_visible = _project_coords(grasp_points, point_cloud, extrinsics, intrinsics)

    # Panel 1: camera image with all overlays and a colour legend.
    scene_panel = _scale_image(rgb)
    _draw_panel_title(
        scene_panel,
        f"{camera_name} scene",
        f"metric vis={per_camera_vis:.3f} | visible={len(grasp_visible)}/{len(grasp_proj) or 1}",
    )
    scene_pen = ImageDraw.Draw(scene_panel)
    plot(scene_pen, tray_proj, (70, 170, 255), 2)
    plot(scene_pen, grasp_proj, (255, 80, 210), 3)
    plot(scene_pen, grasp_visible, (70, 255, 140), 3)
    _draw_badge(scene_pen, 8, 34, "blue=tray", (40, 105, 180))
    _draw_badge(scene_pen, 112, 34, "magenta=grasp", (145, 40, 140))
    _draw_badge(scene_pen, 8, 64, "green=visible", (32, 130, 64))

    # Panel 2: projections on a black background, ignoring occlusion.
    xray_panel = _new_panel()
    _draw_panel_title(xray_panel, f"{camera_name} x-ray", "blue=tray, magenta=grasp region")
    xray_pen = ImageDraw.Draw(xray_panel)
    plot(xray_pen, tray_proj, (70, 170, 255), 2)
    plot(xray_pen, grasp_proj, (255, 80, 210), 3)
    left_xy = _world_pose_to_coords(left_pose[:3], extrinsics, intrinsics)
    right_xy = _world_pose_to_coords(right_pose[:3], extrinsics, intrinsics)
    if left_xy is not None:
        plot(xray_pen, [left_xy], (255, 230, 70), 4)
    if right_xy is not None:
        plot(xray_pen, [right_xy], (140, 110, 255), 4)

    # Panel 3: only the pixels reported as visible keep their RGB value.
    visible_pixels = np.zeros((RGB_SIZE, RGB_SIZE), dtype=bool)
    for col, row_px in tray_visible + grasp_visible:
        visible_pixels[row_px, col] = True
    masked_panel = _scale_image(_masked_rgb(rgb, visible_pixels))
    _draw_panel_title(
        masked_panel,
        f"{camera_name} depth-visible",
        f"visible grasp pts={len(grasp_visible)}/{len(grasp_proj) or 1}",
    )
    masked_pen = ImageDraw.Draw(masked_panel)
    plot(masked_pen, grasp_proj, (180, 80, 160), 2)
    plot(masked_pen, grasp_visible, (70, 255, 140), 3)

    strip = Image.new("RGB", (PANEL_SIZE * 3, PANEL_SIZE), (15, 15, 15))
    for slot, panel in enumerate((scene_panel, xray_panel, masked_panel)):
        strip.paste(panel, (PANEL_SIZE * slot, 0))
    return strip
def _compose_visibility_focus(
    episode_dir: Path,
    demo,
    frame_index: int,
    tray_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
) -> Image.Image:
    """Render the visibility debug image: a header plus one row per camera.

    The header prints ``three_view_visibility``, ``full_view_visibility``,
    ``y_ready`` and the phase score (falling back to ``phase_switch`` when
    ``phase_score`` is absent) from ``frame_row``. Each camera in
    ``VISIBILITY_CAMERAS`` contributes one row built by
    ``_compose_visibility_panel``.
    """
    frame_w = PANEL_SIZE * 3
    frame_h = HEADER_HEIGHT + PANEL_SIZE * len(VISIBILITY_CAMERAS)
    canvas = Image.new("RGB", (frame_w, frame_h), (12, 12, 12))

    pen = ImageDraw.Draw(canvas)
    pen.rectangle((0, 0, canvas.size[0], HEADER_HEIGHT), fill=(0, 0, 0))
    pen.text((12, 10), f"Visibility Metric Debug | frame {frame_index}", fill=(255, 255, 255), font=FONT_LG)
    summary = " | ".join(
        (
            f"three_view_visibility={frame_row['three_view_visibility']:.3f}",
            f"full_view_visibility={frame_row['full_view_visibility']:.3f}",
            f"y_ready={int(frame_row['y_ready'])}",
            f"phase_score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}",
        )
    )
    pen.text((12, 40), summary, fill=(235, 235, 235), font=FONT_SM)

    row_top = HEADER_HEIGHT
    for camera_name in VISIBILITY_CAMERAS:
        visibility = float(
            _camera_grasp_visibility(episode_dir, demo, frame_index, camera_name, tray_pose, templates)
        )
        camera_row = _compose_visibility_panel(
            episode_dir=episode_dir,
            demo=demo,
            frame_index=frame_index,
            camera_name=camera_name,
            tray_pose=tray_pose,
            left_pose=left_pose,
            right_pose=right_pose,
            templates=templates,
            per_camera_vis=visibility,
        )
        canvas.paste(camera_row, (0, row_top))
        row_top += PANEL_SIZE
    return canvas
def _camera_grasp_visibility(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    tray_pose: np.ndarray,
    templates: MotionTemplates,
) -> float:
    """Return the share of projected grasp points that are reported visible.

    The result is ``len(visible) / len(projected)`` for the given camera and
    frame, or ``0.0`` when nothing was projected.
    """
    cloud, extrinsics, intrinsics = _camera_point_cloud(
        episode_dir, demo, frame_index, camera_name
    )
    samples = _sample_grasp_points(templates, tray_pose)
    projected, visible = _project_coords(samples, cloud, extrinsics, intrinsics)
    if not projected:
        return 0.0
    return float(len(visible) / len(projected))
def _compose_path_quality(
    episode_dir: Path,
    demo,
    frame_index: int,
    tray_pose: np.ndarray,
    task_base_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
    left_grasped: bool,
    debug_row: Optional[Dict[str, object]],
) -> Image.Image:
    """Render the path-quality debug image for one frame.

    Below a text header the canvas has two rows. The top row shows every
    camera in ``PATH_CAMERAS`` with the recent demo gripper trails (up to the
    13 frames ending at ``frame_index``), the planned pregrasp/extract path
    and, when the debug row lists milestones, one labelled marker per
    milestone. The bottom row holds a "Metric State" panel (one panel wide)
    and a "p_ext Planner Search" panel (two panels wide).

    Args:
        episode_dir: Episode directory the RGB frames are read from.
        demo: Indexable demo; ``demo[i].misc`` supplies camera matrices and
            ``demo[i].left`` / ``.right`` supply gripper poses.
        frame_index: Frame to render.
        tray_pose: Tray pose used to place the planned path.
        task_base_pose: Task base pose forwarded to the pose helpers.
        left_pose: Current left gripper pose (first three values are used).
        right_pose: Current right gripper pose (first three values are used).
        templates: Motion templates forwarded to the pose helpers.
        frame_row: Dense-metrics row for this frame.
        left_grasped: Whether the left gripper currently holds the tray.
        debug_row: Optional debug record; its ``"p_ext"`` entry feeds the
            planner-search panel and the milestone markers.

    Returns:
        The composed RGB canvas, ``PANEL_SIZE * 3`` wide and
        ``HEADER_HEIGHT + PANEL_SIZE * 2`` tall.
    """
    scale = PANEL_SIZE / RGB_SIZE
    canvas = Image.new("RGB", (PANEL_SIZE * 3, HEADER_HEIGHT + PANEL_SIZE * 2), (12, 12, 12))
    draw = ImageDraw.Draw(canvas)
    draw.rectangle((0, 0, canvas.size[0], HEADER_HEIGHT), fill=(0, 0, 0))
    draw.text((12, 10), f"Path Quality Debug | frame {frame_index}", fill=(255, 255, 255), font=FONT_LG)
    draw.text(
        (12, 40),
        f"p_pre={frame_row['p_pre']:.3f} y_pre={int(frame_row['y_pre'])} | p_ext={frame_row['p_ext']:.3f} y_ext={int(frame_row['y_ext'])} | grasped={int(left_grasped)}",
        fill=(235, 235, 235),
        font=FONT_SM,
    )

    pregrasp_corridor = [
        _apply_relative_pose(tray_pose, rel_pose)
        for rel_pose in _pregrasp_corridor_rel_poses(templates)
    ]
    extract_poses = _extract_sequence_poses(tray_pose, task_base_pose, templates)
    plan_poses = [*pregrasp_corridor, extract_poses[1], *extract_poses[2:5]]
    # One colour per planned pose: yellow corridor, orange for
    # extract_poses[1], green for extract_poses[2:5].
    colors = ([(255, 220, 0)] * len(pregrasp_corridor)) + [
        (255, 140, 0),
        (80, 255, 120),
        (80, 255, 120),
        (80, 255, 120),
    ]
    milestone_poses = _milestone_poses(tray_pose, task_base_pose, templates, frame_row)
    # A null "p_ext" or "milestones" entry is treated like a missing one.
    pext_debug = (debug_row.get("p_ext") or {}) if debug_row else {}
    milestone_debugs = list(pext_debug.get("milestones") or [])

    trail_frames = range(max(0, frame_index - 12), frame_index + 1)
    demo_left_trail = np.asarray(
        [_demo_gripper_pose(demo[i], "left")[:3] for i in trail_frames],
        dtype=np.float64,
    )
    demo_right_trail = np.asarray(
        [_demo_gripper_pose(demo[i], "right")[:3] for i in trail_frames],
        dtype=np.float64,
    )

    for camera_slot, camera_name in enumerate(PATH_CAMERAS):
        panel = _scale_image(_load_rgb(episode_dir, camera_name, frame_index))
        _draw_panel_title(panel, camera_name, "demo trails + planned extract path")
        draw = ImageDraw.Draw(panel)
        extrinsics = demo[frame_index].misc[f"{camera_name}_camera_extrinsics"]
        intrinsics = demo[frame_index].misc[f"{camera_name}_camera_intrinsics"]
        left_trail_xy = _project_world_polyline(demo_left_trail, extrinsics, intrinsics)
        right_trail_xy = _project_world_polyline(demo_right_trail, extrinsics, intrinsics)

        # Keep each pose's colour attached to its projection. Poses that do
        # not project into the frame are dropped, and indexing `colors` by
        # position in the filtered list would shift every later colour.
        plan_points = []
        for pose, color in zip(plan_poses, colors):
            xy = _world_pose_to_coords(pose[:3], extrinsics, intrinsics)
            if xy is not None:
                plan_points.append((xy, color))
        plan_xy = [xy for xy, _ in plan_points]
        plan_colors = [color for _, color in plan_points]

        _draw_polyline(draw, _scaled_coords(left_trail_xy, scale), (80, 220, 255), width=3)
        _draw_polyline(draw, _scaled_coords(right_trail_xy, scale), (150, 110, 255), width=3)
        _draw_segmented_polyline(draw, _scaled_coords(plan_xy, scale), plan_colors, width=4)

        current_left_xy = _world_pose_to_coords(left_pose[:3], extrinsics, intrinsics)
        current_right_xy = _world_pose_to_coords(right_pose[:3], extrinsics, intrinsics)
        if current_left_xy is not None:
            _draw_point(draw, _scaled_coords([current_left_xy], scale)[0], (255, 255, 255), radius=5)
        if current_right_xy is not None:
            _draw_point(draw, _scaled_coords([current_right_xy], scale)[0], (255, 255, 255), radius=5)

        for xy, color in plan_points:
            _draw_point(draw, _scaled_coords([xy], scale)[0], color, radius=4)

        for milestone_pose, milestone_debug in zip(milestone_poses, milestone_debugs):
            xy = _world_pose_to_coords(milestone_pose[:3], extrinsics, intrinsics)
            if xy is None:
                continue
            scaled_xy = _scaled_coords([xy], scale)[0]
            marker_color = _milestone_color(milestone_debug)
            _draw_point(draw, scaled_xy, marker_color, radius=5)
            draw.text(
                (scaled_xy[0] + 6, scaled_xy[1] - 10),
                _milestone_label(milestone_debug),
                fill=marker_color,
                font=FONT_SM,
            )
        canvas.paste(panel, (camera_slot * PANEL_SIZE, HEADER_HEIGHT))

    # Bottom-left panel: the scalar metric state of this frame.
    text_panel = Image.new("RGB", (PANEL_SIZE, PANEL_SIZE), (18, 18, 18))
    draw = ImageDraw.Draw(text_panel)
    draw.text((12, 12), "Metric State", fill=(255, 255, 255), font=FONT_LG)
    _bar(draw, 12, 56, PANEL_SIZE - 24, 18, float(frame_row["p_pre"]), "p_pre", (255, 220, 0), threshold=DEFAULT_PPRE_TAU)
    _bar(draw, 12, 108, PANEL_SIZE - 24, 18, float(frame_row["p_ext"]), "p_ext", (80, 255, 120), threshold=0.45)
    draw.text((12, 146), f"y_pre={int(frame_row['y_pre'])} y_ext={int(frame_row['y_ext'])}", fill=(255, 255, 255), font=FONT_MD)
    draw.text((12, 172), f"pre_prog={float(frame_row.get('pregrasp_progress', 0.0)):.3f}", fill=(255, 255, 255), font=FONT_MD)
    draw.text((12, 198), f"left_grasped={int(left_grasped)} door={float(frame_row['door_angle']):.3f}", fill=(255, 255, 255), font=FONT_MD)
    draw.text((12, 224), f"y_retrieve={int(frame_row.get('y_retrieve', 0))} phase={int(frame_row['phase_switch'])} score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}", fill=(255, 255, 255), font=FONT_MD)
    # NOTE(review): these legend badges are drawn over the p_pre label, and
    # the third one starts at x=250 on a PANEL_SIZE-wide (256 px) panel, so it
    # is almost entirely clipped. Left unchanged; needs a layout decision.
    _draw_badge(draw, 12, 36, "yellow=approach", (130, 110, 30))
    _draw_badge(draw, 138, 36, "green=retreat", (32, 130, 64))
    _draw_badge(draw, 250, 36, "red=failed", (140, 40, 40))
    canvas.paste(text_panel, (0, HEADER_HEIGHT + PANEL_SIZE))

    # Bottom-right panel: planner diagnostics taken from the debug row.
    search_w = PANEL_SIZE * 2
    search_panel = Image.new("RGB", (search_w, PANEL_SIZE), (18, 18, 18))
    draw = ImageDraw.Draw(search_panel)
    draw.text((12, 12), "p_ext Planner Search", fill=(255, 255, 255), font=FONT_LG)
    if pext_debug:
        initial_height = float(pext_debug.get("initial_height", frame_row.get("tray_height", 0.0)))
        final_height = float(pext_debug.get("final_height", initial_height))
        total_length = float(pext_debug.get("total_length", 0.0))
        num_milestones = int(pext_debug.get("num_milestones", len(milestone_debugs)))
        first_failed = int(pext_debug.get("first_failed_milestone", -1))
        mean_rel = float(pext_debug.get("mean_reliability", 0.0))
        min_rel = float(pext_debug.get("min_reliability", 0.0))
        approach_score = float(pext_debug.get("approach_score", 0.0))
        retreat_score = float(pext_debug.get("retreat_score", 0.0))

        # Three equal gauge columns derived from the panel width. The former
        # fixed x positions (12 / 260 / 508 with 220 px bars) placed the
        # retreat_score gauge at x=508..728 on this 512 px panel, where it
        # was clipped away entirely.
        margin = 12
        bar_w = (search_w - 4 * margin) // 3
        bar_cols = [margin + col * (bar_w + margin) for col in range(3)]
        _bar(draw, bar_cols[0], 50, bar_w, 16, float(frame_row["p_ext"]), "p_ext", (80, 255, 120), threshold=0.45)
        _bar(draw, bar_cols[1], 50, bar_w, 16, approach_score, "approach_score", (255, 210, 80))
        _bar(draw, bar_cols[2], 50, bar_w, 16, retreat_score, "retreat_score", (80, 255, 120))
        _bar(draw, bar_cols[0], 94, bar_w, 16, mean_rel, "mean_rel", (120, 180, 255))
        _bar(draw, bar_cols[1], 94, bar_w, 16, min_rel, "min_rel", (120, 180, 255))
        draw.text(
            (12, 124),
            f"height {initial_height:.3f} -> {final_height:.3f} | total_len={total_length:.3f} | milestones={num_milestones} | first_failed={first_failed}",
            fill=(235, 235, 235),
            font=FONT_SM,
        )
        draw.text(
            (12, 146),
            f"start_grasped={int(bool(pext_debug.get('already_grasped_start', False)))} end_grasped={int(bool(pext_debug.get('already_grasped_end', False)))}",
            fill=(235, 235, 235),
            font=FONT_SM,
        )
        start_y = 172
        row_h = 16
        # NOTE(review): x=550 lies beyond this panel's width (512 px), so the
        # per-milestone score bars below are clipped away. They are left in
        # place because the text line already spans most of the panel width;
        # showing them needs a layout decision.
        bar_x = 550
        for row_index, milestone_debug in enumerate(milestone_debugs[:5]):
            y = start_y + row_index * row_h
            color = _milestone_color(milestone_debug)
            label = _milestone_label(milestone_debug)
            status = "OK" if bool(milestone_debug.get("path_found", False)) else "FAIL"
            planner_score = float(milestone_debug.get("planner_score", 0.0))
            reliability = float(milestone_debug.get("reliability", 0.0))
            path_length = float(milestone_debug.get("path_length", 0.0))
            ignore_collisions = bool(milestone_debug.get("ignore_collisions", False))
            height_before = float(milestone_debug.get("height_before", initial_height))
            height_after = float(milestone_debug.get("height_after", height_before))
            line = (
                f"{label:<3} {status:<4} rel={reliability:.2f} score={planner_score:.3f} "
                f"len={path_length:.3f} {'IGN' if ignore_collisions else 'STRICT':<6} "
                f"h={height_before:.3f}->{height_after:.3f}"
            )
            draw.text((12, y), line, fill=color, font=FONT_SM)
            draw.rounded_rectangle((bar_x, y + 2, bar_x + 160, y + 12), radius=3, outline=(80, 80, 80), fill=(25, 25, 25))
            fill_w = int(max(0.0, min(1.0, planner_score)) * 160)
            if fill_w > 0:
                draw.rounded_rectangle((bar_x, y + 2, bar_x + fill_w, y + 12), radius=3, fill=color)
        if not milestone_debugs:
            _draw_text_lines(
                draw,
                12,
                176,
                [
                    "No milestone planning was needed at this frame.",
                    "This usually means the tray was already safely extractable",
                    "or the planner returned immediate success without staged search.",
                ],
                fill=(220, 220, 220),
                font=FONT_MD,
                line_height=22,
            )
    else:
        _draw_text_lines(
            draw,
            12,
            60,
            [
                "No debug sidecar was provided for this frame.",
                "The richer p_ext milestone search diagnostics are unavailable.",
            ],
            fill=(220, 220, 220),
            font=FONT_MD,
            line_height=22,
        )
    canvas.paste(search_panel, (PANEL_SIZE, HEADER_HEIGHT + PANEL_SIZE))
    return canvas
def _project_world_polyline(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> List[Tuple[int, int]]:
    """Project world-space points to rounded pixel coordinates, in input order.

    Points whose projected depth is not positive, or whose unrounded pixel
    falls outside ``[0, RGB_SIZE)`` on either axis, are left out. An empty
    input yields an empty list.
    """
    if len(points_world) == 0:
        return []
    uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
    pixels: List[Tuple[int, int]] = []
    for (u, v), (_, _, depth) in zip(uv, camera_xyz):
        if depth <= 0:
            continue
        if not (0 <= u < RGB_SIZE and 0 <= v < RGB_SIZE):
            continue
        pixels.append((int(round(float(u))), int(round(float(v)))))
    return pixels
def _load_debug_row(path: Optional[Path], frame_index: int) -> Optional[Dict[str, object]]:
if path is None or not path.exists():
return None
with path.open("r", encoding="utf-8") as handle:
for line in handle:
row = json.loads(line)
if int(row.get("frame_index", -1)) == int(frame_index):
return row
return None
def _milestone_poses(
    tray_pose: np.ndarray,
    task_base_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
) -> List[np.ndarray]:
    """Select the extract-sequence poses that serve as milestones for this frame.

    The first two poses of the extract sequence are treated as the approach
    and the rest as the retreat. When ``p_ext_already_grasped_start`` in
    ``frame_row`` is falsy, the whole sequence is returned. Otherwise the
    approach is dropped and the retreat is narrowed to the poses more than
    0.01 above the starting height (``p_ext_initial_height``, defaulting to
    ``tray_pose[2]``); if none qualifies, the last retreat pose is kept only
    while the starting height is below ``_extract_height_threshold``.
    """
    sequence = _extract_sequence_poses(tray_pose, task_base_pose, templates)
    start_height = float(frame_row.get("p_ext_initial_height", tray_pose[2]))
    grasped_at_start = bool(frame_row.get("p_ext_already_grasped_start", 0.0))
    retreat = sequence[2:]

    if not grasped_at_start:
        return [*sequence[:2], *retreat]
    if not retreat:
        return [*retreat]

    min_height = start_height + 0.01
    still_ahead = [pose for pose in retreat if float(pose[2]) > min_height]
    if still_ahead:
        return still_ahead
    if start_height < _extract_height_threshold(templates):
        return [retreat[-1]]
    return []
def _milestone_color(milestone_debug: Dict[str, object]) -> Tuple[int, int, int]:
kind = str(milestone_debug.get("kind", "approach"))
path_found = bool(milestone_debug.get("path_found", False))
reliability = float(milestone_debug.get("reliability", 0.0))
if not path_found:
return (230, 70, 70)
if reliability < 1.0:
return (255, 170, 60)
if kind == "retreat":
return (80, 255, 120)
return (255, 220, 0)
def _milestone_label(milestone_debug: Dict[str, object]) -> str:
kind = str(milestone_debug.get("kind", "approach"))
prefix = "A" if kind == "approach" else "R"
return f"{prefix}{int(milestone_debug.get('milestone_index', 0))}"
def _compose_all_metrics(
    episode_name: str,
    episode_dir: Path,
    demo,
    frame_index: int,
    frame_row: pd.Series,
) -> Image.Image:
    """Render the compact all-metrics image: banner, camera strip and metric footer.

    The banner is red and labelled REVEAL when ``phase_switch`` is 0, green
    and labelled RETRIEVE otherwise. Below it, the RGB frame of every camera
    in ``ALL_METRICS_CAMERAS`` is shown, followed by gauges and text built
    solely from ``frame_row``. ``demo`` is accepted but not used.
    """
    banner_h = 36
    white = (255, 255, 255)
    is_reveal = int(frame_row["phase_switch"]) == 0
    banner_color = (145, 30, 30) if is_reveal else (25, 115, 40)
    phase_name = "REVEAL" if is_reveal else "RETRIEVE"

    canvas = Image.new("RGB", (PANEL_SIZE * 3, PANEL_SIZE + FOOTER_HEIGHT), (12, 12, 12))
    pen = ImageDraw.Draw(canvas)
    pen.rectangle((0, 0, canvas.size[0], banner_h), fill=banner_color)
    pen.text(
        (12, 7),
        f"{episode_name} | frame {frame_index:03d} | {phase_name}",
        fill=white,
        font=FONT_LG,
    )

    for slot, camera_name in enumerate(ALL_METRICS_CAMERAS):
        view = _scale_image(_load_rgb(episode_dir, camera_name, frame_index))
        _draw_panel_title(view, camera_name, None)
        canvas.paste(view, (slot * PANEL_SIZE, banner_h))

    footer_y = banner_h + PANEL_SIZE
    pen.rectangle((0, footer_y, canvas.size[0], canvas.size[1]), fill=(18, 18, 18))

    # First gauge row.
    top_row = footer_y + 34
    _bar(pen, 16, top_row, 220, 18, float(frame_row["three_view_visibility"]), "three_view_visibility", (60, 160, 255), threshold=0.35)
    _bar(pen, 272, top_row, 220, 18, float(frame_row["full_view_visibility"]), "full_view_visibility", (80, 190, 255), threshold=0.35)
    _bar(pen, 528, top_row, 220, 18, float(frame_row["p_pre"]), "p_pre", (255, 220, 0), threshold=DEFAULT_PPRE_TAU)

    # Second row: one more gauge followed by two columns of status text.
    _bar(pen, 16, footer_y + 96, 220, 18, float(frame_row["p_ext"]), "p_ext", (80, 255, 120), threshold=0.45)
    labels = f"y_pre={int(frame_row['y_pre'])} y_ext={int(frame_row['y_ext'])} pre_prog={float(frame_row.get('pregrasp_progress', 0.0)):.3f}"
    pen.text((272, footer_y + 84), labels, fill=white, font=FONT_MD)
    phase_line = f"y_ready={int(frame_row['y_ready'])} phase_switch={int(frame_row['phase_switch'])} phase_score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}"
    pen.text((272, footer_y + 108), phase_line, fill=white, font=FONT_MD)
    door_line = f"door_angle={float(frame_row['door_angle']):.3f}"
    pen.text((528, footer_y + 84), door_line, fill=white, font=FONT_MD)
    gripper_line = f"left_open={float(frame_row['left_gripper_open']):.1f} right_open={float(frame_row['right_gripper_open']):.1f}"
    pen.text((528, footer_y + 108), gripper_line, fill=white, font=FONT_MD)
    return canvas
def _save_if_requested(image: Image.Image, path: Optional[Path]) -> None:
    """Write ``image`` to ``path``, creating parent directories; do nothing when ``path`` is ``None``."""
    if path is not None:
        path.parent.mkdir(parents=True, exist_ok=True)
        image.save(path)
def _load_templates(path: Path) -> MotionTemplates:
    """Load motion templates from a JSON file or, for any other suffix, a pickle.

    A ``.json`` file (case-insensitive suffix) must hold an object with a
    ``"templates"`` entry, which is passed to ``MotionTemplates.from_json``.
    Every other file is unpickled and returned as is.
    """
    is_json = path.suffix.lower() == ".json"
    if not is_json:
        # NOTE(review): unpickling runs arbitrary code embedded in the file;
        # only point this at template files from a trusted source.
        with path.open("rb") as handle:
            return pickle.load(handle)
    with path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    return MotionTemplates.from_json(payload["templates"])
def main() -> int:
    """Command-line entry point: render the requested debug images for one frame.

    The metrics row is looked up in the dense CSV by its ``frame_index``
    column when present (raising ``ValueError`` if the frame is missing),
    otherwise by row position. When only ``--all-out`` is requested the image
    is built from the RGB frames and the CSV row alone; in every other case
    the demo is loaded, the replay environment is stepped to the frame and
    shut down again afterwards.

    Returns:
        0 on success.
    """
    parser = argparse.ArgumentParser()
    for flag in ("--episode-dir", "--templates-pkl", "--dense-csv"):
        parser.add_argument(flag, required=True)
    parser.add_argument("--frame-index", type=int, required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    for flag in ("--debug-jsonl", "--visibility-out", "--path-out", "--all-out"):
        parser.add_argument(flag)
    args = parser.parse_args()

    def optional_path(raw: Optional[str]) -> Optional[Path]:
        # Absent and empty values both mean "not requested".
        return Path(raw) if raw else None

    frame_index = int(args.frame_index)
    episode_dir = Path(args.episode_dir)
    templates = _load_templates(Path(args.templates_pkl))

    frame_df = pd.read_csv(args.dense_csv)
    if "frame_index" in frame_df.columns:
        matches = frame_df.loc[frame_df["frame_index"] == frame_index]
        if len(matches) == 0:
            raise ValueError(f"frame_index {args.frame_index} not found in {args.dense_csv}")
        frame_row = matches.iloc[0]
    else:
        frame_row = frame_df.iloc[frame_index]

    debug_row = _load_debug_row(optional_path(args.debug_jsonl), frame_index)
    visibility_out = optional_path(args.visibility_out)
    path_out = optional_path(args.path_out)
    all_out = optional_path(args.all_out)

    def render_all_metrics(loaded_demo) -> None:
        summary = _compose_all_metrics(
            episode_name=episode_dir.name,
            episode_dir=episode_dir,
            demo=loaded_demo,
            frame_index=frame_index,
            frame_row=frame_row,
        )
        _save_if_requested(summary, all_out)

    # The all-metrics panel only depends on RGB frames plus the dense CSV row.
    # Skip simulator replay entirely when no geometric overlays are requested.
    only_all_metrics = all_out is not None and visibility_out is None and path_out is None
    if only_all_metrics:
        render_all_metrics(None)
        return 0

    demo = _load_demo(episode_dir)
    env = _launch_replay_env()
    try:
        task = env.get_task(BimanualTakeTrayOutOfOven)
        cache = ReplayCache(task, demo, checkpoint_stride=args.checkpoint_stride)
        cache.reset()
        cache.step_to(frame_index)
        state = cache.current_state()
        left_grasped = any(
            obj.get_name() == "tray"
            for obj in task._scene.robot.left_gripper.get_grasped_objects()
        )

        if visibility_out is not None:
            visibility_image = _compose_visibility_focus(
                episode_dir=episode_dir,
                demo=demo,
                frame_index=frame_index,
                tray_pose=state.tray_pose,
                left_pose=state.left_gripper_pose,
                right_pose=state.right_gripper_pose,
                templates=templates,
                frame_row=frame_row,
            )
            _save_if_requested(visibility_image, visibility_out)

        if path_out is not None:
            path_image = _compose_path_quality(
                episode_dir=episode_dir,
                demo=demo,
                frame_index=frame_index,
                tray_pose=state.tray_pose,
                task_base_pose=task._task.get_base().get_pose(),
                left_pose=state.left_gripper_pose,
                right_pose=state.right_gripper_pose,
                templates=templates,
                frame_row=frame_row,
                left_grasped=left_grasped,
                debug_row=debug_row,
            )
            _save_if_requested(path_image, path_out)

        if all_out is not None:
            render_all_metrics(demo)
    finally:
        # Always release the replay environment, even when rendering fails.
        env.shutdown()
    return 0
# Run main() only when executed as a script and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())