"""Render per-frame debug images for the oven tray-extraction label study.

Given an episode directory, a motion-template file, a dense per-frame metrics
CSV and a frame index, this script can write up to three images:

* ``--visibility-out``: per-camera tray/grasp-point projection panels,
* ``--path-out``: demo gripper trails, planned poses and p_ext diagnostics,
* ``--all-out``: RGB views plus a footer of metric bars.

Only the ``--all-out`` image can be produced without launching the replay
environment.
"""

from collections import Counter
from pathlib import Path
import argparse
import json
import math
import os
import pickle
import sys
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont


# Make the project package importable when this file is run as a script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from rr_label_study.oven_study import (  # noqa: E402  (needs sys.path tweak above)
    BimanualTakeTrayOutOfOven,
    DEFAULT_PPRE_TAU,
    FULL_CAMERA_SET,
    MotionTemplates,
    ReplayCache,
    Shape,
    _apply_relative_pose,
    _camera_point_cloud,
    _camera_file,
    _extract_height_threshold,
    _extract_sequence_poses,
    _frame_metrics,
    _launch_replay_env,
    _load_demo,
    _load_mask,
    _pregrasp_corridor_rel_poses,
    _project_points,
    _sample_full_tray_points,
    _sample_grasp_points,
    _visibility_projection_details,
)


VISIBILITY_CAMERAS = ["front", "wrist_right", "wrist_left"]
PATH_CAMERAS = ["front", "overhead", "over_shoulder_left"]
ALL_METRICS_CAMERAS = ["front", "wrist_right", "wrist_left"]
RGB_SIZE = 128
PANEL_SIZE = 256
HEADER_HEIGHT = 72
FOOTER_HEIGHT = 210

# Factor that maps source-image pixel coordinates onto an upscaled panel.
_PANEL_SCALE = PANEL_SIZE / RGB_SIZE
# Positions of the threshold tick drawn on the p_ext and visibility bars.
_P_EXT_BAR_THRESHOLD = 0.45
_VISIBILITY_BAR_THRESHOLD = 0.35
# Number of frames before the current one included in the demo gripper trails.
_TRAIL_HISTORY = 12


def _font(size: int) -> ImageFont.FreeTypeFont:
    """Return the first available TrueType font at ``size``.

    Falls back to Pillow's built-in default font when none of the candidate
    font files exists on this machine.
    """
    candidates = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Regular.ttf",
    ]
    for path in candidates:
        if Path(path).exists():
            return ImageFont.truetype(path, size=size)
    return ImageFont.load_default()


FONT_SM = _font(14)
FONT_MD = _font(18)
FONT_LG = _font(24)


def _load_rgb(episode_dir: Path, camera_name: str, frame_index: int) -> Image.Image:
    """Load one camera's RGB frame as an in-memory ``RGB`` image."""
    path = _camera_file(episode_dir, camera_name, "rgb", frame_index)
    # ``convert`` returns a fully loaded copy, so the file can be closed here
    # instead of staying open until garbage collection.
    with Image.open(path) as image:
        return image.convert("RGB")


def _scale_image(image: Image.Image, size: int = PANEL_SIZE) -> Image.Image:
    """Resize ``image`` to a ``size`` x ``size`` square with bilinear filtering."""
    return image.resize((size, size), resample=Image.Resampling.BILINEAR)


def _new_panel(background: Tuple[int, int, int] = (0, 0, 0)) -> Image.Image:
    """Create a blank panel-sized RGB image filled with ``background``."""
    return Image.new("RGB", (PANEL_SIZE, PANEL_SIZE), background)


def _draw_badge(
    draw: ImageDraw.ImageDraw,
    x: int,
    y: int,
    text: str,
    fill: Tuple[int, int, int],
) -> None:
    """Draw a rounded, filled label with white text at ``(x, y)``."""
    left = x
    top = y
    # Width is estimated from the character count rather than measured.
    width = 12 + int(len(text) * 8.5)
    height = 24
    draw.rounded_rectangle(
        (left, top, left + width, top + height), radius=6, fill=fill
    )
    draw.text((left + 6, top + 4), text, fill=(255, 255, 255), font=FONT_SM)


def _draw_panel_title(
    image: Image.Image, title: str, subtitle: Optional[str] = None
) -> None:
    """Draw a black title strip at the top and, optionally, a subtitle strip
    at the bottom of a panel-sized image (in place)."""
    draw = ImageDraw.Draw(image)
    draw.rectangle((0, 0, PANEL_SIZE, 30), fill=(0, 0, 0))
    draw.text((8, 6), title, fill=(255, 255, 255), font=FONT_MD)
    if subtitle:
        draw.rectangle((0, PANEL_SIZE - 26, PANEL_SIZE, PANEL_SIZE), fill=(0, 0, 0))
        draw.text((8, PANEL_SIZE - 22), subtitle, fill=(230, 230, 230), font=FONT_SM)


def _draw_point(
    draw: ImageDraw.ImageDraw,
    xy: Tuple[int, int],
    color: Tuple[int, int, int],
    radius: int = 3,
) -> None:
    """Draw a filled circle of ``radius`` centred on ``xy``."""
    x, y = xy
    draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill=color)


def _draw_polyline(
    draw: ImageDraw.ImageDraw,
    coords: Sequence[Tuple[int, int]],
    color: Tuple[int, int, int],
    width: int = 2,
) -> None:
    """Draw a single-colour polyline; does nothing with fewer than two points."""
    if len(coords) >= 2:
        draw.line(coords, fill=color, width=width)


