import argparse import json import math import os import pickle from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple import numpy as np import pandas as pd from natsort import natsorted from PIL import Image from pyrep.const import ConfigurationPathAlgorithms as Algos, ObjectType from pyrep.errors import ConfigurationPathError from pyrep.objects.joint import Joint from pyrep.objects.object import Object from pyrep.objects.shape import Shape from pyrep.objects.vision_sensor import VisionSensor from rlbench.action_modes.action_mode import BimanualJointPositionActionMode from rlbench.action_modes.gripper_action_modes import BimanualDiscrete from rlbench.backend.const import DEPTH_SCALE from rlbench.backend.utils import image_to_float_array, rgb_handles_to_mask from rlbench.bimanual_tasks.bimanual_take_tray_out_of_oven import ( BimanualTakeTrayOutOfOven, ) from rlbench.demo import Demo from rlbench.environment import Environment from rlbench.observation_config import CameraConfig, ObservationConfig from sklearn.metrics import ( average_precision_score, f1_score, roc_auc_score, ) FULL_CAMERA_SET = [ "front", "overhead", "wrist_right", "wrist_left", "over_shoulder_left", "over_shoulder_right", ] THREE_VIEW_SET = ["front", "wrist_right", "wrist_left"] DISPLAY = ":99" DEMO_DT = 0.05 DEFAULT_IMAGE_SIZE = (128, 128) DEFAULT_PATH_SCALE = 0.75 DEFAULT_PPRE_TAU = 0.45 DEFAULT_VISIBILITY_TAU = 0.35 DEFAULT_PEXT_TAU = 0.45 DEFAULT_DOOR_SPEED_TAU = 0.08 DEFAULT_PHASE_SCORE_TAU = 0.5 DEFAULT_APPROACH_SPEED_TAU = 0.01 DEFAULT_APPROACH_PROGRESS_TAU = 0.02 DEFAULT_PREGRASP_LABEL_PROGRESS_TAU = 0.60 DEFAULT_APPROACH_ONSET_WINDOW = 96 DEFAULT_PREGRASP_CANDIDATE_COUNT = 4 DEFAULT_PLAN_TRIALS = 48 DEFAULT_PLAN_MAX_CONFIGS = 4 DEFAULT_PLAN_MAX_TIME_MS = 10 DEFAULT_PLAN_TRIALS_PER_GOAL = 4 DEFAULT_PLAN_ATTEMPTS = 2 DEFAULT_PLAN_MIN_SUCCESSES = 2 DEFAULT_PREGRASP_ASSIST_PATH_SCALE = 3.0 DEFAULT_DOOR_ASSIST_MIN_OPEN_DELTA = 0.01 DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE = 0.02 DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE = 0.55 DEFAULT_DOOR_ASSIST_MIN_QUALITY = 0.3 DEFAULT_READY_PERSISTENCE = 3 DEFAULT_RETRIEVE_PERSISTENCE = 3 DEFAULT_PREGRASP_PERSISTENCE = 3 DEFAULT_MASK_HANDLE_COUNT = 8 DEFAULT_VISIBILITY_POINT_TOLERANCE = 0.03 DEFAULT_GRASP_SECONDARY_FACE_RATIO = 0.35 @dataclass class SimulatorSnapshot: task_state: Tuple[bytes, int] right_arm_tree: bytes right_gripper_tree: bytes left_arm_tree: bytes left_gripper_tree: bytes right_arm_joints: Tuple[float, ...] left_arm_joints: Tuple[float, ...] right_gripper_joints: Tuple[float, ...] left_gripper_joints: Tuple[float, ...] right_grasped: Tuple[str, ...] left_grasped: Tuple[str, ...] right_grasped_old_parents: Dict[str, Optional[str]] left_grasped_old_parents: Dict[str, Optional[str]] grasped_subtree_poses: Dict[str, Tuple[float, ...]] @dataclass class ReplayState: frame_index: int tray_pose: np.ndarray door_angle: float right_gripper_pose: np.ndarray left_gripper_pose: np.ndarray right_gripper_open: float left_gripper_open: float snapshot: Optional[SimulatorSnapshot] = None @dataclass class MotionTemplates: pregrasp_rel_pose: np.ndarray grasp_rel_pose: np.ndarray retreat_rel_poses: List[np.ndarray] grasp_local_center: np.ndarray grasp_region_extents: np.ndarray retriever_home_joints: np.ndarray hold_open_angle: float open_more_delta: float reference_tray_height: float approach_rel_poses: List[np.ndarray] = field(default_factory=list) mask_handle_ids: List[int] = field(default_factory=list) def to_json(self) -> Dict[str, object]: return { "approach_rel_poses": [pose.tolist() for pose in self.approach_rel_poses], "pregrasp_rel_pose": self.pregrasp_rel_pose.tolist(), "grasp_rel_pose": self.grasp_rel_pose.tolist(), "retreat_rel_poses": [pose.tolist() for pose in self.retreat_rel_poses], "grasp_local_center": self.grasp_local_center.tolist(), "grasp_region_extents": self.grasp_region_extents.tolist(), "retriever_home_joints": self.retriever_home_joints.tolist(), "hold_open_angle": float(self.hold_open_angle), "open_more_delta": float(self.open_more_delta), "reference_tray_height": float(self.reference_tray_height), "mask_handle_ids": [int(handle) for handle in self.mask_handle_ids], } @classmethod def from_json(cls, payload: Dict[str, object]) -> "MotionTemplates": return cls( pregrasp_rel_pose=np.asarray(payload["pregrasp_rel_pose"], dtype=np.float64), grasp_rel_pose=np.asarray(payload["grasp_rel_pose"], dtype=np.float64), retreat_rel_poses=[ np.asarray(pose, dtype=np.float64) for pose in payload.get("retreat_rel_poses", []) ], grasp_local_center=np.asarray(payload["grasp_local_center"], dtype=np.float64), grasp_region_extents=np.asarray(payload["grasp_region_extents"], dtype=np.float64), retriever_home_joints=np.asarray( payload.get("retriever_home_joints", [0.0] * 7), dtype=np.float64 ), hold_open_angle=float(payload["hold_open_angle"]), open_more_delta=float(payload["open_more_delta"]), reference_tray_height=float(payload["reference_tray_height"]), approach_rel_poses=[ np.asarray(pose, dtype=np.float64) for pose in payload.get("approach_rel_poses", []) ], mask_handle_ids=[int(handle) for handle in payload.get("mask_handle_ids", [])], ) @dataclass class EpisodeArtifacts: episode_name: str dense: pd.DataFrame keyframes: pd.DataFrame metrics: Dict[str, object] template_frames: Dict[str, int] debug_rows: Optional[List[Dict[str, object]]] = None def _configure_runtime() -> None: os.environ["DISPLAY"] = os.environ.get("DISPLAY", DISPLAY) os.environ["COPPELIASIM_ROOT"] = "/workspace/coppelia_sim" ld_library_path = os.environ.get("LD_LIBRARY_PATH", "") if "/workspace/coppelia_sim" not in ld_library_path: os.environ["LD_LIBRARY_PATH"] = ( f"{ld_library_path}:/workspace/coppelia_sim" if ld_library_path else "/workspace/coppelia_sim" ) os.environ["QT_QPA_PLATFORM_PLUGIN_PATH"] = "/workspace/coppelia_sim" os.environ.setdefault("XDG_RUNTIME_DIR", "/tmp/runtime-root") def _minimal_camera_config() -> Dict[str, CameraConfig]: return { "front": CameraConfig( rgb=False, depth=False, point_cloud=False, mask=False, image_size=DEFAULT_IMAGE_SIZE, ) } def _make_observation_config() -> ObservationConfig: return ObservationConfig( camera_configs=_minimal_camera_config(), joint_velocities=True, joint_positions=True, joint_forces=False, gripper_open=True, gripper_pose=True, gripper_matrix=False, gripper_joint_positions=True, gripper_touch_forces=False, task_low_dim_state=False, robot_name="bimanual", ) def _launch_replay_env() -> Environment: _configure_runtime() env = Environment( action_mode=BimanualJointPositionActionMode(), obs_config=_make_observation_config(), headless=True, robot_setup="dual_panda", ) env.launch() return env def _load_demo(episode_dir: Path) -> Demo: with episode_dir.joinpath("low_dim_obs.pkl").open("rb") as handle: return pickle.load(handle) def _load_descriptions(episode_dir: Path) -> List[str]: with episode_dir.joinpath("variation_descriptions.pkl").open("rb") as handle: return pickle.load(handle) def _episode_dirs(dataset_root: Path) -> List[Path]: episodes_dir = dataset_root.joinpath("all_variations", "episodes") return [ episodes_dir.joinpath(name) for name in natsorted(os.listdir(episodes_dir)) if episodes_dir.joinpath(name, "low_dim_obs.pkl").exists() ] def _camera_file(episode_dir: Path, camera_name: str, kind: str, frame_index: int) -> Path: if kind == "rgb": return episode_dir.joinpath(f"{camera_name}_rgb", f"rgb_{frame_index:04d}.png") if kind == "depth": return episode_dir.joinpath( f"{camera_name}_depth", f"depth_{frame_index:04d}.png" ) if kind == "mask": return episode_dir.joinpath(f"{camera_name}_mask", f"mask_{frame_index:04d}.png") raise ValueError(f"unknown kind: {kind}") def _load_depth_meters(episode_dir: Path, demo: Demo, frame_index: int, camera_name: str) -> np.ndarray: image = Image.open(_camera_file(episode_dir, camera_name, "depth", frame_index)) depth = image_to_float_array(image, DEPTH_SCALE) near = demo[frame_index].misc[f"{camera_name}_camera_near"] far = demo[frame_index].misc[f"{camera_name}_camera_far"] return near + depth * (far - near) def _load_mask(episode_dir: Path, frame_index: int, camera_name: str) -> np.ndarray: image = np.asarray( Image.open(_camera_file(episode_dir, camera_name, "mask", frame_index)), dtype=np.float32, ) if image.ndim == 2: image = np.repeat(image[..., None], 3, axis=2) if image.max() > 1.0: image /= 255.0 return rgb_handles_to_mask(image.copy()) def _capture_snapshot(task) -> SimulatorSnapshot: robot = task._scene.robot right_grasped = tuple(robot.right_gripper.get_grasped_objects()) left_grasped = tuple(robot.left_gripper.get_grasped_objects()) grasped_subtree_poses: Dict[str, Tuple[float, ...]] = {} for grasped_object in right_grasped + left_grasped: for subtree_object in grasped_object.get_objects_in_tree(exclude_base=False): grasped_subtree_poses[subtree_object.get_name()] = tuple( float(value) for value in subtree_object.get_pose() ) return SimulatorSnapshot( task_state=task._task.get_state(), right_arm_tree=robot.right_arm.get_configuration_tree(), right_gripper_tree=robot.right_gripper.get_configuration_tree(), left_arm_tree=robot.left_arm.get_configuration_tree(), left_gripper_tree=robot.left_gripper.get_configuration_tree(), right_arm_joints=tuple(float(value) for value in robot.right_arm.get_joint_positions()), left_arm_joints=tuple(float(value) for value in robot.left_arm.get_joint_positions()), right_gripper_joints=tuple(float(value) for value in robot.right_gripper.get_joint_positions()), left_gripper_joints=tuple(float(value) for value in robot.left_gripper.get_joint_positions()), right_grasped=tuple(obj.get_name() for obj in right_grasped), left_grasped=tuple(obj.get_name() for obj in left_grasped), right_grasped_old_parents={ obj.get_name(): ( None if old_parent is None else old_parent.get_name() ) for obj, old_parent in zip(right_grasped, robot.right_gripper._old_parents) }, left_grasped_old_parents={ obj.get_name(): ( None if old_parent is None else old_parent.get_name() ) for obj, old_parent in zip(left_grasped, robot.left_gripper._old_parents) }, grasped_subtree_poses=grasped_subtree_poses, ) def _restore_snapshot(task, snapshot: SimulatorSnapshot) -> None: robot = task._scene.robot snapshot_has_grasp = bool(snapshot.right_grasped or snapshot.left_grasped) if not snapshot_has_grasp: robot.release_gripper() try: task._task.restore_state(snapshot.task_state) except RuntimeError: task._pyrep.set_configuration_tree(snapshot.task_state[0]) task._pyrep.set_configuration_tree(snapshot.right_arm_tree) task._pyrep.set_configuration_tree(snapshot.right_gripper_tree) task._pyrep.set_configuration_tree(snapshot.left_arm_tree) task._pyrep.set_configuration_tree(snapshot.left_gripper_tree) robot.right_arm.set_joint_positions(list(snapshot.right_arm_joints), disable_dynamics=True) robot.left_arm.set_joint_positions(list(snapshot.left_arm_joints), disable_dynamics=True) robot.right_arm.set_joint_target_positions(list(snapshot.right_arm_joints)) robot.left_arm.set_joint_target_positions(list(snapshot.left_arm_joints)) robot.right_gripper.set_joint_positions( list(snapshot.right_gripper_joints), disable_dynamics=True ) robot.left_gripper.set_joint_positions( list(snapshot.left_gripper_joints), disable_dynamics=True ) robot.right_gripper.set_joint_target_positions(list(snapshot.right_gripper_joints)) robot.left_gripper.set_joint_target_positions(list(snapshot.left_gripper_joints)) if snapshot_has_grasp: robot.release_gripper() for name, pose in snapshot.grasped_subtree_poses.items(): Object.get_object(name).set_pose(np.asarray(pose, dtype=np.float64)) task._pyrep.step() if not snapshot_has_grasp: robot.release_gripper() for name in snapshot.right_grasped: _force_attach_grasped_object( robot.right_gripper, Shape(name), snapshot.right_grasped_old_parents.get(name), ) for name in snapshot.left_grasped: _force_attach_grasped_object( robot.left_gripper, Shape(name), snapshot.left_grasped_old_parents.get(name), ) task._pyrep.step() def _force_attach_grasped_object( gripper, obj: Shape, old_parent_name: Optional[str] ) -> None: if any(grasped.get_name() == obj.get_name() for grasped in gripper.get_grasped_objects()): return gripper._grasped_objects.append(obj) old_parent = obj.get_parent() if old_parent_name is None else Object.get_object(old_parent_name) gripper._old_parents.append(old_parent) obj.set_parent(gripper._attach_point, keep_in_place=True) def _build_joint_action(target_obs) -> np.ndarray: def _joint_vector(value, fallback) -> np.ndarray: array = np.asarray(fallback if value is None else value, dtype=np.float64) if array.ndim != 1 or array.shape[0] != 7: array = np.asarray(fallback, dtype=np.float64) if array.ndim != 1 or array.shape[0] != 7: raise ValueError(f"invalid joint vector shape: {array.shape}") return array right = _joint_vector( target_obs.misc.get("right_executed_demo_joint_position_action"), target_obs.right.joint_positions, ) left = _joint_vector( target_obs.misc.get("left_executed_demo_joint_position_action"), target_obs.left.joint_positions, ) return np.concatenate( [ right, np.array([target_obs.right.gripper_open], dtype=np.float64), left, np.array([target_obs.left.gripper_open], dtype=np.float64), ] ) class ReplayCache: def __init__(self, task, demo: Demo, checkpoint_stride: int = 16): self.task = task self.demo = demo self.checkpoint_stride = checkpoint_stride self.current_index = 0 self.current_obs = None self.checkpoints: Dict[int, SimulatorSnapshot] = {} self.discrete_gripper = BimanualDiscrete() def reset(self) -> None: _, self.current_obs = self.task.reset_to_demo(self.demo) self.current_index = 0 self.checkpoints = {0: _capture_snapshot(self.task)} def step_to(self, target_index: int): if target_index < self.current_index: checkpoint_index = max(i for i in self.checkpoints if i <= target_index) _restore_snapshot(self.task, self.checkpoints[checkpoint_index]) self.current_index = checkpoint_index self.current_obs = self._observation_from_scene() while self.current_index < target_index: next_index = self.current_index + 1 self.task._action_mode.action( self.task._scene, _build_joint_action(self.demo[next_index]) ) self._apply_gripper_replay(self.demo[next_index]) self.current_obs = self._observation_from_scene() self.current_index = next_index if self.current_index % self.checkpoint_stride == 0: self.checkpoints[self.current_index] = _capture_snapshot(self.task) return self.current_obs def current_state(self) -> ReplayState: return ReplayState( frame_index=self.current_index, tray_pose=Shape("tray").get_pose(), door_angle=Joint("oven_door_joint").get_joint_position(), right_gripper_pose=self.current_obs.right.gripper_pose.copy(), left_gripper_pose=self.current_obs.left.gripper_pose.copy(), right_gripper_open=float(self.current_obs.right.gripper_open), left_gripper_open=float(self.current_obs.left.gripper_open), snapshot=None, ) def snapshot(self) -> SimulatorSnapshot: return _capture_snapshot(self.task) def restore(self, snapshot: SimulatorSnapshot) -> None: _restore_snapshot(self.task, snapshot) self.current_obs = self._observation_from_scene() def restore_to_index(self, snapshot: SimulatorSnapshot, frame_index: int) -> None: self.restore(snapshot) self.current_index = frame_index def _observation_from_scene(self): return self.task._scene.get_observation() def _apply_gripper_replay(self, target_obs) -> None: desired = np.array( [target_obs.right.gripper_open, target_obs.left.gripper_open], dtype=np.float64, ) current = np.array( [ float(all(x > 0.9 for x in self.task._scene.robot.right_gripper.get_open_amount())), float(all(x > 0.9 for x in self.task._scene.robot.left_gripper.get_open_amount())), ], dtype=np.float64, ) if not np.allclose(current, desired): self.discrete_gripper.action(self.task._scene, desired) self.current_obs = self._observation_from_scene() self._maintain_grasp_state(desired) self.current_obs = self._observation_from_scene() def _maintain_grasp_state(self, desired: np.ndarray) -> None: scene = self.task._scene robot = scene.robot if float(desired[0]) <= 0.5: left_grasped = {obj.get_name() for obj in robot.left_gripper.get_grasped_objects()} for graspable in scene.task.get_graspable_objects(): if graspable.get_name() not in left_grasped: robot.right_gripper.grasp(graspable) elif robot.right_gripper.get_grasped_objects(): robot.right_gripper.release() if float(desired[1]) <= 0.5: right_grasped = {obj.get_name() for obj in robot.right_gripper.get_grasped_objects()} for graspable in scene.task.get_graspable_objects(): if graspable.get_name() not in right_grasped: robot.left_gripper.grasp(graspable) elif robot.left_gripper.get_grasped_objects(): robot.left_gripper.release() def _quat_to_matrix(quat: Sequence[float]) -> np.ndarray: x, y, z, w = quat xx, yy, zz = x * x, y * y, z * z xy, xz, yz = x * y, x * z, y * z wx, wy, wz = w * x, w * y, w * z return np.array( [ [1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)], [2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)], [2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)], ], dtype=np.float64, ) def _matrix_to_quat(matrix: np.ndarray) -> np.ndarray: trace = np.trace(matrix) if trace > 0.0: s = math.sqrt(trace + 1.0) * 2.0 w = 0.25 * s x = (matrix[2, 1] - matrix[1, 2]) / s y = (matrix[0, 2] - matrix[2, 0]) / s z = (matrix[1, 0] - matrix[0, 1]) / s elif matrix[0, 0] > matrix[1, 1] and matrix[0, 0] > matrix[2, 2]: s = math.sqrt(1.0 + matrix[0, 0] - matrix[1, 1] - matrix[2, 2]) * 2.0 w = (matrix[2, 1] - matrix[1, 2]) / s x = 0.25 * s y = (matrix[0, 1] + matrix[1, 0]) / s z = (matrix[0, 2] + matrix[2, 0]) / s elif matrix[1, 1] > matrix[2, 2]: s = math.sqrt(1.0 + matrix[1, 1] - matrix[0, 0] - matrix[2, 2]) * 2.0 w = (matrix[0, 2] - matrix[2, 0]) / s x = (matrix[0, 1] + matrix[1, 0]) / s y = 0.25 * s z = (matrix[1, 2] + matrix[2, 1]) / s else: s = math.sqrt(1.0 + matrix[2, 2] - matrix[0, 0] - matrix[1, 1]) * 2.0 w = (matrix[1, 0] - matrix[0, 1]) / s x = (matrix[0, 2] + matrix[2, 0]) / s y = (matrix[1, 2] + matrix[2, 1]) / s z = 0.25 * s quat = np.array([x, y, z, w], dtype=np.float64) return quat / np.linalg.norm(quat) def _pose_to_matrix(pose: Sequence[float]) -> np.ndarray: matrix = np.eye(4, dtype=np.float64) matrix[:3, :3] = _quat_to_matrix(pose[3:]) matrix[:3, 3] = pose[:3] return matrix def _matrix_to_pose(matrix: np.ndarray) -> np.ndarray: return np.concatenate([matrix[:3, 3], _matrix_to_quat(matrix[:3, :3])], axis=0) def _relative_pose(reference_pose: Sequence[float], target_pose: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) target = _pose_to_matrix(target_pose) rel = np.linalg.inv(reference) @ target return _matrix_to_pose(rel) def _apply_relative_pose(reference_pose: Sequence[float], relative_pose: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) relative = _pose_to_matrix(relative_pose) world = reference @ relative return _matrix_to_pose(world) def _orientation_error_degrees(current_quat: Sequence[float], target_quat: Sequence[float]) -> float: current = np.asarray(current_quat, dtype=np.float64) target = np.asarray(target_quat, dtype=np.float64) current_norm = float(np.linalg.norm(current)) target_norm = float(np.linalg.norm(target)) if current_norm <= 1e-9 or target_norm <= 1e-9: return 180.0 current = current / current_norm target = target / target_norm dot = float(np.clip(abs(current @ target), -1.0, 1.0)) return float(np.degrees(2.0 * np.arccos(dot))) def _world_to_local(reference_pose: Sequence[float], point_world: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) point = np.concatenate([np.asarray(point_world, dtype=np.float64), [1.0]]) return (np.linalg.inv(reference) @ point)[:3] def _local_to_world(reference_pose: Sequence[float], point_local: Sequence[float]) -> np.ndarray: reference = _pose_to_matrix(reference_pose) point = np.concatenate([np.asarray(point_local, dtype=np.float64), [1.0]]) return (reference @ point)[:3] def _tray_bbox_local() -> np.ndarray: return np.asarray(Shape("tray").get_bounding_box(), dtype=np.float64) def _dedupe_points(points: Iterable[np.ndarray], precision: int = 5) -> List[np.ndarray]: unique: List[np.ndarray] = [] seen = set() for point in points: array = np.asarray(point, dtype=np.float64) key = tuple(np.round(array, precision)) if key in seen: continue seen.add(key) unique.append(array) return unique def _sample_box_face_points_local( bbox: np.ndarray, axis: int, face_index: int, center: np.ndarray, extents: np.ndarray, primary_samples: int, secondary_samples: int, ) -> List[np.ndarray]: center = np.asarray(center, dtype=np.float64) extents = np.asarray(extents, dtype=np.float64) tangent_axes = [idx for idx in range(3) if idx != axis] tangent_ranges: List[np.ndarray] = [] for tangent_idx, tangent_axis in enumerate(tangent_axes): lo = max(bbox[tangent_axis * 2], center[tangent_axis] - extents[tangent_axis]) hi = min(bbox[tangent_axis * 2 + 1], center[tangent_axis] + extents[tangent_axis]) if hi < lo: lo = hi = float(np.clip(center[tangent_axis], bbox[tangent_axis * 2], bbox[tangent_axis * 2 + 1])) tangent_ranges.append( np.linspace(lo, hi, primary_samples if tangent_idx == 0 else secondary_samples) ) face_value = bbox[axis * 2 + face_index] points: List[np.ndarray] = [] for first in tangent_ranges[0]: for second in tangent_ranges[1]: point = center.copy() point[axis] = face_value point[tangent_axes[0]] = first point[tangent_axes[1]] = second points.append(point) return points def _grasp_surface_specs( templates: MotionTemplates, bbox: np.ndarray ) -> Tuple[np.ndarray, List[Tuple[int, int]]]: center = np.asarray(templates.grasp_local_center, dtype=np.float64).copy() mins = np.asarray([bbox[0], bbox[2], bbox[4]], dtype=np.float64) maxs = np.asarray([bbox[1], bbox[3], bbox[5]], dtype=np.float64) center = np.clip(center, mins, maxs) approach = ( np.asarray(templates.grasp_rel_pose[:3], dtype=np.float64) - np.asarray(templates.pregrasp_rel_pose[:3], dtype=np.float64) ) if np.linalg.norm(approach) < 1e-6: approach = center - 0.5 * (mins + maxs) ranked_axes = np.argsort(-np.abs(approach)) primary_mag = float(abs(approach[ranked_axes[0]])) if len(ranked_axes) else 0.0 specs: List[Tuple[int, int]] = [] for rank, axis in enumerate(ranked_axes.tolist()): magnitude = float(abs(approach[axis])) if magnitude < 1e-6: continue if rank > 0 and magnitude < max(1e-6, primary_mag * DEFAULT_GRASP_SECONDARY_FACE_RATIO): continue move_positive = float(approach[axis]) >= 0.0 face_index = 0 if move_positive else 1 spec = (int(axis), int(face_index)) if spec not in specs: specs.append(spec) if len(specs) >= 2: break if specs: return center, specs # Fallback to the face closest to the demonstrated grasp center. face_distances = [] for axis in range(3): face_distances.append((abs(center[axis] - bbox[axis * 2]), axis, 0)) face_distances.append((abs(center[axis] - bbox[axis * 2 + 1]), axis, 1)) _, axis, face_index = min(face_distances, key=lambda item: item[0]) return center, [(int(axis), int(face_index))] def _first_transition(demo: Demo, side: str, open_to_closed: bool) -> int: values = [getattr(demo[i], side).gripper_open for i in range(len(demo))] for i in range(1, len(values)): if open_to_closed and values[i - 1] > 0.5 and values[i] < 0.5: return i if not open_to_closed and values[i - 1] < 0.5 and values[i] > 0.5: return i raise RuntimeError(f"no gripper transition found for {side}") def _detect_pregrasp_approach_onset( cache: ReplayCache, left_close: int, pregrasp_rel_pose: np.ndarray, ) -> int: distances: List[float] = [] for frame_index in range(left_close): cache.step_to(frame_index) tray_pose = Shape("tray").get_pose() target_pose = _apply_relative_pose(tray_pose, pregrasp_rel_pose) current_pose = np.asarray(cache.current_obs.left.gripper_pose, dtype=np.float64) distances.append(float(np.linalg.norm(current_pose[:3] - target_pose[:3]))) distances_arr = np.asarray(distances, dtype=np.float64) if len(distances_arr) < 8: return max(0, left_close - 24) search_start = max(8, left_close - DEFAULT_APPROACH_ONSET_WINDOW) for frame_index in range(search_start, max(search_start, left_close - 6)): window = distances_arr[frame_index : frame_index + 6] if len(window) < 6: break short_drop = float(window[0] - window[-1]) if short_drop < 0.01: continue if np.mean(np.diff(window)) < -0.002: return frame_index return max(search_start, left_close - 24) def _derive_templates(dataset_root: Path, template_episode_dir: Path) -> Tuple[MotionTemplates, Dict[str, int]]: env = _launch_replay_env() try: demo = _load_demo(template_episode_dir) task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=8) cache.reset() base_pose = task._task.get_base().get_pose() left_close = _first_transition(demo, "left", open_to_closed=True) left_open = _first_transition(demo, "left", open_to_closed=False) pregrasp_index = max(0, left_close - 5) right_close = _first_transition(demo, "right", open_to_closed=True) right_open = _first_transition(demo, "right", open_to_closed=False) bootstrap_indices = sorted({pregrasp_index, left_close, right_close, right_open, left_open}) states: Dict[int, ReplayState] = {} for frame_index in bootstrap_indices: cache.step_to(frame_index) states[frame_index] = cache.current_state() pregrasp_rel_pose = _relative_pose( states[pregrasp_index].tray_pose, states[pregrasp_index].left_gripper_pose ) grasp_rel_pose = _relative_pose( states[left_close].tray_pose, states[left_close].left_gripper_pose ) approach_onset = _detect_pregrasp_approach_onset(cache, left_close, pregrasp_rel_pose) approach_indices = sorted( { *np.linspace( approach_onset, max(approach_onset, pregrasp_index), num=min(6, max(2, pregrasp_index - approach_onset + 1)), dtype=int, ).tolist(), pregrasp_index, max(0, left_close - 2), } ) retreat_indices = sorted( { *[ min(len(demo) - 1, max(left_close + 1, left_close + offset)) for offset in (5, 10, 15, 20, 30, 40, 50) ], max(left_close + 1, left_open - 20), max(left_close + 1, left_open - 10), max(left_close + 1, left_open - 5), left_open, } ) interesting = sorted( { *bootstrap_indices, *approach_indices, *retreat_indices, } ) for frame_index in interesting: if frame_index not in states: cache.step_to(frame_index) states[frame_index] = cache.current_state() approach_rel_poses = [ _relative_pose(states[index].tray_pose, states[index].left_gripper_pose) for index in approach_indices if index < left_close ] retreat_rel_poses = [ _relative_pose(base_pose, states[index].left_gripper_pose) for index in retreat_indices if index > left_close ] grasp_local_center = _world_to_local( states[left_close].tray_pose, states[left_close].left_gripper_pose[:3] ) templates = MotionTemplates( pregrasp_rel_pose=pregrasp_rel_pose, grasp_rel_pose=grasp_rel_pose, retreat_rel_poses=retreat_rel_poses, grasp_local_center=grasp_local_center, grasp_region_extents=np.array([0.03, 0.015, 0.004], dtype=np.float64), retriever_home_joints=np.asarray(demo[0].left.joint_positions, dtype=np.float64), hold_open_angle=float(states[right_open].door_angle), open_more_delta=max( 0.12, float(states[right_open].door_angle - states[right_close].door_angle) * 0.25, ), reference_tray_height=float(states[left_close].tray_pose[2]), approach_rel_poses=approach_rel_poses, ) templates.mask_handle_ids = _infer_tray_mask_handle_ids( episode_dir=template_episode_dir, demo=demo, cache=cache, templates=templates, reference_frames=approach_indices[-4:] + [left_close], ) template_frames = { "pregrasp": pregrasp_index, "grasp": left_close, "right_close": right_close, "right_open": right_open, "approach": approach_indices, "retreat": retreat_indices, } return templates, template_frames finally: env.shutdown() def _camera_projection(extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: camera_pos = extrinsics[:3, 3:4] rotation = extrinsics[:3, :3] world_to_camera = np.concatenate([rotation.T, -(rotation.T @ camera_pos)], axis=1) projection = intrinsics @ world_to_camera return projection, world_to_camera def _project_points(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: projection, world_to_camera = _camera_projection(extrinsics, intrinsics) homogeneous = np.concatenate([points_world, np.ones((len(points_world), 1))], axis=1) camera_xyz = (world_to_camera @ homogeneous.T).T image_xyz = (projection @ homogeneous.T).T uv = image_xyz[:, :2] / image_xyz[:, 2:3] return uv, camera_xyz def _sample_grasp_points(templates: MotionTemplates, tray_pose: np.ndarray) -> np.ndarray: bbox = _tray_bbox_local() center, face_specs = _grasp_surface_specs(templates, bbox) extents = np.asarray(templates.grasp_region_extents, dtype=np.float64) points_local: List[np.ndarray] = [] for axis, face_index in face_specs: points_local.extend( _sample_box_face_points_local( bbox=bbox, axis=axis, face_index=face_index, center=center, extents=extents, primary_samples=9, secondary_samples=5, ) ) points_local = _dedupe_points(points_local) return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64) def _sample_full_tray_points(tray_pose: np.ndarray) -> np.ndarray: bbox = _tray_bbox_local() center = np.asarray( [ 0.5 * (bbox[0] + bbox[1]), 0.5 * (bbox[2] + bbox[3]), 0.5 * (bbox[4] + bbox[5]), ], dtype=np.float64, ) extents = np.asarray( [ 0.5 * (bbox[1] - bbox[0]), 0.5 * (bbox[3] - bbox[2]), 0.5 * (bbox[5] - bbox[4]), ], dtype=np.float64, ) points_local: List[np.ndarray] = [] for axis in range(3): for face_index in [0, 1]: points_local.extend( _sample_box_face_points_local( bbox=bbox, axis=axis, face_index=face_index, center=center, extents=extents, primary_samples=8, secondary_samples=8, ) ) points_local = _dedupe_points(points_local) return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64) def _infer_tray_mask_handle_ids( episode_dir: Path, demo: Demo, cache: ReplayCache, templates: MotionTemplates, reference_frames: Sequence[int], max_handles: int = DEFAULT_MASK_HANDLE_COUNT, ) -> List[int]: counts: Dict[int, int] = {} fallback_counts: Dict[int, int] = {} unique_frames = sorted({int(frame_index) for frame_index in reference_frames}) for frame_index in unique_frames: cache.step_to(frame_index) grasp_points = _sample_grasp_points(templates, Shape("tray").get_pose()) tray_points = _sample_full_tray_points(Shape("tray").get_pose()) for camera_name in FULL_CAMERA_SET: mask = _load_mask(episode_dir, frame_index, camera_name) point_cloud, extrinsics, intrinsics = _camera_point_cloud( episode_dir, demo, frame_index, camera_name ) for points_world in [grasp_points, tray_points]: projected, visible = _visibility_projection_details( points_world, point_cloud, extrinsics, intrinsics ) for px, py in visible: handle = int(mask[py, px]) if handle != 0: counts[handle] = counts.get(handle, 0) + 1 for px, py in projected: handle = int(mask[py, px]) if handle != 0: fallback_counts[handle] = fallback_counts.get(handle, 0) + 1 ranked_source = counts if counts else fallback_counts if not ranked_source: return [] ranked = sorted(ranked_source.items(), key=lambda item: item[1], reverse=True) top_count = ranked[0][1] selected = [ handle for handle, count in ranked if count >= max(2, int(math.ceil(top_count * 0.15))) ][:max_handles] return selected if selected else [ranked[0][0]] def _camera_point_cloud( episode_dir: Path, demo: Demo, frame_index: int, camera_name: str, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: depth = _load_depth_meters(episode_dir, demo, frame_index, camera_name) extrinsics = demo[frame_index].misc[f"{camera_name}_camera_extrinsics"] intrinsics = demo[frame_index].misc[f"{camera_name}_camera_intrinsics"] point_cloud = VisionSensor.pointcloud_from_depth_and_camera_params( depth, extrinsics, intrinsics ) return point_cloud, extrinsics, intrinsics def _visibility_projection_details( points_world: np.ndarray, point_cloud_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray, tolerance: float = DEFAULT_VISIBILITY_POINT_TOLERANCE, ) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]: uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics) height, width = point_cloud_world.shape[:2] projected: List[Tuple[int, int]] = [] visible: List[Tuple[int, int]] = [] for point_world, (u, v), (_, _, camera_depth) in zip(points_world, uv, camera_xyz): if camera_depth <= 0 or not (0 <= u < width and 0 <= v < height): continue px = min(max(int(round(float(u))), 0), width - 1) py = min(max(int(round(float(v))), 0), height - 1) projected.append((px, py)) scene_point = np.asarray(point_cloud_world[py, px], dtype=np.float64) if not np.isfinite(scene_point).all(): continue if float(np.linalg.norm(scene_point - point_world)) <= tolerance: visible.append((px, py)) return projected, visible def _point_visibility_ratio_from_point_cloud( points_world: np.ndarray, point_cloud_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray, ) -> float: projected, visible = _visibility_projection_details( points_world, point_cloud_world, extrinsics, intrinsics ) return float(len(visible) / len(projected)) if projected else 0.0 def _point_visibility_stats_from_point_cloud( points_world: np.ndarray, point_cloud_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray, ) -> Dict[str, float]: projected, visible = _visibility_projection_details( points_world, point_cloud_world, extrinsics, intrinsics ) projected_count = int(len(projected)) visible_count = int(len(visible)) ratio = float(visible_count / projected_count) if projected_count else 0.0 return { "ratio": ratio, "projected_count": projected_count, "visible_count": visible_count, } def _mask_visibility_ratio( points_world: np.ndarray, mask: np.ndarray, handle_ids: Sequence[int], extrinsics: np.ndarray, intrinsics: np.ndarray, ) -> float: if not handle_ids: return 0.0 handle_set = {int(handle) for handle in handle_ids} uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics) height, width = mask.shape visible = 0 total = 0 for (u, v), (_, _, camera_depth) in zip(uv, camera_xyz): if camera_depth <= 0: continue if not (0 <= u < width and 0 <= v < height): continue total += 1 px = int(round(u)) py = int(round(v)) px = min(max(px, 0), width - 1) py = min(max(py, 0), height - 1) if int(mask[py, px]) in handle_set: visible += 1 return float(visible / total) if total else 0.0 def _union_visibility(values: Iterable[float]) -> float: product = 1.0 for value in values: product *= 1.0 - float(value) return 1.0 - product def _sequence_summary(values: Sequence[float]) -> Dict[str, float]: if not values: return { "count": 0.0, "mean": 0.0, "median": 0.0, "min": 0.0, "max": 0.0, } array = np.asarray(values, dtype=np.float64) return { "count": float(array.size), "mean": float(np.mean(array)), "median": float(np.median(array)), "min": float(np.min(array)), "max": float(np.max(array)), } def _json_safe(value: Any) -> Any: if isinstance(value, dict): return {str(key): _json_safe(item) for key, item in value.items()} if isinstance(value, (list, tuple)): return [_json_safe(item) for item in value] if isinstance(value, np.generic): return _json_safe(value.item()) if isinstance(value, np.ndarray): return [_json_safe(item) for item in value.tolist()] if isinstance(value, float): return None if not math.isfinite(value) else value return value def _keypoint_discovery(demo: Demo, stopping_delta: float = 0.1) -> List[int]: keypoints: List[int] = [] right_prev = demo[0].right.gripper_open left_prev = demo[0].left.gripper_open stopped_buffer = 0 for i, obs in enumerate(demo._observations): if i < 2 or i >= len(demo) - 1: right_stopped = left_stopped = False else: right_stopped = ( np.allclose(obs.right.joint_velocities, 0, atol=stopping_delta) and obs.right.gripper_open == demo[i + 1].right.gripper_open and obs.right.gripper_open == demo[i - 1].right.gripper_open and demo[i - 2].right.gripper_open == demo[i - 1].right.gripper_open ) left_stopped = ( np.allclose(obs.left.joint_velocities, 0, atol=stopping_delta) and obs.left.gripper_open == demo[i + 1].left.gripper_open and obs.left.gripper_open == demo[i - 1].left.gripper_open and demo[i - 2].left.gripper_open == demo[i - 1].left.gripper_open ) stopped = stopped_buffer <= 0 and right_stopped and left_stopped stopped_buffer = 4 if stopped else stopped_buffer - 1 last = i == len(demo) - 1 state_changed = ( obs.right.gripper_open != right_prev or obs.left.gripper_open != left_prev ) if i != 0 and (state_changed or last or stopped): keypoints.append(i) right_prev = obs.right.gripper_open left_prev = obs.left.gripper_open if len(keypoints) > 1 and (keypoints[-1] - 1) == keypoints[-2]: keypoints.pop(-2) return keypoints def _plan_path(scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False): arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm try: return arm.get_path( pose[:3], quaternion=pose[3:], ignore_collisions=ignore_collisions, trials=DEFAULT_PLAN_TRIALS, max_configs=DEFAULT_PLAN_MAX_CONFIGS, max_time_ms=DEFAULT_PLAN_MAX_TIME_MS, trials_per_goal=DEFAULT_PLAN_TRIALS_PER_GOAL, algorithm=Algos.RRTConnect, ) except Exception: return None def _stable_plan( scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False, ) -> Tuple[Optional[object], float, float]: attempts = max(1, DEFAULT_PLAN_ATTEMPTS) successful_paths: List[Tuple[float, object]] = [] for _ in range(attempts): path = _plan_path(scene, arm_name, pose, ignore_collisions=ignore_collisions) length = _path_length(path) if path is None or not np.isfinite(length): continue successful_paths.append((float(length), path)) if not successful_paths: return None, math.inf, 0.0 successful_paths.sort(key=lambda item: item[0]) stable_length = float(np.median([length for length, _ in successful_paths])) reliability = float(len(successful_paths) / attempts) return successful_paths[0][1], stable_length, reliability def _plan_is_reliable(reliability: float) -> bool: attempts = max(1, DEFAULT_PLAN_ATTEMPTS) required = min(attempts, max(1, DEFAULT_PLAN_MIN_SUCCESSES)) return reliability >= (required / attempts) def _path_length(path) -> float: if path is None: return math.inf try: return float(path._get_path_point_lengths()[-1]) except Exception: return math.inf def _dedupe_pose_list(poses: Iterable[np.ndarray], precision: int = 4) -> List[np.ndarray]: unique: List[np.ndarray] = [] seen = set() for pose in poses: key = tuple(np.round(np.asarray(pose, dtype=np.float64), precision)) if key in seen: continue seen.add(key) unique.append(np.asarray(pose, dtype=np.float64)) return unique def _pregrasp_progress_and_distance( current_pose: np.ndarray, tray_pose: np.ndarray, templates: MotionTemplates, ) -> Tuple[float, float]: goal_pose = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose) if templates.approach_rel_poses: start_pose = _apply_relative_pose(tray_pose, templates.approach_rel_poses[0]) span = float(np.linalg.norm(start_pose[:3] - goal_pose[:3])) else: span = 0.12 distance = float(np.linalg.norm(current_pose[:3] - goal_pose[:3])) progress = 1.0 - (distance / max(span, 1e-6)) return float(np.clip(progress, 0.0, 1.0)), distance def _pregrasp_candidates(tray_pose: np.ndarray, templates: MotionTemplates) -> List[np.ndarray]: if templates.approach_rel_poses: rel_poses = templates.approach_rel_poses[-min( DEFAULT_PREGRASP_CANDIDATE_COUNT, len(templates.approach_rel_poses) ) :] else: rel_poses = [templates.pregrasp_rel_pose] candidates = [_apply_relative_pose(tray_pose, rel_pose) for rel_pose in rel_poses] for rel_pose in rel_poses[-1:]: base = _apply_relative_pose(tray_pose, rel_pose) for dx in (-0.02, 0.02): perturbed = base.copy() perturbed[0] += dx candidates.append(perturbed) return _dedupe_pose_list(candidates) def _late_pregrasp_goal_poses( tray_pose: np.ndarray, templates: MotionTemplates ) -> List[np.ndarray]: final_goal = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose) goals = [final_goal] for dx in (-0.02, 0.02): perturbed = final_goal.copy() perturbed[0] += dx goals.append(perturbed) return _dedupe_pose_list(goals) def _pregrasp_corridor_rel_poses(templates: MotionTemplates) -> List[np.ndarray]: rel_poses = [ np.asarray(rel_pose, dtype=np.float64) for rel_pose in templates.approach_rel_poses ] if not rel_poses: return [np.asarray(templates.pregrasp_rel_pose, dtype=np.float64)] pregrasp_rel_pose = np.asarray(templates.pregrasp_rel_pose, dtype=np.float64) stop_index = int( np.argmin( [ float(np.linalg.norm(rel_pose[:3] - pregrasp_rel_pose[:3])) for rel_pose in rel_poses ] ) ) corridor = rel_poses[: stop_index + 1] if not corridor: corridor = [pregrasp_rel_pose] elif float(np.linalg.norm(corridor[-1][:3] - pregrasp_rel_pose[:3])) > 1e-6: corridor.append(pregrasp_rel_pose) return _dedupe_pose_list(corridor) def _extract_sequence_poses( tray_pose: np.ndarray, task_base_pose: np.ndarray, templates: MotionTemplates ) -> List[np.ndarray]: poses = [ _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose), _apply_relative_pose(tray_pose, templates.grasp_rel_pose), ] if templates.retreat_rel_poses: retreat_indices = np.linspace( 0, len(templates.retreat_rel_poses) - 1, num=min(3, len(templates.retreat_rel_poses)), dtype=int, ) poses.extend( _apply_relative_pose(task_base_pose, templates.retreat_rel_poses[index]) for index in sorted(set(retreat_indices.tolist())) ) return poses def _extract_height_threshold(templates: MotionTemplates) -> float: return templates.reference_tray_height + 0.06 def _extraction_progress_score(current_height: float, templates: MotionTemplates) -> float: baseline = float(templates.reference_tray_height) threshold = _extract_height_threshold(templates) current_height = float(current_height) if current_height <= baseline: return 0.0 if current_height < threshold: lift_fraction = (current_height - baseline) / max(threshold - baseline, 1e-6) return float(np.clip(0.8 * lift_fraction, 0.0, 0.8)) margin = current_height - threshold # Saturate smoothly once the tray is clearly lifted above the oven lip. return float(min(1.0, 0.8 + margin / 0.12)) def _arm_external_collision_names(scene, arm_name: str) -> List[str]: arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm gripper = scene.robot.left_gripper if arm_name == "left" else scene.robot.right_gripper robot_shapes = { shape.get_name() for shape in arm.get_objects_in_tree(object_type=ObjectType.SHAPE) } grasped = {obj.get_name() for obj in gripper.get_grasped_objects()} collisions: List[str] = [] for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE): name = shape.get_name() if not shape.is_collidable() or name in robot_shapes or name in grasped: continue try: if arm.check_arm_collision(shape): collisions.append(name) except Exception: continue return sorted(set(collisions)) def _allowed_door_collision_names(scene) -> set: allowed = set() for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE): name = shape.get_name() if name.startswith("oven_door") or name.startswith("oven_knob"): allowed.add(name) return allowed def _door_collision_shapes(scene) -> List[Shape]: return [ Shape(shape.get_handle()) for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE) if shape.get_name().startswith("oven_door") or shape.get_name().startswith("oven_knob") ] def _robot_collision_shape_handles(scene, arm_name: str) -> set: arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm gripper = scene.robot.left_gripper if arm_name == "left" else scene.robot.right_gripper robot_shapes = arm.get_objects_in_tree(object_type=ObjectType.SHAPE) robot_shapes.extend(gripper.get_objects_in_tree(object_type=ObjectType.SHAPE)) return {shape.get_handle() for shape in robot_shapes} def _shape_robot_contacts(shape: Shape, robot_shape_handles: set) -> List[np.ndarray]: contacts: List[np.ndarray] = [] try: raw_contacts = shape.get_contact(None, True) except Exception: return contacts shape_handle = shape.get_handle() for contact in raw_contacts: handles = set(contact.get("contact_handles", [])) if shape_handle not in handles: continue other_handles = handles - {shape_handle} if not other_handles.intersection(robot_shape_handles): continue contact_array = np.asarray(contact.get("contact", []), dtype=np.float64) if contact_array.shape != (9,) or not np.all(np.isfinite(contact_array)): continue contacts.append(contact_array) return contacts def _door_surface_normal(shape: Shape, contact_point: np.ndarray) -> Optional[np.ndarray]: bbox = np.asarray(shape.get_bounding_box(), dtype=np.float64) extents = np.array( [bbox[1] - bbox[0], bbox[3] - bbox[2], bbox[5] - bbox[4]], dtype=np.float64 ) if not np.all(np.isfinite(extents)): return None thin_axis = int(np.argmin(extents)) axes = np.asarray(shape.get_matrix()[:3, :3], dtype=np.float64) axis = axes[:, thin_axis] axis_norm = float(np.linalg.norm(axis)) if axis_norm <= 1e-9: return None axis = axis / axis_norm relative = np.asarray(contact_point, dtype=np.float64) - shape.get_position() sign = 1.0 if float(relative @ axis) >= 0.0 else -1.0 return sign * axis def _door_contact_alignments( shape: Shape, contacts: Sequence[np.ndarray], tip_delta: np.ndarray ) -> List[float]: tip_delta = np.asarray(tip_delta, dtype=np.float64) tip_speed = float(np.linalg.norm(tip_delta)) if tip_speed <= 1e-9: return [] move = tip_delta / tip_speed alignments: List[float] = [] for contact in contacts: normal = _door_surface_normal(shape, contact[:3]) if normal is None: continue alignments.append(float(np.clip(move @ normal, -1.0, 1.0))) return alignments def _door_contact_step_quality(alignments: Sequence[float], step_open_delta: float) -> float: if not alignments: return 0.0 values = np.asarray(alignments, dtype=np.float64) parallel_bonus = float(np.mean(1.0 - np.abs(values))) opening_motion_bonus = float(np.mean(np.clip(-values, 0.0, 1.0))) direct_penalty = float( np.mean( np.clip( (np.abs(values) - DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE) / max(1.0 - DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE, 1e-6), 0.0, 1.0, ) ) ) open_bonus = float( np.clip(step_open_delta / max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6), 0.0, 1.0) ) close_penalty = float( np.clip(-step_open_delta / max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6), 0.0, 1.0) ) return float( 0.6 * parallel_bonus + 0.4 * opening_motion_bonus + 0.4 * open_bonus - 0.8 * direct_penalty - 1.0 * close_penalty ) def _door_contact_path_quality(step_qualities: Sequence[float], door_open_delta: float) -> float: if not step_qualities: return 0.0 mean_quality = float(np.mean(np.asarray(step_qualities, dtype=np.float64))) open_bonus = float( np.clip( door_open_delta / (3.0 * max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6)), 0.0, 1.0, ) ) return float(np.clip(0.7 * mean_quality + 0.3 * open_bonus, 0.0, 1.0)) def _execute_path_with_collision_trace(task, path, arm_name: str) -> Dict[str, object]: collisions = set() step_qualities: List[float] = [] alignments_all: List[float] = [] door_shapes = _door_collision_shapes(task._scene) robot_shape_handles = _robot_collision_shape_handles(task._scene, arm_name) arm = task._scene.robot.left_arm if arm_name == "left" else task._scene.robot.right_arm tip = arm.get_tip() path.set_to_start(disable_dynamics=True) task._pyrep.step() previous_tip = tip.get_position() previous_door_angle = float(Joint("oven_door_joint").get_joint_position()) steps = 0 while True: done = path.step() task._pyrep.step() current_tip = tip.get_position() tip_delta = np.asarray(current_tip - previous_tip, dtype=np.float64) previous_tip = current_tip current_door_angle = float(Joint("oven_door_joint").get_joint_position()) step_open_delta = previous_door_angle - current_door_angle previous_door_angle = current_door_angle current_collisions = set(_arm_external_collision_names(task._scene, arm_name)) for shape in door_shapes: contacts = _shape_robot_contacts(shape, robot_shape_handles) if not contacts: continue current_collisions.add(shape.get_name()) alignments = _door_contact_alignments(shape, contacts, tip_delta) if alignments: alignments_all.extend(float(value) for value in alignments) step_qualities.append( _door_contact_step_quality(alignments, step_open_delta) ) collisions.update(current_collisions) steps += 1 if done or steps > 400: break return { "collision_names": sorted(collisions), "step_qualities": [float(value) for value in step_qualities], "alignments": [float(value) for value in alignments_all], "door_contact_steps": int(len(step_qualities)), "path_steps": int(steps), } def _single_door_assisted_pregrasp_score( task, target_pose: np.ndarray, base_snapshot: Optional[SimulatorSnapshot] = None, before_angle: Optional[float] = None, allowed_collisions: Optional[set] = None, ) -> Tuple[float, bool, Dict[str, object]]: snapshot = base_snapshot if base_snapshot is not None else _capture_snapshot(task) debug: Dict[str, object] = { "path_found": False, "path_length": math.inf, "base_score": 0.0, "final_score": 0.0, "bump_applied": False, "post_assist_strict_feasible": False, "post_assist_strict_length": math.inf, "door_open_delta": 0.0, "door_quality": 0.0, "collision_names": [], "bad_collision_names": [], "step_quality_mean": 0.0, "step_quality_min": 0.0, "step_quality_max": 0.0, "alignment_signed_mean": 0.0, "alignment_abs_mean": 0.0, "alignment_min": 0.0, "alignment_max": 0.0, "door_contact_steps": 0, "path_steps": 0, } try: if before_angle is None: before_angle = float(Joint("oven_door_joint").get_joint_position()) path = _plan_path( task._scene, "left", target_pose, ignore_collisions=True, ) length = _path_length(path) if path is None or not np.isfinite(length): return 0.0, False, debug debug["path_found"] = True debug["path_length"] = float(length) trace = _execute_path_with_collision_trace( task, path, "left" ) allowed = ( _allowed_door_collision_names(task._scene) if allowed_collisions is None else allowed_collisions ) collision_names = [str(name) for name in trace["collision_names"]] bad_collisions = [name for name in collision_names if name not in allowed] door_open_delta = before_angle - float(Joint("oven_door_joint").get_joint_position()) step_qualities = [float(value) for value in trace["step_qualities"]] alignments = [float(value) for value in trace["alignments"]] door_quality = _door_contact_path_quality(step_qualities, door_open_delta) step_summary = _sequence_summary(step_qualities) alignment_summary = _sequence_summary(alignments) debug.update( { "collision_names": collision_names, "bad_collision_names": bad_collisions, "door_open_delta": float(door_open_delta), "door_quality": float(door_quality), "step_quality_mean": float(step_summary["mean"]), "step_quality_min": float(step_summary["min"]), "step_quality_max": float(step_summary["max"]), "alignment_signed_mean": float(alignment_summary["mean"]), "alignment_abs_mean": float( np.mean(np.abs(np.asarray(alignments, dtype=np.float64))) ) if alignments else 0.0, "alignment_min": float(alignment_summary["min"]), "alignment_max": float(alignment_summary["max"]), "door_contact_steps": int(trace["door_contact_steps"]), "path_steps": int(trace["path_steps"]), } ) if ( bad_collisions or door_open_delta <= DEFAULT_DOOR_ASSIST_MIN_OPEN_DELTA or door_quality < DEFAULT_DOOR_ASSIST_MIN_QUALITY ): return 0.0, False, debug base_score = float( math.exp(-float(length) / DEFAULT_PREGRASP_ASSIST_PATH_SCALE) ) base_score *= door_quality debug["base_score"] = float(base_score) strict_path = _plan_path( task._scene, "left", target_pose, ignore_collisions=False, ) strict_length = _path_length(strict_path) debug["post_assist_strict_length"] = float(strict_length) if strict_path is not None and np.isfinite(strict_length): final_score = float(0.5 + 0.5 * base_score) debug["bump_applied"] = True debug["post_assist_strict_feasible"] = True debug["final_score"] = final_score return final_score, False, debug debug["final_score"] = float(base_score) return base_score, False, debug finally: _restore_snapshot(task, snapshot) def _door_assisted_pregrasp_score( task, target_pose: np.ndarray, base_snapshot: Optional[SimulatorSnapshot] = None, before_angle: Optional[float] = None, allowed_collisions: Optional[set] = None, ) -> Tuple[float, bool, Dict[str, object]]: attempts = max(1, DEFAULT_PLAN_ATTEMPTS) successful_scores: List[float] = [] successful_base_scores: List[float] = [] successful_lengths: List[float] = [] successful_door_qualities: List[float] = [] successful_open_deltas: List[float] = [] successful_step_means: List[float] = [] successful_align_abs_means: List[float] = [] successful_align_signed_means: List[float] = [] successful_collision_counts: List[float] = [] successful_bad_collision_counts: List[float] = [] post_assist_strict_count = 0 bump_count = 0 assisted_success = False attempt_debugs: List[Dict[str, object]] = [] for _ in range(attempts): score, success, attempt_debug = _single_door_assisted_pregrasp_score( task, target_pose, base_snapshot=base_snapshot, before_angle=before_angle, allowed_collisions=allowed_collisions, ) attempt_debugs.append(attempt_debug) if score > 0.0: successful_scores.append(float(score)) successful_base_scores.append(float(attempt_debug["base_score"])) successful_lengths.append(float(attempt_debug["path_length"])) successful_door_qualities.append(float(attempt_debug["door_quality"])) successful_open_deltas.append(float(attempt_debug["door_open_delta"])) successful_step_means.append(float(attempt_debug["step_quality_mean"])) successful_align_abs_means.append(float(attempt_debug["alignment_abs_mean"])) successful_align_signed_means.append(float(attempt_debug["alignment_signed_mean"])) successful_collision_counts.append(float(len(attempt_debug["collision_names"]))) successful_bad_collision_counts.append( float(len(attempt_debug["bad_collision_names"])) ) post_assist_strict_count += int(bool(attempt_debug["post_assist_strict_feasible"])) bump_count += int(bool(attempt_debug["bump_applied"])) assisted_success = assisted_success or success debug: Dict[str, object] = { "attempts": attempt_debugs, "attempt_count": int(attempts), "successful_attempt_count": int(len(successful_scores)), "reliability": 0.0, "base_score": 0.0, "score_before_reliability": 0.0, "median_path_length": 0.0, "door_quality": 0.0, "door_open_delta": 0.0, "step_quality_mean": 0.0, "alignment_abs_mean": 0.0, "alignment_signed_mean": 0.0, "collision_count": 0.0, "bad_collision_count": 0.0, "post_assist_strict_feasible": False, "bump_applied": False, } if not successful_scores: return 0.0, assisted_success, debug reliability = float(len(successful_scores) / attempts) debug.update( { "reliability": reliability, "base_score": float(np.median(np.asarray(successful_base_scores, dtype=np.float64))), "score_before_reliability": float( np.median(np.asarray(successful_scores, dtype=np.float64)) ), "median_path_length": float( np.median(np.asarray(successful_lengths, dtype=np.float64)) ), "door_quality": float( np.median(np.asarray(successful_door_qualities, dtype=np.float64)) ), "door_open_delta": float( np.median(np.asarray(successful_open_deltas, dtype=np.float64)) ), "step_quality_mean": float( np.median(np.asarray(successful_step_means, dtype=np.float64)) ), "alignment_abs_mean": float( np.median(np.asarray(successful_align_abs_means, dtype=np.float64)) ), "alignment_signed_mean": float( np.median(np.asarray(successful_align_signed_means, dtype=np.float64)) ), "collision_count": float( np.median(np.asarray(successful_collision_counts, dtype=np.float64)) ), "bad_collision_count": float( np.median(np.asarray(successful_bad_collision_counts, dtype=np.float64)) ), "post_assist_strict_feasible": bool(post_assist_strict_count > 0), "bump_applied": bool(bump_count > 0), } ) if not _plan_is_reliable(reliability): return 0.0, assisted_success, debug stable_score = float(np.median(np.asarray(successful_scores, dtype=np.float64))) return stable_score * reliability, assisted_success, debug def _pregrasp_score_and_success( task, templates: MotionTemplates ) -> Tuple[float, bool, Dict[str, object]]: tray = Shape("tray") if any( grasped.get_name() == tray.get_name() for grasped in task._scene.robot.left_gripper.get_grasped_objects() ): return ( 1.0, True, { "tray_grasped_left": True, "source": "grasped", "best_goal_index": -1, "num_goal_poses": 0, "source_strict": False, "source_assisted": False, "bump_applied": False, "base_score": 1.0, "strict_length": 0.0, "assisted_length": 0.0, "assisted_reliability": 1.0, "post_assist_strict_feasible": False, "door_open_delta": 0.0, "door_quality": 0.0, "step_quality_mean": 0.0, "step_quality_min": 0.0, "step_quality_max": 0.0, "alignment_signed_mean": 0.0, "alignment_abs_mean": 0.0, "collision_count": 0, "bad_collision_count": 0, "best_goal_pose": [], "goal_evaluations": [], }, ) tray_pose = tray.get_pose() goal_poses = _late_pregrasp_goal_poses(tray_pose, templates) best_score = 0.0 best_success = False best_goal_index = -1 best_goal_debug: Optional[Dict[str, object]] = None base_snapshot: Optional[SimulatorSnapshot] = None before_angle = float(Joint("oven_door_joint").get_joint_position()) allowed_collisions: Optional[set] = None goal_debugs: List[Dict[str, object]] = [] for goal_index, target_pose in enumerate(goal_poses): goal_debug: Dict[str, object] = { "goal_index": int(goal_index), "target_pose": [float(value) for value in np.asarray(target_pose, dtype=np.float64)], "strict_path_found": False, "strict_length": 0.0, "assisted": None, "candidate_score": 0.0, "source": "none", } path = _plan_path( task._scene, "left", target_pose, ignore_collisions=False, ) length = _path_length(path) if path is not None and np.isfinite(length): goal_debug["strict_path_found"] = True goal_debug["strict_length"] = float(length) goal_debug["candidate_score"] = 1.0 goal_debug["source"] = "strict" goal_debugs.append(goal_debug) return ( 1.0, True, { "tray_grasped_left": False, "source": "strict", "best_goal_index": int(goal_index), "num_goal_poses": int(len(goal_poses)), "source_strict": True, "source_assisted": False, "bump_applied": False, "base_score": 1.0, "strict_length": float(length), "assisted_length": 0.0, "assisted_reliability": 0.0, "post_assist_strict_feasible": False, "door_open_delta": 0.0, "door_quality": 0.0, "step_quality_mean": 0.0, "step_quality_min": 0.0, "step_quality_max": 0.0, "alignment_signed_mean": 0.0, "alignment_abs_mean": 0.0, "collision_count": 0, "bad_collision_count": 0, "best_goal_pose": [ float(value) for value in np.asarray(target_pose, dtype=np.float64) ], "goal_evaluations": goal_debugs, }, ) if base_snapshot is None: base_snapshot = _capture_snapshot(task) allowed_collisions = _allowed_door_collision_names(task._scene) assisted_score, assisted_success, assisted_debug = _door_assisted_pregrasp_score( task, target_pose, base_snapshot=base_snapshot, before_angle=before_angle, allowed_collisions=allowed_collisions, ) goal_debug["assisted"] = assisted_debug goal_debug["candidate_score"] = float(assisted_score) goal_debug["source"] = "assisted" if assisted_score > 0.0 else "none" goal_debugs.append(goal_debug) if assisted_score > best_score: best_score = assisted_score best_goal_index = goal_index best_goal_debug = goal_debug best_success = best_success or assisted_success if best_goal_debug is None: best_goal_debug = { "goal_index": -1, "strict_length": 0.0, "assisted": None, } assisted_debug = best_goal_debug.get("assisted") or {} return ( float(best_score), bool(best_success), { "tray_grasped_left": False, "source": "assisted" if best_score > 0.0 else "none", "best_goal_index": int(best_goal_index), "num_goal_poses": int(len(goal_poses)), "source_strict": False, "source_assisted": bool(best_score > 0.0), "bump_applied": bool(assisted_debug.get("bump_applied", False)), "base_score": float(assisted_debug.get("base_score", 0.0)), "strict_length": float(best_goal_debug.get("strict_length", 0.0)), "assisted_length": float(assisted_debug.get("median_path_length", 0.0)), "assisted_reliability": float(assisted_debug.get("reliability", 0.0)), "post_assist_strict_feasible": bool( assisted_debug.get("post_assist_strict_feasible", False) ), "door_open_delta": float(assisted_debug.get("door_open_delta", 0.0)), "door_quality": float(assisted_debug.get("door_quality", 0.0)), "step_quality_mean": float(assisted_debug.get("step_quality_mean", 0.0)), "step_quality_min": float( min( [ float(attempt.get("step_quality_min", 0.0)) for attempt in assisted_debug.get("attempts", []) if attempt.get("path_found", False) ] or [0.0] ) ), "step_quality_max": float( max( [ float(attempt.get("step_quality_max", 0.0)) for attempt in assisted_debug.get("attempts", []) if attempt.get("path_found", False) ] or [0.0] ) ), "alignment_signed_mean": float( assisted_debug.get("alignment_signed_mean", 0.0) ), "alignment_abs_mean": float(assisted_debug.get("alignment_abs_mean", 0.0)), "collision_count": int(round(float(assisted_debug.get("collision_count", 0.0)))), "bad_collision_count": int( round(float(assisted_debug.get("bad_collision_count", 0.0))) ), "best_goal_pose": [ float(value) for value in np.asarray( goal_poses[best_goal_index], dtype=np.float64 ) ] if best_goal_index >= 0 else [], "goal_evaluations": goal_debugs, }, ) def _extract_score_and_success( task, templates: MotionTemplates ) -> Tuple[float, bool, Dict[str, object]]: tray = Shape("tray") robot = task._scene.robot snapshot = _capture_snapshot(task) try: total_length = 0.0 current_height = float(tray.get_position()[2]) already_grasped = any( grasped.get_name() == tray.get_name() for grasped in robot.left_gripper.get_grasped_objects() ) debug: Dict[str, object] = { "initial_height": float(current_height), "final_height": float(current_height), "already_grasped_start": bool(already_grasped), "already_grasped_end": bool(already_grasped), "total_length": 0.0, "num_milestones": 0, "first_failed_milestone": -1, "min_reliability": 0.0, "mean_reliability": 0.0, "approach_score": 0.0, "retreat_score": 0.0, "milestones": [], "score": 0.0, } if already_grasped and current_height >= _extract_height_threshold(templates): score = _extraction_progress_score(current_height, templates) debug["score"] = float(score) return score, True, debug poses = _extract_sequence_poses( tray.get_pose(), task._task.get_base().get_pose(), templates ) approach_poses = [] if already_grasped else poses[:2] retreat_poses = poses[2:] if already_grasped and retreat_poses: future_retreat_poses = [ pose for pose in retreat_poses if float(pose[2]) > current_height + 0.01 ] if future_retreat_poses: retreat_poses = future_retreat_poses elif current_height < _extract_height_threshold(templates): retreat_poses = [retreat_poses[-1]] else: retreat_poses = [] milestone_poses = approach_poses + retreat_poses milestone_collision = ([False] * len(approach_poses)) + ([True] * len(retreat_poses)) progress = _extraction_progress_score(current_height, templates) * 0.25 milestone_weight = 0.75 / max(1, len(milestone_poses)) milestone_reliabilities: List[float] = [] debug["num_milestones"] = int(len(milestone_poses)) for milestone_index, (pose, ignore_collisions) in enumerate( zip(milestone_poses, milestone_collision) ): path, length, reliability = _stable_plan( task._scene, "left", pose, ignore_collisions=ignore_collisions ) planner_score = ( reliability * math.exp(-length / DEFAULT_PATH_SCALE) if path is not None and np.isfinite(length) else 0.0 ) milestone_debug = { "milestone_index": int(milestone_index), "kind": "approach" if milestone_index < len(approach_poses) else "retreat", "ignore_collisions": bool(ignore_collisions), "path_found": bool(path is not None and np.isfinite(length)), "path_length": float(length) if np.isfinite(length) else 0.0, "reliability": float(reliability), "planner_score": float(planner_score), "height_before": float(current_height), } debug["milestones"].append(milestone_debug) if path is None or not np.isfinite(length): debug["first_failed_milestone"] = int(milestone_index) debug["final_height"] = float(current_height) debug["total_length"] = float(total_length) debug["score"] = float(progress) return progress, False, debug milestone_reliabilities.append(float(reliability)) progress += milestone_weight * planner_score if milestone_debug["kind"] == "approach": debug["approach_score"] = float(debug["approach_score"] + milestone_weight * planner_score) else: debug["retreat_score"] = float(debug["retreat_score"] + milestone_weight * planner_score) if not _plan_is_reliable(reliability): debug["first_failed_milestone"] = int(milestone_index) debug["final_height"] = float(current_height) debug["total_length"] = float(total_length + length) debug["score"] = float(progress) debug["min_reliability"] = float(min(milestone_reliabilities)) debug["mean_reliability"] = float(np.mean(np.asarray(milestone_reliabilities))) return progress, False, debug total_length += length path.set_to_end(disable_dynamics=True) task._pyrep.step() current_height = float(tray.get_position()[2]) progress = max(progress, _extraction_progress_score(current_height, templates) * 0.25) milestone_debug["height_after"] = float(current_height) if (not already_grasped) and milestone_index == (len(approach_poses) - 1): robot.left_gripper.grasp(tray) already_grasped = True if not already_grasped: robot.left_gripper.grasp(tray) final_height = float(tray.get_position()[2]) success = final_height >= _extract_height_threshold(templates) score = max( progress, math.exp(-total_length / (DEFAULT_PATH_SCALE * 2.5)), _extraction_progress_score(final_height, templates) if success else 0.0, ) debug["final_height"] = float(final_height) debug["already_grasped_end"] = bool(already_grasped) debug["total_length"] = float(total_length) debug["score"] = float(score) if milestone_reliabilities: debug["min_reliability"] = float(min(milestone_reliabilities)) debug["mean_reliability"] = float(np.mean(np.asarray(milestone_reliabilities))) return score, bool(success), debug finally: _restore_snapshot(task, snapshot) def _wait_branch(task, steps: int = 5) -> None: for _ in range(steps): task._scene.step() def _open_more_branch(task, templates: MotionTemplates) -> None: joint = Joint("oven_door_joint") current = joint.get_joint_position() joint.set_joint_position(current - templates.open_more_delta, disable_dynamics=True) for _ in range(3): task._pyrep.step() def _hold_open_branch(task, templates: MotionTemplates) -> None: joint = Joint("oven_door_joint") current = joint.get_joint_position() joint.set_joint_position(min(current, templates.hold_open_angle), disable_dynamics=True) for _ in range(3): task._pyrep.step() def _frame_metrics( episode_dir: Path, demo: Demo, frame_state: ReplayState, templates: MotionTemplates, ) -> Tuple[Dict[str, float], Dict[str, object]]: grasp_points = _sample_grasp_points(templates, frame_state.tray_pose) full_tray_points = _sample_full_tray_points(frame_state.tray_pose) camera_values: Dict[str, Dict[str, float]] = {} for camera_name in FULL_CAMERA_SET: point_cloud, extrinsics, intrinsics = _camera_point_cloud( episode_dir, demo, frame_state.frame_index, camera_name ) grasp_stats = _point_visibility_stats_from_point_cloud( grasp_points, point_cloud, extrinsics, intrinsics ) tray_stats = _point_visibility_stats_from_point_cloud( full_tray_points, point_cloud, extrinsics, intrinsics ) camera_values[camera_name] = { "grasp_visibility": float(grasp_stats["ratio"]), "grasp_projected_count": int(grasp_stats["projected_count"]), "grasp_visible_count": int(grasp_stats["visible_count"]), "tray_visibility": float(tray_stats["ratio"]), "tray_projected_count": int(tray_stats["projected_count"]), "tray_visible_count": int(tray_stats["visible_count"]), } values: Dict[str, float] = {} for name, camera_set in {"three_view": THREE_VIEW_SET, "full_view": FULL_CAMERA_SET}.items(): values[f"{name}_visibility"] = _union_visibility( camera_values[camera_name]["grasp_visibility"] for camera_name in camera_set ) values[f"{name}_whole_tray_visibility"] = _union_visibility( camera_values[camera_name]["tray_visibility"] for camera_name in camera_set ) for camera_name in FULL_CAMERA_SET: camera_value = camera_values[camera_name] values[f"{camera_name}_grasp_visibility"] = float(camera_value["grasp_visibility"]) values[f"{camera_name}_tray_visibility"] = float(camera_value["tray_visibility"]) values[f"{camera_name}_grasp_projected_count"] = float( camera_value["grasp_projected_count"] ) values[f"{camera_name}_grasp_visible_count"] = float(camera_value["grasp_visible_count"]) values[f"{camera_name}_tray_projected_count"] = float(camera_value["tray_projected_count"]) values[f"{camera_name}_tray_visible_count"] = float(camera_value["tray_visible_count"]) best_grasp_camera = max( FULL_CAMERA_SET, key=lambda camera_name: float(camera_values[camera_name]["grasp_visibility"]), ) best_tray_camera = max( FULL_CAMERA_SET, key=lambda camera_name: float(camera_values[camera_name]["tray_visibility"]), ) debug = { "best_grasp_visibility_camera": best_grasp_camera, "best_whole_tray_visibility_camera": best_tray_camera, "camera_values": camera_values, } return values, debug def _build_frame_artifacts( episode_dir: Path, demo: Demo, templates: MotionTemplates, task, state: ReplayState, ) -> Tuple[Dict[str, float], Dict[str, object]]: visibility_values, visibility_debug = _frame_metrics(episode_dir, demo, state, templates) tray_pose = np.asarray(state.tray_pose, dtype=np.float64) current_pose = np.asarray(state.left_gripper_pose, dtype=np.float64) right_pose = np.asarray(state.right_gripper_pose, dtype=np.float64) robot = task._scene.robot left_joints = np.asarray(robot.left_arm.get_joint_positions(), dtype=np.float64) right_joints = np.asarray(robot.right_arm.get_joint_positions(), dtype=np.float64) pregrasp_goal_pose = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose) pregrasp_progress, pregrasp_distance = _pregrasp_progress_and_distance( current_pose, tray_pose, templates, ) pregrasp_position_error = float(np.linalg.norm(current_pose[:3] - pregrasp_goal_pose[:3])) pregrasp_orientation_error_deg = _orientation_error_degrees( current_pose[3:], pregrasp_goal_pose[3:] ) tray = Shape("tray") tray_grasped_left = any( grasped.get_name() == tray.get_name() for grasped in task._scene.robot.left_gripper.get_grasped_objects() ) tray_height = float(tray.get_position()[2]) p_pre, y_pre, p_pre_debug = _pregrasp_score_and_success(task, templates) p_ext, y_ext, p_ext_debug = _extract_score_and_success(task, templates) row = { "frame_index": int(state.frame_index), "time_norm": float(state.frame_index / max(1, len(demo) - 1)), "door_angle": float(state.door_angle), "right_gripper_open": float(state.right_gripper_open), "left_gripper_open": float(state.left_gripper_open), "right_arm_pose_x": float(right_pose[0]), "right_arm_pose_y": float(right_pose[1]), "right_arm_pose_z": float(right_pose[2]), "right_arm_pose_qx": float(right_pose[3]), "right_arm_pose_qy": float(right_pose[4]), "right_arm_pose_qz": float(right_pose[5]), "right_arm_pose_qw": float(right_pose[6]), "left_arm_pose_x": float(current_pose[0]), "left_arm_pose_y": float(current_pose[1]), "left_arm_pose_z": float(current_pose[2]), "left_arm_pose_qx": float(current_pose[3]), "left_arm_pose_qy": float(current_pose[4]), "left_arm_pose_qz": float(current_pose[5]), "left_arm_pose_qw": float(current_pose[6]), "tray_height": float(tray_height), "tray_grasped_left": float(bool(tray_grasped_left)), "pregrasp_progress": float(pregrasp_progress), "pregrasp_distance": float(pregrasp_distance), "pregrasp_position_error": float(pregrasp_position_error), "pregrasp_orientation_error_deg": float(pregrasp_orientation_error_deg), "p_pre": float(p_pre), "p_ext": float(p_ext), "y_pre_raw": float(bool(y_pre)), "y_ext_raw": float(bool(y_ext)), "y_pre": float(bool(y_pre)), "y_ext": float(bool(y_ext)), "p_pre_source_strict": float(bool(p_pre_debug["source_strict"])), "p_pre_source_assisted": float(bool(p_pre_debug["source_assisted"])), "p_pre_bump_applied": float(bool(p_pre_debug["bump_applied"])), "p_pre_base_score": float(p_pre_debug["base_score"]), "p_pre_strict_length": float(p_pre_debug["strict_length"]), "p_pre_assisted_length": float(p_pre_debug["assisted_length"]), "p_pre_assisted_reliability": float(p_pre_debug["assisted_reliability"]), "p_pre_post_assist_strict_feasible": float( bool(p_pre_debug["post_assist_strict_feasible"]) ), "p_pre_door_open_delta": float(p_pre_debug["door_open_delta"]), "p_pre_door_quality": float(p_pre_debug["door_quality"]), "p_pre_step_quality_mean": float(p_pre_debug["step_quality_mean"]), "p_pre_step_quality_min": float(p_pre_debug["step_quality_min"]), "p_pre_step_quality_max": float(p_pre_debug["step_quality_max"]), "p_pre_alignment_signed_mean": float(p_pre_debug["alignment_signed_mean"]), "p_pre_alignment_abs_mean": float(p_pre_debug["alignment_abs_mean"]), "p_pre_collision_count": float(p_pre_debug["collision_count"]), "p_pre_bad_collision_count": float(p_pre_debug["bad_collision_count"]), "p_pre_best_goal_index": float(p_pre_debug["best_goal_index"]), "p_pre_num_goal_poses": float(p_pre_debug["num_goal_poses"]), "p_pre_best_target_pose_x": float(p_pre_debug["best_goal_pose"][0]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_pre_best_target_pose_y": float(p_pre_debug["best_goal_pose"][1]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_pre_best_target_pose_z": float(p_pre_debug["best_goal_pose"][2]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_pre_best_target_pose_qx": float(p_pre_debug["best_goal_pose"][3]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_pre_best_target_pose_qy": float(p_pre_debug["best_goal_pose"][4]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_pre_best_target_pose_qz": float(p_pre_debug["best_goal_pose"][5]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_pre_best_target_pose_qw": float(p_pre_debug["best_goal_pose"][6]) if p_pre_debug["best_goal_pose"] else float("nan"), "p_ext_initial_height": float(p_ext_debug["initial_height"]), "p_ext_final_height": float(p_ext_debug["final_height"]), "p_ext_total_length": float(p_ext_debug["total_length"]), "p_ext_already_grasped_start": float(bool(p_ext_debug["already_grasped_start"])), "p_ext_already_grasped_end": float(bool(p_ext_debug["already_grasped_end"])), "p_ext_num_milestones": float(p_ext_debug["num_milestones"]), "p_ext_first_failed_milestone": float(p_ext_debug["first_failed_milestone"]), "p_ext_min_reliability": float(p_ext_debug["min_reliability"]), "p_ext_mean_reliability": float(p_ext_debug["mean_reliability"]), "p_ext_approach_score": float(p_ext_debug["approach_score"]), "p_ext_retreat_score": float(p_ext_debug["retreat_score"]), **visibility_values, } debug = { "frame_index": int(state.frame_index), "state": { "door_angle": float(state.door_angle), "tray_height": float(tray_height), "tray_grasped_left": bool(tray_grasped_left), "left_arm_pose": [float(value) for value in current_pose], "right_arm_pose": [float(value) for value in right_pose], "left_arm_joint_positions": [float(value) for value in left_joints], "right_arm_joint_positions": [float(value) for value in right_joints], "pregrasp_goal_pose": [float(value) for value in pregrasp_goal_pose], "pregrasp_position_error": float(pregrasp_position_error), "pregrasp_orientation_error_deg": float(pregrasp_orientation_error_deg), }, "visibility": visibility_debug, "p_pre": p_pre_debug, "p_ext": p_ext_debug, } return row, debug def _compute_frame_row_isolated( episode_dir: Path, demo: Demo, templates: MotionTemplates, checkpoint_stride: int, frame_index: int, ) -> Dict[str, float]: rows = _compute_frame_rows_sequential( episode_dir=episode_dir, demo=demo, templates=templates, checkpoint_stride=checkpoint_stride, frame_indices=[frame_index], ) if not rows: raise RuntimeError(f"failed to compute frame {frame_index}") return rows[0] def _build_frame_row( episode_dir: Path, demo: Demo, templates: MotionTemplates, task, state: ReplayState, ) -> Dict[str, float]: row, _ = _build_frame_artifacts(episode_dir, demo, templates, task, state) return row def _compute_frame_rows_sequential( episode_dir: Path, demo: Demo, templates: MotionTemplates, checkpoint_stride: int, frame_indices: Sequence[int], ) -> List[Dict[str, float]]: env = _launch_replay_env() try: task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) cache.reset() rows: List[Dict[str, float]] = [] for frame_index in sorted({int(index) for index in frame_indices}): cache.step_to(frame_index) frame_snapshot = cache.snapshot() state = cache.current_state() rows.append(_build_frame_row(episode_dir, demo, templates, task, state)) cache.restore(frame_snapshot) return rows finally: env.shutdown() def _compute_frame_rows_independent( episode_dir: Path, demo: Demo, templates: MotionTemplates, checkpoint_stride: int, frame_indices: Sequence[int], ) -> List[Dict[str, float]]: env = _launch_replay_env() try: task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) cache.reset() initial_snapshot = cache.snapshot() rows: List[Dict[str, float]] = [] for frame_index in sorted({int(index) for index in frame_indices}): cache.restore_to_index(initial_snapshot, 0) cache.step_to(frame_index) state = cache.current_state() rows.append(_build_frame_row(episode_dir, demo, templates, task, state)) return rows finally: env.shutdown() def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float: if len(np.unique(y_true)) < 2: return float("nan") return float(roc_auc_score(y_true, y_score)) def _safe_auprc(y_true: np.ndarray, y_score: np.ndarray) -> float: if len(np.unique(y_true)) < 2: return float("nan") return float(average_precision_score(y_true, y_score)) def _first_crossing(values: np.ndarray, threshold: float) -> int: above = np.flatnonzero(values >= threshold) return int(above[0]) if len(above) else -1 def _transition_count(binary_values: np.ndarray) -> Tuple[int, int]: diffs = np.diff(binary_values.astype(int)) return int(np.sum(diffs == 1)), int(np.sum(diffs == -1)) def _keyframe_subset(frame_df: pd.DataFrame, keyframes: Sequence[int]) -> pd.DataFrame: key_df = frame_df.iloc[list(keyframes)].copy() key_df["keyframe_ordinal"] = np.arange(len(key_df)) return key_df def _persistent_rise_mask(binary_values: np.ndarray, window: int) -> np.ndarray: binary_values = np.asarray(binary_values, dtype=bool) rises = np.zeros(len(binary_values), dtype=bool) if window <= 0: rises[:] = binary_values return rises for index in range(len(binary_values)): segment = binary_values[index : index + window] if len(segment) == window and np.all(segment): rises[index] = True return rises def _monotone_after_first(binary_values: np.ndarray) -> np.ndarray: binary_values = np.asarray(binary_values, dtype=bool) monotone = np.zeros(len(binary_values), dtype=bool) if np.any(binary_values): first_true = int(np.flatnonzero(binary_values)[0]) monotone[first_true:] = True return monotone def _annotate_phase_columns(frame_df: pd.DataFrame) -> pd.DataFrame: door_speed = np.gradient(frame_df["door_angle"].to_numpy(dtype=float), DEMO_DT) frame_df["door_speed_signed"] = door_speed frame_df["door_speed_abs"] = np.abs(door_speed) y_ext_raw = ( frame_df["y_ext_raw"].to_numpy(dtype=bool) if "y_ext_raw" in frame_df else frame_df["y_ext"].to_numpy(dtype=bool) ) pregrasp_progress = ( frame_df["pregrasp_progress"].to_numpy(dtype=float) if "pregrasp_progress" in frame_df else frame_df["p_pre"].to_numpy(dtype=float) ) pregrasp_distance = ( frame_df["pregrasp_distance"].to_numpy(dtype=float) if "pregrasp_distance" in frame_df else 1.0 - pregrasp_progress ) pregrasp_speed = -np.gradient(pregrasp_distance, DEMO_DT) frame_df["pregrasp_speed"] = pregrasp_speed pregrasp_progress_seed = pregrasp_progress >= DEFAULT_PREGRASP_LABEL_PROGRESS_TAU y_pre_seed = ( frame_df["y_pre_raw"].to_numpy(dtype=bool) if "y_pre_raw" in frame_df else pregrasp_progress_seed ) y_pre_binary = _monotone_after_first( _persistent_rise_mask(y_pre_seed, DEFAULT_PREGRASP_PERSISTENCE) ) y_ext_binary = _monotone_after_first( _persistent_rise_mask(y_ext_raw, DEFAULT_READY_PERSISTENCE) ) frame_df["y_pre_seed_raw"] = y_pre_seed.astype(float) frame_df["y_pre_progress_seed"] = pregrasp_progress_seed.astype(float) frame_df["y_pre"] = y_pre_binary.astype(float) frame_df["y_ext"] = y_ext_binary.astype(float) frame_df["y_ext_seed_raw"] = y_ext_raw.astype(float) frame_df["phase_score"] = np.clip(frame_df["p_pre"].to_numpy(dtype=float), 0.0, 1.0) approach_active = ( (pregrasp_progress >= DEFAULT_APPROACH_PROGRESS_TAU) & (pregrasp_speed >= DEFAULT_APPROACH_SPEED_TAU) ) frame_df["approach_active"] = approach_active.astype(float) retrieve_onset = _persistent_rise_mask( approach_active & y_pre_binary, DEFAULT_RETRIEVE_PERSISTENCE ) frame_df["retrieve_onset_raw"] = retrieve_onset.astype(float) frame_df["y_retrieve"] = _monotone_after_first(retrieve_onset).astype(float) ready_seed = np.zeros(len(frame_df), dtype=bool) for index in range(len(frame_df)): window = y_ext_binary[index : index + DEFAULT_READY_PERSISTENCE] if ( len(window) == DEFAULT_READY_PERSISTENCE and np.all(window) and frame_df.iloc[index]["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU ): ready_seed[index] = True frame_df["ready_seed_raw"] = ready_seed.astype(float) frame_df["y_ready"] = _monotone_after_first(ready_seed).astype(float) phase_seed = _persistent_rise_mask( frame_df["p_pre"].to_numpy(dtype=float) >= DEFAULT_PHASE_SCORE_TAU, DEFAULT_RETRIEVE_PERSISTENCE, ) frame_df["phase_seed_raw"] = phase_seed.astype(float) frame_df["phase_switch"] = _monotone_after_first(phase_seed).astype(float) frame_df["p_pre_minus_tau"] = ( frame_df["p_pre"].to_numpy(dtype=float) - DEFAULT_PHASE_SCORE_TAU ) frame_df["p_ext_minus_tau"] = ( frame_df["p_ext"].to_numpy(dtype=float) - DEFAULT_PEXT_TAU ) frame_df["door_speed_margin"] = DEFAULT_DOOR_SPEED_TAU - frame_df["door_speed_abs"] return frame_df def _episode_metrics_from_frames( frame_df: pd.DataFrame, key_df: pd.DataFrame, episode_name: str, description: str, interventions: Dict[str, float], ) -> Dict[str, object]: y_pre_arr = frame_df["y_pre"].to_numpy(dtype=int) y_ext_arr = frame_df["y_ext"].to_numpy(dtype=int) y_retrieve_arr = frame_df["y_retrieve"].to_numpy(dtype=int) y_ready_arr = frame_df["y_ready"].to_numpy(dtype=int) p_pre_arr = frame_df["p_pre"].to_numpy(dtype=float) p_ext_arr = frame_df["p_ext"].to_numpy(dtype=float) phase_arr = frame_df["phase_switch"].to_numpy(dtype=int) whole_vis = frame_df["full_view_whole_tray_visibility"].to_numpy(dtype=float) door_angle_arr = frame_df["door_angle"].to_numpy(dtype=float) time_arr = frame_df["time_norm"].to_numpy(dtype=float) ppre_cross = _first_crossing(p_pre_arr, DEFAULT_PPRE_TAU) pext_cross = _first_crossing(p_ext_arr, DEFAULT_PEXT_TAU) phase_cross = _first_crossing(frame_df["phase_switch"].to_numpy(dtype=float), 0.5) retrieve_cross = _first_crossing(y_retrieve_arr.astype(float), 0.5) ready_cross = _first_crossing(y_ready_arr.astype(float), 0.5) phase_rises, phase_falls = _transition_count(phase_arr) key_phase_cross = _first_crossing(key_df["phase_switch"].to_numpy(dtype=float), 0.5) key_retrieve_cross = _first_crossing(key_df["y_retrieve"].to_numpy(dtype=float), 0.5) key_ready_cross = _first_crossing(key_df["y_ready"].to_numpy(dtype=float), 0.5) return { "episode_name": episode_name, "description": description, "num_dense_frames": int(len(frame_df)), "num_keyframes": int(len(key_df)), "phase_switch_rises": int(phase_rises), "phase_switch_falls": int(phase_falls), "ppre_cross_frame": int(ppre_cross), "pext_cross_frame": int(pext_cross), "phase_cross_frame": int(phase_cross), "retrieve_cross_frame": int(retrieve_cross), "ready_cross_frame": int(ready_cross), "ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross), "dense_boundary_error_to_retrieve_frames": float(abs(phase_cross - retrieve_cross)) if phase_cross >= 0 and retrieve_cross >= 0 else float("nan"), "dense_boundary_error_frames": float(abs(phase_cross - ready_cross)) if phase_cross >= 0 and ready_cross >= 0 else float("nan"), "dense_boundary_error_fraction": float(abs(phase_cross - ready_cross) / len(frame_df)) if phase_cross >= 0 and ready_cross >= 0 else float("nan"), "key_boundary_error_to_retrieve_keyframes": float(abs(key_phase_cross - key_retrieve_cross)) if key_phase_cross >= 0 and key_retrieve_cross >= 0 else float("nan"), "key_boundary_error_keyframes": float(abs(key_phase_cross - key_ready_cross)) if key_phase_cross >= 0 and key_ready_cross >= 0 else float("nan"), "auroc_vret_ypre_three": _safe_auc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)), "auprc_vret_ypre_three": _safe_auprc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)), "auroc_vret_ypre_full": _safe_auc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)), "auprc_vret_ypre_full": _safe_auprc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)), "auroc_ppre_ypre": _safe_auc(y_pre_arr, p_pre_arr), "auprc_ppre_ypre": _safe_auprc(y_pre_arr, p_pre_arr), "auroc_pext_yext": _safe_auc(y_ext_arr, p_ext_arr), "auprc_pext_yext": _safe_auprc(y_ext_arr, p_ext_arr), "auroc_phase_yretrieve": _safe_auc(y_retrieve_arr, frame_df["phase_score"].to_numpy(dtype=float)), "auprc_phase_yretrieve": _safe_auprc(y_retrieve_arr, frame_df["phase_score"].to_numpy(dtype=float)), "f1_phase_yretrieve": float(f1_score(y_retrieve_arr, phase_arr)) if np.any(y_retrieve_arr) and np.any(phase_arr) else float("nan"), "auroc_phase_yready": _safe_auc(y_ready_arr, frame_df["phase_score"].to_numpy(dtype=float)), "auprc_phase_yready": _safe_auprc(y_ready_arr, frame_df["phase_score"].to_numpy(dtype=float)), "f1_phase_yready": float(f1_score(y_ready_arr, phase_arr)) if np.any(y_ready_arr) and np.any(phase_arr) else float("nan"), "baseline_auroc_door_yext": _safe_auc(y_ext_arr, door_angle_arr), "baseline_auprc_door_yext": _safe_auprc(y_ext_arr, door_angle_arr), "baseline_auroc_time_yext": _safe_auc(y_ext_arr, time_arr), "baseline_auprc_time_yext": _safe_auprc(y_ext_arr, time_arr), "baseline_auroc_whole_vis_yext": _safe_auc(y_ext_arr, whole_vis), "baseline_auprc_whole_vis_yext": _safe_auprc(y_ext_arr, whole_vis), **interventions, } def _isolated_intervention_outcomes( demo: Demo, templates: MotionTemplates, frame_index: int, checkpoint_stride: int, ) -> Dict[str, Tuple[float, bool]]: env = _launch_replay_env() try: task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) cache.reset() cache.step_to(frame_index) snapshot = cache.snapshot() base_score, base_success, _ = _extract_score_and_success(task, templates) base = (base_score, base_success) _restore_snapshot(task, snapshot) _open_more_branch(task, templates) open_score, open_success, _ = _extract_score_and_success(task, templates) open_more = (open_score, open_success) _restore_snapshot(task, snapshot) _hold_open_branch(task, templates) hold_score, hold_success, _ = _extract_score_and_success(task, templates) hold_open = (hold_score, hold_success) _restore_snapshot(task, snapshot) _wait_branch(task) wait_score, wait_success, _ = _extract_score_and_success(task, templates) wait = (wait_score, wait_success) return { "base": base, "open_more": open_more, "hold_open": hold_open, "wait": wait, } finally: env.shutdown() def _interventional_validity( demo: Demo, templates: MotionTemplates, frame_df: pd.DataFrame, checkpoint_stride: int, ) -> Dict[str, float]: ready_indices = np.flatnonzero(frame_df["y_ready"].to_numpy(dtype=bool)) ready_onset = int(ready_indices[0]) if len(ready_indices) else len(frame_df) // 2 pre_ready_indices = sorted( { max(0, ready_onset - 20), max(0, ready_onset - 10), } ) post_ready_indices = sorted( { ready_onset, min(len(frame_df) - 1, ready_onset + 20), } ) stats = { "pre_ready_open_more_increases_pext": 0, "pre_ready_open_more_trials": 0, "pre_ready_hold_open_increases_pext": 0, "pre_ready_hold_open_trials": 0, "pre_ready_extract_success": 0, "pre_ready_extract_trials": 0, "pre_ready_wait_extract_success": 0, "pre_ready_wait_trials": 0, "post_ready_extract_success": 0, "post_ready_extract_trials": 0, "post_ready_open_more_low_gain": 0, "post_ready_open_more_trials": 0, "post_ready_hold_open_low_gain": 0, "post_ready_hold_open_trials": 0, } for frame_index in [*pre_ready_indices, *post_ready_indices]: outcomes = _isolated_intervention_outcomes( demo=demo, templates=templates, frame_index=frame_index, checkpoint_stride=checkpoint_stride, ) base_pext, base_extract_success = outcomes["base"] pre_ready = frame_index in pre_ready_indices open_pext, _ = outcomes["open_more"] hold_pext, _ = outcomes["hold_open"] _, wait_extract_success = outcomes["wait"] if pre_ready: stats["pre_ready_open_more_trials"] += 1 stats["pre_ready_hold_open_trials"] += 1 stats["pre_ready_extract_trials"] += 1 stats["pre_ready_wait_trials"] += 1 if open_pext > base_pext: stats["pre_ready_open_more_increases_pext"] += 1 if hold_pext > base_pext: stats["pre_ready_hold_open_increases_pext"] += 1 if base_extract_success: stats["pre_ready_extract_success"] += 1 if wait_extract_success: stats["pre_ready_wait_extract_success"] += 1 else: stats["post_ready_extract_trials"] += 1 stats["post_ready_open_more_trials"] += 1 stats["post_ready_hold_open_trials"] += 1 if base_extract_success: stats["post_ready_extract_success"] += 1 if (open_pext - base_pext) <= 0.05: stats["post_ready_open_more_low_gain"] += 1 if (hold_pext - base_pext) <= 0.05: stats["post_ready_hold_open_low_gain"] += 1 return { key: float(value) for key, value in stats.items() } def _analyze_episode( dataset_root: Path, episode_dir: Path, templates: MotionTemplates, template_frames: Dict[str, int], checkpoint_stride: int = 16, max_frames: Optional[int] = None, independent_replay: bool = False, ) -> EpisodeArtifacts: demo = _load_demo(episode_dir) descriptions = _load_descriptions(episode_dir) env = _launch_replay_env() try: task = env.get_task(BimanualTakeTrayOutOfOven) cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride) cache.reset() num_frames = len(demo) if max_frames is None else min(len(demo), max_frames) rows: List[Dict[str, float]] = [] debug_rows: List[Dict[str, object]] = [] initial_snapshot = cache.checkpoints[0] if independent_replay else None for frame_index in range(num_frames): if independent_replay: cache.restore_to_index(initial_snapshot, 0) cache.step_to(frame_index) frame_snapshot = cache.snapshot() if not independent_replay else None state = cache.current_state() row, debug_row = _build_frame_artifacts( episode_dir, demo, templates, task, state ) rows.append(row) debug_rows.append(debug_row) if frame_snapshot is not None: cache.restore(frame_snapshot) if (frame_index + 1) % 25 == 0 or (frame_index + 1) == num_frames: print( f"[{episode_dir.name}] analyzed {frame_index + 1}/{num_frames} dense frames", flush=True, ) frame_df = pd.DataFrame(rows) frame_df = _annotate_phase_columns(frame_df) keyframes = [index for index in _keypoint_discovery(demo) if index < num_frames] key_df = _keyframe_subset(frame_df, keyframes) finally: env.shutdown() interventions = _interventional_validity( demo=demo, templates=templates, frame_df=frame_df, checkpoint_stride=checkpoint_stride, ) metrics = _episode_metrics_from_frames( frame_df=frame_df, key_df=key_df, episode_name=episode_dir.name, description=descriptions[0], interventions=interventions, ) return EpisodeArtifacts( episode_name=episode_dir.name, dense=frame_df, keyframes=key_df, metrics=metrics, template_frames=template_frames, debug_rows=debug_rows, ) def _aggregate_summary(episode_metrics: List[Dict[str, object]]) -> Dict[str, object]: frame = pd.DataFrame(episode_metrics) numeric = frame.select_dtypes(include=[np.number]) summary = { "num_episodes": int(len(frame)), "mean_metrics": numeric.mean(numeric_only=True).to_dict(), "median_metrics": numeric.median(numeric_only=True).to_dict(), "single_switch_rate": float((frame["phase_switch_rises"] == 1).mean()) if len(frame) else float("nan"), "reversion_rate": float((frame["phase_switch_falls"] > 0).mean()) if len(frame) else float("nan"), "ordering_ok_rate": float(frame["ordering_ok"].mean()) if len(frame) else float("nan"), } return summary def run_study( dataset_root: str, result_dir: str, max_episodes: Optional[int] = None, checkpoint_stride: int = 16, max_frames: Optional[int] = None, episode_offset: int = 0, template_episode_index: int = 0, episode_indices: Optional[Sequence[int]] = None, independent_replay: bool = True, per_episode_templates: bool = False, ) -> Dict[str, object]: dataset_path = Path(dataset_root) result_path = Path(result_dir) result_path.mkdir(parents=True, exist_ok=True) all_episode_dirs = _episode_dirs(dataset_path) if not all_episode_dirs: raise RuntimeError(f"no episodes available under {dataset_root}") if not (0 <= template_episode_index < len(all_episode_dirs)): raise ValueError( f"template_episode_index {template_episode_index} outside available range 0..{len(all_episode_dirs) - 1}" ) selected_episode_indices: List[int] if episode_indices is not None: selected_episode_indices = [] seen_episode_indices = set() for raw_index in episode_indices: episode_index = int(raw_index) if not (0 <= episode_index < len(all_episode_dirs)): raise ValueError( f"episode index {episode_index} outside available range 0..{len(all_episode_dirs) - 1}" ) if episode_index in seen_episode_indices: continue selected_episode_indices.append(episode_index) seen_episode_indices.add(episode_index) episode_dirs = [all_episode_dirs[index] for index in selected_episode_indices] else: episode_dirs = all_episode_dirs[episode_offset:] if max_episodes is not None: episode_dirs = episode_dirs[:max_episodes] selected_episode_indices = [ all_episode_dirs.index(episode_dir) for episode_dir in episode_dirs ] if not episode_dirs: raise RuntimeError( f"no episodes selected under {dataset_root} with offset={episode_offset} max_episodes={max_episodes} episode_indices={episode_indices}" ) template_episode_dir = all_episode_dirs[template_episode_index] if not per_episode_templates: templates, template_frames = _derive_templates(dataset_path, template_episode_dir) with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle: json.dump( { "template_mode": "shared", "templates": templates.to_json(), "template_episode": template_episode_dir.name, "template_frames": template_frames, "episode_offset": episode_offset, "selected_episode_indices": selected_episode_indices, }, handle, indent=2, ) else: with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle: json.dump( { "template_mode": "per_episode", "episode_offset": episode_offset, "selected_episode_indices": selected_episode_indices, }, handle, indent=2, ) episode_metrics: List[Dict[str, object]] = [] for episode_dir in episode_dirs: if per_episode_templates: episode_templates, episode_template_frames = _derive_templates(dataset_path, episode_dir) with result_path.joinpath(f"{episode_dir.name}.templates.json").open( "w", encoding="utf-8" ) as handle: json.dump( { "template_mode": "per_episode", "templates": episode_templates.to_json(), "template_episode": episode_dir.name, "template_frames": episode_template_frames, }, handle, indent=2, ) else: episode_templates = templates episode_template_frames = template_frames artifacts = _analyze_episode( dataset_path, episode_dir, episode_templates, episode_template_frames, checkpoint_stride=checkpoint_stride, max_frames=max_frames, independent_replay=independent_replay, ) artifacts.dense.to_csv(result_path.joinpath(f"{episode_dir.name}.dense.csv"), index=False) artifacts.keyframes.to_csv( result_path.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False ) if artifacts.debug_rows is not None: with result_path.joinpath(f"{episode_dir.name}.debug.jsonl").open( "w", encoding="utf-8" ) as handle: for debug_row in artifacts.debug_rows: handle.write(json.dumps(_json_safe(debug_row))) handle.write("\n") with result_path.joinpath(f"{episode_dir.name}.metrics.json").open( "w", encoding="utf-8" ) as handle: json.dump(artifacts.metrics, handle, indent=2) episode_metrics.append(artifacts.metrics) summary = _aggregate_summary(episode_metrics) with result_path.joinpath("summary.json").open("w", encoding="utf-8") as handle: json.dump(summary, handle, indent=2) return summary def main(argv: Optional[Sequence[str]] = None) -> int: def _parse_episode_indices(value: str) -> List[int]: indices: List[int] = [] for chunk in value.split(","): chunk = chunk.strip() if not chunk: continue indices.append(int(chunk)) if not indices: raise argparse.ArgumentTypeError("episode-indices must contain at least one integer") return indices parser = argparse.ArgumentParser() parser.add_argument( "--dataset-root", default="/workspace/data/bimanual_take_tray_out_of_oven_train_128", ) parser.add_argument( "--result-dir", default="/workspace/reveal_retrieve_label_study/results/oven_first_pass", ) parser.add_argument("--max-episodes", type=int, default=1) parser.add_argument("--checkpoint-stride", type=int, default=16) parser.add_argument("--max-frames", type=int) parser.add_argument("--episode-offset", type=int, default=0) parser.add_argument("--template-episode-index", type=int, default=0) parser.add_argument("--episode-indices", type=_parse_episode_indices) parser.add_argument( "--independent-replay", dest="independent_replay", action="store_true", help="Replay each frame from the demo start/checkpoint to avoid simulator drift.", ) parser.add_argument( "--sequential-replay", dest="independent_replay", action="store_false", help="Reuse the live replay state across frames for speed at the cost of more drift.", ) parser.add_argument("--per-episode-templates", action="store_true") parser.set_defaults(independent_replay=True) args = parser.parse_args(argv) summary = run_study( dataset_root=args.dataset_root, result_dir=args.result_dir, max_episodes=args.max_episodes, checkpoint_stride=args.checkpoint_stride, max_frames=args.max_frames, episode_offset=args.episode_offset, template_episode_index=args.template_episode_index, episode_indices=args.episode_indices, independent_replay=args.independent_replay, per_episode_templates=args.per_episode_templates, ) print(json.dumps(summary, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())