#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CodePilot — 你的專屬 AI 程式助手 ================================= 像 Claude Code 的終端 CLI 工具,內建: 🤖 本地 Qwen2.5-Coder 模型推理 📁 讀取/修改你的專案文件 👍👎 一鍵回饋,自動收集訓練數據 🔄 定期用你的回饋數據訓練,模型越用越聰明 Install: pip install transformers peft bitsandbytes accelerate trl datasets rich Usage: python codepilot.py # 啟動 python codepilot.py --project ~/my-project # 指定專案 python codepilot.py --adapter ./my-adapter # 用微調模型 python codepilot.py --stats # 回饋統計 python codepilot.py --train # 訓練模型 """ import argparse, json, os, sqlite3, subprocess, sys, textwrap, torch from datetime import datetime from pathlib import Path DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-3B-Instruct" CONFIG_DIR = os.path.expanduser("~/.codepilot") DB_PATH = os.path.join(CONFIG_DIR, "feedback.db") AUTO_TRAIN_THRESHOLD = 50 class FeedbackDB: def __init__(self, db_path=DB_PATH): os.makedirs(os.path.dirname(db_path), exist_ok=True) self.conn = sqlite3.connect(db_path) self.conn.execute("""CREATE TABLE IF NOT EXISTS feedback ( id INTEGER PRIMARY KEY, timestamp TEXT, prompt TEXT, completion TEXT, label INTEGER, edited_completion TEXT, project_dir TEXT, files_context TEXT)""") self.conn.commit() def save(self, prompt, completion, label, edited=None, project_dir=None, files=None): self.conn.execute("INSERT INTO feedback VALUES (NULL,?,?,?,?,?,?,?)", (datetime.now().isoformat(), prompt, completion, int(label), edited, project_dir, json.dumps(files) if files else None)) self.conn.commit() def count(self): r = self.conn.execute("SELECT COUNT(*), SUM(label), SUM(CASE WHEN edited_completion IS NOT NULL THEN 1 ELSE 0 END) FROM feedback").fetchone() return {"total": r[0] or 0, "thumbs_up": r[1] or 0, "edits": r[2] or 0} def export_kto(self): rows = self.conn.execute("SELECT prompt, completion, label FROM feedback").fetchall() return [{"prompt":[{"role":"user","content":p}],"completion":[{"role":"assistant","content":c}],"label":bool(l)} for p,c,l in rows] def export_sft(self): rows = self.conn.execute("SELECT prompt, edited_completion FROM feedback WHERE edited_completion IS NOT NULL").fetchall() return [{"messages":[{"role":"user","content":p},{"role":"assistant","content":c}]} for p,c in rows] def export_dpo(self): rows = self.conn.execute("SELECT prompt, completion, edited_completion FROM feedback WHERE edited_completion IS NOT NULL").fetchall() return [{"prompt":[{"role":"user","content":p}],"chosen":[{"role":"assistant","content":e}],"rejected":[{"role":"assistant","content":o}]} for p,o,e in rows] class CodeModel: def __init__(self, model_name=DEFAULT_MODEL, adapter_path=None): from transformers import AutoTokenizer, AutoModelForCausalLM self.tokenizer = AutoTokenizer.from_pretrained(model_name) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True) if adapter_path and os.path.exists(adapter_path): from peft import PeftModel self.model = PeftModel.from_pretrained(self.model, adapter_path) self.model.eval() def generate(self, user_message, system_prompt=None, file_context=None, max_tokens=2048): messages = [] if system_prompt: messages.append({"role":"system","content":system_prompt}) if file_context: messages.append({"role":"user","content":f"相關文件:\n\n{file_context}"}) messages.append({"role":"assistant","content":"已了解。請問需要什麼幫助?"}) messages.append({"role":"user","content":user_message}) text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device) with torch.no_grad(): outputs = self.model.generate(**inputs, max_new_tokens=max_tokens, do_sample=True, temperature=0.7, top_p=0.9, repetition_penalty=1.1, pad_token_id=self.tokenizer.pad_token_id) return self.tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True) def extract_code_blocks(text): blocks = [] parts = text.split("```") for i in range(1, len(parts), 2): lines = parts[i].split("\n", 1) lang = lines[0].strip() or "python" code = lines[1].strip() if len(lines) > 1 else "" blocks.append((lang, code)) return blocks def run_cli(args): from rich.console import Console from rich.markdown import Markdown from rich.panel import Panel from rich.prompt import Prompt from rich.syntax import Syntax from rich.table import Table console = Console(); db = FeedbackDB() console.print(Panel.fit("[bold cyan]CodePilot[/] — 你的專屬 AI 程式助手\n" + f"[dim]Model: {args.model or DEFAULT_MODEL}[/]", border_style="cyan")) with console.status("[bold green]載入模型中..."): model = CodeModel(args.model or DEFAULT_MODEL, args.adapter) console.print("[green]✅ 模型載入完成[/]\n") console.print("[dim]指令: /file 讀文件 | /apply 套用code | /stats 統計 | /train 訓練 | /quit 退出[/]\n") system_prompt = "You are CodePilot, an expert programming assistant. Write clean, efficient, well-documented code. When modifying existing code, show the complete modified version." project_dir = args.project or os.getcwd() file_context = current_response = current_prompt = None while True: try: user_input = Prompt.ask("\n[bold green]🧑 You") except (EOFError, KeyboardInterrupt): break if not user_input.strip(): continue cmd = user_input.strip().lower() if cmd in ("/quit", "/exit"): break elif cmd == "/stats": stats = db.count() t = Table(title="📊 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green") t.add_row("總回饋",str(stats["total"])); t.add_row("👍",str(stats["thumbs_up"])) t.add_row("👎",str(stats["total"]-stats["thumbs_up"])); t.add_row("✏️修改",str(stats["edits"])) console.print(t); continue elif cmd == "/train": trigger_cli_training(db, console, args); continue elif cmd.startswith("/file "): fp = os.path.join(project_dir, user_input[6:].strip()) if os.path.exists(fp): with open(fp) as f: content = f.read() file_context = f"--- {fp} ---\n{content}\n--- END ---" console.print(f"[green]📁 已讀取: {fp} ({len(content)} chars)[/]") else: console.print(f"[red]❌ 不存在: {fp}[/]") continue elif cmd == "/apply": if current_response: for i,(lang,code) in enumerate(extract_code_blocks(current_response)): console.print(Syntax(code, lang or "python", theme="monokai")) fp = Prompt.ask(" 儲存到? (Enter跳過)") if fp.strip(): full = os.path.join(project_dir, fp) os.makedirs(os.path.dirname(full) or ".", exist_ok=True) open(full,"w").write(code) console.print(f" [green]✅ {full}[/]") continue current_prompt = user_input with console.status("[bold cyan]思考中..."): current_response = model.generate(user_input, system_prompt=system_prompt, file_context=file_context) console.print("\n[bold blue]🤖 CodePilot:[/]") console.print(Markdown(current_response)) console.print("\n[dim][green]y[/]=👍 [red]n[/]=👎 [yellow]e[/]=✏️修改 Enter=跳過[/]") fb = Prompt.ask(" ", choices=["y","n","e",""], default="", show_choices=False) if fb == "y": db.save(current_prompt, current_response, label=1, project_dir=project_dir) s = db.count(); console.print(f" [green]👍 +1 (累計:{s['total']})[/]") elif fb == "n": db.save(current_prompt, current_response, label=0, project_dir=project_dir) s = db.count(); console.print(f" [red]👎 +1 (累計:{s['total']})[/]") elif fb == "e": console.print(" [yellow]貼上修改版 (END 結束):[/]") lines = [] while True: try: l = input() if l.strip()=="END": break lines.append(l) except EOFError: break edited = "\n".join(lines) if edited.strip(): db.save(current_prompt, current_response, label=1, edited=edited, project_dir=project_dir) s = db.count(); console.print(f" [yellow]✏️ +1 (累計:{s['total']}, 修改:{s['edits']})[/]") if db.count()["total"] % AUTO_TRAIN_THRESHOLD == 0 and db.count()["total"] > 0: console.print(f"\n [bold yellow]🔔 累積 {db.count()['total']} 條!codepilot --train[/]") console.print("\n[cyan]👋 再見![/]") def trigger_cli_training(db, console, args): stats = db.count() if stats["total"] == 0: console.print("[yellow]⚠️ 無數據[/]"); return console.print(f"\n[bold]🚀 訓練[/] 👍:{stats['thumbs_up']} 👎:{stats['total']-stats['thumbs_up']} ✏️:{stats['edits']}") from datasets import Dataset from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig from peft import LoraConfig, prepare_model_for_kbit_training model_name = args.model or DEFAULT_MODEL output_dir = os.path.join(CONFIG_DIR, f"adapter_{datetime.now().strftime('%Y%m%d_%H%M')}") bnb = BitsAndBytesConfig(load_in_4bit=True,bnb_4bit_quant_type="nf4",bnb_4bit_compute_dtype=torch.bfloat16,bnb_4bit_use_double_quant=True) peft_cfg = LoraConfig(r=16,lora_alpha=32,lora_dropout=0.05,bias="none",task_type="CAUSAL_LM",target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"]) sft_data = db.export_sft() kto_data = db.export_kto() if sft_data: console.print(f"\n[bold]📚 SFT ({len(sft_data)} edits)...[/]") from trl import SFTTrainer, SFTConfig ds = Dataset.from_list(sft_data) model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True) tok = AutoTokenizer.from_pretrained(model_name) if tok.pad_token is None: tok.pad_token = tok.eos_token model = prepare_model_for_kbit_training(model) trainer = SFTTrainer(model=model,args=SFTConfig(output_dir=output_dir,learning_rate=2e-4,num_train_epochs=3,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_seq_length=1024,gradient_checkpointing=True,bf16=True,optim="paged_adamw_8bit",logging_steps=5,save_total_limit=1,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg) trainer.train(); trainer.save_model(output_dir) console.print(f"[green]✅ SFT 完成[/]"); del model; torch.cuda.empty_cache() elif len(kto_data) >= 10: console.print(f"\n[bold]📚 KTO ({len(kto_data)} feedbacks)...[/]") from trl import KTOConfig, KTOTrainer ds = Dataset.from_list(kto_data) model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True) tok = AutoTokenizer.from_pretrained(model_name) if tok.pad_token is None: tok.pad_token = tok.eos_token trainer = KTOTrainer(model=model,args=KTOConfig(output_dir=output_dir,learning_rate=1e-5,num_train_epochs=1,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_length=1024,gradient_checkpointing=True,bf16=True,logging_steps=5,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg) trainer.train(); trainer.save_model(output_dir) console.print(f"[green]✅ KTO 完成[/]") console.print(f"\n[bold green]🎉 訓練完成![/]\n Adapter: {output_dir}\n 重啟: codepilot --adapter {output_dir}") def show_stats(): from rich.console import Console; from rich.table import Table console = Console(); db = FeedbackDB(); s = db.count() t = Table(title="📊 CodePilot 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green"); t.add_column("",style="dim") t.add_row("總回饋",str(s["total"]),"█"*min(s["total"]//2,40)) t.add_row("👍",str(s["thumbs_up"]),"█"*min(s["thumbs_up"]//2,40)) t.add_row("👎",str(s["total"]-s["thumbs_up"]),"█"*min((s["total"]-s["thumbs_up"])//2,40)) t.add_row("✏️修改",str(s["edits"]),"█"*min(s["edits"]//2,40)) console.print(t) if s["total"]>0: r = s["thumbs_up"]/s["total"]*100; console.print(f"\n 接受率: {r:.0f}%") if r < 50: console.print(" [yellow]💡 接受率低,建議 --train[/]") def main(): p = argparse.ArgumentParser(description="CodePilot — 你的專屬 AI 程式助手") p.add_argument("--model",type=str,default=None,help=f"模型 (預設:{DEFAULT_MODEL})") p.add_argument("--adapter",type=str,default=None,help="LoRA adapter") p.add_argument("--project",type=str,default=None,help="專案目錄") p.add_argument("--stats",action="store_true",help="回饋統計") p.add_argument("--train",action="store_true",help="訓練模型") a = p.parse_args() if a.stats: show_stats() elif a.train: from rich.console import Console; trigger_cli_training(FeedbackDB(), Console(), a) else: run_cli(a) if __name__ == "__main__": main()