| from collections import Counter |
| from pathlib import Path |
| import argparse |
| import json |
| import math |
| import os |
| import pickle |
| import sys |
| from typing import Dict, Iterable, List, Optional, Sequence, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from PIL import Image, ImageDraw, ImageFont |
|
|
|
|
# PROJECT_ROOT is the parent of the directory that contains this file. It is put
# at the front of sys.path (once) before the project imports below run —
# presumably so `rr_label_study` resolves when this file is executed directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if not any(entry == str(PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
| from rr_label_study.oven_study import ( |
| BimanualTakeTrayOutOfOven, |
| DEFAULT_PPRE_TAU, |
| FULL_CAMERA_SET, |
| MotionTemplates, |
| ReplayCache, |
| Shape, |
| _apply_relative_pose, |
| _camera_point_cloud, |
| _camera_file, |
| _extract_height_threshold, |
| _extract_sequence_poses, |
| _frame_metrics, |
| _launch_replay_env, |
| _load_demo, |
| _load_mask, |
| _pregrasp_corridor_rel_poses, |
| _project_points, |
| _sample_full_tray_points, |
| _sample_grasp_points, |
| _visibility_projection_details, |
| ) |
|
|
|
|
# Camera lists, one per debug sheet (visibility / path quality / all metrics).
VISIBILITY_CAMERAS = [
    "front",
    "wrist_right",
    "wrist_left",
]
PATH_CAMERAS = [
    "front",
    "overhead",
    "over_shoulder_left",
]
ALL_METRICS_CAMERAS = [
    "front",
    "wrist_right",
    "wrist_left",
]

# Pixel geometry. RGB_SIZE is the edge length used for bounds checks on
# projected pixels; PANEL_SIZE is the edge length of every rendered panel.
RGB_SIZE = 128
PANEL_SIZE = 256
# Heights of the header strip and the all-metrics footer strip.
HEADER_HEIGHT = 72
FOOTER_HEIGHT = 210
|
|
|
|
def _font(size: int) -> ImageFont.FreeTypeFont:
    """Return a font at ``size``.

    The first of two known TrueType files that exists on disk is used;
    if neither exists, PIL's built-in default font is returned instead.
    """
    known_fonts = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Regular.ttf",
    )
    available = [font_file for font_file in known_fonts if Path(font_file).exists()]
    if available:
        return ImageFont.truetype(available[0], size=size)
    return ImageFont.load_default()
|
|
|
|
# Module-wide font handles (small / medium / large), created once at import.
FONT_SM, FONT_MD, FONT_LG = (_font(point_size) for point_size in (14, 18, 24))
|
|
|
|
def _load_rgb(episode_dir: Path, camera_name: str, frame_index: int) -> Image.Image:
    """Open the RGB frame of ``camera_name`` at ``frame_index`` as an RGB image.

    The file location is resolved by ``_camera_file`` with kind ``"rgb"``.
    """
    rgb_path = _camera_file(episode_dir, camera_name, "rgb", frame_index)
    frame = Image.open(rgb_path)
    return frame.convert("RGB")
|
|
|
|
def _scale_image(image: Image.Image, size: int = PANEL_SIZE) -> Image.Image:
    """Return ``image`` resized to a ``size`` x ``size`` square (bilinear)."""
    target_shape = (size, size)
    return image.resize(target_shape, resample=Image.Resampling.BILINEAR)
|
|
|
|
def _new_panel(background: Tuple[int, int, int] = (0, 0, 0)) -> Image.Image:
    """Create a blank PANEL_SIZE-square RGB image filled with ``background``."""
    panel_shape = (PANEL_SIZE, PANEL_SIZE)
    return Image.new("RGB", panel_shape, background)
|
|
|
|
def _draw_badge(draw: ImageDraw.ImageDraw, x: int, y: int, text: str, fill: Tuple[int, int, int]) -> None:
    """Draw a rounded, filled label with white ``text``; (x, y) is its top-left.

    The badge is 24 px tall and its width is estimated from the character
    count (8.5 px per character plus 12 px padding), not measured.
    """
    badge_width = 12 + int(len(text) * 8.5)
    badge_height = 24
    outline_box = (x, y, x + badge_width, y + badge_height)
    draw.rounded_rectangle(outline_box, radius=6, fill=fill)
    draw.text((x + 6, y + 4), text, fill=(255, 255, 255), font=FONT_SM)
|
|
|
|
def _draw_panel_title(image: Image.Image, title: str, subtitle: Optional[str] = None) -> None:
    """Stamp a black title strip on top of ``image`` and, optionally, a
    black subtitle strip along its bottom edge. Modifies ``image`` in place.
    """
    black = (0, 0, 0)
    painter = ImageDraw.Draw(image)

    # Title strip: top 30 px.
    painter.rectangle((0, 0, PANEL_SIZE, 30), fill=black)
    painter.text((8, 6), title, fill=(255, 255, 255), font=FONT_MD)

    if not subtitle:
        return

    # Subtitle strip: bottom 26 px.
    strip_top = PANEL_SIZE - 26
    painter.rectangle((0, strip_top, PANEL_SIZE, PANEL_SIZE), fill=black)
    painter.text((8, PANEL_SIZE - 22), subtitle, fill=(230, 230, 230), font=FONT_SM)
|
|
|
|
def _draw_point(draw: ImageDraw.ImageDraw, xy: Tuple[int, int], color: Tuple[int, int, int], radius: int = 3) -> None:
    """Draw a filled disc of ``radius`` centred on ``xy``."""
    center_x, center_y = xy
    bounding_box = (
        center_x - radius,
        center_y - radius,
        center_x + radius,
        center_y + radius,
    )
    draw.ellipse(bounding_box, fill=color)
