| import argparse |
| import json |
| import math |
| import os |
| import pickle |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from natsort import natsorted |
| from PIL import Image |
| from pyrep.const import ConfigurationPathAlgorithms as Algos, ObjectType |
| from pyrep.errors import ConfigurationPathError |
| from pyrep.objects.joint import Joint |
| from pyrep.objects.object import Object |
| from pyrep.objects.shape import Shape |
| from pyrep.objects.vision_sensor import VisionSensor |
| from rlbench.action_modes.action_mode import BimanualJointPositionActionMode |
| from rlbench.action_modes.gripper_action_modes import BimanualDiscrete |
| from rlbench.backend.const import DEPTH_SCALE |
| from rlbench.backend.utils import image_to_float_array, rgb_handles_to_mask |
| from rlbench.bimanual_tasks.bimanual_take_tray_out_of_oven import ( |
| BimanualTakeTrayOutOfOven, |
| ) |
| from rlbench.demo import Demo |
| from rlbench.environment import Environment |
| from rlbench.observation_config import CameraConfig, ObservationConfig |
| from sklearn.metrics import ( |
| average_precision_score, |
| f1_score, |
| roc_auc_score, |
| ) |
|
|
|
|
# Camera names available in the recorded RLBench dataset.
FULL_CAMERA_SET = [
    "front",
    "overhead",
    "wrist_right",
    "wrist_left",
    "over_shoulder_left",
    "over_shoulder_right",
]
# Reduced three-camera subset.
THREE_VIEW_SET = ["front", "wrist_right", "wrist_left"]
# Fallback X display for the headless simulator (see _configure_runtime).
DISPLAY = ":99"
# Time between demo frames, in seconds (assumed from the recorder -- confirm).
DEMO_DT = 0.05
# (width, height) of the replay camera images.
DEFAULT_IMAGE_SIZE = (128, 128)
DEFAULT_PATH_SCALE = 0.75
# Decision thresholds ("tau") for downstream phase/score detection; exact
# semantics live at the usage sites, which are not visible in this chunk.
DEFAULT_PPRE_TAU = 0.45
DEFAULT_VISIBILITY_TAU = 0.35
DEFAULT_PEXT_TAU = 0.45
DEFAULT_DOOR_SPEED_TAU = 0.08
DEFAULT_PHASE_SCORE_TAU = 0.5
DEFAULT_APPROACH_SPEED_TAU = 0.01
DEFAULT_APPROACH_PROGRESS_TAU = 0.02
DEFAULT_PREGRASP_LABEL_PROGRESS_TAU = 0.60
# Frames before the grasp searched for the approach onset
# (used by _detect_pregrasp_approach_onset).
DEFAULT_APPROACH_ONSET_WINDOW = 96
DEFAULT_PREGRASP_CANDIDATE_COUNT = 4
# Motion-planner budgets (trial counts / config counts / per-call time cap).
DEFAULT_PLAN_TRIALS = 48
DEFAULT_PLAN_MAX_CONFIGS = 4
DEFAULT_PLAN_MAX_TIME_MS = 10
DEFAULT_PLAN_TRIALS_PER_GOAL = 4
DEFAULT_PLAN_ATTEMPTS = 2
DEFAULT_PLAN_MIN_SUCCESSES = 2
# Assist-behavior tuning knobs (semantics defined at the usage sites).
DEFAULT_PREGRASP_ASSIST_PATH_SCALE = 3.0
DEFAULT_DOOR_ASSIST_MIN_OPEN_DELTA = 0.01
DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE = 0.02
DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE = 0.55
DEFAULT_DOOR_ASSIST_MIN_QUALITY = 0.3
# Presumably consecutive-frame persistence counts before a flag latches -- confirm.
DEFAULT_READY_PERSISTENCE = 3
DEFAULT_RETRIEVE_PERSISTENCE = 3
DEFAULT_PREGRASP_PERSISTENCE = 3
# Cap on tray mask handle ids kept per episode (see _infer_tray_mask_handle_ids).
DEFAULT_MASK_HANDLE_COUNT = 8
DEFAULT_VISIBILITY_POINT_TOLERANCE = 0.03
# Minimum ratio vs. the primary approach axis for sampling a secondary grasp
# face (see _grasp_surface_specs).
DEFAULT_GRASP_SECONDARY_FACE_RATIO = 0.35
|
|
|
|
@dataclass
class SimulatorSnapshot:
    """Everything needed to rewind the PyRep scene to a previous state.

    Built by _capture_snapshot and consumed by _restore_snapshot.
    """

    # Opaque task state as returned by task._task.get_state().
    task_state: Tuple[bytes, int]
    # PyRep configuration trees for each arm/gripper subtree.
    right_arm_tree: bytes
    right_gripper_tree: bytes
    left_arm_tree: bytes
    left_gripper_tree: bytes
    # Joint positions captured alongside the trees (7 per arm).
    right_arm_joints: Tuple[float, ...]
    left_arm_joints: Tuple[float, ...]
    right_gripper_joints: Tuple[float, ...]
    left_gripper_joints: Tuple[float, ...]
    # Names of objects grasped by each gripper at capture time.
    right_grasped: Tuple[str, ...]
    left_grasped: Tuple[str, ...]
    # Grasped-object name -> name of its pre-grasp parent (None for no parent).
    right_grasped_old_parents: Dict[str, Optional[str]]
    left_grasped_old_parents: Dict[str, Optional[str]]
    # World poses of every object in each grasped subtree, keyed by object name.
    grasped_subtree_poses: Dict[str, Tuple[float, ...]]
|
|
|
|
@dataclass
class ReplayState:
    """Lightweight per-frame scene summary captured during replay."""

    # Demo frame index the summary corresponds to.
    frame_index: int
    # Tray world pose [x, y, z, qx, qy, qz, qw].
    tray_pose: np.ndarray
    # Oven door joint position (Joint.get_joint_position).
    door_angle: float
    right_gripper_pose: np.ndarray
    left_gripper_pose: np.ndarray
    # 1.0 = open, 0.0 = closed (thresholded at 0.5 elsewhere in this file).
    right_gripper_open: float
    left_gripper_open: float
    # Optional full simulator snapshot for exact restoration.
    snapshot: Optional[SimulatorSnapshot] = None
|
|
|
|
@dataclass
class MotionTemplates:
    """Motion templates distilled from a single demo replay.

    Poses are 7-vectors [x, y, z, qx, qy, qz, qw]. The pregrasp/grasp/approach
    poses are expressed relative to the tray; retreat poses relative to the
    robot base (see _derive_templates).
    """

    pregrasp_rel_pose: np.ndarray
    grasp_rel_pose: np.ndarray
    retreat_rel_poses: List[np.ndarray]
    grasp_local_center: np.ndarray
    grasp_region_extents: np.ndarray
    retriever_home_joints: np.ndarray
    hold_open_angle: float
    open_more_delta: float
    reference_tray_height: float
    approach_rel_poses: List[np.ndarray] = field(default_factory=list)
    mask_handle_ids: List[int] = field(default_factory=list)

    def to_json(self) -> Dict[str, object]:
        """Serialize every field to plain JSON-compatible Python types."""
        payload: Dict[str, object] = {}
        payload["approach_rel_poses"] = [pose.tolist() for pose in self.approach_rel_poses]
        payload["pregrasp_rel_pose"] = self.pregrasp_rel_pose.tolist()
        payload["grasp_rel_pose"] = self.grasp_rel_pose.tolist()
        payload["retreat_rel_poses"] = [pose.tolist() for pose in self.retreat_rel_poses]
        payload["grasp_local_center"] = self.grasp_local_center.tolist()
        payload["grasp_region_extents"] = self.grasp_region_extents.tolist()
        payload["retriever_home_joints"] = self.retriever_home_joints.tolist()
        payload["hold_open_angle"] = float(self.hold_open_angle)
        payload["open_more_delta"] = float(self.open_more_delta)
        payload["reference_tray_height"] = float(self.reference_tray_height)
        payload["mask_handle_ids"] = [int(handle) for handle in self.mask_handle_ids]
        return payload

    @classmethod
    def from_json(cls, payload: Dict[str, object]) -> "MotionTemplates":
        """Inverse of to_json; tolerates missing optional keys."""

        def _vector(value) -> np.ndarray:
            return np.asarray(value, dtype=np.float64)

        return cls(
            pregrasp_rel_pose=_vector(payload["pregrasp_rel_pose"]),
            grasp_rel_pose=_vector(payload["grasp_rel_pose"]),
            retreat_rel_poses=[_vector(pose) for pose in payload.get("retreat_rel_poses", [])],
            grasp_local_center=_vector(payload["grasp_local_center"]),
            grasp_region_extents=_vector(payload["grasp_region_extents"]),
            # Older payloads may lack the home joints; default to a zero 7-vector.
            retriever_home_joints=_vector(payload.get("retriever_home_joints", [0.0] * 7)),
            hold_open_angle=float(payload["hold_open_angle"]),
            open_more_delta=float(payload["open_more_delta"]),
            reference_tray_height=float(payload["reference_tray_height"]),
            approach_rel_poses=[_vector(pose) for pose in payload.get("approach_rel_poses", [])],
            mask_handle_ids=[int(handle) for handle in payload.get("mask_handle_ids", [])],
        )
|
|
|
|
@dataclass
class EpisodeArtifacts:
    """Bundle of the outputs produced for one processed episode."""

    # Episode identifier (directory name).
    episode_name: str
    # Per-frame ("dense") results table.
    dense: pd.DataFrame
    # Selected keyframe rows.
    keyframes: pd.DataFrame
    # Episode-level aggregate metrics.
    metrics: Dict[str, object]
    # Named template events -> demo frame index (see _derive_templates).
    template_frames: Dict[str, int]
    # Optional verbose per-frame diagnostics.
    debug_rows: Optional[List[Dict[str, object]]] = None
|
|
|
|
def _configure_runtime() -> None:
    """Point environment variables at the bundled CoppeliaSim installation.

    Idempotent: existing DISPLAY/XDG settings are kept, and the library path is
    only extended when the sim root is not already present.
    """
    sim_root = "/workspace/coppelia_sim"
    os.environ["DISPLAY"] = os.environ.get("DISPLAY", DISPLAY)
    os.environ["COPPELIASIM_ROOT"] = sim_root
    current_ld_path = os.environ.get("LD_LIBRARY_PATH", "")
    if sim_root not in current_ld_path:
        if current_ld_path:
            os.environ["LD_LIBRARY_PATH"] = f"{current_ld_path}:{sim_root}"
        else:
            os.environ["LD_LIBRARY_PATH"] = sim_root
    os.environ["QT_QPA_PLATFORM_PLUGIN_PATH"] = sim_root
    os.environ.setdefault("XDG_RUNTIME_DIR", "/tmp/runtime-root")
|
|
|
|
def _minimal_camera_config() -> Dict[str, CameraConfig]:
    """Single front camera with every image stream disabled (replay needs no images)."""
    front_camera = CameraConfig(
        rgb=False,
        depth=False,
        point_cloud=False,
        mask=False,
        image_size=DEFAULT_IMAGE_SIZE,
    )
    return {"front": front_camera}
|
|
|
|
def _make_observation_config() -> ObservationConfig:
    """Observation config recording proprioception only (no camera streams)."""
    proprioception_flags = dict(
        joint_velocities=True,
        joint_positions=True,
        joint_forces=False,
        gripper_open=True,
        gripper_pose=True,
        gripper_matrix=False,
        gripper_joint_positions=True,
        gripper_touch_forces=False,
        task_low_dim_state=False,
    )
    return ObservationConfig(
        camera_configs=_minimal_camera_config(),
        robot_name="bimanual",
        **proprioception_flags,
    )
|
|
|
|
def _launch_replay_env() -> Environment:
    """Configure the runtime, then start a headless dual-Panda RLBench environment."""
    _configure_runtime()
    environment = Environment(
        obs_config=_make_observation_config(),
        action_mode=BimanualJointPositionActionMode(),
        robot_setup="dual_panda",
        headless=True,
    )
    environment.launch()
    return environment
|
|
|
|
def _load_demo(episode_dir: Path) -> Demo:
    """Unpickle the low-dimensional observation sequence stored in *episode_dir*."""
    demo_path = episode_dir / "low_dim_obs.pkl"
    with demo_path.open("rb") as stream:
        return pickle.load(stream)
|
|
|
|
| def _load_descriptions(episode_dir: Path) -> List[str]: |
| with episode_dir.joinpath("variation_descriptions.pkl").open("rb") as handle: |
| return pickle.load(handle) |
|
|
|
|
def _episode_dirs(dataset_root: Path) -> List[Path]:
    """Episode directories under *dataset_root*, natural-sorted, that contain a demo."""
    episodes_dir = dataset_root / "all_variations" / "episodes"
    candidates = [episodes_dir / name for name in natsorted(os.listdir(episodes_dir))]
    return [path for path in candidates if (path / "low_dim_obs.pkl").exists()]
|
|
|
|
| def _camera_file(episode_dir: Path, camera_name: str, kind: str, frame_index: int) -> Path: |
| if kind == "rgb": |
| return episode_dir.joinpath(f"{camera_name}_rgb", f"rgb_{frame_index:04d}.png") |
| if kind == "depth": |
| return episode_dir.joinpath( |
| f"{camera_name}_depth", f"depth_{frame_index:04d}.png" |
| ) |
| if kind == "mask": |
| return episode_dir.joinpath(f"{camera_name}_mask", f"mask_{frame_index:04d}.png") |
| raise ValueError(f"unknown kind: {kind}") |
|
|
|
|
def _load_depth_meters(episode_dir: Path, demo: Demo, frame_index: int, camera_name: str) -> np.ndarray:
    """Decode a stored depth PNG to metric depth using the camera near/far planes."""
    depth_path = _camera_file(episode_dir, camera_name, "depth", frame_index)
    normalized = image_to_float_array(Image.open(depth_path), DEPTH_SCALE)
    frame_misc = demo[frame_index].misc
    near = frame_misc[f"{camera_name}_camera_near"]
    far = frame_misc[f"{camera_name}_camera_far"]
    # Stored values are normalized to [0, 1] between the clip planes.
    return near + normalized * (far - near)
|
|
|
|
def _load_mask(episode_dir: Path, frame_index: int, camera_name: str) -> np.ndarray:
    """Load a mask PNG and decode its RGB-encoded object handles."""
    raw = Image.open(_camera_file(episode_dir, camera_name, "mask", frame_index))
    mask = np.asarray(raw, dtype=np.float32)
    if mask.ndim == 2:
        # Grayscale file: replicate to three channels so handle decoding works.
        mask = np.repeat(mask[..., None], 3, axis=2)
    if mask.max() > 1.0:
        # Normalize 0-255 pixel values into [0, 1].
        mask /= 255.0
    return rgb_handles_to_mask(mask.copy())
|
|
|
|
def _capture_snapshot(task) -> SimulatorSnapshot:
    """Capture task state, arm/gripper configuration, and grasp bookkeeping.

    Paired with _restore_snapshot. Reads private RLBench/PyRep attributes
    (task._scene, task._task, gripper._old_parents), so it is tied to the
    library internals used when this was written.
    """
    robot = task._scene.robot
    right_grasped = tuple(robot.right_gripper.get_grasped_objects())
    left_grasped = tuple(robot.left_gripper.get_grasped_objects())
    # World poses of every object in each grasped subtree, so a restore can
    # reposition them even after grasp re-parenting.
    grasped_subtree_poses: Dict[str, Tuple[float, ...]] = {}
    for grasped_object in right_grasped + left_grasped:
        for subtree_object in grasped_object.get_objects_in_tree(exclude_base=False):
            grasped_subtree_poses[subtree_object.get_name()] = tuple(
                float(value) for value in subtree_object.get_pose()
            )
    return SimulatorSnapshot(
        task_state=task._task.get_state(),
        right_arm_tree=robot.right_arm.get_configuration_tree(),
        right_gripper_tree=robot.right_gripper.get_configuration_tree(),
        left_arm_tree=robot.left_arm.get_configuration_tree(),
        left_gripper_tree=robot.left_gripper.get_configuration_tree(),
        right_arm_joints=tuple(float(value) for value in robot.right_arm.get_joint_positions()),
        left_arm_joints=tuple(float(value) for value in robot.left_arm.get_joint_positions()),
        right_gripper_joints=tuple(float(value) for value in robot.right_gripper.get_joint_positions()),
        left_gripper_joints=tuple(float(value) for value in robot.left_gripper.get_joint_positions()),
        right_grasped=tuple(obj.get_name() for obj in right_grasped),
        left_grasped=tuple(obj.get_name() for obj in left_grasped),
        # zip pairs each grasped object with its recorded previous parent
        # (assumes gripper._old_parents parallels the grasped list -- confirm
        # against the Gripper implementation).
        right_grasped_old_parents={
            obj.get_name(): (
                None if old_parent is None else old_parent.get_name()
            )
            for obj, old_parent in zip(right_grasped, robot.right_gripper._old_parents)
        },
        left_grasped_old_parents={
            obj.get_name(): (
                None if old_parent is None else old_parent.get_name()
            )
            for obj, old_parent in zip(left_grasped, robot.left_gripper._old_parents)
        },
        grasped_subtree_poses=grasped_subtree_poses,
    )
