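"""Launch run_oven_label_study.py across several workers, each on its own Xvfb display.

Selected episodes are split into contiguous chunks, one chunk per worker; once
every worker has exited, the per-episode metrics are collected from the worker
directories and aggregated into a single summary.
"""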
import argparse
import json
import math
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
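# Make the project root importable when this script is run directly.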
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
def _configure_coppeliasim_env() -> None:
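    """Ensure COPPELIASIM_ROOT is set and prepended to LD_LIBRARY_PATH.

    The updated environment is inherited by every worker subprocess.
    """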
coppeliasim_root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
ld_library_path_parts = [
part for part in os.environ.get("LD_LIBRARY_PATH", "").split(":") if part
]
if coppeliasim_root not in ld_library_path_parts:
ld_library_path_parts.insert(0, coppeliasim_root)
os.environ["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)
_configure_coppeliasim_env()
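# Imported after the environment is configured, in case the study module needs
# COPPELIASIM_ROOT at import time.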
from rr_label_study.oven_study import _aggregate_summary, _episode_dirs
def _select_episode_indices(
total_episodes: int,
episode_offset: int,
max_episodes: Optional[int],
episode_indices: Optional[Sequence[int]],
) -> List[int]:
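    """Resolve the episode indices to run.

    Explicit indices take precedence over offset/max selection; they are
    validated against the available range and de-duplicated in order.
    """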
if episode_indices is not None:
selected: List[int] = []
seen = set()
for raw_index in episode_indices:
episode_index = int(raw_index)
if not (0 <= episode_index < total_episodes):
raise ValueError(
f"episode index {episode_index} outside available range 0..{total_episodes - 1}"
)
if episode_index in seen:
continue
selected.append(episode_index)
seen.add(episode_index)
return selected
remaining = max(0, total_episodes - episode_offset)
if max_episodes is not None:
remaining = min(remaining, max_episodes)
if remaining <= 0:
return []
return list(range(episode_offset, episode_offset + remaining))
def _chunk_episode_indices(
episode_indices: Sequence[int],
num_workers: int,
) -> List[List[int]]:
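    """Split the selected indices into contiguous chunks, one per worker."""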
if not episode_indices:
return []
worker_count = min(num_workers, len(episode_indices))
chunk_size = math.ceil(len(episode_indices) / worker_count)
specs: List[List[int]] = []
for worker_index in range(worker_count):
start = worker_index * chunk_size
chunk = list(episode_indices[start : start + chunk_size])
if chunk:
specs.append(chunk)
return specs
def _launch_xvfb(display_num: int, log_path: Path) -> subprocess.Popen:
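    """Start an Xvfb server on the given display, logging to log_path.

    The log handle is left open for the lifetime of the Xvfb process.
    """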
log_handle = log_path.open("w", encoding="utf-8")
return subprocess.Popen(
[
"Xvfb",
f":{display_num}",
"-screen",
"0",
"1280x1024x24",
"+extension",
"GLX",
"+render",
"-noreset",
],
stdout=log_handle,
stderr=subprocess.STDOUT,
start_new_session=True,
)
def _launch_worker(
worker_dir: Path,
display_num: int,
dataset_root: str,
episode_indices: Sequence[int],
checkpoint_stride: int,
template_episode_index: int,
max_frames: Optional[int],
independent_replay: bool,
per_episode_templates: bool,
thread_count: int,
) -> Tuple[subprocess.Popen, subprocess.Popen]:
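    """Start one Xvfb display plus one run_oven_label_study.py subprocess.

    Returns the (xvfb, worker) process pair so the caller can wait on and
    terminate both.
    """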
worker_dir.mkdir(parents=True, exist_ok=True)
xvfb = _launch_xvfb(display_num, worker_dir.joinpath("xvfb.log"))
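    # Give Xvfb a moment to create the display before the worker connects.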
time.sleep(1.0)
runtime_dir = Path(f"/tmp/rr_label_study_display_{display_num}")
runtime_dir.mkdir(parents=True, exist_ok=True)
command = [
sys.executable,
str(PROJECT_ROOT.joinpath("scripts", "run_oven_label_study.py")),
"--dataset-root",
dataset_root,
"--result-dir",
str(worker_dir),
"--checkpoint-stride",
str(checkpoint_stride),
"--template-episode-index",
str(template_episode_index),
"--episode-indices",
",".join(str(index) for index in episode_indices),
]
if max_frames is not None:
command.extend(["--max-frames", str(max_frames)])
if not independent_replay:
command.append("--sequential-replay")
if per_episode_templates:
command.append("--per-episode-templates")
env = os.environ.copy()
env["DISPLAY"] = f":{display_num}"
env["XDG_RUNTIME_DIR"] = str(runtime_dir)
env["PYTHONUNBUFFERED"] = "1"
coppeliasim_root = env.get("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
env["COPPELIASIM_ROOT"] = coppeliasim_root
ld_library_path_parts = [
part for part in env.get("LD_LIBRARY_PATH", "").split(":") if part
]
if coppeliasim_root not in ld_library_path_parts:
ld_library_path_parts.insert(0, coppeliasim_root)
env["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts)
thread_count_str = str(thread_count)
env["OMP_NUM_THREADS"] = thread_count_str
env["OPENBLAS_NUM_THREADS"] = thread_count_str
env["MKL_NUM_THREADS"] = thread_count_str
env["NUMEXPR_NUM_THREADS"] = thread_count_str
env["VECLIB_MAXIMUM_THREADS"] = thread_count_str
env["BLIS_NUM_THREADS"] = thread_count_str
worker_log = worker_dir.joinpath("worker.log").open("w", encoding="utf-8")
process = subprocess.Popen(
command,
stdout=worker_log,
stderr=subprocess.STDOUT,
env=env,
cwd=str(PROJECT_ROOT),
start_new_session=True,
)
return xvfb, process
def _stop_process(process: subprocess.Popen) -> None:
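    """Terminate a process group politely, escalating to SIGKILL after 10s.

    Works because every child is started with start_new_session=True, so its
    pid doubles as its process-group id.
    """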
if process.poll() is not None:
return
try:
os.killpg(process.pid, signal.SIGTERM)
except ProcessLookupError:
return
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired:
try:
os.killpg(process.pid, signal.SIGKILL)
except ProcessLookupError:
pass
def _collect_metrics(base_result_dir: Path) -> List[Dict[str, object]]:
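    """Load every per-episode metrics JSON written under the worker_* directories."""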
metrics: List[Dict[str, object]] = []
for metrics_path in sorted(base_result_dir.glob("worker_*/episode*.metrics.json")):
with metrics_path.open("r", encoding="utf-8") as handle:
metrics.append(json.load(handle))
return metrics
def main(argv: Optional[List[str]] = None) -> int:
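    """Parse arguments, fan episodes out across workers, and aggregate the results."""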
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset-root",
default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
)
parser.add_argument(
"--result-dir",
default="/workspace/reveal_retrieve_label_study/results/oven_parallel",
)
parser.add_argument("--num-workers", type=int, default=4)
parser.add_argument("--episode-offset", type=int, default=0)
parser.add_argument("--max-episodes", type=int)
parser.add_argument("--checkpoint-stride", type=int, default=16)
parser.add_argument("--template-episode-index", type=int, default=0)
parser.add_argument("--base-display", type=int, default=110)
parser.add_argument("--max-frames", type=int)
parser.add_argument("--episode-indices")
parser.add_argument("--thread-count", type=int, default=1)
parser.add_argument("--stagger-seconds", type=float, default=0.5)
parser.add_argument(
"--independent-replay",
dest="independent_replay",
action="store_true",
help="Replay each frame independently to avoid simulator drift.",
)
parser.add_argument(
"--sequential-replay",
dest="independent_replay",
action="store_false",
help="Reuse replay state across frames for speed.",
)
parser.add_argument("--per-episode-templates", action="store_true")
parser.set_defaults(independent_replay=True)
args = parser.parse_args(argv)
dataset_root = Path(args.dataset_root)
all_episodes = _episode_dirs(dataset_root)
explicit_episode_indices = None
if args.episode_indices:
explicit_episode_indices = [
int(chunk.strip()) for chunk in args.episode_indices.split(",") if chunk.strip()
]
selected_episode_indices = _select_episode_indices(
total_episodes=len(all_episodes),
episode_offset=args.episode_offset,
max_episodes=args.max_episodes,
episode_indices=explicit_episode_indices,
)
chunk_specs = _chunk_episode_indices(
episode_indices=selected_episode_indices,
num_workers=args.num_workers,
)
if not chunk_specs:
raise RuntimeError("no episodes selected for parallel run")
result_dir = Path(args.result_dir)
result_dir.mkdir(parents=True, exist_ok=True)
workers: List[Tuple[subprocess.Popen, subprocess.Popen]] = []
worker_meta: List[Dict[str, object]] = []
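    # Launch one Xvfb/worker pair per chunk; the finally block cleans all of them up.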
try:
for worker_index, worker_episode_indices in enumerate(chunk_specs):
display_num = args.base_display + worker_index
worker_dir = result_dir.joinpath(f"worker_{worker_index:02d}")
xvfb, process = _launch_worker(
worker_dir=worker_dir,
display_num=display_num,
dataset_root=args.dataset_root,
episode_indices=worker_episode_indices,
checkpoint_stride=args.checkpoint_stride,
template_episode_index=args.template_episode_index,
max_frames=args.max_frames,
independent_replay=args.independent_replay,
per_episode_templates=args.per_episode_templates,
thread_count=args.thread_count,
)
workers.append((xvfb, process))
worker_meta.append(
{
"worker_index": worker_index,
"display_num": display_num,
"episode_indices": list(worker_episode_indices),
}
)
if args.stagger_seconds > 0:
time.sleep(args.stagger_seconds)
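        # Wait for the workers in launch order; raise on the first nonzero exit code.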
for meta, (_, process) in zip(worker_meta, workers):
return_code = process.wait()
meta["return_code"] = return_code
if return_code != 0:
worker_index = int(meta["worker_index"])
worker_log = result_dir.joinpath(f"worker_{worker_index:02d}", "worker.log")
raise RuntimeError(
f"worker {worker_index} failed with code {return_code}; see {worker_log}"
)
finally:
for xvfb, process in workers:
_stop_process(process)
_stop_process(xvfb)
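    # Aggregate per-episode metrics from all workers into a single summary.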
episode_metrics = _collect_metrics(result_dir)
summary = _aggregate_summary(episode_metrics)
with result_dir.joinpath("parallel_workers.json").open("w", encoding="utf-8") as handle:
json.dump(worker_meta, handle, indent=2)
with result_dir.joinpath("parallel_summary.json").open("w", encoding="utf-8") as handle:
json.dump(summary, handle, indent=2)
print(json.dumps(summary, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())