sandbox-5ca717e4 / codepilot.py
Justin-lee's picture
Add CodePilot CLI tool with feedback collection
d341b49 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CodePilot — 你的專屬 AI 程式助手
=================================
像 Claude Code 的終端 CLI 工具,內建:
🤖 本地 Qwen2.5-Coder 模型推理
📁 讀取/修改你的專案文件
👍👎 一鍵回饋,自動收集訓練數據
🔄 定期用你的回饋數據訓練,模型越用越聰明
Install:
pip install transformers peft bitsandbytes accelerate trl datasets rich
Usage:
python codepilot.py # 啟動
python codepilot.py --project ~/my-project # 指定專案
python codepilot.py --adapter ./my-adapter # 用微調模型
python codepilot.py --stats # 回饋統計
python codepilot.py --train # 訓練模型
"""
import argparse, json, os, sqlite3, subprocess, sys, textwrap, torch
from datetime import datetime
from pathlib import Path
DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-3B-Instruct"
CONFIG_DIR = os.path.expanduser("~/.codepilot")
DB_PATH = os.path.join(CONFIG_DIR, "feedback.db")
AUTO_TRAIN_THRESHOLD = 50
class FeedbackDB:
def __init__(self, db_path=DB_PATH):
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.execute("""CREATE TABLE IF NOT EXISTS feedback (
id INTEGER PRIMARY KEY, timestamp TEXT, prompt TEXT, completion TEXT,
label INTEGER, edited_completion TEXT, project_dir TEXT, files_context TEXT)""")
self.conn.commit()
def save(self, prompt, completion, label, edited=None, project_dir=None, files=None):
self.conn.execute("INSERT INTO feedback VALUES (NULL,?,?,?,?,?,?,?)",
(datetime.now().isoformat(), prompt, completion, int(label), edited, project_dir,
json.dumps(files) if files else None))
self.conn.commit()
def count(self):
r = self.conn.execute("SELECT COUNT(*), SUM(label), SUM(CASE WHEN edited_completion IS NOT NULL THEN 1 ELSE 0 END) FROM feedback").fetchone()
return {"total": r[0] or 0, "thumbs_up": r[1] or 0, "edits": r[2] or 0}
def export_kto(self):
rows = self.conn.execute("SELECT prompt, completion, label FROM feedback").fetchall()
return [{"prompt":[{"role":"user","content":p}],"completion":[{"role":"assistant","content":c}],"label":bool(l)} for p,c,l in rows]
def export_sft(self):
rows = self.conn.execute("SELECT prompt, edited_completion FROM feedback WHERE edited_completion IS NOT NULL").fetchall()
return [{"messages":[{"role":"user","content":p},{"role":"assistant","content":c}]} for p,c in rows]
def export_dpo(self):
rows = self.conn.execute("SELECT prompt, completion, edited_completion FROM feedback WHERE edited_completion IS NOT NULL").fetchall()
return [{"prompt":[{"role":"user","content":p}],"chosen":[{"role":"assistant","content":e}],"rejected":[{"role":"assistant","content":o}]} for p,o,e in rows]
class CodeModel:
def __init__(self, model_name=DEFAULT_MODEL, adapter_path=None):
from transformers import AutoTokenizer, AutoModelForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token
self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True)
if adapter_path and os.path.exists(adapter_path):
from peft import PeftModel
self.model = PeftModel.from_pretrained(self.model, adapter_path)
self.model.eval()
def generate(self, user_message, system_prompt=None, file_context=None, max_tokens=2048):
messages = []
if system_prompt: messages.append({"role":"system","content":system_prompt})
if file_context:
messages.append({"role":"user","content":f"相關文件:\n\n{file_context}"})
messages.append({"role":"assistant","content":"已了解。請問需要什麼幫助?"})
messages.append({"role":"user","content":user_message})
text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(**inputs, max_new_tokens=max_tokens, do_sample=True, temperature=0.7, top_p=0.9, repetition_penalty=1.1, pad_token_id=self.tokenizer.pad_token_id)
return self.tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
def extract_code_blocks(text):
blocks = []
parts = text.split("```")
for i in range(1, len(parts), 2):
lines = parts[i].split("\n", 1)
lang = lines[0].strip() or "python"
code = lines[1].strip() if len(lines) > 1 else ""
blocks.append((lang, code))
return blocks
def run_cli(args):
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Prompt
from rich.syntax import Syntax
from rich.table import Table
console = Console(); db = FeedbackDB()
console.print(Panel.fit("[bold cyan]CodePilot[/] — 你的專屬 AI 程式助手\n" + f"[dim]Model: {args.model or DEFAULT_MODEL}[/]", border_style="cyan"))
with console.status("[bold green]載入模型中..."):
model = CodeModel(args.model or DEFAULT_MODEL, args.adapter)
console.print("[green]✅ 模型載入完成[/]\n")
console.print("[dim]指令: /file <path> 讀文件 | /apply 套用code | /stats 統計 | /train 訓練 | /quit 退出[/]\n")
system_prompt = "You are CodePilot, an expert programming assistant. Write clean, efficient, well-documented code. When modifying existing code, show the complete modified version."
project_dir = args.project or os.getcwd()
file_context = current_response = current_prompt = None
while True:
try: user_input = Prompt.ask("\n[bold green]🧑 You")
except (EOFError, KeyboardInterrupt): break
if not user_input.strip(): continue
cmd = user_input.strip().lower()
if cmd in ("/quit", "/exit"): break
elif cmd == "/stats":
stats = db.count()
t = Table(title="📊 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green")
t.add_row("總回饋",str(stats["total"])); t.add_row("👍",str(stats["thumbs_up"]))
t.add_row("👎",str(stats["total"]-stats["thumbs_up"])); t.add_row("✏️修改",str(stats["edits"]))
console.print(t); continue
elif cmd == "/train": trigger_cli_training(db, console, args); continue
elif cmd.startswith("/file "):
fp = os.path.join(project_dir, user_input[6:].strip())
if os.path.exists(fp):
with open(fp) as f: content = f.read()
file_context = f"--- {fp} ---\n{content}\n--- END ---"
console.print(f"[green]📁 已讀取: {fp} ({len(content)} chars)[/]")
else: console.print(f"[red]❌ 不存在: {fp}[/]")
continue
elif cmd == "/apply":
if current_response:
for i,(lang,code) in enumerate(extract_code_blocks(current_response)):
console.print(Syntax(code, lang or "python", theme="monokai"))
fp = Prompt.ask(" 儲存到? (Enter跳過)")
if fp.strip():
full = os.path.join(project_dir, fp)
os.makedirs(os.path.dirname(full) or ".", exist_ok=True)
open(full,"w").write(code)
console.print(f" [green]✅ {full}[/]")
continue
current_prompt = user_input
with console.status("[bold cyan]思考中..."):
current_response = model.generate(user_input, system_prompt=system_prompt, file_context=file_context)
console.print("\n[bold blue]🤖 CodePilot:[/]")
console.print(Markdown(current_response))
console.print("\n[dim][green]y[/]=👍 [red]n[/]=👎 [yellow]e[/]=✏️修改 Enter=跳過[/]")
fb = Prompt.ask(" ", choices=["y","n","e",""], default="", show_choices=False)
if fb == "y":
db.save(current_prompt, current_response, label=1, project_dir=project_dir)
s = db.count(); console.print(f" [green]👍 +1 (累計:{s['total']})[/]")
elif fb == "n":
db.save(current_prompt, current_response, label=0, project_dir=project_dir)
s = db.count(); console.print(f" [red]👎 +1 (累計:{s['total']})[/]")
elif fb == "e":
console.print(" [yellow]貼上修改版 (END 結束):[/]")
lines = []
while True:
try:
l = input()
if l.strip()=="END": break
lines.append(l)
except EOFError: break
edited = "\n".join(lines)
if edited.strip():
db.save(current_prompt, current_response, label=1, edited=edited, project_dir=project_dir)
s = db.count(); console.print(f" [yellow]✏️ +1 (累計:{s['total']}, 修改:{s['edits']})[/]")
if db.count()["total"] % AUTO_TRAIN_THRESHOLD == 0 and db.count()["total"] > 0:
console.print(f"\n [bold yellow]🔔 累積 {db.count()['total']} 條!codepilot --train[/]")
console.print("\n[cyan]👋 再見![/]")
def trigger_cli_training(db, console, args):
stats = db.count()
if stats["total"] == 0: console.print("[yellow]⚠️ 無數據[/]"); return
console.print(f"\n[bold]🚀 訓練[/] 👍:{stats['thumbs_up']} 👎:{stats['total']-stats['thumbs_up']} ✏️:{stats['edits']}")
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import LoraConfig, prepare_model_for_kbit_training
model_name = args.model or DEFAULT_MODEL
output_dir = os.path.join(CONFIG_DIR, f"adapter_{datetime.now().strftime('%Y%m%d_%H%M')}")
bnb = BitsAndBytesConfig(load_in_4bit=True,bnb_4bit_quant_type="nf4",bnb_4bit_compute_dtype=torch.bfloat16,bnb_4bit_use_double_quant=True)
peft_cfg = LoraConfig(r=16,lora_alpha=32,lora_dropout=0.05,bias="none",task_type="CAUSAL_LM",target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"])
sft_data = db.export_sft()
kto_data = db.export_kto()
if sft_data:
console.print(f"\n[bold]📚 SFT ({len(sft_data)} edits)...[/]")
from trl import SFTTrainer, SFTConfig
ds = Dataset.from_list(sft_data)
model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
tok = AutoTokenizer.from_pretrained(model_name)
if tok.pad_token is None: tok.pad_token = tok.eos_token
model = prepare_model_for_kbit_training(model)
trainer = SFTTrainer(model=model,args=SFTConfig(output_dir=output_dir,learning_rate=2e-4,num_train_epochs=3,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_seq_length=1024,gradient_checkpointing=True,bf16=True,optim="paged_adamw_8bit",logging_steps=5,save_total_limit=1,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg)
trainer.train(); trainer.save_model(output_dir)
console.print(f"[green]✅ SFT 完成[/]"); del model; torch.cuda.empty_cache()
elif len(kto_data) >= 10:
console.print(f"\n[bold]📚 KTO ({len(kto_data)} feedbacks)...[/]")
from trl import KTOConfig, KTOTrainer
ds = Dataset.from_list(kto_data)
model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
tok = AutoTokenizer.from_pretrained(model_name)
if tok.pad_token is None: tok.pad_token = tok.eos_token
trainer = KTOTrainer(model=model,args=KTOConfig(output_dir=output_dir,learning_rate=1e-5,num_train_epochs=1,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_length=1024,gradient_checkpointing=True,bf16=True,logging_steps=5,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg)
trainer.train(); trainer.save_model(output_dir)
console.print(f"[green]✅ KTO 完成[/]")
console.print(f"\n[bold green]🎉 訓練完成![/]\n Adapter: {output_dir}\n 重啟: codepilot --adapter {output_dir}")
def show_stats():
from rich.console import Console; from rich.table import Table
console = Console(); db = FeedbackDB(); s = db.count()
t = Table(title="📊 CodePilot 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green"); t.add_column("",style="dim")
t.add_row("總回饋",str(s["total"]),"█"*min(s["total"]//2,40))
t.add_row("👍",str(s["thumbs_up"]),"█"*min(s["thumbs_up"]//2,40))
t.add_row("👎",str(s["total"]-s["thumbs_up"]),"█"*min((s["total"]-s["thumbs_up"])//2,40))
t.add_row("✏️修改",str(s["edits"]),"█"*min(s["edits"]//2,40))
console.print(t)
if s["total"]>0:
r = s["thumbs_up"]/s["total"]*100; console.print(f"\n 接受率: {r:.0f}%")
if r < 50: console.print(" [yellow]💡 接受率低,建議 --train[/]")
def main():
p = argparse.ArgumentParser(description="CodePilot — 你的專屬 AI 程式助手")
p.add_argument("--model",type=str,default=None,help=f"模型 (預設:{DEFAULT_MODEL})")
p.add_argument("--adapter",type=str,default=None,help="LoRA adapter")
p.add_argument("--project",type=str,default=None,help="專案目錄")
p.add_argument("--stats",action="store_true",help="回饋統計")
p.add_argument("--train",action="store_true",help="訓練模型")
a = p.parse_args()
if a.stats: show_stats()
elif a.train: from rich.console import Console; trigger_cli_training(FeedbackDB(), Console(), a)
else: run_cli(a)
if __name__ == "__main__": main()