#!/usr/bin/env python3
"""Write a compact HF training Space status report."""
from __future__ import annotations
import argparse
from datetime import datetime, timezone
import json
import os
from pathlib import Path
from typing import Any
from huggingface_hub import HfApi
ROOT = Path(__file__).resolve().parents[1]
def parse_args() -> argparse.Namespace:
    """Return CLI options for the Space monitor; every option has a default."""
    parser = argparse.ArgumentParser(description="Monitor PolyGuard HF training Space.")
    option_defaults = {
        "--space-id": "TheJackBright/polyguard-openenv-training-full",
        "--artifact-repo-id": "TheJackBright/polyguard-openenv-training-full-artifacts",
        "--output": "outputs/reports/submission_evidence/qwen_0_5b_1_5b_3b/training_space_runtime_status.json",
    }
    for flag, default in option_defaults.items():
        parser.add_argument(flag, default=default)
    return parser.parse_args()
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as JSON; return {} for a missing file, invalid JSON, or a non-dict payload."""
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        data = None
    return data if isinstance(data, dict) else {}
def stage_records_from(path: Path) -> list[dict[str, Any]]:
    """Extract the ``stage_records`` list from the manifest at *path*, or [] when absent/malformed."""
    candidate = load_json(path).get("stage_records")
    if isinstance(candidate, list):
        return candidate
    return []
def model_statuses_from(path: Path) -> dict[str, dict[str, str]]:
    """Build a run_id -> {stage: status} mapping from the manifest's ``models`` list.

    Entries that are not dicts, lack a run_id, or carry a non-dict ``statuses``
    payload are skipped; keys and values are coerced to str.
    """
    models = load_json(path).get("models")
    result: dict[str, dict[str, str]] = {}
    if not isinstance(models, list):
        return result
    for entry in models:
        if not isinstance(entry, dict):
            continue
        run_id = str(entry.get("run_id") or "")
        raw_statuses = entry.get("statuses")
        if not run_id or not isinstance(raw_statuses, dict):
            continue
        result[run_id] = {str(k): str(v) for k, v in raw_statuses.items()}
    return result
def main() -> None:
args = parse_args()
token = os.getenv("HF_TOKEN")
api = HfApi(token=token)
runtime_error = ""
artifact_error = ""
runtime: Any = {}
artifact_files: list[str] = []
try:
info = api.space_info(args.space_id)
runtime = getattr(info, "runtime", None)
except Exception as exc: # noqa: BLE001
runtime_error = str(exc)
try:
artifact_files = api.list_repo_files(repo_id=args.artifact_repo_id, repo_type="model", token=token)
except Exception as exc: # noqa: BLE001
artifact_error = str(exc)
prior_records = stage_records_from(ROOT / "outputs" / "reports" / "submission_evidence" / "qwen_0_5b_1_5b" / "manifest.json")
current_records = stage_records_from(ROOT / "outputs" / "reports" / "submission_evidence" / "qwen_0_5b_1_5b_3b" / "manifest.json")
prior_model_statuses = model_statuses_from(
ROOT / "outputs" / "reports" / "submission_evidence" / "qwen_0_5b_1_5b" / "manifest.json"
)
current_model_statuses = model_statuses_from(
ROOT / "outputs" / "reports" / "submission_evidence" / "qwen_0_5b_1_5b_3b" / "manifest.json"
)
stage_records = prior_records + [record for record in current_records if record not in prior_records]
completed_stages = {
f"{record.get('run_id')}:{record.get('stage')}": record
for record in stage_records
if isinstance(record, dict) and record.get("completed") is True
}
run_ids = [
"qwen-qwen2-5-0-5b-instruct",
"qwen-qwen2-5-1-5b-instruct",
"qwen-qwen2-5-3b-instruct",
]
run_statuses = {}
for run_id in run_ids:
merged_statuses = {**prior_model_statuses.get(run_id, {}), **current_model_statuses.get(run_id, {})}
def status_for(stage: str) -> str:
value = merged_statuses.get(stage, "")
if value == "artifact_available":
return "artifact_available"
if "remote_completed" in value:
return value
if f"{run_id}:{stage}" in completed_stages:
return "completed"
return value or "pending_or_unseen"
run_statuses[run_id] = {
"sft_training": status_for("sft_training"),
"grpo_training": status_for("grpo_training"),
"sft_postsave_inference": status_for("sft_postsave_inference"),
"grpo_postsave_inference": status_for("grpo_postsave_inference"),
"policy_ablation": status_for("policy_ablation"),
"artifact_files": [
item for item in artifact_files if f"outputs/reports/sweeps/{run_id}/" in item or f"checkpoints/sweeps/{run_id}/" in item
],
}
report = {
"status": "ok",
"generated_at_utc": datetime.now(timezone.utc).isoformat(),
"space_id": args.space_id,
"artifact_repo_id": args.artifact_repo_id,
"runtime": repr(runtime),
"runtime_error": runtime_error,
"artifact_error": artifact_error,
"artifact_file_count": len(artifact_files),
"has_usable_active_bundle": any(item.startswith("usable_model_bundles/local-qwen-0-5b-active-smoke/") for item in artifact_files),
"has_full_sweep_artifacts": any("outputs/reports/sweeps/" in item or "checkpoints/sweeps/" in item for item in artifact_files),
"run_statuses": run_statuses,
"interpretation": (
"The Space is not actively training if runtime contains stage='PAUSED'. "
"Completed stage records are taken from live evidence snapshots when available; "
"missing per-run artifact files mean the full sweep checkpoints/reports are not yet downloadable."
),
}
output = ROOT / args.output
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(json.dumps(report, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
docs_output = ROOT / "docs" / "results" / "submission_evidence_qwen_0_5b_1_5b_3b" / "reports" / output.name
docs_output.parent.mkdir(parents=True, exist_ok=True)
docs_output.write_text(json.dumps(report, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
print(json.dumps(report, ensure_ascii=True, indent=2))
# Entry point guard: run the monitor only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|