"""Job-side training entrypoint for ForgeEnv on HF Jobs A100.

Submitted via ``scripts/submit_training_job.py``. The launcher fills in
``HF_TOKEN``, ``HF_USERNAME``, ``ENV_URL`` as Job env vars. The job:

1. Clones ``<HF_USERNAME>/forgeenv-source`` (full project tree).
2. Installs the repo with training extras.
3. Sanity-pings the live env Space.
4. Runs warm-start SFT (TRL SFTTrainer + Unsloth, 4-bit LoRA).
5. Runs GRPO repair (TRL GRPOTrainer) starting from the SFT adapter.
6. Generates plots via ``forgeenv.training.plots``.
7. Pushes the LoRA + ``repair_library.json`` + plots to
   ``<HF_USERNAME>/forgeenv-repair-agent``.

The script is linear and prints big section markers so the streaming log
is easy to follow from the launcher.
"""
| from __future__ import annotations |
|
|
| import json |
| import os |
| import shutil |
| import subprocess |
| import sys |
| from pathlib import Path |
|
|
|
|
| def _sh(cmd: list[str], **kwargs) -> None: |
| print(f"[job] $ {' '.join(cmd)}", flush=True) |
| subprocess.check_call(cmd, **kwargs) |
|
|
|
|
def step(label: str) -> None:
    """Print a prominent section banner so the streamed log is scannable."""
    bar = "=" * 10
    print(f"\n{bar} {label} {bar}\n", flush=True)
|
|
|
|
# --- Job configuration (launcher-supplied env vars with sane defaults) ---
HF_TOKEN = os.environ["HF_TOKEN"]  # required: fail fast if the launcher forgot it
HF_USERNAME = os.environ.get("HF_USERNAME", "akhiilll")
ENV_URL = os.environ.get("ENV_URL", f"https://{HF_USERNAME}-forgeenv.hf.space")
SOURCE_REPO = os.environ.get("SOURCE_REPO", f"{HF_USERNAME}/forgeenv-source")
MODEL_REPO = os.environ.get("MODEL_REPO", f"{HF_USERNAME}/forgeenv-repair-agent")
BASE_MODEL = os.environ.get("BASE_MODEL", "Qwen/Qwen2.5-3B-Instruct")
SFT_STEPS = int(os.environ.get("SFT_STEPS", "1000"))
GRPO_STEPS = int(os.environ.get("GRPO_STEPS", "200"))

# --- Working directories (ephemeral, under /tmp on the job machine) ---
WORK = Path("/tmp/forgeenv_work")
OUT = WORK / "outputs"
SFT_DIR = OUT / "sft"
GRPO_DIR = OUT / "grpo"
PLOTS_DIR = OUT / "plots"
for _dir in (WORK, OUT, PLOTS_DIR):
    _dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
| step("0. clone source from Hub") |
| src_dir = WORK / "src" |
| if src_dir.exists(): |
| shutil.rmtree(src_dir) |
| _sh([ |
| "git", "clone", |
| f"https://USER:{HF_TOKEN}@huggingface.co/{SOURCE_REPO}", |
| str(src_dir), |
| ]) |
| |
| |
| |
| sys.path.insert(0, str(src_dir)) |
|
|
| step("1. pin torch (cu124) + install GPU-stable deps") |
| |
| |
| |
| |
| |
| _sh([ |
| sys.executable, "-m", "pip", "install", |
| "--index-url", "https://download.pytorch.org/whl/cu124", |
| "torch==2.6.0", "torchvision==0.21.0", |
| ]) |
| |
| |
| |
| |
| _sh([ |
| sys.executable, "-m", "pip", "install", "--no-deps", |
| "openenv-core>=0.2.0", |
| ]) |
| _sh([ |
| sys.executable, "-m", "pip", "install", |
| "fastmcp>=3.0.0", |
| "gradio>=4.0.0", |
| "openai>=2.7.2", |
| "tomli>=2.3.0", |
| "tomli-w>=1.2.0", |
| "websockets>=15.0.1", |
| ]) |
| _sh([ |
| sys.executable, "-m", "pip", "install", |
| "trl==1.2.0", "peft", "accelerate", "datasets", |
| "bitsandbytes", |
| "matplotlib", "pyyaml", "nltk", "scikit-learn", |
| "fastapi", "uvicorn", "pydantic", "requests", |
| "sentencepiece", "protobuf", |
| ]) |
| try: |
| |
| _sh([sys.executable, "-m", "pip", "install", "--no-deps", "unsloth", "unsloth-zoo"]) |
| except subprocess.CalledProcessError: |
| print("[job] WARN: unsloth install failed — trainer will use plain HF.", flush=True) |
|
|
import torch


