| import argparse |
| import json |
| import math |
| import os |
| import signal |
| import subprocess |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Dict, List, Optional, Sequence, Tuple |
|
|
|
|
# Make the repository root importable (for `rr_label_study`) when this script
# is run directly rather than as an installed package entry point.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
| def _configure_coppeliasim_env() -> None: |
| coppeliasim_root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim") |
| ld_library_path_parts = [ |
| part for part in os.environ.get("LD_LIBRARY_PATH", "").split(":") if part |
| ] |
| if coppeliasim_root not in ld_library_path_parts: |
| ld_library_path_parts.insert(0, coppeliasim_root) |
| os.environ["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts) |
|
|
|
|
# Configure the simulator environment BEFORE the project import below: modules
# under `rr_label_study` may load CoppeliaSim bindings at import time, which
# need COPPELIASIM_ROOT / LD_LIBRARY_PATH already in place.
_configure_coppeliasim_env()


# Deliberately placed after the env setup and sys.path bootstrap above.
from rr_label_study.oven_study import _aggregate_summary, _episode_dirs  # noqa: E402
|
|
|
|
| def _select_episode_indices( |
| total_episodes: int, |
| episode_offset: int, |
| max_episodes: Optional[int], |
| episode_indices: Optional[Sequence[int]], |
| ) -> List[int]: |
| if episode_indices is not None: |
| selected: List[int] = [] |
| seen = set() |
| for raw_index in episode_indices: |
| episode_index = int(raw_index) |
| if not (0 <= episode_index < total_episodes): |
| raise ValueError( |
| f"episode index {episode_index} outside available range 0..{total_episodes - 1}" |
| ) |
| if episode_index in seen: |
| continue |
| selected.append(episode_index) |
| seen.add(episode_index) |
| return selected |
|
|
| remaining = max(0, total_episodes - episode_offset) |
| if max_episodes is not None: |
| remaining = min(remaining, max_episodes) |
| if remaining <= 0: |
| return [] |
| return list(range(episode_offset, episode_offset + remaining)) |
|
|
|
|
| def _chunk_episode_indices( |
| episode_indices: Sequence[int], |
| num_workers: int, |
| ) -> List[List[int]]: |
| if not episode_indices: |
| return [] |
| worker_count = min(num_workers, len(episode_indices)) |
| chunk_size = math.ceil(len(episode_indices) / worker_count) |
| specs: List[List[int]] = [] |
| for worker_index in range(worker_count): |
| start = worker_index * chunk_size |
| chunk = list(episode_indices[start : start + chunk_size]) |
| if chunk: |
| specs.append(chunk) |
| return specs |
|
|
|
|
def _launch_xvfb(display_num: int, log_path: Path) -> subprocess.Popen:
    """Start a headless Xvfb server on display ``:display_num``.

    The server provides a 1280x1024x24 screen with GLX and render extensions
    so the simulator can draw without a physical display.  Its stdout/stderr
    are redirected to ``log_path``.

    Returns the Popen handle; the server runs in its own session so callers
    can terminate the whole process group later.
    """
    # Open the log only for the duration of the spawn.  The original code
    # leaked this file object in the parent; the child inherits a duplicated
    # descriptor, so closing our copy does not interrupt logging.
    with log_path.open("w", encoding="utf-8") as log_handle:
        return subprocess.Popen(
            [
                "Xvfb",
                f":{display_num}",
                "-screen",
                "0",
                "1280x1024x24",
                "+extension",
                "GLX",
                "+render",
                "-noreset",
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )
|
|
|
|
def _launch_worker(
    worker_dir: Path,
    display_num: int,
    dataset_root: str,
    episode_indices: Sequence[int],
    checkpoint_stride: int,
    template_episode_index: int,
    max_frames: Optional[int],
    independent_replay: bool,
    per_episode_templates: bool,
    thread_count: int,
) -> Tuple[subprocess.Popen, subprocess.Popen]:
    """Start one Xvfb display plus one label-study worker bound to it.

    The worker is a ``run_oven_label_study.py`` subprocess processing exactly
    ``episode_indices``, with its own DISPLAY, XDG runtime dir, CoppeliaSim
    library path, and a capped BLAS/OpenMP thread budget.  Output goes to
    ``worker_dir/worker.log``.

    Returns:
        ``(xvfb_process, worker_process)``; both run in their own sessions so
        the caller can SIGTERM/SIGKILL their whole process groups.
    """
    worker_dir.mkdir(parents=True, exist_ok=True)
    xvfb = _launch_xvfb(display_num, worker_dir.joinpath("xvfb.log"))
    # Give Xvfb a moment to create its display socket before clients connect.
    time.sleep(1.0)

    runtime_dir = Path(f"/tmp/rr_label_study_display_{display_num}")
    runtime_dir.mkdir(parents=True, exist_ok=True)

    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "run_oven_label_study.py")),
        "--dataset-root",
        dataset_root,
        "--result-dir",
        str(worker_dir),
        "--checkpoint-stride",
        str(checkpoint_stride),
        "--template-episode-index",
        str(template_episode_index),
        "--episode-indices",
        ",".join(str(index) for index in episode_indices),
    ]
    if max_frames is not None:
        command.extend(["--max-frames", str(max_frames)])
    if not independent_replay:
        command.append("--sequential-replay")
    if per_episode_templates:
        command.append("--per-episode-templates")

    env = os.environ.copy()
    env["DISPLAY"] = f":{display_num}"
    env["XDG_RUNTIME_DIR"] = str(runtime_dir)
    env["PYTHONUNBUFFERED"] = "1"
    # Mirror _configure_coppeliasim_env() for the child's environment.
    coppeliasim_root = env.get("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
    env["COPPELIASIM_ROOT"] = coppeliasim_root
    ld_library_path_parts = [
        part for part in env.get("LD_LIBRARY_PATH", "").split(":") if part
    ]
    if coppeliasim_root not in ld_library_path_parts:
        ld_library_path_parts.insert(0, coppeliasim_root)
    env["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)
    # Cap every common BLAS/OpenMP backend so workers don't oversubscribe CPUs.
    thread_count_str = str(thread_count)
    env["OMP_NUM_THREADS"] = thread_count_str
    env["OPENBLAS_NUM_THREADS"] = thread_count_str
    env["MKL_NUM_THREADS"] = thread_count_str
    env["NUMEXPR_NUM_THREADS"] = thread_count_str
    env["VECLIB_MAXIMUM_THREADS"] = thread_count_str
    env["BLIS_NUM_THREADS"] = thread_count_str

    # Open the log only for the spawn; the original code leaked this handle in
    # the parent.  The child keeps a duplicated descriptor, so closing our copy
    # does not stop the worker's logging.
    with worker_dir.joinpath("worker.log").open("w", encoding="utf-8") as worker_log:
        process = subprocess.Popen(
            command,
            stdout=worker_log,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=str(PROJECT_ROOT),
            start_new_session=True,
        )
    return xvfb, process
|
|
|
|
| def _stop_process(process: subprocess.Popen) -> None: |
| if process.poll() is not None: |
| return |
| try: |
| os.killpg(process.pid, signal.SIGTERM) |
| except ProcessLookupError: |
| return |
| try: |
| process.wait(timeout=10) |
| except subprocess.TimeoutExpired: |
| try: |
| os.killpg(process.pid, signal.SIGKILL) |
| except ProcessLookupError: |
| pass |
|
|
|
|
| def _collect_metrics(base_result_dir: Path) -> List[Dict[str, object]]: |
| metrics: List[Dict[str, object]] = [] |
| for metrics_path in sorted(base_result_dir.glob("worker_*/episode*.metrics.json")): |
| with metrics_path.open("r", encoding="utf-8") as handle: |
| metrics.append(json.load(handle)) |
| return metrics |
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Fan the oven label study out across parallel Xvfb-backed workers.

    Selects episodes (explicit indices or offset/limit window), splits them
    into contiguous chunks, launches one ``run_oven_label_study.py``
    subprocess per chunk — each bound to its own virtual display — waits for
    all of them, then aggregates the per-episode metrics into
    ``parallel_summary.json`` alongside ``parallel_workers.json`` metadata.

    Args:
        argv: Optional argument vector for argparse (defaults to sys.argv[1:]).

    Returns:
        0 on success.

    Raises:
        RuntimeError: if no episodes are selected, or any worker exits
            non-zero (the failing worker's log path is included).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument(
        "--result-dir",
        default="/workspace/reveal_retrieve_label_study/results/oven_parallel",
    )
    parser.add_argument("--num-workers", type=int, default=4)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--template-episode-index", type=int, default=0)
    # Worker i uses X display :(base_display + i).
    parser.add_argument("--base-display", type=int, default=110)
    parser.add_argument("--max-frames", type=int)
    # Comma-separated explicit episode indices; overrides offset/max selection.
    parser.add_argument("--episode-indices")
    # BLAS/OpenMP thread budget exported to each worker process.
    parser.add_argument("--thread-count", type=int, default=1)
    # Delay between consecutive worker launches to stagger simulator startup.
    parser.add_argument("--stagger-seconds", type=float, default=0.5)
    parser.add_argument(
        "--independent-replay",
        dest="independent_replay",
        action="store_true",
        help="Replay each frame independently to avoid simulator drift.",
    )
    parser.add_argument(
        "--sequential-replay",
        dest="independent_replay",
        action="store_false",
        help="Reuse replay state across frames for speed.",
    )
    parser.add_argument("--per-episode-templates", action="store_true")
    # Independent replay is the safer default (no cross-frame drift).
    parser.set_defaults(independent_replay=True)
    args = parser.parse_args(argv)

    dataset_root = Path(args.dataset_root)
    all_episodes = _episode_dirs(dataset_root)
    explicit_episode_indices = None
    if args.episode_indices:
        explicit_episode_indices = [
            int(chunk.strip()) for chunk in args.episode_indices.split(",") if chunk.strip()
        ]
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episodes),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    chunk_specs = _chunk_episode_indices(
        episode_indices=selected_episode_indices,
        num_workers=args.num_workers,
    )
    if not chunk_specs:
        raise RuntimeError("no episodes selected for parallel run")

    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)

    # Each entry pairs a worker's Xvfb process with its study process.
    workers: List[Tuple[subprocess.Popen, subprocess.Popen]] = []
    worker_meta: List[Dict[str, object]] = []
    try:
        for worker_index, worker_episode_indices in enumerate(chunk_specs):
            display_num = args.base_display + worker_index
            worker_dir = result_dir.joinpath(f"worker_{worker_index:02d}")
            xvfb, process = _launch_worker(
                worker_dir=worker_dir,
                display_num=display_num,
                dataset_root=args.dataset_root,
                episode_indices=worker_episode_indices,
                checkpoint_stride=args.checkpoint_stride,
                template_episode_index=args.template_episode_index,
                max_frames=args.max_frames,
                independent_replay=args.independent_replay,
                per_episode_templates=args.per_episode_templates,
                thread_count=args.thread_count,
            )
            workers.append((xvfb, process))
            worker_meta.append(
                {
                    "worker_index": worker_index,
                    "display_num": display_num,
                    "episode_indices": list(worker_episode_indices),
                }
            )
            if args.stagger_seconds > 0:
                time.sleep(args.stagger_seconds)

        # Wait for workers in launch order; fail fast on the first non-zero
        # exit (remaining workers are torn down by the finally block).
        for meta, (_, process) in zip(worker_meta, workers):
            return_code = process.wait()
            meta["return_code"] = return_code
            if return_code != 0:
                worker_index = int(meta["worker_index"])
                worker_log = result_dir.joinpath(f"worker_{worker_index:02d}", "worker.log")
                raise RuntimeError(
                    f"worker {worker_index} failed with code {return_code}; see {worker_log}"
                )
    finally:
        # Stop the study process before its Xvfb so the worker never loses
        # its display while still shutting down.
        for xvfb, process in workers:
            _stop_process(process)
            _stop_process(xvfb)

    episode_metrics = _collect_metrics(result_dir)
    summary = _aggregate_summary(episode_metrics)
    with result_dir.joinpath("parallel_workers.json").open("w", encoding="utf-8") as handle:
        json.dump(worker_meta, handle, indent=2)
    with result_dir.joinpath("parallel_summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)

    print(json.dumps(summary, indent=2))
    return 0
|
|
|
|
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code, matching the
    # original `raise SystemExit(main())`.
    sys.exit(main())
|
|