# VLAdaptorBench/code/scripts/status_parallel_oven_label_study.py
import argparse
import json
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional
def _active_worker_dirs(result_dir: Path) -> Dict[str, int]:
command = [
"pgrep",
"-af",
f"{result_dir}/worker_",
]
try:
output = subprocess.check_output(command, text=True)
except subprocess.CalledProcessError:
return {}
active: Dict[str, int] = {}
for line in output.splitlines():
pid_match = re.match(r"\s*(\d+)\s+", line)
worker_match = re.search(r"(worker_\d+)", line)
if not pid_match or not worker_match:
continue
active[worker_match.group(1)] = int(pid_match.group(1))
return active
def _selected_episode_indices(worker_dir: Path) -> Optional[List[int]]:
templates_path = worker_dir.joinpath("templates.json")
if not templates_path.exists():
worker_match = re.fullmatch(r"worker_(\d+)", worker_dir.name)
if worker_match:
return [int(worker_match.group(1))]
return None
with templates_path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
selected = payload.get("selected_episode_indices")
if isinstance(selected, list):
return [int(index) for index in selected]
episode_offset = payload.get("episode_offset")
if episode_offset is not None:
return [int(episode_offset)]
return None
def _worker_status(worker_dir: Path, active_workers: Dict[str, int]) -> Dict[str, object]:
    """Build one status row for a single worker directory.

    Status precedence: ``completed`` (summary.json written) > ``running``
    (live pid in *active_workers*) > ``crashed`` (error signature found in
    worker.log) > ``stalled`` (log exists but none of the above) > ``empty``
    (nothing written yet).
    """
    name = worker_dir.name
    summary_path = worker_dir / "summary.json"
    log_path = worker_dir / "worker.log"
    has_summary = summary_path.exists()
    has_log = log_path.exists()
    # errors="ignore": partial/garbled log bytes must not break status polling.
    log_text = log_path.read_text(encoding="utf-8", errors="ignore") if has_log else ""

    if has_summary:
        status = "completed"
    elif name in active_workers:
        status = "running"
    elif re.search(r"Traceback|RuntimeError|signal 11", log_text):
        status = "crashed"
    elif has_log:
        status = "stalled"
    else:
        status = "empty"

    return {
        "worker": name,
        "status": status,
        "pid": active_workers.get(name),
        "selected_episode_indices": _selected_episode_indices(worker_dir),
        "summary_path": str(summary_path) if has_summary else None,
        "worker_log": str(log_path) if has_log else None,
    }
def main() -> int:
    """Summarize worker progress under --result-dir and print a JSON report.

    The report contains per-status counts, the sorted deduplicated episode
    indices owned by crashed workers (candidates for re-running), and one
    detail row per worker directory. Always returns 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--result-dir", required=True)
    args = parser.parse_args()

    result_dir = Path(args.result_dir)
    worker_dirs = sorted(p for p in result_dir.glob("worker_*") if p.is_dir())
    active = _active_worker_dirs(result_dir)
    rows = [_worker_status(p, active) for p in worker_dirs]

    # Episodes assigned to crashed workers must be scheduled again.
    rerun_set: set = set()
    for row in rows:
        if row["status"] == "crashed":
            rerun_set.update(int(i) for i in (row.get("selected_episode_indices") or []))

    counts: Dict[str, int] = {}
    for row in rows:
        key = str(row["status"])
        counts[key] = counts.get(key, 0) + 1

    report = {
        "result_dir": str(result_dir),
        "counts": counts,
        "rerun_episode_indices": sorted(rerun_set),
        "workers": rows,
    }
    print(json.dumps(report, indent=2))
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())