"""FastAPI control panel for the CERNenv trainer Space. Endpoints: GET / → status page (HTML) GET /status → JSON status of the current training run GET /metrics → JSON snapshot of reward / success rate GET /logs → tail of the training log POST /train → start (or restart) a training run GET /health → liveness probe Designed to run on a Hugging Face Space with `sdk: docker`. Heavy training work runs in a background thread so the HTTP server stays responsive. """ from __future__ import annotations import json import logging import os import subprocess import sys import threading import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Optional from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, PlainTextResponse from fastapi.staticfiles import StaticFiles logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) def _resolve_repo_root() -> Path: env_root = os.environ.get("CERNENV_ROOT") candidates = [] if env_root: candidates.append(Path(env_root)) candidates.extend([ Path("/home/user/app"), Path(__file__).resolve().parent.parent.parent, ]) for p in candidates: try: if p.exists(): return p.resolve() except OSError: continue return candidates[-1].resolve() REPO_ROOT = _resolve_repo_root() LOG_DIR = REPO_ROOT / "training" / "runs" try: LOG_DIR.mkdir(parents=True, exist_ok=True) except OSError as exc: # pragma: no cover - read-only filesystem fallback logger.warning("could not create %s (%s); using /tmp", LOG_DIR, exc) LOG_DIR = Path("/tmp/cernenv-runs") LOG_DIR.mkdir(parents=True, exist_ok=True) LOG_FILE = LOG_DIR / "training.log" EVIDENCE_DIR = REPO_ROOT / "evidence" try: EVIDENCE_DIR.mkdir(parents=True, exist_ok=True) except OSError: # pragma: no cover EVIDENCE_DIR = Path("/tmp/cernenv-evidence") EVIDENCE_DIR.mkdir(parents=True, exist_ok=True) METRICS_FILE = EVIDENCE_DIR / "before_after_metrics.json" def _env(name: str, default: str) -> str: return os.environ.get(name, default) def _detect_gpus() -> int: try: import torch # type: ignore if torch.cuda.is_available(): return torch.cuda.device_count() except Exception: pass try: out = subprocess.run( ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"], capture_output=True, text=True, timeout=5, ) return len([l for l in out.stdout.splitlines() if l.strip()]) except Exception: return 0 _NUM_GPUS = _detect_gpus() CONFIG = { "model_name": _env("MODEL_NAME", "unsloth/Qwen2.5-3B-Instruct"), "difficulty": _env("DIFFICULTY", "easy"), "total_episodes": int(_env("TOTAL_EPISODES", "1500")), "max_steps": int(_env("MAX_STEPS", "18")), "num_generations": int(_env("NUM_GENERATIONS", "8")), "checkpoint_eval_steps": int(_env("CHECKPOINT_EVAL_STEPS", "25")), "checkpoint_eval_episodes": int(_env("CHECKPOINT_EVAL_EPISODES", "8")), "eval_episodes": int(_env("EVAL_EPISODES", "32")), "output_dir": _env("OUTPUT_DIR", "runs/unsloth-grpo"), "evidence_dir": _env("EVIDENCE_DIR", "evidence"), "num_gpus": int(_env("NUM_GPUS", str(_NUM_GPUS or 1))), "hf_username": _env("HF_USERNAME", "anugrah55"), "push_repo": _env( "PUSH_REPO", f"{_env('HF_USERNAME', 'anugrah55')}/cernenv-grpo-qwen2.5-3b", ), "autostart": _env("AUTOSTART", "0") == "1", } # ── Run state ──────────────────────────────────────────────────────────── class RunState: def __init__(self) -> None: self.lock = threading.Lock() self.thread: Optional[threading.Thread] = None self.process: Optional[subprocess.Popen] = None self.status: str = "idle" # idle | running | finished | failed self.started_at: Optional[str] = None self.finished_at: Optional[str] = None self.last_error: Optional[str] = None self.last_config: Dict[str, Any] = {} def to_dict(self) -> Dict[str, Any]: with self.lock: return { "status": self.status, "started_at": self.started_at, "finished_at": self.finished_at, "last_error": self.last_error, "last_config": self.last_config, } STATE = RunState() # ── Training pipeline ──────────────────────────────────────────────────── def _stream_subprocess(cmd: list[str], log_handle) -> int: log_handle.write(f"\n$ {' '.join(cmd)}\n") log_handle.flush() proc = subprocess.Popen( cmd, cwd=str(REPO_ROOT), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True, env={**os.environ, "PYTHONPATH": str(REPO_ROOT)}, ) STATE.process = proc assert proc.stdout is not None for line in proc.stdout: log_handle.write(line) log_handle.flush() rc = proc.wait() log_handle.write(f"[exit code {rc}]\n") log_handle.flush() STATE.process = None return rc def _build_training_cmd(config: Dict[str, Any]) -> list[str]: """Compose the training launcher (single-GPU python or multi-GPU accelerate).""" base = [ "-m", "training.training_unsloth", "--model_name", config["model_name"], "--difficulty", config["difficulty"], "--total_episodes", str(config["total_episodes"]), "--max_steps", str(config["max_steps"]), "--num_generations", str(config["num_generations"]), "--checkpoint_eval_steps", str(config["checkpoint_eval_steps"]), "--checkpoint_eval_episodes", str(config["checkpoint_eval_episodes"]), "--output_dir", config["output_dir"], "--evidence_dir", config["evidence_dir"], ] n = max(int(config.get("num_gpus", 1)), 1) if n > 1: return ["accelerate", "launch", "--num_processes", str(n), "--mixed_precision", "bf16"] + base return [sys.executable] + base def _push_evidence_to_hub(*, evidence_dir: Path, repo_id: str, log) -> None: """Upload the entire evidence/ directory to the model repo.""" token = os.environ.get("HF_TOKEN") if not token: log.write("\n[skip] HF_TOKEN not set — evidence not pushed\n") log.flush() return try: from huggingface_hub import HfApi api = HfApi(token=token) api.upload_folder( folder_path=str(evidence_dir), repo_id=repo_id, repo_type="model", path_in_repo="evidence", commit_message="Upload CERNenv training evidence (curves, evals, plots)", ) log.write(f"\n[ok] uploaded evidence/ → https://huggingface.co/{repo_id}/tree/main/evidence\n") log.flush() except Exception as exc: log.write(f"\n[warn] evidence push failed: {exc}\n") log.flush() def _training_pipeline(config: Dict[str, Any]) -> None: started = datetime.now(timezone.utc).isoformat() with STATE.lock: STATE.status = "running" STATE.started_at = started STATE.finished_at = None STATE.last_error = None STATE.last_config = dict(config) evidence_dir = Path(config["evidence_dir"]).resolve() evidence_dir.mkdir(parents=True, exist_ok=True) LOG_FILE.parent.mkdir(parents=True, exist_ok=True) with open(LOG_FILE, "a") as log: log.write(f"\n=== Training started {started} ===\n") log.write(json.dumps(config, indent=2) + "\n") log.flush() try: output_dir = config["output_dir"] difficulty = config["difficulty"] max_steps = str(config["max_steps"]) eval_episodes = str(config["eval_episodes"]) model_name = config["model_name"] push_repo = config["push_repo"] evidence_str = config["evidence_dir"] pre_jsonl = f"{evidence_str}/pre_eval.jsonl" post_jsonl = f"{evidence_str}/post_eval.jsonl" log.write("\n--- baseline sanity check (random / heuristic / oracle) ---\n") log.flush() for agent in ("random", "heuristic", "oracle"): _stream_subprocess( [ sys.executable, "-m", "scripts.run_agent", "--agent", agent, "--difficulty", difficulty, "--episodes", "3", "--quiet", ], log, ) log.write(f"\n--- pre-train evaluation ({eval_episodes} eps) ---\n") log.flush() rc = _stream_subprocess( [ sys.executable, "-m", "training.evaluate", "--model_name", model_name, "--difficulty", difficulty, "--episodes", eval_episodes, "--max_steps", max_steps, "--tag", "pre_train", "--out", pre_jsonl, ], log, ) if rc != 0: # don't abort — we still want training + post-eval evidence. log.write(f"\n[warn] pre-train eval failed (rc={rc}); continuing without baseline\n") log.flush() log.write(f"\n--- GRPO training ({config['num_gpus']} GPU process(es)) ---\n") log.flush() rc = _stream_subprocess(_build_training_cmd(config), log) if rc != 0: raise RuntimeError(f"training failed (rc={rc})") log.write(f"\n--- post-train evaluation ({eval_episodes} eps) ---\n") log.flush() rc = _stream_subprocess( [ sys.executable, "-m", "training.evaluate", "--model_name", model_name, "--adapter_dir", output_dir, "--difficulty", difficulty, "--episodes", eval_episodes, "--max_steps", max_steps, "--tag", "post_train", "--out", post_jsonl, ], log, ) if rc != 0: log.write(f"\n[warn] post-train eval failed (rc={rc}); evidence will be partial\n") log.flush() log.write("\n--- evidence: before/after summary, distribution, trajectories ---\n") log.flush() try: from training.evidence import ( EvidencePaths, render_before_after, render_sample_trajectories, render_training_curve, render_checkpoint_progression, ) paths = EvidencePaths(root=Path(evidence_str)) paths.ensure() metrics = render_before_after( pre_jsonl=Path(pre_jsonl), post_jsonl=Path(post_jsonl), summary_png=paths.before_after_summary_png, distribution_png=paths.reward_distribution_png, metrics_json=paths.before_after_metrics_json, ) render_sample_trajectories( pre_jsonl=Path(pre_jsonl), post_jsonl=Path(post_jsonl), md_path=paths.sample_trajectories_md, ) render_training_curve(paths.training_log_csv, paths.training_curve_png) render_checkpoint_progression( paths.checkpoint_evals_csv, paths.checkpoint_progression_png, ) log.write(json.dumps(metrics, indent=2) + "\n") log.flush() except Exception as exc: log.write(f"[warn] evidence rendering failed: {exc}\n") log.flush() if os.environ.get("HF_TOKEN"): log.write("\n--- push adapters to Hub ---\n") log.flush() _stream_subprocess( [ sys.executable, "-m", "scripts.push_to_hub", "model", "--adapter_dir", output_dir, "--repo_id", push_repo, "--base_model", model_name, ], log, ) _push_evidence_to_hub( evidence_dir=evidence_dir, repo_id=push_repo, log=log, ) else: log.write("\n[skip] HF_TOKEN not set — not pushing to Hub\n") log.flush() with STATE.lock: STATE.status = "finished" except Exception as exc: logger.exception("training pipeline failed") with STATE.lock: STATE.status = "failed" STATE.last_error = str(exc) finally: finished = datetime.now(timezone.utc).isoformat() log.write(f"\n=== Training ended {finished} ===\n") log.flush() with STATE.lock: STATE.finished_at = finished def _start_training(config: Dict[str, Any]) -> None: with STATE.lock: if STATE.status == "running": raise RuntimeError("a training run is already in progress") STATE.thread = threading.Thread( target=_training_pipeline, args=(config,), name="cernenv-trainer", daemon=True, ) STATE.thread.start() # ── FastAPI app ────────────────────────────────────────────────────────── app = FastAPI(title="CERNenv Trainer", version="0.1.0") _HTML = """\ CERNenv Trainer

