#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Code LLM ๆŒ็บŒ้€ฒๅŒ–็ณป็ตฑ (Continuous Improvement Pipeline)
=========================================================
่‡ชๅ‹•ๅŒ–ๅŸท่กŒไปฅไธ‹่จ“็ทดๅพช็’ฐ๏ผš
Stage 1: SFT โ€” ๅŸบ็คŽ็จ‹ๅผ็ขผ่ƒฝๅŠ› (ๅทฒๅฎŒๆˆ)
Stage 2: DPO โ€” ๅญธๆœƒๅˆ†่พจๅฅฝๅฃž็จ‹ๅผ็ขผ
Stage 3: GRPO โ€” ็”จ็จ‹ๅผ็ขผๅŸท่กŒ็ตๆžœ่‡ชๆˆ‘ๅผทๅŒ–
Stage 4: ่‡ชๆˆ‘ๅฐๅผˆ โ€” ็”จ่‡ชๅทฑ็š„่ผธๅ‡บ็”Ÿๆˆๆ–ฐ่จ“็ทดๆ•ธๆ“š๏ผŒ็„ก้™ๅพช็’ฐ
Hardware: RTX 3070 (8GB VRAM) / Colab T4
Base Model: Qwen/Qwen2.5-Coder-3B
Usage:
# ๅฎŒๆ•ดๆตๆฐด็ทš๏ผˆๅพž้ ญ้–‹ๅง‹๏ผ‰
python code_llm_pipeline.py --run all
# ๅช่ท‘ๅ–ฎไธ€้šŽๆฎต
python code_llm_pipeline.py --run sft
python code_llm_pipeline.py --run dpo
python code_llm_pipeline.py --run grpo
python code_llm_pipeline.py --run self_play
# ๆŒ็บŒ่‡ชๆˆ‘้€ฒๅŒ–ๅพช็’ฐ
python code_llm_pipeline.py --run loop --iterations 10
# ๅพžๅทฒๆœ‰ๆจกๅž‹็นผ็บŒ
python code_llm_pipeline.py --run dpo --start_from ./pipeline/stage1_sft
"""
import argparse, json, os, subprocess, sys, tempfile
import torch
from datetime import datetime
from pathlib import Path
BASE_MODEL = "Qwen/Qwen2.5-Coder-3B"
HF_USERNAME = "YOUR_HF_USERNAME"
DIRS = {
    "sft": "./pipeline/stage1_sft", "dpo": "./pipeline/stage2_dpo",
    "grpo": "./pipeline/stage3_grpo", "self_play": "./pipeline/stage4_self_play",
    "eval": "./pipeline/eval_results",
}
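# Training knobs sized for the ~8 GB VRAM targets named above: micro-batch of 1
# with gradient accumulation, paged 8-bit AdamW, and activation checkpointing.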
GPU_CONFIG = {
    "batch_size": 1, "grad_accum": 16, "max_seq_length": 1024,
    "bf16": True, "gradient_checkpointing": True, "optim": "paged_adamw_8bit",
}
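# QLoRA-style setup: the frozen base weights stay 4-bit NF4 quantized (with
# double quantization) while the LoRA adapters below train in higher precision.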
def get_bnb_config():
    from transformers import BitsAndBytesConfig
    return BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4",
                              bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True)
def get_lora_config(r=16, alpha=32):
    from peft import LoraConfig
    return LoraConfig(r=r, lora_alpha=alpha, lora_dropout=0.05, bias="none",
                      task_type="CAUSAL_LM", target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"])
def load_model(model_path=None):
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import PeftModel
    print("📥 Loading model...")
    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
    if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token
    base = AutoModelForCausalLM.from_pretrained(BASE_MODEL, quantization_config=get_bnb_config(), device_map="auto", trust_remote_code=True)
    if model_path and os.path.exists(model_path):
        print(f"   Loading LoRA: {model_path}")
        model = PeftModel.from_pretrained(base, model_path, is_trainable=True)
    else: model = base
    return model, tokenizer
def evaluate_model(model, tokenizer, stage_name):
    from transformers import pipeline
    print(f"\n📊 Evaluating [{stage_name}]...")
    pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, do_sample=False)
    tests = [
        ("Two Sum", 'def two_sum(nums: list[int], target: int) -> list[int]:\n    """Return indices of two numbers that add up to target."""\n'),
        ("Fibonacci", 'def fibonacci(n: int) -> int:\n    """Return the nth Fibonacci number."""\n'),
        ("Binary Search", 'def binary_search(arr: list[int], target: int) -> int:\n    """Return index of target in sorted array, or -1 if not found."""\n'),
        ("Palindrome", 'def is_palindrome(s: str) -> bool:\n    """Check if string is a palindrome, ignoring case and non-alphanumeric."""\n'),
        ("Merge Sort", 'def merge_sort(arr: list[int]) -> list[int]:\n    """Sort array using merge sort."""\n'),
    ]
    passed = 0
    for name, prompt in tests:
        output = pipe(prompt, return_full_text=True)
        try:
            compile(output[0]["generated_text"], "<test>", "exec"); passed += 1; s = "✅"
        except (SyntaxError, ValueError): s = "❌"
        print(f"   {s} {name}")
    score = passed / len(tests) * 100
    print(f"\n   Result: {passed}/{len(tests)} ({score:.0f}%)")
    os.makedirs(DIRS["eval"], exist_ok=True)
    with open(os.path.join(DIRS["eval"], "history.jsonl"), "a") as f:
        f.write(json.dumps({"stage": stage_name, "timestamp": datetime.now().isoformat(), "score": score}) + "\n")
    return score
def run_sft():
    from datasets import load_dataset, concatenate_datasets
    from trl import SFTTrainer, SFTConfig
    from peft import prepare_model_for_kbit_training, get_peft_model
    print("\n" + "="*60 + "\n STAGE 1: SFT\n" + "="*60)
    cf = load_dataset("m-a-p/Code-Feedback", split="train")
    cf_msgs = cf.map(lambda x: {"messages": x["messages"]}, remove_columns=[c for c in cf.column_names if c != "messages"])
    mc = load_dataset("ise-uiuc/Magicoder-OSS-Instruct-75K", split="train")
    mc_msgs = mc.map(lambda x: {"messages": [{"role":"system","content":"You are an exceptionally skilled programmer."},{"role":"user","content":x["problem"]},{"role":"assistant","content":x["solution"]}]}, remove_columns=mc.column_names)
    dataset = concatenate_datasets([cf_msgs, mc_msgs]).shuffle(seed=42)
    split = dataset.train_test_split(test_size=0.02, seed=42)
    print(f"   Train: {len(split['train']):,} / Val: {len(split['test']):,}")
    model, tokenizer = load_model()
    model = prepare_model_for_kbit_training(model)
    lora_config = get_lora_config(r=64, alpha=128)
    model = get_peft_model(model, lora_config); model.print_trainable_parameters()
    args = SFTConfig(output_dir=DIRS["sft"], learning_rate=2e-4, lr_scheduler_type="cosine", warmup_ratio=0.05, num_train_epochs=2, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=GPU_CONFIG["grad_accum"], max_seq_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], packing=True, logging_steps=50, save_steps=2000, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-sft")
    trainer = SFTTrainer(model=model, args=args, processing_class=tokenizer, train_dataset=split["train"], eval_dataset=split["test"], peft_config=lora_config)
    print("\n🚀 Starting SFT..."); trainer.train(); trainer.save_model(DIRS["sft"])
    evaluate_model(model, tokenizer, "sft"); return DIRS["sft"]
def run_dpo(prev_model_path=None):
    from datasets import load_dataset
    from trl import DPOTrainer, DPOConfig
    print("\n" + "="*60 + "\n STAGE 2: DPO\n" + "="*60)
    prev_model_path = prev_model_path or DIRS["sft"]
    dataset = load_dataset("coseal/CodeUltraFeedback_binarized", split="train")
    dataset = dataset.map(lambda ex: {"prompt":[{"role":"user","content":ex["instruction"]}],"chosen":[{"role":"assistant","content":ex["chosen"]}],"rejected":[{"role":"assistant","content":ex["rejected"]}]}, remove_columns=dataset.column_names)
    print(f"   Preference pairs: {len(dataset):,}")
    model, tokenizer = load_model(prev_model_path)
    args = DPOConfig(output_dir=DIRS["dpo"], learning_rate=5e-6, beta=0.1, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=8, max_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], logging_steps=10, save_steps=500, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-dpo")
    trainer = DPOTrainer(model=model, args=args, processing_class=tokenizer, train_dataset=dataset, peft_config=get_lora_config(r=16, alpha=32))
    print("\n🚀 Starting DPO..."); trainer.train(); trainer.save_model(DIRS["dpo"])
    evaluate_model(model, tokenizer, "dpo"); return DIRS["dpo"]
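# Reward shaping used by GRPO below: 0.0 if the completion doesn't compile,
# 0.3 if it compiles, 0.6 if pytest reports some passing tests but exits
# nonzero, and 1.0 if the whole test file passes.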
def code_execution_reward(completions, tests=None, **kwargs):
    rewards = []
    for completion, test_code in zip(completions, tests or [""]*len(completions)):
        code = completion[0]["content"] if isinstance(completion, list) else completion
        if "```python" in code: code = code.split("```python")[1].split("```")[0]
        elif "```" in code: code = code.split("```")[1].split("```")[0]
        reward = 0.0
        try:
            compile(code, "<test>", "exec"); reward = 0.3
            if test_code:
                with tempfile.TemporaryDirectory() as d:
                    Path(d, "solution.py").write_text(code)
                    Path(d, "test_solution.py").write_text(test_code)
                    # Run inside the temp dir so `import solution` resolves.
                    r = subprocess.run([sys.executable,"-m","pytest",os.path.join(d,"test_solution.py"),"-x","--tb=no","-q"], capture_output=True, text=True, timeout=15, cwd=d)
                    if r.returncode == 0: reward = 1.0
                    elif "passed" in r.stdout: reward = 0.6
        except Exception: reward = 0.0
        rewards.append(reward)
    return rewards
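# Illustrative call (hypothetical inputs, assuming pytest is installed): a
# completion whose extracted code compiles and passes its test file earns 1.0.
#   code_execution_reward(
#       completions=["```python\ndef add(a, b):\n    return a + b\n```"],
#       tests=["from solution import add\n\ndef test_add():\n    assert add(1, 2) == 3\n"],
#   )  # -> [1.0]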
def run_grpo(prev_model_path=None):
    from datasets import load_dataset
    from trl import GRPOTrainer, GRPOConfig
    print("\n" + "="*60 + "\n STAGE 3: GRPO\n" + "="*60)
    prev_model_path = prev_model_path or DIRS["dpo"]
    dataset = load_dataset("KodCode/KodCode-V1", split="train").shuffle(seed=42).select(range(5000))
    dataset = dataset.map(lambda ex: {"prompt":[{"role":"user","content":f"Write a Python solution:\n\n{ex['question']}\n\nProvide only the code."}],"tests":ex["test"]}, remove_columns=dataset.column_names)
    print(f"   Problems: {len(dataset):,}")
    model, tokenizer = load_model(prev_model_path)
    args = GRPOConfig(output_dir=DIRS["grpo"], learning_rate=1e-5, beta=0.04, num_generations=4, max_completion_length=512, temperature=0.9, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=GPU_CONFIG["grad_accum"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], logging_steps=10, save_steps=500, save_total_limit=2, logging_strategy="steps", logging_first_step=True, push_to_hub=True, hub_model_id=f"{HF_USERNAME}/code-llm-grpo")
    trainer = GRPOTrainer(model=model, args=args, processing_class=tokenizer, reward_funcs=code_execution_reward, train_dataset=dataset, peft_config=get_lora_config(r=8, alpha=16))
    print("\n🚀 Starting GRPO..."); trainer.train(); trainer.save_model(DIRS["grpo"])
    evaluate_model(model, tokenizer, "grpo"); return DIRS["grpo"]
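# Stage 4 below is a SPIN-style self-play step: the dataset's reference
# solution is treated as "chosen", the model's own sampled output as
# "rejected", and DPO is run on the resulting preference pairs.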
def run_self_play(prev_model_path=None, iteration=0):
    from datasets import load_dataset, Dataset
    from transformers import pipeline as hf_pipeline
    from trl import DPOTrainer, DPOConfig
    print("\n" + "="*60 + f"\n STAGE 4: SELF-PLAY (Iter {iteration})\n" + "="*60)
    prev_model_path = prev_model_path or DIRS["grpo"]
    raw = load_dataset("KodCode/KodCode-V1", split="train").shuffle(seed=42+iteration).select(range(2000))
    model, tokenizer = load_model(prev_model_path)
    pipe = hf_pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, do_sample=True, temperature=0.8)
    spin_data = []
    for i, ex in enumerate(raw):
        prompt_text = f"Write a Python solution:\n\n{ex['question']}\n\nProvide only the code."
        try:
            output = pipe([{"role":"user","content":prompt_text}])
            rejected_text = output[0]["generated_text"][-1]["content"]
        except Exception: rejected_text = "# Failed"
        spin_data.append({"prompt":[{"role":"user","content":prompt_text}],"chosen":[{"role":"assistant","content":ex["solution"]}],"rejected":[{"role":"assistant","content":rejected_text}]})
        if (i+1) % 500 == 0: print(f"   Generated: {i+1}/{len(raw)}")
    dataset = Dataset.from_list(spin_data)
    print(f"   Self-play data: {len(dataset)} pairs")
    del pipe; torch.cuda.empty_cache()
    model, tokenizer = load_model(prev_model_path)
    iter_dir = os.path.join(DIRS["self_play"], f"iter_{iteration}")
    args = DPOConfig(output_dir=iter_dir, learning_rate=5e-6, beta=0.1, num_train_epochs=1, per_device_train_batch_size=GPU_CONFIG["batch_size"], gradient_accumulation_steps=8, max_length=GPU_CONFIG["max_seq_length"], gradient_checkpointing=True, bf16=GPU_CONFIG["bf16"], optim=GPU_CONFIG["optim"], logging_steps=10, save_steps=500, logging_strategy="steps", logging_first_step=True)
    trainer = DPOTrainer(model=model, args=args, processing_class=tokenizer, train_dataset=dataset, peft_config=get_lora_config(r=16, alpha=32))
    print(f"\n🚀 Self-Play DPO (Iter {iteration})..."); trainer.train(); trainer.save_model(iter_dir)
    score = evaluate_model(model, tokenizer, f"self_play_iter_{iteration}")
    return iter_dir, score
def run_full_pipeline():
    sft_path = run_sft(); dpo_path = run_dpo(sft_path); grpo_path = run_grpo(dpo_path); return grpo_path
def run_continuous_loop(iterations=10, start_from=None):
    print(f"\n🔄 CONTINUOUS LOOP ({iterations} iterations)")
    current_model = start_from or DIRS["grpo"]; best_score = 0; best_model = current_model
    for i in range(iterations):
        model_path, score = run_self_play(current_model, iteration=i)
        if score > best_score: best_score = score; best_model = model_path; print(f"   🏆 New best: {score:.0f}%")
        current_model = model_path
    print("\n📈 EVOLUTION HISTORY")
    h = os.path.join(DIRS["eval"], "history.jsonl")
    if os.path.exists(h):
        with open(h) as f:
            for line in f:
                r = json.loads(line); bar = "█" * int(r["score"]/5)
                print(f"   {r['stage']:<25} {r['score']:>5.0f}% {bar}")
    print(f"\n   🏆 Best: {best_model} ({best_score:.0f}%)")
    return best_model
def main():
    parser = argparse.ArgumentParser(description="Code LLM continuous improvement pipeline")
    parser.add_argument("--run", choices=["sft","dpo","grpo","self_play","all","loop"], default="all")
    parser.add_argument("--iterations", type=int, default=5)
    parser.add_argument("--start_from", type=str, default=None)
    args = parser.parse_args()
    print("""
    ╔══════════════════════════════════════════════╗
    ║    Code LLM Continuous Improvement System    ║
    ║      SFT → DPO → GRPO → Self-Play Loop       ║
    ╚══════════════════════════════════════════════╝
    """)
    for d in DIRS.values(): os.makedirs(d, exist_ok=True)
    if args.run == "sft": run_sft()
    elif args.run == "dpo": run_dpo(args.start_from)
    elif args.run == "grpo": run_grpo(args.start_from)
    elif args.run == "self_play": run_self_play(args.start_from)
    elif args.run == "all": run_continuous_loop(iterations=args.iterations, start_from=run_full_pipeline())
    elif args.run == "loop": run_continuous_loop(iterations=args.iterations, start_from=args.start_from)
    print("\n✅ Pipeline complete!")
if __name__ == "__main__": main()