"""Render the metric GIF suite for a selection of episodes, one subprocess each.

For every selected episode this script runs ``scripts/render_oven_metric_gifs.py``
in a child process, logs its output under ``<result-dir>/render_logs`` and keeps
``<result-dir>/render_progress.json`` up to date. Episodes whose GIF suite is
already present are skipped, so an interrupted run can be restarted.
"""

from pathlib import Path
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from typing import Dict, List, Optional, Sequence


PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Thread-pool and allocator limits. Applied as defaults to this process and
# forced onto every render subprocess.
_THREAD_ENV_DEFAULTS: Dict[str, str] = {
    "OMP_NUM_THREADS": "1",
    "OPENBLAS_NUM_THREADS": "1",
    "MKL_NUM_THREADS": "1",
    "NUMEXPR_NUM_THREADS": "1",
    "VECLIB_MAXIMUM_THREADS": "1",
    "BLIS_NUM_THREADS": "1",
    "MALLOC_ARENA_MAX": "2",
}


def _configure_thread_env() -> None:
    """Set the thread/allocator limits unless the caller already exported them."""
    for key, value in _THREAD_ENV_DEFAULTS.items():
        os.environ.setdefault(key, value)


def _configure_coppeliasim_env() -> None:
    """Default ``COPPELIASIM_ROOT`` and make sure it is on ``LD_LIBRARY_PATH``.

    Empty ``LD_LIBRARY_PATH`` entries are dropped and the CoppeliaSim root is
    prepended when it is not already listed.
    """
    coppeliasim_root = os.environ.setdefault(
        "COPPELIASIM_ROOT", "/workspace/coppelia_sim"
    )
    ld_library_path_parts = [
        part for part in os.environ.get("LD_LIBRARY_PATH", "").split(":") if part
    ]
    if coppeliasim_root not in ld_library_path_parts:
        ld_library_path_parts.insert(0, coppeliasim_root)
    os.environ["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)


# The environment must be configured before the project package is imported.
_configure_thread_env()
_configure_coppeliasim_env()

from rr_label_study.oven_study import _episode_dirs  # noqa: E402


def _select_episode_indices(
    total_episodes: int,
    episode_offset: int,
    max_episodes: Optional[int],
    episode_indices: Optional[Sequence[int]],
) -> List[int]:
    """Return the episode indices to process, in processing order.

    Args:
        total_episodes: Number of episodes available in the dataset.
        episode_offset: First index of the contiguous window; only used when
            ``episode_indices`` is ``None``.
        max_episodes: Upper bound on the window length, or ``None`` for no
            bound; only used when ``episode_indices`` is ``None``.
        episode_indices: Explicit indices. When given they take precedence;
            duplicates are dropped while first-seen order is kept.

    Returns:
        The selected indices (possibly empty).

    Raises:
        ValueError: If an explicit index lies outside ``0..total_episodes - 1``
            or if ``episode_offset`` is negative.
    """
    if episode_indices is not None:
        selected: List[int] = []
        seen = set()
        for raw_index in episode_indices:
            episode_index = int(raw_index)
            if not (0 <= episode_index < total_episodes):
                raise ValueError(
                    f"episode index {episode_index} outside available range 0..{total_episodes - 1}"
                )
            if episode_index in seen:
                continue
            selected.append(episode_index)
            seen.add(episode_index)
        return selected

    # A negative offset would otherwise produce negative indices that silently
    # address episodes from the end of the list.
    if episode_offset < 0:
        raise ValueError(
            f"episode offset must be non-negative, got {episode_offset}"
        )

    remaining = max(0, total_episodes - episode_offset)
    if max_episodes is not None:
        remaining = min(remaining, max_episodes)
    if remaining <= 0:
        return []
    return list(range(episode_offset, episode_offset + remaining))


def _has_gif_suite(episode_output_dir: Path, episode_name: str) -> bool:
    """Return True when all three GIFs and the README exist for the episode."""
    visualizations_dir = episode_output_dir.joinpath("visualizations")
    required = [
        visualizations_dir.joinpath(f"{episode_name}_all_metrics.gif"),
        visualizations_dir.joinpath(f"{episode_name}_visibility_focus.gif"),
        visualizations_dir.joinpath(f"{episode_name}_path_quality_focus.gif"),
        visualizations_dir.joinpath("README.md"),
    ]
    return all(path.exists() for path in required)


def _write_json(path: Path, payload: Dict[str, object]) -> None:
    """Write ``payload`` to ``path`` as indented JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2)


def _write_progress(
    progress_path: Path,
    current_episode: Optional[str],
    completed: List[int],
    total_selected: int,
    timestamp_key: str = "updated_at_epoch",
) -> None:
    """Write the progress snapshot, stamping ``timestamp_key`` with the current time."""
    _write_json(
        progress_path,
        {
            "current_episode": current_episode,
            "completed_episode_indices": completed,
            "total_selected": total_selected,
            timestamp_key: time.time(),
        },
    )


def _parse_episode_indices(raw: Optional[str]) -> Optional[List[int]]:
    """Parse a comma-separated index list; return None when the option is unset or empty."""
    if not raw:
        return None
    return [int(chunk.strip()) for chunk in raw.split(",") if chunk.strip()]


def _child_env() -> Dict[str, str]:
    """Build the render subprocess environment.

    The thread/allocator limits are forced (not defaulted) and Python output is
    unbuffered so the log file fills as the child runs.
    """
    env = os.environ.copy()
    env.update(_THREAD_ENV_DEFAULTS)
    env["PYTHONUNBUFFERED"] = "1"
    return env


def _render_episode(
    episode_name: str,
    episode_dir: object,
    episode_output_dir: Path,
    log_path: Path,
    args: argparse.Namespace,
) -> None:
    """Render one episode into a temp dir and swap it into ``visualizations``.

    Args:
        episode_name: Name used for the per-episode input/output file names.
        episode_dir: Dataset entry for the episode; stringified for the
            command line.
        episode_output_dir: Directory holding the episode's inputs and outputs.
        log_path: File receiving the child's combined stdout/stderr.
        args: Parsed CLI arguments (stride, worker count, base display).

    Raises:
        RuntimeError: If the renderer exits non-zero or leaves no output dir.
    """
    temp_output_dir = episode_output_dir.joinpath("visualizations_tmp")
    if temp_output_dir.exists():
        # Leftover from an earlier interrupted or failed run.
        shutil.rmtree(temp_output_dir)

    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "render_oven_metric_gifs.py")),
        "--episode-dir",
        str(episode_dir),
        "--dense-csv",
        str(episode_output_dir.joinpath(f"{episode_name}.dense.csv")),
        "--templates-pkl",
        str(episode_output_dir.joinpath("templates.pkl")),
        "--output-dir",
        str(temp_output_dir),
        "--debug-jsonl",
        str(episode_output_dir.joinpath(f"{episode_name}.debug.jsonl")),
        "--checkpoint-stride",
        str(args.checkpoint_stride),
        "--num-workers",
        str(args.num_workers),
        "--base-display",
        str(args.base_display),
    ]
    with log_path.open("w", encoding="utf-8") as log_handle:
        # subprocess.run kills and reaps the child if waiting is interrupted
        # (e.g. Ctrl-C), so no renderer is left running in the background.
        result = subprocess.run(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=_child_env(),
            check=False,
        )
    if result.returncode != 0:
        raise RuntimeError(f"gif render failed for {episode_name}; see {log_path}")

    # Check the replacement exists before discarding the previous output.
    if not temp_output_dir.is_dir():
        raise RuntimeError(
            f"gif render for {episode_name} produced no output dir: {temp_output_dir}"
        )
    final_visualizations_dir = episode_output_dir.joinpath("visualizations")
    if final_visualizations_dir.exists():
        shutil.rmtree(final_visualizations_dir)
    temp_output_dir.rename(final_visualizations_dir)


def main() -> int:
    """Parse CLI arguments and render every selected episode sequentially.

    Returns:
        ``0`` once all selected episodes are rendered or were already complete.

    Raises:
        FileNotFoundError: If a selected episode has no output directory.
        RuntimeError: If rendering an episode fails.
        ValueError: If the episode selection arguments are invalid.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument("--result-dir", required=True)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int, default=100)
    parser.add_argument("--episode-indices")
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=6)
    parser.add_argument("--base-display", type=int, default=1200)
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    result_dir = Path(args.result_dir)
    logs_dir = result_dir.joinpath("render_logs")
    logs_dir.mkdir(parents=True, exist_ok=True)
    progress_path = result_dir.joinpath("render_progress.json")

    all_episode_dirs = _episode_dirs(dataset_root)
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episode_dirs),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=_parse_episode_indices(args.episode_indices),
    )
    total_selected = len(selected_episode_indices)

    completed: List[int] = []
    for episode_index in selected_episode_indices:
        episode_name = f"episode{episode_index}"
        episode_output_dir = result_dir.joinpath(episode_name)
        if not episode_output_dir.exists():
            raise FileNotFoundError(
                f"missing episode output dir: {episode_output_dir}"
            )

        if not _has_gif_suite(episode_output_dir, episode_name):
            _write_progress(progress_path, episode_name, completed, total_selected)
            _render_episode(
                episode_name=episode_name,
                episode_dir=all_episode_dirs[episode_index],
                episode_output_dir=episode_output_dir,
                log_path=logs_dir.joinpath(f"{episode_name}.log"),
                args=args,
            )

        completed.append(episode_index)
        _write_progress(progress_path, None, completed, total_selected)

    _write_progress(
        progress_path,
        None,
        completed,
        total_selected,
        timestamp_key="finished_at_epoch",
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())