|
|
|
|
def _draw_polyline(draw: ImageDraw.ImageDraw, coords: Sequence[Tuple[int, int]], color: Tuple[int, int, int], width: int = 2) -> None:
    """Draw ``coords`` as one connected line; does nothing for fewer than two points."""
    if len(coords) < 2:
        return
    draw.line(coords, fill=color, width=width)
|
|
|
|
def _draw_segmented_polyline(
    draw: ImageDraw.ImageDraw,
    coords: Sequence[Tuple[int, int]],
    colors: Sequence[Tuple[int, int, int]],
    width: int = 3,
) -> None:
    """Draw a polyline one segment at a time, each with its own colour.

    The segment from point ``i`` to point ``i + 1`` uses ``colors[i + 1]``;
    when ``colors`` is shorter than that, its last entry is reused.
    Nothing is drawn for fewer than two points.
    """
    point_count = len(coords)
    if point_count < 2:
        return
    end_index = 1
    while end_index < point_count:
        segment_color = colors[min(end_index, len(colors) - 1)]
        draw.line((coords[end_index - 1], coords[end_index]), fill=segment_color, width=width)
        end_index += 1
|
|
|
|
def _draw_text_lines(
    draw: ImageDraw.ImageDraw,
    x: int,
    y: int,
    lines: Sequence[str],
    fill: Tuple[int, int, int] = (255, 255, 255),
    font: ImageFont.FreeTypeFont = FONT_SM,
    line_height: int = 18,
) -> None:
    """Draw ``lines`` top to bottom starting at (x, y), ``line_height`` px apart."""
    baseline_y = y
    for text in lines:
        draw.text((x, baseline_y), text, fill=fill, font=font)
        baseline_y += line_height
|
|
|
|
def _bar(draw: ImageDraw.ImageDraw, x: int, y: int, w: int, h: int, value: float, label: str, color: Tuple[int, int, int], threshold: Optional[float] = None) -> None:
    """Draw a labelled horizontal gauge for ``value``.

    The label (``"<label>: <value to 3 decimals>"``) sits 18 px above the bar.
    ``value`` and ``threshold`` are clamped to [0, 1] before being mapped to
    the bar width; ``threshold``, when given, is drawn as a white tick.
    """

    def _to_offset(fraction: float) -> int:
        # Clamp to the unit interval, then convert to a pixel offset.
        return int(max(0.0, min(1.0, fraction)) * w)

    draw.text((x, y - 18), f"{label}: {value:.3f}", fill=(255, 255, 255), font=FONT_SM)
    draw.rounded_rectangle((x, y, x + w, y + h), radius=5, outline=(120, 120, 120), fill=(20, 20, 20))

    filled_w = _to_offset(value)
    fill_box = (x, y, x + filled_w, y + h)
    if filled_w >= 12:
        draw.rounded_rectangle(fill_box, radius=5, fill=color)
    elif filled_w > 0:
        # Narrow fills use a plain rectangle instead of rounded corners.
        draw.rectangle(fill_box, fill=color)

    if threshold is None:
        return
    tick_x = x + _to_offset(threshold)
    draw.line((tick_x, y - 2, tick_x, y + h + 2), fill=(255, 255, 255), width=2)