|
|
|
|
def _restore_snapshot(task, snapshot: SimulatorSnapshot) -> None:
    """Rewind the live scene to *snapshot* (inverse of _capture_snapshot).

    The restore order is load-bearing: task state, then configuration trees,
    then joint positions and controller targets, then grasp re-attachment,
    with physics steps in between.
    """
    robot = task._scene.robot
    snapshot_has_grasp = bool(snapshot.right_grasped or snapshot.left_grasped)
    if not snapshot_has_grasp:
        # Drop any current grasp before overwriting state.
        robot.release_gripper()
    try:
        task._task.restore_state(snapshot.task_state)
    except RuntimeError:
        # Fall back to restoring only the configuration-tree half of the state.
        task._pyrep.set_configuration_tree(snapshot.task_state[0])
    task._pyrep.set_configuration_tree(snapshot.right_arm_tree)
    task._pyrep.set_configuration_tree(snapshot.right_gripper_tree)
    task._pyrep.set_configuration_tree(snapshot.left_arm_tree)
    task._pyrep.set_configuration_tree(snapshot.left_gripper_tree)
    robot.right_arm.set_joint_positions(list(snapshot.right_arm_joints), disable_dynamics=True)
    robot.left_arm.set_joint_positions(list(snapshot.left_arm_joints), disable_dynamics=True)
    # Also set controller targets so the arms do not drift on the next step.
    robot.right_arm.set_joint_target_positions(list(snapshot.right_arm_joints))
    robot.left_arm.set_joint_target_positions(list(snapshot.left_arm_joints))
    robot.right_gripper.set_joint_positions(
        list(snapshot.right_gripper_joints), disable_dynamics=True
    )
    robot.left_gripper.set_joint_positions(
        list(snapshot.left_gripper_joints), disable_dynamics=True
    )
    robot.right_gripper.set_joint_target_positions(list(snapshot.right_gripper_joints))
    robot.left_gripper.set_joint_target_positions(list(snapshot.left_gripper_joints))
    if snapshot_has_grasp:
        # Detach everything, put the grasped subtrees back at their recorded
        # world poses, and settle physics before re-attaching below.
        robot.release_gripper()
        for name, pose in snapshot.grasped_subtree_poses.items():
            Object.get_object(name).set_pose(np.asarray(pose, dtype=np.float64))
        task._pyrep.step()
    if not snapshot_has_grasp:
        robot.release_gripper()
    for name in snapshot.right_grasped:
        _force_attach_grasped_object(
            robot.right_gripper,
            Shape(name),
            snapshot.right_grasped_old_parents.get(name),
        )
    for name in snapshot.left_grasped:
        _force_attach_grasped_object(
            robot.left_gripper,
            Shape(name),
            snapshot.left_grasped_old_parents.get(name),
        )
    task._pyrep.step()
|
|
|
|
def _force_attach_grasped_object(
    gripper, obj: Shape, old_parent_name: Optional[str]
) -> None:
    """Re-register *obj* as grasped by *gripper* without running grasp detection.

    Mirrors the Gripper's internal bookkeeping: record the object and its
    previous parent, then re-parent it onto the gripper attach point. No-op if
    the object is already in the grasped list.
    """
    already_held = {grasped.get_name() for grasped in gripper.get_grasped_objects()}
    if obj.get_name() in already_held:
        return
    gripper._grasped_objects.append(obj)
    if old_parent_name is None:
        previous_parent = obj.get_parent()
    else:
        previous_parent = Object.get_object(old_parent_name)
    gripper._old_parents.append(previous_parent)
    obj.set_parent(gripper._attach_point, keep_in_place=True)
|
|
|
|
| def _build_joint_action(target_obs) -> np.ndarray: |
| def _joint_vector(value, fallback) -> np.ndarray: |
| array = np.asarray(fallback if value is None else value, dtype=np.float64) |
| if array.ndim != 1 or array.shape[0] != 7: |
| array = np.asarray(fallback, dtype=np.float64) |
| if array.ndim != 1 or array.shape[0] != 7: |
| raise ValueError(f"invalid joint vector shape: {array.shape}") |
| return array |
|
|
| right = _joint_vector( |
| target_obs.misc.get("right_executed_demo_joint_position_action"), |
| target_obs.right.joint_positions, |
| ) |
| left = _joint_vector( |
| target_obs.misc.get("left_executed_demo_joint_position_action"), |
| target_obs.left.joint_positions, |
| ) |
| return np.concatenate( |
| [ |
| right, |
| np.array([target_obs.right.gripper_open], dtype=np.float64), |
| left, |
| np.array([target_obs.left.gripper_open], dtype=np.float64), |
| ] |
| ) |
|
|
|
|
class ReplayCache:
    """Replays a recorded demo in the simulator with snapshot checkpoints.

    step_to() re-issues the recorded joint actions frame by frame; periodic
    SimulatorSnapshot checkpoints make rewinding (target < current) cheap by
    restoring the nearest checkpoint instead of resetting from scratch.
    """

    def __init__(self, task, demo: Demo, checkpoint_stride: int = 16):
        self.task = task
        self.demo = demo
        # Frames between stored checkpoints (trade memory vs. rewind cost).
        self.checkpoint_stride = checkpoint_stride
        self.current_index = 0
        self.current_obs = None
        self.checkpoints: Dict[int, SimulatorSnapshot] = {}
        # Replays the recorded discrete open/close gripper commands.
        self.discrete_gripper = BimanualDiscrete()

    def reset(self) -> None:
        """Reset the task to the demo's initial state and seed checkpoint 0."""
        _, self.current_obs = self.task.reset_to_demo(self.demo)
        self.current_index = 0
        self.checkpoints = {0: _capture_snapshot(self.task)}

    def step_to(self, target_index: int):
        """Advance (or rewind) the replay to *target_index*; return the observation."""
        if target_index < self.current_index:
            # Rewind: restore the nearest checkpoint at or before the target,
            # then replay forward from there.
            checkpoint_index = max(i for i in self.checkpoints if i <= target_index)
            _restore_snapshot(self.task, self.checkpoints[checkpoint_index])
            self.current_index = checkpoint_index
            self.current_obs = self._observation_from_scene()
        while self.current_index < target_index:
            next_index = self.current_index + 1
            # Re-issue the recorded joint-position action for the next frame.
            self.task._action_mode.action(
                self.task._scene, _build_joint_action(self.demo[next_index])
            )
            self._apply_gripper_replay(self.demo[next_index])
            self.current_obs = self._observation_from_scene()
            self.current_index = next_index
            if self.current_index % self.checkpoint_stride == 0:
                self.checkpoints[self.current_index] = _capture_snapshot(self.task)
        return self.current_obs

    def current_state(self) -> ReplayState:
        """Summarize the current frame (tray pose, door angle, gripper states)."""
        return ReplayState(
            frame_index=self.current_index,
            tray_pose=Shape("tray").get_pose(),
            door_angle=Joint("oven_door_joint").get_joint_position(),
            right_gripper_pose=self.current_obs.right.gripper_pose.copy(),
            left_gripper_pose=self.current_obs.left.gripper_pose.copy(),
            right_gripper_open=float(self.current_obs.right.gripper_open),
            left_gripper_open=float(self.current_obs.left.gripper_open),
            snapshot=None,
        )

    def snapshot(self) -> SimulatorSnapshot:
        """Capture the live simulator state."""
        return _capture_snapshot(self.task)

    def restore(self, snapshot: SimulatorSnapshot) -> None:
        """Restore a captured state and refresh the cached observation."""
        _restore_snapshot(self.task, snapshot)
        self.current_obs = self._observation_from_scene()

    def restore_to_index(self, snapshot: SimulatorSnapshot, frame_index: int) -> None:
        """Restore *snapshot* and declare the replay cursor to be at *frame_index*."""
        self.restore(snapshot)
        self.current_index = frame_index

    def _observation_from_scene(self):
        # Fresh observation straight from the scene (bypasses task stepping).
        return self.task._scene.get_observation()

    def _apply_gripper_replay(self, target_obs) -> None:
        """Drive the grippers toward the recorded open/close state for this frame."""
        desired = np.array(
            [target_obs.right.gripper_open, target_obs.left.gripper_open],
            dtype=np.float64,
        )
        # A gripper counts as "open" only when every finger is near its limit.
        current = np.array(
            [
                float(all(x > 0.9 for x in self.task._scene.robot.right_gripper.get_open_amount())),
                float(all(x > 0.9 for x in self.task._scene.robot.left_gripper.get_open_amount())),
            ],
            dtype=np.float64,
        )
        if not np.allclose(current, desired):
            self.discrete_gripper.action(self.task._scene, desired)
            self.current_obs = self._observation_from_scene()
        self._maintain_grasp_state(desired)
        self.current_obs = self._observation_from_scene()

    def _maintain_grasp_state(self, desired: np.ndarray) -> None:
        """Force grasp/release bookkeeping to agree with *desired*.

        desired[0] is the right gripper, desired[1] the left; <= 0.5 means
        closed. A closed gripper attempts to grasp every graspable object not
        already held by the opposite gripper; an open gripper releases.
        """
        scene = self.task._scene
        robot = scene.robot
        if float(desired[0]) <= 0.5:
            left_grasped = {obj.get_name() for obj in robot.left_gripper.get_grasped_objects()}
            for graspable in scene.task.get_graspable_objects():
                if graspable.get_name() not in left_grasped:
                    robot.right_gripper.grasp(graspable)
        elif robot.right_gripper.get_grasped_objects():
            robot.right_gripper.release()

        if float(desired[1]) <= 0.5:
            right_grasped = {obj.get_name() for obj in robot.right_gripper.get_grasped_objects()}
            for graspable in scene.task.get_graspable_objects():
                if graspable.get_name() not in right_grasped:
                    robot.left_gripper.grasp(graspable)
        elif robot.left_gripper.get_grasped_objects():
            robot.left_gripper.release()
|
|
|
|
| def _quat_to_matrix(quat: Sequence[float]) -> np.ndarray: |
| x, y, z, w = quat |
| xx, yy, zz = x * x, y * y, z * z |
| xy, xz, yz = x * y, x * z, y * z |
| wx, wy, wz = w * x, w * y, w * z |
| return np.array( |
| [ |
| [1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)], |
| [2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)], |
| [2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)], |
| ], |
| dtype=np.float64, |
| ) |
|
|
|
|
| def _matrix_to_quat(matrix: np.ndarray) -> np.ndarray: |
| trace = np.trace(matrix) |
| if trace > 0.0: |
| s = math.sqrt(trace + 1.0) * 2.0 |
| w = 0.25 * s |
| x = (matrix[2, 1] - matrix[1, 2]) / s |
| y = (matrix[0, 2] - matrix[2, 0]) / s |
| z = (matrix[1, 0] - matrix[0, 1]) / s |
| elif matrix[0, 0] > matrix[1, 1] and matrix[0, 0] > matrix[2, 2]: |
| s = math.sqrt(1.0 + matrix[0, 0] - matrix[1, 1] - matrix[2, 2]) * 2.0 |
| w = (matrix[2, 1] - matrix[1, 2]) / s |
| x = 0.25 * s |
| y = (matrix[0, 1] + matrix[1, 0]) / s |
| z = (matrix[0, 2] + matrix[2, 0]) / s |
| elif matrix[1, 1] > matrix[2, 2]: |
| s = math.sqrt(1.0 + matrix[1, 1] - matrix[0, 0] - matrix[2, 2]) * 2.0 |
| w = (matrix[0, 2] - matrix[2, 0]) / s |
| x = (matrix[0, 1] + matrix[1, 0]) / s |
| y = 0.25 * s |
| z = (matrix[1, 2] + matrix[2, 1]) / s |
| else: |
| s = math.sqrt(1.0 + matrix[2, 2] - matrix[0, 0] - matrix[1, 1]) * 2.0 |
| w = (matrix[1, 0] - matrix[0, 1]) / s |
| x = (matrix[0, 2] + matrix[2, 0]) / s |
| y = (matrix[1, 2] + matrix[2, 1]) / s |
| z = 0.25 * s |
| quat = np.array([x, y, z, w], dtype=np.float64) |
| return quat / np.linalg.norm(quat) |
|
|
|
|
def _pose_to_matrix(pose: Sequence[float]) -> np.ndarray:
    """Build a 4x4 homogeneous transform from a [x, y, z, qx, qy, qz, qw] pose."""
    transform = np.eye(4, dtype=np.float64)
    transform[:3, 3] = pose[:3]
    transform[:3, :3] = _quat_to_matrix(pose[3:])
    return transform
|
|
|
|
def _matrix_to_pose(matrix: np.ndarray) -> np.ndarray:
    """Flatten a 4x4 homogeneous transform back to [x, y, z, qx, qy, qz, qw]."""
    translation = matrix[:3, 3]
    quaternion = _matrix_to_quat(matrix[:3, :3])
    return np.concatenate([translation, quaternion], axis=0)
|
|
|
|
def _relative_pose(reference_pose: Sequence[float], target_pose: Sequence[float]) -> np.ndarray:
    """Express *target_pose* in the frame of *reference_pose* (both world poses)."""
    reference_matrix = _pose_to_matrix(reference_pose)
    target_matrix = _pose_to_matrix(target_pose)
    return _matrix_to_pose(np.linalg.inv(reference_matrix) @ target_matrix)
|
|
|
|
def _apply_relative_pose(reference_pose: Sequence[float], relative_pose: Sequence[float]) -> np.ndarray:
    """Inverse of _relative_pose: map a frame-local pose back to world coordinates."""
    world_matrix = _pose_to_matrix(reference_pose) @ _pose_to_matrix(relative_pose)
    return _matrix_to_pose(world_matrix)
|
|
|
|
| def _orientation_error_degrees(current_quat: Sequence[float], target_quat: Sequence[float]) -> float: |
| current = np.asarray(current_quat, dtype=np.float64) |
| target = np.asarray(target_quat, dtype=np.float64) |
| current_norm = float(np.linalg.norm(current)) |
| target_norm = float(np.linalg.norm(target)) |
| if current_norm <= 1e-9 or target_norm <= 1e-9: |
| return 180.0 |
| current = current / current_norm |
| target = target / target_norm |
| dot = float(np.clip(abs(current @ target), -1.0, 1.0)) |
| return float(np.degrees(2.0 * np.arccos(dot))) |
|
|
|
|
def _world_to_local(reference_pose: Sequence[float], point_world: Sequence[float]) -> np.ndarray:
    """Transform a world-frame point into the frame defined by *reference_pose*."""
    homogeneous = np.append(np.asarray(point_world, dtype=np.float64), 1.0)
    return (np.linalg.inv(_pose_to_matrix(reference_pose)) @ homogeneous)[:3]
|
|
|
|
def _local_to_world(reference_pose: Sequence[float], point_local: Sequence[float]) -> np.ndarray:
    """Transform a point from the *reference_pose* frame into world coordinates."""
    homogeneous = np.append(np.asarray(point_local, dtype=np.float64), 1.0)
    return (_pose_to_matrix(reference_pose) @ homogeneous)[:3]
|
|
|
|
def _tray_bbox_local() -> np.ndarray:
    """Tray bounding box in its own frame: [x_min, x_max, y_min, y_max, z_min, z_max]."""
    bbox = Shape("tray").get_bounding_box()
    return np.asarray(bbox, dtype=np.float64)
|
|
|
|
| def _dedupe_points(points: Iterable[np.ndarray], precision: int = 5) -> List[np.ndarray]: |
| unique: List[np.ndarray] = [] |
| seen = set() |
| for point in points: |
| array = np.asarray(point, dtype=np.float64) |
| key = tuple(np.round(array, precision)) |
| if key in seen: |
| continue |
| seen.add(key) |
| unique.append(array) |
| return unique |
|
|
|
|
| def _sample_box_face_points_local( |
| bbox: np.ndarray, |
| axis: int, |
| face_index: int, |
| center: np.ndarray, |
| extents: np.ndarray, |
| primary_samples: int, |
| secondary_samples: int, |
| ) -> List[np.ndarray]: |
| center = np.asarray(center, dtype=np.float64) |
| extents = np.asarray(extents, dtype=np.float64) |
| tangent_axes = [idx for idx in range(3) if idx != axis] |
| tangent_ranges: List[np.ndarray] = [] |
| for tangent_idx, tangent_axis in enumerate(tangent_axes): |
| lo = max(bbox[tangent_axis * 2], center[tangent_axis] - extents[tangent_axis]) |
| hi = min(bbox[tangent_axis * 2 + 1], center[tangent_axis] + extents[tangent_axis]) |
| if hi < lo: |
| lo = hi = float(np.clip(center[tangent_axis], bbox[tangent_axis * 2], bbox[tangent_axis * 2 + 1])) |
| tangent_ranges.append( |
| np.linspace(lo, hi, primary_samples if tangent_idx == 0 else secondary_samples) |
| ) |
|
|
| face_value = bbox[axis * 2 + face_index] |
| points: List[np.ndarray] = [] |
| for first in tangent_ranges[0]: |
| for second in tangent_ranges[1]: |
| point = center.copy() |
| point[axis] = face_value |
| point[tangent_axes[0]] = first |
| point[tangent_axes[1]] = second |
| points.append(point) |
| return points |
|
|
|
|
def _grasp_surface_specs(
    templates: MotionTemplates, bbox: np.ndarray
) -> Tuple[np.ndarray, List[Tuple[int, int]]]:
    """Choose which tray bounding-box faces to sample grasp points on.

    Returns the grasp center clipped into the box, plus up to two
    (axis, face_index) pairs ranked by how strongly the demo's
    pregrasp -> grasp translation points along each axis. face_index 0/1
    selects the min/max face of that axis.
    """
    center = np.asarray(templates.grasp_local_center, dtype=np.float64).copy()
    mins = np.asarray([bbox[0], bbox[2], bbox[4]], dtype=np.float64)
    maxs = np.asarray([bbox[1], bbox[3], bbox[5]], dtype=np.float64)
    center = np.clip(center, mins, maxs)

    # Approach direction in the tray frame: pregrasp -> grasp translation.
    approach = (
        np.asarray(templates.grasp_rel_pose[:3], dtype=np.float64)
        - np.asarray(templates.pregrasp_rel_pose[:3], dtype=np.float64)
    )
    if np.linalg.norm(approach) < 1e-6:
        # Degenerate template: fall back to the box-center -> grasp direction.
        approach = center - 0.5 * (mins + maxs)

    # Axes sorted by descending |approach| component.
    ranked_axes = np.argsort(-np.abs(approach))
    primary_mag = float(abs(approach[ranked_axes[0]])) if len(ranked_axes) else 0.0
    specs: List[Tuple[int, int]] = []
    for rank, axis in enumerate(ranked_axes.tolist()):
        magnitude = float(abs(approach[axis]))
        if magnitude < 1e-6:
            continue
        # Secondary faces only count when reasonably aligned with the approach.
        if rank > 0 and magnitude < max(1e-6, primary_mag * DEFAULT_GRASP_SECONDARY_FACE_RATIO):
            continue
        # Approaching along +axis -> sample the min face (the side the gripper
        # comes from), and vice versa.
        move_positive = float(approach[axis]) >= 0.0
        face_index = 0 if move_positive else 1
        spec = (int(axis), int(face_index))
        if spec not in specs:
            specs.append(spec)
        if len(specs) >= 2:
            break

    if specs:
        return center, specs

    # Fallback (no usable approach axis): pick the single face closest to the
    # grasp center.
    face_distances = []
    for axis in range(3):
        face_distances.append((abs(center[axis] - bbox[axis * 2]), axis, 0))
        face_distances.append((abs(center[axis] - bbox[axis * 2 + 1]), axis, 1))
    _, axis, face_index = min(face_distances, key=lambda item: item[0])
    return center, [(int(axis), int(face_index))]
