| from pathlib import Path |
| import argparse |
| import json |
| import math |
| import os |
| import pickle |
| import signal |
| import subprocess |
| import sys |
| import time |
| from typing import Dict, List, Optional, Sequence, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
# Directory two levels above this file, i.e. the parent of the directory that
# contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Put that directory at the front of the import path (once) so the
# ``rr_label_study`` package imported further down can be resolved when this
# file is executed directly as a script.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
def _configure_thread_env() -> None:
    """Default the numeric-library thread-count variables to ``"1"``.

    Every variable is written with ``os.environ.setdefault``, so a value that
    is already present in the environment is left untouched.
    """
    thread_variables = (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    )
    for variable in thread_variables:
        os.environ.setdefault(variable, "1")
|
|
|
|
def _configure_coppeliasim_env() -> None:
    """Make sure the CoppeliaSim root is listed in ``LD_LIBRARY_PATH``.

    ``COPPELIASIM_ROOT`` is defaulted to ``/workspace/coppelia_sim`` when it is
    not already set. ``LD_LIBRARY_PATH`` is then rewritten from its non-empty
    components, with the root inserted in first position if it was absent.
    """
    root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
    current_value = os.environ.get("LD_LIBRARY_PATH", "")
    # Drop empty components such as those produced by "a::b" or a trailing ":".
    entries = list(filter(None, current_value.split(":")))
    if root not in entries:
        entries = [root, *entries]
    os.environ["LD_LIBRARY_PATH"] = ":".join(entries)
|
|
|
|
# Apply the environment defaults at import time, before the ``rr_label_study``
# imports that follow, so those modules (and every child process that inherits
# ``os.environ``) see the configured values.
# NOTE(review): numpy and pandas are already imported above this point, so a
# BLAS/OpenMP runtime they loaded may have read its thread settings before
# these defaults were applied -- confirm whether the caps are meant to cover
# this process or only the worker subprocesses.
_configure_thread_env()
_configure_coppeliasim_env()
|
|
| from rr_label_study.oven_study import ( |
| MotionTemplates, |
| _aggregate_summary, |
| _annotate_phase_columns, |
| _derive_templates, |
| _episode_metrics_from_frames, |
| _interventional_validity, |
| _keyframe_subset, |
| _keypoint_discovery, |
| _load_demo, |
| _load_descriptions, |
| ) |
|
|
|
|
def _launch_xvfb(display_num: int, log_path: Path) -> subprocess.Popen:
    """Start an ``Xvfb`` server on display ``:<display_num>``.

    Args:
        display_num: X display number the server is asked to serve.
        log_path: File that receives the server's stdout and stderr; it is
            truncated if it already exists.

    Returns:
        The ``subprocess.Popen`` handle of the started server.

    Raises:
        OSError: If the log file cannot be opened or ``Xvfb`` cannot be
            executed (for example ``FileNotFoundError`` when it is missing).
    """
    command = [
        "Xvfb",
        f":{display_num}",
        "-screen",
        "0",
        "1280x1024x24",
        "+extension",
        "GLX",
        "+render",
        "-noreset",
    ]
    # The child receives its own duplicate of the log descriptor, so the
    # parent's handle is closed as soon as Popen returns (or raises) instead
    # of staying open for the lifetime of this process.
    with log_path.open("w", encoding="utf-8") as log_handle:
        return subprocess.Popen(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            # New session: the server leads its own process group, so its pid
            # can be used as a process-group id when signalling it later.
            start_new_session=True,
        )
