#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Code LLM 持續進化系統 (Continuous Improvement Pipeline) ========================================================= 自動化執行以下訓練循環: Stage 1: SFT — 基礎程式碼能力 (已完成) Stage 2: DPO — 學會分辨好壞程式碼 Stage 3: GRPO — 用程式碼執行結果自我強化 Stage 4: 自我對弈 — 用自己的輸出生成新訓練數據,無限循環 Hardware: RTX 3070 (8GB VRAM) / Colab T4 Base Model: Qwen/Qwen2.5-Coder-3B Usage: # 完整流水線(從頭開始) python code_llm_pipeline.py --run all # 只跑單一階段 python code_llm_pipeline.py --run sft python code_llm_pipeline.py --run dpo python code_llm_pipeline.py --run grpo python code_llm_pipeline.py --run self_play # 持續自我進化循環 python code_llm_pipeline.py --run loop --iterations 10 # 從已有模型繼續 python code_llm_pipeline.py --run dpo --start_from ./pipeline/stage1_sft """ import argparse, json, os, subprocess, sys, tempfile, time, torch from datetime import datetime from pathlib import Path BASE_MODEL = "Qwen/Qwen2.5-Coder-3B" HF_USERNAME = "YOUR_HF_USERNAME" DIRS = { "sft": "./pipeline/stage1_sft", "dpo": "./pipeline/stage2_dpo", "grpo": "./pipeline/stage3_grpo", "self_play": "./pipeline/stage4_self_play", "eval": "./pipeline/eval_results", } GPU_CONFIG = { "batch_size": 1, "grad_accum": 16, "max_seq_length": 1024, "bf16": True, "gradient_checkpointing": True, "optim": "paged_adamw_8bit", } def get_bnb_config(): from transformers import BitsAndBytesConfig return BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True) def get_lora_config(r=16, alpha=32): from peft import LoraConfig return LoraConfig(r=r, lora_alpha=alpha, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM", target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"]) def load_model(model_path=None): from transformers import AutoModelForCausalLM, AutoTokenizer from peft import PeftModel print(f"📥 載入模型...") tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token base = AutoModelForCausalLM.from_pretrained(BASE_MODEL, quantization_config=get_bnb_config(), device_map="auto", trust_remote_code=True) if model_path and os.path.exists(model_path): print(f" 載入 LoRA: {model_path}") model = PeftModel.from_pretrained(base, model_path, is_trainable=True) else: model = base return model, tokenizer def evaluate_model(model, tokenizer, stage_name): from transformers import pipeline print(f"\n📊 評估 [{stage_name}]...") pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, do_sample=False) tests = [ ("Two Sum", 'def two_sum(nums: list[int], target: int) -> list[int]:\n """Return indices of two numbers that add up to target."""\n'), ("Fibonacci", 'def fibonacci(n: int) -> int:\n """Return the nth Fibonacci number."""\n'), ("Binary Search", 'def binary_search(arr: list[int], target: int) -> int:\n """Return index of target in sorted array, or -1 if not found."""\n'), ("Palindrome", 'def is_palindrome(s: str) -> bool:\n """Check if string is a palindrome, ignoring case and non-alphanumeric."""\n'), ("Merge Sort", 'def merge_sort(arr: list[int]) -> list[int]:\n """Sort array using merge sort."""\n'), ] passed = 0 for name, prompt in tests: output = pipe(prompt, return_full_text=True) try: compile(output[0]["generated_text"], "", "exec"); passed += 1; s = "✅" except SyntaxError: s = "❌" print(f" {s} {name}") score = passed / len(tests) * 100 print(f"\n 結果: {passed}/{len(tests)} ({score:.0f}%)") os.makedirs(DIRS["eval"], exist_ok=True) with open(os.path.join(DIRS["eval"], "history.jsonl"), "a") as f: f.write(json.dumps({"stage": stage_name, "timestamp": datetime.now().isoformat(), "score": score}) + "\n") return score def run_sft(): from datasets import load_dataset, concatenate_datasets from trl import SFTTrainer, SFTConfig from peft import prepare_model_for_kbit_training, get_peft_model print("\n" + "="*60 + "\n STAGE 1: SFT\n" + "="*60) cf = load_dataset("m-a-p/Code-Feedback", split="train") cf_msgs = cf.map(lambda x: {"messages": x["messages"]}, remove_columns=[c for c in cf.column_names if c != "messages"]) mc = load_dataset("ise-uiuc/Magicoder-OSS-Instruct-75K", split="train") mc_msgs = mc.map(lambda x: {"messages": [{"role":"system","content":"You are an exceptionally skilled programmer."},{"role":"user","content":x["problem"]},{"role":"assistant","content":x["solution"]}]}, remove_columns=mc.column_names) dataset = concatenate_datasets([cf_msgs, mc_msgs]).shuffle(seed=42) split = dataset.train_test_split(test_size=0.02, seed=42) print(f" 訓練: {len(split['train']):,} / 驗證: {len(split['test']):,}") model, tokenizer = load_model() model = prepare_model_for_kbit_training(model) lora_config = get_lora_config(r=64, alpha=128) model = get_peft_model(model, lora_config); model.print_trainable_parameters() args = SFTConfig(output_dir=DIRS["sft"], learning_rate=2e-4, lr_scheduler_type="cosine", warmup_ratio=0.05, num_train_epochs=2, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=GPU_CONFIG["grad_accum"], max_seq_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], packing=True, logging_steps=50, save_steps=2000, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-sft") trainer = SFTTrainer(model=model, args=args, processing_class=tokenizer, train_dataset=split["train"], eval_dataset=split["test"], peft_config=lora_config) print("\n🚀 開始 SFT..."); trainer.train(); trainer.save_model(DIRS["sft"]) evaluate_model(model, tokenizer, "sft"); return DIRS["sft"] def run_dpo(prev_model_path=None): from datasets import load_dataset from trl import DPOTrainer, DPOConfig print("\n" + "="*60 + "\n STAGE 2: DPO\n" + "="*60) prev_model_path = prev_model_path or DIRS["sft"] dataset = load_dataset("coseal/CodeUltraFeedback_binarized", split="train") dataset = dataset.map(lambda ex: {"prompt":[{"role":"user","content":ex["instruction"]}],"chosen":[{"role":"assistant","content":ex["chosen"]}],"rejected":[{"role":"assistant","content":ex["rejected"]}]}, remove_columns=dataset.column_names) print(f" 偏好對: {len(dataset):,}") model, tokenizer = load_model(prev_model_path) args = DPOConfig(output_dir=DIRS["dpo"], learning_rate=5e-6, beta=0.1, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=8, max_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], logging_steps=10, save_steps=500, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-dpo") trainer = DPOTrainer(model=model, args=args, train_dataset=dataset, peft_config=get_lora_config(r=16, alpha=32)) print("\n🚀 開始 DPO..."); trainer.train(); trainer.save_model(DIRS["dpo"]) evaluate_model(model, tokenizer, "dpo"); return DIRS["dpo"] def code_execution_reward(completions, tests=None, **kwargs): rewards = [] for completion, test_code in zip(completions, tests or [""]*len(completions)): code = completion[0]["content"] if isinstance(completion, list) else completion if "```python" in code: code = code.split("```python")[1].split("```")[0] elif "```" in code: code = code.split("```")[1].split("```")[0] reward = 0.0 try: compile(code, "", "exec"); reward = 0.3 if test_code: with tempfile.TemporaryDirectory() as d: open(os.path.join(d,"solution.py"),"w").write(code) open(os.path.join(d,"test_solution.py"),"w").write(test_code) r = subprocess.run([sys.executable,"-m","pytest",os.path.join(d,"test_solution.py"),"-x","--tb=no","-q"], capture_output=True, text=True, timeout=15) if r.returncode == 0: reward = 1.0 elif "passed" in r.stdout: reward = 0.6 except: reward = 0.0 rewards.append(reward) return rewards def run_grpo(prev_model_path=None): from datasets import load_dataset from trl import GRPOTrainer, GRPOConfig print("\n" + "="*60 + "\n STAGE 3: GRPO\n" + "="*60) prev_model_path = prev_model_path or DIRS["dpo"] dataset = load_dataset("KodCode/KodCode-V1", split="train").shuffle(seed=42).select(range(5000)) dataset = dataset.map(lambda ex: {"prompt":[{"role":"user","content":f"Write a Python solution:\n\n{ex['question']}\n\nProvide only the code."}],"tests":ex["test"]}, remove_columns=dataset.column_names) print(f" 題目: {len(dataset):,}") model, tokenizer = load_model(prev_model_path) args = GRPOConfig(output_dir=DIRS["grpo"], learning_rate=1e-5, beta=0.04, num_generations=4, max_completion_length=512, temperature=0.9, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=GPU_CONFIG["grad_accum"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], logging_steps=10, save_steps=500, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-grpo") trainer = GRPOTrainer(model=model, args=args, reward_funcs=code_execution_reward, train_dataset=dataset, peft_config=get_lora_config(r=8, alpha=16)) print("\n🚀 開始 GRPO..."); trainer.train(); trainer.save_model(DIRS["grpo"]) evaluate_model(model, tokenizer, "grpo"); return DIRS["grpo"] def run_self_play(prev_model_path=None, iteration=0): from datasets import load_dataset, Dataset from transformers import pipeline as hf_pipeline from trl import DPOTrainer, DPOConfig print("\n" + "="*60 + f"\n STAGE 4: SELF-PLAY (Iter {iteration})\n" + "="*60) prev_model_path = prev_model_path or DIRS["grpo"] raw = load_dataset("KodCode/KodCode-V1", split="train").shuffle(seed=42+iteration).select(range(2000)) model, tokenizer = load_model(prev_model_path) pipe = hf_pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, do_sample=True, temperature=0.8) spin_data = [] for i, ex in enumerate(raw): prompt_text = f"Write a Python solution:\n\n{ex['question']}\n\nProvide only the code." try: output = pipe([{"role":"user","content":prompt_text}]) rejected_text = output[0]["generated_text"][-1]["content"] except: rejected_text = "# Failed" spin_data.append({"prompt":[{"role":"user","content":prompt_text}],"chosen":[{"role":"assistant","content":ex["solution"]}],"rejected":[{"role":"assistant","content":rejected_text}]}) if (i+1) % 500 == 0: print(f" 生成: {i+1}/{len(raw)}") dataset = Dataset.from_list(spin_data) print(f" 自我對弈數據: {len(dataset)} 對") del pipe; torch.cuda.empty_cache() model, tokenizer = load_model(prev_model_path) iter_dir = os.path.join(DIRS["self_play"], f"iter_{iteration}") args = DPOConfig(output_dir=iter_dir, learning_rate=5e-6, beta=0.1, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=8, max_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], logging_steps=10, save_steps=500, logging_strategy="steps", logging_first_step=True) trainer = DPOTrainer(model=model, args=args, train_dataset=dataset, peft_config=get_lora_config(r=16, alpha=32)) print(f"\n🚀 Self-Play DPO (Iter {iteration})..."); trainer.train(); trainer.save_model(iter_dir) score = evaluate_model(model, tokenizer, f"self_play_iter_{iteration}") return iter_dir, score def run_full_pipeline(): sft_path = run_sft(); dpo_path = run_dpo(sft_path); grpo_path = run_grpo(dpo_path); return grpo_path def run_continuous_loop(iterations=10, start_from=None): print(f"\n🔄 CONTINUOUS LOOP ({iterations} iterations)") current_model = start_from or DIRS["grpo"]; best_score = 0; best_model = current_model for i in range(iterations): model_path, score = run_self_play(current_model, iteration=i) if score > best_score: best_score = score; best_model = model_path; print(f" 🏆 新最佳: {score:.0f}%") current_model = model_path print("\n📈 EVOLUTION HISTORY") h = os.path.join(DIRS["eval"], "history.jsonl") if os.path.exists(h): for line in open(h): r = json.loads(line); bar = "█" * int(r["score"]/5) print(f" {r['stage']:<25} {r['score']:>5.0f}% {bar}") print(f"\n 🏆 最佳: {best_model} ({best_score:.0f}%)") return best_model def main(): parser = argparse.ArgumentParser(description="Code LLM 持續進化系統") parser.add_argument("--run", choices=["sft","dpo","grpo","self_play","all","loop"], default="all") parser.add_argument("--iterations", type=int, default=5) parser.add_argument("--start_from", type=str, default=None) args = parser.parse_args() print(""" ╔════════════════════════════════════════════════════════════╗ ║ Code LLM 持續進化系統 ║ ║ SFT → DPO → GRPO → Self-Play Loop ║ ╚════════════════════════════════════════════════════════════╝ """) for d in DIRS.values(): os.makedirs(d, exist_ok=True) if args.run == "sft": run_sft() elif args.run == "dpo": run_dpo(args.start_from) elif args.run == "grpo": run_grpo(args.start_from) elif args.run == "self_play": run_self_play(args.start_from) elif args.run == "all": run_continuous_loop(iterations=args.iterations, start_from=run_full_pipeline()) elif args.run == "loop": run_continuous_loop(iterations=args.iterations, start_from=args.start_from) print("\n✅ Pipeline 完成!") if __name__ == "__main__": main()