print(f"[job] torch: {torch.__version__}", flush=True)
cuda_ok = torch.cuda.is_available()
print(f"[job] CUDA available: {cuda_ok}", flush=True)
if not cuda_ok:
    # Training on CPU would silently burn the job's hours — abort instead.
    raise SystemExit("[job] FATAL: no CUDA — refusing to run training on CPU.")
props = torch.cuda.get_device_properties(0)
print(f"[job] GPU: {torch.cuda.get_device_name(0)}", flush=True)
print(f"[job] VRAM: {props.total_memory / 1e9:.1f} GB", flush=True)
|
|
| step("2. ping live env Space + verify forgeenv import") |
| import requests |
|
|
| try: |
| r = requests.get(f"{ENV_URL}/health", timeout=20) |
| print(f"[job] env /health -> {r.status_code} {r.text}", flush=True) |
| except Exception as e: |
| print(f"[job] WARN: env ping failed: {e}", flush=True) |
|
|
| |
| |
| import forgeenv |
| from forgeenv.training.grpo_repair import run_grpo |
|
|
| print("[job] forgeenv import OK", flush=True) |
|
|
| step("3. SFT: load Qwen + LoRA via Unsloth, train on warm-start pairs") |
| from unsloth import FastLanguageModel |
|
|
| model, tokenizer = FastLanguageModel.from_pretrained( |
| model_name=BASE_MODEL, |
| max_seq_length=2048, |
| load_in_4bit=True, |
| dtype=None, |
| token=HF_TOKEN, |
| ) |
| model = FastLanguageModel.get_peft_model( |
| model, |
| r=16, |
| lora_alpha=32, |
| lora_dropout=0, |
| target_modules=[ |
| "q_proj", "k_proj", "v_proj", "o_proj", |
| "gate_proj", "up_proj", "down_proj", |
| ], |
| use_gradient_checkpointing="unsloth", |
| ) |
| print( |
| f"[job] trainable params: " |
| f"{model.num_parameters(only_trainable=True):,}", |
| flush=True, |
| ) |
|
|
| import datasets as ds |
| from trl import SFTConfig, SFTTrainer |
|
|
| sft_jsonl = src_dir / "warmstart" / "data" / "repair_pairs.jsonl" |
| if not sft_jsonl.exists(): |
| sft_jsonl = src_dir / "warmstart" / "data" / "drift_pairs.jsonl" |
| print(f"[job] SFT pairs: {sft_jsonl}", flush=True) |
|
|
|
|
def _format_chat(example):
    """Render one JSONL row's ``messages`` into a single ``text`` field.

    Rows without a ``messages`` list map to an empty string rather than
    failing the whole dataset map.
    """
    messages = example.get("messages")
    if not messages:
        return {"text": ""}
    rendered = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=False
    )
    return {"text": rendered}
|
|
|
|
sft_ds = ds.load_dataset("json", data_files=str(sft_jsonl), split="train")
# Replace all raw columns with the single chat-template "text" column.
sft_ds = sft_ds.map(_format_chat, remove_columns=sft_ds.column_names)