|
|
|
|
def _stop_process(
    process: Optional[subprocess.Popen], timeout: float = 10.0
) -> None:
    """Terminate *process* and its process group, escalating to SIGKILL.

    The signals are sent with ``os.killpg(process.pid, ...)``, which assumes
    the process leads its own process group (as is the case for children
    started with ``start_new_session=True``).

    Args:
        process: Child to stop. ``None`` and already-exited processes are
            ignored.
        timeout: Seconds to wait after SIGTERM before sending SIGKILL, and
            again after SIGKILL for the child to be reaped.
    """
    if process is None or process.poll() is not None:
        return

    try:
        os.killpg(process.pid, signal.SIGTERM)
    except ProcessLookupError:
        # The group vanished between poll() and killpg(); nothing to stop.
        return

    try:
        process.wait(timeout=timeout)
        return
    except subprocess.TimeoutExpired:
        pass

    try:
        os.killpg(process.pid, signal.SIGKILL)
    except ProcessLookupError:
        pass

    # Reap the child after the forced kill so it does not linger as a zombie
    # and so ``returncode`` is populated. Best effort: a process that still
    # cannot be collected within the timeout is left behind rather than
    # blocking shutdown indefinitely.
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        pass
|
|
|
|
def _spawn_frame_batch_job(
    display_num: int,
    episode_dir: Path,
    templates_pkl: Path,
    frame_indices: Sequence[int],
    checkpoint_stride: int,
    output_dir: Path,
) -> subprocess.Popen:
    """Start ``scripts/run_oven_frame_batch.py`` for one batch of frames.

    The worker runs with ``PROJECT_ROOT`` as its working directory, in its own
    session, with ``DISPLAY`` set to ``:<display_num>``. Its stdout and stderr
    go to ``worker_<display_num>.log`` in the parent of *output_dir*.

    Args:
        display_num: X display number exported to the worker as ``DISPLAY``;
            also used to name the per-worker runtime directory and log file.
        episode_dir: Passed to the worker as ``--episode-dir``.
        templates_pkl: Passed to the worker as ``--templates-pkl``.
        frame_indices: Passed to the worker as ``--frame-indices``.
        checkpoint_stride: Passed to the worker as ``--checkpoint-stride``.
        output_dir: Passed to the worker as ``--output-dir``.

    Returns:
        The ``subprocess.Popen`` handle of the started worker.
    """
    runtime_dir = Path(f"/tmp/rr_label_study_parallel_display_{display_num}")
    runtime_dir.mkdir(parents=True, exist_ok=True)

    env = os.environ.copy()
    # Honour an already configured CoppeliaSim root, consistent with
    # _configure_coppeliasim_env, instead of always forcing the default path.
    coppeliasim_root = env.get("COPPELIASIM_ROOT") or "/workspace/coppelia_sim"

    # Root first, then the inherited entries without duplicates or empty
    # components. An empty component (e.g. a trailing ":") would make the
    # dynamic loader search the current working directory.
    library_dirs: List[str] = [coppeliasim_root]
    for entry in env.get("LD_LIBRARY_PATH", "").split(":"):
        if entry and entry not in library_dirs:
            library_dirs.append(entry)

    env["DISPLAY"] = f":{display_num}"
    env["COPPELIASIM_ROOT"] = coppeliasim_root
    env["LD_LIBRARY_PATH"] = ":".join(library_dirs)
    env["QT_QPA_PLATFORM_PLUGIN_PATH"] = coppeliasim_root
    env["XDG_RUNTIME_DIR"] = str(runtime_dir)
    env["PYTHONUNBUFFERED"] = "1"
    # Workers are always pinned to one thread per numeric library, regardless
    # of what the parent environment requested.
    for thread_variable in (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    ):
        env[thread_variable] = "1"

    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "run_oven_frame_batch.py")),
        "--episode-dir",
        str(episode_dir),
        "--templates-pkl",
        str(templates_pkl),
        "--frame-indices",
        *[str(frame_index) for frame_index in frame_indices],
        "--checkpoint-stride",
        str(checkpoint_stride),
        "--output-dir",
        str(output_dir),
        "--independent-replay",
    ]

    worker_log_path = output_dir.parent.joinpath(f"worker_{display_num}.log")
    # The child gets its own duplicate of the log descriptor; closing ours
    # right after Popen avoids leaking one open file per worker.
    with worker_log_path.open("w", encoding="utf-8") as worker_log:
        return subprocess.Popen(
            command,
            stdout=worker_log,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=env,
            start_new_session=True,
        )