|
|
|
|
def _first_transition(demo: Demo, side: str, open_to_closed: bool) -> int:
    """Index of the first frame where *side*'s gripper crosses 0.5.

    open_to_closed=True finds the first open -> closed transition, otherwise
    the first closed -> open one. Raises RuntimeError when no transition exists.
    """
    values = [getattr(demo[index], side).gripper_open for index in range(len(demo))]
    for index in range(1, len(values)):
        previous, current = values[index - 1], values[index]
        crossed_down = previous > 0.5 and current < 0.5
        crossed_up = previous < 0.5 and current > 0.5
        if (open_to_closed and crossed_down) or (not open_to_closed and crossed_up):
            return index
    raise RuntimeError(f"no gripper transition found for {side}")
|
|
|
|
def _detect_pregrasp_approach_onset(
    cache: ReplayCache,
    left_close: int,
    pregrasp_rel_pose: np.ndarray,
) -> int:
    """Estimate the frame where the left gripper starts moving toward the pregrasp pose.

    Replays frames [0, left_close) and records the distance from the left
    gripper to the tray-relative pregrasp target. The onset is the first frame
    in the search window whose 6-frame lookahead shows a net drop of at least
    1 cm with a consistently decreasing trend. Falls back to heuristic offsets
    from *left_close* when the signal is too short or no window qualifies.
    """
    distances: List[float] = []
    for frame_index in range(left_close):
        cache.step_to(frame_index)
        tray_pose = Shape("tray").get_pose()
        # Pregrasp target in world coordinates at this frame's tray pose.
        target_pose = _apply_relative_pose(tray_pose, pregrasp_rel_pose)
        current_pose = np.asarray(cache.current_obs.left.gripper_pose, dtype=np.float64)
        distances.append(float(np.linalg.norm(current_pose[:3] - target_pose[:3])))

    distances_arr = np.asarray(distances, dtype=np.float64)
    if len(distances_arr) < 8:
        # Too few frames to analyze: assume the approach began 24 frames before
        # the gripper closed.
        return max(0, left_close - 24)

    search_start = max(8, left_close - DEFAULT_APPROACH_ONSET_WINDOW)
    for frame_index in range(search_start, max(search_start, left_close - 6)):
        window = distances_arr[frame_index : frame_index + 6]
        if len(window) < 6:
            break
        short_drop = float(window[0] - window[-1])
        if short_drop < 0.01:
            # Not enough net progress toward the target over this window.
            continue
        if np.mean(np.diff(window)) < -0.002:
            # Average per-frame decrease confirms a sustained approach.
            return frame_index
    return max(search_start, left_close - 24)
|
|
|
|
def _derive_templates(dataset_root: Path, template_episode_dir: Path) -> Tuple[MotionTemplates, Dict[str, object]]:
    """Replay one demo episode and distill it into MotionTemplates.

    Returns the templates plus a mapping of named template events to the demo
    frame index/indices they were taken from. The "approach" and "retreat"
    entries map to lists of indices, hence the Dict[str, object] value type.

    NOTE(review): *dataset_root* is not used inside this function -- confirm
    whether any caller relies on it before removing.
    """
    env = _launch_replay_env()
    try:
        demo = _load_demo(template_episode_dir)
        task = env.get_task(BimanualTakeTrayOutOfOven)
        cache = ReplayCache(task, demo, checkpoint_stride=8)
        cache.reset()
        base_pose = task._task.get_base().get_pose()

        # Gripper open/close transitions mark the key events of the demo.
        left_close = _first_transition(demo, "left", open_to_closed=True)
        left_open = _first_transition(demo, "left", open_to_closed=False)
        # Pregrasp pose is sampled a few frames before the left gripper closes.
        pregrasp_index = max(0, left_close - 5)
        right_close = _first_transition(demo, "right", open_to_closed=True)
        right_open = _first_transition(demo, "right", open_to_closed=False)

        bootstrap_indices = sorted({pregrasp_index, left_close, right_close, right_open, left_open})
        states: Dict[int, ReplayState] = {}
        for frame_index in bootstrap_indices:
            cache.step_to(frame_index)
            states[frame_index] = cache.current_state()

        # Left-gripper poses expressed relative to the tray at those frames.
        pregrasp_rel_pose = _relative_pose(
            states[pregrasp_index].tray_pose, states[pregrasp_index].left_gripper_pose
        )
        grasp_rel_pose = _relative_pose(
            states[left_close].tray_pose, states[left_close].left_gripper_pose
        )

        # Extra waypoints sampled between the approach onset and the grasp.
        approach_onset = _detect_pregrasp_approach_onset(cache, left_close, pregrasp_rel_pose)
        approach_indices = sorted(
            {
                *np.linspace(
                    approach_onset,
                    max(approach_onset, pregrasp_index),
                    num=min(6, max(2, pregrasp_index - approach_onset + 1)),
                    dtype=int,
                ).tolist(),
                pregrasp_index,
                max(0, left_close - 2),
            }
        )
        # Retreat waypoints: fixed offsets after the grasp plus frames leading
        # up to the left gripper reopening (all clamped past left_close).
        retreat_indices = sorted(
            {
                *[
                    min(len(demo) - 1, max(left_close + 1, left_close + offset))
                    for offset in (5, 10, 15, 20, 30, 40, 50)
                ],
                max(left_close + 1, left_open - 20),
                max(left_close + 1, left_open - 10),
                max(left_close + 1, left_open - 5),
                left_open,
            }
        )
        interesting = sorted(
            {
                *bootstrap_indices,
                *approach_indices,
                *retreat_indices,
            }
        )
        for frame_index in interesting:
            if frame_index not in states:
                cache.step_to(frame_index)
                states[frame_index] = cache.current_state()

        # Approach poses are tray-relative; retreat poses are base-relative.
        approach_rel_poses = [
            _relative_pose(states[index].tray_pose, states[index].left_gripper_pose)
            for index in approach_indices
            if index < left_close
        ]
        retreat_rel_poses = [
            _relative_pose(base_pose, states[index].left_gripper_pose)
            for index in retreat_indices
            if index > left_close
        ]
        grasp_local_center = _world_to_local(
            states[left_close].tray_pose, states[left_close].left_gripper_pose[:3]
        )
        templates = MotionTemplates(
            pregrasp_rel_pose=pregrasp_rel_pose,
            grasp_rel_pose=grasp_rel_pose,
            retreat_rel_poses=retreat_rel_poses,
            grasp_local_center=grasp_local_center,
            # Hand-tuned sampling half-extents around the grasp point
            # (presumably meters -- confirm against the sim units).
            grasp_region_extents=np.array([0.03, 0.015, 0.004], dtype=np.float64),
            retriever_home_joints=np.asarray(demo[0].left.joint_positions, dtype=np.float64),
            hold_open_angle=float(states[right_open].door_angle),
            # Extra opening margin: at least 0.12, or a quarter of the observed
            # door sweep between the right-hand close and open events.
            open_more_delta=max(
                0.12,
                float(states[right_open].door_angle - states[right_close].door_angle) * 0.25,
            ),
            reference_tray_height=float(states[left_close].tray_pose[2]),
            approach_rel_poses=approach_rel_poses,
        )
        # Mask handles are inferred from the last few approach frames plus the
        # grasp frame itself.
        templates.mask_handle_ids = _infer_tray_mask_handle_ids(
            episode_dir=template_episode_dir,
            demo=demo,
            cache=cache,
            templates=templates,
            reference_frames=approach_indices[-4:] + [left_close],
        )
        template_frames = {
            "pregrasp": pregrasp_index,
            "grasp": left_close,
            "right_close": right_close,
            "right_open": right_open,
            "approach": approach_indices,
            "retreat": retreat_indices,
        }
        return templates, template_frames
    finally:
        env.shutdown()