bf16_ok = torch.cuda.is_bf16_supported()  # queried once, drives both precision flags
sft_config = SFTConfig(
    output_dir=str(SFT_DIR),
    max_steps=SFT_STEPS,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,
    learning_rate=2e-4,
    logging_steps=25,
    save_steps=max(250, SFT_STEPS // 4),
    bf16=bf16_ok,
    fp16=not bf16_ok,
    max_length=2048,
    packing=True,
    packing_strategy="bfd",
    report_to=[],
)
sft_trainer = SFTTrainer(
    model=model,
    processing_class=tokenizer,
    train_dataset=sft_ds,
    args=sft_config,
)
sft_trainer.train()
model.save_pretrained(str(SFT_DIR))
tokenizer.save_pretrained(str(SFT_DIR))


# Free VRAM before the GRPO phase loads its own model copy.
del sft_trainer, model, tokenizer
import gc


gc.collect()
torch.cuda.empty_cache()
|
|
| step("4. GRPO repair training (resumes from SFT adapter)") |
| from forgeenv.training.grpo_repair import run_grpo |
|
|
| run_grpo( |
| base_model=BASE_MODEL, |
| adapter_path=str(SFT_DIR), |
| output_dir=str(GRPO_DIR), |
| total_episodes=GRPO_STEPS, |
| group_size=4, |
| learning_rate=5e-6, |
| ) |
|
|
| step("5. generate plots from training logs") |
| from forgeenv.training.plots import ( |
| plot_baseline_vs_trained, |
| plot_reward_curve, |
| plot_success_rate_by_category, |
| ) |
|
|
| |
| |
| def _find_trainer_state(grpo_dir: Path) -> Optional[Path]: |
| direct = grpo_dir / "trainer_state.json" |
| if direct.exists(): |
| return direct |
| ckpts = sorted( |
| (p for p in grpo_dir.glob("checkpoint-*") if (p / "trainer_state.json").exists()), |
| key=lambda p: int(p.name.split("-")[-1]) if p.name.split("-")[-1].isdigit() else -1, |
| ) |
| return (ckpts[-1] / "trainer_state.json") if ckpts else None |
|
|
|
|
from typing import Optional  # kept: referenced by _find_trainer_state's annotation


trainer_state = _find_trainer_state(GRPO_DIR)
print(f"[job] trainer_state path: {trainer_state}", flush=True)
training_rewards: list[float] = []
# Preferred reward keys, most specific first. Hoisted out of the per-row
# loop below (the original rebuilt this list on every log_history row);
# any other "rewards/*/mean" column is appended per row as a fallback.
_BASE_CANDIDATES = [
    "rewards/reward_repair_function/mean",
    "rewards/mean",
    "reward",
    "train/reward",
]
if trainer_state is not None and trainer_state.exists():
    state = json.loads(trainer_state.read_text())
    log_history = state.get("log_history", [])
    print(f"[job] log_history rows: {len(log_history)}", flush=True)
    if log_history:
        sample_keys = sorted(set().union(*(log.keys() for log in log_history)))
        print(f"[job] log keys present: {sample_keys}", flush=True)
    for log in log_history:
        candidates = list(_BASE_CANDIDATES)
        for k in log:
            if k.startswith("rewards/") and k.endswith("/mean") and k not in candidates:
                candidates.append(k)
        # Take the first candidate present in this row, if any.
        for k in candidates:
            if k in log:
                training_rewards.append(float(log[k]))
                break
print(f"[job] {len(training_rewards)} reward log points", flush=True)
if training_rewards:
    print(
        f"[job] reward range: {min(training_rewards):.3f}..{max(training_rewards):.3f}",
        flush=True,
    )


# [0.0] keeps the plot call valid when no rewards were logged.
plot_reward_curve(
    training_rewards or [0.0],
    str(PLOTS_DIR / "training_reward_curve.png"),
)


# Baseline/category plots are produced elsewhere and shipped with the
# source repo; copy them through when present.
src_plots = src_dir / "artifacts" / "plots"
for name in ("baseline_vs_trained.png", "success_by_category.png"):
    candidate = src_plots / name
    if candidate.exists():
        shutil.copy(candidate, PLOTS_DIR / name)
|
|
| step("6. push LoRA + artifacts to Hub") |
| final_dir = OUT / "final" |
| final_dir.mkdir(parents=True, exist_ok=True) |
| for item in GRPO_DIR.iterdir(): |
| if item.is_file() and ( |
| item.name.startswith("adapter_") |
| or item.name.startswith("tokenizer") |
| or item.name in {"special_tokens_map.json", "vocab.json", "merges.txt"} |
| ): |
| shutil.copy(item, final_dir / item.name) |
|
|
| repair_lib = src_dir / "artifacts" / "repair_library.json" |
| if repair_lib.exists(): |
| shutil.copy(repair_lib, final_dir / "repair_library.json") |
|
|
from huggingface_hub import HfApi


api = HfApi()
# Shared target/auth kwargs for every Hub call below.
_repo_kwargs = {"repo_id": MODEL_REPO, "repo_type": "model", "token": HF_TOKEN}
api.create_repo(exist_ok=True, private=False, **_repo_kwargs)
api.upload_folder(
    folder_path=str(final_dir),
    commit_message=f"GRPO LoRA (sft={SFT_STEPS}, grpo={GRPO_STEPS})",
    ignore_patterns=["__pycache__", "*.pyc"],
    **_repo_kwargs,
)
api.upload_folder(
    folder_path=str(PLOTS_DIR),
    path_in_repo="plots",
    commit_message="Training plots",
    **_repo_kwargs,
)
|
|
# Closing banner plus a machine-readable summary for the launcher to parse.
summary = {
    "sft_steps": SFT_STEPS,
    "grpo_steps": GRPO_STEPS,
    "rewards_logged": len(training_rewards),
    "model_repo": MODEL_REPO,
}
print(f"\n[job] DONE. Model live at https://huggingface.co/{MODEL_REPO}", flush=True)
print(json.dumps(summary, indent=2), flush=True)
|
|