def _draw_segmented_polyline(
    draw: ImageDraw.ImageDraw,
    coords: Sequence[Tuple[int, int]],
    colors: Sequence[Tuple[int, int, int]],
    width: int = 3,
) -> None:
    """Draw a polyline whose segments are coloured individually.

    The segment from ``coords[i]`` to ``coords[i + 1]`` uses the colour of its
    end point, ``colors[i + 1]``; the last colour is reused if ``colors`` is
    shorter than ``coords``.
    """
    if len(coords) < 2:
        return
    for idx in range(len(coords) - 1):
        color = colors[min(idx + 1, len(colors) - 1)]
        draw.line((coords[idx], coords[idx + 1]), fill=color, width=width)


def _draw_text_lines(
    draw: ImageDraw.ImageDraw,
    x: int,
    y: int,
    lines: Sequence[str],
    fill: Tuple[int, int, int] = (255, 255, 255),
    font: ImageFont.FreeTypeFont = FONT_SM,
    line_height: int = 18,
) -> None:
    """Draw ``lines`` top to bottom starting at ``(x, y)``."""
    for line_index, line in enumerate(lines):
        draw.text((x, y + line_index * line_height), line, fill=fill, font=font)


def _bar(
    draw: ImageDraw.ImageDraw,
    x: int,
    y: int,
    w: int,
    h: int,
    value: float,
    label: str,
    color: Tuple[int, int, int],
    threshold: Optional[float] = None,
) -> None:
    """Draw a labelled horizontal bar for ``value`` clamped to ``[0, 1]``.

    The label (with the unclamped value) is drawn above the bar. When
    ``threshold`` is given, a white vertical tick marks its clamped position.
    """
    draw.text((x, y - 18), f"{label}: {value:.3f}", fill=(255, 255, 255), font=FONT_SM)
    draw.rounded_rectangle(
        (x, y, x + w, y + h), radius=5, outline=(120, 120, 120), fill=(20, 20, 20)
    )
    filled_w = int(max(0.0, min(1.0, value)) * w)
    if filled_w > 0:
        # Narrow fills use a plain rectangle instead of rounded corners.
        if filled_w >= 12:
            draw.rounded_rectangle((x, y, x + filled_w, y + h), radius=5, fill=color)
        else:
            draw.rectangle((x, y, x + filled_w, y + h), fill=color)
    if threshold is not None:
        tx = x + int(max(0.0, min(1.0, threshold)) * w)
        draw.line((tx, y - 2, tx, y + h + 2), fill=(255, 255, 255), width=2)


