"""Run the oven label study across several parallel worker processes.

Each worker owns a private Xvfb display and a disjoint chunk of episode
indices; per-worker metrics files are collected afterwards and aggregated
into a single summary JSON.
"""

import argparse
import json
import math
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

# Make the project importable when this script is executed directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))


def _configure_coppeliasim_env() -> None:
    """Export COPPELIASIM_ROOT and prepend it to LD_LIBRARY_PATH.

    Idempotent: the root is only inserted when it is not already present,
    and empty path segments are dropped.
    """
    coppeliasim_root = os.environ.setdefault(
        "COPPELIASIM_ROOT", "/workspace/coppelia_sim"
    )
    ld_library_path_parts = [
        part for part in os.environ.get("LD_LIBRARY_PATH", "").split(":") if part
    ]
    if coppeliasim_root not in ld_library_path_parts:
        ld_library_path_parts.insert(0, coppeliasim_root)
    os.environ["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)


# Must run before importing modules that load the CoppeliaSim libraries.
_configure_coppeliasim_env()

from rr_label_study.oven_study import _aggregate_summary, _episode_dirs


def _select_episode_indices(
    total_episodes: int,
    episode_offset: int,
    max_episodes: Optional[int],
    episode_indices: Optional[Sequence[int]],
) -> List[int]:
    """Resolve which episode indices to process.

    Explicit ``episode_indices`` take precedence (validated against the
    available range, de-duplicated, original order kept). Otherwise a
    contiguous window starting at ``episode_offset`` is used, optionally
    capped by ``max_episodes``.

    Raises:
        ValueError: if an explicit index is outside ``0..total_episodes-1``.
    """
    if episode_indices is not None:
        selected: List[int] = []
        seen = set()
        for raw_index in episode_indices:
            episode_index = int(raw_index)
            if not (0 <= episode_index < total_episodes):
                raise ValueError(
                    f"episode index {episode_index} outside available range 0..{total_episodes - 1}"
                )
            if episode_index in seen:
                continue
            selected.append(episode_index)
            seen.add(episode_index)
        return selected
    remaining = max(0, total_episodes - episode_offset)
    if max_episodes is not None:
        remaining = min(remaining, max_episodes)
    if remaining <= 0:
        return []
    return list(range(episode_offset, episode_offset + remaining))


def _chunk_episode_indices(
    episode_indices: Sequence[int],
    num_workers: int,
) -> List[List[int]]:
    """Split ``episode_indices`` into at most ``num_workers`` contiguous chunks."""
    if not episode_indices:
        return []
    worker_count = min(num_workers, len(episode_indices))
    chunk_size = math.ceil(len(episode_indices) / worker_count)
    specs: List[List[int]] = []
    for worker_index in range(worker_count):
        start = worker_index * chunk_size
        chunk = list(episode_indices[start : start + chunk_size])
        if chunk:
            specs.append(chunk)
    return specs


def _launch_xvfb(display_num: int, log_path: Path) -> subprocess.Popen:
    """Start an Xvfb server on ``:display_num``, logging to ``log_path``.

    The log handle is closed in the parent after ``Popen`` duplicates it
    into the child, so no file descriptor is leaked per worker.
    """
    with log_path.open("w", encoding="utf-8") as log_handle:
        return subprocess.Popen(
            [
                "Xvfb",
                f":{display_num}",
                "-screen",
                "0",
                "1280x1024x24",
                "+extension",
                "GLX",
                "+render",
                "-noreset",
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            # New session so the whole process group can be signalled later.
            start_new_session=True,
        )


def _launch_worker(
    worker_dir: Path,
    display_num: int,
    dataset_root: str,
    episode_indices: Sequence[int],
    checkpoint_stride: int,
    template_episode_index: int,
    max_frames: Optional[int],
    independent_replay: bool,
    per_episode_templates: bool,
    thread_count: int,
) -> Tuple[subprocess.Popen, subprocess.Popen]:
    """Start one Xvfb display plus one study worker bound to it.

    Returns:
        ``(xvfb_process, worker_process)``; both are started in their own
        sessions so they can be stopped group-wide by ``_stop_process``.
    """
    worker_dir.mkdir(parents=True, exist_ok=True)
    xvfb = _launch_xvfb(display_num, worker_dir.joinpath("xvfb.log"))
    # Give Xvfb a moment to create the display before the worker connects.
    time.sleep(1.0)
    runtime_dir = Path(f"/tmp/rr_label_study_display_{display_num}")
    runtime_dir.mkdir(parents=True, exist_ok=True)
    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "run_oven_label_study.py")),
        "--dataset-root",
        dataset_root,
        "--result-dir",
        str(worker_dir),
        "--checkpoint-stride",
        str(checkpoint_stride),
        "--template-episode-index",
        str(template_episode_index),
        "--episode-indices",
        ",".join(str(index) for index in episode_indices),
    ]
    if max_frames is not None:
        command.extend(["--max-frames", str(max_frames)])
    if not independent_replay:
        command.append("--sequential-replay")
    if per_episode_templates:
        command.append("--per-episode-templates")
    env = os.environ.copy()
    env["DISPLAY"] = f":{display_num}"
    env["XDG_RUNTIME_DIR"] = str(runtime_dir)
    env["PYTHONUNBUFFERED"] = "1"
    env["COPPELIASIM_ROOT"] = env.get("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
    coppeliasim_root = env["COPPELIASIM_ROOT"]
    ld_library_path_parts = [
        part for part in env.get("LD_LIBRARY_PATH", "").split(":") if part
    ]
    if coppeliasim_root not in ld_library_path_parts:
        ld_library_path_parts.insert(0, coppeliasim_root)
    env["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)
    # Pin every common BLAS/OpenMP backend so workers do not oversubscribe cores.
    thread_count_str = str(thread_count)
    env["OMP_NUM_THREADS"] = thread_count_str
    env["OPENBLAS_NUM_THREADS"] = thread_count_str
    env["MKL_NUM_THREADS"] = thread_count_str
    env["NUMEXPR_NUM_THREADS"] = thread_count_str
    env["VECLIB_MAXIMUM_THREADS"] = thread_count_str
    env["BLIS_NUM_THREADS"] = thread_count_str
    # Close the parent's copy of the log handle after Popen dups it (fd leak fix).
    with worker_dir.joinpath("worker.log").open("w", encoding="utf-8") as worker_log:
        process = subprocess.Popen(
            command,
            stdout=worker_log,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=str(PROJECT_ROOT),
            start_new_session=True,
        )
    return xvfb, process


def _stop_process(process: subprocess.Popen) -> None:
    """Stop a process group: SIGTERM, then SIGKILL if it survives 10 seconds."""
    if process.poll() is not None:
        return
    try:
        os.killpg(process.pid, signal.SIGTERM)
    except ProcessLookupError:
        return
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        try:
            os.killpg(process.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        # Reap the killed child so it does not linger as a zombie.
        process.wait()


def _collect_metrics(base_result_dir: Path) -> List[Dict[str, object]]:
    """Load every per-episode metrics JSON produced by the workers, sorted by path."""
    metrics: List[Dict[str, object]] = []
    for metrics_path in sorted(base_result_dir.glob("worker_*/episode*.metrics.json")):
        with metrics_path.open("r", encoding="utf-8") as handle:
            metrics.append(json.load(handle))
    return metrics


def main(argv: Optional[List[str]] = None) -> int:
    """Fan episodes out to parallel workers, wait, and aggregate the results.

    Raises:
        RuntimeError: when no episodes are selected or any worker exits non-zero.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument(
        "--result-dir",
        default="/workspace/reveal_retrieve_label_study/results/oven_parallel",
    )
    parser.add_argument("--num-workers", type=int, default=4)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--template-episode-index", type=int, default=0)
    parser.add_argument("--base-display", type=int, default=110)
    parser.add_argument("--max-frames", type=int)
    parser.add_argument("--episode-indices")
    parser.add_argument("--thread-count", type=int, default=1)
    parser.add_argument("--stagger-seconds", type=float, default=0.5)
    parser.add_argument(
        "--independent-replay",
        dest="independent_replay",
        action="store_true",
        help="Replay each frame independently to avoid simulator drift.",
    )
    parser.add_argument(
        "--sequential-replay",
        dest="independent_replay",
        action="store_false",
        help="Reuse replay state across frames for speed.",
    )
    parser.add_argument("--per-episode-templates", action="store_true")
    parser.set_defaults(independent_replay=True)
    args = parser.parse_args(argv)

    dataset_root = Path(args.dataset_root)
    all_episodes = _episode_dirs(dataset_root)
    explicit_episode_indices = None
    if args.episode_indices:
        explicit_episode_indices = [
            int(chunk.strip())
            for chunk in args.episode_indices.split(",")
            if chunk.strip()
        ]
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episodes),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    chunk_specs = _chunk_episode_indices(
        episode_indices=selected_episode_indices,
        num_workers=args.num_workers,
    )
    if not chunk_specs:
        raise RuntimeError("no episodes selected for parallel run")

    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)
    workers: List[Tuple[subprocess.Popen, subprocess.Popen]] = []
    worker_meta: List[Dict[str, object]] = []
    try:
        for worker_index, worker_episode_indices in enumerate(chunk_specs):
            display_num = args.base_display + worker_index
            worker_dir = result_dir.joinpath(f"worker_{worker_index:02d}")
            xvfb, process = _launch_worker(
                worker_dir=worker_dir,
                display_num=display_num,
                dataset_root=args.dataset_root,
                episode_indices=worker_episode_indices,
                checkpoint_stride=args.checkpoint_stride,
                template_episode_index=args.template_episode_index,
                max_frames=args.max_frames,
                independent_replay=args.independent_replay,
                per_episode_templates=args.per_episode_templates,
                thread_count=args.thread_count,
            )
            workers.append((xvfb, process))
            worker_meta.append(
                {
                    "worker_index": worker_index,
                    "display_num": display_num,
                    "episode_indices": list(worker_episode_indices),
                }
            )
            # Stagger launches so workers do not hammer the disk/X server at once.
            if args.stagger_seconds > 0:
                time.sleep(args.stagger_seconds)
        for meta, (_, process) in zip(worker_meta, workers):
            return_code = process.wait()
            meta["return_code"] = return_code
            if return_code != 0:
                worker_index = int(meta["worker_index"])
                worker_log = result_dir.joinpath(
                    f"worker_{worker_index:02d}", "worker.log"
                )
                raise RuntimeError(
                    f"worker {worker_index} failed with code {return_code}; see {worker_log}"
                )
    finally:
        # Always tear down workers first, then their Xvfb displays.
        for xvfb, process in workers:
            _stop_process(process)
            _stop_process(xvfb)

    episode_metrics = _collect_metrics(result_dir)
    summary = _aggregate_summary(episode_metrics)
    with result_dir.joinpath("parallel_workers.json").open("w", encoding="utf-8") as handle:
        json.dump(worker_meta, handle, indent=2)
    with result_dir.joinpath("parallel_summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)
    print(json.dumps(summary, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())