"""Sequentially recompute oven-study episodes in child processes, with retries.

Each selected episode is rendered by a subprocess into a temporary directory;
complete results are atomically renamed into place, failed attempts are
archived under ``failed_attempts/``, and ``progress.json`` / ``summary.json``
are kept up to date so an interrupted run can be resumed (already-complete
episode directories are skipped).
"""

from pathlib import Path
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from typing import Dict, List, Optional, Sequence

# Make the repository importable when this script is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))


def _configure_thread_env() -> None:
    """Default native thread pools to one thread (parallelism comes from
    worker subprocesses, not intra-op threads). Existing values win."""
    defaults = {
        "OMP_NUM_THREADS": "1",
        "OPENBLAS_NUM_THREADS": "1",
        "MKL_NUM_THREADS": "1",
        "NUMEXPR_NUM_THREADS": "1",
        "VECLIB_MAXIMUM_THREADS": "1",
        "BLIS_NUM_THREADS": "1",
        "MALLOC_ARENA_MAX": "2",
    }
    for key, value in defaults.items():
        os.environ.setdefault(key, value)


def _configure_coppeliasim_env() -> None:
    """Ensure ``COPPELIASIM_ROOT`` is set and appears on ``LD_LIBRARY_PATH``."""
    coppeliasim_root = os.environ.setdefault(
        "COPPELIASIM_ROOT", "/workspace/coppelia_sim"
    )
    ld_library_path_parts = [
        part for part in os.environ.get("LD_LIBRARY_PATH", "").split(":") if part
    ]
    if coppeliasim_root not in ld_library_path_parts:
        ld_library_path_parts.insert(0, coppeliasim_root)
    os.environ["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)


_configure_thread_env()
_configure_coppeliasim_env()

# Imported after env setup so native libraries observe the thread limits.
from rr_label_study.oven_study import _aggregate_summary, _episode_dirs


def _select_episode_indices(
    total_episodes: int,
    episode_offset: int,
    max_episodes: Optional[int],
    episode_indices: Optional[Sequence[int]],
) -> List[int]:
    """Return the episode indices to process.

    When ``episode_indices`` is given it takes precedence (offset/limit are
    ignored); duplicates are dropped while preserving first-seen order, and
    out-of-range values raise ``ValueError``. Otherwise a contiguous window
    ``[episode_offset, episode_offset + max_episodes)`` clipped to the
    available range is returned.
    """
    if episode_indices is not None:
        selected: List[int] = []
        seen = set()
        for raw_index in episode_indices:
            episode_index = int(raw_index)
            if not (0 <= episode_index < total_episodes):
                raise ValueError(
                    f"episode index {episode_index} outside available range 0..{total_episodes - 1}"
                )
            if episode_index in seen:
                continue
            selected.append(episode_index)
            seen.add(episode_index)
        return selected
    remaining = max(0, total_episodes - episode_offset)
    if max_episodes is not None:
        remaining = min(remaining, max_episodes)
    if remaining <= 0:
        return []
    return list(range(episode_offset, episode_offset + remaining))


def _is_complete_episode_dir(output_dir: Path, episode_name: str) -> bool:
    """True when ``output_dir`` holds every artifact a finished episode produces."""
    required = [
        output_dir.joinpath(f"{episode_name}.dense.csv"),
        output_dir.joinpath(f"{episode_name}.keyframes.csv"),
        output_dir.joinpath(f"{episode_name}.debug.jsonl"),
        output_dir.joinpath(f"{episode_name}.metrics.json"),
        output_dir.joinpath("summary.json"),
        output_dir.joinpath("templates.json"),
        output_dir.joinpath("templates.pkl"),
    ]
    return all(path.exists() for path in required)


def _load_metrics(result_dir: Path, episode_names: Sequence[str]) -> List[Dict[str, object]]:
    """Load per-episode metrics JSON for every episode that has one on disk."""
    metrics: List[Dict[str, object]] = []
    for episode_name in episode_names:
        metrics_path = result_dir.joinpath(episode_name, f"{episode_name}.metrics.json")
        if metrics_path.exists():
            with metrics_path.open("r", encoding="utf-8") as handle:
                metrics.append(json.load(handle))
    return metrics


def _write_json(path: Path, payload: Dict[str, object]) -> None:
    """Write ``payload`` as pretty-printed JSON, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2)


def _run_episode(
    dataset_root: Path,
    episode_dir: Path,
    output_dir: Path,
    checkpoint_stride: int,
    num_workers: int,
    base_display: int,
    templates_json: Path,
    stagger_seconds: float,
    thread_count: int,
    log_path: Path,
) -> int:
    """Run one episode-recompute subprocess and return its exit code.

    The child's stdout/stderr stream into ``log_path``; thread-pool env vars
    are overridden with ``thread_count`` (unlike the module-level defaults,
    which only apply when unset).
    """
    env = os.environ.copy()
    thread_count_str = str(thread_count)
    env["OMP_NUM_THREADS"] = thread_count_str
    env["OPENBLAS_NUM_THREADS"] = thread_count_str
    env["MKL_NUM_THREADS"] = thread_count_str
    env["NUMEXPR_NUM_THREADS"] = thread_count_str
    env["VECLIB_MAXIMUM_THREADS"] = thread_count_str
    env["BLIS_NUM_THREADS"] = thread_count_str
    env["MALLOC_ARENA_MAX"] = "2"
    env["PYTHONUNBUFFERED"] = "1"
    with log_path.open("w", encoding="utf-8") as log_handle:
        process = subprocess.Popen(
            [
                sys.executable,
                str(PROJECT_ROOT.joinpath("scripts", "recompute_oven_episode_parallel.py")),
                "--dataset-root",
                str(dataset_root),
                "--episode-dir",
                str(episode_dir),
                "--output-dir",
                str(output_dir),
                "--checkpoint-stride",
                str(checkpoint_stride),
                "--num-workers",
                str(num_workers),
                "--base-display",
                str(base_display),
                "--templates-json",
                str(templates_json),
                "--stagger-seconds",
                str(stagger_seconds),
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=env,
        )
        return process.wait()


def main() -> int:
    """Drive the batch: select episodes, run each with retries, aggregate.

    Returns 0 on success; raises ``RuntimeError`` as soon as an episode
    exhausts its retry budget without producing a complete result.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument("--result-dir", required=True)
    parser.add_argument("--templates-json", required=True)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int, default=100)
    parser.add_argument("--episode-indices")
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=24)
    parser.add_argument("--base-display", type=int, default=900)
    parser.add_argument("--stagger-seconds", type=float, default=0.15)
    parser.add_argument("--thread-count", type=int, default=1)
    parser.add_argument("--max-retries", type=int, default=2)
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)
    templates_json = Path(args.templates_json)
    if not templates_json.exists():
        raise FileNotFoundError(f"missing templates json: {templates_json}")

    all_episode_dirs = _episode_dirs(dataset_root)
    explicit_episode_indices = None
    if args.episode_indices:
        explicit_episode_indices = [
            int(chunk.strip())
            for chunk in args.episode_indices.split(",")
            if chunk.strip()
        ]
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episode_dirs),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    selected_episode_names = [f"episode{index}" for index in selected_episode_indices]

    # Record run configuration up front so a partial run is self-describing.
    manifest = {
        "dataset_root": str(dataset_root.resolve()),
        "result_dir": str(result_dir.resolve()),
        "templates_json": str(templates_json.resolve()),
        "episode_indices": selected_episode_indices,
        "checkpoint_stride": args.checkpoint_stride,
        "num_workers": args.num_workers,
        "base_display": args.base_display,
        "stagger_seconds": args.stagger_seconds,
        "thread_count": args.thread_count,
        "max_retries": args.max_retries,
        "started_at_epoch": time.time(),
    }
    _write_json(result_dir.joinpath("run_manifest.json"), manifest)

    progress_path = result_dir.joinpath("progress.json")
    logs_dir = result_dir.joinpath("logs")
    logs_dir.mkdir(parents=True, exist_ok=True)

    completed: List[int] = []
    failed: List[Dict[str, object]] = []
    for episode_index in selected_episode_indices:
        episode_name = f"episode{episode_index}"
        episode_dir = all_episode_dirs[episode_index]
        final_output_dir = result_dir.joinpath(episode_name)

        # Resume support: skip episodes whose full artifact set already exists.
        if _is_complete_episode_dir(final_output_dir, episode_name):
            completed.append(episode_index)
            _write_json(
                progress_path,
                {
                    "current_episode": None,
                    "completed_episode_indices": completed,
                    "failed": failed,
                    "total_selected": len(selected_episode_indices),
                    "updated_at_epoch": time.time(),
                },
            )
            continue

        attempt_success = False
        current_failure: Optional[Dict[str, object]] = None
        # Attempt numbering is 1-based: 1 initial try + max_retries retries.
        for attempt_index in range(1, args.max_retries + 2):
            # Work in a temp dir so only complete results land at the final path.
            temp_output_dir = result_dir.joinpath(f".{episode_name}.tmp")
            if temp_output_dir.exists():
                shutil.rmtree(temp_output_dir)
            log_path = logs_dir.joinpath(f"{episode_name}.attempt{attempt_index:02d}.log")
            _write_json(
                progress_path,
                {
                    "current_episode": episode_name,
                    "current_attempt": attempt_index,
                    "completed_episode_indices": completed,
                    "failed": failed,
                    "total_selected": len(selected_episode_indices),
                    "updated_at_epoch": time.time(),
                },
            )
            return_code = _run_episode(
                dataset_root=dataset_root,
                episode_dir=episode_dir,
                output_dir=temp_output_dir,
                checkpoint_stride=args.checkpoint_stride,
                num_workers=args.num_workers,
                base_display=args.base_display,
                templates_json=templates_json,
                stagger_seconds=args.stagger_seconds,
                thread_count=args.thread_count,
                log_path=log_path,
            )
            # Exit code 0 alone is not trusted; the artifact set must be complete.
            if return_code == 0 and _is_complete_episode_dir(temp_output_dir, episode_name):
                if final_output_dir.exists():
                    shutil.rmtree(final_output_dir)
                temp_output_dir.rename(final_output_dir)
                completed.append(episode_index)
                attempt_success = True
                current_failure = None
                break
            current_failure = {
                "episode_index": episode_index,
                "episode_name": episode_name,
                "attempt": attempt_index,
                "return_code": return_code,
                "log_path": str(log_path),
                "updated_at_epoch": time.time(),
            }
            # Preserve whatever the failed attempt produced for post-mortem.
            if temp_output_dir.exists():
                failed_dir = result_dir.joinpath(
                    "failed_attempts", f"{episode_name}.attempt{attempt_index:02d}"
                )
                failed_dir.parent.mkdir(parents=True, exist_ok=True)
                if failed_dir.exists():
                    shutil.rmtree(failed_dir)
                temp_output_dir.rename(failed_dir)

        if not attempt_success:
            failed.append(
                current_failure
                or {"episode_index": episode_index, "episode_name": episode_name}
            )
            _write_json(
                progress_path,
                {
                    "current_episode": None,
                    "completed_episode_indices": completed,
                    "failed": failed,
                    "total_selected": len(selected_episode_indices),
                    "updated_at_epoch": time.time(),
                },
            )
            raise RuntimeError(f"failed to produce complete result for {episode_name}")

    # Aggregate per-episode metrics into a run-level summary, then write the
    # terminal progress record. (A previous revision duplicated this whole
    # stanza; the intermediate writes were immediately overwritten.)
    metrics = _load_metrics(result_dir, selected_episode_names)
    if metrics:
        _write_json(result_dir.joinpath("summary.json"), _aggregate_summary(metrics))
    _write_json(
        progress_path,
        {
            "current_episode": None,
            "completed_episode_indices": completed,
            "failed": failed,
            "total_selected": len(selected_episode_indices),
            "finished_at_epoch": time.time(),
        },
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())