| from pathlib import Path |
| import argparse |
| import json |
| import math |
| import os |
| import pickle |
| import signal |
| import subprocess |
| import sys |
| import time |
| from typing import Dict, List, Optional, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.metrics import f1_score |
|
|
|
|
# Repository root: two levels up from this file's resolved path. It is put at
# the front of sys.path (at most once) so the `rr_label_study` package can be
# imported when this file is executed directly as a script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
_project_root_entry = str(PROJECT_ROOT)
if _project_root_entry not in sys.path:
    sys.path.insert(0, _project_root_entry)
del _project_root_entry
|
|
| from rr_label_study.oven_study import ( |
| BimanualTakeTrayOutOfOven, |
| DEFAULT_PPRE_TAU, |
| DEFAULT_PEXT_TAU, |
| ReplayCache, |
| _aggregate_summary, |
| _annotate_phase_columns, |
| _derive_templates, |
| _first_crossing, |
| _interventional_validity, |
| _keyframe_subset, |
| _keypoint_discovery, |
| _launch_replay_env, |
| _load_demo, |
| _load_descriptions, |
| _safe_auc, |
| _safe_auprc, |
| _transition_count, |
| ) |
|
|
|
|
def _launch_xvfb(
    display_num: int,
    log_path: Path,
    *,
    xvfb_binary: str = "Xvfb",
) -> subprocess.Popen:
    """Start a virtual X server serving display ``:display_num``.

    Args:
        display_num: X display number; the server is started with ``:<display_num>``.
        log_path: File that receives the server's combined stdout/stderr
            (truncated on open).
        xvfb_binary: Executable to launch; defaults to ``Xvfb`` resolved via PATH.

    Returns:
        The running server process. It is started with ``start_new_session=True``,
        so its pid is also its process-group id and the whole group can be
        signalled with ``os.killpg``.
    """
    # Popen gives the child its own duplicate of the log descriptor, so the
    # parent's copy is closed as soon as Popen returns instead of leaking for
    # the rest of the run.
    with log_path.open("w", encoding="utf-8") as log_handle:
        return subprocess.Popen(
            [
                xvfb_binary,
                f":{display_num}",
                "-screen",
                "0",
                "1280x1024x24",
                "+extension",
                "GLX",
                "+render",
                "-noreset",
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )
|
|
|
|
def _stop_process(process: Optional[subprocess.Popen], timeout: float = 10.0) -> None:
    """Terminate ``process`` together with its process group, then reap it.

    It sends SIGTERM to the group whose id is ``process.pid``. The processes in
    this file are started with ``start_new_session=True``, so that id is their
    own group. It then waits up to ``timeout`` seconds and escalates to SIGKILL
    if the leader is still alive. The leader is always waited on afterwards, so
    it does not remain a zombie and ``process.returncode`` gets populated.

    Args:
        process: Process to stop. ``None`` and already-exited processes are no-ops.
        timeout: Seconds to wait after SIGTERM (and again after SIGKILL).
    """
    if process is None or process.poll() is not None:
        return
    try:
        os.killpg(process.pid, signal.SIGTERM)
    except ProcessLookupError:
        # The group is already gone; collect the exit status if it is available.
        process.poll()
        return
    try:
        process.wait(timeout=timeout)
        return
    except subprocess.TimeoutExpired:
        pass
    try:
        os.killpg(process.pid, signal.SIGKILL)
    except ProcessLookupError:
        pass
    # Reap after SIGKILL. Without this wait the leader lingers as a zombie. The
    # wait is bounded so a process stuck in uninterruptible sleep cannot hang
    # cleanup.
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        pass
|
|
|
|
_DEFAULT_COPPELIASIM_ROOT = "/workspace/coppelia_sim"


def _frame_job_env(
    display_num: int,
    runtime_dir: Path,
    coppelia_root: str = _DEFAULT_COPPELIASIM_ROOT,
    base_env: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Build the environment for a frame-batch worker bound to display ``:display_num``.

    Args:
        display_num: X display the worker renders on.
        runtime_dir: Directory exported as ``XDG_RUNTIME_DIR``.
        coppelia_root: CoppeliaSim install directory. It is used for
            ``COPPELIASIM_ROOT`` and ``QT_QPA_PLATFORM_PLUGIN_PATH``, and is
            prepended to ``LD_LIBRARY_PATH``.
        base_env: Environment to start from; defaults to ``os.environ``. It is
            copied, never mutated.

    Returns:
        A new environment dict.
    """
    env = dict(os.environ if base_env is None else base_env)
    env["DISPLAY"] = f":{display_num}"
    env["COPPELIASIM_ROOT"] = coppelia_root
    # An empty LD_LIBRARY_PATH entry (such as a trailing ':') makes the dynamic
    # loader search the current working directory. Only append the inherited
    # value when there is one.
    inherited = env.get("LD_LIBRARY_PATH", "")
    env["LD_LIBRARY_PATH"] = f"{coppelia_root}:{inherited}" if inherited else coppelia_root
    env["QT_QPA_PLATFORM_PLUGIN_PATH"] = coppelia_root
    env["XDG_RUNTIME_DIR"] = str(runtime_dir)
    return env


def _spawn_frame_batch_job(
    display_num: int,
    episode_dir: Path,
    templates_pkl: Path,
    frame_indices: List[int],
    checkpoint_stride: int,
    output_dir: Path,
    *,
    coppelia_root: str = _DEFAULT_COPPELIASIM_ROOT,
) -> subprocess.Popen:
    """Launch ``scripts/run_oven_frame_batch.py`` for ``frame_indices`` on ``:display_num``.

    The worker runs with ``PROJECT_ROOT`` as its working directory, in a new
    session, so its process group can be signalled as a unit. Its
    stdout/stderr are discarded. This function does not wait for the worker.

    Args:
        display_num: X display the worker should render on.
        episode_dir: Passed through as ``--episode-dir``.
        templates_pkl: Passed through as ``--templates-pkl``.
        frame_indices: Passed through as ``--frame-indices``.
        checkpoint_stride: Passed through as ``--checkpoint-stride``.
        output_dir: Passed through as ``--output-dir``.
        coppelia_root: CoppeliaSim install directory used to build the
            worker environment.

    Returns:
        The worker process handle.
    """
    runtime_dir = Path(f"/tmp/rr_label_study_frame_display_{display_num}")
    # The XDG Base Directory spec requires XDG_RUNTIME_DIR to be private
    # (mode 0700). The mode only applies when the directory is newly created.
    runtime_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    return subprocess.Popen(
        [
            sys.executable,
            str(PROJECT_ROOT.joinpath("scripts", "run_oven_frame_batch.py")),
            "--episode-dir",
            str(episode_dir),
            "--templates-pkl",
            str(templates_pkl),
            "--frame-indices",
            *[str(frame_index) for frame_index in frame_indices],
            "--checkpoint-stride",
            str(checkpoint_stride),
            "--output-dir",
            str(output_dir),
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        cwd=str(PROJECT_ROOT),
        env=_frame_job_env(display_num, runtime_dir, coppelia_root=coppelia_root),
        start_new_session=True,
    )
|
|
|
|
def _crossing_gap(first, second, denominator=1) -> float:
    """Return ``|first - second| / denominator`` as a float.

    Returns NaN when either index is negative; negative values are the
    "no crossing" sentinel.
    """
    if first < 0 or second < 0:
        return float("nan")
    return float(abs(first - second) / denominator)


def _f1_or_nan(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return sklearn's F1 score, or NaN unless both label vectors contain a positive."""
    if not (np.any(y_true) and np.any(y_pred)):
        return float("nan")
    return float(f1_score(y_true, y_pred))


def _recompute_episode_metrics(
    frame_df: pd.DataFrame,
    episode_dir: Path,
    demo,
    descriptions: List[str],
    templates,
    template_frames: Dict[str, int],
    checkpoint_stride: int,
) -> Tuple[pd.DataFrame, Dict[str, object]]:
    """Recompute per-episode metrics from a (repaired) dense frame table.

    Args:
        frame_df: Dense per-frame table; it is passed through
            ``_annotate_phase_columns`` first.
        episode_dir: Episode directory. Its name is reported as ``episode_name``.
        demo: Demo object used for keypoint discovery and replay.
        descriptions: Episode descriptions; the first one is reported.
        templates: Templates forwarded to ``_interventional_validity``.
        template_frames: Accepted for interface compatibility; not read here.
        checkpoint_stride: Checkpoint stride for the ``ReplayCache``.

    Returns:
        ``(key_df, metrics)``: the keyframe subset of the annotated table and a
        flat metrics dict merged with the interventional-validity results.
    """

    def column(frame: pd.DataFrame, name: str, dtype) -> np.ndarray:
        return frame[name].to_numpy(dtype=dtype)

    annotated = _annotate_phase_columns(frame_df)

    # Discovered keypoints past the end of the dense table are dropped.
    key_indices = [idx for idx in _keypoint_discovery(demo) if idx < len(annotated)]
    key_df = _keyframe_subset(annotated, key_indices)

    y_pre, y_ext, y_retrieve, y_ready = (
        column(annotated, name, int) for name in ("y_pre", "y_ext", "y_retrieve", "y_ready")
    )
    p_pre = column(annotated, "p_pre", float)
    p_ext = column(annotated, "p_ext", float)
    phase = column(annotated, "phase_switch", int)
    whole_vis = column(annotated, "full_view_whole_tray_visibility", float)
    door_angle = column(annotated, "door_angle", float)
    time_norm = column(annotated, "time_norm", float)

    ppre_cross = _first_crossing(p_pre, DEFAULT_PPRE_TAU)
    pext_cross = _first_crossing(p_ext, DEFAULT_PEXT_TAU)
    # phase_switch is re-read as float here rather than reusing the int array.
    phase_cross = _first_crossing(column(annotated, "phase_switch", float), 0.5)
    retrieve_cross = _first_crossing(y_retrieve.astype(float), 0.5)
    ready_cross = _first_crossing(y_ready.astype(float), 0.5)
    phase_rises, phase_falls = _transition_count(phase)
    key_phase_cross, key_retrieve_cross, key_ready_cross = (
        _first_crossing(column(key_df, name, float), 0.5)
        for name in ("phase_switch", "y_retrieve", "y_ready")
    )

    replay_env = _launch_replay_env()
    try:
        task = replay_env.get_task(BimanualTakeTrayOutOfOven)
        replay_cache = ReplayCache(task, demo, checkpoint_stride=checkpoint_stride)
        replay_cache.reset()
        interventions = _interventional_validity(
            task, replay_cache, episode_dir, demo, templates, annotated
        )
    finally:
        replay_env.shutdown()

    description = descriptions[0]
    three_view = column(annotated, "three_view_visibility", float)
    full_view = column(annotated, "full_view_visibility", float)
    phase_score = column(annotated, "phase_score", float)

    metrics: Dict[str, object] = {
        "episode_name": episode_dir.name,
        "description": description,
        "num_dense_frames": int(len(annotated)),
        "num_keyframes": int(len(key_df)),
        "phase_switch_rises": int(phase_rises),
        "phase_switch_falls": int(phase_falls),
        "ppre_cross_frame": int(ppre_cross),
        "pext_cross_frame": int(pext_cross),
        "phase_cross_frame": int(phase_cross),
        "retrieve_cross_frame": int(retrieve_cross),
        "ready_cross_frame": int(ready_cross),
        "ordering_ok": bool(ppre_cross == -1 or pext_cross == -1 or ppre_cross <= pext_cross),
        "dense_boundary_error_to_retrieve_frames": _crossing_gap(phase_cross, retrieve_cross),
        "dense_boundary_error_frames": _crossing_gap(phase_cross, ready_cross),
        "dense_boundary_error_fraction": _crossing_gap(phase_cross, ready_cross, len(annotated)),
        "key_boundary_error_to_retrieve_keyframes": _crossing_gap(
            key_phase_cross, key_retrieve_cross
        ),
        "key_boundary_error_keyframes": _crossing_gap(key_phase_cross, key_ready_cross),
        "auroc_vret_ypre_three": _safe_auc(y_pre, three_view),
        "auprc_vret_ypre_three": _safe_auprc(y_pre, three_view),
        "auroc_vret_ypre_full": _safe_auc(y_pre, full_view),
        "auprc_vret_ypre_full": _safe_auprc(y_pre, full_view),
        "auroc_ppre_ypre": _safe_auc(y_pre, p_pre),
        "auprc_ppre_ypre": _safe_auprc(y_pre, p_pre),
        "auroc_pext_yext": _safe_auc(y_ext, p_ext),
        "auprc_pext_yext": _safe_auprc(y_ext, p_ext),
        "auroc_phase_yretrieve": _safe_auc(y_retrieve, phase_score),
        "auprc_phase_yretrieve": _safe_auprc(y_retrieve, phase_score),
        "f1_phase_yretrieve": _f1_or_nan(y_retrieve, phase),
        "auroc_phase_yready": _safe_auc(y_ready, phase_score),
        "auprc_phase_yready": _safe_auprc(y_ready, phase_score),
        "f1_phase_yready": _f1_or_nan(y_ready, phase),
        "baseline_auroc_door_yext": _safe_auc(y_ext, door_angle),
        "baseline_auprc_door_yext": _safe_auprc(y_ext, door_angle),
        "baseline_auroc_time_yext": _safe_auc(y_ext, time_norm),
        "baseline_auprc_time_yext": _safe_auprc(y_ext, time_norm),
        "baseline_auroc_whole_vis_yext": _safe_auc(y_ext, whole_vis),
        "baseline_auprc_whole_vis_yext": _safe_auprc(y_ext, whole_vis),
        **interventions,
    }
    return key_df, metrics
|
|
|
|
def _frame_json_path(frame_json_dir: Path, frame_index: int) -> Path:
    """Return the per-frame repair JSON path (``frame_NNNN.json``) in ``frame_json_dir``."""
    return frame_json_dir.joinpath(f"frame_{frame_index:04d}.json")


def _find_suspicious_frames(demo, base_df: pd.DataFrame, pext_tau: float) -> List[int]:
    """Return dense-frame indices whose labels look inconsistent after extraction onset.

    The onset is the first frame where ``p_ext >= pext_tau`` or ``y_ext > 0.5``,
    or frame 0 when neither ever happens. A frame at or after the onset is
    flagged when the left gripper is closed (``gripper_open < 0.5``) and either
    ``p_pre < 0.9`` or ``y_ext < 0.5``.

    Args:
        demo: Indexable per-frame observations exposing ``.left.gripper_open``;
            it must have at least ``len(base_df)`` entries.
        base_df: Dense table with ``p_ext``, ``y_ext`` and ``p_pre`` columns.
        pext_tau: Threshold on ``p_ext`` that marks extraction onset.

    Returns:
        Flagged row positions as plain ``int``s (JSON-serialisable).
    """
    num_frames = len(base_df)
    # dtype=bool keeps `&` valid when the table is empty: a float array from an
    # empty list would make the bitwise ops below raise.
    left_closed = np.array(
        [float(demo[i].left.gripper_open) < 0.5 for i in range(num_frames)], dtype=bool
    )
    p_ext = base_df["p_ext"].to_numpy(dtype=float)
    y_ext = base_df["y_ext"].to_numpy(dtype=float)
    p_pre = base_df["p_pre"].to_numpy(dtype=float)
    onset_candidates = np.flatnonzero((p_ext >= pext_tau) | (y_ext > 0.5))
    onset = int(onset_candidates[0]) if len(onset_candidates) else 0
    mask = (np.arange(num_frames) >= onset) & left_closed & ((p_pre < 0.9) | (y_ext < 0.5))
    return [int(index) for index in np.flatnonzero(mask)]


def _run_repair_jobs(
    frame_indices: List[int],
    *,
    episode_dir: Path,
    templates_pkl: Path,
    frame_json_dir: Path,
    log_dir: Path,
    num_workers: int,
    base_display: int,
    checkpoint_stride: int,
    poll_interval: float = 0.5,
    xvfb_startup_delay: float = 1.0,
) -> None:
    """Re-render ``frame_indices`` in parallel, with one Xvfb display and one worker per batch.

    No processes are started when ``frame_indices`` is empty. Otherwise the
    frames are split into at most ``num_workers`` contiguous batches. Each batch
    gets display ``base_display + k`` (logged to ``log_dir/xvfb_<display>.log``)
    and a ``_spawn_frame_batch_job`` worker writing into ``frame_json_dir``.
    Every spawned process is stopped before this returns or raises.

    Raises:
        ValueError: If ``num_workers < 1``.
        RuntimeError: If a worker exits non-zero or leaves any of its frame JSON
            files missing.
    """
    if num_workers < 1:
        raise ValueError(f"num_workers must be >= 1, got {num_workers}")
    if not frame_indices:
        return
    num_jobs = min(num_workers, len(frame_indices))
    displays = [base_display + offset for offset in range(num_jobs)]
    frame_batches = [
        [int(frame_index) for frame_index in batch.tolist()]
        for batch in np.array_split(np.asarray(frame_indices, dtype=int), num_jobs)
        if len(batch)
    ]
    xvfb_procs: List[subprocess.Popen] = []
    active: Dict[int, Tuple[List[int], subprocess.Popen]] = {}
    try:
        for display_num in displays:
            xvfb_procs.append(
                _launch_xvfb(display_num, log_dir.joinpath(f"xvfb_{display_num}.log"))
            )
        # One shared grace period after all launches: each server still gets at
        # least `xvfb_startup_delay` seconds before any worker connects.
        time.sleep(xvfb_startup_delay)

        for display_num, frame_batch in zip(displays, frame_batches):
            process = _spawn_frame_batch_job(
                display_num=display_num,
                episode_dir=episode_dir,
                templates_pkl=templates_pkl,
                frame_indices=frame_batch,
                checkpoint_stride=checkpoint_stride,
                output_dir=frame_json_dir,
            )
            active[display_num] = (frame_batch, process)

        while active:
            time.sleep(poll_interval)
            # Iterate over a snapshot so finished jobs can be removed in place.
            for display_num, (frame_batch, process) in list(active.items()):
                return_code = process.poll()
                if return_code is None:
                    continue
                missing = [
                    frame_index
                    for frame_index in frame_batch
                    if not _frame_json_path(frame_json_dir, frame_index).exists()
                ]
                if return_code != 0 or missing:
                    raise RuntimeError(
                        f"display :{display_num} repair failed for frames {frame_batch}; "
                        f"return_code={return_code}; missing={missing}"
                    )
                del active[display_num]
    finally:
        for _, process in active.values():
            _stop_process(process)
        for xvfb in xvfb_procs:
            _stop_process(xvfb)


def _apply_repairs(
    base_df: pd.DataFrame, frame_indices: List[int], frame_json_dir: Path
) -> pd.DataFrame:
    """Return a copy of ``base_df`` with each listed frame's values replaced from its JSON file.

    For every ``frame_index``, each ``key: value`` pair of
    ``frame_NNNN.json`` is written to ``corrected.at[frame_index, key]``. This
    is a label-based assignment; ``base_df`` is not modified.
    """
    corrected = base_df.copy()
    for frame_index in frame_indices:
        with _frame_json_path(frame_json_dir, frame_index).open("r", encoding="utf-8") as handle:
            row = json.load(handle)
        for key, value in row.items():
            corrected.at[frame_index, key] = value
    return corrected


def main() -> int:
    """Repair suspicious dense-label frames for one episode and recompute its metrics.

    Steps:
      1. Load the input dense CSV, the demo and the descriptions.
      2. Derive templates and save them as ``templates.pkl`` and ``templates.json``.
      3. Flag suspicious frames.
      4. Re-run ``run_oven_frame_batch.py`` for those frames on private Xvfb
         displays.
      5. Overwrite the flagged rows with the workers' JSON output.
      6. Recompute the metrics.
      7. Write ``<episode>.dense.csv``, ``<episode>.keyframes.csv``,
         ``<episode>.metrics.json`` and ``summary.json`` to ``--output-dir``.

    Returns:
        Process exit status (0 on success).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset-root", required=True)
    parser.add_argument("--episode-dir", required=True)
    parser.add_argument("--input-dense-csv", required=True)
    parser.add_argument("--output-dir", required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=4)
    parser.add_argument("--base-display", type=int, default=170)
    parser.add_argument("--template-episode-dir")
    args = parser.parse_args()
    if args.num_workers < 1:
        # Fail fast with a usage error instead of crashing in np.array_split later.
        parser.error("--num-workers must be at least 1")

    dataset_root = Path(args.dataset_root)
    episode_dir = Path(args.episode_dir)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    base_df = pd.read_csv(args.input_dense_csv)
    demo = _load_demo(episode_dir)
    descriptions = _load_descriptions(episode_dir)

    template_episode_dir = (
        Path(args.template_episode_dir) if args.template_episode_dir else episode_dir
    )
    templates, template_frames = _derive_templates(dataset_root, template_episode_dir)
    templates_pkl = output_dir.joinpath("templates.pkl")
    with templates_pkl.open("wb") as handle:
        pickle.dump(templates, handle)
    with output_dir.joinpath("templates.json").open("w", encoding="utf-8") as handle:
        json.dump(
            {
                "templates": templates.to_json(),
                "template_episode": template_episode_dir.name,
                "template_frames": template_frames,
            },
            handle,
            indent=2,
        )

    suspicious = _find_suspicious_frames(demo, base_df, pext_tau=DEFAULT_PEXT_TAU)

    frame_json_dir = output_dir.joinpath("repaired_frames")
    frame_json_dir.mkdir(parents=True, exist_ok=True)
    _run_repair_jobs(
        suspicious,
        episode_dir=episode_dir,
        templates_pkl=templates_pkl,
        frame_json_dir=frame_json_dir,
        log_dir=output_dir,
        num_workers=args.num_workers,
        base_display=args.base_display,
        checkpoint_stride=args.checkpoint_stride,
    )

    corrected_df = _apply_repairs(base_df, suspicious, frame_json_dir)

    key_df, metrics = _recompute_episode_metrics(
        frame_df=corrected_df,
        episode_dir=episode_dir,
        demo=demo,
        descriptions=descriptions,
        templates=templates,
        template_frames=template_frames,
        checkpoint_stride=args.checkpoint_stride,
    )

    corrected_df.to_csv(output_dir.joinpath(f"{episode_dir.name}.dense.csv"), index=False)
    key_df.to_csv(output_dir.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False)
    with output_dir.joinpath(f"{episode_dir.name}.metrics.json").open("w", encoding="utf-8") as handle:
        json.dump(metrics, handle, indent=2)
    summary = _aggregate_summary([metrics])
    with output_dir.joinpath("summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)
    print(json.dumps({"suspicious_frames": suspicious, "summary": summary}, indent=2))
    return 0
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|