#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CodePilot — your personal AI coding assistant
=============================================
A Claude Code-style terminal CLI tool, with built-in:
🤖 local Qwen2.5-Coder model inference
📁 reading and modifying your project files
👍👎 one-key feedback, automatically collected as training data
🔄 periodic training on your feedback, so the model gets smarter the more you use it
Install:
    pip install transformers peft bitsandbytes accelerate trl datasets rich
Usage:
    python codepilot.py                          # start
    python codepilot.py --project ~/my-project   # use a specific project directory
    python codepilot.py --adapter ./my-adapter   # use a fine-tuned adapter
    python codepilot.py --stats                  # feedback statistics
    python codepilot.py --train                  # train the model
"""
import argparse, json, os, sqlite3, torch
from datetime import datetime
from pathlib import Path
DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-3B-Instruct"
CONFIG_DIR = os.path.expanduser("~/.codepilot")
DB_PATH = os.path.join(CONFIG_DIR, "feedback.db")
AUTO_TRAIN_THRESHOLD = 50
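# FeedbackDB persists every rated interaction in a local SQLite file and can
# export it in three training formats: SFT (user-edited completions as gold
# answers), KTO (binary 👍/👎 labels), and DPO (original vs. edited pairs).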
class FeedbackDB:
def __init__(self, db_path=DB_PATH):
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.execute("""CREATE TABLE IF NOT EXISTS feedback (
id INTEGER PRIMARY KEY, timestamp TEXT, prompt TEXT, completion TEXT,
label INTEGER, edited_completion TEXT, project_dir TEXT, files_context TEXT)""")
self.conn.commit()
def save(self, prompt, completion, label, edited=None, project_dir=None, files=None):
self.conn.execute("INSERT INTO feedback VALUES (NULL,?,?,?,?,?,?,?)",
(datetime.now().isoformat(), prompt, completion, int(label), edited, project_dir,
json.dumps(files) if files else None))
self.conn.commit()
def count(self):
r = self.conn.execute("SELECT COUNT(*), SUM(label), SUM(CASE WHEN edited_completion IS NOT NULL THEN 1 ELSE 0 END) FROM feedback").fetchone()
return {"total": r[0] or 0, "thumbs_up": r[1] or 0, "edits": r[2] or 0}
def export_kto(self):
rows = self.conn.execute("SELECT prompt, completion, label FROM feedback").fetchall()
return [{"prompt":[{"role":"user","content":p}],"completion":[{"role":"assistant","content":c}],"label":bool(l)} for p,c,l in rows]
def export_sft(self):
rows = self.conn.execute("SELECT prompt, edited_completion FROM feedback WHERE edited_completion IS NOT NULL").fetchall()
return [{"messages":[{"role":"user","content":p},{"role":"assistant","content":c}]} for p,c in rows]
def export_dpo(self):
rows = self.conn.execute("SELECT prompt, completion, edited_completion FROM feedback WHERE edited_completion IS NOT NULL").fetchall()
return [{"prompt":[{"role":"user","content":p}],"chosen":[{"role":"assistant","content":e}],"rejected":[{"role":"assistant","content":o}]} for p,o,e in rows]
class CodeModel:
def __init__(self, model_name=DEFAULT_MODEL, adapter_path=None):
from transformers import AutoTokenizer, AutoModelForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token
self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True)
if adapter_path and os.path.exists(adapter_path):
from peft import PeftModel
self.model = PeftModel.from_pretrained(self.model, adapter_path)
self.model.eval()
def generate(self, user_message, system_prompt=None, file_context=None, max_tokens=2048):
messages = []
if system_prompt: messages.append({"role":"system","content":system_prompt})
if file_context:
messages.append({"role":"user","content":f"相關文件:\n\n{file_context}"})
messages.append({"role":"assistant","content":"已了解。請問需要什麼幫助?"})
messages.append({"role":"user","content":user_message})
text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(**inputs, max_new_tokens=max_tokens, do_sample=True, temperature=0.7, top_p=0.9, repetition_penalty=1.1, pad_token_id=self.tokenizer.pad_token_id)
return self.tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
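# Standalone usage sketch (paths illustrative, not part of the CLI flow):
#   m = CodeModel(adapter_path="./my-adapter")  # adapter is optional
#   print(m.generate("Write a binary search in Python"))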
def extract_code_blocks(text):
blocks = []
parts = text.split("```")
for i in range(1, len(parts), 2):
lines = parts[i].split("\n", 1)
lang = lines[0].strip() or "python"
code = lines[1].strip() if len(lines) > 1 else ""
blocks.append((lang, code))
return blocks
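# Example: extract_code_blocks("Use:\n```python\nprint('hi')\n```\ndone")
#   returns [("python", "print('hi')")]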
def run_cli(args):
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Prompt
from rich.syntax import Syntax
from rich.table import Table
console = Console(); db = FeedbackDB()
    console.print(Panel.fit("[bold cyan]CodePilot[/] — your personal AI coding assistant\n" + f"[dim]Model: {args.model or DEFAULT_MODEL}[/]", border_style="cyan"))
    with console.status("[bold green]Loading model..."):
        model = CodeModel(args.model or DEFAULT_MODEL, args.adapter)
    console.print("[green]✅ Model loaded[/]\n")
    console.print("[dim]Commands: /file <path> read a file | /apply apply code blocks | /stats statistics | /train train | /quit exit[/]\n")
system_prompt = "You are CodePilot, an expert programming assistant. Write clean, efficient, well-documented code. When modifying existing code, show the complete modified version."
project_dir = args.project or os.getcwd()
file_context = current_response = current_prompt = None
while True:
try: user_input = Prompt.ask("\n[bold green]🧑 You")
except (EOFError, KeyboardInterrupt): break
if not user_input.strip(): continue
cmd = user_input.strip().lower()
if cmd in ("/quit", "/exit"): break
elif cmd == "/stats":
stats = db.count()
            t = Table(title="📊 Feedback stats"); t.add_column("Metric", style="cyan"); t.add_column("Value", style="green")
            t.add_row("Total", str(stats["total"])); t.add_row("👍", str(stats["thumbs_up"]))
            t.add_row("👎", str(stats["total"]-stats["thumbs_up"])); t.add_row("✏️ edits", str(stats["edits"]))
console.print(t); continue
elif cmd == "/train": trigger_cli_training(db, console, args); continue
elif cmd.startswith("/file "):
fp = os.path.join(project_dir, user_input[6:].strip())
if os.path.exists(fp):
                with open(fp, encoding="utf-8") as f: content = f.read()
                file_context = f"--- {fp} ---\n{content}\n--- END ---"
                console.print(f"[green]📁 Loaded: {fp} ({len(content)} chars)[/]")
            else: console.print(f"[red]❌ Not found: {fp}[/]")
continue
elif cmd == "/apply":
if current_response:
for i,(lang,code) in enumerate(extract_code_blocks(current_response)):
console.print(Syntax(code, lang or "python", theme="monokai"))
fp = Prompt.ask(" 儲存到? (Enter跳過)")
if fp.strip():
full = os.path.join(project_dir, fp)
os.makedirs(os.path.dirname(full) or ".", exist_ok=True)
open(full,"w").write(code)
console.print(f" [green]✅ {full}[/]")
continue
current_prompt = user_input
with console.status("[bold cyan]思考中..."):
current_response = model.generate(user_input, system_prompt=system_prompt, file_context=file_context)
console.print("\n[bold blue]🤖 CodePilot:[/]")
console.print(Markdown(current_response))
console.print("\n[dim][green]y[/]=👍 [red]n[/]=👎 [yellow]e[/]=✏️修改 Enter=跳過[/]")
fb = Prompt.ask(" ", choices=["y","n","e",""], default="", show_choices=False)
if fb == "y":
db.save(current_prompt, current_response, label=1, project_dir=project_dir)
s = db.count(); console.print(f" [green]👍 +1 (累計:{s['total']})[/]")
elif fb == "n":
db.save(current_prompt, current_response, label=0, project_dir=project_dir)
s = db.count(); console.print(f" [red]👎 +1 (累計:{s['total']})[/]")
elif fb == "e":
console.print(" [yellow]貼上修改版 (END 結束):[/]")
lines = []
while True:
try:
l = input()
if l.strip()=="END": break
lines.append(l)
except EOFError: break
edited = "\n".join(lines)
if edited.strip():
db.save(current_prompt, current_response, label=1, edited=edited, project_dir=project_dir)
s = db.count(); console.print(f" [yellow]✏️ +1 (累計:{s['total']}, 修改:{s['edits']})[/]")
if db.count()["total"] % AUTO_TRAIN_THRESHOLD == 0 and db.count()["total"] > 0:
console.print(f"\n [bold yellow]🔔 累積 {db.count()['total']} 條!codepilot --train[/]")
console.print("\n[cyan]👋 再見![/]")
def trigger_cli_training(db, console, args):
stats = db.count()
if stats["total"] == 0: console.print("[yellow]⚠️ 無數據[/]"); return
console.print(f"\n[bold]🚀 訓練[/] 👍:{stats['thumbs_up']} 👎:{stats['total']-stats['thumbs_up']} ✏️:{stats['edits']}")
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import LoraConfig, prepare_model_for_kbit_training
model_name = args.model or DEFAULT_MODEL
output_dir = os.path.join(CONFIG_DIR, f"adapter_{datetime.now().strftime('%Y%m%d_%H%M')}")
bnb = BitsAndBytesConfig(load_in_4bit=True,bnb_4bit_quant_type="nf4",bnb_4bit_compute_dtype=torch.bfloat16,bnb_4bit_use_double_quant=True)
peft_cfg = LoraConfig(r=16,lora_alpha=32,lora_dropout=0.05,bias="none",task_type="CAUSAL_LM",target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"])
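    # QLoRA setup: 4-bit NF4 quantized base with bf16 compute, plus a rank-16
    # LoRA over all attention and MLP projections; small enough to fine-tune a
    # ~3B model on a single consumer GPU.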
sft_data = db.export_sft()
kto_data = db.export_kto()
if sft_data:
console.print(f"\n[bold]📚 SFT ({len(sft_data)} edits)...[/]")
from trl import SFTTrainer, SFTConfig
ds = Dataset.from_list(sft_data)
model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
tok = AutoTokenizer.from_pretrained(model_name)
if tok.pad_token is None: tok.pad_token = tok.eos_token
model = prepare_model_for_kbit_training(model)
        trainer = SFTTrainer(
            model=model,
            args=SFTConfig(output_dir=output_dir, learning_rate=2e-4, num_train_epochs=3,
                           per_device_train_batch_size=1, gradient_accumulation_steps=8,
                           max_seq_length=1024, gradient_checkpointing=True, bf16=True,
                           optim="paged_adamw_8bit", logging_steps=5, save_total_limit=1,
                           logging_strategy="steps", logging_first_step=True),
            processing_class=tok, train_dataset=ds, peft_config=peft_cfg)
        trainer.train(); trainer.save_model(output_dir)
        console.print("[green]✅ SFT done[/]"); del model; torch.cuda.empty_cache()
    elif len(kto_data) >= 10:
        console.print(f"\n[bold]📚 KTO ({len(kto_data)} feedback samples)...[/]")
        from trl import KTOConfig, KTOTrainer
        ds = Dataset.from_list(kto_data)
        model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb, device_map="auto", trust_remote_code=True)
        tok = AutoTokenizer.from_pretrained(model_name)
        if tok.pad_token is None: tok.pad_token = tok.eos_token
        model = prepare_model_for_kbit_training(model)
        trainer = KTOTrainer(
            model=model,
            args=KTOConfig(output_dir=output_dir, learning_rate=1e-5, num_train_epochs=1,
                           per_device_train_batch_size=1, gradient_accumulation_steps=8,
                           max_length=1024, gradient_checkpointing=True, bf16=True,
                           logging_steps=5, logging_strategy="steps", logging_first_step=True),
            processing_class=tok, train_dataset=ds, peft_config=peft_cfg)
        trainer.train(); trainer.save_model(output_dir)
        console.print("[green]✅ KTO done[/]")
    else:
        console.print(f"[yellow]⚠️ Not enough data: need edited completions for SFT or >=10 labeled samples for KTO (have {len(kto_data)})[/]")
        return
    console.print(f"\n[bold green]🎉 Training complete![/]\n  Adapter: {output_dir}\n  Restart with: codepilot --adapter {output_dir}")
def show_stats():
from rich.console import Console; from rich.table import Table
console = Console(); db = FeedbackDB(); s = db.count()
t = Table(title="📊 CodePilot 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green"); t.add_column("",style="dim")
t.add_row("總回饋",str(s["total"]),"█"*min(s["total"]//2,40))
t.add_row("👍",str(s["thumbs_up"]),"█"*min(s["thumbs_up"]//2,40))
t.add_row("👎",str(s["total"]-s["thumbs_up"]),"█"*min((s["total"]-s["thumbs_up"])//2,40))
t.add_row("✏️修改",str(s["edits"]),"█"*min(s["edits"]//2,40))
console.print(t)
if s["total"]>0:
r = s["thumbs_up"]/s["total"]*100; console.print(f"\n 接受率: {r:.0f}%")
if r < 50: console.print(" [yellow]💡 接受率低,建議 --train[/]")
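# NOTE: edited responses are saved with label=1, so the acceptance rate above
# counts edits as accepted answers.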
def main():
    p = argparse.ArgumentParser(description="CodePilot — your personal AI coding assistant")
    p.add_argument("--model", type=str, default=None, help=f"model name (default: {DEFAULT_MODEL})")
    p.add_argument("--adapter", type=str, default=None, help="LoRA adapter path")
    p.add_argument("--project", type=str, default=None, help="project directory")
    p.add_argument("--stats", action="store_true", help="show feedback statistics")
    p.add_argument("--train", action="store_true", help="train the model")
a = p.parse_args()
if a.stats: show_stats()
elif a.train: from rich.console import Console; trigger_cli_training(FeedbackDB(), Console(), a)
else: run_cli(a)
if __name__ == "__main__": main()