|
|
|
|
def _chunk_frame_indices(frame_indices: Sequence[int], num_workers: int) -> List[List[int]]:
    """Split *frame_indices* into contiguous, evenly sized chunks.

    The number of chunks is ``num_workers`` clamped to the range
    ``1..len(frame_indices)``. Chunk sizes differ by at most one element, the
    larger chunks coming first, so every requested worker receives work
    whenever there are at least as many frames as workers.

    Args:
        frame_indices: Indices to distribute; the original order is preserved.
        num_workers: Requested number of chunks; values below 1 are treated
            as 1.

    Returns:
        A list of non-empty lists that, concatenated, equal *frame_indices*.
        An empty input yields an empty list.
    """
    total = len(frame_indices)
    if total == 0:
        return []

    worker_count = min(max(1, num_workers), total)
    # Ceil-sized chunks could leave workers idle (10 frames over 8 workers
    # produced only 5 chunks); spread the remainder one element at a time.
    base_size, remainder = divmod(total, worker_count)

    chunks: List[List[int]] = []
    start = 0
    for worker_index in range(worker_count):
        stop = start + base_size + (1 if worker_index < remainder else 0)
        chunks.append(list(frame_indices[start:stop]))
        start = stop
    return chunks
|
|
|
|
def _collect_rows(frame_json_dir: Path, num_frames: int) -> pd.DataFrame:
    """Load the per-frame JSON rows into one DataFrame ordered by frame.

    Reads ``frame_0000.json`` ... ``frame_<num_frames - 1>.json`` (indices
    zero-padded to four digits) from *frame_json_dir*.

    Returns:
        A DataFrame with one row per file, sorted by its ``frame_index``
        column and re-indexed from zero.

    Raises:
        RuntimeError: If any expected frame file does not exist.
    """

    def _read_row(index: int) -> Dict[str, float]:
        source = frame_json_dir / f"frame_{index:04d}.json"
        if not source.exists():
            raise RuntimeError(f"missing recomputed frame output: {source}")
        return json.loads(source.read_text(encoding="utf-8"))

    records = [_read_row(index) for index in range(num_frames)]
    unsorted_df = pd.DataFrame(records)
    return unsorted_df.sort_values("frame_index").reset_index(drop=True)
|
|
|
|
def _collect_debug_rows(frame_json_dir: Path, num_frames: int) -> List[Dict[str, object]]:
    """Load the per-frame debug JSON payloads in frame order.

    Reads ``frame_0000.debug.json`` ... ``frame_<num_frames - 1>.debug.json``
    (indices zero-padded to four digits) from *frame_json_dir*.

    Returns:
        The decoded payloads, one per frame, in ascending file-index order.

    Raises:
        RuntimeError: If any expected debug file does not exist.
    """

    def _read_debug(index: int) -> Dict[str, object]:
        source = frame_json_dir / f"frame_{index:04d}.debug.json"
        if not source.exists():
            raise RuntimeError(f"missing recomputed frame debug output: {source}")
        return json.loads(source.read_text(encoding="utf-8"))

    return [_read_debug(index) for index in range(num_frames)]
