#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Code LLM Continuous Improvement Pipeline
=========================================================
Automatically runs the following training loop:
    Stage 1: SFT       - base coding ability (completed)
    Stage 2: DPO       - learn to tell good code from bad
    Stage 3: GRPO      - self-reinforce from code execution results
    Stage 4: Self-play - generate new training data from the model's own
                         outputs, looping indefinitely

Hardware:   RTX 3070 (8GB VRAM) / Colab T4
Base Model: Qwen/Qwen2.5-Coder-3B

Usage:
    # Full pipeline (from scratch)
    python code_llm_pipeline.py --run all

    # Run a single stage
    python code_llm_pipeline.py --run sft
    python code_llm_pipeline.py --run dpo
    python code_llm_pipeline.py --run grpo
    python code_llm_pipeline.py --run self_play

    # Continuous self-improvement loop
    python code_llm_pipeline.py --run loop --iterations 10

    # Continue from an existing model
    python code_llm_pipeline.py --run dpo --start_from ./pipeline/stage1_sft
"""
import argparse, json, os, subprocess, sys, tempfile, torch
from datetime import datetime
from pathlib import Path
BASE_MODEL = "Qwen/Qwen2.5-Coder-3B"
HF_USERNAME = "YOUR_HF_USERNAME"
DIRS = {
    "sft": "./pipeline/stage1_sft", "dpo": "./pipeline/stage2_dpo",
    "grpo": "./pipeline/stage3_grpo", "self_play": "./pipeline/stage4_self_play",
    "eval": "./pipeline/eval_results",
}
GPU_CONFIG = {
    "batch_size": 1, "grad_accum": 16, "max_seq_length": 1024,
    "bf16": True, "gradient_checkpointing": True, "optim": "paged_adamw_8bit",
}
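# Rough VRAM budget for the settings above (back-of-envelope, not measured):
# a 3B-parameter model in 4-bit NF4 needs ~0.5 byte/param => ~1.5 GB of
# weights, plus LoRA adapters, paged 8-bit optimizer state, and activations
# kept small by batch_size=1 + gradient checkpointing, which is why this
# fits in 8 GB.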
def get_bnb_config():
    from transformers import BitsAndBytesConfig
    return BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True)
def get_lora_config(r=16, alpha=32):
    from peft import LoraConfig
    return LoraConfig(r=r, lora_alpha=alpha, lora_dropout=0.05, bias="none",
        task_type="CAUSAL_LM", target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"])
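# LoRA sizing sketch (illustrative, not from the original): each adapted
# weight matrix of shape (d_out, d_in) gains r*(d_in + d_out) trainable
# params, with updates scaled by alpha/r (= 2 for the defaults above).
# E.g. a 2048x2048 projection at r=16 adds 16*(2048+2048) = 65,536 trainable
# params against ~4.2M frozen ones.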
def load_model(model_path=None):
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import PeftModel
    print("Loading model...")
    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
    if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token
    base = AutoModelForCausalLM.from_pretrained(BASE_MODEL, quantization_config=get_bnb_config(), device_map="auto", trust_remote_code=True)
    if model_path and os.path.exists(model_path):
        print(f"  Loading LoRA: {model_path}")
        model = PeftModel.from_pretrained(base, model_path, is_trainable=True)
    else: model = base
    return model, tokenizer
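# Illustrative usage: resume from the SFT adapter with
#   model, tokenizer = load_model("./pipeline/stage1_sft")
# Passing no path returns the 4-bit base model without any adapter.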
def evaluate_model(model, tokenizer, stage_name):
    from transformers import pipeline
    print(f"\nEvaluating [{stage_name}]...")
    pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, do_sample=False)
    tests = [
        ("Two Sum", 'def two_sum(nums: list[int], target: int) -> list[int]:\n    """Return indices of two numbers that add up to target."""\n'),
        ("Fibonacci", 'def fibonacci(n: int) -> int:\n    """Return the nth Fibonacci number."""\n'),
        ("Binary Search", 'def binary_search(arr: list[int], target: int) -> int:\n    """Return index of target in sorted array, or -1 if not found."""\n'),
        ("Palindrome", 'def is_palindrome(s: str) -> bool:\n    """Check if string is a palindrome, ignoring case and non-alphanumeric."""\n'),
        ("Merge Sort", 'def merge_sort(arr: list[int]) -> list[int]:\n    """Sort array using merge sort."""\n'),
    ]
    passed = 0
    for name, prompt in tests:
        output = pipe(prompt, return_full_text=True)
        try:
            compile(output[0]["generated_text"], "<test>", "exec"); passed += 1; s = "PASS"
        except (SyntaxError, ValueError): s = "FAIL"
        print(f"  {s} {name}")
    score = passed / len(tests) * 100
    print(f"\n  Result: {passed}/{len(tests)} ({score:.0f}%)")
    os.makedirs(DIRS["eval"], exist_ok=True)
    with open(os.path.join(DIRS["eval"], "history.jsonl"), "a") as f:
        f.write(json.dumps({"stage": stage_name, "timestamp": datetime.now().isoformat(), "score": score}) + "\n")
    return score
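# Note: compile() above is a syntax-only smoke test -- a generation that
# parses but returns wrong answers still counts as a pass. Treat these scores
# as a cheap regression signal, not a HumanEval-style correctness benchmark.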
def run_sft():
    from datasets import load_dataset, concatenate_datasets
    from trl import SFTTrainer, SFTConfig
    from peft import prepare_model_for_kbit_training, get_peft_model
    print("\n" + "="*60 + "\n STAGE 1: SFT\n" + "="*60)
    cf = load_dataset("m-a-p/Code-Feedback", split="train")
    cf_msgs = cf.map(lambda x: {"messages": x["messages"]}, remove_columns=[c for c in cf.column_names if c != "messages"])
    mc = load_dataset("ise-uiuc/Magicoder-OSS-Instruct-75K", split="train")
    mc_msgs = mc.map(lambda x: {"messages": [{"role":"system","content":"You are an exceptionally skilled programmer."},{"role":"user","content":x["problem"]},{"role":"assistant","content":x["solution"]}]}, remove_columns=mc.column_names)
    dataset = concatenate_datasets([cf_msgs, mc_msgs]).shuffle(seed=42)
    split = dataset.train_test_split(test_size=0.02, seed=42)
    print(f"  Train: {len(split['train']):,} / Val: {len(split['test']):,}")
    model, tokenizer = load_model()
    model = prepare_model_for_kbit_training(model)
    lora_config = get_lora_config(r=64, alpha=128)
    model = get_peft_model(model, lora_config); model.print_trainable_parameters()
    args = SFTConfig(output_dir=DIRS["sft"], learning_rate=2e-4, lr_scheduler_type="cosine", warmup_ratio=0.05, num_train_epochs=2, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=GPU_CONFIG["grad_accum"], max_seq_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], packing=True, logging_steps=50, save_steps=2000, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-sft")
    trainer = SFTTrainer(model=model, args=args, processing_class=tokenizer, train_dataset=split["train"], eval_dataset=split["test"], peft_config=lora_config)
    print("\nStarting SFT..."); trainer.train(); trainer.save_model(DIRS["sft"])
    evaluate_model(model, tokenizer, "sft"); return DIRS["sft"]
def run_dpo(prev_model_path=None):
    from datasets import load_dataset
    from trl import DPOTrainer, DPOConfig
    print("\n" + "="*60 + "\n STAGE 2: DPO\n" + "="*60)
    prev_model_path = prev_model_path or DIRS["sft"]
    dataset = load_dataset("coseal/CodeUltraFeedback_binarized", split="train")
    dataset = dataset.map(lambda ex: {"prompt":[{"role":"user","content":ex["instruction"]}],"chosen":[{"role":"assistant","content":ex["chosen"]}],"rejected":[{"role":"assistant","content":ex["rejected"]}]}, remove_columns=dataset.column_names)
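    # After mapping, each record is in the conversational preference format
    # DPOTrainer expects (sketch):
    #   {"prompt":   [{"role": "user",      "content": "<instruction>"}],
    #    "chosen":   [{"role": "assistant", "content": "<preferred code>"}],
    #    "rejected": [{"role": "assistant", "content": "<dispreferred code>"}]}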
| print(f" ๅๅฅฝๅฐ: {len(dataset):,}") | |
| model, tokenizer = load_model(prev_model_path) | |
| args = DPOConfig(output_dir=DIRS["dpo"], learning_rate=5e-6, beta=0.1, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=8, max_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], logging_steps=10, save_steps=500, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-dpo") | |
| trainer = DPOTrainer(model=model, args=args, train_dataset=dataset, peft_config=get_lora_config(r=16, alpha=32)) | |
| print("\n๐ ้ๅง DPO..."); trainer.train(); trainer.save_model(DIRS["dpo"]) | |
| evaluate_model(model, tokenizer, "dpo"); return DIRS["dpo"] | |
def code_execution_reward(completions, tests=None, **kwargs):
    rewards = []
    for completion, test_code in zip(completions, tests or [""]*len(completions)):
        code = completion[0]["content"] if isinstance(completion, list) else completion
        # Strip a markdown code fence if the model wrapped its answer in one.
        if "```python" in code: code = code.split("```python")[1].split("```")[0]
        elif "```" in code: code = code.split("```")[1].split("```")[0]
        reward = 0.0
        try:
            compile(code, "<test>", "exec"); reward = 0.3
            if test_code:
                with tempfile.TemporaryDirectory() as d:
                    Path(d, "solution.py").write_text(code)
                    Path(d, "test_solution.py").write_text(test_code)
                    r = subprocess.run([sys.executable,"-m","pytest","test_solution.py","-x","--tb=no","-q"], capture_output=True, text=True, timeout=15, cwd=d)
                    if r.returncode == 0: reward = 1.0
                    elif "passed" in r.stdout: reward = 0.6
        except Exception: reward = 0.0
        rewards.append(reward)
    return rewards
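# Reward tiers produced above (sketch): 0.0 = does not compile, 0.3 = compiles
# but no tests were run, 0.6 = pytest reports some passes before the first
# failure (-x stops early), 1.0 = the whole test suite passes.
# e.g. code_execution_reward(["def f():\n    return 1"]) -> [0.3]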
def run_grpo(prev_model_path=None):
    from datasets import load_dataset
    from trl import GRPOTrainer, GRPOConfig
    print("\n" + "="*60 + "\n STAGE 3: GRPO\n" + "="*60)
    prev_model_path = prev_model_path or DIRS["dpo"]
    dataset = load_dataset("KodCode/KodCode-V1", split="train").shuffle(seed=42).select(range(5000))
    dataset = dataset.map(lambda ex: {"prompt":[{"role":"user","content":f"Write a Python solution:\n\n{ex['question']}\n\nProvide only the code."}],"tests":ex["test"]}, remove_columns=dataset.column_names)
    print(f"  Problems: {len(dataset):,}")
    model, tokenizer = load_model(prev_model_path)
    args = GRPOConfig(output_dir=DIRS["grpo"], learning_rate=1e-5, beta=0.04, num_generations=4, max_completion_length=512, temperature=0.9, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=GPU_CONFIG["grad_accum"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], logging_steps=10, save_steps=500, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-grpo")
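    # GRPO sketch: for each prompt the trainer samples num_generations=4
    # completions, scores each with the reward function, and uses the
    # group-relative advantage (roughly (r_i - mean(r)) / std(r)), so no
    # separate value model is needed; beta weights the KL penalty toward the
    # reference policy. Extra dataset columns (here "tests") are forwarded to
    # the reward function as keyword arguments.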
    trainer = GRPOTrainer(model=model, args=args, processing_class=tokenizer, reward_funcs=code_execution_reward, train_dataset=dataset, peft_config=get_lora_config(r=8, alpha=16))
    print("\nStarting GRPO..."); trainer.train(); trainer.save_model(DIRS["grpo"])
    evaluate_model(model, tokenizer, "grpo"); return DIRS["grpo"]
def run_self_play(prev_model_path=None, iteration=0):
    from datasets import load_dataset, Dataset
    from transformers import pipeline as hf_pipeline
    from trl import DPOTrainer, DPOConfig
    print("\n" + "="*60 + f"\n STAGE 4: SELF-PLAY (Iter {iteration})\n" + "="*60)
    prev_model_path = prev_model_path or DIRS["grpo"]
    raw = load_dataset("KodCode/KodCode-V1", split="train").shuffle(seed=42+iteration).select(range(2000))
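    # SPIN-style self-play (sketch): the dataset's reference solution becomes
    # the "chosen" response and the current model's own sampled output the
    # "rejected" one, so each DPO round pushes the policy toward the reference
    # distribution and away from its current outputs.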
    model, tokenizer = load_model(prev_model_path)
    pipe = hf_pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, do_sample=True, temperature=0.8)
    spin_data = []
    for i, ex in enumerate(raw):
        prompt_text = f"Write a Python solution:\n\n{ex['question']}\n\nProvide only the code."
        try:
            output = pipe([{"role":"user","content":prompt_text}])
            rejected_text = output[0]["generated_text"][-1]["content"]
        except Exception: rejected_text = "# Failed"
        spin_data.append({"prompt":[{"role":"user","content":prompt_text}],"chosen":[{"role":"assistant","content":ex["solution"]}],"rejected":[{"role":"assistant","content":rejected_text}]})
        if (i+1) % 500 == 0: print(f"  Generated: {i+1}/{len(raw)}")
    dataset = Dataset.from_list(spin_data)
    print(f"  Self-play preference pairs: {len(dataset)}")
    del pipe, model; torch.cuda.empty_cache()  # free VRAM before reloading for training
    model, tokenizer = load_model(prev_model_path)
    iter_dir = os.path.join(DIRS["self_play"], f"iter_{iteration}")
    args = DPOConfig(output_dir=iter_dir, learning_rate=5e-6, beta=0.1, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=8, max_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], logging_steps=10, save_steps=500, logging_strategy="steps", logging_first_step=True)
    trainer = DPOTrainer(model=model, args=args, processing_class=tokenizer, train_dataset=dataset, peft_config=get_lora_config(r=16, alpha=32))
    print(f"\nSelf-Play DPO (Iter {iteration})..."); trainer.train(); trainer.save_model(iter_dir)
    score = evaluate_model(model, tokenizer, f"self_play_iter_{iteration}")
    return iter_dir, score
def run_full_pipeline():
    sft_path = run_sft(); dpo_path = run_dpo(sft_path); grpo_path = run_grpo(dpo_path); return grpo_path
def run_continuous_loop(iterations=10, start_from=None):
    print(f"\nCONTINUOUS LOOP ({iterations} iterations)")
    current_model = start_from or DIRS["grpo"]; best_score = 0; best_model = current_model
    for i in range(iterations):
        model_path, score = run_self_play(current_model, iteration=i)
        if score > best_score: best_score = score; best_model = model_path; print(f"  New best: {score:.0f}%")
        current_model = model_path
    print("\nEVOLUTION HISTORY")
    h = os.path.join(DIRS["eval"], "history.jsonl")
    if os.path.exists(h):
        with open(h) as f:
            for line in f:
                r = json.loads(line); bar = "█" * int(r["score"]/5)
                print(f"  {r['stage']:<25} {r['score']:>5.0f}% {bar}")
    print(f"\n  Best: {best_model} ({best_score:.0f}%)")
    return best_model
def main():
    parser = argparse.ArgumentParser(description="Code LLM Continuous Improvement Pipeline")
    parser.add_argument("--run", choices=["sft","dpo","grpo","self_play","all","loop"], default="all")
    parser.add_argument("--iterations", type=int, default=5)
    parser.add_argument("--start_from", type=str, default=None)
    args = parser.parse_args()
    print("""
    ╔════════════════════════════════════════════════════════════╗
    ║          Code LLM Continuous Improvement Pipeline          ║
    ║          SFT → DPO → GRPO → Self-Play Loop                 ║
    ╚════════════════════════════════════════════════════════════╝
    """)
    for d in DIRS.values(): os.makedirs(d, exist_ok=True)
    if args.run == "sft": run_sft()
    elif args.run == "dpo": run_dpo(args.start_from)
    elif args.run == "grpo": run_grpo(args.start_from)
    elif args.run == "self_play": run_self_play(args.start_from)
    elif args.run == "all": run_continuous_loop(iterations=args.iterations, start_from=run_full_pipeline())
    elif args.run == "loop": run_continuous_loop(iterations=args.iterations, start_from=args.start_from)
    print("\nPipeline complete!")
if __name__ == "__main__": main()