File size: 3,595 Bytes
561f6a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import argparse
import json
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Optional


def _active_worker_dirs(result_dir: Path) -> Dict[str, int]:
    """Map worker directory names to the PID of a live process that uses them.

    Runs ``pgrep -af`` with ``<result_dir>/worker_`` as the pattern and parses
    each ``PID COMMAND`` line of its output.

    Args:
        result_dir: Directory that holds the ``worker_<N>`` sub-directories.

    Returns:
        ``{"worker_<N>": pid}`` for every matching process line.  When several
        lines name the same worker, the last one listed wins.  The mapping is
        empty when ``pgrep`` exits with a non-zero status (which includes "no
        process matched") or when the ``pgrep`` executable cannot be found.
    """
    command = [
        "pgrep",
        "-af",
        f"{result_dir}/worker_",
    ]
    try:
        output = subprocess.check_output(command, text=True)
    except subprocess.CalledProcessError:
        return {}
    except FileNotFoundError:
        # Without pgrep there is no way to tell which workers are alive; keep
        # the report usable but say so on stderr so stdout stays valid JSON.
        import sys

        print(
            "warning: pgrep not found; cannot detect running workers",
            file=sys.stderr,
        )
        return {}

    pid_pattern = re.compile(r"\s*(\d+)\s+")
    # Prefer the worker name that directly follows result_dir, so that a
    # "worker_<N>" fragment elsewhere on the command line (for example inside
    # a parent directory name) is not mistaken for the worker directory.
    anchored_worker = re.compile(re.escape(f"{result_dir}/") + r"(worker_\d+)")
    # Fallback: first "worker_<N>" anywhere on the line.
    loose_worker = re.compile(r"(worker_\d+)")

    active: Dict[str, int] = {}
    for line in output.splitlines():
        pid_match = pid_pattern.match(line)
        worker_match = anchored_worker.search(line) or loose_worker.search(line)
        if not pid_match or not worker_match:
            continue
        active[worker_match.group(1)] = int(pid_match.group(1))
    return active


def _selected_episode_indices(worker_dir: Path) -> Optional[List[int]]:
    """Return the episode indices assigned to the worker in *worker_dir*.

    Resolution order:

    1. If ``templates.json`` does not exist, use the number in a directory
       name of the exact form ``worker_<N>``.
    2. Otherwise use the ``selected_episode_indices`` list from that file.
    3. Otherwise use its ``episode_offset`` value as a single index.

    Args:
        worker_dir: Directory of a single worker.

    Returns:
        The indices as a list of ints, or ``None`` when they cannot be
        determined.  An unreadable, truncated or malformed ``templates.json``
        yields ``None`` instead of raising, so one broken worker directory
        does not abort the caller.
    """
    templates_path = worker_dir.joinpath("templates.json")
    if not templates_path.exists():
        worker_match = re.fullmatch(r"worker_(\d+)", worker_dir.name)
        if worker_match:
            return [int(worker_match.group(1))]
        return None

    try:
        with templates_path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, e.g.
        # a file that was only partially written.
        return None
    if not isinstance(payload, dict):
        return None

    try:
        selected = payload.get("selected_episode_indices")
        if isinstance(selected, list):
            return [int(index) for index in selected]
        episode_offset = payload.get("episode_offset")
        if episode_offset is not None:
            return [int(episode_offset)]
    except (TypeError, ValueError):
        # Entries that cannot be converted to int.
        return None
    return None


def _worker_status(worker_dir: Path, active_workers: Dict[str, int]) -> Dict[str, object]:
    """Classify one worker directory and build its report row.

    The status is the first rule that applies:

    * ``"completed"`` - ``summary.json`` exists.
    * ``"running"``   - the directory name is a key of *active_workers*.
    * ``"crashed"``   - ``worker.log`` contains ``Traceback``,
      ``RuntimeError`` or ``signal 11``.
    * ``"stalled"``   - ``worker.log`` exists without any of those markers.
    * ``"empty"``     - none of the above.

    Args:
        worker_dir: Directory of a single worker.
        active_workers: Mapping of worker directory name to PID.

    Returns:
        A dict with the keys ``worker``, ``status``, ``pid``,
        ``selected_episode_indices``, ``summary_path`` and ``worker_log``.
        The two path entries are ``None`` when the file does not exist.
    """
    name = worker_dir.name
    summary_path = worker_dir.joinpath("summary.json")
    worker_log = worker_dir.joinpath("worker.log")
    selected_episode_indices = _selected_episode_indices(worker_dir)

    # Check each file once so the status and the reported paths are derived
    # from the same observation.
    has_summary = summary_path.exists()
    has_log = worker_log.exists()

    if has_summary:
        status = "completed"
    elif name in active_workers:
        status = "running"
    elif has_log:
        # Only open the log when the status actually depends on it, and scan
        # it line by line instead of loading it whole. None of the markers
        # contains a newline, so a per-line search finds the same matches.
        crash_marker = re.compile(r"Traceback|RuntimeError|signal 11")
        with worker_log.open("r", encoding="utf-8", errors="ignore") as handle:
            crashed = any(crash_marker.search(line) for line in handle)
        status = "crashed" if crashed else "stalled"
    else:
        status = "empty"

    return {
        "worker": name,
        "status": status,
        "pid": active_workers.get(name),
        "selected_episode_indices": selected_episode_indices,
        "summary_path": str(summary_path) if has_summary else None,
        "worker_log": str(worker_log) if has_log else None,
    }


def main() -> int:
    """Print a JSON status report for the workers under ``--result-dir``.

    Every sub-directory matching ``worker_*`` is classified with
    ``_worker_status``.  The report written to stdout contains the result
    directory, a per-status count, the sorted unique episode indices of all
    crashed workers, and the individual worker rows.

    Returns:
        Always ``0``.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--result-dir", required=True)
    result_dir = Path(arg_parser.parse_args().result_dir)

    worker_dirs = sorted(
        entry for entry in result_dir.glob("worker_*") if entry.is_dir()
    )
    active_workers = _active_worker_dirs(result_dir)
    worker_rows = [_worker_status(entry, active_workers) for entry in worker_dirs]

    # One pass over the rows gathers both the status histogram and the
    # episodes that belong to crashed workers.
    counts: Dict[str, int] = {}
    crashed_episodes = set()
    for row in worker_rows:
        label = str(row["status"])
        counts[label] = counts.get(label, 0) + 1
        if row["status"] == "crashed":
            crashed_episodes.update(
                int(index) for index in (row.get("selected_episode_indices") or [])
            )

    report = {
        "result_dir": str(result_dir),
        "counts": counts,
        "rerun_episode_indices": sorted(crashed_episodes),
        "workers": worker_rows,
    }
    print(json.dumps(report, indent=2))
    return 0


# Script entry point: run the report and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())