|
|
|
|
def _project_coords(
    points_world: np.ndarray,
    point_cloud_world: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
    """Forward to ``_visibility_projection_details`` and return its result as is.

    Callers in this file unpack the result as ``(projected, visible)``
    lists of pixel coordinates.
    """
    details = _visibility_projection_details(
        points_world,
        point_cloud_world,
        extrinsics,
        intrinsics,
    )
    return details
|
|
|
|
def _infer_mask_handle(mask: np.ndarray, coords: Sequence[Tuple[int, int]]) -> Optional[int]:
    """Return the most frequent non-zero mask value under ``coords``.

    Args:
        mask: 2-D array indexed as ``mask[y, x]``.
        coords: ``(x, y)`` pixel coordinates to sample.

    Returns:
        The non-zero value hit most often (ties go to the value seen first),
        or ``None`` when no sampled pixel is non-zero.

    Coordinates outside the mask are skipped: previously a too-large index
    raised ``IndexError`` and a negative one silently wrapped around to the
    opposite edge, sampling an unrelated pixel.
    """
    height, width = mask.shape[:2]
    votes: Counter = Counter()
    for x, y in coords:
        if not (0 <= x < width and 0 <= y < height):
            continue
        handle = int(mask[y, x])  # single lookup per coordinate
        if handle != 0:
            votes[handle] += 1
    if not votes:
        return None
    return votes.most_common(1)[0][0]
|
|
|
|
def _masked_rgb(rgb: Image.Image, binary_mask: np.ndarray) -> Image.Image:
    """Return a copy of ``rgb`` that is black wherever ``binary_mask`` does not select.

    ``binary_mask`` is used directly as a numpy index into the pixel array.
    """
    source_pixels = np.asarray(rgb)
    kept_pixels = np.zeros_like(source_pixels)
    # Only the selected pixels are copied across; the rest stay zero (black).
    kept_pixels[binary_mask] = source_pixels[binary_mask]
    return Image.fromarray(kept_pixels, mode="RGB")
|
|
|
|
def _scaled_coords(coords: Iterable[Tuple[int, int]], scale: float) -> List[Tuple[int, int]]:
    """Multiply every (x, y) by ``scale`` and round each component to an int."""
    scaled: List[Tuple[int, int]] = []
    for px, py in coords:
        scaled_x = int(round(px * scale))
        scaled_y = int(round(py * scale))
        scaled.append((scaled_x, scaled_y))
    return scaled
|
|
|
|
def _demo_gripper_pose(obs, arm_name: str) -> np.ndarray:
    """Return ``obs.<arm_name>.gripper_pose`` as a float64 array."""
    arm_observation = getattr(obs, arm_name)
    pose = arm_observation.gripper_pose
    return np.asarray(pose, dtype=np.float64)
|
|
|
|
def _world_pose_to_coords(
    pose_xyz: Sequence[float],
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Optional[Tuple[int, int]]:
    """Project one world-space point to integer pixel coordinates.

    Returns ``None`` when the projected depth is not positive or the
    (unrounded) pixel falls outside the RGB_SIZE x RGB_SIZE image.
    """
    single_point = np.asarray([pose_xyz], dtype=np.float64)
    uv, camera_xyz = _project_points(single_point, extrinsics, intrinsics)
    u, v = uv[0]
    depth = camera_xyz[0, 2]

    if depth <= 0:
        return None
    inside_image = 0 <= u < RGB_SIZE and 0 <= v < RGB_SIZE
    if not inside_image:
        return None

    pixel_x = int(round(float(u)))
    pixel_y = int(round(float(v)))
    return pixel_x, pixel_y
|
|
|
|
def _compose_visibility_panel(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    tray_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    per_camera_vis: float,
) -> Image.Image:
    """Build one camera's visibility row: scene | x-ray | depth-visible.

    * scene: the camera image with tray, grasp and visible-grasp points.
    * x-ray: the same tray/grasp points on black plus both gripper positions.
    * depth-visible: the image masked down to visible pixels, with grasp points.

    Returns a (3 * PANEL_SIZE) x PANEL_SIZE image.
    """
    zoom = PANEL_SIZE / RGB_SIZE
    tray_blue = (70, 170, 255)
    grasp_magenta = (255, 80, 210)
    visible_green = (70, 255, 140)

    def _plot(painter, pixel_coords, color, radius):
        # Scale image-space pixels up to panel space and draw one dot each.
        for point in _scaled_coords(pixel_coords, zoom):
            _draw_point(painter, point, color, radius=radius)

    rgb = _load_rgb(episode_dir, camera_name, frame_index)
    point_cloud, extrinsics, intrinsics = _camera_point_cloud(
        episode_dir, demo, frame_index, camera_name
    )

    tray_points = _sample_full_tray_points(tray_pose)
    grasp_points = _sample_grasp_points(templates, tray_pose)
    tray_proj, tray_visible = _project_coords(
        tray_points, point_cloud, extrinsics, intrinsics
    )
    grasp_proj, grasp_visible = _project_coords(
        grasp_points, point_cloud, extrinsics, intrinsics
    )
    # Denominator shown in the captions; 1 avoids printing "/0".
    grasp_total = len(grasp_proj) or 1

    # --- Panel 1: annotated scene -------------------------------------
    scene_panel = _scale_image(rgb)
    _draw_panel_title(
        scene_panel,
        f"{camera_name} scene",
        f"metric vis={per_camera_vis:.3f} | visible={len(grasp_visible)}/{grasp_total}",
    )
    scene_draw = ImageDraw.Draw(scene_panel)
    _plot(scene_draw, tray_proj, tray_blue, 2)
    _plot(scene_draw, grasp_proj, grasp_magenta, 3)
    _plot(scene_draw, grasp_visible, visible_green, 3)
    _draw_badge(scene_draw, 8, 34, "blue=tray", (40, 105, 180))
    _draw_badge(scene_draw, 112, 34, "magenta=grasp", (145, 40, 140))
    _draw_badge(scene_draw, 8, 64, "green=visible", (32, 130, 64))

    # --- Panel 2: x-ray (projections on black) ------------------------
    xray_panel = _new_panel()
    _draw_panel_title(xray_panel, f"{camera_name} x-ray", "blue=tray, magenta=grasp region")
    xray_draw = ImageDraw.Draw(xray_panel)
    _plot(xray_draw, tray_proj, tray_blue, 2)
    _plot(xray_draw, grasp_proj, grasp_magenta, 3)
    gripper_markers = (
        (left_pose, (255, 230, 70)),
        (right_pose, (140, 110, 255)),
    )
    for gripper_pose, marker_color in gripper_markers:
        marker_xy = _world_pose_to_coords(gripper_pose[:3], extrinsics, intrinsics)
        if marker_xy is not None:
            _draw_point(xray_draw, _scaled_coords([marker_xy], zoom)[0], marker_color, radius=4)

    # --- Panel 3: image masked to depth-visible pixels ----------------
    visible_pixels = np.zeros((RGB_SIZE, RGB_SIZE), dtype=bool)
    for x, y in tray_visible + grasp_visible:
        visible_pixels[y, x] = True
    masked_panel = _scale_image(_masked_rgb(rgb, visible_pixels))
    _draw_panel_title(
        masked_panel,
        f"{camera_name} depth-visible",
        f"visible grasp pts={len(grasp_visible)}/{grasp_total}",
    )
    masked_draw = ImageDraw.Draw(masked_panel)
    _plot(masked_draw, grasp_proj, (180, 80, 160), 2)
    _plot(masked_draw, grasp_visible, visible_green, 3)

    # --- Assemble the three panels side by side -----------------------
    row = Image.new("RGB", (PANEL_SIZE * 3, PANEL_SIZE), (15, 15, 15))
    for slot, panel in enumerate((scene_panel, xray_panel, masked_panel)):
        row.paste(panel, (PANEL_SIZE * slot, 0))
    return row
|
|
|
|
def _compose_visibility_focus(
    episode_dir: Path,
    demo,
    frame_index: int,
    tray_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
) -> Image.Image:
    """Render the visibility debug sheet: a header plus one panel row per
    camera in VISIBILITY_CAMERAS.

    The header shows the frame's visibility metrics, ``y_ready`` and the
    phase score (falling back to ``phase_switch`` if ``phase_score`` is absent).
    """
    sheet_size = (PANEL_SIZE * 3, HEADER_HEIGHT + PANEL_SIZE * len(VISIBILITY_CAMERAS))
    canvas = Image.new("RGB", sheet_size, (12, 12, 12))
    painter = ImageDraw.Draw(canvas)

    painter.rectangle((0, 0, canvas.size[0], HEADER_HEIGHT), fill=(0, 0, 0))
    painter.text((12, 10), f"Visibility Metric Debug | frame {frame_index}", fill=(255, 255, 255), font=FONT_LG)
    summary = " | ".join(
        [
            f"three_view_visibility={frame_row['three_view_visibility']:.3f}",
            f"full_view_visibility={frame_row['full_view_visibility']:.3f}",
            f"y_ready={int(frame_row['y_ready'])}",
            f"phase_score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}",
        ]
    )
    painter.text((12, 40), summary, fill=(235, 235, 235), font=FONT_SM)

    row_top = HEADER_HEIGHT
    for camera_name in VISIBILITY_CAMERAS:
        visibility = _camera_grasp_visibility(
            episode_dir, demo, frame_index, camera_name, tray_pose, templates
        )
        camera_row = _compose_visibility_panel(
            episode_dir=episode_dir,
            demo=demo,
            frame_index=frame_index,
            camera_name=camera_name,
            tray_pose=tray_pose,
            left_pose=left_pose,
            right_pose=right_pose,
            templates=templates,
            per_camera_vis=float(visibility),
        )
        canvas.paste(camera_row, (0, row_top))
        row_top += PANEL_SIZE
    return canvas
|
|
|
|
def _camera_grasp_visibility(
    episode_dir: Path,
    demo,
    frame_index: int,
    camera_name: str,
    tray_pose: np.ndarray,
    templates: MotionTemplates,
) -> float:
    """Fraction of projected grasp points that are visible from ``camera_name``.

    Returns 0.0 when no grasp point projects at all.
    """
    point_cloud, extrinsics, intrinsics = _camera_point_cloud(
        episode_dir, demo, frame_index, camera_name
    )
    grasp_points = _sample_grasp_points(templates, tray_pose)
    projected, visible = _project_coords(grasp_points, point_cloud, extrinsics, intrinsics)
    if not projected:
        return 0.0
    return float(len(visible) / len(projected))
|
|
|
|
def _compose_path_quality(
    episode_dir: Path,
    demo,
    frame_index: int,
    tray_pose: np.ndarray,
    task_base_pose: np.ndarray,
    left_pose: np.ndarray,
    right_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
    left_grasped: bool,
    debug_row: Optional[Dict[str, object]],
) -> Image.Image:
    """Render the path-quality debug sheet for one frame.

    Layout, below a header strip, is a 3 x 2 grid of PANEL_SIZE squares:

    * top row: one view per camera in PATH_CAMERAS with the recent demo
      gripper trails, the planned pregrasp/extract path, the current gripper
      positions and the planner milestones from ``debug_row``;
    * bottom-left: p_pre / p_ext gauges and label values from ``frame_row``;
    * bottom-middle and -right: p_ext planner diagnostics read from
      ``debug_row["p_ext"]``, or an explanatory note when there is none.

    Args:
        demo: Indexable by frame; each item exposes ``left``/``right`` gripper
            poses and a ``misc`` mapping with per-camera extrinsics/intrinsics.
        frame_row: Metric row for this frame (p_pre, p_ext, y_pre, y_ext,
            door_angle, phase_switch, ...).
        left_grasped: Whether the left gripper currently holds the tray.
        debug_row: Optional sidecar row; ``None`` or a null/missing ``p_ext``
            entry is treated as "no diagnostics".

    Returns:
        The composed RGB canvas.
    """
    scale = PANEL_SIZE / RGB_SIZE

    canvas = Image.new("RGB", (PANEL_SIZE * 3, HEADER_HEIGHT + PANEL_SIZE * 2), (12, 12, 12))
    draw = ImageDraw.Draw(canvas)
    draw.rectangle((0, 0, canvas.size[0], HEADER_HEIGHT), fill=(0, 0, 0))
    draw.text((12, 10), f"Path Quality Debug | frame {frame_index}", fill=(255, 255, 255), font=FONT_LG)
    draw.text(
        (12, 40),
        f"p_pre={frame_row['p_pre']:.3f} y_pre={int(frame_row['y_pre'])} | p_ext={frame_row['p_ext']:.3f} y_ext={int(frame_row['y_ext'])} | grasped={int(left_grasped)}",
        fill=(235, 235, 235),
        font=FONT_SM,
    )

    # Planned path: yellow pregrasp corridor, one orange pose, then up to
    # three green extract poses.
    pregrasp_corridor = [
        _apply_relative_pose(tray_pose, rel_pose)
        for rel_pose in _pregrasp_corridor_rel_poses(templates)
    ]
    extract_poses = _extract_sequence_poses(tray_pose, task_base_pose, templates)
    plan_poses = [*pregrasp_corridor, extract_poses[1], *extract_poses[2:5]]
    colors = ([(255, 220, 0)] * len(pregrasp_corridor)) + [
        (255, 140, 0),
        (80, 255, 120),
        (80, 255, 120),
        (80, 255, 120),
    ]
    milestone_poses = _milestone_poses(tray_pose, task_base_pose, templates, frame_row)
    # `or {}` / `or []` also cover JSON nulls, which a plain .get() default
    # does not (the default only applies when the key is missing).
    pext_debug = (debug_row.get("p_ext") if debug_row else None) or {}
    milestone_debugs = list(pext_debug.get("milestones") or [])

    # Gripper positions over (up to) the last 12 frames plus the current one.
    trail_frames = range(max(0, frame_index - 12), frame_index + 1)
    demo_left_trail = np.asarray(
        [_demo_gripper_pose(demo[i], "left")[:3] for i in trail_frames],
        dtype=np.float64,
    )
    demo_right_trail = np.asarray(
        [_demo_gripper_pose(demo[i], "right")[:3] for i in trail_frames],
        dtype=np.float64,
    )

    for camera_slot, camera_name in enumerate(PATH_CAMERAS):
        panel = _scale_image(_load_rgb(episode_dir, camera_name, frame_index))
        _draw_panel_title(panel, camera_name, "demo trails + planned extract path")
        draw = ImageDraw.Draw(panel)
        extrinsics = demo[frame_index].misc[f"{camera_name}_camera_extrinsics"]
        intrinsics = demo[frame_index].misc[f"{camera_name}_camera_intrinsics"]
        left_trail_xy = _project_world_polyline(demo_left_trail, extrinsics, intrinsics)
        right_trail_xy = _project_world_polyline(demo_right_trail, extrinsics, intrinsics)

        # Keep every on-screen plan point paired with its own colour. The
        # previous code filtered the points but indexed the unfiltered colour
        # list, so colours shifted whenever a pose projected off-screen.
        plan_markers = []
        for pose, color in zip(plan_poses, colors):
            xy = _world_pose_to_coords(pose[:3], extrinsics, intrinsics)
            if xy is not None:
                plan_markers.append((_scaled_coords([xy], scale)[0], color))

        _draw_polyline(draw, _scaled_coords(left_trail_xy, scale), (80, 220, 255), width=3)
        _draw_polyline(draw, _scaled_coords(right_trail_xy, scale), (150, 110, 255), width=3)
        _draw_segmented_polyline(
            draw,
            [xy for xy, _ in plan_markers],
            [color for _, color in plan_markers],
            width=4,
        )

        # Current gripper positions as white dots.
        for gripper_pose in (left_pose, right_pose):
            gripper_xy = _world_pose_to_coords(gripper_pose[:3], extrinsics, intrinsics)
            if gripper_xy is not None:
                _draw_point(draw, _scaled_coords([gripper_xy], scale)[0], (255, 255, 255), radius=5)

        for xy, color in plan_markers:
            _draw_point(draw, xy, color, radius=4)

        for milestone_pose, milestone_debug in zip(milestone_poses, milestone_debugs):
            xy = _world_pose_to_coords(milestone_pose[:3], extrinsics, intrinsics)
            if xy is None:
                continue
            scaled_xy = _scaled_coords([xy], scale)[0]
            marker_color = _milestone_color(milestone_debug)
            _draw_point(draw, scaled_xy, marker_color, radius=5)
            draw.text(
                (scaled_xy[0] + 6, scaled_xy[1] - 10),
                _milestone_label(milestone_debug),
                fill=marker_color,
                font=FONT_SM,
            )
        canvas.paste(panel, (camera_slot * PANEL_SIZE, HEADER_HEIGHT))

    # ---- bottom-left: metric summary ---------------------------------
    text_panel = Image.new("RGB", (PANEL_SIZE, PANEL_SIZE), (18, 18, 18))
    draw = ImageDraw.Draw(text_panel)
    draw.text((12, 12), "Metric State", fill=(255, 255, 255), font=FONT_LG)
    _bar(draw, 12, 56, PANEL_SIZE - 24, 18, float(frame_row["p_pre"]), "p_pre", (255, 220, 0), threshold=DEFAULT_PPRE_TAU)
    _bar(draw, 12, 108, PANEL_SIZE - 24, 18, float(frame_row["p_ext"]), "p_ext", (80, 255, 120), threshold=0.45)
    draw.text((12, 146), f"y_pre={int(frame_row['y_pre'])} y_ext={int(frame_row['y_ext'])}", fill=(255, 255, 255), font=FONT_MD)
    draw.text((12, 172), f"pre_prog={float(frame_row.get('pregrasp_progress', 0.0)):.3f}", fill=(255, 255, 255), font=FONT_MD)
    draw.text((12, 198), f"left_grasped={int(left_grasped)} door={float(frame_row['door_angle']):.3f}", fill=(255, 255, 255), font=FONT_MD)
    draw.text((12, 224), f"y_retrieve={int(frame_row.get('y_retrieve', 0))} phase={int(frame_row['phase_switch'])} score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}", fill=(255, 255, 255), font=FONT_MD)
    # NOTE(review): these legend badges are drawn over the p_pre label
    # (y=38) and the second and third extend past the 256-px panel edge, so
    # "red=failed" is effectively invisible. Left unchanged here because the
    # panel has no free row for them; needs a layout decision.
    _draw_badge(draw, 12, 36, "yellow=approach", (130, 110, 30))
    _draw_badge(draw, 138, 36, "green=retreat", (32, 130, 64))
    _draw_badge(draw, 250, 36, "red=failed", (140, 40, 40))
    canvas.paste(text_panel, (0, HEADER_HEIGHT + PANEL_SIZE))

    # ---- bottom-middle/right: planner diagnostics --------------------
    search_width = PANEL_SIZE * 2
    margin = 12
    search_panel = Image.new("RGB", (search_width, PANEL_SIZE), (18, 18, 18))
    draw = ImageDraw.Draw(search_panel)
    draw.text((margin, 12), "p_ext Planner Search", fill=(255, 255, 255), font=FONT_LG)
    if pext_debug:
        initial_height = float(pext_debug.get("initial_height", frame_row.get("tray_height", 0.0)))
        final_height = float(pext_debug.get("final_height", initial_height))
        total_length = float(pext_debug.get("total_length", 0.0))
        num_milestones = int(pext_debug.get("num_milestones", len(milestone_debugs)))
        first_failed = int(pext_debug.get("first_failed_milestone", -1))
        mean_rel = float(pext_debug.get("mean_reliability", 0.0))
        min_rel = float(pext_debug.get("min_reliability", 0.0))
        approach_score = float(pext_debug.get("approach_score", 0.0))
        retreat_score = float(pext_debug.get("retreat_score", 0.0))

        # Three equal columns that fit inside the panel. The previous fixed
        # x positions put the third bar at x=508 of a 512-px-wide panel, so
        # retreat_score was never visible.
        gauge_w = (search_width - 4 * margin) // 3
        col_x = [margin + slot * (gauge_w + margin) for slot in range(3)]
        _bar(draw, col_x[0], 50, gauge_w, 16, float(frame_row["p_ext"]), "p_ext", (80, 255, 120), threshold=0.45)
        _bar(draw, col_x[1], 50, gauge_w, 16, approach_score, "approach_score", (255, 210, 80))
        _bar(draw, col_x[2], 50, gauge_w, 16, retreat_score, "retreat_score", (80, 255, 120))
        _bar(draw, col_x[0], 94, gauge_w, 16, mean_rel, "mean_rel", (120, 180, 255))
        _bar(draw, col_x[1], 94, gauge_w, 16, min_rel, "min_rel", (120, 180, 255))
        draw.text(
            (margin, 124),
            f"height {initial_height:.3f} -> {final_height:.3f} | total_len={total_length:.3f} | milestones={num_milestones} | first_failed={first_failed}",
            fill=(235, 235, 235),
            font=FONT_SM,
        )
        draw.text(
            (margin, 146),
            f"start_grasped={int(bool(pext_debug.get('already_grasped_start', False)))} end_grasped={int(bool(pext_debug.get('already_grasped_end', False)))}",
            fill=(235, 235, 235),
            font=FONT_SM,
        )

        start_y = 172
        row_h = 16
        bar_right = search_width - margin
        for row_index, milestone_debug in enumerate(milestone_debugs[:5]):
            y = start_y + row_index * row_h
            color = _milestone_color(milestone_debug)
            label = _milestone_label(milestone_debug)
            status = "OK" if bool(milestone_debug.get("path_found", False)) else "FAIL"
            planner_score = float(milestone_debug.get("planner_score", 0.0))
            reliability = float(milestone_debug.get("reliability", 0.0))
            path_length = float(milestone_debug.get("path_length", 0.0))
            ignore_collisions = bool(milestone_debug.get("ignore_collisions", False))
            height_before = float(milestone_debug.get("height_before", initial_height))
            height_after = float(milestone_debug.get("height_after", height_before))
            line = (
                f"{label:<3} {status:<4} rel={reliability:.2f} score={planner_score:.3f} "
                f"len={path_length:.3f} {'IGN' if ignore_collisions else 'STRICT':<6} "
                f"h={height_before:.3f}->{height_after:.3f}"
            )
            draw.text((margin, y), line, fill=color, font=FONT_SM)

            # Planner-score bar, placed in whatever room is left to the right
            # of the text. It used to sit at x=550, beyond the panel edge, so
            # it was always clipped away. Skipped when too little room remains.
            bar_left = margin + int(math.ceil(draw.textlength(line, font=FONT_SM))) + 8
            bar_span = min(160, bar_right - bar_left)
            if bar_span < 8:
                continue
            draw.rounded_rectangle((bar_left, y + 2, bar_left + bar_span, y + 12), radius=3, outline=(80, 80, 80), fill=(25, 25, 25))
            fill_w = int(max(0.0, min(1.0, planner_score)) * bar_span)
            if fill_w >= 6:
                draw.rounded_rectangle((bar_left, y + 2, bar_left + fill_w, y + 12), radius=3, fill=color)
            elif fill_w > 0:
                # Too narrow for radius-3 corners; use a plain rectangle.
                draw.rectangle((bar_left, y + 2, bar_left + fill_w, y + 12), fill=color)

        if not milestone_debugs:
            _draw_text_lines(
                draw,
                margin,
                176,
                [
                    "No milestone planning was needed at this frame.",
                    "This usually means the tray was already safely extractable",
                    "or the planner returned immediate success without staged search.",
                ],
                fill=(220, 220, 220),
                font=FONT_MD,
                line_height=22,
            )
    else:
        _draw_text_lines(
            draw,
            margin,
            60,
            [
                "No debug sidecar was provided for this frame.",
                "The richer p_ext milestone search diagnostics are unavailable.",
            ],
            fill=(220, 220, 220),
            font=FONT_MD,
            line_height=22,
        )
    canvas.paste(search_panel, (PANEL_SIZE, HEADER_HEIGHT + PANEL_SIZE))
    return canvas
|
|
|
|
def _project_world_polyline(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> List[Tuple[int, int]]:
    """Project world-space points to integer pixels, dropping unusable ones.

    A point is dropped when its projected depth is not positive or its
    (unrounded) pixel lies outside the RGB_SIZE x RGB_SIZE image. An empty
    input yields an empty list without projecting anything.
    """
    if len(points_world) == 0:
        return []
    uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
    return [
        (int(round(float(u))), int(round(float(v))))
        for (u, v), (_, _, depth) in zip(uv, camera_xyz)
        if not (depth <= 0) and 0 <= u < RGB_SIZE and 0 <= v < RGB_SIZE
    ]
|
|
|
|
def _load_debug_row(path: Optional[Path], frame_index: int) -> Optional[Dict[str, object]]:
    """Return the first JSONL row whose ``frame_index`` equals ``frame_index``.

    Args:
        path: JSON-Lines sidecar file, or ``None``.
        frame_index: Frame to look up.

    Returns:
        The matching row as a dict, or ``None`` when ``path`` is ``None``,
        does not exist, or contains no matching row.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.

    Blank lines (such as a trailing newline at the end of the file) are
    skipped instead of failing the parse. Rows that are not JSON objects, or
    whose ``frame_index`` is missing or null, never match.
    """
    if path is None or not path.exists():
        return None
    wanted = int(frame_index)
    with path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            if not raw_line.strip():
                continue
            row = json.loads(raw_line)
            if not isinstance(row, dict):
                continue
            row_frame = row.get("frame_index")
            if row_frame is not None and int(row_frame) == wanted:
                return row
    return None
|
|
|
|
def _milestone_poses(
    tray_pose: np.ndarray,
    task_base_pose: np.ndarray,
    templates: MotionTemplates,
    frame_row: pd.Series,
) -> List[np.ndarray]:
    """Select the extract-sequence poses that count as milestones for this frame.

    Not yet grasped (per ``p_ext_already_grasped_start``): the first two poses
    (approach) followed by all remaining poses (retreat).

    Already grasped: approach poses are dropped and the retreat poses are
    narrowed to those more than 0.01 above the current height. If none are,
    the last retreat pose alone is kept while the current height is below the
    extract-height threshold; otherwise the result is empty.
    """
    poses = _extract_sequence_poses(tray_pose, task_base_pose, templates)
    current_height = float(frame_row.get("p_ext_initial_height", tray_pose[2]))
    already_grasped = bool(frame_row.get("p_ext_already_grasped_start", 0.0))
    retreat_poses = poses[2:]

    if not already_grasped:
        return [*poses[:2], *retreat_poses]
    if not retreat_poses:
        return []

    still_above = [
        pose for pose in retreat_poses if float(pose[2]) > current_height + 0.01
    ]
    if still_above:
        return [*still_above]
    if current_height < _extract_height_threshold(templates):
        return [retreat_poses[-1]]
    return []
|
|
|
|
def _milestone_color(milestone_debug: Dict[str, object]) -> Tuple[int, int, int]:
    """Pick the marker colour for one milestone debug record.

    Red when no path was found, orange when reliability is below 1.0,
    otherwise green for ``kind == "retreat"`` and yellow for anything else.
    """
    failed_red = (230, 70, 70)
    degraded_orange = (255, 170, 60)
    retreat_green = (80, 255, 120)
    approach_yellow = (255, 220, 0)

    kind = str(milestone_debug.get("kind", "approach"))
    path_found = bool(milestone_debug.get("path_found", False))
    reliability = float(milestone_debug.get("reliability", 0.0))

    if path_found and not (reliability < 1.0):
        return retreat_green if kind == "retreat" else approach_yellow
    return degraded_orange if path_found else failed_red
|
|
|
|
def _milestone_label(milestone_debug: Dict[str, object]) -> str:
    """Return a short tag such as ``A0`` or ``R2`` for a milestone record.

    ``A`` is used when ``kind`` is ``"approach"`` (also the default when the
    key is missing), ``R`` for every other kind; the number is
    ``milestone_index`` (default 0) as an int.
    """
    kind = str(milestone_debug.get("kind", "approach"))
    letter = "R" if kind != "approach" else "A"
    index = int(milestone_debug.get("milestone_index", 0))
    return letter + str(index)
|
|
|
|
def _compose_all_metrics(
    episode_name: str,
    episode_dir: Path,
    demo,
    frame_index: int,
    frame_row: pd.Series,
) -> Image.Image:
    """Render the one-glance summary sheet: banner, camera views, metric footer.

    The banner is red and reads REVEAL when ``phase_switch`` is 0, green and
    RETRIEVE otherwise. ``demo`` is accepted for signature parity with the
    other composers but is not used here.
    """
    banner_height = 36
    white = (255, 255, 255)

    in_reveal = int(frame_row["phase_switch"]) == 0
    banner_color = (145, 30, 30) if in_reveal else (25, 115, 40)
    phase_name = "REVEAL" if in_reveal else "RETRIEVE"

    canvas = Image.new("RGB", (PANEL_SIZE * 3, PANEL_SIZE + FOOTER_HEIGHT), (12, 12, 12))
    draw = ImageDraw.Draw(canvas)
    draw.rectangle((0, 0, canvas.size[0], banner_height), fill=banner_color)
    draw.text(
        (12, 7),
        f"{episode_name} | frame {frame_index:03d} | {phase_name}",
        fill=white,
        font=FONT_LG,
    )

    # Camera strip directly under the banner.
    for slot, camera_name in enumerate(ALL_METRICS_CAMERAS):
        view = _scale_image(_load_rgb(episode_dir, camera_name, frame_index))
        _draw_panel_title(view, camera_name, None)
        canvas.paste(view, (slot * PANEL_SIZE, banner_height))

    footer_y = banner_height + PANEL_SIZE
    draw.rectangle((0, footer_y, canvas.size[0], canvas.size[1]), fill=(18, 18, 18))

    # (x, y offset in footer, frame_row column == label, colour, threshold)
    gauges = (
        (16, 34, "three_view_visibility", (60, 160, 255), 0.35),
        (272, 34, "full_view_visibility", (80, 190, 255), 0.35),
        (528, 34, "p_pre", (255, 220, 0), DEFAULT_PPRE_TAU),
        (16, 96, "p_ext", (80, 255, 120), 0.45),
    )
    for gauge_x, y_offset, column, color, tau in gauges:
        _bar(draw, gauge_x, footer_y + y_offset, 220, 18, float(frame_row[column]), column, color, threshold=tau)

    # (x, y offset in footer, text)
    captions = (
        (272, 84, f"y_pre={int(frame_row['y_pre'])} y_ext={int(frame_row['y_ext'])} pre_prog={float(frame_row.get('pregrasp_progress', 0.0)):.3f}"),
        (272, 108, f"y_ready={int(frame_row['y_ready'])} phase_switch={int(frame_row['phase_switch'])} phase_score={float(frame_row.get('phase_score', frame_row['phase_switch'])):.3f}"),
        (528, 84, f"door_angle={float(frame_row['door_angle']):.3f}"),
        (528, 108, f"left_open={float(frame_row['left_gripper_open']):.1f} right_open={float(frame_row['right_gripper_open']):.1f}"),
    )
    for caption_x, y_offset, caption in captions:
        draw.text((caption_x, footer_y + y_offset), caption, fill=white, font=FONT_MD)
    return canvas
|
|
|
|
def _save_if_requested(image: Image.Image, path: Optional[Path]) -> None:
    """Write ``image`` to ``path``, creating parent directories as needed.

    A ``None`` path means the output was not requested; nothing happens.
    """
    if path is not None:
        output_dir = path.parent
        output_dir.mkdir(parents=True, exist_ok=True)
        image.save(path)
|
|
|
|
def _load_templates(path: Path) -> MotionTemplates:
    """Load motion templates from ``path``.

    A ``.json`` suffix (case-insensitive) is parsed as JSON and its
    ``"templates"`` entry is handed to ``MotionTemplates.from_json``.
    Any other suffix is unpickled and returned as is.
    """
    is_json = path.suffix.lower() == ".json"
    if not is_json:
        # NOTE: unpickling can execute arbitrary code; only pass trusted files.
        with path.open("rb") as handle:
            return pickle.load(handle)

    with path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    return MotionTemplates.from_json(payload["templates"])
|
|
|
|
def main() -> int:
    """Command-line entry point; renders the requested debug sheets for one frame.

    If only ``--all-out`` is requested, the summary sheet is rendered straight
    from the CSV row and the stored images, without loading the demo or
    starting the replay environment. Otherwise the demo is replayed up to
    ``--frame-index`` and each requested sheet is rendered. Returns 0.

    Raises:
        ValueError: If the CSV has a ``frame_index`` column but no row for
            the requested frame.
    """
    parser = argparse.ArgumentParser()
    for required_flag in ("--episode-dir", "--templates-pkl", "--dense-csv"):
        parser.add_argument(required_flag, required=True)
    parser.add_argument("--frame-index", type=int, required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    for optional_flag in ("--debug-jsonl", "--visibility-out", "--path-out", "--all-out"):
        parser.add_argument(optional_flag)
    args = parser.parse_args()

    def _optional_path(raw_value):
        # An omitted (or empty) option means "not requested".
        return Path(raw_value) if raw_value else None

    frame_index = int(args.frame_index)
    episode_dir = Path(args.episode_dir)
    templates = _load_templates(Path(args.templates_pkl))

    frame_df = pd.read_csv(args.dense_csv)
    if "frame_index" not in frame_df.columns:
        # No explicit index column: fall back to positional lookup.
        frame_row = frame_df.iloc[frame_index]
    else:
        matches = frame_df.loc[frame_df["frame_index"] == frame_index]
        if len(matches) == 0:
            raise ValueError(f"frame_index {args.frame_index} not found in {args.dense_csv}")
        frame_row = matches.iloc[0]

    debug_row = _load_debug_row(_optional_path(args.debug_jsonl), frame_index)
    visibility_out = _optional_path(args.visibility_out)
    path_out = _optional_path(args.path_out)
    all_out = _optional_path(args.all_out)

    def _render_all_metrics(demo_for_sheet):
        sheet = _compose_all_metrics(
            episode_name=episode_dir.name,
            episode_dir=episode_dir,
            demo=demo_for_sheet,
            frame_index=frame_index,
            frame_row=frame_row,
        )
        _save_if_requested(sheet, all_out)

    # Fast path: the summary sheet alone needs no replay.
    only_summary = all_out is not None and visibility_out is None and path_out is None
    if only_summary:
        _render_all_metrics(None)
        return 0

    demo = _load_demo(episode_dir)
    env = _launch_replay_env()
    try:
        task = env.get_task(BimanualTakeTrayOutOfOven)
        cache = ReplayCache(task, demo, checkpoint_stride=args.checkpoint_stride)
        cache.reset()
        cache.step_to(frame_index)
        state = cache.current_state()
        grasped_names = [
            obj.get_name()
            for obj in task._scene.robot.left_gripper.get_grasped_objects()
        ]
        left_grasped = "tray" in grasped_names

        if visibility_out is not None:
            _save_if_requested(
                _compose_visibility_focus(
                    episode_dir=episode_dir,
                    demo=demo,
                    frame_index=frame_index,
                    tray_pose=state.tray_pose,
                    left_pose=state.left_gripper_pose,
                    right_pose=state.right_gripper_pose,
                    templates=templates,
                    frame_row=frame_row,
                ),
                visibility_out,
            )

        if path_out is not None:
            _save_if_requested(
                _compose_path_quality(
                    episode_dir=episode_dir,
                    demo=demo,
                    frame_index=frame_index,
                    tray_pose=state.tray_pose,
                    task_base_pose=task._task.get_base().get_pose(),
                    left_pose=state.left_gripper_pose,
                    right_pose=state.right_gripper_pose,
                    templates=templates,
                    frame_row=frame_row,
                    left_grasped=left_grasped,
                    debug_row=debug_row,
                ),
                path_out,
            )

        if all_out is not None:
            _render_all_metrics(demo)
    finally:
        # Always tear the replay environment down, even if rendering failed.
        env.shutdown()
    return 0
|
|
|
|
# Script entry: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|