lsnu's picture
Add files using upload-large-folder tool
09fbe32 verified
import argparse
import json
import math
import os
import pickle
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
from natsort import natsorted
from PIL import Image
from pyrep.const import ConfigurationPathAlgorithms as Algos, ObjectType
from pyrep.errors import ConfigurationPathError
from pyrep.objects.joint import Joint
from pyrep.objects.object import Object
from pyrep.objects.shape import Shape
from pyrep.objects.vision_sensor import VisionSensor
from rlbench.action_modes.action_mode import BimanualJointPositionActionMode
from rlbench.action_modes.gripper_action_modes import BimanualDiscrete
from rlbench.backend.const import DEPTH_SCALE
from rlbench.backend.utils import image_to_float_array, rgb_handles_to_mask
from rlbench.bimanual_tasks.bimanual_take_tray_out_of_oven import (
BimanualTakeTrayOutOfOven,
)
from rlbench.demo import Demo
from rlbench.environment import Environment
from rlbench.observation_config import CameraConfig, ObservationConfig
from sklearn.metrics import (
average_precision_score,
f1_score,
roc_auc_score,
)
FULL_CAMERA_SET = [
"front",
"overhead",
"wrist_right",
"wrist_left",
"over_shoulder_left",
"over_shoulder_right",
]
THREE_VIEW_SET = ["front", "wrist_right", "wrist_left"]
DISPLAY = ":99"
DEMO_DT = 0.05
DEFAULT_IMAGE_SIZE = (128, 128)
DEFAULT_PATH_SCALE = 0.75
DEFAULT_PPRE_TAU = 0.45
DEFAULT_VISIBILITY_TAU = 0.35
DEFAULT_PEXT_TAU = 0.45
DEFAULT_DOOR_SPEED_TAU = 0.08
DEFAULT_PHASE_SCORE_TAU = 0.5
DEFAULT_APPROACH_SPEED_TAU = 0.01
DEFAULT_APPROACH_PROGRESS_TAU = 0.02
DEFAULT_PREGRASP_LABEL_PROGRESS_TAU = 0.60
DEFAULT_APPROACH_ONSET_WINDOW = 96
DEFAULT_PREGRASP_CANDIDATE_COUNT = 4
DEFAULT_PLAN_TRIALS = 48
DEFAULT_PLAN_MAX_CONFIGS = 4
DEFAULT_PLAN_MAX_TIME_MS = 10
DEFAULT_PLAN_TRIALS_PER_GOAL = 4
DEFAULT_PLAN_ATTEMPTS = 2
DEFAULT_PLAN_MIN_SUCCESSES = 2
DEFAULT_PREGRASP_ASSIST_PATH_SCALE = 3.0
DEFAULT_DOOR_ASSIST_MIN_OPEN_DELTA = 0.01
DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE = 0.02
DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE = 0.55
DEFAULT_DOOR_ASSIST_MIN_QUALITY = 0.3
DEFAULT_READY_PERSISTENCE = 3
DEFAULT_RETRIEVE_PERSISTENCE = 3
DEFAULT_PREGRASP_PERSISTENCE = 3
DEFAULT_MASK_HANDLE_COUNT = 8
DEFAULT_VISIBILITY_POINT_TOLERANCE = 0.03
DEFAULT_GRASP_SECONDARY_FACE_RATIO = 0.35
@dataclass
class SimulatorSnapshot:
task_state: Tuple[bytes, int]
right_arm_tree: bytes
right_gripper_tree: bytes
left_arm_tree: bytes
left_gripper_tree: bytes
right_arm_joints: Tuple[float, ...]
left_arm_joints: Tuple[float, ...]
right_gripper_joints: Tuple[float, ...]
left_gripper_joints: Tuple[float, ...]
right_grasped: Tuple[str, ...]
left_grasped: Tuple[str, ...]
right_grasped_old_parents: Dict[str, Optional[str]]
left_grasped_old_parents: Dict[str, Optional[str]]
grasped_subtree_poses: Dict[str, Tuple[float, ...]]
@dataclass
class ReplayState:
frame_index: int
tray_pose: np.ndarray
door_angle: float
right_gripper_pose: np.ndarray
left_gripper_pose: np.ndarray
right_gripper_open: float
left_gripper_open: float
snapshot: Optional[SimulatorSnapshot] = None
@dataclass
class MotionTemplates:
pregrasp_rel_pose: np.ndarray
grasp_rel_pose: np.ndarray
retreat_rel_poses: List[np.ndarray]
grasp_local_center: np.ndarray
grasp_region_extents: np.ndarray
retriever_home_joints: np.ndarray
hold_open_angle: float
open_more_delta: float
reference_tray_height: float
approach_rel_poses: List[np.ndarray] = field(default_factory=list)
mask_handle_ids: List[int] = field(default_factory=list)
def to_json(self) -> Dict[str, object]:
return {
"approach_rel_poses": [pose.tolist() for pose in self.approach_rel_poses],
"pregrasp_rel_pose": self.pregrasp_rel_pose.tolist(),
"grasp_rel_pose": self.grasp_rel_pose.tolist(),
"retreat_rel_poses": [pose.tolist() for pose in self.retreat_rel_poses],
"grasp_local_center": self.grasp_local_center.tolist(),
"grasp_region_extents": self.grasp_region_extents.tolist(),
"retriever_home_joints": self.retriever_home_joints.tolist(),
"hold_open_angle": float(self.hold_open_angle),
"open_more_delta": float(self.open_more_delta),
"reference_tray_height": float(self.reference_tray_height),
"mask_handle_ids": [int(handle) for handle in self.mask_handle_ids],
}
@classmethod
def from_json(cls, payload: Dict[str, object]) -> "MotionTemplates":
return cls(
pregrasp_rel_pose=np.asarray(payload["pregrasp_rel_pose"], dtype=np.float64),
grasp_rel_pose=np.asarray(payload["grasp_rel_pose"], dtype=np.float64),
retreat_rel_poses=[
np.asarray(pose, dtype=np.float64)
for pose in payload.get("retreat_rel_poses", [])
],
grasp_local_center=np.asarray(payload["grasp_local_center"], dtype=np.float64),
grasp_region_extents=np.asarray(payload["grasp_region_extents"], dtype=np.float64),
retriever_home_joints=np.asarray(
payload.get("retriever_home_joints", [0.0] * 7), dtype=np.float64
),
hold_open_angle=float(payload["hold_open_angle"]),
open_more_delta=float(payload["open_more_delta"]),
reference_tray_height=float(payload["reference_tray_height"]),
approach_rel_poses=[
np.asarray(pose, dtype=np.float64)
for pose in payload.get("approach_rel_poses", [])
],
mask_handle_ids=[int(handle) for handle in payload.get("mask_handle_ids", [])],
)
@dataclass
class EpisodeArtifacts:
episode_name: str
dense: pd.DataFrame
keyframes: pd.DataFrame
metrics: Dict[str, object]
template_frames: Dict[str, int]
debug_rows: Optional[List[Dict[str, object]]] = None
def _configure_runtime() -> None:
os.environ["DISPLAY"] = os.environ.get("DISPLAY", DISPLAY)
os.environ["COPPELIASIM_ROOT"] = "/workspace/coppelia_sim"
ld_library_path = os.environ.get("LD_LIBRARY_PATH", "")
if "/workspace/coppelia_sim" not in ld_library_path:
os.environ["LD_LIBRARY_PATH"] = (
f"{ld_library_path}:/workspace/coppelia_sim"
if ld_library_path
else "/workspace/coppelia_sim"
)
os.environ["QT_QPA_PLATFORM_PLUGIN_PATH"] = "/workspace/coppelia_sim"
os.environ.setdefault("XDG_RUNTIME_DIR", "/tmp/runtime-root")
def _minimal_camera_config() -> Dict[str, CameraConfig]:
return {
"front": CameraConfig(
rgb=False,
depth=False,
point_cloud=False,
mask=False,
image_size=DEFAULT_IMAGE_SIZE,
)
}
def _make_observation_config() -> ObservationConfig:
return ObservationConfig(
camera_configs=_minimal_camera_config(),
joint_velocities=True,
joint_positions=True,
joint_forces=False,
gripper_open=True,
gripper_pose=True,
gripper_matrix=False,
gripper_joint_positions=True,
gripper_touch_forces=False,
task_low_dim_state=False,
robot_name="bimanual",
)
def _launch_replay_env() -> Environment:
_configure_runtime()
env = Environment(
action_mode=BimanualJointPositionActionMode(),
obs_config=_make_observation_config(),
headless=True,
robot_setup="dual_panda",
)
env.launch()
return env
def _load_demo(episode_dir: Path) -> Demo:
with episode_dir.joinpath("low_dim_obs.pkl").open("rb") as handle:
return pickle.load(handle)
def _load_descriptions(episode_dir: Path) -> List[str]:
with episode_dir.joinpath("variation_descriptions.pkl").open("rb") as handle:
return pickle.load(handle)
def _episode_dirs(dataset_root: Path) -> List[Path]:
episodes_dir = dataset_root.joinpath("all_variations", "episodes")
return [
episodes_dir.joinpath(name)
for name in natsorted(os.listdir(episodes_dir))
if episodes_dir.joinpath(name, "low_dim_obs.pkl").exists()
]
def _camera_file(episode_dir: Path, camera_name: str, kind: str, frame_index: int) -> Path:
if kind == "rgb":
return episode_dir.joinpath(f"{camera_name}_rgb", f"rgb_{frame_index:04d}.png")
if kind == "depth":
return episode_dir.joinpath(
f"{camera_name}_depth", f"depth_{frame_index:04d}.png"
)
if kind == "mask":
return episode_dir.joinpath(f"{camera_name}_mask", f"mask_{frame_index:04d}.png")
raise ValueError(f"unknown kind: {kind}")
def _load_depth_meters(episode_dir: Path, demo: Demo, frame_index: int, camera_name: str) -> np.ndarray:
image = Image.open(_camera_file(episode_dir, camera_name, "depth", frame_index))
depth = image_to_float_array(image, DEPTH_SCALE)
near = demo[frame_index].misc[f"{camera_name}_camera_near"]
far = demo[frame_index].misc[f"{camera_name}_camera_far"]
return near + depth * (far - near)
def _load_mask(episode_dir: Path, frame_index: int, camera_name: str) -> np.ndarray:
image = np.asarray(
Image.open(_camera_file(episode_dir, camera_name, "mask", frame_index)),
dtype=np.float32,
)
if image.ndim == 2:
image = np.repeat(image[..., None], 3, axis=2)
if image.max() > 1.0:
image /= 255.0
return rgb_handles_to_mask(image.copy())
def _capture_snapshot(task) -> SimulatorSnapshot:
robot = task._scene.robot
right_grasped = tuple(robot.right_gripper.get_grasped_objects())
left_grasped = tuple(robot.left_gripper.get_grasped_objects())
grasped_subtree_poses: Dict[str, Tuple[float, ...]] = {}
for grasped_object in right_grasped + left_grasped:
for subtree_object in grasped_object.get_objects_in_tree(exclude_base=False):
grasped_subtree_poses[subtree_object.get_name()] = tuple(
float(value) for value in subtree_object.get_pose()
)
return SimulatorSnapshot(
task_state=task._task.get_state(),
right_arm_tree=robot.right_arm.get_configuration_tree(),
right_gripper_tree=robot.right_gripper.get_configuration_tree(),
left_arm_tree=robot.left_arm.get_configuration_tree(),
left_gripper_tree=robot.left_gripper.get_configuration_tree(),
right_arm_joints=tuple(float(value) for value in robot.right_arm.get_joint_positions()),
left_arm_joints=tuple(float(value) for value in robot.left_arm.get_joint_positions()),
right_gripper_joints=tuple(float(value) for value in robot.right_gripper.get_joint_positions()),
left_gripper_joints=tuple(float(value) for value in robot.left_gripper.get_joint_positions()),
right_grasped=tuple(obj.get_name() for obj in right_grasped),
left_grasped=tuple(obj.get_name() for obj in left_grasped),
right_grasped_old_parents={
obj.get_name(): (
None if old_parent is None else old_parent.get_name()
)
for obj, old_parent in zip(right_grasped, robot.right_gripper._old_parents)
},
left_grasped_old_parents={
obj.get_name(): (
None if old_parent is None else old_parent.get_name()
)
for obj, old_parent in zip(left_grasped, robot.left_gripper._old_parents)
},
grasped_subtree_poses=grasped_subtree_poses,
)
def _restore_snapshot(task, snapshot: SimulatorSnapshot) -> None:
robot = task._scene.robot
snapshot_has_grasp = bool(snapshot.right_grasped or snapshot.left_grasped)
if not snapshot_has_grasp:
robot.release_gripper()
try:
task._task.restore_state(snapshot.task_state)
except RuntimeError:
task._pyrep.set_configuration_tree(snapshot.task_state[0])
task._pyrep.set_configuration_tree(snapshot.right_arm_tree)
task._pyrep.set_configuration_tree(snapshot.right_gripper_tree)
task._pyrep.set_configuration_tree(snapshot.left_arm_tree)
task._pyrep.set_configuration_tree(snapshot.left_gripper_tree)
robot.right_arm.set_joint_positions(list(snapshot.right_arm_joints), disable_dynamics=True)
robot.left_arm.set_joint_positions(list(snapshot.left_arm_joints), disable_dynamics=True)
robot.right_arm.set_joint_target_positions(list(snapshot.right_arm_joints))
robot.left_arm.set_joint_target_positions(list(snapshot.left_arm_joints))
robot.right_gripper.set_joint_positions(
list(snapshot.right_gripper_joints), disable_dynamics=True
)
robot.left_gripper.set_joint_positions(
list(snapshot.left_gripper_joints), disable_dynamics=True
)
robot.right_gripper.set_joint_target_positions(list(snapshot.right_gripper_joints))
robot.left_gripper.set_joint_target_positions(list(snapshot.left_gripper_joints))
if snapshot_has_grasp:
robot.release_gripper()
for name, pose in snapshot.grasped_subtree_poses.items():
Object.get_object(name).set_pose(np.asarray(pose, dtype=np.float64))
task._pyrep.step()
if not snapshot_has_grasp:
robot.release_gripper()
for name in snapshot.right_grasped:
_force_attach_grasped_object(
robot.right_gripper,
Shape(name),
snapshot.right_grasped_old_parents.get(name),
)
for name in snapshot.left_grasped:
_force_attach_grasped_object(
robot.left_gripper,
Shape(name),
snapshot.left_grasped_old_parents.get(name),
)
task._pyrep.step()
def _force_attach_grasped_object(
gripper, obj: Shape, old_parent_name: Optional[str]
) -> None:
if any(grasped.get_name() == obj.get_name() for grasped in gripper.get_grasped_objects()):
return
gripper._grasped_objects.append(obj)
old_parent = obj.get_parent() if old_parent_name is None else Object.get_object(old_parent_name)
gripper._old_parents.append(old_parent)
obj.set_parent(gripper._attach_point, keep_in_place=True)
def _build_joint_action(target_obs) -> np.ndarray:
def _joint_vector(value, fallback) -> np.ndarray:
array = np.asarray(fallback if value is None else value, dtype=np.float64)
if array.ndim != 1 or array.shape[0] != 7:
array = np.asarray(fallback, dtype=np.float64)
if array.ndim != 1 or array.shape[0] != 7:
raise ValueError(f"invalid joint vector shape: {array.shape}")
return array
right = _joint_vector(
target_obs.misc.get("right_executed_demo_joint_position_action"),
target_obs.right.joint_positions,
)
left = _joint_vector(
target_obs.misc.get("left_executed_demo_joint_position_action"),
target_obs.left.joint_positions,
)
return np.concatenate(
[
right,
np.array([target_obs.right.gripper_open], dtype=np.float64),
left,
np.array([target_obs.left.gripper_open], dtype=np.float64),
]
)
class ReplayCache:
def __init__(self, task, demo: Demo, checkpoint_stride: int = 16):
self.task = task
self.demo = demo
self.checkpoint_stride = checkpoint_stride
self.current_index = 0
self.current_obs = None
self.checkpoints: Dict[int, SimulatorSnapshot] = {}
self.discrete_gripper = BimanualDiscrete()
def reset(self) -> None:
_, self.current_obs = self.task.reset_to_demo(self.demo)
self.current_index = 0
self.checkpoints = {0: _capture_snapshot(self.task)}
def step_to(self, target_index: int):
if target_index < self.current_index:
checkpoint_index = max(i for i in self.checkpoints if i <= target_index)
_restore_snapshot(self.task, self.checkpoints[checkpoint_index])
self.current_index = checkpoint_index
self.current_obs = self._observation_from_scene()
while self.current_index < target_index:
next_index = self.current_index + 1
self.task._action_mode.action(
self.task._scene, _build_joint_action(self.demo[next_index])
)
self._apply_gripper_replay(self.demo[next_index])
self.current_obs = self._observation_from_scene()
self.current_index = next_index
if self.current_index % self.checkpoint_stride == 0:
self.checkpoints[self.current_index] = _capture_snapshot(self.task)
return self.current_obs
def current_state(self) -> ReplayState:
return ReplayState(
frame_index=self.current_index,
tray_pose=Shape("tray").get_pose(),
door_angle=Joint("oven_door_joint").get_joint_position(),
right_gripper_pose=self.current_obs.right.gripper_pose.copy(),
left_gripper_pose=self.current_obs.left.gripper_pose.copy(),
right_gripper_open=float(self.current_obs.right.gripper_open),
left_gripper_open=float(self.current_obs.left.gripper_open),
snapshot=None,
)
def snapshot(self) -> SimulatorSnapshot:
return _capture_snapshot(self.task)
def restore(self, snapshot: SimulatorSnapshot) -> None:
_restore_snapshot(self.task, snapshot)
self.current_obs = self._observation_from_scene()
def restore_to_index(self, snapshot: SimulatorSnapshot, frame_index: int) -> None:
self.restore(snapshot)
self.current_index = frame_index
def _observation_from_scene(self):
return self.task._scene.get_observation()
def _apply_gripper_replay(self, target_obs) -> None:
desired = np.array(
[target_obs.right.gripper_open, target_obs.left.gripper_open],
dtype=np.float64,
)
current = np.array(
[
float(all(x > 0.9 for x in self.task._scene.robot.right_gripper.get_open_amount())),
float(all(x > 0.9 for x in self.task._scene.robot.left_gripper.get_open_amount())),
],
dtype=np.float64,
)
if not np.allclose(current, desired):
self.discrete_gripper.action(self.task._scene, desired)
self.current_obs = self._observation_from_scene()
self._maintain_grasp_state(desired)
self.current_obs = self._observation_from_scene()
def _maintain_grasp_state(self, desired: np.ndarray) -> None:
scene = self.task._scene
robot = scene.robot
if float(desired[0]) <= 0.5:
left_grasped = {obj.get_name() for obj in robot.left_gripper.get_grasped_objects()}
for graspable in scene.task.get_graspable_objects():
if graspable.get_name() not in left_grasped:
robot.right_gripper.grasp(graspable)
elif robot.right_gripper.get_grasped_objects():
robot.right_gripper.release()
if float(desired[1]) <= 0.5:
right_grasped = {obj.get_name() for obj in robot.right_gripper.get_grasped_objects()}
for graspable in scene.task.get_graspable_objects():
if graspable.get_name() not in right_grasped:
robot.left_gripper.grasp(graspable)
elif robot.left_gripper.get_grasped_objects():
robot.left_gripper.release()
def _quat_to_matrix(quat: Sequence[float]) -> np.ndarray:
x, y, z, w = quat
xx, yy, zz = x * x, y * y, z * z
xy, xz, yz = x * y, x * z, y * z
wx, wy, wz = w * x, w * y, w * z
return np.array(
[
[1 - 2 * (yy + zz), 2 * (xy - wz), 2 * (xz + wy)],
[2 * (xy + wz), 1 - 2 * (xx + zz), 2 * (yz - wx)],
[2 * (xz - wy), 2 * (yz + wx), 1 - 2 * (xx + yy)],
],
dtype=np.float64,
)
def _matrix_to_quat(matrix: np.ndarray) -> np.ndarray:
trace = np.trace(matrix)
if trace > 0.0:
s = math.sqrt(trace + 1.0) * 2.0
w = 0.25 * s
x = (matrix[2, 1] - matrix[1, 2]) / s
y = (matrix[0, 2] - matrix[2, 0]) / s
z = (matrix[1, 0] - matrix[0, 1]) / s
elif matrix[0, 0] > matrix[1, 1] and matrix[0, 0] > matrix[2, 2]:
s = math.sqrt(1.0 + matrix[0, 0] - matrix[1, 1] - matrix[2, 2]) * 2.0
w = (matrix[2, 1] - matrix[1, 2]) / s
x = 0.25 * s
y = (matrix[0, 1] + matrix[1, 0]) / s
z = (matrix[0, 2] + matrix[2, 0]) / s
elif matrix[1, 1] > matrix[2, 2]:
s = math.sqrt(1.0 + matrix[1, 1] - matrix[0, 0] - matrix[2, 2]) * 2.0
w = (matrix[0, 2] - matrix[2, 0]) / s
x = (matrix[0, 1] + matrix[1, 0]) / s
y = 0.25 * s
z = (matrix[1, 2] + matrix[2, 1]) / s
else:
s = math.sqrt(1.0 + matrix[2, 2] - matrix[0, 0] - matrix[1, 1]) * 2.0
w = (matrix[1, 0] - matrix[0, 1]) / s
x = (matrix[0, 2] + matrix[2, 0]) / s
y = (matrix[1, 2] + matrix[2, 1]) / s
z = 0.25 * s
quat = np.array([x, y, z, w], dtype=np.float64)
return quat / np.linalg.norm(quat)
def _pose_to_matrix(pose: Sequence[float]) -> np.ndarray:
matrix = np.eye(4, dtype=np.float64)
matrix[:3, :3] = _quat_to_matrix(pose[3:])
matrix[:3, 3] = pose[:3]
return matrix
def _matrix_to_pose(matrix: np.ndarray) -> np.ndarray:
return np.concatenate([matrix[:3, 3], _matrix_to_quat(matrix[:3, :3])], axis=0)
def _relative_pose(reference_pose: Sequence[float], target_pose: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
target = _pose_to_matrix(target_pose)
rel = np.linalg.inv(reference) @ target
return _matrix_to_pose(rel)
def _apply_relative_pose(reference_pose: Sequence[float], relative_pose: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
relative = _pose_to_matrix(relative_pose)
world = reference @ relative
return _matrix_to_pose(world)
def _orientation_error_degrees(current_quat: Sequence[float], target_quat: Sequence[float]) -> float:
current = np.asarray(current_quat, dtype=np.float64)
target = np.asarray(target_quat, dtype=np.float64)
current_norm = float(np.linalg.norm(current))
target_norm = float(np.linalg.norm(target))
if current_norm <= 1e-9 or target_norm <= 1e-9:
return 180.0
current = current / current_norm
target = target / target_norm
dot = float(np.clip(abs(current @ target), -1.0, 1.0))
return float(np.degrees(2.0 * np.arccos(dot)))
def _world_to_local(reference_pose: Sequence[float], point_world: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
point = np.concatenate([np.asarray(point_world, dtype=np.float64), [1.0]])
return (np.linalg.inv(reference) @ point)[:3]
def _local_to_world(reference_pose: Sequence[float], point_local: Sequence[float]) -> np.ndarray:
reference = _pose_to_matrix(reference_pose)
point = np.concatenate([np.asarray(point_local, dtype=np.float64), [1.0]])
return (reference @ point)[:3]
def _tray_bbox_local() -> np.ndarray:
return np.asarray(Shape("tray").get_bounding_box(), dtype=np.float64)
def _dedupe_points(points: Iterable[np.ndarray], precision: int = 5) -> List[np.ndarray]:
unique: List[np.ndarray] = []
seen = set()
for point in points:
array = np.asarray(point, dtype=np.float64)
key = tuple(np.round(array, precision))
if key in seen:
continue
seen.add(key)
unique.append(array)
return unique
def _sample_box_face_points_local(
bbox: np.ndarray,
axis: int,
face_index: int,
center: np.ndarray,
extents: np.ndarray,
primary_samples: int,
secondary_samples: int,
) -> List[np.ndarray]:
center = np.asarray(center, dtype=np.float64)
extents = np.asarray(extents, dtype=np.float64)
tangent_axes = [idx for idx in range(3) if idx != axis]
tangent_ranges: List[np.ndarray] = []
for tangent_idx, tangent_axis in enumerate(tangent_axes):
lo = max(bbox[tangent_axis * 2], center[tangent_axis] - extents[tangent_axis])
hi = min(bbox[tangent_axis * 2 + 1], center[tangent_axis] + extents[tangent_axis])
if hi < lo:
lo = hi = float(np.clip(center[tangent_axis], bbox[tangent_axis * 2], bbox[tangent_axis * 2 + 1]))
tangent_ranges.append(
np.linspace(lo, hi, primary_samples if tangent_idx == 0 else secondary_samples)
)
face_value = bbox[axis * 2 + face_index]
points: List[np.ndarray] = []
for first in tangent_ranges[0]:
for second in tangent_ranges[1]:
point = center.copy()
point[axis] = face_value
point[tangent_axes[0]] = first
point[tangent_axes[1]] = second
points.append(point)
return points
def _grasp_surface_specs(
templates: MotionTemplates, bbox: np.ndarray
) -> Tuple[np.ndarray, List[Tuple[int, int]]]:
center = np.asarray(templates.grasp_local_center, dtype=np.float64).copy()
mins = np.asarray([bbox[0], bbox[2], bbox[4]], dtype=np.float64)
maxs = np.asarray([bbox[1], bbox[3], bbox[5]], dtype=np.float64)
center = np.clip(center, mins, maxs)
approach = (
np.asarray(templates.grasp_rel_pose[:3], dtype=np.float64)
- np.asarray(templates.pregrasp_rel_pose[:3], dtype=np.float64)
)
if np.linalg.norm(approach) < 1e-6:
approach = center - 0.5 * (mins + maxs)
ranked_axes = np.argsort(-np.abs(approach))
primary_mag = float(abs(approach[ranked_axes[0]])) if len(ranked_axes) else 0.0
specs: List[Tuple[int, int]] = []
for rank, axis in enumerate(ranked_axes.tolist()):
magnitude = float(abs(approach[axis]))
if magnitude < 1e-6:
continue
if rank > 0 and magnitude < max(1e-6, primary_mag * DEFAULT_GRASP_SECONDARY_FACE_RATIO):
continue
move_positive = float(approach[axis]) >= 0.0
face_index = 0 if move_positive else 1
spec = (int(axis), int(face_index))
if spec not in specs:
specs.append(spec)
if len(specs) >= 2:
break
if specs:
return center, specs
# Fallback to the face closest to the demonstrated grasp center.
face_distances = []
for axis in range(3):
face_distances.append((abs(center[axis] - bbox[axis * 2]), axis, 0))
face_distances.append((abs(center[axis] - bbox[axis * 2 + 1]), axis, 1))
_, axis, face_index = min(face_distances, key=lambda item: item[0])
return center, [(int(axis), int(face_index))]
def _first_transition(demo: Demo, side: str, open_to_closed: bool) -> int:
values = [getattr(demo[i], side).gripper_open for i in range(len(demo))]
for i in range(1, len(values)):
if open_to_closed and values[i - 1] > 0.5 and values[i] < 0.5:
return i
if not open_to_closed and values[i - 1] < 0.5 and values[i] > 0.5:
return i
raise RuntimeError(f"no gripper transition found for {side}")
def _detect_pregrasp_approach_onset(
cache: ReplayCache,
left_close: int,
pregrasp_rel_pose: np.ndarray,
) -> int:
distances: List[float] = []
for frame_index in range(left_close):
cache.step_to(frame_index)
tray_pose = Shape("tray").get_pose()
target_pose = _apply_relative_pose(tray_pose, pregrasp_rel_pose)
current_pose = np.asarray(cache.current_obs.left.gripper_pose, dtype=np.float64)
distances.append(float(np.linalg.norm(current_pose[:3] - target_pose[:3])))
distances_arr = np.asarray(distances, dtype=np.float64)
if len(distances_arr) < 8:
return max(0, left_close - 24)
search_start = max(8, left_close - DEFAULT_APPROACH_ONSET_WINDOW)
for frame_index in range(search_start, max(search_start, left_close - 6)):
window = distances_arr[frame_index : frame_index + 6]
if len(window) < 6:
break
short_drop = float(window[0] - window[-1])
if short_drop < 0.01:
continue
if np.mean(np.diff(window)) < -0.002:
return frame_index
return max(search_start, left_close - 24)
def _derive_templates(dataset_root: Path, template_episode_dir: Path) -> Tuple[MotionTemplates, Dict[str, int]]:
env = _launch_replay_env()
try:
demo = _load_demo(template_episode_dir)
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=8)
cache.reset()
base_pose = task._task.get_base().get_pose()
left_close = _first_transition(demo, "left", open_to_closed=True)
left_open = _first_transition(demo, "left", open_to_closed=False)
pregrasp_index = max(0, left_close - 5)
right_close = _first_transition(demo, "right", open_to_closed=True)
right_open = _first_transition(demo, "right", open_to_closed=False)
bootstrap_indices = sorted({pregrasp_index, left_close, right_close, right_open, left_open})
states: Dict[int, ReplayState] = {}
for frame_index in bootstrap_indices:
cache.step_to(frame_index)
states[frame_index] = cache.current_state()
pregrasp_rel_pose = _relative_pose(
states[pregrasp_index].tray_pose, states[pregrasp_index].left_gripper_pose
)
grasp_rel_pose = _relative_pose(
states[left_close].tray_pose, states[left_close].left_gripper_pose
)
approach_onset = _detect_pregrasp_approach_onset(cache, left_close, pregrasp_rel_pose)
approach_indices = sorted(
{
*np.linspace(
approach_onset,
max(approach_onset, pregrasp_index),
num=min(6, max(2, pregrasp_index - approach_onset + 1)),
dtype=int,
).tolist(),
pregrasp_index,
max(0, left_close - 2),
}
)
retreat_indices = sorted(
{
*[
min(len(demo) - 1, max(left_close + 1, left_close + offset))
for offset in (5, 10, 15, 20, 30, 40, 50)
],
max(left_close + 1, left_open - 20),
max(left_close + 1, left_open - 10),
max(left_close + 1, left_open - 5),
left_open,
}
)
interesting = sorted(
{
*bootstrap_indices,
*approach_indices,
*retreat_indices,
}
)
for frame_index in interesting:
if frame_index not in states:
cache.step_to(frame_index)
states[frame_index] = cache.current_state()
approach_rel_poses = [
_relative_pose(states[index].tray_pose, states[index].left_gripper_pose)
for index in approach_indices
if index < left_close
]
retreat_rel_poses = [
_relative_pose(base_pose, states[index].left_gripper_pose)
for index in retreat_indices
if index > left_close
]
grasp_local_center = _world_to_local(
states[left_close].tray_pose, states[left_close].left_gripper_pose[:3]
)
templates = MotionTemplates(
pregrasp_rel_pose=pregrasp_rel_pose,
grasp_rel_pose=grasp_rel_pose,
retreat_rel_poses=retreat_rel_poses,
grasp_local_center=grasp_local_center,
grasp_region_extents=np.array([0.03, 0.015, 0.004], dtype=np.float64),
retriever_home_joints=np.asarray(demo[0].left.joint_positions, dtype=np.float64),
hold_open_angle=float(states[right_open].door_angle),
open_more_delta=max(
0.12,
float(states[right_open].door_angle - states[right_close].door_angle) * 0.25,
),
reference_tray_height=float(states[left_close].tray_pose[2]),
approach_rel_poses=approach_rel_poses,
)
templates.mask_handle_ids = _infer_tray_mask_handle_ids(
episode_dir=template_episode_dir,
demo=demo,
cache=cache,
templates=templates,
reference_frames=approach_indices[-4:] + [left_close],
)
template_frames = {
"pregrasp": pregrasp_index,
"grasp": left_close,
"right_close": right_close,
"right_open": right_open,
"approach": approach_indices,
"retreat": retreat_indices,
}
return templates, template_frames
finally:
env.shutdown()
def _camera_projection(extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
camera_pos = extrinsics[:3, 3:4]
rotation = extrinsics[:3, :3]
world_to_camera = np.concatenate([rotation.T, -(rotation.T @ camera_pos)], axis=1)
projection = intrinsics @ world_to_camera
return projection, world_to_camera
def _project_points(points_world: np.ndarray, extrinsics: np.ndarray, intrinsics: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
projection, world_to_camera = _camera_projection(extrinsics, intrinsics)
homogeneous = np.concatenate([points_world, np.ones((len(points_world), 1))], axis=1)
camera_xyz = (world_to_camera @ homogeneous.T).T
image_xyz = (projection @ homogeneous.T).T
uv = image_xyz[:, :2] / image_xyz[:, 2:3]
return uv, camera_xyz
def _sample_grasp_points(templates: MotionTemplates, tray_pose: np.ndarray) -> np.ndarray:
bbox = _tray_bbox_local()
center, face_specs = _grasp_surface_specs(templates, bbox)
extents = np.asarray(templates.grasp_region_extents, dtype=np.float64)
points_local: List[np.ndarray] = []
for axis, face_index in face_specs:
points_local.extend(
_sample_box_face_points_local(
bbox=bbox,
axis=axis,
face_index=face_index,
center=center,
extents=extents,
primary_samples=9,
secondary_samples=5,
)
)
points_local = _dedupe_points(points_local)
return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64)
def _sample_full_tray_points(tray_pose: np.ndarray) -> np.ndarray:
bbox = _tray_bbox_local()
center = np.asarray(
[
0.5 * (bbox[0] + bbox[1]),
0.5 * (bbox[2] + bbox[3]),
0.5 * (bbox[4] + bbox[5]),
],
dtype=np.float64,
)
extents = np.asarray(
[
0.5 * (bbox[1] - bbox[0]),
0.5 * (bbox[3] - bbox[2]),
0.5 * (bbox[5] - bbox[4]),
],
dtype=np.float64,
)
points_local: List[np.ndarray] = []
for axis in range(3):
for face_index in [0, 1]:
points_local.extend(
_sample_box_face_points_local(
bbox=bbox,
axis=axis,
face_index=face_index,
center=center,
extents=extents,
primary_samples=8,
secondary_samples=8,
)
)
points_local = _dedupe_points(points_local)
return np.array([_local_to_world(tray_pose, point) for point in points_local], dtype=np.float64)
def _infer_tray_mask_handle_ids(
episode_dir: Path,
demo: Demo,
cache: ReplayCache,
templates: MotionTemplates,
reference_frames: Sequence[int],
max_handles: int = DEFAULT_MASK_HANDLE_COUNT,
) -> List[int]:
counts: Dict[int, int] = {}
fallback_counts: Dict[int, int] = {}
unique_frames = sorted({int(frame_index) for frame_index in reference_frames})
for frame_index in unique_frames:
cache.step_to(frame_index)
grasp_points = _sample_grasp_points(templates, Shape("tray").get_pose())
tray_points = _sample_full_tray_points(Shape("tray").get_pose())
for camera_name in FULL_CAMERA_SET:
mask = _load_mask(episode_dir, frame_index, camera_name)
point_cloud, extrinsics, intrinsics = _camera_point_cloud(
episode_dir, demo, frame_index, camera_name
)
for points_world in [grasp_points, tray_points]:
projected, visible = _visibility_projection_details(
points_world, point_cloud, extrinsics, intrinsics
)
for px, py in visible:
handle = int(mask[py, px])
if handle != 0:
counts[handle] = counts.get(handle, 0) + 1
for px, py in projected:
handle = int(mask[py, px])
if handle != 0:
fallback_counts[handle] = fallback_counts.get(handle, 0) + 1
ranked_source = counts if counts else fallback_counts
if not ranked_source:
return []
ranked = sorted(ranked_source.items(), key=lambda item: item[1], reverse=True)
top_count = ranked[0][1]
selected = [
handle
for handle, count in ranked
if count >= max(2, int(math.ceil(top_count * 0.15)))
][:max_handles]
return selected if selected else [ranked[0][0]]
def _camera_point_cloud(
episode_dir: Path,
demo: Demo,
frame_index: int,
camera_name: str,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
depth = _load_depth_meters(episode_dir, demo, frame_index, camera_name)
extrinsics = demo[frame_index].misc[f"{camera_name}_camera_extrinsics"]
intrinsics = demo[frame_index].misc[f"{camera_name}_camera_intrinsics"]
point_cloud = VisionSensor.pointcloud_from_depth_and_camera_params(
depth, extrinsics, intrinsics
)
return point_cloud, extrinsics, intrinsics
def _visibility_projection_details(
points_world: np.ndarray,
point_cloud_world: np.ndarray,
extrinsics: np.ndarray,
intrinsics: np.ndarray,
tolerance: float = DEFAULT_VISIBILITY_POINT_TOLERANCE,
) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
height, width = point_cloud_world.shape[:2]
projected: List[Tuple[int, int]] = []
visible: List[Tuple[int, int]] = []
for point_world, (u, v), (_, _, camera_depth) in zip(points_world, uv, camera_xyz):
if camera_depth <= 0 or not (0 <= u < width and 0 <= v < height):
continue
px = min(max(int(round(float(u))), 0), width - 1)
py = min(max(int(round(float(v))), 0), height - 1)
projected.append((px, py))
scene_point = np.asarray(point_cloud_world[py, px], dtype=np.float64)
if not np.isfinite(scene_point).all():
continue
if float(np.linalg.norm(scene_point - point_world)) <= tolerance:
visible.append((px, py))
return projected, visible
def _point_visibility_ratio_from_point_cloud(
points_world: np.ndarray,
point_cloud_world: np.ndarray,
extrinsics: np.ndarray,
intrinsics: np.ndarray,
) -> float:
projected, visible = _visibility_projection_details(
points_world, point_cloud_world, extrinsics, intrinsics
)
return float(len(visible) / len(projected)) if projected else 0.0
def _point_visibility_stats_from_point_cloud(
points_world: np.ndarray,
point_cloud_world: np.ndarray,
extrinsics: np.ndarray,
intrinsics: np.ndarray,
) -> Dict[str, float]:
projected, visible = _visibility_projection_details(
points_world, point_cloud_world, extrinsics, intrinsics
)
projected_count = int(len(projected))
visible_count = int(len(visible))
ratio = float(visible_count / projected_count) if projected_count else 0.0
return {
"ratio": ratio,
"projected_count": projected_count,
"visible_count": visible_count,
}
def _mask_visibility_ratio(
points_world: np.ndarray,
mask: np.ndarray,
handle_ids: Sequence[int],
extrinsics: np.ndarray,
intrinsics: np.ndarray,
) -> float:
if not handle_ids:
return 0.0
handle_set = {int(handle) for handle in handle_ids}
uv, camera_xyz = _project_points(points_world, extrinsics, intrinsics)
height, width = mask.shape
visible = 0
total = 0
for (u, v), (_, _, camera_depth) in zip(uv, camera_xyz):
if camera_depth <= 0:
continue
if not (0 <= u < width and 0 <= v < height):
continue
total += 1
px = int(round(u))
py = int(round(v))
px = min(max(px, 0), width - 1)
py = min(max(py, 0), height - 1)
if int(mask[py, px]) in handle_set:
visible += 1
return float(visible / total) if total else 0.0
def _union_visibility(values: Iterable[float]) -> float:
product = 1.0
for value in values:
product *= 1.0 - float(value)
return 1.0 - product
def _sequence_summary(values: Sequence[float]) -> Dict[str, float]:
if not values:
return {
"count": 0.0,
"mean": 0.0,
"median": 0.0,
"min": 0.0,
"max": 0.0,
}
array = np.asarray(values, dtype=np.float64)
return {
"count": float(array.size),
"mean": float(np.mean(array)),
"median": float(np.median(array)),
"min": float(np.min(array)),
"max": float(np.max(array)),
}
def _json_safe(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple)):
return [_json_safe(item) for item in value]
if isinstance(value, np.generic):
return _json_safe(value.item())
if isinstance(value, np.ndarray):
return [_json_safe(item) for item in value.tolist()]
if isinstance(value, float):
return None if not math.isfinite(value) else value
return value
def _keypoint_discovery(demo: Demo, stopping_delta: float = 0.1) -> List[int]:
keypoints: List[int] = []
right_prev = demo[0].right.gripper_open
left_prev = demo[0].left.gripper_open
stopped_buffer = 0
for i, obs in enumerate(demo._observations):
if i < 2 or i >= len(demo) - 1:
right_stopped = left_stopped = False
else:
right_stopped = (
np.allclose(obs.right.joint_velocities, 0, atol=stopping_delta)
and obs.right.gripper_open == demo[i + 1].right.gripper_open
and obs.right.gripper_open == demo[i - 1].right.gripper_open
and demo[i - 2].right.gripper_open == demo[i - 1].right.gripper_open
)
left_stopped = (
np.allclose(obs.left.joint_velocities, 0, atol=stopping_delta)
and obs.left.gripper_open == demo[i + 1].left.gripper_open
and obs.left.gripper_open == demo[i - 1].left.gripper_open
and demo[i - 2].left.gripper_open == demo[i - 1].left.gripper_open
)
stopped = stopped_buffer <= 0 and right_stopped and left_stopped
stopped_buffer = 4 if stopped else stopped_buffer - 1
last = i == len(demo) - 1
state_changed = (
obs.right.gripper_open != right_prev or obs.left.gripper_open != left_prev
)
if i != 0 and (state_changed or last or stopped):
keypoints.append(i)
right_prev = obs.right.gripper_open
left_prev = obs.left.gripper_open
if len(keypoints) > 1 and (keypoints[-1] - 1) == keypoints[-2]:
keypoints.pop(-2)
return keypoints
def _plan_path(scene, arm_name: str, pose: np.ndarray, ignore_collisions: bool = False):
arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm
try:
return arm.get_path(
pose[:3],
quaternion=pose[3:],
ignore_collisions=ignore_collisions,
trials=DEFAULT_PLAN_TRIALS,
max_configs=DEFAULT_PLAN_MAX_CONFIGS,
max_time_ms=DEFAULT_PLAN_MAX_TIME_MS,
trials_per_goal=DEFAULT_PLAN_TRIALS_PER_GOAL,
algorithm=Algos.RRTConnect,
)
except Exception:
return None
def _stable_plan(
scene,
arm_name: str,
pose: np.ndarray,
ignore_collisions: bool = False,
) -> Tuple[Optional[object], float, float]:
attempts = max(1, DEFAULT_PLAN_ATTEMPTS)
successful_paths: List[Tuple[float, object]] = []
for _ in range(attempts):
path = _plan_path(scene, arm_name, pose, ignore_collisions=ignore_collisions)
length = _path_length(path)
if path is None or not np.isfinite(length):
continue
successful_paths.append((float(length), path))
if not successful_paths:
return None, math.inf, 0.0
successful_paths.sort(key=lambda item: item[0])
stable_length = float(np.median([length for length, _ in successful_paths]))
reliability = float(len(successful_paths) / attempts)
return successful_paths[0][1], stable_length, reliability
def _plan_is_reliable(reliability: float) -> bool:
attempts = max(1, DEFAULT_PLAN_ATTEMPTS)
required = min(attempts, max(1, DEFAULT_PLAN_MIN_SUCCESSES))
return reliability >= (required / attempts)
def _path_length(path) -> float:
if path is None:
return math.inf
try:
return float(path._get_path_point_lengths()[-1])
except Exception:
return math.inf
def _dedupe_pose_list(poses: Iterable[np.ndarray], precision: int = 4) -> List[np.ndarray]:
unique: List[np.ndarray] = []
seen = set()
for pose in poses:
key = tuple(np.round(np.asarray(pose, dtype=np.float64), precision))
if key in seen:
continue
seen.add(key)
unique.append(np.asarray(pose, dtype=np.float64))
return unique
def _pregrasp_progress_and_distance(
current_pose: np.ndarray,
tray_pose: np.ndarray,
templates: MotionTemplates,
) -> Tuple[float, float]:
goal_pose = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
if templates.approach_rel_poses:
start_pose = _apply_relative_pose(tray_pose, templates.approach_rel_poses[0])
span = float(np.linalg.norm(start_pose[:3] - goal_pose[:3]))
else:
span = 0.12
distance = float(np.linalg.norm(current_pose[:3] - goal_pose[:3]))
progress = 1.0 - (distance / max(span, 1e-6))
return float(np.clip(progress, 0.0, 1.0)), distance
def _pregrasp_candidates(tray_pose: np.ndarray, templates: MotionTemplates) -> List[np.ndarray]:
if templates.approach_rel_poses:
rel_poses = templates.approach_rel_poses[-min(
DEFAULT_PREGRASP_CANDIDATE_COUNT, len(templates.approach_rel_poses)
) :]
else:
rel_poses = [templates.pregrasp_rel_pose]
candidates = [_apply_relative_pose(tray_pose, rel_pose) for rel_pose in rel_poses]
for rel_pose in rel_poses[-1:]:
base = _apply_relative_pose(tray_pose, rel_pose)
for dx in (-0.02, 0.02):
perturbed = base.copy()
perturbed[0] += dx
candidates.append(perturbed)
return _dedupe_pose_list(candidates)
def _late_pregrasp_goal_poses(
tray_pose: np.ndarray, templates: MotionTemplates
) -> List[np.ndarray]:
final_goal = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
goals = [final_goal]
for dx in (-0.02, 0.02):
perturbed = final_goal.copy()
perturbed[0] += dx
goals.append(perturbed)
return _dedupe_pose_list(goals)
def _pregrasp_corridor_rel_poses(templates: MotionTemplates) -> List[np.ndarray]:
rel_poses = [
np.asarray(rel_pose, dtype=np.float64)
for rel_pose in templates.approach_rel_poses
]
if not rel_poses:
return [np.asarray(templates.pregrasp_rel_pose, dtype=np.float64)]
pregrasp_rel_pose = np.asarray(templates.pregrasp_rel_pose, dtype=np.float64)
stop_index = int(
np.argmin(
[
float(np.linalg.norm(rel_pose[:3] - pregrasp_rel_pose[:3]))
for rel_pose in rel_poses
]
)
)
corridor = rel_poses[: stop_index + 1]
if not corridor:
corridor = [pregrasp_rel_pose]
elif float(np.linalg.norm(corridor[-1][:3] - pregrasp_rel_pose[:3])) > 1e-6:
corridor.append(pregrasp_rel_pose)
return _dedupe_pose_list(corridor)
def _extract_sequence_poses(
tray_pose: np.ndarray, task_base_pose: np.ndarray, templates: MotionTemplates
) -> List[np.ndarray]:
poses = [
_apply_relative_pose(tray_pose, templates.pregrasp_rel_pose),
_apply_relative_pose(tray_pose, templates.grasp_rel_pose),
]
if templates.retreat_rel_poses:
retreat_indices = np.linspace(
0,
len(templates.retreat_rel_poses) - 1,
num=min(3, len(templates.retreat_rel_poses)),
dtype=int,
)
poses.extend(
_apply_relative_pose(task_base_pose, templates.retreat_rel_poses[index])
for index in sorted(set(retreat_indices.tolist()))
)
return poses
def _extract_height_threshold(templates: MotionTemplates) -> float:
return templates.reference_tray_height + 0.06
def _extraction_progress_score(current_height: float, templates: MotionTemplates) -> float:
baseline = float(templates.reference_tray_height)
threshold = _extract_height_threshold(templates)
current_height = float(current_height)
if current_height <= baseline:
return 0.0
if current_height < threshold:
lift_fraction = (current_height - baseline) / max(threshold - baseline, 1e-6)
return float(np.clip(0.8 * lift_fraction, 0.0, 0.8))
margin = current_height - threshold
# Saturate smoothly once the tray is clearly lifted above the oven lip.
return float(min(1.0, 0.8 + margin / 0.12))
def _arm_external_collision_names(scene, arm_name: str) -> List[str]:
arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm
gripper = scene.robot.left_gripper if arm_name == "left" else scene.robot.right_gripper
robot_shapes = {
shape.get_name() for shape in arm.get_objects_in_tree(object_type=ObjectType.SHAPE)
}
grasped = {obj.get_name() for obj in gripper.get_grasped_objects()}
collisions: List[str] = []
for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE):
name = shape.get_name()
if not shape.is_collidable() or name in robot_shapes or name in grasped:
continue
try:
if arm.check_arm_collision(shape):
collisions.append(name)
except Exception:
continue
return sorted(set(collisions))
def _allowed_door_collision_names(scene) -> set:
allowed = set()
for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE):
name = shape.get_name()
if name.startswith("oven_door") or name.startswith("oven_knob"):
allowed.add(name)
return allowed
def _door_collision_shapes(scene) -> List[Shape]:
return [
Shape(shape.get_handle())
for shape in scene.pyrep.get_objects_in_tree(object_type=ObjectType.SHAPE)
if shape.get_name().startswith("oven_door") or shape.get_name().startswith("oven_knob")
]
def _robot_collision_shape_handles(scene, arm_name: str) -> set:
arm = scene.robot.left_arm if arm_name == "left" else scene.robot.right_arm
gripper = scene.robot.left_gripper if arm_name == "left" else scene.robot.right_gripper
robot_shapes = arm.get_objects_in_tree(object_type=ObjectType.SHAPE)
robot_shapes.extend(gripper.get_objects_in_tree(object_type=ObjectType.SHAPE))
return {shape.get_handle() for shape in robot_shapes}
def _shape_robot_contacts(shape: Shape, robot_shape_handles: set) -> List[np.ndarray]:
contacts: List[np.ndarray] = []
try:
raw_contacts = shape.get_contact(None, True)
except Exception:
return contacts
shape_handle = shape.get_handle()
for contact in raw_contacts:
handles = set(contact.get("contact_handles", []))
if shape_handle not in handles:
continue
other_handles = handles - {shape_handle}
if not other_handles.intersection(robot_shape_handles):
continue
contact_array = np.asarray(contact.get("contact", []), dtype=np.float64)
if contact_array.shape != (9,) or not np.all(np.isfinite(contact_array)):
continue
contacts.append(contact_array)
return contacts
def _door_surface_normal(shape: Shape, contact_point: np.ndarray) -> Optional[np.ndarray]:
bbox = np.asarray(shape.get_bounding_box(), dtype=np.float64)
extents = np.array(
[bbox[1] - bbox[0], bbox[3] - bbox[2], bbox[5] - bbox[4]], dtype=np.float64
)
if not np.all(np.isfinite(extents)):
return None
thin_axis = int(np.argmin(extents))
axes = np.asarray(shape.get_matrix()[:3, :3], dtype=np.float64)
axis = axes[:, thin_axis]
axis_norm = float(np.linalg.norm(axis))
if axis_norm <= 1e-9:
return None
axis = axis / axis_norm
relative = np.asarray(contact_point, dtype=np.float64) - shape.get_position()
sign = 1.0 if float(relative @ axis) >= 0.0 else -1.0
return sign * axis
def _door_contact_alignments(
shape: Shape, contacts: Sequence[np.ndarray], tip_delta: np.ndarray
) -> List[float]:
tip_delta = np.asarray(tip_delta, dtype=np.float64)
tip_speed = float(np.linalg.norm(tip_delta))
if tip_speed <= 1e-9:
return []
move = tip_delta / tip_speed
alignments: List[float] = []
for contact in contacts:
normal = _door_surface_normal(shape, contact[:3])
if normal is None:
continue
alignments.append(float(np.clip(move @ normal, -1.0, 1.0)))
return alignments
def _door_contact_step_quality(alignments: Sequence[float], step_open_delta: float) -> float:
if not alignments:
return 0.0
values = np.asarray(alignments, dtype=np.float64)
parallel_bonus = float(np.mean(1.0 - np.abs(values)))
opening_motion_bonus = float(np.mean(np.clip(-values, 0.0, 1.0)))
direct_penalty = float(
np.mean(
np.clip(
(np.abs(values) - DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE)
/ max(1.0 - DEFAULT_DOOR_ASSIST_DIRECT_ALIGN_FREE, 1e-6),
0.0,
1.0,
)
)
)
open_bonus = float(
np.clip(step_open_delta / max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6), 0.0, 1.0)
)
close_penalty = float(
np.clip(-step_open_delta / max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6), 0.0, 1.0)
)
return float(
0.6 * parallel_bonus
+ 0.4 * opening_motion_bonus
+ 0.4 * open_bonus
- 0.8 * direct_penalty
- 1.0 * close_penalty
)
def _door_contact_path_quality(step_qualities: Sequence[float], door_open_delta: float) -> float:
if not step_qualities:
return 0.0
mean_quality = float(np.mean(np.asarray(step_qualities, dtype=np.float64)))
open_bonus = float(
np.clip(
door_open_delta / (3.0 * max(DEFAULT_DOOR_ASSIST_STEP_OPEN_SCALE, 1e-6)),
0.0,
1.0,
)
)
return float(np.clip(0.7 * mean_quality + 0.3 * open_bonus, 0.0, 1.0))
def _execute_path_with_collision_trace(task, path, arm_name: str) -> Dict[str, object]:
collisions = set()
step_qualities: List[float] = []
alignments_all: List[float] = []
door_shapes = _door_collision_shapes(task._scene)
robot_shape_handles = _robot_collision_shape_handles(task._scene, arm_name)
arm = task._scene.robot.left_arm if arm_name == "left" else task._scene.robot.right_arm
tip = arm.get_tip()
path.set_to_start(disable_dynamics=True)
task._pyrep.step()
previous_tip = tip.get_position()
previous_door_angle = float(Joint("oven_door_joint").get_joint_position())
steps = 0
while True:
done = path.step()
task._pyrep.step()
current_tip = tip.get_position()
tip_delta = np.asarray(current_tip - previous_tip, dtype=np.float64)
previous_tip = current_tip
current_door_angle = float(Joint("oven_door_joint").get_joint_position())
step_open_delta = previous_door_angle - current_door_angle
previous_door_angle = current_door_angle
current_collisions = set(_arm_external_collision_names(task._scene, arm_name))
for shape in door_shapes:
contacts = _shape_robot_contacts(shape, robot_shape_handles)
if not contacts:
continue
current_collisions.add(shape.get_name())
alignments = _door_contact_alignments(shape, contacts, tip_delta)
if alignments:
alignments_all.extend(float(value) for value in alignments)
step_qualities.append(
_door_contact_step_quality(alignments, step_open_delta)
)
collisions.update(current_collisions)
steps += 1
if done or steps > 400:
break
return {
"collision_names": sorted(collisions),
"step_qualities": [float(value) for value in step_qualities],
"alignments": [float(value) for value in alignments_all],
"door_contact_steps": int(len(step_qualities)),
"path_steps": int(steps),
}
def _single_door_assisted_pregrasp_score(
task,
target_pose: np.ndarray,
base_snapshot: Optional[SimulatorSnapshot] = None,
before_angle: Optional[float] = None,
allowed_collisions: Optional[set] = None,
) -> Tuple[float, bool, Dict[str, object]]:
snapshot = base_snapshot if base_snapshot is not None else _capture_snapshot(task)
debug: Dict[str, object] = {
"path_found": False,
"path_length": math.inf,
"base_score": 0.0,
"final_score": 0.0,
"bump_applied": False,
"post_assist_strict_feasible": False,
"post_assist_strict_length": math.inf,
"door_open_delta": 0.0,
"door_quality": 0.0,
"collision_names": [],
"bad_collision_names": [],
"step_quality_mean": 0.0,
"step_quality_min": 0.0,
"step_quality_max": 0.0,
"alignment_signed_mean": 0.0,
"alignment_abs_mean": 0.0,
"alignment_min": 0.0,
"alignment_max": 0.0,
"door_contact_steps": 0,
"path_steps": 0,
}
try:
if before_angle is None:
before_angle = float(Joint("oven_door_joint").get_joint_position())
path = _plan_path(
task._scene,
"left",
target_pose,
ignore_collisions=True,
)
length = _path_length(path)
if path is None or not np.isfinite(length):
return 0.0, False, debug
debug["path_found"] = True
debug["path_length"] = float(length)
trace = _execute_path_with_collision_trace(
task, path, "left"
)
allowed = (
_allowed_door_collision_names(task._scene)
if allowed_collisions is None
else allowed_collisions
)
collision_names = [str(name) for name in trace["collision_names"]]
bad_collisions = [name for name in collision_names if name not in allowed]
door_open_delta = before_angle - float(Joint("oven_door_joint").get_joint_position())
step_qualities = [float(value) for value in trace["step_qualities"]]
alignments = [float(value) for value in trace["alignments"]]
door_quality = _door_contact_path_quality(step_qualities, door_open_delta)
step_summary = _sequence_summary(step_qualities)
alignment_summary = _sequence_summary(alignments)
debug.update(
{
"collision_names": collision_names,
"bad_collision_names": bad_collisions,
"door_open_delta": float(door_open_delta),
"door_quality": float(door_quality),
"step_quality_mean": float(step_summary["mean"]),
"step_quality_min": float(step_summary["min"]),
"step_quality_max": float(step_summary["max"]),
"alignment_signed_mean": float(alignment_summary["mean"]),
"alignment_abs_mean": float(
np.mean(np.abs(np.asarray(alignments, dtype=np.float64)))
)
if alignments
else 0.0,
"alignment_min": float(alignment_summary["min"]),
"alignment_max": float(alignment_summary["max"]),
"door_contact_steps": int(trace["door_contact_steps"]),
"path_steps": int(trace["path_steps"]),
}
)
if (
bad_collisions
or door_open_delta <= DEFAULT_DOOR_ASSIST_MIN_OPEN_DELTA
or door_quality < DEFAULT_DOOR_ASSIST_MIN_QUALITY
):
return 0.0, False, debug
base_score = float(
math.exp(-float(length) / DEFAULT_PREGRASP_ASSIST_PATH_SCALE)
)
base_score *= door_quality
debug["base_score"] = float(base_score)
strict_path = _plan_path(
task._scene,
"left",
target_pose,
ignore_collisions=False,
)
strict_length = _path_length(strict_path)
debug["post_assist_strict_length"] = float(strict_length)
if strict_path is not None and np.isfinite(strict_length):
final_score = float(0.5 + 0.5 * base_score)
debug["bump_applied"] = True
debug["post_assist_strict_feasible"] = True
debug["final_score"] = final_score
return final_score, False, debug
debug["final_score"] = float(base_score)
return base_score, False, debug
finally:
_restore_snapshot(task, snapshot)
def _door_assisted_pregrasp_score(
task,
target_pose: np.ndarray,
base_snapshot: Optional[SimulatorSnapshot] = None,
before_angle: Optional[float] = None,
allowed_collisions: Optional[set] = None,
) -> Tuple[float, bool, Dict[str, object]]:
attempts = max(1, DEFAULT_PLAN_ATTEMPTS)
successful_scores: List[float] = []
successful_base_scores: List[float] = []
successful_lengths: List[float] = []
successful_door_qualities: List[float] = []
successful_open_deltas: List[float] = []
successful_step_means: List[float] = []
successful_align_abs_means: List[float] = []
successful_align_signed_means: List[float] = []
successful_collision_counts: List[float] = []
successful_bad_collision_counts: List[float] = []
post_assist_strict_count = 0
bump_count = 0
assisted_success = False
attempt_debugs: List[Dict[str, object]] = []
for _ in range(attempts):
score, success, attempt_debug = _single_door_assisted_pregrasp_score(
task,
target_pose,
base_snapshot=base_snapshot,
before_angle=before_angle,
allowed_collisions=allowed_collisions,
)
attempt_debugs.append(attempt_debug)
if score > 0.0:
successful_scores.append(float(score))
successful_base_scores.append(float(attempt_debug["base_score"]))
successful_lengths.append(float(attempt_debug["path_length"]))
successful_door_qualities.append(float(attempt_debug["door_quality"]))
successful_open_deltas.append(float(attempt_debug["door_open_delta"]))
successful_step_means.append(float(attempt_debug["step_quality_mean"]))
successful_align_abs_means.append(float(attempt_debug["alignment_abs_mean"]))
successful_align_signed_means.append(float(attempt_debug["alignment_signed_mean"]))
successful_collision_counts.append(float(len(attempt_debug["collision_names"])))
successful_bad_collision_counts.append(
float(len(attempt_debug["bad_collision_names"]))
)
post_assist_strict_count += int(bool(attempt_debug["post_assist_strict_feasible"]))
bump_count += int(bool(attempt_debug["bump_applied"]))
assisted_success = assisted_success or success
debug: Dict[str, object] = {
"attempts": attempt_debugs,
"attempt_count": int(attempts),
"successful_attempt_count": int(len(successful_scores)),
"reliability": 0.0,
"base_score": 0.0,
"score_before_reliability": 0.0,
"median_path_length": 0.0,
"door_quality": 0.0,
"door_open_delta": 0.0,
"step_quality_mean": 0.0,
"alignment_abs_mean": 0.0,
"alignment_signed_mean": 0.0,
"collision_count": 0.0,
"bad_collision_count": 0.0,
"post_assist_strict_feasible": False,
"bump_applied": False,
}
if not successful_scores:
return 0.0, assisted_success, debug
reliability = float(len(successful_scores) / attempts)
debug.update(
{
"reliability": reliability,
"base_score": float(np.median(np.asarray(successful_base_scores, dtype=np.float64))),
"score_before_reliability": float(
np.median(np.asarray(successful_scores, dtype=np.float64))
),
"median_path_length": float(
np.median(np.asarray(successful_lengths, dtype=np.float64))
),
"door_quality": float(
np.median(np.asarray(successful_door_qualities, dtype=np.float64))
),
"door_open_delta": float(
np.median(np.asarray(successful_open_deltas, dtype=np.float64))
),
"step_quality_mean": float(
np.median(np.asarray(successful_step_means, dtype=np.float64))
),
"alignment_abs_mean": float(
np.median(np.asarray(successful_align_abs_means, dtype=np.float64))
),
"alignment_signed_mean": float(
np.median(np.asarray(successful_align_signed_means, dtype=np.float64))
),
"collision_count": float(
np.median(np.asarray(successful_collision_counts, dtype=np.float64))
),
"bad_collision_count": float(
np.median(np.asarray(successful_bad_collision_counts, dtype=np.float64))
),
"post_assist_strict_feasible": bool(post_assist_strict_count > 0),
"bump_applied": bool(bump_count > 0),
}
)
if not _plan_is_reliable(reliability):
return 0.0, assisted_success, debug
stable_score = float(np.median(np.asarray(successful_scores, dtype=np.float64)))
return stable_score * reliability, assisted_success, debug
def _pregrasp_score_and_success(
task, templates: MotionTemplates
) -> Tuple[float, bool, Dict[str, object]]:
tray = Shape("tray")
if any(
grasped.get_name() == tray.get_name()
for grasped in task._scene.robot.left_gripper.get_grasped_objects()
):
return (
1.0,
True,
{
"tray_grasped_left": True,
"source": "grasped",
"best_goal_index": -1,
"num_goal_poses": 0,
"source_strict": False,
"source_assisted": False,
"bump_applied": False,
"base_score": 1.0,
"strict_length": 0.0,
"assisted_length": 0.0,
"assisted_reliability": 1.0,
"post_assist_strict_feasible": False,
"door_open_delta": 0.0,
"door_quality": 0.0,
"step_quality_mean": 0.0,
"step_quality_min": 0.0,
"step_quality_max": 0.0,
"alignment_signed_mean": 0.0,
"alignment_abs_mean": 0.0,
"collision_count": 0,
"bad_collision_count": 0,
"best_goal_pose": [],
"goal_evaluations": [],
},
)
tray_pose = tray.get_pose()
goal_poses = _late_pregrasp_goal_poses(tray_pose, templates)
best_score = 0.0
best_success = False
best_goal_index = -1
best_goal_debug: Optional[Dict[str, object]] = None
base_snapshot: Optional[SimulatorSnapshot] = None
before_angle = float(Joint("oven_door_joint").get_joint_position())
allowed_collisions: Optional[set] = None
goal_debugs: List[Dict[str, object]] = []
for goal_index, target_pose in enumerate(goal_poses):
goal_debug: Dict[str, object] = {
"goal_index": int(goal_index),
"target_pose": [float(value) for value in np.asarray(target_pose, dtype=np.float64)],
"strict_path_found": False,
"strict_length": 0.0,
"assisted": None,
"candidate_score": 0.0,
"source": "none",
}
path = _plan_path(
task._scene,
"left",
target_pose,
ignore_collisions=False,
)
length = _path_length(path)
if path is not None and np.isfinite(length):
goal_debug["strict_path_found"] = True
goal_debug["strict_length"] = float(length)
goal_debug["candidate_score"] = 1.0
goal_debug["source"] = "strict"
goal_debugs.append(goal_debug)
return (
1.0,
True,
{
"tray_grasped_left": False,
"source": "strict",
"best_goal_index": int(goal_index),
"num_goal_poses": int(len(goal_poses)),
"source_strict": True,
"source_assisted": False,
"bump_applied": False,
"base_score": 1.0,
"strict_length": float(length),
"assisted_length": 0.0,
"assisted_reliability": 0.0,
"post_assist_strict_feasible": False,
"door_open_delta": 0.0,
"door_quality": 0.0,
"step_quality_mean": 0.0,
"step_quality_min": 0.0,
"step_quality_max": 0.0,
"alignment_signed_mean": 0.0,
"alignment_abs_mean": 0.0,
"collision_count": 0,
"bad_collision_count": 0,
"best_goal_pose": [
float(value) for value in np.asarray(target_pose, dtype=np.float64)
],
"goal_evaluations": goal_debugs,
},
)
if base_snapshot is None:
base_snapshot = _capture_snapshot(task)
allowed_collisions = _allowed_door_collision_names(task._scene)
assisted_score, assisted_success, assisted_debug = _door_assisted_pregrasp_score(
task,
target_pose,
base_snapshot=base_snapshot,
before_angle=before_angle,
allowed_collisions=allowed_collisions,
)
goal_debug["assisted"] = assisted_debug
goal_debug["candidate_score"] = float(assisted_score)
goal_debug["source"] = "assisted" if assisted_score > 0.0 else "none"
goal_debugs.append(goal_debug)
if assisted_score > best_score:
best_score = assisted_score
best_goal_index = goal_index
best_goal_debug = goal_debug
best_success = best_success or assisted_success
if best_goal_debug is None:
best_goal_debug = {
"goal_index": -1,
"strict_length": 0.0,
"assisted": None,
}
assisted_debug = best_goal_debug.get("assisted") or {}
return (
float(best_score),
bool(best_success),
{
"tray_grasped_left": False,
"source": "assisted" if best_score > 0.0 else "none",
"best_goal_index": int(best_goal_index),
"num_goal_poses": int(len(goal_poses)),
"source_strict": False,
"source_assisted": bool(best_score > 0.0),
"bump_applied": bool(assisted_debug.get("bump_applied", False)),
"base_score": float(assisted_debug.get("base_score", 0.0)),
"strict_length": float(best_goal_debug.get("strict_length", 0.0)),
"assisted_length": float(assisted_debug.get("median_path_length", 0.0)),
"assisted_reliability": float(assisted_debug.get("reliability", 0.0)),
"post_assist_strict_feasible": bool(
assisted_debug.get("post_assist_strict_feasible", False)
),
"door_open_delta": float(assisted_debug.get("door_open_delta", 0.0)),
"door_quality": float(assisted_debug.get("door_quality", 0.0)),
"step_quality_mean": float(assisted_debug.get("step_quality_mean", 0.0)),
"step_quality_min": float(
min(
[
float(attempt.get("step_quality_min", 0.0))
for attempt in assisted_debug.get("attempts", [])
if attempt.get("path_found", False)
]
or [0.0]
)
),
"step_quality_max": float(
max(
[
float(attempt.get("step_quality_max", 0.0))
for attempt in assisted_debug.get("attempts", [])
if attempt.get("path_found", False)
]
or [0.0]
)
),
"alignment_signed_mean": float(
assisted_debug.get("alignment_signed_mean", 0.0)
),
"alignment_abs_mean": float(assisted_debug.get("alignment_abs_mean", 0.0)),
"collision_count": int(round(float(assisted_debug.get("collision_count", 0.0)))),
"bad_collision_count": int(
round(float(assisted_debug.get("bad_collision_count", 0.0)))
),
"best_goal_pose": [
float(value)
for value in np.asarray(
goal_poses[best_goal_index], dtype=np.float64
)
]
if best_goal_index >= 0
else [],
"goal_evaluations": goal_debugs,
},
)
def _extract_score_and_success(
task, templates: MotionTemplates
) -> Tuple[float, bool, Dict[str, object]]:
tray = Shape("tray")
robot = task._scene.robot
snapshot = _capture_snapshot(task)
try:
total_length = 0.0
current_height = float(tray.get_position()[2])
already_grasped = any(
grasped.get_name() == tray.get_name()
for grasped in robot.left_gripper.get_grasped_objects()
)
debug: Dict[str, object] = {
"initial_height": float(current_height),
"final_height": float(current_height),
"already_grasped_start": bool(already_grasped),
"already_grasped_end": bool(already_grasped),
"total_length": 0.0,
"num_milestones": 0,
"first_failed_milestone": -1,
"min_reliability": 0.0,
"mean_reliability": 0.0,
"approach_score": 0.0,
"retreat_score": 0.0,
"milestones": [],
"score": 0.0,
}
if already_grasped and current_height >= _extract_height_threshold(templates):
score = _extraction_progress_score(current_height, templates)
debug["score"] = float(score)
return score, True, debug
poses = _extract_sequence_poses(
tray.get_pose(), task._task.get_base().get_pose(), templates
)
approach_poses = [] if already_grasped else poses[:2]
retreat_poses = poses[2:]
if already_grasped and retreat_poses:
future_retreat_poses = [
pose for pose in retreat_poses if float(pose[2]) > current_height + 0.01
]
if future_retreat_poses:
retreat_poses = future_retreat_poses
elif current_height < _extract_height_threshold(templates):
retreat_poses = [retreat_poses[-1]]
else:
retreat_poses = []
milestone_poses = approach_poses + retreat_poses
milestone_collision = ([False] * len(approach_poses)) + ([True] * len(retreat_poses))
progress = _extraction_progress_score(current_height, templates) * 0.25
milestone_weight = 0.75 / max(1, len(milestone_poses))
milestone_reliabilities: List[float] = []
debug["num_milestones"] = int(len(milestone_poses))
for milestone_index, (pose, ignore_collisions) in enumerate(
zip(milestone_poses, milestone_collision)
):
path, length, reliability = _stable_plan(
task._scene, "left", pose, ignore_collisions=ignore_collisions
)
planner_score = (
reliability * math.exp(-length / DEFAULT_PATH_SCALE)
if path is not None and np.isfinite(length)
else 0.0
)
milestone_debug = {
"milestone_index": int(milestone_index),
"kind": "approach" if milestone_index < len(approach_poses) else "retreat",
"ignore_collisions": bool(ignore_collisions),
"path_found": bool(path is not None and np.isfinite(length)),
"path_length": float(length) if np.isfinite(length) else 0.0,
"reliability": float(reliability),
"planner_score": float(planner_score),
"height_before": float(current_height),
}
debug["milestones"].append(milestone_debug)
if path is None or not np.isfinite(length):
debug["first_failed_milestone"] = int(milestone_index)
debug["final_height"] = float(current_height)
debug["total_length"] = float(total_length)
debug["score"] = float(progress)
return progress, False, debug
milestone_reliabilities.append(float(reliability))
progress += milestone_weight * planner_score
if milestone_debug["kind"] == "approach":
debug["approach_score"] = float(debug["approach_score"] + milestone_weight * planner_score)
else:
debug["retreat_score"] = float(debug["retreat_score"] + milestone_weight * planner_score)
if not _plan_is_reliable(reliability):
debug["first_failed_milestone"] = int(milestone_index)
debug["final_height"] = float(current_height)
debug["total_length"] = float(total_length + length)
debug["score"] = float(progress)
debug["min_reliability"] = float(min(milestone_reliabilities))
debug["mean_reliability"] = float(np.mean(np.asarray(milestone_reliabilities)))
return progress, False, debug
total_length += length
path.set_to_end(disable_dynamics=True)
task._pyrep.step()
current_height = float(tray.get_position()[2])
progress = max(progress, _extraction_progress_score(current_height, templates) * 0.25)
milestone_debug["height_after"] = float(current_height)
if (not already_grasped) and milestone_index == (len(approach_poses) - 1):
robot.left_gripper.grasp(tray)
already_grasped = True
if not already_grasped:
robot.left_gripper.grasp(tray)
final_height = float(tray.get_position()[2])
success = final_height >= _extract_height_threshold(templates)
score = max(
progress,
math.exp(-total_length / (DEFAULT_PATH_SCALE * 2.5)),
_extraction_progress_score(final_height, templates) if success else 0.0,
)
debug["final_height"] = float(final_height)
debug["already_grasped_end"] = bool(already_grasped)
debug["total_length"] = float(total_length)
debug["score"] = float(score)
if milestone_reliabilities:
debug["min_reliability"] = float(min(milestone_reliabilities))
debug["mean_reliability"] = float(np.mean(np.asarray(milestone_reliabilities)))
return score, bool(success), debug
finally:
_restore_snapshot(task, snapshot)
def _wait_branch(task, steps: int = 5) -> None:
for _ in range(steps):
task._scene.step()
def _open_more_branch(task, templates: MotionTemplates) -> None:
joint = Joint("oven_door_joint")
current = joint.get_joint_position()
joint.set_joint_position(current - templates.open_more_delta, disable_dynamics=True)
for _ in range(3):
task._pyrep.step()
def _hold_open_branch(task, templates: MotionTemplates) -> None:
joint = Joint("oven_door_joint")
current = joint.get_joint_position()
joint.set_joint_position(min(current, templates.hold_open_angle), disable_dynamics=True)
for _ in range(3):
task._pyrep.step()
def _frame_metrics(
episode_dir: Path,
demo: Demo,
frame_state: ReplayState,
templates: MotionTemplates,
) -> Tuple[Dict[str, float], Dict[str, object]]:
grasp_points = _sample_grasp_points(templates, frame_state.tray_pose)
full_tray_points = _sample_full_tray_points(frame_state.tray_pose)
camera_values: Dict[str, Dict[str, float]] = {}
for camera_name in FULL_CAMERA_SET:
point_cloud, extrinsics, intrinsics = _camera_point_cloud(
episode_dir, demo, frame_state.frame_index, camera_name
)
grasp_stats = _point_visibility_stats_from_point_cloud(
grasp_points, point_cloud, extrinsics, intrinsics
)
tray_stats = _point_visibility_stats_from_point_cloud(
full_tray_points, point_cloud, extrinsics, intrinsics
)
camera_values[camera_name] = {
"grasp_visibility": float(grasp_stats["ratio"]),
"grasp_projected_count": int(grasp_stats["projected_count"]),
"grasp_visible_count": int(grasp_stats["visible_count"]),
"tray_visibility": float(tray_stats["ratio"]),
"tray_projected_count": int(tray_stats["projected_count"]),
"tray_visible_count": int(tray_stats["visible_count"]),
}
values: Dict[str, float] = {}
for name, camera_set in {"three_view": THREE_VIEW_SET, "full_view": FULL_CAMERA_SET}.items():
values[f"{name}_visibility"] = _union_visibility(
camera_values[camera_name]["grasp_visibility"] for camera_name in camera_set
)
values[f"{name}_whole_tray_visibility"] = _union_visibility(
camera_values[camera_name]["tray_visibility"] for camera_name in camera_set
)
for camera_name in FULL_CAMERA_SET:
camera_value = camera_values[camera_name]
values[f"{camera_name}_grasp_visibility"] = float(camera_value["grasp_visibility"])
values[f"{camera_name}_tray_visibility"] = float(camera_value["tray_visibility"])
values[f"{camera_name}_grasp_projected_count"] = float(
camera_value["grasp_projected_count"]
)
values[f"{camera_name}_grasp_visible_count"] = float(camera_value["grasp_visible_count"])
values[f"{camera_name}_tray_projected_count"] = float(camera_value["tray_projected_count"])
values[f"{camera_name}_tray_visible_count"] = float(camera_value["tray_visible_count"])
best_grasp_camera = max(
FULL_CAMERA_SET,
key=lambda camera_name: float(camera_values[camera_name]["grasp_visibility"]),
)
best_tray_camera = max(
FULL_CAMERA_SET,
key=lambda camera_name: float(camera_values[camera_name]["tray_visibility"]),
)
debug = {
"best_grasp_visibility_camera": best_grasp_camera,
"best_whole_tray_visibility_camera": best_tray_camera,
"camera_values": camera_values,
}
return values, debug
def _build_frame_artifacts(
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
task,
state: ReplayState,
) -> Tuple[Dict[str, float], Dict[str, object]]:
visibility_values, visibility_debug = _frame_metrics(episode_dir, demo, state, templates)
tray_pose = np.asarray(state.tray_pose, dtype=np.float64)
current_pose = np.asarray(state.left_gripper_pose, dtype=np.float64)
right_pose = np.asarray(state.right_gripper_pose, dtype=np.float64)
robot = task._scene.robot
left_joints = np.asarray(robot.left_arm.get_joint_positions(), dtype=np.float64)
right_joints = np.asarray(robot.right_arm.get_joint_positions(), dtype=np.float64)
pregrasp_goal_pose = _apply_relative_pose(tray_pose, templates.pregrasp_rel_pose)
pregrasp_progress, pregrasp_distance = _pregrasp_progress_and_distance(
current_pose,
tray_pose,
templates,
)
pregrasp_position_error = float(np.linalg.norm(current_pose[:3] - pregrasp_goal_pose[:3]))
pregrasp_orientation_error_deg = _orientation_error_degrees(
current_pose[3:], pregrasp_goal_pose[3:]
)
tray = Shape("tray")
tray_grasped_left = any(
grasped.get_name() == tray.get_name()
for grasped in task._scene.robot.left_gripper.get_grasped_objects()
)
tray_height = float(tray.get_position()[2])
p_pre, y_pre, p_pre_debug = _pregrasp_score_and_success(task, templates)
p_ext, y_ext, p_ext_debug = _extract_score_and_success(task, templates)
row = {
"frame_index": int(state.frame_index),
"time_norm": float(state.frame_index / max(1, len(demo) - 1)),
"door_angle": float(state.door_angle),
"right_gripper_open": float(state.right_gripper_open),
"left_gripper_open": float(state.left_gripper_open),
"right_arm_pose_x": float(right_pose[0]),
"right_arm_pose_y": float(right_pose[1]),
"right_arm_pose_z": float(right_pose[2]),
"right_arm_pose_qx": float(right_pose[3]),
"right_arm_pose_qy": float(right_pose[4]),
"right_arm_pose_qz": float(right_pose[5]),
"right_arm_pose_qw": float(right_pose[6]),
"left_arm_pose_x": float(current_pose[0]),
"left_arm_pose_y": float(current_pose[1]),
"left_arm_pose_z": float(current_pose[2]),
"left_arm_pose_qx": float(current_pose[3]),
"left_arm_pose_qy": float(current_pose[4]),
"left_arm_pose_qz": float(current_pose[5]),
"left_arm_pose_qw": float(current_pose[6]),
"tray_height": float(tray_height),
"tray_grasped_left": float(bool(tray_grasped_left)),
"pregrasp_progress": float(pregrasp_progress),
"pregrasp_distance": float(pregrasp_distance),
"pregrasp_position_error": float(pregrasp_position_error),
"pregrasp_orientation_error_deg": float(pregrasp_orientation_error_deg),
"p_pre": float(p_pre),
"p_ext": float(p_ext),
"y_pre_raw": float(bool(y_pre)),
"y_ext_raw": float(bool(y_ext)),
"y_pre": float(bool(y_pre)),
"y_ext": float(bool(y_ext)),
"p_pre_source_strict": float(bool(p_pre_debug["source_strict"])),
"p_pre_source_assisted": float(bool(p_pre_debug["source_assisted"])),
"p_pre_bump_applied": float(bool(p_pre_debug["bump_applied"])),
"p_pre_base_score": float(p_pre_debug["base_score"]),
"p_pre_strict_length": float(p_pre_debug["strict_length"]),
"p_pre_assisted_length": float(p_pre_debug["assisted_length"]),
"p_pre_assisted_reliability": float(p_pre_debug["assisted_reliability"]),
"p_pre_post_assist_strict_feasible": float(
bool(p_pre_debug["post_assist_strict_feasible"])
),
"p_pre_door_open_delta": float(p_pre_debug["door_open_delta"]),
"p_pre_door_quality": float(p_pre_debug["door_quality"]),
"p_pre_step_quality_mean": float(p_pre_debug["step_quality_mean"]),
"p_pre_step_quality_min": float(p_pre_debug["step_quality_min"]),
"p_pre_step_quality_max": float(p_pre_debug["step_quality_max"]),
"p_pre_alignment_signed_mean": float(p_pre_debug["alignment_signed_mean"]),
"p_pre_alignment_abs_mean": float(p_pre_debug["alignment_abs_mean"]),
"p_pre_collision_count": float(p_pre_debug["collision_count"]),
"p_pre_bad_collision_count": float(p_pre_debug["bad_collision_count"]),
"p_pre_best_goal_index": float(p_pre_debug["best_goal_index"]),
"p_pre_num_goal_poses": float(p_pre_debug["num_goal_poses"]),
"p_pre_best_target_pose_x": float(p_pre_debug["best_goal_pose"][0])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_pre_best_target_pose_y": float(p_pre_debug["best_goal_pose"][1])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_pre_best_target_pose_z": float(p_pre_debug["best_goal_pose"][2])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_pre_best_target_pose_qx": float(p_pre_debug["best_goal_pose"][3])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_pre_best_target_pose_qy": float(p_pre_debug["best_goal_pose"][4])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_pre_best_target_pose_qz": float(p_pre_debug["best_goal_pose"][5])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_pre_best_target_pose_qw": float(p_pre_debug["best_goal_pose"][6])
if p_pre_debug["best_goal_pose"]
else float("nan"),
"p_ext_initial_height": float(p_ext_debug["initial_height"]),
"p_ext_final_height": float(p_ext_debug["final_height"]),
"p_ext_total_length": float(p_ext_debug["total_length"]),
"p_ext_already_grasped_start": float(bool(p_ext_debug["already_grasped_start"])),
"p_ext_already_grasped_end": float(bool(p_ext_debug["already_grasped_end"])),
"p_ext_num_milestones": float(p_ext_debug["num_milestones"]),
"p_ext_first_failed_milestone": float(p_ext_debug["first_failed_milestone"]),
"p_ext_min_reliability": float(p_ext_debug["min_reliability"]),
"p_ext_mean_reliability": float(p_ext_debug["mean_reliability"]),
"p_ext_approach_score": float(p_ext_debug["approach_score"]),
"p_ext_retreat_score": float(p_ext_debug["retreat_score"]),
**visibility_values,
}
debug = {
"frame_index": int(state.frame_index),
"state": {
"door_angle": float(state.door_angle),
"tray_height": float(tray_height),
"tray_grasped_left": bool(tray_grasped_left),
"left_arm_pose": [float(value) for value in current_pose],
"right_arm_pose": [float(value) for value in right_pose],
"left_arm_joint_positions": [float(value) for value in left_joints],
"right_arm_joint_positions": [float(value) for value in right_joints],
"pregrasp_goal_pose": [float(value) for value in pregrasp_goal_pose],
"pregrasp_position_error": float(pregrasp_position_error),
"pregrasp_orientation_error_deg": float(pregrasp_orientation_error_deg),
},
"visibility": visibility_debug,
"p_pre": p_pre_debug,
"p_ext": p_ext_debug,
}
return row, debug
def _compute_frame_row_isolated(
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
checkpoint_stride: int,
frame_index: int,
) -> Dict[str, float]:
rows = _compute_frame_rows_sequential(
episode_dir=episode_dir,
demo=demo,
templates=templates,
checkpoint_stride=checkpoint_stride,
frame_indices=[frame_index],
)
if not rows:
raise RuntimeError(f"failed to compute frame {frame_index}")
return rows[0]
def _build_frame_row(
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
task,
state: ReplayState,
) -> Dict[str, float]:
row, _ = _build_frame_artifacts(episode_dir, demo, templates, task, state)
return row
def _compute_frame_rows_sequential(
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
checkpoint_stride: int,
frame_indices: Sequence[int],
) -> List[Dict[str, float]]:
env = _launch_replay_env()
try:
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
cache.reset()
rows: List[Dict[str, float]] = []
for frame_index in sorted({int(index) for index in frame_indices}):
cache.step_to(frame_index)
frame_snapshot = cache.snapshot()
state = cache.current_state()
rows.append(_build_frame_row(episode_dir, demo, templates, task, state))
cache.restore(frame_snapshot)
return rows
finally:
env.shutdown()
def _compute_frame_rows_independent(
episode_dir: Path,
demo: Demo,
templates: MotionTemplates,
checkpoint_stride: int,
frame_indices: Sequence[int],
) -> List[Dict[str, float]]:
env = _launch_replay_env()
try:
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
cache.reset()
initial_snapshot = cache.snapshot()
rows: List[Dict[str, float]] = []
for frame_index in sorted({int(index) for index in frame_indices}):
cache.restore_to_index(initial_snapshot, 0)
cache.step_to(frame_index)
state = cache.current_state()
rows.append(_build_frame_row(episode_dir, demo, templates, task, state))
return rows
finally:
env.shutdown()
def _safe_auc(y_true: np.ndarray, y_score: np.ndarray) -> float:
if len(np.unique(y_true)) < 2:
return float("nan")
return float(roc_auc_score(y_true, y_score))
def _safe_auprc(y_true: np.ndarray, y_score: np.ndarray) -> float:
if len(np.unique(y_true)) < 2:
return float("nan")
return float(average_precision_score(y_true, y_score))
def _first_crossing(values: np.ndarray, threshold: float) -> int:
above = np.flatnonzero(values >= threshold)
return int(above[0]) if len(above) else -1
def _transition_count(binary_values: np.ndarray) -> Tuple[int, int]:
diffs = np.diff(binary_values.astype(int))
return int(np.sum(diffs == 1)), int(np.sum(diffs == -1))
def _keyframe_subset(frame_df: pd.DataFrame, keyframes: Sequence[int]) -> pd.DataFrame:
key_df = frame_df.iloc[list(keyframes)].copy()
key_df["keyframe_ordinal"] = np.arange(len(key_df))
return key_df
def _persistent_rise_mask(binary_values: np.ndarray, window: int) -> np.ndarray:
binary_values = np.asarray(binary_values, dtype=bool)
rises = np.zeros(len(binary_values), dtype=bool)
if window <= 0:
rises[:] = binary_values
return rises
for index in range(len(binary_values)):
segment = binary_values[index : index + window]
if len(segment) == window and np.all(segment):
rises[index] = True
return rises
def _monotone_after_first(binary_values: np.ndarray) -> np.ndarray:
binary_values = np.asarray(binary_values, dtype=bool)
monotone = np.zeros(len(binary_values), dtype=bool)
if np.any(binary_values):
first_true = int(np.flatnonzero(binary_values)[0])
monotone[first_true:] = True
return monotone
def _annotate_phase_columns(frame_df: pd.DataFrame) -> pd.DataFrame:
door_speed = np.gradient(frame_df["door_angle"].to_numpy(dtype=float), DEMO_DT)
frame_df["door_speed_signed"] = door_speed
frame_df["door_speed_abs"] = np.abs(door_speed)
y_ext_raw = (
frame_df["y_ext_raw"].to_numpy(dtype=bool)
if "y_ext_raw" in frame_df
else frame_df["y_ext"].to_numpy(dtype=bool)
)
pregrasp_progress = (
frame_df["pregrasp_progress"].to_numpy(dtype=float)
if "pregrasp_progress" in frame_df
else frame_df["p_pre"].to_numpy(dtype=float)
)
pregrasp_distance = (
frame_df["pregrasp_distance"].to_numpy(dtype=float)
if "pregrasp_distance" in frame_df
else 1.0 - pregrasp_progress
)
pregrasp_speed = -np.gradient(pregrasp_distance, DEMO_DT)
frame_df["pregrasp_speed"] = pregrasp_speed
pregrasp_progress_seed = pregrasp_progress >= DEFAULT_PREGRASP_LABEL_PROGRESS_TAU
y_pre_seed = (
frame_df["y_pre_raw"].to_numpy(dtype=bool)
if "y_pre_raw" in frame_df
else pregrasp_progress_seed
)
y_pre_binary = _monotone_after_first(
_persistent_rise_mask(y_pre_seed, DEFAULT_PREGRASP_PERSISTENCE)
)
y_ext_binary = _monotone_after_first(
_persistent_rise_mask(y_ext_raw, DEFAULT_READY_PERSISTENCE)
)
frame_df["y_pre_seed_raw"] = y_pre_seed.astype(float)
frame_df["y_pre_progress_seed"] = pregrasp_progress_seed.astype(float)
frame_df["y_pre"] = y_pre_binary.astype(float)
frame_df["y_ext"] = y_ext_binary.astype(float)
frame_df["y_ext_seed_raw"] = y_ext_raw.astype(float)
frame_df["phase_score"] = np.clip(frame_df["p_pre"].to_numpy(dtype=float), 0.0, 1.0)
approach_active = (
(pregrasp_progress >= DEFAULT_APPROACH_PROGRESS_TAU)
& (pregrasp_speed >= DEFAULT_APPROACH_SPEED_TAU)
)
frame_df["approach_active"] = approach_active.astype(float)
retrieve_onset = _persistent_rise_mask(
approach_active & y_pre_binary, DEFAULT_RETRIEVE_PERSISTENCE
)
frame_df["retrieve_onset_raw"] = retrieve_onset.astype(float)
frame_df["y_retrieve"] = _monotone_after_first(retrieve_onset).astype(float)
ready_seed = np.zeros(len(frame_df), dtype=bool)
for index in range(len(frame_df)):
window = y_ext_binary[index : index + DEFAULT_READY_PERSISTENCE]
if (
len(window) == DEFAULT_READY_PERSISTENCE
and np.all(window)
and frame_df.iloc[index]["door_speed_abs"] <= DEFAULT_DOOR_SPEED_TAU
):
ready_seed[index] = True
frame_df["ready_seed_raw"] = ready_seed.astype(float)
frame_df["y_ready"] = _monotone_after_first(ready_seed).astype(float)
phase_seed = _persistent_rise_mask(
frame_df["p_pre"].to_numpy(dtype=float) >= DEFAULT_PHASE_SCORE_TAU,
DEFAULT_RETRIEVE_PERSISTENCE,
)
frame_df["phase_seed_raw"] = phase_seed.astype(float)
frame_df["phase_switch"] = _monotone_after_first(phase_seed).astype(float)
frame_df["p_pre_minus_tau"] = (
frame_df["p_pre"].to_numpy(dtype=float) - DEFAULT_PHASE_SCORE_TAU
)
frame_df["p_ext_minus_tau"] = (
frame_df["p_ext"].to_numpy(dtype=float) - DEFAULT_PEXT_TAU
)
frame_df["door_speed_margin"] = DEFAULT_DOOR_SPEED_TAU - frame_df["door_speed_abs"]
return frame_df
def _episode_metrics_from_frames(
frame_df: pd.DataFrame,
key_df: pd.DataFrame,
episode_name: str,
description: str,
interventions: Dict[str, float],
) -> Dict[str, object]:
y_pre_arr = frame_df["y_pre"].to_numpy(dtype=int)
y_ext_arr = frame_df["y_ext"].to_numpy(dtype=int)
y_retrieve_arr = frame_df["y_retrieve"].to_numpy(dtype=int)
y_ready_arr = frame_df["y_ready"].to_numpy(dtype=int)
p_pre_arr = frame_df["p_pre"].to_numpy(dtype=float)
p_ext_arr = frame_df["p_ext"].to_numpy(dtype=float)
phase_arr = frame_df["phase_switch"].to_numpy(dtype=int)
whole_vis = frame_df["full_view_whole_tray_visibility"].to_numpy(dtype=float)
door_angle_arr = frame_df["door_angle"].to_numpy(dtype=float)
time_arr = frame_df["time_norm"].to_numpy(dtype=float)
ppre_cross = _first_crossing(p_pre_arr, DEFAULT_PPRE_TAU)
pext_cross = _first_crossing(p_ext_arr, DEFAULT_PEXT_TAU)
phase_cross = _first_crossing(frame_df["phase_switch"].to_numpy(dtype=float), 0.5)
retrieve_cross = _first_crossing(y_retrieve_arr.astype(float), 0.5)
ready_cross = _first_crossing(y_ready_arr.astype(float), 0.5)
phase_rises, phase_falls = _transition_count(phase_arr)
key_phase_cross = _first_crossing(key_df["phase_switch"].to_numpy(dtype=float), 0.5)
key_retrieve_cross = _first_crossing(key_df["y_retrieve"].to_numpy(dtype=float), 0.5)
key_ready_cross = _first_crossing(key_df["y_ready"].to_numpy(dtype=float), 0.5)
return {
"episode_name": episode_name,
"description": description,
"num_dense_frames": int(len(frame_df)),
"num_keyframes": int(len(key_df)),
"phase_switch_rises": int(phase_rises),
"phase_switch_falls": int(phase_falls),
"ppre_cross_frame": int(ppre_cross),
"pext_cross_frame": int(pext_cross),
"phase_cross_frame": int(phase_cross),
"retrieve_cross_frame": int(retrieve_cross),
"ready_cross_frame": int(ready_cross),
"ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross),
"dense_boundary_error_to_retrieve_frames": float(abs(phase_cross - retrieve_cross))
if phase_cross >= 0 and retrieve_cross >= 0
else float("nan"),
"dense_boundary_error_frames": float(abs(phase_cross - ready_cross))
if phase_cross >= 0 and ready_cross >= 0
else float("nan"),
"dense_boundary_error_fraction": float(abs(phase_cross - ready_cross) / len(frame_df))
if phase_cross >= 0 and ready_cross >= 0
else float("nan"),
"key_boundary_error_to_retrieve_keyframes": float(abs(key_phase_cross - key_retrieve_cross))
if key_phase_cross >= 0 and key_retrieve_cross >= 0
else float("nan"),
"key_boundary_error_keyframes": float(abs(key_phase_cross - key_ready_cross))
if key_phase_cross >= 0 and key_ready_cross >= 0
else float("nan"),
"auroc_vret_ypre_three": _safe_auc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)),
"auprc_vret_ypre_three": _safe_auprc(y_pre_arr, frame_df["three_view_visibility"].to_numpy(dtype=float)),
"auroc_vret_ypre_full": _safe_auc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)),
"auprc_vret_ypre_full": _safe_auprc(y_pre_arr, frame_df["full_view_visibility"].to_numpy(dtype=float)),
"auroc_ppre_ypre": _safe_auc(y_pre_arr, p_pre_arr),
"auprc_ppre_ypre": _safe_auprc(y_pre_arr, p_pre_arr),
"auroc_pext_yext": _safe_auc(y_ext_arr, p_ext_arr),
"auprc_pext_yext": _safe_auprc(y_ext_arr, p_ext_arr),
"auroc_phase_yretrieve": _safe_auc(y_retrieve_arr, frame_df["phase_score"].to_numpy(dtype=float)),
"auprc_phase_yretrieve": _safe_auprc(y_retrieve_arr, frame_df["phase_score"].to_numpy(dtype=float)),
"f1_phase_yretrieve": float(f1_score(y_retrieve_arr, phase_arr))
if np.any(y_retrieve_arr) and np.any(phase_arr)
else float("nan"),
"auroc_phase_yready": _safe_auc(y_ready_arr, frame_df["phase_score"].to_numpy(dtype=float)),
"auprc_phase_yready": _safe_auprc(y_ready_arr, frame_df["phase_score"].to_numpy(dtype=float)),
"f1_phase_yready": float(f1_score(y_ready_arr, phase_arr))
if np.any(y_ready_arr) and np.any(phase_arr)
else float("nan"),
"baseline_auroc_door_yext": _safe_auc(y_ext_arr, door_angle_arr),
"baseline_auprc_door_yext": _safe_auprc(y_ext_arr, door_angle_arr),
"baseline_auroc_time_yext": _safe_auc(y_ext_arr, time_arr),
"baseline_auprc_time_yext": _safe_auprc(y_ext_arr, time_arr),
"baseline_auroc_whole_vis_yext": _safe_auc(y_ext_arr, whole_vis),
"baseline_auprc_whole_vis_yext": _safe_auprc(y_ext_arr, whole_vis),
**interventions,
}
def _isolated_intervention_outcomes(
demo: Demo,
templates: MotionTemplates,
frame_index: int,
checkpoint_stride: int,
) -> Dict[str, Tuple[float, bool]]:
env = _launch_replay_env()
try:
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
cache.reset()
cache.step_to(frame_index)
snapshot = cache.snapshot()
base_score, base_success, _ = _extract_score_and_success(task, templates)
base = (base_score, base_success)
_restore_snapshot(task, snapshot)
_open_more_branch(task, templates)
open_score, open_success, _ = _extract_score_and_success(task, templates)
open_more = (open_score, open_success)
_restore_snapshot(task, snapshot)
_hold_open_branch(task, templates)
hold_score, hold_success, _ = _extract_score_and_success(task, templates)
hold_open = (hold_score, hold_success)
_restore_snapshot(task, snapshot)
_wait_branch(task)
wait_score, wait_success, _ = _extract_score_and_success(task, templates)
wait = (wait_score, wait_success)
return {
"base": base,
"open_more": open_more,
"hold_open": hold_open,
"wait": wait,
}
finally:
env.shutdown()
def _interventional_validity(
demo: Demo,
templates: MotionTemplates,
frame_df: pd.DataFrame,
checkpoint_stride: int,
) -> Dict[str, float]:
ready_indices = np.flatnonzero(frame_df["y_ready"].to_numpy(dtype=bool))
ready_onset = int(ready_indices[0]) if len(ready_indices) else len(frame_df) // 2
pre_ready_indices = sorted(
{
max(0, ready_onset - 20),
max(0, ready_onset - 10),
}
)
post_ready_indices = sorted(
{
ready_onset,
min(len(frame_df) - 1, ready_onset + 20),
}
)
stats = {
"pre_ready_open_more_increases_pext": 0,
"pre_ready_open_more_trials": 0,
"pre_ready_hold_open_increases_pext": 0,
"pre_ready_hold_open_trials": 0,
"pre_ready_extract_success": 0,
"pre_ready_extract_trials": 0,
"pre_ready_wait_extract_success": 0,
"pre_ready_wait_trials": 0,
"post_ready_extract_success": 0,
"post_ready_extract_trials": 0,
"post_ready_open_more_low_gain": 0,
"post_ready_open_more_trials": 0,
"post_ready_hold_open_low_gain": 0,
"post_ready_hold_open_trials": 0,
}
for frame_index in [*pre_ready_indices, *post_ready_indices]:
outcomes = _isolated_intervention_outcomes(
demo=demo,
templates=templates,
frame_index=frame_index,
checkpoint_stride=checkpoint_stride,
)
base_pext, base_extract_success = outcomes["base"]
pre_ready = frame_index in pre_ready_indices
open_pext, _ = outcomes["open_more"]
hold_pext, _ = outcomes["hold_open"]
_, wait_extract_success = outcomes["wait"]
if pre_ready:
stats["pre_ready_open_more_trials"] += 1
stats["pre_ready_hold_open_trials"] += 1
stats["pre_ready_extract_trials"] += 1
stats["pre_ready_wait_trials"] += 1
if open_pext > base_pext:
stats["pre_ready_open_more_increases_pext"] += 1
if hold_pext > base_pext:
stats["pre_ready_hold_open_increases_pext"] += 1
if base_extract_success:
stats["pre_ready_extract_success"] += 1
if wait_extract_success:
stats["pre_ready_wait_extract_success"] += 1
else:
stats["post_ready_extract_trials"] += 1
stats["post_ready_open_more_trials"] += 1
stats["post_ready_hold_open_trials"] += 1
if base_extract_success:
stats["post_ready_extract_success"] += 1
if (open_pext - base_pext) <= 0.05:
stats["post_ready_open_more_low_gain"] += 1
if (hold_pext - base_pext) <= 0.05:
stats["post_ready_hold_open_low_gain"] += 1
return {
key: float(value) for key, value in stats.items()
}
def _analyze_episode(
dataset_root: Path,
episode_dir: Path,
templates: MotionTemplates,
template_frames: Dict[str, int],
checkpoint_stride: int = 16,
max_frames: Optional[int] = None,
independent_replay: bool = False,
) -> EpisodeArtifacts:
demo = _load_demo(episode_dir)
descriptions = _load_descriptions(episode_dir)
env = _launch_replay_env()
try:
task = env.get_task(BimanualTakeTrayOutOfOven)
cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
cache.reset()
num_frames = len(demo) if max_frames is None else min(len(demo), max_frames)
rows: List[Dict[str, float]] = []
debug_rows: List[Dict[str, object]] = []
initial_snapshot = cache.checkpoints[0] if independent_replay else None
for frame_index in range(num_frames):
if independent_replay:
cache.restore_to_index(initial_snapshot, 0)
cache.step_to(frame_index)
frame_snapshot = cache.snapshot() if not independent_replay else None
state = cache.current_state()
row, debug_row = _build_frame_artifacts(
episode_dir, demo, templates, task, state
)
rows.append(row)
debug_rows.append(debug_row)
if frame_snapshot is not None:
cache.restore(frame_snapshot)
if (frame_index + 1) % 25 == 0 or (frame_index + 1) == num_frames:
print(
f"[{episode_dir.name}] analyzed {frame_index + 1}/{num_frames} dense frames",
flush=True,
)
frame_df = pd.DataFrame(rows)
frame_df = _annotate_phase_columns(frame_df)
keyframes = [index for index in _keypoint_discovery(demo) if index < num_frames]
key_df = _keyframe_subset(frame_df, keyframes)
finally:
env.shutdown()
interventions = _interventional_validity(
demo=demo,
templates=templates,
frame_df=frame_df,
checkpoint_stride=checkpoint_stride,
)
metrics = _episode_metrics_from_frames(
frame_df=frame_df,
key_df=key_df,
episode_name=episode_dir.name,
description=descriptions[0],
interventions=interventions,
)
return EpisodeArtifacts(
episode_name=episode_dir.name,
dense=frame_df,
keyframes=key_df,
metrics=metrics,
template_frames=template_frames,
debug_rows=debug_rows,
)
def _aggregate_summary(episode_metrics: List[Dict[str, object]]) -> Dict[str, object]:
frame = pd.DataFrame(episode_metrics)
numeric = frame.select_dtypes(include=[np.number])
summary = {
"num_episodes": int(len(frame)),
"mean_metrics": numeric.mean(numeric_only=True).to_dict(),
"median_metrics": numeric.median(numeric_only=True).to_dict(),
"single_switch_rate": float((frame["phase_switch_rises"] == 1).mean())
if len(frame)
else float("nan"),
"reversion_rate": float((frame["phase_switch_falls"] > 0).mean())
if len(frame)
else float("nan"),
"ordering_ok_rate": float(frame["ordering_ok"].mean()) if len(frame) else float("nan"),
}
return summary
def run_study(
dataset_root: str,
result_dir: str,
max_episodes: Optional[int] = None,
checkpoint_stride: int = 16,
max_frames: Optional[int] = None,
episode_offset: int = 0,
template_episode_index: int = 0,
episode_indices: Optional[Sequence[int]] = None,
independent_replay: bool = True,
per_episode_templates: bool = False,
) -> Dict[str, object]:
dataset_path = Path(dataset_root)
result_path = Path(result_dir)
result_path.mkdir(parents=True, exist_ok=True)
all_episode_dirs = _episode_dirs(dataset_path)
if not all_episode_dirs:
raise RuntimeError(f"no episodes available under {dataset_root}")
if not (0 <= template_episode_index < len(all_episode_dirs)):
raise ValueError(
f"template_episode_index {template_episode_index} outside available range 0..{len(all_episode_dirs) - 1}"
)
selected_episode_indices: List[int]
if episode_indices is not None:
selected_episode_indices = []
seen_episode_indices = set()
for raw_index in episode_indices:
episode_index = int(raw_index)
if not (0 <= episode_index < len(all_episode_dirs)):
raise ValueError(
f"episode index {episode_index} outside available range 0..{len(all_episode_dirs) - 1}"
)
if episode_index in seen_episode_indices:
continue
selected_episode_indices.append(episode_index)
seen_episode_indices.add(episode_index)
episode_dirs = [all_episode_dirs[index] for index in selected_episode_indices]
else:
episode_dirs = all_episode_dirs[episode_offset:]
if max_episodes is not None:
episode_dirs = episode_dirs[:max_episodes]
selected_episode_indices = [
all_episode_dirs.index(episode_dir) for episode_dir in episode_dirs
]
if not episode_dirs:
raise RuntimeError(
f"no episodes selected under {dataset_root} with offset={episode_offset} max_episodes={max_episodes} episode_indices={episode_indices}"
)
template_episode_dir = all_episode_dirs[template_episode_index]
if not per_episode_templates:
templates, template_frames = _derive_templates(dataset_path, template_episode_dir)
with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle:
json.dump(
{
"template_mode": "shared",
"templates": templates.to_json(),
"template_episode": template_episode_dir.name,
"template_frames": template_frames,
"episode_offset": episode_offset,
"selected_episode_indices": selected_episode_indices,
},
handle,
indent=2,
)
else:
with result_path.joinpath("templates.json").open("w", encoding="utf-8") as handle:
json.dump(
{
"template_mode": "per_episode",
"episode_offset": episode_offset,
"selected_episode_indices": selected_episode_indices,
},
handle,
indent=2,
)
episode_metrics: List[Dict[str, object]] = []
for episode_dir in episode_dirs:
if per_episode_templates:
episode_templates, episode_template_frames = _derive_templates(dataset_path, episode_dir)
with result_path.joinpath(f"{episode_dir.name}.templates.json").open(
"w", encoding="utf-8"
) as handle:
json.dump(
{
"template_mode": "per_episode",
"templates": episode_templates.to_json(),
"template_episode": episode_dir.name,
"template_frames": episode_template_frames,
},
handle,
indent=2,
)
else:
episode_templates = templates
episode_template_frames = template_frames
artifacts = _analyze_episode(
dataset_path,
episode_dir,
episode_templates,
episode_template_frames,
checkpoint_stride=checkpoint_stride,
max_frames=max_frames,
independent_replay=independent_replay,
)
artifacts.dense.to_csv(result_path.joinpath(f"{episode_dir.name}.dense.csv"), index=False)
artifacts.keyframes.to_csv(
result_path.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False
)
if artifacts.debug_rows is not None:
with result_path.joinpath(f"{episode_dir.name}.debug.jsonl").open(
"w", encoding="utf-8"
) as handle:
for debug_row in artifacts.debug_rows:
handle.write(json.dumps(_json_safe(debug_row)))
handle.write("\n")
with result_path.joinpath(f"{episode_dir.name}.metrics.json").open(
"w", encoding="utf-8"
) as handle:
json.dump(artifacts.metrics, handle, indent=2)
episode_metrics.append(artifacts.metrics)
summary = _aggregate_summary(episode_metrics)
with result_path.joinpath("summary.json").open("w", encoding="utf-8") as handle:
json.dump(summary, handle, indent=2)
return summary
def main(argv: Optional[Sequence[str]] = None) -> int:
def _parse_episode_indices(value: str) -> List[int]:
indices: List[int] = []
for chunk in value.split(","):
chunk = chunk.strip()
if not chunk:
continue
indices.append(int(chunk))
if not indices:
raise argparse.ArgumentTypeError("episode-indices must contain at least one integer")
return indices
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset-root",
default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
)
parser.add_argument(
"--result-dir",
default="/workspace/reveal_retrieve_label_study/results/oven_first_pass",
)
parser.add_argument("--max-episodes", type=int, default=1)
parser.add_argument("--checkpoint-stride", type=int, default=16)
parser.add_argument("--max-frames", type=int)
parser.add_argument("--episode-offset", type=int, default=0)
parser.add_argument("--template-episode-index", type=int, default=0)
parser.add_argument("--episode-indices", type=_parse_episode_indices)
parser.add_argument(
"--independent-replay",
dest="independent_replay",
action="store_true",
help="Replay each frame from the demo start/checkpoint to avoid simulator drift.",
)
parser.add_argument(
"--sequential-replay",
dest="independent_replay",
action="store_false",
help="Reuse the live replay state across frames for speed at the cost of more drift.",
)
parser.add_argument("--per-episode-templates", action="store_true")
parser.set_defaults(independent_replay=True)
args = parser.parse_args(argv)
summary = run_study(
dataset_root=args.dataset_root,
result_dir=args.result_dir,
max_episodes=args.max_episodes,
checkpoint_stride=args.checkpoint_stride,
max_frames=args.max_frames,
episode_offset=args.episode_offset,
template_episode_index=args.template_episode_index,
episode_indices=args.episode_indices,
independent_replay=args.independent_replay,
per_episode_templates=args.per_episode_templates,
)
print(json.dumps(summary, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())