⚛️ CERNenv Trainer

GRPO + Unsloth + LoRA on the CERNenv LHC discovery environment. Multi-GPU on Hugging Face Spaces.

Run status

Status: ?

Training-progress evidence

Auto-updated as training runs. All artifacts are also saved to evidence/ and pushed to the model repo on the Hub.

Per-step training curve
Mid-training checkpoint progression
Before vs after summary
Reward distribution: pre vs post

Before / after metrics

metricprepostΔ

Live logs (tail)

loading…
""" @app.get("/", response_class=HTMLResponse) def index() -> HTMLResponse: return HTMLResponse(_HTML) @app.get("/health") def health() -> Dict[str, str]: return {"status": "ok"} @app.get("/status") def status() -> JSONResponse: return JSONResponse(STATE.to_dict()) @app.get("/metrics") def metrics() -> JSONResponse: if METRICS_FILE.exists(): try: return JSONResponse(json.loads(METRICS_FILE.read_text())) except Exception: return JSONResponse({"error": "metrics file unreadable"}, status_code=500) return JSONResponse({"pre": None, "post": None, "delta": None}) @app.get("/evidence") def evidence_index() -> JSONResponse: """List every evidence artifact currently on disk.""" files = [] if EVIDENCE_DIR.exists(): for p in sorted(EVIDENCE_DIR.iterdir()): if p.is_file(): files.append({ "name": p.name, "size": p.stat().st_size, "url": f"/evidence/{p.name}", }) return JSONResponse({"dir": str(EVIDENCE_DIR), "files": files}) @app.get("/evidence/{name}") def evidence_file(name: str): """Serve a single evidence artifact (PNG/CSV/JSON/MD) by filename.""" if "/" in name or ".." in name: raise HTTPException(status_code=400, detail="invalid name") target = EVIDENCE_DIR / name if not target.exists() or not target.is_file(): raise HTTPException(status_code=404, detail=f"{name} not found") return FileResponse(target) @app.get("/logs", response_class=PlainTextResponse) def logs(tail: int = 400) -> PlainTextResponse: if not LOG_FILE.exists(): return PlainTextResponse("") text = LOG_FILE.read_text() lines = text.splitlines() return PlainTextResponse("\n".join(lines[-max(tail, 1):])) @app.post("/train") def train() -> JSONResponse: try: _start_training(dict(CONFIG)) except RuntimeError as exc: raise HTTPException(status_code=409, detail=str(exc)) return JSONResponse({"status": "started", "config": CONFIG}) @app.on_event("startup") def _maybe_autostart() -> None: if CONFIG["autostart"]: try: _start_training(dict(CONFIG)) logger.info("autostarted training run") except RuntimeError as exc: logger.warning("autostart skipped: %s", exc)