|
|
|
|
def _parse_args() -> argparse.Namespace:
    """Define and parse the command-line options of this script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset-root", required=True)
    parser.add_argument("--episode-dir", required=True)
    parser.add_argument("--output-dir", required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=8)
    parser.add_argument("--base-display", type=int, default=380)
    parser.add_argument("--template-episode-dir")
    parser.add_argument("--templates-json")
    parser.add_argument("--stagger-seconds", type=float, default=0.15)
    parser.add_argument("--keep-frame-json", action="store_true")
    return parser.parse_args()


def _resolve_templates(
    args: argparse.Namespace, dataset_root: Path, episode_dir: Path
) -> Tuple["MotionTemplates", Dict[str, object]]:
    """Return the motion templates and the metadata saved as ``templates.json``.

    With ``--templates-json`` the templates are read from that file; otherwise
    they are derived via ``_derive_templates`` from ``--template-episode-dir``
    (falling back to *episode_dir* when that option is not given).
    """
    template_episode_dir = (
        Path(args.template_episode_dir) if args.template_episode_dir else episode_dir
    )

    if args.templates_json:
        templates_json_path = Path(args.templates_json)
        payload = json.loads(templates_json_path.read_text(encoding="utf-8"))
        templates = MotionTemplates.from_json(payload["templates"])
        metadata: Dict[str, object] = {
            "template_mode": payload.get("template_mode", "external"),
            "template_episode": payload.get("template_episode", template_episode_dir.name),
            "template_frames": dict(payload.get("template_frames", {})),
            "templates": templates.to_json(),
            "template_source_json": str(templates_json_path.resolve()),
        }
        return templates, metadata

    templates, template_frames = _derive_templates(dataset_root, template_episode_dir)
    metadata = {
        "template_mode": "per_episode",
        "template_episode": template_episode_dir.name,
        "template_frames": template_frames,
        "templates": templates.to_json(),
    }
    return templates, metadata


def _run_frame_workers(
    args: argparse.Namespace,
    episode_dir: Path,
    templates_pkl: Path,
    output_dir: Path,
    frame_json_dir: Path,
    frame_chunks: List[List[int]],
) -> None:
    """Run one Xvfb server and one frame-batch worker per chunk until all finish.

    Raises:
        RuntimeError: If a worker exits with a non-zero code or leaves any of
            its ``frame_<index>.json`` outputs missing. Remaining workers and
            all Xvfb servers are stopped before the error propagates.
    """
    displays = [args.base_display + index for index in range(len(frame_chunks))]
    xvfb_procs: List[subprocess.Popen] = []
    active: Dict[int, Tuple[List[int], subprocess.Popen]] = {}

    try:
        for display_num in displays:
            xvfb_procs.append(
                _launch_xvfb(display_num, output_dir.joinpath(f"xvfb_{display_num}.log"))
            )
        # Fixed pause after starting the servers, before the workers connect.
        time.sleep(1.0)

        for display_num, frame_chunk in zip(displays, frame_chunks):
            active[display_num] = (
                frame_chunk,
                _spawn_frame_batch_job(
                    display_num=display_num,
                    episode_dir=episode_dir,
                    templates_pkl=templates_pkl,
                    frame_indices=frame_chunk,
                    checkpoint_stride=args.checkpoint_stride,
                    output_dir=frame_json_dir,
                ),
            )
            if args.stagger_seconds > 0:
                time.sleep(args.stagger_seconds)

        # Poll once per second; a worker counts as done only when it exited
        # cleanly AND every frame of its chunk has a JSON row on disk.
        while active:
            time.sleep(1.0)
            finished: List[int] = []
            for display_num, (frame_chunk, process) in active.items():
                return_code = process.poll()
                if return_code is None:
                    continue
                missing = [
                    frame_index
                    for frame_index in frame_chunk
                    if not frame_json_dir.joinpath(f"frame_{frame_index:04d}.json").exists()
                ]
                if return_code != 0 or missing:
                    # The exit code separates a crashed worker from one that
                    # exited cleanly but skipped frames.
                    raise RuntimeError(
                        f"display :{display_num} failed for frames {frame_chunk[:3]}...; "
                        f"missing={missing[:8]}; return_code={return_code}"
                    )
                finished.append(display_num)
            # Removal is deferred so the dict is not mutated while iterated.
            for display_num in finished:
                active.pop(display_num)
    finally:
        for _, process in list(active.values()):
            _stop_process(process)
        for xvfb in xvfb_procs:
            _stop_process(xvfb)


def _write_episode_outputs(
    output_dir: Path,
    episode_name: str,
    frame_df: pd.DataFrame,
    key_df: pd.DataFrame,
    debug_rows: List[Dict[str, object]],
    metrics: Dict[str, object],
) -> Dict[str, object]:
    """Write the CSV/JSON artefacts for one episode and return the summary."""
    frame_df.to_csv(output_dir.joinpath(f"{episode_name}.dense.csv"), index=False)
    key_df.to_csv(output_dir.joinpath(f"{episode_name}.keyframes.csv"), index=False)
    with output_dir.joinpath(f"{episode_name}.debug.jsonl").open(
        "w", encoding="utf-8"
    ) as handle:
        for row in debug_rows:
            handle.write(json.dumps(row) + "\n")
    with output_dir.joinpath(f"{episode_name}.metrics.json").open(
        "w", encoding="utf-8"
    ) as handle:
        json.dump(metrics, handle, indent=2)
    summary = _aggregate_summary([metrics])
    with output_dir.joinpath("summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)
    return summary


def main() -> int:
    """Recompute per-frame rows for one episode in parallel and write results.

    Steps, in order:
      1. Parse arguments, load the demo and descriptions, resolve templates.
      2. Save ``templates.pkl`` and ``templates.json`` in the output directory.
      3. Split the frame indices into chunks and run one worker per chunk,
         each on its own Xvfb display.
      4. Gather the per-frame JSON rows, compute keyframes, intervention
         results and episode metrics through the ``rr_label_study`` helpers.
      5. Write the dense/keyframe CSVs, debug JSONL, metrics and summary,
         then delete the per-frame JSON files unless ``--keep-frame-json``.

    Returns:
        ``0`` on success; failures propagate as exceptions.
    """
    args = _parse_args()

    dataset_root = Path(args.dataset_root)
    episode_dir = Path(args.episode_dir)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    demo = _load_demo(episode_dir)
    descriptions = _load_descriptions(episode_dir)
    num_frames = len(demo)
    templates, template_metadata = _resolve_templates(args, dataset_root, episode_dir)

    # The pickle is what the workers load; the JSON is a readable record.
    templates_pkl = output_dir.joinpath("templates.pkl")
    with templates_pkl.open("wb") as handle:
        pickle.dump(templates, handle)
    with output_dir.joinpath("templates.json").open("w", encoding="utf-8") as handle:
        json.dump(template_metadata, handle, indent=2)

    frame_json_dir = output_dir.joinpath("frame_rows")
    frame_json_dir.mkdir(parents=True, exist_ok=True)
    frame_chunks = _chunk_frame_indices(list(range(num_frames)), args.num_workers)
    _run_frame_workers(
        args=args,
        episode_dir=episode_dir,
        templates_pkl=templates_pkl,
        output_dir=output_dir,
        frame_json_dir=frame_json_dir,
        frame_chunks=frame_chunks,
    )

    frame_df = _collect_rows(frame_json_dir, num_frames)
    debug_rows = _collect_debug_rows(frame_json_dir, num_frames)
    frame_df = _annotate_phase_columns(frame_df)
    # Keep only keyframe indices that fall inside the recomputed frame table.
    keyframes = [index for index in _keypoint_discovery(demo) if index < len(frame_df)]
    key_df = _keyframe_subset(frame_df, keyframes)
    interventions = _interventional_validity(
        demo=demo,
        templates=templates,
        frame_df=frame_df,
        checkpoint_stride=args.checkpoint_stride,
    )
    metrics = _episode_metrics_from_frames(
        frame_df=frame_df,
        key_df=key_df,
        episode_name=episode_dir.name,
        description=descriptions[0],
        interventions=interventions,
    )

    summary = _write_episode_outputs(
        output_dir=output_dir,
        episode_name=episode_dir.name,
        frame_df=frame_df,
        key_df=key_df,
        debug_rows=debug_rows,
        metrics=metrics,
    )

    if not args.keep_frame_json:
        # The pattern covers both frame_*.json and frame_*.debug.json files.
        for row_path in frame_json_dir.glob("frame_*.json*"):
            row_path.unlink()
        frame_json_dir.rmdir()

    print(json.dumps(summary, indent=2))
    return 0
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|