"""Report the state of ``worker_*`` directories under a result directory.

Each worker directory is classified as ``completed``, ``running``,
``crashed``, ``stalled`` or ``empty``; the episode indices of crashed
workers are collected so they can be rerun. The report is printed to
stdout as JSON.
"""
import argparse
import json
import logging
import re
import subprocess
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

logger = logging.getLogger(__name__)

# Log fragments that mark a worker as crashed rather than merely stalled.
_CRASH_MARKERS = re.compile(r"Traceback|RuntimeError|signal 11")
# Directory-name convention for workers: ``worker_<number>``.
_WORKER_NAME = re.compile(r"worker_(\d+)")
_PID_PREFIX = re.compile(r"\s*(\d+)\s+")


def _parse_active_workers(output: str, result_dir: Path) -> Dict[str, int]:
    """Extract ``{worker_name: pid}`` from ``pgrep -af`` output.

    Lines that do not start with a PID, or whose command line does not
    contain ``<result_dir>/worker_<n>``, are skipped. When several processes
    reference the same worker, the PID from the last line wins.
    """
    # Anchor on the result directory so an unrelated ``worker_<n>`` token
    # earlier in the command line (e.g. a script name) is not picked up.
    # pgrep treats its pattern as a regex, so this literal match also drops
    # lines that only matched through a metacharacter in the path.
    anchored = re.compile(re.escape(f"{result_dir}/") + r"(worker_\d+)")
    active: Dict[str, int] = {}
    for line in output.splitlines():
        pid_match = _PID_PREFIX.match(line)
        worker_match = anchored.search(line)
        if pid_match is None or worker_match is None:
            continue
        active[worker_match.group(1)] = int(pid_match.group(1))
    return active


def _active_worker_dirs(result_dir: Path) -> Dict[str, int]:
    """Return ``{worker_name: pid}`` for processes referencing *result_dir*.

    Runs ``pgrep -af`` and looks for command lines that mention a path of the
    form ``<result_dir>/worker_<n>``. A non-zero pgrep exit (for example when
    no process matches) or a missing ``pgrep`` binary yields an empty mapping
    instead of raising.
    """
    command = ["pgrep", "-af", f"{result_dir}/worker_"]
    try:
        output = subprocess.check_output(command, text=True)
    except subprocess.CalledProcessError:
        return {}
    except FileNotFoundError:
        # Without pgrep nothing can be reported as running; say so loudly,
        # because live workers will then show up as stalled or crashed.
        logger.warning("pgrep not found; no worker can be reported as running")
        return {}
    return _parse_active_workers(output, result_dir)


def _selected_episode_indices(worker_dir: Path) -> Optional[List[int]]:
    """Return the episode indices assigned to *worker_dir*, if known.

    Resolution order:

    1. ``templates.json`` is absent: the numeric suffix of a directory named
       ``worker_<n>`` is used as the single index, otherwise ``None``.
    2. ``templates.json`` has a list under ``selected_episode_indices``:
       that list, converted to ints.
    3. ``templates.json`` has a non-null ``episode_offset``: that value as
       a single-element list.
    4. Otherwise ``None``.

    An unreadable, malformed or unexpectedly shaped ``templates.json`` is
    logged as a warning and reported as ``None`` so one bad worker directory
    does not abort the whole report.
    """
    templates_path = worker_dir.joinpath("templates.json")
    try:
        with templates_path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (FileNotFoundError, NotADirectoryError):
        worker_match = _WORKER_NAME.fullmatch(worker_dir.name)
        return [int(worker_match.group(1))] if worker_match else None
    except (OSError, ValueError) as error:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Cannot read %s: %s", templates_path, error)
        return None

    if not isinstance(payload, dict):
        logger.warning("Ignoring %s: top-level JSON value is not an object", templates_path)
        return None

    try:
        selected = payload.get("selected_episode_indices")
        if isinstance(selected, list):
            return [int(index) for index in selected]
        episode_offset = payload.get("episode_offset")
        if episode_offset is not None:
            return [int(episode_offset)]
    except (TypeError, ValueError) as error:
        logger.warning("Ignoring non-integer episode index in %s: %s", templates_path, error)
    return None


def _worker_status(worker_dir: Path, active_workers: Dict[str, int]) -> Dict[str, object]:
    """Build the report row for a single worker directory.

    Status precedence (first match wins):

    * ``completed`` - ``summary.json`` exists.
    * ``running``   - the directory name is a key of *active_workers*.
    * ``crashed``   - ``worker.log`` contains ``Traceback``, ``RuntimeError``
      or ``signal 11``.
    * ``stalled``   - ``worker.log`` exists without any of those markers.
    * ``empty``     - none of the above.

    Returns a dict with the keys ``worker``, ``status``, ``pid``,
    ``selected_episode_indices``, ``summary_path`` and ``worker_log``; the
    two path entries are ``None`` when the corresponding file is missing.
    """
    name = worker_dir.name
    summary_path = worker_dir.joinpath("summary.json")
    worker_log = worker_dir.joinpath("worker.log")
    has_summary = summary_path.exists()
    has_log = worker_log.exists()

    if has_summary:
        status = "completed"
    elif name in active_workers:
        status = "running"
    elif has_log:
        # Only read the log when it actually decides the status.
        log_text = worker_log.read_text(encoding="utf-8", errors="ignore")
        status = "crashed" if _CRASH_MARKERS.search(log_text) else "stalled"
    else:
        status = "empty"

    return {
        "worker": name,
        "status": status,
        "pid": active_workers.get(name),
        "selected_episode_indices": _selected_episode_indices(worker_dir),
        "summary_path": str(summary_path) if has_summary else None,
        "worker_log": str(worker_log) if has_log else None,
    }


def _worker_sort_key(path: Path) -> Tuple[int, int, str]:
    """Order ``worker_<n>`` directories numerically, other names after them."""
    match = _WORKER_NAME.fullmatch(path.name)
    if match:
        return (0, int(match.group(1)), path.name)
    return (1, 0, path.name)


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Print a JSON status report for every ``worker_*`` directory.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv``.

    Returns:
        ``0`` once the report has been printed.

    The report contains ``result_dir``, per-status ``counts``, the sorted
    unique ``rerun_episode_indices`` of crashed workers, and the per-worker
    rows under ``workers``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--result-dir", required=True)
    args = parser.parse_args(argv)

    result_dir = Path(args.result_dir)
    # Numeric ordering keeps worker_10 after worker_9 instead of after worker_1.
    worker_dirs = sorted(
        (path for path in result_dir.glob("worker_*") if path.is_dir()),
        key=_worker_sort_key,
    )
    active_workers = _active_worker_dirs(result_dir)
    worker_rows = [_worker_status(worker_dir, active_workers) for worker_dir in worker_dirs]

    # Only crashed workers are queued for a rerun; rows whose indices are
    # unknown (None) contribute nothing.
    rerun_episode_indices = {
        int(index)
        for row in worker_rows
        if row["status"] == "crashed"
        for index in (row.get("selected_episode_indices") or [])
    }
    counts = dict(Counter(str(row["status"]) for row in worker_rows))

    payload = {
        "result_dir": str(result_dir),
        "counts": counts,
        "rerun_episode_indices": sorted(rerun_episode_indices),
        "workers": worker_rows,
    }
    print(json.dumps(payload, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())