| import argparse |
| import json |
| import re |
| import subprocess |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
|
|
def _active_worker_dirs(result_dir: Path) -> Dict[str, int]:
    """Map worker directory names to the PID of a live process referencing them.

    Runs ``pgrep -af`` for command lines that mention ``<result_dir>/worker_``
    and parses each ``PID COMMAND`` line of its output.

    Args:
        result_dir: Directory that contains the ``worker_<N>`` sub-directories.

    Returns:
        ``{"worker_<N>": pid}`` for every matching process. When several
        processes mention the same worker, the last one listed wins. The
        mapping is empty when nothing matches, when ``pgrep`` fails, or when
        ``pgrep`` is not installed (a warning is written to stderr in that
        last case).
    """
    import sys  # local import: only needed for the missing-pgrep warning

    worker_prefix = f"{result_dir}/worker_"
    # pgrep treats its pattern as an extended regular expression, so escape
    # the ERE metacharacters to make the path match literally.
    pgrep_pattern = re.sub(r"([\\^.\[$()|*+?{])", r"\\\1", worker_prefix)
    try:
        output = subprocess.check_output(
            ["pgrep", "-af", pgrep_pattern],
            text=True,
        )
    except subprocess.CalledProcessError:
        # Non-zero exit (pgrep uses status 1 for "no process matched").
        return {}
    except FileNotFoundError:
        print(
            "warning: pgrep not found; cannot detect running workers",
            file=sys.stderr,
        )
        return {}

    line_pattern = re.compile(r"\s*(\d+)\s+(.*)")
    # Anchor the worker name to the result directory so an unrelated
    # "worker_<N>" token elsewhere on the command line is not picked up.
    worker_pattern = re.compile(re.escape(f"{result_dir}/") + r"(worker_\d+)")

    active: Dict[str, int] = {}
    for line in output.splitlines():
        line_match = line_pattern.match(line)
        if line_match is None:
            continue
        worker_match = worker_pattern.search(line_match.group(2))
        if worker_match is None:
            continue
        active[worker_match.group(1)] = int(line_match.group(1))
    return active
|
|
|
|
def _selected_episode_indices(worker_dir: Path) -> Optional[List[int]]:
    """Return the episode indices assigned to the worker in ``worker_dir``.

    Resolution order:

    1. No ``templates.json``: the numeric suffix of a ``worker_<N>``
       directory name, as ``[N]``; ``None`` for any other directory name.
    2. ``templates.json`` with a list under ``"selected_episode_indices"``:
       that list, each entry converted with ``int``.
    3. ``templates.json`` with a non-null ``"episode_offset"``: a
       one-element list holding that value.

    Args:
        worker_dir: A single worker's output directory.

    Returns:
        The list of episode indices, or ``None`` when they cannot be
        determined. This includes a ``templates.json`` that is unreadable,
        is not valid JSON, is not a JSON object, or holds non-integer
        values.
    """
    templates_path = worker_dir.joinpath("templates.json")
    if not templates_path.exists():
        worker_match = re.fullmatch(r"worker_(\d+)", worker_dir.name)
        return [int(worker_match.group(1))] if worker_match else None

    try:
        with templates_path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # Unreadable or malformed JSON (for example a truncated file) must
        # not abort the whole report; json.JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses.
        return None
    if not isinstance(payload, dict):
        return None

    try:
        selected = payload.get("selected_episode_indices")
        if isinstance(selected, list):
            return [int(index) for index in selected]
        episode_offset = payload.get("episode_offset")
        if episode_offset is not None:
            return [int(episode_offset)]
    except (TypeError, ValueError):
        # Entries that cannot be converted to int: report "unknown".
        return None
    return None
|
|
|
|
def _worker_status(worker_dir: Path, active_workers: Dict[str, int]) -> Dict[str, object]:
    """Build the status row for one worker directory.

    The status is the first of these that applies:

    * ``"completed"`` - ``summary.json`` exists.
    * ``"running"``   - the directory name is a key of ``active_workers``.
    * ``"empty"``     - there is no ``worker.log``.
    * ``"crashed"``   - ``worker.log`` contains ``Traceback``,
      ``RuntimeError`` or ``signal 11``.
    * ``"stalled"``   - ``worker.log`` exists without any of those markers.

    Args:
        worker_dir: The worker's output directory.
        active_workers: Mapping of worker directory name to PID.

    Returns:
        A dict with the keys ``worker``, ``status``, ``pid``,
        ``selected_episode_indices``, ``summary_path`` and ``worker_log``;
        the two path entries are ``None`` when the file does not exist.
    """
    name = worker_dir.name
    summary_path = worker_dir.joinpath("summary.json")
    worker_log = worker_dir.joinpath("worker.log")
    selected_episode_indices = _selected_episode_indices(worker_dir)

    # Probe the filesystem once so the status and the reported paths agree.
    has_summary = summary_path.exists()
    has_log = worker_log.exists()

    if has_summary:
        status = "completed"
    elif name in active_workers:
        status = "running"
    elif not has_log:
        status = "empty"
    else:
        # Only now is the log content relevant; completed and running
        # workers never need their log read.
        log_text = worker_log.read_text(encoding="utf-8", errors="ignore")
        if re.search(r"Traceback|RuntimeError|signal 11", log_text):
            status = "crashed"
        else:
            status = "stalled"

    return {
        "worker": name,
        "status": status,
        "pid": active_workers.get(name),
        "selected_episode_indices": selected_episode_indices,
        "summary_path": str(summary_path) if has_summary else None,
        "worker_log": str(worker_log) if has_log else None,
    }
|
|
|
|
def main() -> int:
    """Print a JSON status report for every ``worker_*`` directory.

    Reads the required ``--result-dir`` option, classifies each worker
    directory found there, and prints a JSON object with the keys
    ``result_dir``, ``counts`` (number of workers per status),
    ``rerun_episode_indices`` (sorted, de-duplicated episode indices of
    crashed workers) and ``workers`` (one row per worker).

    Returns:
        Always ``0``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--result-dir", required=True)
    options = cli.parse_args()

    result_dir = Path(options.result_dir)
    worker_dirs = sorted(filter(Path.is_dir, result_dir.glob("worker_*")))
    active_workers = _active_worker_dirs(result_dir)
    worker_rows = [
        _worker_status(candidate, active_workers) for candidate in worker_dirs
    ]

    counts: Dict[str, int] = {}
    crashed_episodes = set()
    for row in worker_rows:
        label = str(row["status"])
        counts[label] = counts.get(label, 0) + 1
        if row["status"] == "crashed":
            # Only crashed workers contribute to the rerun list.
            for index in row.get("selected_episode_indices") or []:
                crashed_episodes.add(int(index))

    report = {
        "result_dir": str(result_dir),
        "counts": counts,
        "rerun_episode_indices": sorted(crashed_episodes),
        "workers": worker_rows,
    }
    print(json.dumps(report, indent=2))
    return 0
|
|
|
|
if __name__ == "__main__":
    # Run the CLI and use its return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|
|