def _project_coords(
    points_world: np.ndarray,
    point_cloud_world: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
    """Thin wrapper around ``_visibility_projection_details``.

    Callers in this file unpack the result as ``(projected, visible)`` lists of
    pixel coordinates.
    """
    return _visibility_projection_details(
        points_world, point_cloud_world, extrinsics, intrinsics
    )


def _infer_mask_handle(
    mask: np.ndarray, coords: Sequence[Tuple[int, int]]
) -> Optional[int]:
    """Return the most common non-zero mask value sampled at ``coords``.

    ``coords`` are ``(x, y)`` pixels; the mask is indexed as ``mask[y, x]``.
    Coordinates outside the mask are ignored. Returns ``None`` when no
    non-zero value is found.
    """
    height, width = mask.shape[:2]
    values = []
    for x, y in coords:
        # Skip out-of-range pixels: negative indices would otherwise wrap
        # around silently and large ones would raise IndexError.
        if not (0 <= x < width and 0 <= y < height):
            continue
        value = int(mask[y, x])
        if value != 0:
            values.append(value)
    if not values:
        return None
    return Counter(values).most_common(1)[0][0]


def _masked_rgb(rgb: Image.Image, binary_mask: np.ndarray) -> Image.Image:
    """Return a copy of ``rgb`` with every pixel outside ``binary_mask`` black."""
    rgb_np = np.asarray(rgb)
    out = np.zeros_like(rgb_np)
    out[binary_mask] = rgb_np[binary_mask]
    return Image.fromarray(out, mode="RGB")


def _scaled_coords(
    coords: Iterable[Tuple[int, int]], scale: float
) -> List[Tuple[int, int]]:
    """Multiply each ``(x, y)`` by ``scale`` and round to integer pixels."""
    return [(int(round(x * scale)), int(round(y * scale))) for x, y in coords]


def _demo_gripper_pose(obs, arm_name: str) -> np.ndarray:
    """Return ``obs.<arm_name>.gripper_pose`` as a float64 array."""
    side = getattr(obs, arm_name)
    return np.asarray(side.gripper_pose, dtype=np.float64)


def _world_pose_to_coords(
    pose_xyz: Sequence[float],
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Optional[Tuple[int, int]]:
    """Project one world-space position to integer pixel coordinates.

    Returns ``None`` when the point has non-positive camera depth or projects
    outside the ``RGB_SIZE`` x ``RGB_SIZE`` image.
    """
    points = np.asarray([pose_xyz], dtype=np.float64)
    uv, camera_xyz = _project_points(points, extrinsics, intrinsics)
    u, v = uv[0]
    depth = camera_xyz[0, 2]
    if depth <= 0 or not (0 <= u < RGB_SIZE and 0 <= v < RGB_SIZE):
        return None
    return int(round(float(u))), int(round(float(v)))


def _compose_visibility_panel(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    tray_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    per_camera_vis: float,
) -> Image.Image:
    """Build one camera's three-panel visibility row.

    The row contains, left to right: the RGB scene with projected tray/grasp
    points, an "x-ray" view of the same projections on black together with the
    gripper positions, and the RGB image masked to the pixels reported as
    visible.
    """
    rgb = _load_rgb(episode_dir, camera_name, frame_index)
    point_cloud, extrinsics, intrinsics = _camera_point_cloud(
        episode_dir, demo, frame_index, camera_name
    )
    tray_points = _sample_full_tray_points(tray_pose)
    grasp_points = _sample_grasp_points(templates, tray_pose)
    tray_proj, tray_visible = _project_coords(
        tray_points, point_cloud, extrinsics, intrinsics
    )
    grasp_proj, grasp_visible = _project_coords(
        grasp_points, point_cloud, extrinsics, intrinsics
    )

    tray_proj_scaled = _scaled_coords(tray_proj, _PANEL_SCALE)
    grasp_proj_scaled = _scaled_coords(grasp_proj, _PANEL_SCALE)
    grasp_visible_scaled = _scaled_coords(grasp_visible, _PANEL_SCALE)

    # Panel 1: RGB scene with all projected points and visible grasp points.
    scene_panel = _scale_image(rgb)
    _draw_panel_title(
        scene_panel,
        f"{camera_name} scene",
        f"metric vis={per_camera_vis:.3f} | visible={len(grasp_visible)}/{len(grasp_proj) or 1}",
    )
    draw = ImageDraw.Draw(scene_panel)
    for xy in tray_proj_scaled:
        _draw_point(draw, xy, (70, 170, 255), radius=2)
    for xy in grasp_proj_scaled:
        _draw_point(draw, xy, (255, 80, 210), radius=3)
    for xy in grasp_visible_scaled:
        _draw_point(draw, xy, (70, 255, 140), radius=3)
    _draw_badge(draw, 8, 34, "blue=tray", (40, 105, 180))
    _draw_badge(draw, 112, 34, "magenta=grasp", (145, 40, 140))
    _draw_badge(draw, 8, 64, "green=visible", (32, 130, 64))

    # Panel 2: the same projections on a black background plus both grippers.
    xray_panel = _new_panel()
    _draw_panel_title(
        xray_panel, f"{camera_name} x-ray", "blue=tray, magenta=grasp region"
    )
    draw = ImageDraw.Draw(xray_panel)
    for xy in tray_proj_scaled:
        _draw_point(draw, xy, (70, 170, 255), radius=2)
    for xy in grasp_proj_scaled:
        _draw_point(draw, xy, (255, 80, 210), radius=3)
    left_xy = _world_pose_to_coords(left_pose[:3], extrinsics, intrinsics)
    right_xy = _world_pose_to_coords(right_pose[:3], extrinsics, intrinsics)
    if left_xy is not None:
        _draw_point(
            draw, _scaled_coords([left_xy], _PANEL_SCALE)[0], (255, 230, 70), radius=4
        )
    if right_xy is not None:
        _draw_point(
            draw, _scaled_coords([right_xy], _PANEL_SCALE)[0], (140, 110, 255), radius=4
        )

    # Panel 3: RGB restricted to the pixels reported as visible.
    # NOTE(review): assumes the visible coordinates lie inside the
    # RGB_SIZE x RGB_SIZE image; confirm against _visibility_projection_details.
    visible_pixels = np.zeros((RGB_SIZE, RGB_SIZE), dtype=bool)
    for x, y in tray_visible + grasp_visible:
        visible_pixels[y, x] = True
    masked_panel = _masked_rgb(rgb, visible_pixels)
    masked_panel = _scale_image(masked_panel)
    _draw_panel_title(
        masked_panel,
        f"{camera_name} depth-visible",
        f"visible grasp pts={len(grasp_visible)}/{len(grasp_proj) or 1}",
    )
    draw = ImageDraw.Draw(masked_panel)
    for xy in grasp_proj_scaled:
        _draw_point(draw, xy, (180, 80, 160), radius=2)
    for xy in grasp_visible_scaled:
        _draw_point(draw, xy, (70, 255, 140), radius=3)

    row = Image.new("RGB", (PANEL_SIZE * 3, PANEL_SIZE), (15, 15, 15))
    row.paste(scene_panel, (0, 0))
    row.paste(xray_panel, (PANEL_SIZE, 0))
    row.paste(masked_panel, (PANEL_SIZE * 2, 0))
    return row


def _compose_visibility_focus(
    episode_dir: Path,
    demo,
    frame_index: int,
    tray_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
) -> Image.Image:
    """Build the visibility debug image: a header plus one row per camera in
    ``VISIBILITY_CAMERAS``."""
    canvas = Image.new(
        "RGB",
        (PANEL_SIZE * 3, HEADER_HEIGHT + PANEL_SIZE * len(VISIBILITY_CAMERAS)),
        (12, 12, 12),
    )
    draw = ImageDraw.Draw(canvas)
    draw.rectangle((0, 0, canvas.size[0], HEADER_HEIGHT), fill=(0, 0, 0))
    draw.text(
        (12, 10),
        f"Visibility Metric Debug | frame {frame_index}",
        fill=(255, 255, 255),
        font=FONT_LG,
    )
    draw.text(
        (12, 40),
        f"three_view_visibility={frame_row['three_view_visibility']:.3f} | "
        f"full_view_visibility={frame_row['full_view_visibility']:.3f} | "
        f"y_ready={int(frame_row['y_ready'])} | "
        f"phase_score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}",
        fill=(235, 235, 235),
        font=FONT_SM,
    )

    for row_index, camera_name in enumerate(VISIBILITY_CAMERAS):
        per_camera_vis = float(
            _camera_grasp_visibility(
                episode_dir, demo, frame_index, camera_name, tray_pose, templates
            )
        )
        row = _compose_visibility_panel(
            episode_dir=episode_dir,
            demo=demo,
            frame_index=frame_index,
            camera_name=camera_name,
            tray_pose=tray_pose,
            left_pose=left_pose,
            right_pose=right_pose,
            templates=templates,
            per_camera_vis=per_camera_vis,
        )
        canvas.paste(row, (0, HEADER_HEIGHT + row_index * PANEL_SIZE))
    return canvas


def _camera_grasp_visibility(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    tray_pose: np.ndarray,
    templates: MotionTemplates,
) -> float:
    """Return the visible fraction of projected grasp points for one camera.

    Returns ``0.0`` when no grasp point is projected.
    """
    point_cloud, extrinsics, intrinsics = _camera_point_cloud(
        episode_dir, demo, frame_index, camera_name
    )
    grasp_points = _sample_grasp_points(templates, tray_pose)
    proj, vis = _project_coords(grasp_points, point_cloud, extrinsics, intrinsics)
    return float(len(vis) / len(proj)) if proj else 0.0


def _draw_path_camera_panel(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    demo_left_trail: np.ndarray,
    demo_right_trail: np.ndarray,
    plan_poses: Sequence[np.ndarray],
    plan_colors: Sequence[Tuple[int, int, int]],
    milestone_poses: Sequence[np.ndarray],
    milestone_debugs: Sequence[Dict[str, object]],
) -> Image.Image:
    """Render one camera view with demo trails, planned poses and milestones."""
    panel = _scale_image(_load_rgb(episode_dir, camera_name, frame_index))
    _draw_panel_title(panel, camera_name, "demo trails + planned extract path")
    draw = ImageDraw.Draw(panel)
    extrinsics = demo[frame_index].misc[f"{camera_name}_camera_extrinsics"]
    intrinsics = demo[frame_index].misc[f"{camera_name}_camera_intrinsics"]

    left_trail_xy = _project_world_polyline(demo_left_trail, extrinsics, intrinsics)
    right_trail_xy = _project_world_polyline(demo_right_trail, extrinsics, intrinsics)

    # Project plan poses together with their colours so that dropping an
    # off-screen pose cannot shift the colours of the remaining ones.
    plan_projected: List[Tuple[Tuple[int, int], Tuple[int, int, int]]] = []
    for pose, color in zip(plan_poses, plan_colors):
        xy = _world_pose_to_coords(pose[:3], extrinsics, intrinsics)
        if xy is not None:
            plan_projected.append((_scaled_coords([xy], _PANEL_SCALE)[0], color))
    plan_xy = [xy for xy, _ in plan_projected]
    visible_colors = [color for _, color in plan_projected]

    _draw_polyline(
        draw, _scaled_coords(left_trail_xy, _PANEL_SCALE), (80, 220, 255), width=3
    )
    _draw_polyline(
        draw, _scaled_coords(right_trail_xy, _PANEL_SCALE), (150, 110, 255), width=3
    )
    _draw_segmented_polyline(draw, plan_xy, visible_colors, width=4)

    # Current gripper positions are drawn as white dots.
    for gripper_pose in (left_pose, right_pose):
        gripper_xy = _world_pose_to_coords(gripper_pose[:3], extrinsics, intrinsics)
        if gripper_xy is not None:
            _draw_point(
                draw,
                _scaled_coords([gripper_xy], _PANEL_SCALE)[0],
                (255, 255, 255),
                radius=5,
            )

    for xy, color in plan_projected:
        _draw_point(draw, xy, color, radius=4)

    for milestone_pose, milestone_debug in zip(milestone_poses, milestone_debugs):
        xy = _world_pose_to_coords(milestone_pose[:3], extrinsics, intrinsics)
        if xy is None:
            continue
        scaled_xy = _scaled_coords([xy], _PANEL_SCALE)[0]
        marker_color = _milestone_color(milestone_debug)
        _draw_point(draw, scaled_xy, marker_color, radius=5)
        label = _milestone_label(milestone_debug)
        draw.text(
            (scaled_xy[0] + 6, scaled_xy[1] - 10),
            label,
            fill=marker_color,
            font=FONT_SM,
        )
    return panel


def _draw_metric_state_panel(frame_row: pd.Series, left_grasped: bool) -> Image.Image:
    """Render the "Metric State" text panel of the path-quality image."""
    text_panel = Image.new("RGB", (PANEL_SIZE, PANEL_SIZE), (18, 18, 18))
    draw = ImageDraw.Draw(text_panel)
    draw.text((12, 12), "Metric State", fill=(255, 255, 255), font=FONT_LG)
    _bar(
        draw, 12, 56, PANEL_SIZE - 24, 18, float(frame_row["p_pre"]),
        "p_pre", (255, 220, 0), threshold=DEFAULT_PPRE_TAU,
    )
    _bar(
        draw, 12, 108, PANEL_SIZE - 24, 18, float(frame_row["p_ext"]),
        "p_ext", (80, 255, 120), threshold=_P_EXT_BAR_THRESHOLD,
    )
    draw.text(
        (12, 146),
        f"y_pre={int(frame_row['y_pre'])} y_ext={int(frame_row['y_ext'])}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    draw.text(
        (12, 172),
        f"pre_prog={float(frame_row.get('pregrasp_progress', 0.0)):.3f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    draw.text(
        (12, 198),
        f"left_grasped={int(left_grasped)} door={float(frame_row['door_angle']):.3f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    draw.text(
        (12, 224),
        f"y_retrieve={int(frame_row.get('y_retrieve', 0))} "
        f"phase={int(frame_row['phase_switch'])} "
        f"score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    # NOTE(review): these badge positions exceed the PANEL_SIZE-wide panel
    # (the third starts at x=250 of 256) and overlap the p_pre bar label, so
    # they are clipped. The layout looks designed for a wider panel; confirm
    # the intended placement before changing it.
    _draw_badge(draw, 12, 36, "yellow=approach", (130, 110, 30))
    _draw_badge(draw, 138, 36, "green=retreat", (32, 130, 64))
    _draw_badge(draw, 250, 36, "red=failed", (140, 40, 40))
    return text_panel


def _draw_planner_search_panel(
    frame_row: pd.Series,
    pext_debug: Dict[str, object],
    milestone_debugs: Sequence[Dict[str, object]],
) -> Image.Image:
    """Render the "p_ext Planner Search" panel of the path-quality image.

    Shows summary bars and up to five per-milestone rows when ``pext_debug`` is
    non-empty; otherwise explains that no debug sidecar was provided.
    """
    search_panel = Image.new("RGB", (PANEL_SIZE * 2, PANEL_SIZE), (18, 18, 18))
    draw = ImageDraw.Draw(search_panel)
    draw.text((12, 12), "p_ext Planner Search", fill=(255, 255, 255), font=FONT_LG)

    if not pext_debug:
        _draw_text_lines(
            draw,
            12,
            60,
            [
                "No debug sidecar was provided for this frame.",
                "The richer p_ext milestone search diagnostics are unavailable.",
            ],
            fill=(220, 220, 220),
            font=FONT_MD,
            line_height=22,
        )
        return search_panel

    initial_height = float(
        pext_debug.get("initial_height", frame_row.get("tray_height", 0.0))
    )
    final_height = float(pext_debug.get("final_height", initial_height))
    total_length = float(pext_debug.get("total_length", 0.0))
    num_milestones = int(pext_debug.get("num_milestones", len(milestone_debugs)))
    first_failed = int(pext_debug.get("first_failed_milestone", -1))
    mean_rel = float(pext_debug.get("mean_reliability", 0.0))
    min_rel = float(pext_debug.get("min_reliability", 0.0))
    approach_score = float(pext_debug.get("approach_score", 0.0))
    retreat_score = float(pext_debug.get("retreat_score", 0.0))

    # NOTE(review): the third bar (x=508) and the per-milestone score bars
    # (x=550) lie beyond this panel's 2 * PANEL_SIZE width and are clipped;
    # confirm the intended layout.
    _bar(
        draw, 12, 50, 220, 16, float(frame_row["p_ext"]),
        "p_ext", (80, 255, 120), threshold=_P_EXT_BAR_THRESHOLD,
    )
    _bar(draw, 260, 50, 220, 16, approach_score, "approach_score", (255, 210, 80))
    _bar(draw, 508, 50, 220, 16, retreat_score, "retreat_score", (80, 255, 120))
    _bar(draw, 12, 94, 220, 16, mean_rel, "mean_rel", (120, 180, 255))
    _bar(draw, 260, 94, 220, 16, min_rel, "min_rel", (120, 180, 255))
    draw.text(
        (12, 124),
        f"height {initial_height:.3f} -> {final_height:.3f} | "
        f"total_len={total_length:.3f} | milestones={num_milestones} | "
        f"first_failed={first_failed}",
        fill=(235, 235, 235),
        font=FONT_SM,
    )
    draw.text(
        (12, 146),
        f"start_grasped={int(bool(pext_debug.get('already_grasped_start', False)))} "
        f"end_grasped={int(bool(pext_debug.get('already_grasped_end', False)))}",
        fill=(235, 235, 235),
        font=FONT_SM,
    )

    start_y = 172
    row_h = 16
    bar_x = 550
    bar_w = 160
    for row_index, milestone_debug in enumerate(milestone_debugs[:5]):
        y = start_y + row_index * row_h
        color = _milestone_color(milestone_debug)
        label = _milestone_label(milestone_debug)
        status = "OK" if bool(milestone_debug.get("path_found", False)) else "FAIL"
        planner_score = float(milestone_debug.get("planner_score", 0.0))
        reliability = float(milestone_debug.get("reliability", 0.0))
        path_length = float(milestone_debug.get("path_length", 0.0))
        ignore_collisions = bool(milestone_debug.get("ignore_collisions", False))
        height_before = float(milestone_debug.get("height_before", initial_height))
        height_after = float(milestone_debug.get("height_after", height_before))
        line = (
            f"{label:<3} {status:<4} rel={reliability:.2f} score={planner_score:.3f} "
            f"len={path_length:.3f} {'IGN' if ignore_collisions else 'STRICT':<6} "
            f"h={height_before:.3f}->{height_after:.3f}"
        )
        draw.text((12, y), line, fill=color, font=FONT_SM)
        draw.rounded_rectangle(
            (bar_x, y + 2, bar_x + bar_w, y + 12),
            radius=3,
            outline=(80, 80, 80),
            fill=(25, 25, 25),
        )
        fill_w = int(max(0.0, min(1.0, planner_score)) * bar_w)
        if fill_w > 0:
            # Same narrow-fill fallback as _bar: plain rectangle when the fill
            # is narrower than two corner radii.
            if fill_w >= 6:
                draw.rounded_rectangle(
                    (bar_x, y + 2, bar_x + fill_w, y + 12), radius=3, fill=color
                )
            else:
                draw.rectangle((bar_x, y + 2, bar_x + fill_w, y + 12), fill=color)

    if not milestone_debugs:
        _draw_text_lines(
            draw,
            12,
            176,
            [
                "No milestone planning was needed at this frame.",
                "This usually means the tray was already safely extractable",
                "or the planner returned immediate success without staged search.",
            ],
            fill=(220, 220, 220),
            font=FONT_MD,
            line_height=22,
        )
    return search_panel


def _compose_path_quality(
    episode_dir: Path,
    demo,
    frame_index: int,
    tray_pose: np.ndarray,
    task_base_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
    left_grasped: bool,
    debug_row: Optional[Dict[str, object]],
) -> Image.Image:
    """Build the path-quality debug image.

    Layout: a header, a row of ``PATH_CAMERAS`` views with trails and planned
    poses, and a bottom row with the metric-state panel and the p_ext planner
    search panel. ``debug_row`` may be ``None``, in which case the planner
    diagnostics are replaced by an explanatory message.
    """
    canvas = Image.new(
        "RGB", (PANEL_SIZE * 3, HEADER_HEIGHT + PANEL_SIZE * 2), (12, 12, 12)
    )
    draw = ImageDraw.Draw(canvas)
    draw.rectangle((0, 0, canvas.size[0], HEADER_HEIGHT), fill=(0, 0, 0))
    draw.text(
        (12, 10),
        f"Path Quality Debug | frame {frame_index}",
        fill=(255, 255, 255),
        font=FONT_LG,
    )
    draw.text(
        (12, 40),
        f"p_pre={frame_row['p_pre']:.3f} y_pre={int(frame_row['y_pre'])} | "
        f"p_ext={frame_row['p_ext']:.3f} y_ext={int(frame_row['y_ext'])} | "
        f"grasped={int(left_grasped)}",
        fill=(235, 235, 235),
        font=FONT_SM,
    )

    pregrasp_corridor = [
        _apply_relative_pose(tray_pose, rel_pose)
        for rel_pose in _pregrasp_corridor_rel_poses(templates)
    ]
    extract_poses = _extract_sequence_poses(tray_pose, task_base_pose, templates)
    plan_poses = [*pregrasp_corridor, extract_poses[1], *extract_poses[2:5]]
    # One colour per plan pose: yellow corridor, one orange pose, green tail.
    colors = ([(255, 220, 0)] * len(pregrasp_corridor)) + [
        (255, 140, 0),
        (80, 255, 120),
        (80, 255, 120),
        (80, 255, 120),
    ]
    milestone_poses = _milestone_poses(tray_pose, task_base_pose, templates, frame_row)
    pext_debug = debug_row.get("p_ext", {}) if debug_row else {}
    milestone_debugs = list(pext_debug.get("milestones", []))

    trail_frames = range(max(0, frame_index - _TRAIL_HISTORY), frame_index + 1)
    demo_left_trail = np.asarray(
        [_demo_gripper_pose(demo[i], "left")[:3] for i in trail_frames],
        dtype=np.float64,
    )
    demo_right_trail = np.asarray(
        [_demo_gripper_pose(demo[i], "right")[:3] for i in trail_frames],
        dtype=np.float64,
    )

    for camera_slot, camera_name in enumerate(PATH_CAMERAS):
        panel = _draw_path_camera_panel(
            episode_dir=episode_dir,
            demo=demo,
            frame_index=frame_index,
            camera_name=camera_name,
            left_pose=left_pose,
            right_pose=right_pose,
            demo_left_trail=demo_left_trail,
            demo_right_trail=demo_right_trail,
            plan_poses=plan_poses,
            plan_colors=colors,
            milestone_poses=milestone_poses,
            milestone_debugs=milestone_debugs,
        )
        canvas.paste(panel, (camera_slot * PANEL_SIZE, HEADER_HEIGHT))

    text_panel = _draw_metric_state_panel(frame_row, left_grasped)
    canvas.paste(text_panel, (0, HEADER_HEIGHT + PANEL_SIZE))

    search_panel = _draw_planner_search_panel(frame_row, pext_debug, milestone_debugs)
    canvas.paste(search_panel, (PANEL_SIZE, HEADER_HEIGHT + PANEL_SIZE))
    return canvas


def _project_world_polyline(
    points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray
) -> List[Tuple[int, int]]:
    """Project world points to pixel coordinates, dropping points that are
    behind the camera or outside the ``RGB_SIZE`` x ``RGB_SIZE`` image."""
    if len(points_world) == 0:
        return []
    uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
    coords = []
    for (u, v), (_, _, depth) in zip(uv, camera_xyz):
        if depth <= 0 or not (0 <= u < RGB_SIZE and 0 <= v < RGB_SIZE):
            continue
        coords.append((int(round(float(u))), int(round(float(v)))))
    return coords


def _load_debug_row(
    path: Optional[Path], frame_index: int
) -> Optional[Dict[str, object]]:
    """Return the first JSONL record whose ``frame_index`` matches.

    Returns ``None`` when ``path`` is ``None``, does not exist, or contains no
    matching record. Blank lines are skipped.
    """
    if path is None or not path.exists():
        return None
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                # Tolerate blank/trailing empty lines instead of failing in
                # json.loads.
                continue
            row = json.loads(line)
            if int(row.get("frame_index", -1)) == int(frame_index):
                return row
    return None


def _milestone_poses(
    tray_pose: np.ndarray,
    task_base_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
) -> List[np.ndarray]:
    """Select the extract-sequence poses to pair with the milestone debug rows.

    When the row does not flag the tray as already grasped at the start, the
    first two poses are kept as approach poses and all remaining ones as
    retreat poses. When it does, approach poses are dropped and the retreat
    poses are narrowed to those more than 0.01 above the current height; if
    none qualifies, only the last retreat pose is kept while the current
    height is below the extract threshold, and nothing otherwise.
    """
    poses = _extract_sequence_poses(tray_pose, task_base_pose, templates)
    current_height = float(frame_row.get("p_ext_initial_height", tray_pose[2]))
    already_grasped = bool(frame_row.get("p_ext_already_grasped_start", 0.0))
    approach_poses = [] if already_grasped else poses[:2]
    retreat_poses = poses[2:]
    if already_grasped and retreat_poses:
        future_retreat_poses = [
            pose for pose in retreat_poses if float(pose[2]) > current_height + 0.01
        ]
        if future_retreat_poses:
            retreat_poses = future_retreat_poses
        elif current_height < _extract_height_threshold(templates):
            retreat_poses = [retreat_poses[-1]]
        else:
            retreat_poses = []
    return [*approach_poses, *retreat_poses]


def _milestone_color(milestone_debug: Dict[str, object]) -> Tuple[int, int, int]:
    """Pick a marker colour for a milestone record.

    Red when no path was found, orange when reliability is below 1.0, green
    for a "retreat" milestone, and yellow otherwise.
    """
    kind = str(milestone_debug.get("kind", "approach"))
    path_found = bool(milestone_debug.get("path_found", False))
    reliability = float(milestone_debug.get("reliability", 0.0))
    if not path_found:
        return (230, 70, 70)
    if reliability < 1.0:
        return (255, 170, 60)
    if kind == "retreat":
        return (80, 255, 120)
    return (255, 220, 0)


def _milestone_label(milestone_debug: Dict[str, object]) -> str:
    """Return a short label such as ``A0`` or ``R2`` for a milestone record
    (``A`` for kind "approach", ``R`` for any other kind)."""
    kind = str(milestone_debug.get("kind", "approach"))
    prefix = "A" if kind == "approach" else "R"
    return f"{prefix}{int(milestone_debug.get('milestone_index', 0))}"


def _compose_all_metrics(
    episode_name: str,
    episode_dir: Path,
    demo,
    frame_index: int,
    frame_row: pd.Series,
) -> Image.Image:
    """Build the all-metrics image: a phase banner, one RGB panel per camera
    in ``ALL_METRICS_CAMERAS``, and a footer with metric bars and labels.

    ``demo`` is accepted for interface compatibility but is not used.
    """
    in_reveal_phase = int(frame_row["phase_switch"]) == 0
    banner_color = (145, 30, 30) if in_reveal_phase else (25, 115, 40)
    canvas = Image.new(
        "RGB", (PANEL_SIZE * 3, PANEL_SIZE + FOOTER_HEIGHT), (12, 12, 12)
    )
    draw = ImageDraw.Draw(canvas)
    draw.rectangle((0, 0, canvas.size[0], 36), fill=banner_color)
    draw.text(
        (12, 7),
        f"{episode_name} | frame {frame_index:03d} | {'REVEAL' if int(frame_row['phase_switch']) == 0 else 'RETRIEVE'}",
        fill=(255, 255, 255),
        font=FONT_LG,
    )

    for idx, camera_name in enumerate(ALL_METRICS_CAMERAS):
        panel = _scale_image(_load_rgb(episode_dir, camera_name, frame_index))
        _draw_panel_title(panel, camera_name, None)
        canvas.paste(panel, (idx * PANEL_SIZE, 36))

    footer_y = 36 + PANEL_SIZE
    draw.rectangle((0, footer_y, canvas.size[0], canvas.size[1]), fill=(18, 18, 18))
    _bar(
        draw, 16, footer_y + 34, 220, 18, float(frame_row["three_view_visibility"]),
        "three_view_visibility", (60, 160, 255), threshold=_VISIBILITY_BAR_THRESHOLD,
    )
    _bar(
        draw, 272, footer_y + 34, 220, 18, float(frame_row["full_view_visibility"]),
        "full_view_visibility", (80, 190, 255), threshold=_VISIBILITY_BAR_THRESHOLD,
    )
    _bar(
        draw, 528, footer_y + 34, 220, 18, float(frame_row["p_pre"]),
        "p_pre", (255, 220, 0), threshold=DEFAULT_PPRE_TAU,
    )
    _bar(
        draw, 16, footer_y + 96, 220, 18, float(frame_row["p_ext"]),
        "p_ext", (80, 255, 120), threshold=_P_EXT_BAR_THRESHOLD,
    )
    draw.text(
        (272, footer_y + 84),
        f"y_pre={int(frame_row['y_pre'])} y_ext={int(frame_row['y_ext'])} "
        f"pre_prog={float(frame_row.get('pregrasp_progress', 0.0)):.3f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    draw.text(
        (272, footer_y + 108),
        f"y_ready={int(frame_row['y_ready'])} "
        f"phase_switch={int(frame_row['phase_switch'])} "
        f"phase_score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    draw.text(
        (528, footer_y + 84),
        f"door_angle={float(frame_row['door_angle']):.3f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    draw.text(
        (528, footer_y + 108),
        f"left_open={float(frame_row['left_gripper_open']):.1f} "
        f"right_open={float(frame_row['right_gripper_open']):.1f}",
        fill=(255, 255, 255),
        font=FONT_MD,
    )
    return canvas


def _save_if_requested(image: Image.Image, path: Optional[Path]) -> None:
    """Save ``image`` to ``path`` (creating parent directories); no-op when
    ``path`` is ``None``."""
    if path is None:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    image.save(path)


def _load_templates(path: Path) -> MotionTemplates:
    """Load motion templates from a ``.json`` file or, otherwise, a pickle."""
    if path.suffix.lower() == ".json":
        with path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        return MotionTemplates.from_json(payload["templates"])
    # SECURITY: unpickling can execute arbitrary code. Only pass template
    # files from a trusted source, or use the JSON form.
    with path.open("rb") as handle:
        return pickle.load(handle)


def main() -> int:
    """Parse the command line, render the requested images, and return 0.

    Raises:
        ValueError: if the CSV has a ``frame_index`` column that does not
            contain the requested frame.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--episode-dir", required=True)
    parser.add_argument("--templates-pkl", required=True)
    parser.add_argument("--dense-csv", required=True)
    parser.add_argument("--frame-index", type=int, required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--debug-jsonl")
    parser.add_argument("--visibility-out")
    parser.add_argument("--path-out")
    parser.add_argument("--all-out")
    args = parser.parse_args()

    frame_index = int(args.frame_index)
    episode_dir = Path(args.episode_dir)
    templates = _load_templates(Path(args.templates_pkl))
    frame_df = pd.read_csv(args.dense_csv)
    if "frame_index" in frame_df.columns:
        matches = frame_df.loc[frame_df["frame_index"] == frame_index]
        if len(matches) == 0:
            raise ValueError(
                f"frame_index {args.frame_index} not found in {args.dense_csv}"
            )
        frame_row = matches.iloc[0]
    else:
        # Without an explicit column, fall back to positional row lookup.
        frame_row = frame_df.iloc[frame_index]

    debug_jsonl = Path(args.debug_jsonl) if args.debug_jsonl else None
    debug_row = _load_debug_row(debug_jsonl, frame_index)

    visibility_out = Path(args.visibility_out) if args.visibility_out else None
    path_out = Path(args.path_out) if args.path_out else None
    all_out = Path(args.all_out) if args.all_out else None

    # The all-metrics panel only depends on RGB frames plus the dense CSV row.
    # Skip simulator replay entirely when no geometric overlays are requested.
    if all_out is not None and visibility_out is None and path_out is None:
        all_image = _compose_all_metrics(
            episode_name=episode_dir.name,
            episode_dir=episode_dir,
            demo=None,
            frame_index=frame_index,
            frame_row=frame_row,
        )
        _save_if_requested(all_image, all_out)
        return 0

    demo = _load_demo(episode_dir)
    env = _launch_replay_env()
    try:
        task = env.get_task(BimanualTakeTrayOutOfOven)
        cache = ReplayCache(task, demo, checkpoint_stride=args.checkpoint_stride)
        cache.reset()
        cache.step_to(frame_index)
        state = cache.current_state()
        left_grasped = any(
            obj.get_name() == "tray"
            for obj in task._scene.robot.left_gripper.get_grasped_objects()
        )

        if visibility_out is not None:
            visibility_image = _compose_visibility_focus(
                episode_dir=episode_dir,
                demo=demo,
                frame_index=frame_index,
                tray_pose=state.tray_pose,
                left_pose=state.left_gripper_pose,
                right_pose=state.right_gripper_pose,
                templates=templates,
                frame_row=frame_row,
            )
            _save_if_requested(visibility_image, visibility_out)

        if path_out is not None:
            path_image = _compose_path_quality(
                episode_dir=episode_dir,
                demo=demo,
                frame_index=frame_index,
                tray_pose=state.tray_pose,
                task_base_pose=task._task.get_base().get_pose(),
                left_pose=state.left_gripper_pose,
                right_pose=state.right_gripper_pose,
                templates=templates,
                frame_row=frame_row,
                left_grasped=left_grasped,
                debug_row=debug_row,
            )
            _save_if_requested(path_image, path_out)

        if all_out is not None:
            all_image = _compose_all_metrics(
                episode_name=episode_dir.name,
                episode_dir=episode_dir,
                demo=demo,
                frame_index=frame_index,
                frame_row=frame_row,
            )
            _save_if_requested(all_image, all_out)
    finally:
        # Always shut the replay environment down, even if rendering fails.
        env.shutdown()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())