"""Write a compact HF training Space status report."""

from __future__ import annotations

import argparse
from datetime import datetime, timezone
import json
import os
from pathlib import Path
from typing import Any

from huggingface_hub import HfApi


# Repository root: this script is assumed to live one directory below the repo root.
ROOT = Path(__file__).resolve().parents[1]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Monitor PolyGuard HF training Space.")
    parser.add_argument("--space-id", default="TheJackBright/polyguard-openenv-training-full")
    parser.add_argument(
        "--artifact-repo-id",
        default="TheJackBright/polyguard-openenv-training-full-artifacts",
    )
    parser.add_argument(
        "--output",
        default="outputs/reports/submission_evidence/qwen_0_5b_1_5b_3b/training_space_runtime_status.json",
    )
    return parser.parse_args()


def load_json(path: Path) -> dict[str, Any]:
    """Return the JSON object at ``path``, or {} if it is missing, malformed, or not a dict."""
    if not path.exists():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return payload if isinstance(payload, dict) else {}


def stage_records_from(path: Path) -> list[dict[str, Any]]:
    """Extract the ``stage_records`` list from a manifest, if present."""
    payload = load_json(path)
    records = payload.get("stage_records")
    return records if isinstance(records, list) else []


def model_statuses_from(path: Path) -> dict[str, dict[str, str]]:
    """Map each run id to its per-stage status strings from a manifest's ``models`` list."""
    payload = load_json(path)
    models = payload.get("models")
    if not isinstance(models, list):
        return {}
    statuses: dict[str, dict[str, str]] = {}
    for item in models:
        if not isinstance(item, dict):
            continue
        run_id = str(item.get("run_id") or "")
        model_statuses = item.get("statuses")
        if run_id and isinstance(model_statuses, dict):
            statuses[run_id] = {str(key): str(value) for key, value in model_statuses.items()}
    return statuses


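# Manifest shape assumed by the helpers above (an illustrative sketch inferred
# from their accessors, not a documented schema):
#   {
#     "stage_records": [{"run_id": "...", "stage": "...", "completed": true}],
#     "models": [{"run_id": "...", "statuses": {"sft_training": "artifact_available"}}]
#   }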
def main() -> None:
    args = parse_args()
    token = os.getenv("HF_TOKEN")  # optional; public repos are readable without it
    api = HfApi(token=token)
    runtime_error = ""
    artifact_error = ""
    runtime: Any = None
    artifact_files: list[str] = []

    # Broad exception handling is deliberate: API failures are recorded in the
    # report instead of aborting it.
    try:
        info = api.space_info(args.space_id)
        runtime = getattr(info, "runtime", None)
    except Exception as exc:
        runtime_error = str(exc)

    try:
        artifact_files = api.list_repo_files(repo_id=args.artifact_repo_id, repo_type="model")
    except Exception as exc:
        artifact_error = str(exc)

    evidence_root = ROOT / "outputs" / "reports" / "submission_evidence"
    prior_manifest = evidence_root / "qwen_0_5b_1_5b" / "manifest.json"
    current_manifest = evidence_root / "qwen_0_5b_1_5b_3b" / "manifest.json"
    prior_records = stage_records_from(prior_manifest)
    current_records = stage_records_from(current_manifest)
    prior_model_statuses = model_statuses_from(prior_manifest)
    current_model_statuses = model_statuses_from(current_manifest)

    # Prior-run records first, then any new records from the current snapshot.
    stage_records = prior_records + [record for record in current_records if record not in prior_records]
    completed_stages = {
        f"{record.get('run_id')}:{record.get('stage')}": record
        for record in stage_records
        if isinstance(record, dict) and record.get("completed") is True
    }
    run_ids = [
        "qwen-qwen2-5-0-5b-instruct",
        "qwen-qwen2-5-1-5b-instruct",
        "qwen-qwen2-5-3b-instruct",
    ]
    run_statuses: dict[str, dict[str, Any]] = {}
    for run_id in run_ids:
        # Current-manifest statuses override the prior manifest's for the same stage.
        merged_statuses = {**prior_model_statuses.get(run_id, {}), **current_model_statuses.get(run_id, {})}

        def status_for(stage: str) -> str:
            # Precedence: explicit artifact availability, then remote-completion
            # markers, then completion inferred from stage records, then the raw
            # manifest value (or a pending placeholder).
            value = merged_statuses.get(stage, "")
            if value == "artifact_available":
                return "artifact_available"
            if "remote_completed" in value:
                return value
            if f"{run_id}:{stage}" in completed_stages:
                return "completed"
            return value or "pending_or_unseen"

        run_statuses[run_id] = {
            "sft_training": status_for("sft_training"),
            "grpo_training": status_for("grpo_training"),
            "sft_postsave_inference": status_for("sft_postsave_inference"),
            "grpo_postsave_inference": status_for("grpo_postsave_inference"),
            "policy_ablation": status_for("policy_ablation"),
            "artifact_files": [
                item
                for item in artifact_files
                if f"outputs/reports/sweeps/{run_id}/" in item or f"checkpoints/sweeps/{run_id}/" in item
            ],
        }

    report = {
        "status": "ok",
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
        "space_id": args.space_id,
        "artifact_repo_id": args.artifact_repo_id,
        "runtime": repr(runtime),  # contains stage='PAUSED' when the Space is paused
        "runtime_error": runtime_error,
        "artifact_error": artifact_error,
        "artifact_file_count": len(artifact_files),
        "has_usable_active_bundle": any(
            item.startswith("usable_model_bundles/local-qwen-0-5b-active-smoke/") for item in artifact_files
        ),
        "has_full_sweep_artifacts": any(
            "outputs/reports/sweeps/" in item or "checkpoints/sweeps/" in item for item in artifact_files
        ),
        "run_statuses": run_statuses,
        "interpretation": (
            "The Space is not actively training if runtime contains stage='PAUSED'. "
            "Completed stage records are taken from live evidence snapshots when available; "
            "missing per-run artifact files mean the full sweep checkpoints/reports are not yet downloadable."
        ),
    }

    # Write the report to the evidence directory, mirror it under docs/, and echo it to stdout.
    report_text = json.dumps(report, ensure_ascii=True, indent=2) + "\n"
    output = ROOT / args.output
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(report_text, encoding="utf-8")
    docs_output = ROOT / "docs" / "results" / "submission_evidence_qwen_0_5b_1_5b_3b" / "reports" / output.name
    docs_output.parent.mkdir(parents=True, exist_ok=True)
    docs_output.write_text(report_text, encoding="utf-8")
    print(report_text, end="")


if __name__ == "__main__":
    main()