|
|
|
|
| def _camera_projection(extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| camera_pos = extrinsics[:3, 3:4] |
| rotation = extrinsics[:3, :3] |
| world_to_camera = np.concatenate([rotation.T, -(rotation.T @ camera_pos)], axis=1) |
| projection = intrinsics @ world_to_camera |
| return projection, world_to_camera |
|
|
|
|
def _project_points(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Project world points to pixel (u, v) coordinates and camera-frame XYZ."""
    projection, world_to_camera = _camera_projection(extrinsics, intrinsics)
    ones = np.ones((len(points_world), 1))
    homogeneous = np.hstack([points_world, ones])
    camera_xyz = homogeneous @ world_to_camera.T
    image_xyz = homogeneous @ projection.T
    # Perspective divide by the projected depth component.
    uv = image_xyz[:, :2] / image_xyz[:, 2:3]
    return uv, camera_xyz
|
|
|
|
def _sample_grasp_points(templates: MotionTemplates, tray_pose: np.ndarray) -> np.ndarray:
    """World-frame sample points on the tray faces selected for grasping."""
    bbox = _tray_bbox_local()
    center, face_specs = _grasp_surface_specs(templates, bbox)
    extents = np.asarray(templates.grasp_region_extents, dtype=np.float64)
    local_points: List[np.ndarray] = []
    for face_axis, face_side in face_specs:
        face_points = _sample_box_face_points_local(
            bbox=bbox,
            axis=face_axis,
            face_index=face_side,
            center=center,
            extents=extents,
            primary_samples=9,
            secondary_samples=5,
        )
        local_points.extend(face_points)
    unique_points = _dedupe_points(local_points)
    world_points = [_local_to_world(tray_pose, point) for point in unique_points]
    return np.array(world_points, dtype=np.float64)
|
|
|
|
def _sample_full_tray_points(tray_pose: np.ndarray) -> np.ndarray:
    """Sample world-frame points covering all six faces of the tray's local bbox."""
    bbox = _tray_bbox_local()
    # bbox is (xmin, xmax, ymin, ymax, zmin, zmax); view it as 3 rows of (min, max).
    bounds = np.asarray(bbox, dtype=np.float64).reshape(3, 2)
    center = bounds.mean(axis=1)
    extents = 0.5 * (bounds[:, 1] - bounds[:, 0])
    local_points: List[np.ndarray] = []
    for axis in range(3):
        for face_index in (0, 1):
            local_points.extend(
                _sample_box_face_points_local(
                    bbox=bbox,
                    axis=axis,
                    face_index=face_index,
                    center=center,
                    extents=extents,
                    primary_samples=8,
                    secondary_samples=8,
                )
            )
    local_points = _dedupe_points(local_points)
    return np.array(
        [_local_to_world(tray_pose, point) for point in local_points],
        dtype=np.float64,
    )
|
|
|
|
def _infer_tray_mask_handle_ids(
    episode_dir: Path,
    demo: Demo,
    cache: ReplayCache,
    templates: MotionTemplates,
    reference_frames: Sequence[int],
    max_handles: int = DEFAULT_MASK_HANDLE_COUNT,
) -> List[int]:
    """Vote for the segmentation-mask handle ids that belong to the tray.

    Replays each reference frame via `cache`, samples tray surface points,
    projects them into every camera in FULL_CAMERA_SET, and tallies the
    non-zero mask handle found at each projected pixel. Depth-verified
    ("visible") hits vote into `counts`; all in-frame projections vote into
    `fallback_counts`, used only when no visible hit lands anywhere.
    Returns at most `max_handles` handles, strongest vote first; guaranteed
    non-empty whenever any non-zero handle was hit.
    """
    counts: Dict[int, int] = {}  # votes from depth-verified pixels
    fallback_counts: Dict[int, int] = {}  # votes from all projected pixels
    unique_frames = sorted({int(frame_index) for frame_index in reference_frames})
    for frame_index in unique_frames:
        # Advance the replay so the simulated scene matches this frame.
        cache.step_to(frame_index)
        grasp_points = _sample_grasp_points(templates, Shape("tray").get_pose())
        tray_points = _sample_full_tray_points(Shape("tray").get_pose())
        for camera_name in FULL_CAMERA_SET:
            mask = _load_mask(episode_dir, frame_index, camera_name)
            point_cloud, extrinsics, intrinsics = _camera_point_cloud(
                episode_dir, demo, frame_index, camera_name
            )
            for points_world in [grasp_points, tray_points]:
                projected, visible = _visibility_projection_details(
                    points_world, point_cloud, extrinsics, intrinsics
                )
                for px, py in visible:
                    handle = int(mask[py, px])
                    if handle != 0:  # 0 is background / no object
                        counts[handle] = counts.get(handle, 0) + 1
                for px, py in projected:
                    handle = int(mask[py, px])
                    if handle != 0:
                        fallback_counts[handle] = fallback_counts.get(handle, 0) + 1


    # Prefer depth-verified votes; fall back to raw projections if none landed.
    ranked_source = counts if counts else fallback_counts
    if not ranked_source:
        return []
    ranked = sorted(ranked_source.items(), key=lambda item: item[1], reverse=True)
    top_count = ranked[0][1]
    # Keep handles with at least 15% of the top vote (and >= 2 votes).
    selected = [
        handle
        for handle, count in ranked
        if count >= max(2, int(math.ceil(top_count * 0.15)))
    ][:max_handles]
    return selected if selected else [ranked[0][0]]
|
|
|
|
def _camera_point_cloud(
    episode_dir: Path,
    demo: Demo,
    frame_index: int,
    camera_name: str,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Reconstruct one camera's world-frame point cloud for one frame.

    Returns (point_cloud, extrinsics, intrinsics), where the camera matrices
    are read from the demo observation's `misc` entries.
    """
    misc = demo[frame_index].misc
    extrinsics = misc[f"{camera_name}_camera_extrinsics"]
    intrinsics = misc[f"{camera_name}_camera_intrinsics"]
    depth = _load_depth_meters(episode_dir, demo, frame_index, camera_name)
    cloud = VisionSensor.pointcloud_from_depth_and_camera_params(
        depth, extrinsics, intrinsics
    )
    return cloud, extrinsics, intrinsics
|
|
|
|
def _visibility_projection_details(
    points_world: np.ndarray,
    point_cloud_world: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
    tolerance: float = DEFAULT_VISIBILITY_POINT_TOLERANCE,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
    """Split query points into in-frame pixels and depth-verified visible ones.

    A point counts as "projected" when it is in front of the camera and inside
    the image bounds; it additionally counts as "visible" when the scene point
    cloud at that pixel lies within `tolerance` meters of the query point.
    """
    uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
    height, width = point_cloud_world.shape[:2]
    projected: List[Tuple[int, int]] = []
    visible: List[Tuple[int, int]] = []
    for point_world, (u, v), camera_point in zip(points_world, uv, camera_xyz):
        in_frame = 0 <= u < width and 0 <= v < height
        if camera_point[2] <= 0 or not in_frame:
            continue
        # Rounding can land exactly on the border; clamp back into the image.
        px = min(max(int(round(float(u))), 0), width - 1)
        py = min(max(int(round(float(v))), 0), height - 1)
        projected.append((px, py))
        scene_point = np.asarray(point_cloud_world[py, px], dtype=np.float64)
        if not np.isfinite(scene_point).all():
            continue
        if float(np.linalg.norm(scene_point - point_world)) <= tolerance:
            visible.append((px, py))
    return projected, visible
|
|
|
|
def _point_visibility_ratio_from_point_cloud(
    points_world: np.ndarray,
    point_cloud_world: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> float:
    """Fraction of in-frame projected points that are depth-verified visible."""
    projected, visible = _visibility_projection_details(
        points_world, point_cloud_world, extrinsics, intrinsics
    )
    if not projected:
        return 0.0
    return len(visible) / len(projected)
|
|
|
|
def _point_visibility_stats_from_point_cloud(
    points_world: np.ndarray,
    point_cloud_world: np.ndarray,
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> Dict[str, float]:
    """Visibility ratio plus projected/visible counts for one camera view."""
    projected, visible = _visibility_projection_details(
        points_world, point_cloud_world, extrinsics, intrinsics
    )
    num_projected = len(projected)
    num_visible = len(visible)
    return {
        "ratio": (num_visible / num_projected) if num_projected else 0.0,
        "projected_count": num_projected,
        "visible_count": num_visible,
    }
|
|
|
|
def _mask_visibility_ratio(
    points_world: np.ndarray,
    mask: np.ndarray,
    handle_ids: Sequence[int],
    extrinsics: np.ndarray,
    intrinsics: np.ndarray,
) -> float:
    """Fraction of in-frame projected points whose mask pixel is a wanted handle."""
    if not handle_ids:
        return 0.0
    wanted = {int(handle) for handle in handle_ids}
    uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
    height, width = mask.shape
    hits = 0
    considered = 0
    for (u, v), camera_point in zip(uv, camera_xyz):
        # Ignore points behind the camera or outside the image.
        if camera_point[2] <= 0 or not (0 <= u < width and 0 <= v < height):
            continue
        considered += 1
        # Rounding can land exactly on the border; clamp back into the image.
        px = min(max(int(round(u)), 0), width - 1)
        py = min(max(int(round(v)), 0), height - 1)
        if int(mask[py, px]) in wanted:
            hits += 1
    return (hits / considered) if considered else 0.0
|
|
|
|
| def _union_visibility(values: Iterable[float]) -> float: |
| product = 1.0 |
| for value in values: |
| product *= 1.0 - float(value) |
| return 1.0 - product |
|
|
|
|
| def _sequence_summary(values: Sequence[float]) -> Dict[str, float]: |
| if not values: |
| return { |
| "count": 0.0, |
| "mean": 0.0, |
| "median": 0.0, |
| "min": 0.0, |
| "max": 0.0, |
| } |
| array = np.asarray(values, dtype=np.float64) |
| return { |
| "count": float(array.size), |
| "mean": float(np.mean(array)), |
| "median": float(np.median(array)), |
| "min": float(np.min(array)), |
| "max": float(np.max(array)), |
| } |
|
|
|
|
| def _json_safe(value: Any) -> Any: |
| if isinstance(value, dict): |
| return {str(key): _json_safe(item) for key, item in value.items()} |
| if isinstance(value, (list, tuple)): |
| return [_json_safe(item) for item in value] |
| if isinstance(value, np.generic): |
| return _json_safe(value.item()) |
| if isinstance(value, np.ndarray): |
| return [_json_safe(item) for item in value.tolist()] |
| if isinstance(value, float): |
| return None if not math.isfinite(value) else value |
| return value |
|
|
|
|
def _keypoint_discovery(demo: Demo, stopping_delta: float = 0.1) -> List[int]:
    """Find keyframe indices in a bimanual demo.

    A frame becomes a keypoint when (a) either gripper's open/close state
    changed since the previous frame, (b) it is the last frame, or (c) both
    arms are "stopped": joint velocities within `stopping_delta` of zero and
    each gripper state unchanged across the surrounding frames. A 4-frame
    cooldown buffer prevents back-to-back stop keypoints, and a keypoint
    immediately following another replaces it.
    """
    keypoints: List[int] = []
    right_prev = demo[0].right.gripper_open
    left_prev = demo[0].left.gripper_open
    stopped_buffer = 0  # cooldown counter after a stop-triggered keypoint
    # NOTE: iterates the private `demo._observations` list directly.
    for i, obs in enumerate(demo._observations):
        # Edges lack the i-2 / i+1 neighbors the stop test reads.
        if i < 2 or i >= len(demo) - 1:
            right_stopped = left_stopped = False
        else:
            right_stopped = (
                np.allclose(obs.right.joint_velocities, 0, atol=stopping_delta)
                and obs.right.gripper_open == demo[i + 1].right.gripper_open
                and obs.right.gripper_open == demo[i - 1].right.gripper_open
                and demo[i - 2].right.gripper_open == demo[i - 1].right.gripper_open
            )
            left_stopped = (
                np.allclose(obs.left.joint_velocities, 0, atol=stopping_delta)
                and obs.left.gripper_open == demo[i + 1].left.gripper_open
                and obs.left.gripper_open == demo[i - 1].left.gripper_open
                and demo[i - 2].left.gripper_open == demo[i - 1].left.gripper_open
            )
        # Only count a stop when the cooldown from the last one has expired.
        stopped = stopped_buffer <= 0 and right_stopped and left_stopped
        stopped_buffer = 4 if stopped else stopped_buffer - 1
        last = i == len(demo) - 1
        state_changed = (
            obs.right.gripper_open != right_prev or obs.left.gripper_open != left_prev
        )
        if i != 0 and (state_changed or last or stopped):
            keypoints.append(i)
        right_prev = obs.right.gripper_open
        left_prev = obs.left.gripper_open
        # Merge adjacent keypoints: keep only the later of two consecutive frames.
        if len(keypoints) > 1 and (keypoints[-1] - 1) == keypoints[-2]:
            keypoints.pop(-2)
    return keypoints
|
|
|
|
def _plan_path(scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False):
    """Best-effort RRTConnect path plan for one arm; None when planning fails.

    `pose` is a 7-vector: position (first 3) followed by quaternion (last 4).
    """
    if arm_name == "left":
        arm = scene.robot.left_arm
    else:
        arm = scene.robot.right_arm
    try:
        return arm.get_path(
            pose[:3],
            quaternion=pose[3:],
            ignore_collisions=ignore_collisions,
            trials=DEFAULT_PLAN_TRIALS,
            max_configs=DEFAULT_PLAN_MAX_CONFIGS,
            max_time_ms=DEFAULT_PLAN_MAX_TIME_MS,
            trials_per_goal=DEFAULT_PLAN_TRIALS_PER_GOAL,
            algorithm=Algos.RRTConnect,
        )
    except Exception:
        # Planner failures (e.g. ConfigurationPathError) simply mean "no path".
        return None
|
|
|
|
def _stable_plan(
    scene,
    arm_name: str,
    pose: np.ndarray,
    ignore_collisions: bool = False,
) -> Tuple[Optional[object], float, float]:
    """Plan several times; return (shortest path, median length, success rate).

    Returns (None, inf, 0.0) when every attempt fails.
    """
    attempts = max(1, DEFAULT_PLAN_ATTEMPTS)
    candidates: List[Tuple[float, object]] = []
    for _ in range(attempts):
        candidate = _plan_path(scene, arm_name, pose, ignore_collisions=ignore_collisions)
        candidate_length = _path_length(candidate)
        if candidate is not None and np.isfinite(candidate_length):
            candidates.append((float(candidate_length), candidate))
    if not candidates:
        return None, math.inf, 0.0
    candidates.sort(key=lambda item: item[0])
    # Median length is more stable than the best-case length across attempts.
    lengths = [length for length, _ in candidates]
    return (
        candidates[0][1],
        float(np.median(lengths)),
        len(candidates) / attempts,
    )
|
|
|
|
def _plan_is_reliable(reliability: float) -> bool:
    """True when the observed planning success rate meets the configured minimum."""
    attempts = max(1, DEFAULT_PLAN_ATTEMPTS)
    required_successes = min(attempts, max(1, DEFAULT_PLAN_MIN_SUCCESSES))
    threshold = required_successes / attempts
    return reliability >= threshold
|
|
|
|
| def _path_length(path) -> float: |
| if path is None: |
| return math.inf |
| try: |
| return float(path._get_path_point_lengths()[-1]) |
| except Exception: |
| return math.inf |
|
|
|
|
| def _dedupe_pose_list(poses: Iterable[np.ndarray], precision: int = 4) -> List[np.ndarray]: |
| unique: List[np.ndarray] = [] |
| seen = set() |
| for pose in poses: |
| key = tuple(np.round(np.asarray(pose, dtype=np.float64), precision)) |
| if key in seen: |
| continue |
| seen.add(key) |
| unique.append(np.asarray(pose, dtype=np.float64)) |
| return unique |
|
|
|
|
def _pregrasp_progress_and_distance(
    current_pose: np.ndarray,
    tray_pose: np.ndarray,
    templates: MotionTemplates,
) -> Tuple[float, float]:
    """Progress in [0, 1] toward the pregrasp goal, plus raw distance to it."""
    goal_pose = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
    # Normalize by the approach-start-to-goal distance when a template exists;
    # otherwise fall back to a nominal 12 cm span.
    if templates.approach_rel_poses:
        start_pose = _apply_relative_pose(tray_pose, templates.approach_rel_poses[0])
        span = float(np.linalg.norm(start_pose[:3] - goal_pose[:3]))
    else:
        span = 0.12
    distance = float(np.linalg.norm(current_pose[:3] - goal_pose[:3]))
    progress = 1.0 - (distance / max(span, 1e-6))
    clipped = float(np.clip(progress, 0.0, 1.0))
    return clipped, distance
|
|
|
|
def _pregrasp_candidates(tray_pose: np.ndarray, templates: MotionTemplates) -> List[np.ndarray]:
    """World-frame pregrasp candidate poses: late approach poses plus x-jitter."""
    approach = templates.approach_rel_poses
    if approach:
        tail = min(DEFAULT_PREGRASP_CANDIDATE_COUNT, len(approach))
        rel_poses = approach[-tail:]
    else:
        rel_poses = [templates.pregrasp_rel_pose]
    candidates = [_apply_relative_pose(tray_pose, rel_pose) for rel_pose in rel_poses]
    # Jitter the last candidate +/-2 cm along x for extra planning options.
    final_pose = _apply_relative_pose(tray_pose, rel_poses[-1])
    for offset in (-0.02, 0.02):
        shifted = final_pose.copy()
        shifted[0] += offset
        candidates.append(shifted)
    return _dedupe_pose_list(candidates)
|
|
|
|
def _late_pregrasp_goal_poses(
    tray_pose: np.ndarray, templates: MotionTemplates
) -> List[np.ndarray]:
    """Final pregrasp goal pose plus +/-2 cm x-shifted variants, deduplicated."""
    goal = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
    variants = [goal]
    for offset in (-0.02, 0.02):
        shifted = goal.copy()
        shifted[0] += offset
        variants.append(shifted)
    return _dedupe_pose_list(variants)
|
|
|
|
def _pregrasp_corridor_rel_poses(templates: MotionTemplates) -> List[np.ndarray]:
    """Approach poses truncated at the one nearest pregrasp, ending at pregrasp."""
    rel_poses = [
        np.asarray(rel_pose, dtype=np.float64)
        for rel_pose in templates.approach_rel_poses
    ]
    pregrasp = np.asarray(templates.pregrasp_rel_pose, dtype=np.float64)
    if not rel_poses:
        return [pregrasp]


    # Cut the approach at its closest point to the pregrasp pose.
    distances = [
        float(np.linalg.norm(rel_pose[:3] - pregrasp[:3])) for rel_pose in rel_poses
    ]
    stop_index = int(np.argmin(distances))
    corridor = rel_poses[: stop_index + 1]
    if not corridor:
        corridor = [pregrasp]
    elif float(np.linalg.norm(corridor[-1][:3] - pregrasp[:3])) > 1e-6:
        # Make sure the corridor terminates at the pregrasp pose itself.
        corridor.append(pregrasp)
    return _dedupe_pose_list(corridor)
|
|
|
|
def _extract_sequence_poses(
    tray_pose: np.ndarray, task_base_pose: np.ndarray, templates: MotionTemplates
) -> List[np.ndarray]:
    """Pregrasp, grasp, and up to three evenly spaced retreat poses in world frame.

    Pregrasp/grasp are relative to the tray; retreat poses are relative to the
    task base.
    """
    poses = [
        _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose),
        _apply_relative_pose(tray_pose, templates.grasp_rel_pose),
    ]
    retreats = templates.retreat_rel_poses
    if retreats:
        sample_count = min(3, len(retreats))
        picks = np.linspace(0, len(retreats) - 1, num=sample_count, dtype=int)
        for index in sorted(set(picks.tolist())):
            poses.append(_apply_relative_pose(task_base_pose, retreats[index]))
    return poses
|
|
|
|
def _extract_height_threshold(templates: MotionTemplates) -> float:
    """Tray height above which extraction is considered well underway."""
    lift_clearance = 0.06  # 6 cm above the tray's reference (resting) height
    return templates.reference_tray_height + lift_clearance
|
|
|
|
def _extraction_progress_score(current_height: float, templates: MotionTemplates) -> float:
    """Score tray extraction progress in [0, 1] from the tray's current height.

    The first 80% of the score covers lifting from the reference height up to
    the extraction threshold; the final 20% accrues over 12 cm beyond it.
    """
    baseline = float(templates.reference_tray_height)
    threshold = _extract_height_threshold(templates)
    height = float(current_height)
    if height <= baseline:
        return 0.0
    if height < threshold:
        lift_fraction = (height - baseline) / max(threshold - baseline, 1e-6)
        return float(np.clip(0.8 * lift_fraction, 0.0, 0.8))
    overshoot = height - threshold

    return float(min(1.0, 0.8 + overshoot / 0.12))
|
|
|
|
def _arm_external_collision_names(scene, arm_name: str) -> List[str]:
    """Names of collidable scene shapes the given arm currently collides with.

    Shapes belonging to the arm itself and objects held by its gripper are
    excluded from the result, which is sorted and deduplicated.
    """
    if arm_name == "left":
        arm, gripper = scene.robot.left_arm, scene.robot.left_gripper
    else:
        arm, gripper = scene.robot.right_arm, scene.robot.right_gripper
    own_shapes = {
        shape.get_name() for shape in arm.get_objects_in_tree(object_type=ObjectType.SHAPE)
    }
    held = {obj.get_name() for obj in gripper.get_grasped_objects()}
    hits: List[str] = []
    for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE):
        name = shape.get_name()
        if not shape.is_collidable() or name in own_shapes or name in held:
            continue
        try:
            if arm.check_arm_collision(shape):
                hits.append(name)
        except Exception:
            # Some shapes cannot be collision-checked; skip them quietly.
            continue
    return sorted(set(hits))
|
|
|
|
def _allowed_door_collision_names(scene) -> set:
    """Names of oven door / knob shapes that the arm is allowed to touch."""
    return {
        shape.get_name()
        for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE)
        if shape.get_name().startswith(("oven_door", "oven_knob"))
    }
|
|
|
|
def _door_collision_shapes(scene) -> List[Shape]:
    """Fresh Shape wrappers for every oven door / knob shape in the scene."""
    door_shapes: List[Shape] = []
    for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE):
        name = shape.get_name()
        if name.startswith("oven_door") or name.startswith("oven_knob"):
            door_shapes.append(Shape(shape.get_handle()))
    return door_shapes
|
|
|
|
def _robot_collision_shape_handles(scene, arm_name: str) -> set:
    """Handles of every shape belonging to the named arm and its gripper."""
    if arm_name == "left":
        arm, gripper = scene.robot.left_arm, scene.robot.left_gripper
    else:
        arm, gripper = scene.robot.right_arm, scene.robot.right_gripper
    shapes = arm.get_objects_in_tree(object_type=ObjectType.SHAPE)
    shapes = shapes + gripper.get_objects_in_tree(object_type=ObjectType.SHAPE)
    return {shape.get_handle() for shape in shapes}
|
|
|
|
def _shape_robot_contacts(shape: Shape, robot_shape_handles: set) -> List[np.ndarray]:
    """Contact records (9-float arrays) between `shape` and any robot shape.

    Malformed or non-finite contact entries are dropped, and an unreadable
    contact query yields an empty list.
    """
    try:
        raw_contacts = shape.get_contact(None, True)
    except Exception:
        return []
    shape_handle = shape.get_handle()
    contacts: List[np.ndarray] = []
    for contact in raw_contacts:
        handles = set(contact.get("contact_handles", []))
        if shape_handle not in handles:
            continue
        # Require the other contact participant to be part of the robot.
        if not (handles - {shape_handle}) & robot_shape_handles:
            continue
        data = np.asarray(contact.get("contact", []), dtype=np.float64)
        if data.shape == (9,) and np.all(np.isfinite(data)):
            contacts.append(data)
    return contacts
|
|
|
|
def _door_surface_normal(shape: Shape, contact_point: np.ndarray) -> Optional[np.ndarray]:
    """Unit normal of the panel face nearest `contact_point`.

    The shape's thinnest bounding-box axis is taken as the face normal; its
    sign is chosen so the normal points from the shape center toward the
    contact point. Returns None when the geometry is degenerate.
    """
    bbox = np.asarray(shape.get_bounding_box(), dtype=np.float64)
    extents = np.array(
        [bbox[1] - bbox[0], bbox[3] - bbox[2], bbox[5] - bbox[4]], dtype=np.float64
    )
    if not np.all(np.isfinite(extents)):
        return None
    thin_axis = int(np.argmin(extents))
    rotation = np.asarray(shape.get_matrix()[:3, :3], dtype=np.float64)
    normal = rotation[:, thin_axis]
    magnitude = float(np.linalg.norm(normal))
    if magnitude <= 1e-9:
        return None
    normal = normal / magnitude
    toward_contact = np.asarray(contact_point, dtype=np.float64) - shape.get_position()
    if float(toward_contact @ normal) >= 0.0:
        return normal
    return -normal
|
|
|
|
def _door_contact_alignments(
    shape: Shape, contacts: Sequence[np.ndarray], tip_delta: np.ndarray
) -> List[float]:
    """Cosine alignment between tip motion and each contact's surface normal.

    Returns [] when the tip barely moved (no meaningful motion direction).
    """
    delta = np.asarray(tip_delta, dtype=np.float64)
    speed = float(np.linalg.norm(delta))
    if speed <= 1e-9:
        return []
    direction = delta / speed
    alignments: List[float] = []
    for contact in contacts:
        # First three entries of a contact record are the contact position.
        normal = _door_surface_normal(shape, contact[:3])
        if normal is not None:
            alignments.append(float(np.clip(direction @ normal, -1.0, 1.0)))
    return alignments
|
|
|
|
def _door_contact_step_quality(alignments: Sequence[float], step_open_delta: float) -> float:
    """Score one simulation step of door contact, favoring opening motion.

    Rewards motion parallel to the door surface and motion opposing the
    surface normal, plus net door opening this step; penalizes pressing
    directly into the panel and any motion that closes the door.
    """
    if not alignments:
        return 0.0
    values = np.asarray(alignments, dtype=np.float64)
    open_scale = max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6)
    # Sliding along the surface (|cos| near 0) is rewarded.
    parallel_bonus = float(np.mean(1.0 - np.abs(values)))
    # Motion against the normal is treated as pushing the door open.
    opening_motion_bonus = float(np.mean(np.clip(-values, 0.0, 1.0)))
    # Alignment beyond the free band counts as pressing straight into the panel.
    direct_penalty = float(
        np.mean(
            np.clip(
                (np.abs(values) - DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE)
                / max(1.0 - DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE, 1e-6),
                0.0,
                1.0,
            )
        )
    )
    open_bonus = float(np.clip(step_open_delta / open_scale, 0.0, 1.0))
    close_penalty = float(np.clip(-step_open_delta / open_scale, 0.0, 1.0))
    return float(
        0.6 * parallel_bonus
        + 0.4 * opening_motion_bonus
        + 0.4 * open_bonus
        - 0.8 * direct_penalty
        - 1.0 * close_penalty
    )
|
|
|
|
def _door_contact_path_quality(step_qualities: Sequence[float], door_open_delta: float) -> float:
    """Aggregate per-step contact quality and net door opening into [0, 1]."""
    if not step_qualities:
        return 0.0
    mean_quality = float(np.mean(np.asarray(step_qualities, dtype=np.float64)))
    # Full opening bonus requires three step-scale units of net opening.
    open_scale = 3.0 * max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6)
    open_bonus = float(np.clip(door_open_delta / open_scale, 0.0, 1.0))
    return float(np.clip(0.7 * mean_quality + 0.3 * open_bonus, 0.0, 1.0))
|
|
|
|
def _execute_path_with_collision_trace(task, path, arm_name: str) -> Dict[str, object]:
    """Step a planned path in simulation while recording collision telemetry.

    Returns a dict with the sorted names of external collisions seen along the
    way, per-step door-contact quality scores, the raw contact alignments, the
    number of door-contact steps, and the total steps executed. Executing the
    path mutates simulator state (arm configuration, door angle); callers are
    responsible for any snapshot/restore around this.
    """
    collisions = set()
    step_qualities: List[float] = []
    alignments_all: List[float] = []
    door_shapes = _door_collision_shapes(task._scene)
    robot_shape_handles = _robot_collision_shape_handles(task._scene, arm_name)
    arm = task._scene.robot.left_arm if arm_name == "left" else task._scene.robot.right_arm
    tip = arm.get_tip()
    # Snap to the path start without dynamics so the trace begins cleanly.
    path.set_to_start(disable_dynamics=True)
    task._pyrep.step()
    previous_tip = tip.get_position()
    previous_door_angle = float(Joint("oven_door_joint").get_joint_position())
    steps = 0
    while True:
        done = path.step()
        task._pyrep.step()
        current_tip = tip.get_position()
        tip_delta = np.asarray(current_tip - previous_tip, dtype=np.float64)
        previous_tip = current_tip
        current_door_angle = float(Joint("oven_door_joint").get_joint_position())
        # "Opening" is encoded as a decrease of the door joint angle.
        step_open_delta = previous_door_angle - current_door_angle
        previous_door_angle = current_door_angle
        current_collisions = set(_arm_external_collision_names(task._scene, arm_name))
        for shape in door_shapes:
            contacts = _shape_robot_contacts(shape, robot_shape_handles)
            if not contacts:
                continue
            # Contact-based detection may catch door touches the arm-collision
            # query misses; record the shape as collided either way.
            current_collisions.add(shape.get_name())
            alignments = _door_contact_alignments(shape, contacts, tip_delta)
            if alignments:
                alignments_all.extend(float(value) for value in alignments)
                step_qualities.append(
                    _door_contact_step_quality(alignments, step_open_delta)
                )
        collisions.update(current_collisions)
        steps += 1
        # Hard cap guards against paths that never report completion.
        if done or steps > 400:
            break
    return {
        "collision_names": sorted(collisions),
        "step_qualities": [float(value) for value in step_qualities],
        "alignments": [float(value) for value in alignments_all],
        "door_contact_steps": int(len(step_qualities)),
        "path_steps": int(steps),
    }
|
|
|
|
def _single_door_assisted_pregrasp_score(
    task,
    target_pose: np.ndarray,
    base_snapshot: Optional[SimulatorSnapshot] = None,
    before_angle: Optional[float] = None,
    allowed_collisions: Optional[set] = None,
) -> Tuple[float, bool, Dict[str, object]]:
    """Score one attempt at reaching `target_pose` by pushing through the door.

    Plans a left-arm path with collisions ignored, executes it while tracing
    contacts, and scores it only when (a) every collision is an allowed door
    shape, (b) the door opened by more than the minimum delta, and (c) the
    contact quality passes the threshold. The base score decays with path
    length and is scaled by door quality; if a strict (collision-respecting)
    plan becomes feasible after the assist, the score is bumped into
    [0.5, 1.0]. The simulator snapshot is always restored before returning.

    Returns (score, success_flag, debug). NOTE(review): every return path
    here yields False for the middle element; the caller ORs these flags, so
    it only ever reflects failure from this function — confirm intent.
    """
    snapshot = base_snapshot if base_snapshot is not None else _capture_snapshot(task)
    # Debug record is pre-populated so early returns still carry every key.
    debug: Dict[str, object] = {
        "path_found": False,
        "path_length": math.inf,
        "base_score": 0.0,
        "final_score": 0.0,
        "bump_applied": False,
        "post_assist_strict_feasible": False,
        "post_assist_strict_length": math.inf,
        "door_open_delta": 0.0,
        "door_quality": 0.0,
        "collision_names": [],
        "bad_collision_names": [],
        "step_quality_mean": 0.0,
        "step_quality_min": 0.0,
        "step_quality_max": 0.0,
        "alignment_signed_mean": 0.0,
        "alignment_abs_mean": 0.0,
        "alignment_min": 0.0,
        "alignment_max": 0.0,
        "door_contact_steps": 0,
        "path_steps": 0,
    }
    try:
        if before_angle is None:
            before_angle = float(Joint("oven_door_joint").get_joint_position())
        # Assist plan: collisions ignored so the arm may push the door.
        path = _plan_path(
            task._scene,
            "left",
            target_pose,
            ignore_collisions=True,
        )
        length = _path_length(path)
        if path is None or not np.isfinite(length):
            return 0.0, False, debug
        debug["path_found"] = True
        debug["path_length"] = float(length)


        # Execute the assist path and collect contact telemetry.
        trace = _execute_path_with_collision_trace(
            task, path, "left"
        )
        allowed = (
            _allowed_door_collision_names(task._scene)
            if allowed_collisions is None
            else allowed_collisions
        )
        collision_names = [str(name) for name in trace["collision_names"]]
        bad_collisions = [name for name in collision_names if name not in allowed]
        # "Opening" is encoded as a decrease of the door joint angle.
        door_open_delta = before_angle - float(Joint("oven_door_joint").get_joint_position())
        step_qualities = [float(value) for value in trace["step_qualities"]]
        alignments = [float(value) for value in trace["alignments"]]
        door_quality = _door_contact_path_quality(step_qualities, door_open_delta)
        step_summary = _sequence_summary(step_qualities)
        alignment_summary = _sequence_summary(alignments)
        debug.update(
            {
                "collision_names": collision_names,
                "bad_collision_names": bad_collisions,
                "door_open_delta": float(door_open_delta),
                "door_quality": float(door_quality),
                "step_quality_mean": float(step_summary["mean"]),
                "step_quality_min": float(step_summary["min"]),
                "step_quality_max": float(step_summary["max"]),
                "alignment_signed_mean": float(alignment_summary["mean"]),
                "alignment_abs_mean": float(
                    np.mean(np.abs(np.asarray(alignments, dtype=np.float64)))
                )
                if alignments
                else 0.0,
                "alignment_min": float(alignment_summary["min"]),
                "alignment_max": float(alignment_summary["max"]),
                "door_contact_steps": int(trace["door_contact_steps"]),
                "path_steps": int(trace["path_steps"]),
            }
        )
        # Reject attempts that hit disallowed shapes, barely moved the door,
        # or interacted with it poorly.
        if (
            bad_collisions
            or door_open_delta <= DEFAULT_DOOR_ASSIST_MIN_OPEN_DELTA
            or door_quality < DEFAULT_DOOR_ASSIST_MIN_QUALITY
        ):
            return 0.0, False, debug


        # Shorter assist paths score higher; scale by how well the door was handled.
        base_score = float(
            math.exp(-float(length) / DEFAULT_PREGRASP_ASSIST_PATH_SCALE)
        )
        base_score *= door_quality
        debug["base_score"] = float(base_score)
        # Re-plan strictly (collisions respected) in the post-assist scene.
        strict_path = _plan_path(
            task._scene,
            "left",
            target_pose,
            ignore_collisions=False,
        )
        strict_length = _path_length(strict_path)
        debug["post_assist_strict_length"] = float(strict_length)
        if strict_path is not None and np.isfinite(strict_length):
            # Assist made the goal strictly reachable: bump score into [0.5, 1.0].
            final_score = float(0.5 + 0.5 * base_score)
            debug["bump_applied"] = True
            debug["post_assist_strict_feasible"] = True
            debug["final_score"] = final_score
            return final_score, False, debug


        debug["final_score"] = float(base_score)
        return base_score, False, debug
    finally:
        # Always roll the simulator back so attempts are independent.
        _restore_snapshot(task, snapshot)
|
|
|
|
def _door_assisted_pregrasp_score(
    task,
    target_pose: np.ndarray,
    base_snapshot: Optional[SimulatorSnapshot] = None,
    before_angle: Optional[float] = None,
    allowed_collisions: Optional[set] = None,
) -> Tuple[float, bool, Dict[str, object]]:
    """Repeat the door-assisted pregrasp attempt and aggregate a stable score.

    Runs `_single_door_assisted_pregrasp_score` up to DEFAULT_PLAN_ATTEMPTS
    times, collects per-attempt metrics for the successful (score > 0)
    attempts, and returns the median score scaled by the success rate —
    or 0.0 when no attempt succeeds or reliability is below the configured
    minimum. The debug dict carries medians of all per-attempt metrics plus
    the full list of attempt debug records.
    """
    attempts = max(1, DEFAULT_PLAN_ATTEMPTS)
    # Parallel per-successful-attempt metric lists, medianized below.
    successful_scores: List[float] = []
    successful_base_scores: List[float] = []
    successful_lengths: List[float] = []
    successful_door_qualities: List[float] = []
    successful_open_deltas: List[float] = []
    successful_step_means: List[float] = []
    successful_align_abs_means: List[float] = []
    successful_align_signed_means: List[float] = []
    successful_collision_counts: List[float] = []
    successful_bad_collision_counts: List[float] = []
    post_assist_strict_count = 0
    bump_count = 0
    assisted_success = False
    attempt_debugs: List[Dict[str, object]] = []
    for _ in range(attempts):
        score, success, attempt_debug = _single_door_assisted_pregrasp_score(
            task,
            target_pose,
            base_snapshot=base_snapshot,
            before_angle=before_angle,
            allowed_collisions=allowed_collisions,
        )
        attempt_debugs.append(attempt_debug)
        # Only attempts that produced a positive score contribute metrics.
        if score > 0.0:
            successful_scores.append(float(score))
            successful_base_scores.append(float(attempt_debug["base_score"]))
            successful_lengths.append(float(attempt_debug["path_length"]))
            successful_door_qualities.append(float(attempt_debug["door_quality"]))
            successful_open_deltas.append(float(attempt_debug["door_open_delta"]))
            successful_step_means.append(float(attempt_debug["step_quality_mean"]))
            successful_align_abs_means.append(float(attempt_debug["alignment_abs_mean"]))
            successful_align_signed_means.append(float(attempt_debug["alignment_signed_mean"]))
            successful_collision_counts.append(float(len(attempt_debug["collision_names"])))
            successful_bad_collision_counts.append(
                float(len(attempt_debug["bad_collision_names"]))
            )
            post_assist_strict_count += int(bool(attempt_debug["post_assist_strict_feasible"]))
            bump_count += int(bool(attempt_debug["bump_applied"]))
        assisted_success = assisted_success or success


    # Pre-populated so the zero-success early return carries every key.
    debug: Dict[str, object] = {
        "attempts": attempt_debugs,
        "attempt_count": int(attempts),
        "successful_attempt_count": int(len(successful_scores)),
        "reliability": 0.0,
        "base_score": 0.0,
        "score_before_reliability": 0.0,
        "median_path_length": 0.0,
        "door_quality": 0.0,
        "door_open_delta": 0.0,
        "step_quality_mean": 0.0,
        "alignment_abs_mean": 0.0,
        "alignment_signed_mean": 0.0,
        "collision_count": 0.0,
        "bad_collision_count": 0.0,
        "post_assist_strict_feasible": False,
        "bump_applied": False,
    }
    if not successful_scores:
        return 0.0, assisted_success, debug


    reliability = float(len(successful_scores) / attempts)
    # Medians are robust to outlier attempts (e.g. one lucky short path).
    debug.update(
        {
            "reliability": reliability,
            "base_score": float(np.median(np.asarray(successful_base_scores, dtype=np.float64))),
            "score_before_reliability": float(
                np.median(np.asarray(successful_scores, dtype=np.float64))
            ),
            "median_path_length": float(
                np.median(np.asarray(successful_lengths, dtype=np.float64))
            ),
            "door_quality": float(
                np.median(np.asarray(successful_door_qualities, dtype=np.float64))
            ),
            "door_open_delta": float(
                np.median(np.asarray(successful_open_deltas, dtype=np.float64))
            ),
            "step_quality_mean": float(
                np.median(np.asarray(successful_step_means, dtype=np.float64))
            ),
            "alignment_abs_mean": float(
                np.median(np.asarray(successful_align_abs_means, dtype=np.float64))
            ),
            "alignment_signed_mean": float(
                np.median(np.asarray(successful_align_signed_means, dtype=np.float64))
            ),
            "collision_count": float(
                np.median(np.asarray(successful_collision_counts, dtype=np.float64))
            ),
            "bad_collision_count": float(
                np.median(np.asarray(successful_bad_collision_counts, dtype=np.float64))
            ),
            "post_assist_strict_feasible": bool(post_assist_strict_count > 0),
            "bump_applied": bool(bump_count > 0),
        }
    )
    if not _plan_is_reliable(reliability):
        return 0.0, assisted_success, debug


    # Final score: median attempt score, discounted by the success rate.
    stable_score = float(np.median(np.asarray(successful_scores, dtype=np.float64)))
    return stable_score * reliability, assisted_success, debug
|
|
|
|
def _pregrasp_score_and_success(
    task, templates: MotionTemplates
) -> Tuple[float, bool, Dict[str, object]]:
    """Score left-arm pre-grasp feasibility for the tray in the current state.

    Returns ``(score, success, debug)``: ``score`` in [0, 1], ``success``
    whether any evaluated candidate was judged reachable, and ``debug`` a
    flat dict of diagnostics (flattened into the per-frame metrics row by
    the caller).

    Evaluation order:
      1. Tray already held by the left gripper -> perfect score, no planning.
      2. First candidate goal pose with a strict (collision-checked) plan ->
         perfect score; remaining candidates are not evaluated.
      3. Otherwise every candidate falls back to door-assisted planning and
         the best-scoring candidate wins.
    """
    tray = Shape("tray")
    # Case 1: the left gripper already holds the tray.
    if any(
        grasped.get_name() == tray.get_name()
        for grasped in task._scene.robot.left_gripper.get_grasped_objects()
    ):
        return (
            1.0,
            True,
            {
                "tray_grasped_left": True,
                "source": "grasped",
                "best_goal_index": -1,
                "num_goal_poses": 0,
                "source_strict": False,
                "source_assisted": False,
                "bump_applied": False,
                "base_score": 1.0,
                "strict_length": 0.0,
                "assisted_length": 0.0,
                "assisted_reliability": 1.0,
                "post_assist_strict_feasible": False,
                "door_open_delta": 0.0,
                "door_quality": 0.0,
                "step_quality_mean": 0.0,
                "step_quality_min": 0.0,
                "step_quality_max": 0.0,
                "alignment_signed_mean": 0.0,
                "alignment_abs_mean": 0.0,
                "collision_count": 0,
                "bad_collision_count": 0,
                "best_goal_pose": [],
                "goal_evaluations": [],
            },
        )
    tray_pose = tray.get_pose()
    goal_poses = _late_pregrasp_goal_poses(tray_pose, templates)
    best_score = 0.0
    best_success = False
    best_goal_index = -1
    best_goal_debug: Optional[Dict[str, object]] = None
    # Captured lazily: only taken once the first strict plan fails, then
    # shared by the door-assisted fallback for all remaining goals.
    base_snapshot: Optional[SimulatorSnapshot] = None
    before_angle = float(Joint("oven_door_joint").get_joint_position())
    allowed_collisions: Optional[set] = None
    goal_debugs: List[Dict[str, object]] = []
    for goal_index, target_pose in enumerate(goal_poses):
        goal_debug: Dict[str, object] = {
            "goal_index": int(goal_index),
            "target_pose": [float(value) for value in np.asarray(target_pose, dtype=np.float64)],
            "strict_path_found": False,
            "strict_length": 0.0,
            "assisted": None,
            "candidate_score": 0.0,
            "source": "none",
        }
        # Case 2: try a strict, collision-checked plan straight to this goal.
        path = _plan_path(
            task._scene,
            "left",
            target_pose,
            ignore_collisions=False,
        )
        length = _path_length(path)
        if path is not None and np.isfinite(length):
            # Strict plan found: short-circuit with a perfect score.
            goal_debug["strict_path_found"] = True
            goal_debug["strict_length"] = float(length)
            goal_debug["candidate_score"] = 1.0
            goal_debug["source"] = "strict"
            goal_debugs.append(goal_debug)
            return (
                1.0,
                True,
                {
                    "tray_grasped_left": False,
                    "source": "strict",
                    "best_goal_index": int(goal_index),
                    "num_goal_poses": int(len(goal_poses)),
                    "source_strict": True,
                    "source_assisted": False,
                    "bump_applied": False,
                    "base_score": 1.0,
                    "strict_length": float(length),
                    "assisted_length": 0.0,
                    "assisted_reliability": 0.0,
                    "post_assist_strict_feasible": False,
                    "door_open_delta": 0.0,
                    "door_quality": 0.0,
                    "step_quality_mean": 0.0,
                    "step_quality_min": 0.0,
                    "step_quality_max": 0.0,
                    "alignment_signed_mean": 0.0,
                    "alignment_abs_mean": 0.0,
                    "collision_count": 0,
                    "bad_collision_count": 0,
                    "best_goal_pose": [
                        float(value) for value in np.asarray(target_pose, dtype=np.float64)
                    ],
                    "goal_evaluations": goal_debugs,
                },
            )
        # Case 3: strict planning failed -- fall back to door-assisted planning.
        if base_snapshot is None:
            base_snapshot = _capture_snapshot(task)
            allowed_collisions = _allowed_door_collision_names(task._scene)
        assisted_score, assisted_success, assisted_debug = _door_assisted_pregrasp_score(
            task,
            target_pose,
            base_snapshot=base_snapshot,
            before_angle=before_angle,
            allowed_collisions=allowed_collisions,
        )
        goal_debug["assisted"] = assisted_debug
        goal_debug["candidate_score"] = float(assisted_score)
        goal_debug["source"] = "assisted" if assisted_score > 0.0 else "none"
        goal_debugs.append(goal_debug)
        if assisted_score > best_score:
            best_score = assisted_score
            best_goal_index = goal_index
            best_goal_debug = goal_debug
        # NOTE: success accumulates across ALL candidates, not only the
        # best-scoring one.
        best_success = best_success or assisted_success
    # No candidate produced a positive score; synthesize an empty record so
    # the return dict below can be built uniformly.
    if best_goal_debug is None:
        best_goal_debug = {
            "goal_index": -1,
            "strict_length": 0.0,
            "assisted": None,
        }
    assisted_debug = best_goal_debug.get("assisted") or {}
    return (
        float(best_score),
        bool(best_success),
        {
            "tray_grasped_left": False,
            "source": "assisted" if best_score > 0.0 else "none",
            "best_goal_index": int(best_goal_index),
            "num_goal_poses": int(len(goal_poses)),
            "source_strict": False,
            "source_assisted": bool(best_score > 0.0),
            "bump_applied": bool(assisted_debug.get("bump_applied", False)),
            "base_score": float(assisted_debug.get("base_score", 0.0)),
            "strict_length": float(best_goal_debug.get("strict_length", 0.0)),
            "assisted_length": float(assisted_debug.get("median_path_length", 0.0)),
            "assisted_reliability": float(assisted_debug.get("reliability", 0.0)),
            "post_assist_strict_feasible": bool(
                assisted_debug.get("post_assist_strict_feasible", False)
            ),
            "door_open_delta": float(assisted_debug.get("door_open_delta", 0.0)),
            "door_quality": float(assisted_debug.get("door_quality", 0.0)),
            "step_quality_mean": float(assisted_debug.get("step_quality_mean", 0.0)),
            # Min/max step quality are taken over the successful planning
            # attempts of the BEST goal only; [0.0] guards the empty case.
            "step_quality_min": float(
                min(
                    [
                        float(attempt.get("step_quality_min", 0.0))
                        for attempt in assisted_debug.get("attempts", [])
                        if attempt.get("path_found", False)
                    ]
                    or [0.0]
                )
            ),
            "step_quality_max": float(
                max(
                    [
                        float(attempt.get("step_quality_max", 0.0))
                        for attempt in assisted_debug.get("attempts", [])
                        if attempt.get("path_found", False)
                    ]
                    or [0.0]
                )
            ),
            "alignment_signed_mean": float(
                assisted_debug.get("alignment_signed_mean", 0.0)
            ),
            "alignment_abs_mean": float(assisted_debug.get("alignment_abs_mean", 0.0)),
            "collision_count": int(round(float(assisted_debug.get("collision_count", 0.0)))),
            "bad_collision_count": int(
                round(float(assisted_debug.get("bad_collision_count", 0.0)))
            ),
            "best_goal_pose": [
                float(value)
                for value in np.asarray(
                    goal_poses[best_goal_index], dtype=np.float64
                )
            ]
            if best_goal_index >= 0
            else [],
            "goal_evaluations": goal_debugs,
        },
    )
|
|
|
|
def _extract_score_and_success(
    task, templates: MotionTemplates
) -> Tuple[float, bool, Dict[str, object]]:
    """Score how far the left arm can progress through tray extraction.

    Plans and virtually executes a sequence of milestone poses -- approach
    poses first (skipped when the tray is already grasped), then
    retreat/lift poses -- accumulating a score in [0, 1]: 25% from raw
    tray-height progress and 75% split evenly across the milestones, each
    weighted by planner reliability and an exponential path-length decay.
    Success means the tray ends at or above the extraction height threshold.

    The simulator state is snapshotted on entry and restored in ``finally``,
    so the caller's scene is left untouched.

    Returns ``(score, success, debug)``.
    """
    tray = Shape("tray")
    robot = task._scene.robot
    snapshot = _capture_snapshot(task)
    try:
        total_length = 0.0
        current_height = float(tray.get_position()[2])
        already_grasped = any(
            grasped.get_name() == tray.get_name()
            for grasped in robot.left_gripper.get_grasped_objects()
        )
        debug: Dict[str, object] = {
            "initial_height": float(current_height),
            "final_height": float(current_height),
            "already_grasped_start": bool(already_grasped),
            "already_grasped_end": bool(already_grasped),
            "total_length": 0.0,
            "num_milestones": 0,
            "first_failed_milestone": -1,
            "min_reliability": 0.0,
            "mean_reliability": 0.0,
            "approach_score": 0.0,
            "retreat_score": 0.0,
            "milestones": [],
            "score": 0.0,
        }
        # Already holding the tray high enough: extraction is effectively done.
        if already_grasped and current_height >= _extract_height_threshold(templates):
            score = _extraction_progress_score(current_height, templates)
            debug["score"] = float(score)
            return score, True, debug
        poses = _extract_sequence_poses(
            tray.get_pose(), task._task.get_base().get_pose(), templates
        )
        # First two poses approach the tray; the remainder retreat/lift it.
        approach_poses = [] if already_grasped else poses[:2]
        retreat_poses = poses[2:]
        if already_grasped and retreat_poses:
            # Mid-retreat: keep only poses clearly above the current height;
            # if none remain but the tray is still low, fall back to the
            # final retreat pose alone.
            future_retreat_poses = [
                pose for pose in retreat_poses if float(pose[2]) > current_height + 0.01
            ]
            if future_retreat_poses:
                retreat_poses = future_retreat_poses
            elif current_height < _extract_height_threshold(templates):
                retreat_poses = [retreat_poses[-1]]
            else:
                retreat_poses = []
        milestone_poses = approach_poses + retreat_poses
        # Approach plans are collision-checked; retreat plans ignore
        # collisions (presumably because the grasped tray would register as
        # one -- TODO confirm).
        milestone_collision = ([False] * len(approach_poses)) + ([True] * len(retreat_poses))
        progress = _extraction_progress_score(current_height, templates) * 0.25
        milestone_weight = 0.75 / max(1, len(milestone_poses))
        milestone_reliabilities: List[float] = []
        debug["num_milestones"] = int(len(milestone_poses))


        for milestone_index, (pose, ignore_collisions) in enumerate(
            zip(milestone_poses, milestone_collision)
        ):
            path, length, reliability = _stable_plan(
                task._scene, "left", pose, ignore_collisions=ignore_collisions
            )
            # Reliability damped by an exponential path-length penalty.
            planner_score = (
                reliability * math.exp(-length / DEFAULT_PATH_SCALE)
                if path is not None and np.isfinite(length)
                else 0.0
            )
            milestone_debug = {
                "milestone_index": int(milestone_index),
                "kind": "approach" if milestone_index < len(approach_poses) else "retreat",
                "ignore_collisions": bool(ignore_collisions),
                "path_found": bool(path is not None and np.isfinite(length)),
                "path_length": float(length) if np.isfinite(length) else 0.0,
                "reliability": float(reliability),
                "planner_score": float(planner_score),
                "height_before": float(current_height),
            }
            debug["milestones"].append(milestone_debug)
            # Hard failure: no feasible path to this milestone.
            if path is None or not np.isfinite(length):
                debug["first_failed_milestone"] = int(milestone_index)
                debug["final_height"] = float(current_height)
                debug["total_length"] = float(total_length)
                debug["score"] = float(progress)
                return progress, False, debug
            milestone_reliabilities.append(float(reliability))
            progress += milestone_weight * planner_score
            if milestone_debug["kind"] == "approach":
                debug["approach_score"] = float(debug["approach_score"] + milestone_weight * planner_score)
            else:
                debug["retreat_score"] = float(debug["retreat_score"] + milestone_weight * planner_score)
            # Soft failure: a path exists but planning was too unreliable.
            if not _plan_is_reliable(reliability):
                debug["first_failed_milestone"] = int(milestone_index)
                debug["final_height"] = float(current_height)
                debug["total_length"] = float(total_length + length)
                debug["score"] = float(progress)
                debug["min_reliability"] = float(min(milestone_reliabilities))
                debug["mean_reliability"] = float(np.mean(np.asarray(milestone_reliabilities)))
                return progress, False, debug
            total_length += length
            # Teleport the arm to the end of the planned path and step once so
            # the scene (tray height in particular) reflects the motion.
            path.set_to_end(disable_dynamics=True)
            task._pyrep.step()
            current_height = float(tray.get_position()[2])
            progress = max(progress, _extraction_progress_score(current_height, templates) * 0.25)
            milestone_debug["height_after"] = float(current_height)
            # Grasp the tray right after the last approach milestone.
            if (not already_grasped) and milestone_index == (len(approach_poses) - 1):
                robot.left_gripper.grasp(tray)
                already_grasped = True
        if not already_grasped:
            robot.left_gripper.grasp(tray)
        final_height = float(tray.get_position()[2])
        success = final_height >= _extract_height_threshold(templates)
        score = max(
            progress,
            math.exp(-total_length / (DEFAULT_PATH_SCALE * 2.5)),
            _extraction_progress_score(final_height, templates) if success else 0.0,
        )
        debug["final_height"] = float(final_height)
        debug["already_grasped_end"] = bool(already_grasped)
        debug["total_length"] = float(total_length)
        debug["score"] = float(score)
        if milestone_reliabilities:
            debug["min_reliability"] = float(min(milestone_reliabilities))
            debug["mean_reliability"] = float(np.mean(np.asarray(milestone_reliabilities)))
        return score, bool(success), debug
    finally:
        # Always roll the simulator back: this scoring must be side-effect free.
        _restore_snapshot(task, snapshot)
|
|
|
|
| def _wait_branch(task, steps: int = 5) -> None: |
| for _ in range(steps): |
| task._scene.step() |
|
|
|
|
def _open_more_branch(task, templates: MotionTemplates) -> None:
    """Intervention: kinematically move the oven door joint further by
    ``templates.open_more_delta`` (decrementing the angle -- cf.
    ``_hold_open_branch``, where a smaller angle appears to mean more open),
    then step the simulator a few times to settle."""
    door = Joint("oven_door_joint")
    angle = door.get_joint_position()
    door.set_joint_position(angle - templates.open_more_delta, disable_dynamics=True)
    for _ in range(3):
        task._pyrep.step()
|
|
|
|
def _hold_open_branch(task, templates: MotionTemplates) -> None:
    """Intervention: clamp the oven door joint to at most
    ``templates.hold_open_angle`` (min of current and hold-open angle, with
    dynamics disabled), then step the simulator briefly to settle."""
    door = Joint("oven_door_joint")
    angle = door.get_joint_position()
    door.set_joint_position(min(angle, templates.hold_open_angle), disable_dynamics=True)
    for _ in range(3):
        task._pyrep.step()
|
|
|
|
def _frame_metrics(
    episode_dir: Path,
    demo: Demo,
    frame_state: ReplayState,
    templates: MotionTemplates,
) -> Tuple[Dict[str, float], Dict[str, object]]:
    """Compute per-camera and aggregated visibility metrics for one frame.

    For every camera in ``FULL_CAMERA_SET``, uses the camera's point cloud to
    measure what fraction of (a) sampled grasp points and (b) points over the
    whole tray are visible, then reports union visibility over the
    ``THREE_VIEW_SET`` and ``FULL_CAMERA_SET`` groups.

    Returns ``(values, debug)``: ``values`` is a flat metric-name -> value
    dict merged into the frame row; ``debug`` records the best camera per
    metric and the raw per-camera stats.
    """
    grasp_points = _sample_grasp_points(templates, frame_state.tray_pose)
    full_tray_points = _sample_full_tray_points(frame_state.tray_pose)
    # NOTE(review): the *_count entries below are ints although the
    # annotation says Dict[str, float]; the caller re-casts them to float.
    camera_values: Dict[str, Dict[str, float]] = {}
    for camera_name in FULL_CAMERA_SET:
        point_cloud, extrinsics, intrinsics = _camera_point_cloud(
            episode_dir, demo, frame_state.frame_index, camera_name
        )
        grasp_stats = _point_visibility_stats_from_point_cloud(
            grasp_points, point_cloud, extrinsics, intrinsics
        )
        tray_stats = _point_visibility_stats_from_point_cloud(
            full_tray_points, point_cloud, extrinsics, intrinsics
        )
        camera_values[camera_name] = {
            "grasp_visibility": float(grasp_stats["ratio"]),
            "grasp_projected_count": int(grasp_stats["projected_count"]),
            "grasp_visible_count": int(grasp_stats["visible_count"]),
            "tray_visibility": float(tray_stats["ratio"]),
            "tray_projected_count": int(tray_stats["projected_count"]),
            "tray_visible_count": int(tray_stats["visible_count"]),
        }


    values: Dict[str, float] = {}
    # Union visibility over the two camera groups (three-view and full-view).
    for name, camera_set in {"three_view": THREE_VIEW_SET, "full_view": FULL_CAMERA_SET}.items():
        values[f"{name}_visibility"] = _union_visibility(
            camera_values[camera_name]["grasp_visibility"] for camera_name in camera_set
        )
        values[f"{name}_whole_tray_visibility"] = _union_visibility(
            camera_values[camera_name]["tray_visibility"] for camera_name in camera_set
        )
    # Flatten the per-camera stats into the row namespace.
    for camera_name in FULL_CAMERA_SET:
        camera_value = camera_values[camera_name]
        values[f"{camera_name}_grasp_visibility"] = float(camera_value["grasp_visibility"])
        values[f"{camera_name}_tray_visibility"] = float(camera_value["tray_visibility"])
        values[f"{camera_name}_grasp_projected_count"] = float(
            camera_value["grasp_projected_count"]
        )
        values[f"{camera_name}_grasp_visible_count"] = float(camera_value["grasp_visible_count"])
        values[f"{camera_name}_tray_projected_count"] = float(camera_value["tray_projected_count"])
        values[f"{camera_name}_tray_visible_count"] = float(camera_value["tray_visible_count"])


    best_grasp_camera = max(
        FULL_CAMERA_SET,
        key=lambda camera_name: float(camera_values[camera_name]["grasp_visibility"]),
    )
    best_tray_camera = max(
        FULL_CAMERA_SET,
        key=lambda camera_name: float(camera_values[camera_name]["tray_visibility"]),
    )
    debug = {
        "best_grasp_visibility_camera": best_grasp_camera,
        "best_whole_tray_visibility_camera": best_tray_camera,
        "camera_values": camera_values,
    }
    return values, debug
|
|
|
|
def _build_frame_artifacts(
    episode_dir: Path,
    demo: Demo,
    templates: MotionTemplates,
    task,
    state: ReplayState,
) -> Tuple[Dict[str, float], Dict[str, object]]:
    """Assemble the full metrics row and debug payload for one replay frame.

    Combines camera visibility metrics (``_frame_metrics``), kinematic state
    (arm/tray poses, joints, door angle, gripper state), pre-grasp
    progress/error measures, and the planner-based scores ``p_pre``
    (``_pregrasp_score_and_success``) and ``p_ext``
    (``_extract_score_and_success``) into one flat float-valued row.
    ``y_pre`` / ``y_ext`` here start as copies of the raw planner success
    flags; they are re-derived later by ``_annotate_phase_columns``.

    Returns ``(row, debug)``.
    """
    visibility_values, visibility_debug = _frame_metrics(episode_dir, demo, state, templates)
    tray_pose = np.asarray(state.tray_pose, dtype=np.float64)
    current_pose = np.asarray(state.left_gripper_pose, dtype=np.float64)
    right_pose = np.asarray(state.right_gripper_pose, dtype=np.float64)
    robot = task._scene.robot
    left_joints = np.asarray(robot.left_arm.get_joint_positions(), dtype=np.float64)
    right_joints = np.asarray(robot.right_arm.get_joint_positions(), dtype=np.float64)
    # Pre-grasp goal pose and how far the left gripper is from it.
    pregrasp_goal_pose = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
    pregrasp_progress, pregrasp_distance = _pregrasp_progress_and_distance(
        current_pose,
        tray_pose,
        templates,
    )
    pregrasp_position_error = float(np.linalg.norm(current_pose[:3] - pregrasp_goal_pose[:3]))
    pregrasp_orientation_error_deg = _orientation_error_degrees(
        current_pose[3:], pregrasp_goal_pose[3:]
    )
    tray = Shape("tray")
    tray_grasped_left = any(
        grasped.get_name() == tray.get_name()
        for grasped in task._scene.robot.left_gripper.get_grasped_objects()
    )
    tray_height = float(tray.get_position()[2])
    # Planner-based phase scores (both are side-effect free w.r.t. the scene).
    p_pre, y_pre, p_pre_debug = _pregrasp_score_and_success(task, templates)
    p_ext, y_ext, p_ext_debug = _extract_score_and_success(task, templates)


    row = {
        "frame_index": int(state.frame_index),
        "time_norm": float(state.frame_index / max(1, len(demo) - 1)),
        "door_angle": float(state.door_angle),
        "right_gripper_open": float(state.right_gripper_open),
        "left_gripper_open": float(state.left_gripper_open),
        "right_arm_pose_x": float(right_pose[0]),
        "right_arm_pose_y": float(right_pose[1]),
        "right_arm_pose_z": float(right_pose[2]),
        "right_arm_pose_qx": float(right_pose[3]),
        "right_arm_pose_qy": float(right_pose[4]),
        "right_arm_pose_qz": float(right_pose[5]),
        "right_arm_pose_qw": float(right_pose[6]),
        "left_arm_pose_x": float(current_pose[0]),
        "left_arm_pose_y": float(current_pose[1]),
        "left_arm_pose_z": float(current_pose[2]),
        "left_arm_pose_qx": float(current_pose[3]),
        "left_arm_pose_qy": float(current_pose[4]),
        "left_arm_pose_qz": float(current_pose[5]),
        "left_arm_pose_qw": float(current_pose[6]),
        "tray_height": float(tray_height),
        "tray_grasped_left": float(bool(tray_grasped_left)),
        "pregrasp_progress": float(pregrasp_progress),
        "pregrasp_distance": float(pregrasp_distance),
        "pregrasp_position_error": float(pregrasp_position_error),
        "pregrasp_orientation_error_deg": float(pregrasp_orientation_error_deg),
        "p_pre": float(p_pre),
        "p_ext": float(p_ext),
        "y_pre_raw": float(bool(y_pre)),
        "y_ext_raw": float(bool(y_ext)),
        "y_pre": float(bool(y_pre)),
        "y_ext": float(bool(y_ext)),
        "p_pre_source_strict": float(bool(p_pre_debug["source_strict"])),
        "p_pre_source_assisted": float(bool(p_pre_debug["source_assisted"])),
        "p_pre_bump_applied": float(bool(p_pre_debug["bump_applied"])),
        "p_pre_base_score": float(p_pre_debug["base_score"]),
        "p_pre_strict_length": float(p_pre_debug["strict_length"]),
        "p_pre_assisted_length": float(p_pre_debug["assisted_length"]),
        "p_pre_assisted_reliability": float(p_pre_debug["assisted_reliability"]),
        "p_pre_post_assist_strict_feasible": float(
            bool(p_pre_debug["post_assist_strict_feasible"])
        ),
        "p_pre_door_open_delta": float(p_pre_debug["door_open_delta"]),
        "p_pre_door_quality": float(p_pre_debug["door_quality"]),
        "p_pre_step_quality_mean": float(p_pre_debug["step_quality_mean"]),
        "p_pre_step_quality_min": float(p_pre_debug["step_quality_min"]),
        "p_pre_step_quality_max": float(p_pre_debug["step_quality_max"]),
        "p_pre_alignment_signed_mean": float(p_pre_debug["alignment_signed_mean"]),
        "p_pre_alignment_abs_mean": float(p_pre_debug["alignment_abs_mean"]),
        "p_pre_collision_count": float(p_pre_debug["collision_count"]),
        "p_pre_bad_collision_count": float(p_pre_debug["bad_collision_count"]),
        "p_pre_best_goal_index": float(p_pre_debug["best_goal_index"]),
        "p_pre_num_goal_poses": float(p_pre_debug["num_goal_poses"]),
        # Best goal pose components; NaN when no goal was selected.
        "p_pre_best_target_pose_x": float(p_pre_debug["best_goal_pose"][0])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_pre_best_target_pose_y": float(p_pre_debug["best_goal_pose"][1])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_pre_best_target_pose_z": float(p_pre_debug["best_goal_pose"][2])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_pre_best_target_pose_qx": float(p_pre_debug["best_goal_pose"][3])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_pre_best_target_pose_qy": float(p_pre_debug["best_goal_pose"][4])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_pre_best_target_pose_qz": float(p_pre_debug["best_goal_pose"][5])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_pre_best_target_pose_qw": float(p_pre_debug["best_goal_pose"][6])
        if p_pre_debug["best_goal_pose"]
        else float("nan"),
        "p_ext_initial_height": float(p_ext_debug["initial_height"]),
        "p_ext_final_height": float(p_ext_debug["final_height"]),
        "p_ext_total_length": float(p_ext_debug["total_length"]),
        "p_ext_already_grasped_start": float(bool(p_ext_debug["already_grasped_start"])),
        "p_ext_already_grasped_end": float(bool(p_ext_debug["already_grasped_end"])),
        "p_ext_num_milestones": float(p_ext_debug["num_milestones"]),
        "p_ext_first_failed_milestone": float(p_ext_debug["first_failed_milestone"]),
        "p_ext_min_reliability": float(p_ext_debug["min_reliability"]),
        "p_ext_mean_reliability": float(p_ext_debug["mean_reliability"]),
        "p_ext_approach_score": float(p_ext_debug["approach_score"]),
        "p_ext_retreat_score": float(p_ext_debug["retreat_score"]),
        **visibility_values,
    }
    # Structured (non-flat) diagnostics, kept separate from the metrics row.
    debug = {
        "frame_index": int(state.frame_index),
        "state": {
            "door_angle": float(state.door_angle),
            "tray_height": float(tray_height),
            "tray_grasped_left": bool(tray_grasped_left),
            "left_arm_pose": [float(value) for value in current_pose],
            "right_arm_pose": [float(value) for value in right_pose],
            "left_arm_joint_positions": [float(value) for value in left_joints],
            "right_arm_joint_positions": [float(value) for value in right_joints],
            "pregrasp_goal_pose": [float(value) for value in pregrasp_goal_pose],
            "pregrasp_position_error": float(pregrasp_position_error),
            "pregrasp_orientation_error_deg": float(pregrasp_orientation_error_deg),
        },
        "visibility": visibility_debug,
        "p_pre": p_pre_debug,
        "p_ext": p_ext_debug,
    }
    return row, debug
|
|
|
|
def _compute_frame_row_isolated(
    episode_dir: Path,
    demo: Demo,
    templates: MotionTemplates,
    checkpoint_stride: int,
    frame_index: int,
) -> Dict[str, float]:
    """Compute a single frame's metrics row inside its own replay environment.

    Thin wrapper over ``_compute_frame_rows_sequential`` with a one-element
    frame list; raises ``RuntimeError`` if no row comes back.
    """
    computed = _compute_frame_rows_sequential(
        episode_dir=episode_dir,
        demo=demo,
        templates=templates,
        checkpoint_stride=checkpoint_stride,
        frame_indices=[frame_index],
    )
    if not computed:
        raise RuntimeError(f"failed to compute frame {frame_index}")
    return computed[0]
|
|
|
|
def _build_frame_row(
    episode_dir: Path,
    demo: Demo,
    templates: MotionTemplates,
    task,
    state: ReplayState,
) -> Dict[str, float]:
    """Return only the metrics row from ``_build_frame_artifacts``,
    discarding the debug payload."""
    row, _unused_debug = _build_frame_artifacts(episode_dir, demo, templates, task, state)
    return row
|
|
|
|
def _compute_frame_rows_sequential(
    episode_dir: Path,
    demo: Demo,
    templates: MotionTemplates,
    checkpoint_stride: int,
    frame_indices: Sequence[int],
) -> List[Dict[str, float]]:
    """Compute metric rows for several frames using one shared replay.

    Frames are visited in ascending (deduplicated) order so the replay only
    ever steps forward; around each evaluation the simulator state is
    snapshotted and restored, since row computation perturbs the scene.
    """
    env = _launch_replay_env()
    try:
        task = env.get_task(BimanualTakeTrayOutOfOven)
        replay = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
        replay.reset()
        computed_rows: List[Dict[str, float]] = []
        for frame_index in sorted({int(index) for index in frame_indices}):
            replay.step_to(frame_index)
            checkpoint = replay.snapshot()
            current_state = replay.current_state()
            computed_rows.append(
                _build_frame_row(episode_dir, demo, templates, task, current_state)
            )
            replay.restore(checkpoint)
        return computed_rows
    finally:
        env.shutdown()
|
|
|
|
def _compute_frame_rows_independent(
    episode_dir: Path,
    demo: Demo,
    templates: MotionTemplates,
    checkpoint_stride: int,
    frame_indices: Sequence[int],
) -> List[Dict[str, float]]:
    """Compute metric rows for several frames, restarting from frame 0 each time.

    Unlike the sequential variant, every frame is reached from the initial
    snapshot, so no frame's evaluation can leak state into the next.
    """
    env = _launch_replay_env()
    try:
        task = env.get_task(BimanualTakeTrayOutOfOven)
        replay = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
        replay.reset()
        start_snapshot = replay.snapshot()
        computed_rows: List[Dict[str, float]] = []
        for frame_index in sorted({int(index) for index in frame_indices}):
            replay.restore_to_index(start_snapshot, 0)
            replay.step_to(frame_index)
            current_state = replay.current_state()
            computed_rows.append(
                _build_frame_row(episode_dir, demo, templates, task, current_state)
            )
        return computed_rows
    finally:
        env.shutdown()
|
|
|
|
def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float:
    """ROC-AUC of *y_score* against *y_true*; NaN when only one class is present
    (sklearn would raise in that degenerate case)."""
    if np.unique(y_true).size < 2:
        return float("nan")
    return float(roc_auc_score(y_true, y_score))
|
|
|
|
def _safe_auprc(y_true: np.ndarray, y_score: np.ndarray) -> float:
    """Average precision (AUPRC) of *y_score* against *y_true*; NaN when only
    one class is present."""
    if np.unique(y_true).size < 2:
        return float("nan")
    return float(average_precision_score(y_true, y_score))
|
|
|
|
| def _first_crossing(values: np.ndarray, threshold: float) -> int: |
| above = np.flatnonzero(values >= threshold) |
| return int(above[0]) if len(above) else -1 |
|
|
|
|
| def _transition_count(binary_values: np.ndarray) -> Tuple[int, int]: |
| diffs = np.diff(binary_values.astype(int)) |
| return int(np.sum(diffs == 1)), int(np.sum(diffs == -1)) |
|
|
|
|
| def _keyframe_subset(frame_df: pd.DataFrame, keyframes: Sequence[int]) -> pd.DataFrame: |
| key_df = frame_df.iloc[list(keyframes)].copy() |
| key_df["keyframe_ordinal"] = np.arange(len(key_df)) |
| return key_df |
|
|
|
|
| def _persistent_rise_mask(binary_values: np.ndarray, window: int) -> np.ndarray: |
| binary_values = np.asarray(binary_values, dtype=bool) |
| rises = np.zeros(len(binary_values), dtype=bool) |
| if window <= 0: |
| rises[:] = binary_values |
| return rises |
| for index in range(len(binary_values)): |
| segment = binary_values[index : index + window] |
| if len(segment) == window and np.all(segment): |
| rises[index] = True |
| return rises |
|
|
|
|
| def _monotone_after_first(binary_values: np.ndarray) -> np.ndarray: |
| binary_values = np.asarray(binary_values, dtype=bool) |
| monotone = np.zeros(len(binary_values), dtype=bool) |
| if np.any(binary_values): |
| first_true = int(np.flatnonzero(binary_values)[0]) |
| monotone[first_true:] = True |
| return monotone |
|
|
|
|
def _annotate_phase_columns(frame_df: pd.DataFrame) -> pd.DataFrame:
    """Derive phase labels and auxiliary signal columns on the dense frame table.

    Adds (in place, and also returns ``frame_df``):
      - signed/absolute door speed via finite differences at ``DEMO_DT``;
      - pregrasp closing speed from the distance signal;
      - debounced, latched labels ``y_pre`` / ``y_ext``: a seed must persist
        for the module-level ``DEFAULT_*_PERSISTENCE`` window, after which
        the label stays on (``_monotone_after_first``);
      - ``y_retrieve`` (approach active while pre-grasp feasible) and
        ``y_ready`` (extraction feasible with a quiet door);
      - ``phase_switch`` from thresholding ``p_pre`` at
        ``DEFAULT_PHASE_SCORE_TAU``, plus margin columns used downstream.
    """
    # Door kinematics from the recorded angle trace.
    door_speed = np.gradient(frame_df["door_angle"].to_numpy(dtype=float), DEMO_DT)
    frame_df["door_speed_signed"] = door_speed
    frame_df["door_speed_abs"] = np.abs(door_speed)


    # Prefer raw columns when present; fall back to derived equivalents so
    # older frame tables without the *_raw columns still work.
    y_ext_raw = (
        frame_df["y_ext_raw"].to_numpy(dtype=bool)
        if "y_ext_raw" in frame_df
        else frame_df["y_ext"].to_numpy(dtype=bool)
    )
    pregrasp_progress = (
        frame_df["pregrasp_progress"].to_numpy(dtype=float)
        if "pregrasp_progress" in frame_df
        else frame_df["p_pre"].to_numpy(dtype=float)
    )
    pregrasp_distance = (
        frame_df["pregrasp_distance"].to_numpy(dtype=float)
        if "pregrasp_distance" in frame_df
        else 1.0 - pregrasp_progress
    )
    # Closing speed: negative gradient of the distance (shrinking distance
    # -> positive speed).
    pregrasp_speed = -np.gradient(pregrasp_distance, DEMO_DT)
    frame_df["pregrasp_speed"] = pregrasp_speed


    # Debounce + latch the pre-grasp and extraction labels.
    pregrasp_progress_seed = pregrasp_progress >= DEFAULT_PREGRASP_LABEL_PROGRESS_TAU
    y_pre_seed = (
        frame_df["y_pre_raw"].to_numpy(dtype=bool)
        if "y_pre_raw" in frame_df
        else pregrasp_progress_seed
    )
    y_pre_binary = _monotone_after_first(
        _persistent_rise_mask(y_pre_seed, DEFAULT_PREGRASP_PERSISTENCE)
    )
    y_ext_binary = _monotone_after_first(
        _persistent_rise_mask(y_ext_raw, DEFAULT_READY_PERSISTENCE)
    )
    frame_df["y_pre_seed_raw"] = y_pre_seed.astype(float)
    frame_df["y_pre_progress_seed"] = pregrasp_progress_seed.astype(float)
    frame_df["y_pre"] = y_pre_binary.astype(float)
    frame_df["y_ext"] = y_ext_binary.astype(float)
    frame_df["y_ext_seed_raw"] = y_ext_raw.astype(float)


    frame_df["phase_score"] = np.clip(frame_df["p_pre"].to_numpy(dtype=float), 0.0, 1.0)
    # Approach is "active" when the gripper has made some progress toward the
    # pre-grasp pose and is still closing in on it.
    approach_active = (
        (pregrasp_progress >= DEFAULT_APPROACH_PROGRESS_TAU)
        & (pregrasp_speed >= DEFAULT_APPROACH_SPEED_TAU)
    )
    frame_df["approach_active"] = approach_active.astype(float)


    # Retrieve label: approach active while pre-grasp is feasible, debounced
    # and latched.
    retrieve_onset = _persistent_rise_mask(
        approach_active & y_pre_binary, DEFAULT_RETRIEVE_PERSISTENCE
    )
    frame_df["retrieve_onset_raw"] = retrieve_onset.astype(float)
    frame_df["y_retrieve"] = _monotone_after_first(retrieve_onset).astype(float)


    # Ready label: extraction feasible over a persistence window AND the door
    # nearly still at the window's onset frame.
    ready_seed = np.zeros(len(frame_df), dtype=bool)
    for index in range(len(frame_df)):
        window = y_ext_binary[index : index + DEFAULT_READY_PERSISTENCE]
        if (
            len(window) == DEFAULT_READY_PERSISTENCE
            and np.all(window)
            and frame_df.iloc[index]["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU
        ):
            ready_seed[index] = True
    frame_df["ready_seed_raw"] = ready_seed.astype(float)
    frame_df["y_ready"] = _monotone_after_first(ready_seed).astype(float)


    # Phase switch predicted purely from the p_pre score crossing its
    # threshold persistently.
    phase_seed = _persistent_rise_mask(
        frame_df["p_pre"].to_numpy(dtype=float) >= DEFAULT_PHASE_SCORE_TAU,
        DEFAULT_RETRIEVE_PERSISTENCE,
    )
    frame_df["phase_seed_raw"] = phase_seed.astype(float)
    frame_df["phase_switch"] = _monotone_after_first(phase_seed).astype(float)
    # Signed margins to the decision thresholds, for downstream analysis.
    frame_df["p_pre_minus_tau"] = (
        frame_df["p_pre"].to_numpy(dtype=float) - DEFAULT_PHASE_SCORE_TAU
    )
    frame_df["p_ext_minus_tau"] = (
        frame_df["p_ext"].to_numpy(dtype=float) - DEFAULT_PEXT_TAU
    )
    frame_df["door_speed_margin"] = DEFAULT_DOOR_SPEED_TAU - frame_df["door_speed_abs"]
    return frame_df
|
|
|
|
def _episode_metrics_from_frames(
    frame_df: pd.DataFrame,
    key_df: pd.DataFrame,
    episode_name: str,
    description: str,
    interventions: Dict[str, float],
) -> Dict[str, object]:
    """Summarize one episode: boundary timing, score/label agreement, baselines.

    Computes first-crossing frames for the planner scores and derived labels,
    phase-switch transition counts, boundary errors between the predicted
    phase switch and the retrieve/ready labels (dense and keyframe variants),
    and AUROC / AUPRC / F1 agreement between scores and labels, including
    door-angle, time and whole-tray-visibility baselines. ``interventions``
    is merged into the returned record verbatim.
    """
    y_pre_arr = frame_df["y_pre"].to_numpy(dtype=int)
    y_ext_arr = frame_df["y_ext"].to_numpy(dtype=int)
    y_retrieve_arr = frame_df["y_retrieve"].to_numpy(dtype=int)
    y_ready_arr = frame_df["y_ready"].to_numpy(dtype=int)
    p_pre_arr = frame_df["p_pre"].to_numpy(dtype=float)
    p_ext_arr = frame_df["p_ext"].to_numpy(dtype=float)
    phase_arr = frame_df["phase_switch"].to_numpy(dtype=int)
    whole_vis = frame_df["full_view_whole_tray_visibility"].to_numpy(dtype=float)
    door_angle_arr = frame_df["door_angle"].to_numpy(dtype=float)
    time_arr = frame_df["time_norm"].to_numpy(dtype=float)


    # First frames where each score/label crosses its threshold (-1 = never).
    ppre_cross = _first_crossing(p_pre_arr, DEFAULT_PPRE_TAU)
    pext_cross = _first_crossing(p_ext_arr, DEFAULT_PEXT_TAU)
    phase_cross = _first_crossing(frame_df["phase_switch"].to_numpy(dtype=float), 0.5)
    retrieve_cross = _first_crossing(y_retrieve_arr.astype(float), 0.5)
    ready_cross = _first_crossing(y_ready_arr.astype(float), 0.5)
    phase_rises, phase_falls = _transition_count(phase_arr)


    # Keyframe-resolution counterparts of the crossings above.
    key_phase_cross = _first_crossing(key_df["phase_switch"].to_numpy(dtype=float), 0.5)
    key_retrieve_cross = _first_crossing(key_df["y_retrieve"].to_numpy(dtype=float), 0.5)
    key_ready_cross = _first_crossing(key_df["y_ready"].to_numpy(dtype=float), 0.5)


    return {
        "episode_name": episode_name,
        "description": description,
        "num_dense_frames": int(len(frame_df)),
        "num_keyframes": int(len(key_df)),
        "phase_switch_rises": int(phase_rises),
        "phase_switch_falls": int(phase_falls),
        "ppre_cross_frame": int(ppre_cross),
        "pext_cross_frame": int(pext_cross),
        "phase_cross_frame": int(phase_cross),
        "retrieve_cross_frame": int(retrieve_cross),
        "ready_cross_frame": int(ready_cross),
        # Pre-grasp should become feasible no later than extraction does.
        "ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross),
        # Boundary errors are NaN whenever either crossing never happens.
        "dense_boundary_error_to_retrieve_frames": float(abs(phase_cross - retrieve_cross))
        if phase_cross >= 0 and retrieve_cross >= 0
        else float("nan"),
        "dense_boundary_error_frames": float(abs(phase_cross - ready_cross))
        if phase_cross >= 0 and ready_cross >= 0
        else float("nan"),
        "dense_boundary_error_fraction": float(abs(phase_cross - ready_cross) / len(frame_df))
        if phase_cross >= 0 and ready_cross >= 0
        else float("nan"),
        "key_boundary_error_to_retrieve_keyframes": float(abs(key_phase_cross - key_retrieve_cross))
        if key_phase_cross >= 0 and key_retrieve_cross >= 0
        else float("nan"),
        "key_boundary_error_keyframes": float(abs(key_phase_cross - key_ready_cross))
        if key_phase_cross >= 0 and key_ready_cross >= 0
        else float("nan"),
        # Score/label agreement (NaN when a label is single-class; see
        # _safe_auc / _safe_auprc).
        "auroc_vret_ypre_three": _safe_auc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)),
        "auprc_vret_ypre_three": _safe_auprc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)),
        "auroc_vret_ypre_full": _safe_auc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)),
        "auprc_vret_ypre_full": _safe_auprc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)),
        "auroc_ppre_ypre": _safe_auc(y_pre_arr, p_pre_arr),
        "auprc_ppre_ypre": _safe_auprc(y_pre_arr, p_pre_arr),
        "auroc_pext_yext": _safe_auc(y_ext_arr, p_ext_arr),
        "auprc_pext_yext": _safe_auprc(y_ext_arr, p_ext_arr),
        "auroc_phase_yretrieve": _safe_auc(y_retrieve_arr, frame_df["phase_score"].to_numpy(dtype=float)),
        "auprc_phase_yretrieve": _safe_auprc(y_retrieve_arr, frame_df["phase_score"].to_numpy(dtype=float)),
        "f1_phase_yretrieve": float(f1_score(y_retrieve_arr, phase_arr))
        if np.any(y_retrieve_arr) and np.any(phase_arr)
        else float("nan"),
        "auroc_phase_yready": _safe_auc(y_ready_arr, frame_df["phase_score"].to_numpy(dtype=float)),
        "auprc_phase_yready": _safe_auprc(y_ready_arr, frame_df["phase_score"].to_numpy(dtype=float)),
        "f1_phase_yready": float(f1_score(y_ready_arr, phase_arr))
        if np.any(y_ready_arr) and np.any(phase_arr)
        else float("nan"),
        # Trivial baselines the planner scores should beat.
        "baseline_auroc_door_yext": _safe_auc(y_ext_arr, door_angle_arr),
        "baseline_auprc_door_yext": _safe_auprc(y_ext_arr, door_angle_arr),
        "baseline_auroc_time_yext": _safe_auc(y_ext_arr, time_arr),
        "baseline_auprc_time_yext": _safe_auprc(y_ext_arr, time_arr),
        "baseline_auroc_whole_vis_yext": _safe_auc(y_ext_arr, whole_vis),
        "baseline_auprc_whole_vis_yext": _safe_auprc(y_ext_arr, whole_vis),
        **interventions,
    }
|
|
|
|
def _isolated_intervention_outcomes(
    demo: Demo,
    templates: MotionTemplates,
    frame_index: int,
    checkpoint_stride: int,
) -> Dict[str, Tuple[float, bool]]:
    """Replay ``demo`` up to ``frame_index`` and probe counterfactual branches.

    From the frozen simulator state at ``frame_index`` this evaluates the
    baseline plus three interventions (open the door further, hold it open,
    wait) and returns ``{branch_name: (score, success)}`` for each.  A fresh
    replay environment is launched and shut down per call so the probes are
    isolated from any other simulator state.
    """
    replay_env = _launch_replay_env()
    try:
        task = replay_env.get_task(BimanualTakeTrayOutOfOven)
        replay = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
        replay.reset()
        replay.step_to(frame_index)
        frozen_state = replay.snapshot()

        # Baseline outcome, straight from the replayed state (no intervention).
        score, success, _ = _extract_score_and_success(task, templates)
        outcomes: Dict[str, Tuple[float, bool]] = {"base": (score, success)}

        # Every intervention starts from the same frozen state.
        branches = (
            ("open_more", lambda: _open_more_branch(task, templates)),
            ("hold_open", lambda: _hold_open_branch(task, templates)),
            ("wait", lambda: _wait_branch(task)),
        )
        for branch_name, run_branch in branches:
            _restore_snapshot(task, frozen_state)
            run_branch()
            score, success, _ = _extract_score_and_success(task, templates)
            outcomes[branch_name] = (score, success)

        return outcomes
    finally:
        replay_env.shutdown()
|
|
|
|
def _interventional_validity(
    demo: Demo,
    templates: MotionTemplates,
    frame_df: pd.DataFrame,
    checkpoint_stride: int,
) -> Dict[str, float]:
    """Run isolated intervention probes around the ready-phase onset.

    Probes a couple of frames shortly before the first ``y_ready`` frame and a
    couple at/after it.  Pre-ready frames are expected to gain extractability
    (``pext``) from the door interventions, while post-ready frames should
    already extract successfully with little extra gain.  Returns the flat
    ``*_success`` / ``*_trials`` counters (as floats) for metric aggregation.

    When the episode never reaches ``y_ready``, the onset falls back to the
    episode midpoint so the probes still run.
    """
    ready_indices = np.flatnonzero(frame_df["y_ready"].to_numpy(dtype=bool))
    ready_onset = int(ready_indices[0]) if len(ready_indices) else len(frame_df) // 2
    pre_ready_indices = sorted(
        {
            max(0, ready_onset - 20),
            max(0, ready_onset - 10),
        }
    )
    post_ready_indices = sorted(
        {
            ready_onset,
            min(len(frame_df) - 1, ready_onset + 20),
        }
    )
    stats = {
        "pre_ready_open_more_increases_pext": 0,
        "pre_ready_open_more_trials": 0,
        "pre_ready_hold_open_increases_pext": 0,
        "pre_ready_hold_open_trials": 0,
        "pre_ready_extract_success": 0,
        "pre_ready_extract_trials": 0,
        "pre_ready_wait_extract_success": 0,
        "pre_ready_wait_trials": 0,
        "post_ready_extract_success": 0,
        "post_ready_extract_trials": 0,
        "post_ready_open_more_low_gain": 0,
        "post_ready_open_more_trials": 0,
        "post_ready_hold_open_low_gain": 0,
        "post_ready_hold_open_trials": 0,
    }
    # Deduplicate probe frames while preserving order: when ready_onset is at
    # (or near) the episode start the pre/post windows overlap, and every probe
    # launches a fresh replay environment, so re-probing the same frame would
    # waste a full replay and double-count its trials.
    probe_frames = list(dict.fromkeys([*pre_ready_indices, *post_ready_indices]))
    for frame_index in probe_frames:
        outcomes = _isolated_intervention_outcomes(
            demo=demo,
            templates=templates,
            frame_index=frame_index,
            checkpoint_stride=checkpoint_stride,
        )
        base_pext, base_extract_success = outcomes["base"]
        open_pext, _ = outcomes["open_more"]
        hold_pext, _ = outcomes["hold_open"]
        _, wait_extract_success = outcomes["wait"]

        # Classify by position relative to the onset instead of by window
        # membership, so a frame that falls into both windows (ready_onset == 0)
        # is counted once, on the post-ready side it actually belongs to.
        if frame_index < ready_onset:
            stats["pre_ready_open_more_trials"] += 1
            stats["pre_ready_hold_open_trials"] += 1
            stats["pre_ready_extract_trials"] += 1
            stats["pre_ready_wait_trials"] += 1
            if open_pext > base_pext:
                stats["pre_ready_open_more_increases_pext"] += 1
            if hold_pext > base_pext:
                stats["pre_ready_hold_open_increases_pext"] += 1
            if base_extract_success:
                stats["pre_ready_extract_success"] += 1
            if wait_extract_success:
                stats["pre_ready_wait_extract_success"] += 1
        else:
            stats["post_ready_extract_trials"] += 1
            stats["post_ready_open_more_trials"] += 1
            stats["post_ready_hold_open_trials"] += 1
            if base_extract_success:
                stats["post_ready_extract_success"] += 1
            # A gain of at most 0.05 in pext counts as "low gain", i.e. the
            # door intervention was no longer needed at this frame.
            if (open_pext - base_pext) <= 0.05:
                stats["post_ready_open_more_low_gain"] += 1
            if (hold_pext - base_pext) <= 0.05:
                stats["post_ready_hold_open_low_gain"] += 1
    return {key: float(value) for key, value in stats.items()}
|
|
|
|
def _analyze_episode(
    dataset_root: Path,
    episode_dir: Path,
    templates: MotionTemplates,
    template_frames: Dict[str, int],
    checkpoint_stride: int = 16,
    max_frames: Optional[int] = None,
    independent_replay: bool = False,
) -> EpisodeArtifacts:
    """Replay one stored episode and build its dense/keyframe artifacts.

    Each demo frame is replayed in the simulator, per-frame features are
    extracted, phase columns are annotated, and interventional probes plus
    episode metrics are computed.  With ``independent_replay`` every frame is
    reproduced from the initial checkpoint (slower, avoids simulator drift);
    otherwise the live replay state is carried across frames.
    """
    demo = _load_demo(episode_dir)
    descriptions = _load_descriptions(episode_dir)

    replay_env = _launch_replay_env()
    try:
        task = replay_env.get_task(BimanualTakeTrayOutOfOven)
        replay = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
        replay.reset()
        frame_count = len(demo) if max_frames is None else min(len(demo), max_frames)

        frame_rows: List[Dict[str, float]] = []
        debug_rows: List[Dict[str, object]] = []
        start_checkpoint = replay.checkpoints[0] if independent_replay else None
        for frame_index in range(frame_count):
            if independent_replay:
                # Rewind to the very first checkpoint so every frame is
                # reproduced from scratch instead of accumulating drift.
                replay.restore_to_index(start_checkpoint, 0)
            replay.step_to(frame_index)
            saved_state = None if independent_replay else replay.snapshot()
            sim_state = replay.current_state()
            frame_row, frame_debug = _build_frame_artifacts(
                episode_dir, demo, templates, task, sim_state
            )
            frame_rows.append(frame_row)
            debug_rows.append(frame_debug)
            if saved_state is not None:
                # Undo any perturbation feature extraction applied so the
                # sequential replay continues from an untouched state.
                replay.restore(saved_state)
            frames_done = frame_index + 1
            if frames_done % 25 == 0 or frames_done == frame_count:
                print(
                    f"[{episode_dir.name}] analyzed {frames_done}/{frame_count} dense frames",
                    flush=True,
                )

        frame_df = _annotate_phase_columns(pd.DataFrame(frame_rows))

        key_indices = [index for index in _keypoint_discovery(demo) if index < frame_count]
        key_df = _keyframe_subset(frame_df, key_indices)
    finally:
        replay_env.shutdown()

    # Intervention probes launch their own environments, so they run after the
    # dense replay environment has been shut down.
    interventions = _interventional_validity(
        demo=demo,
        templates=templates,
        frame_df=frame_df,
        checkpoint_stride=checkpoint_stride,
    )
    metrics = _episode_metrics_from_frames(
        frame_df=frame_df,
        key_df=key_df,
        episode_name=episode_dir.name,
        description=descriptions[0],
        interventions=interventions,
    )
    return EpisodeArtifacts(
        episode_name=episode_dir.name,
        dense=frame_df,
        keyframes=key_df,
        metrics=metrics,
        template_frames=template_frames,
        debug_rows=debug_rows,
    )
|
|
|
|
| def _aggregate_summary(episode_metrics: List[Dict[str, object]]) -> Dict[str, object]: |
| frame = pd.DataFrame(episode_metrics) |
| numeric = frame.select_dtypes(include=[np.number]) |
| summary = { |
| "num_episodes": int(len(frame)), |
| "mean_metrics": numeric.mean(numeric_only=True).to_dict(), |
| "median_metrics": numeric.median(numeric_only=True).to_dict(), |
| "single_switch_rate": float((frame["phase_switch_rises"] == 1).mean()) |
| if len(frame) |
| else float("nan"), |
| "reversion_rate": float((frame["phase_switch_falls"] > 0).mean()) |
| if len(frame) |
| else float("nan"), |
| "ordering_ok_rate": float(frame["ordering_ok"].mean()) if len(frame) else float("nan"), |
| } |
| return summary |
|
|
|
|
def run_study(
    dataset_root: str,
    result_dir: str,
    max_episodes: Optional[int] = None,
    checkpoint_stride: int = 16,
    max_frames: Optional[int] = None,
    episode_offset: int = 0,
    template_episode_index: int = 0,
    episode_indices: Optional[Sequence[int]] = None,
    independent_replay: bool = True,
    per_episode_templates: bool = False,
) -> Dict[str, object]:
    """Analyze a selection of recorded episodes and write all study artifacts.

    Episodes are taken either from explicit ``episode_indices`` (deduplicated,
    order preserved) or as a contiguous ``episode_offset``/``max_episodes``
    slice.  Motion templates are derived once from ``template_episode_index``
    (or per episode when ``per_episode_templates``), each episode is analyzed,
    and per-episode dense/keyframe CSVs, debug JSONL, metrics JSON, a template
    manifest, and a ``summary.json`` are written under ``result_dir``.

    Returns the aggregated summary dict.  Raises ``RuntimeError`` when no
    episodes exist or none are selected, and ``ValueError`` for out-of-range
    indices.
    """
    dataset_path = Path(dataset_root)
    result_path = Path(result_dir)
    result_path.mkdir(parents=True, exist_ok=True)

    all_episode_dirs = _episode_dirs(dataset_path)
    if not all_episode_dirs:
        raise RuntimeError(f"no episodes available under {dataset_root}")
    if not (0 <= template_episode_index < len(all_episode_dirs)):
        raise ValueError(
            f"template_episode_index {template_episode_index} outside available range 0..{len(all_episode_dirs) - 1}"
        )

    selected_episode_indices: List[int]
    if episode_indices is not None:
        # Validate and deduplicate explicit indices while preserving order.
        selected_episode_indices = []
        seen_episode_indices = set()
        for raw_index in episode_indices:
            episode_index = int(raw_index)
            if not (0 <= episode_index < len(all_episode_dirs)):
                raise ValueError(
                    f"episode index {episode_index} outside available range 0..{len(all_episode_dirs) - 1}"
                )
            if episode_index in seen_episode_indices:
                continue
            selected_episode_indices.append(episode_index)
            seen_episode_indices.add(episode_index)
        episode_dirs = [all_episode_dirs[index] for index in selected_episode_indices]
    else:
        episode_dirs = all_episode_dirs[episode_offset:]
        if max_episodes is not None:
            episode_dirs = episode_dirs[:max_episodes]
        # episode_dirs is a contiguous slice, so the original indices are a
        # simple range; this replaces the former per-episode list.index()
        # search, which was O(n^2).  Negative offsets are normalized to match
        # Python slice semantics.
        start_index = (
            episode_offset
            if episode_offset >= 0
            else max(0, len(all_episode_dirs) + episode_offset)
        )
        selected_episode_indices = list(
            range(start_index, start_index + len(episode_dirs))
        )
    if not episode_dirs:
        raise RuntimeError(
            f"no episodes selected under {dataset_root} with offset={episode_offset} max_episodes={max_episodes} episode_indices={episode_indices}"
        )

    # Write the template manifest up front so a crashed run still records how
    # templates were (or would be) derived.
    template_episode_dir = all_episode_dirs[template_episode_index]
    if not per_episode_templates:
        templates, template_frames = _derive_templates(dataset_path, template_episode_dir)
        with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle:
            json.dump(
                {
                    "template_mode": "shared",
                    "templates": templates.to_json(),
                    "template_episode": template_episode_dir.name,
                    "template_frames": template_frames,
                    "episode_offset": episode_offset,
                    "selected_episode_indices": selected_episode_indices,
                },
                handle,
                indent=2,
            )
    else:
        with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle:
            json.dump(
                {
                    "template_mode": "per_episode",
                    "episode_offset": episode_offset,
                    "selected_episode_indices": selected_episode_indices,
                },
                handle,
                indent=2,
            )

    episode_metrics: List[Dict[str, object]] = []
    for episode_dir in episode_dirs:
        if per_episode_templates:
            # Derive (and record) fresh templates from this episode itself.
            episode_templates, episode_template_frames = _derive_templates(dataset_path, episode_dir)
            with result_path.joinpath(f"{episode_dir.name}.templates.json").open(
                "w", encoding="utf-8"
            ) as handle:
                json.dump(
                    {
                        "template_mode": "per_episode",
                        "templates": episode_templates.to_json(),
                        "template_episode": episode_dir.name,
                        "template_frames": episode_template_frames,
                    },
                    handle,
                    indent=2,
                )
        else:
            episode_templates = templates
            episode_template_frames = template_frames
        artifacts = _analyze_episode(
            dataset_path,
            episode_dir,
            episode_templates,
            episode_template_frames,
            checkpoint_stride=checkpoint_stride,
            max_frames=max_frames,
            independent_replay=independent_replay,
        )
        artifacts.dense.to_csv(result_path.joinpath(f"{episode_dir.name}.dense.csv"), index=False)
        artifacts.keyframes.to_csv(
            result_path.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False
        )
        if artifacts.debug_rows is not None:
            with result_path.joinpath(f"{episode_dir.name}.debug.jsonl").open(
                "w", encoding="utf-8"
            ) as handle:
                for debug_row in artifacts.debug_rows:
                    handle.write(json.dumps(_json_safe(debug_row)))
                    handle.write("\n")
        with result_path.joinpath(f"{episode_dir.name}.metrics.json").open(
            "w", encoding="utf-8"
        ) as handle:
            json.dump(artifacts.metrics, handle, indent=2)
        episode_metrics.append(artifacts.metrics)

    summary = _aggregate_summary(episode_metrics)
    with result_path.joinpath("summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)
    return summary
|
|
|
|
def main(argv: Optional[Sequence[str]] = None) -> int:
    """CLI entry point: parse arguments, run the study, print the summary."""

    def _parse_episode_indices(value: str) -> List[int]:
        # Comma-separated integers; empty chunks (e.g. trailing commas) are skipped.
        chunks = (chunk.strip() for chunk in value.split(","))
        indices = [int(chunk) for chunk in chunks if chunk]
        if not indices:
            raise argparse.ArgumentTypeError("episode-indices must contain at least one integer")
        return indices

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument(
        "--result-dir",
        default="/workspace/reveal_retrieve_label_study/results/oven_first_pass",
    )
    parser.add_argument("--max-episodes", type=int, default=1)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--max-frames", type=int)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--template-episode-index", type=int, default=0)
    parser.add_argument("--episode-indices", type=_parse_episode_indices)
    # --independent-replay / --sequential-replay toggle one shared flag.
    parser.add_argument(
        "--independent-replay",
        dest="independent_replay",
        action="store_true",
        help="Replay each frame from the demo start/checkpoint to avoid simulator drift.",
    )
    parser.add_argument(
        "--sequential-replay",
        dest="independent_replay",
        action="store_false",
        help="Reuse the live replay state across frames for speed at the cost of more drift.",
    )
    parser.add_argument("--per-episode-templates", action="store_true")
    parser.set_defaults(independent_replay=True)
    options = parser.parse_args(argv)

    study_summary = run_study(
        dataset_root=options.dataset_root,
        result_dir=options.result_dir,
        max_episodes=options.max_episodes,
        checkpoint_stride=options.checkpoint_stride,
        max_frames=options.max_frames,
        episode_offset=options.episode_offset,
        template_episode_index=options.template_episode_index,
        episode_indices=options.episode_indices,
        independent_replay=options.independent_replay,
        per_episode_templates=options.per_episode_templates,
    )
    print(json.dumps(study_summary, indent=2))
    return 0
|
|
|
|
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|
|