Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| CodePilot — 你的專屬 AI 程式助手 | |
| ================================= | |
| 像 Claude Code 的終端 CLI 工具,內建: | |
| 🤖 本地 Qwen2.5-Coder 模型推理 | |
| 📁 讀取/修改你的專案文件 | |
| 👍👎 一鍵回饋,自動收集訓練數據 | |
| 🔄 定期用你的回饋數據訓練,模型越用越聰明 | |
| Install: | |
| pip install transformers peft bitsandbytes accelerate trl datasets rich | |
| Usage: | |
| python codepilot.py # 啟動 | |
| python codepilot.py --project ~/my-project # 指定專案 | |
| python codepilot.py --adapter ./my-adapter # 用微調模型 | |
| python codepilot.py --stats # 回饋統計 | |
| python codepilot.py --train # 訓練模型 | |
| """ | |
| import argparse, json, os, sqlite3, subprocess, sys, textwrap, torch | |
| from datetime import datetime | |
| from pathlib import Path | |
DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-3B-Instruct"  # base model used when --model is not supplied
CONFIG_DIR = os.path.expanduser("~/.codepilot")   # per-user directory for adapters + feedback DB
DB_PATH = os.path.join(CONFIG_DIR, "feedback.db")  # SQLite file where feedback is stored
AUTO_TRAIN_THRESHOLD = 50  # every N collected feedbacks, remind the user to run --train
class FeedbackDB:
    """SQLite-backed store for user feedback on model completions.

    Each row records a prompt/completion pair, a binary thumbs-up/down
    label, an optional user-edited completion, and optional project
    context. Export helpers convert rows into the chat formats expected
    by TRL's SFT, KTO and DPO trainers.
    """

    def __init__(self, db_path=None):
        # Resolve the default lazily so importing this class does not
        # require CONFIG_DIR to exist yet; passing an explicit path is
        # unchanged behavior.
        if db_path is None:
            db_path = DB_PATH
        # A bare filename has no directory component; os.makedirs("")
        # would raise FileNotFoundError, so only create a parent if any.
        parent = os.path.dirname(db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""CREATE TABLE IF NOT EXISTS feedback (
            id INTEGER PRIMARY KEY, timestamp TEXT, prompt TEXT, completion TEXT,
            label INTEGER, edited_completion TEXT, project_dir TEXT, files_context TEXT)""")
        self.conn.commit()

    def save(self, prompt, completion, label, edited=None, project_dir=None, files=None):
        """Persist one feedback record.

        `label` is coerced to 0/1; `files` (a list of context file paths),
        when given, is stored as JSON.
        """
        self.conn.execute(
            "INSERT INTO feedback VALUES (NULL,?,?,?,?,?,?,?)",
            (datetime.now().isoformat(), prompt, completion, int(label), edited,
             project_dir, json.dumps(files) if files else None))
        self.conn.commit()

    def count(self):
        """Return aggregate stats: total rows, thumbs-up count, edit count."""
        r = self.conn.execute(
            "SELECT COUNT(*), SUM(label), "
            "SUM(CASE WHEN edited_completion IS NOT NULL THEN 1 ELSE 0 END) "
            "FROM feedback").fetchone()
        # SUM over an empty table yields NULL -> None; normalize to 0.
        return {"total": r[0] or 0, "thumbs_up": r[1] or 0, "edits": r[2] or 0}

    def export_kto(self):
        """All rows as KTO examples: prompt/completion chats + bool label."""
        rows = self.conn.execute(
            "SELECT prompt, completion, label FROM feedback").fetchall()
        return [{"prompt": [{"role": "user", "content": p}],
                 "completion": [{"role": "assistant", "content": c}],
                 "label": bool(l)} for p, c, l in rows]

    def export_sft(self):
        """Edited rows as SFT chat examples (user prompt -> edited answer)."""
        rows = self.conn.execute(
            "SELECT prompt, edited_completion FROM feedback "
            "WHERE edited_completion IS NOT NULL").fetchall()
        return [{"messages": [{"role": "user", "content": p},
                              {"role": "assistant", "content": c}]} for p, c in rows]

    def export_dpo(self):
        """Edited rows as DPO pairs: the edit is chosen, the original rejected."""
        rows = self.conn.execute(
            "SELECT prompt, completion, edited_completion FROM feedback "
            "WHERE edited_completion IS NOT NULL").fetchall()
        return [{"prompt": [{"role": "user", "content": p}],
                 "chosen": [{"role": "assistant", "content": e}],
                 "rejected": [{"role": "assistant", "content": o}]} for p, o, e in rows]
class CodeModel:
    """Wraps a causal LM and its tokenizer for chat-style code generation."""

    def __init__(self, model_name=DEFAULT_MODEL, adapter_path=None):
        from transformers import AutoTokenizer, AutoModelForCausalLM
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Some checkpoints ship without a pad token; reuse EOS so that
        # generate() has a valid pad_token_id.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16,
            device_map="auto", trust_remote_code=True)
        # Optionally stack a fine-tuned LoRA adapter on the base weights.
        if adapter_path and os.path.exists(adapter_path):
            from peft import PeftModel
            self.model = PeftModel.from_pretrained(self.model, adapter_path)
        self.model.eval()

    def generate(self, user_message, system_prompt=None, file_context=None, max_tokens=2048):
        """Build a chat transcript, sample a completion, return only the new text."""
        conversation = []
        if system_prompt:
            conversation.append({"role": "system", "content": system_prompt})
        if file_context:
            # Inject file contents as a prior exchange so the model treats
            # them as established context rather than part of the question.
            conversation.append({"role": "user", "content": f"相關文件:\n\n{file_context}"})
            conversation.append({"role": "assistant", "content": "已了解。請問需要什麼幫助?"})
        conversation.append({"role": "user", "content": user_message})
        rendered = self.tokenizer.apply_chat_template(
            conversation, tokenize=False, add_generation_prompt=True)
        encoded = self.tokenizer(rendered, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            generated = self.model.generate(
                **encoded, max_new_tokens=max_tokens, do_sample=True,
                temperature=0.7, top_p=0.9, repetition_penalty=1.1,
                pad_token_id=self.tokenizer.pad_token_id)
        # Strip the prompt tokens so only the model's reply is decoded.
        prompt_len = encoded["input_ids"].shape[1]
        return self.tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)
def extract_code_blocks(text):
    """Return (language, code) tuples for every fenced ``` block in *text*.

    After splitting on the fence marker, odd-indexed segments are block
    interiors: their first line is the language tag (defaulting to
    "python" when blank) and the remainder, stripped, is the code.
    """
    segments = text.split("```")
    found = []
    for interior in segments[1::2]:
        header, _, body = interior.partition("\n")
        found.append((header.strip() or "python", body.strip()))
    return found
def run_cli(args):
    """Interactive REPL: chat with the model and collect 👍/👎/edit feedback.

    Supports slash commands (/file, /apply, /stats, /train, /quit) and
    saves every rated response into the feedback database.
    """
    from rich.console import Console
    from rich.markdown import Markdown
    from rich.panel import Panel
    from rich.prompt import Prompt
    from rich.syntax import Syntax
    from rich.table import Table
    console = Console(); db = FeedbackDB()
    console.print(Panel.fit("[bold cyan]CodePilot[/] — 你的專屬 AI 程式助手\n" + f"[dim]Model: {args.model or DEFAULT_MODEL}[/]", border_style="cyan"))
    # Model load can take a while; show a spinner meanwhile.
    with console.status("[bold green]載入模型中..."):
        model = CodeModel(args.model or DEFAULT_MODEL, args.adapter)
    console.print("[green]✅ 模型載入完成[/]\n")
    console.print("[dim]指令: /file <path> 讀文件 | /apply 套用code | /stats 統計 | /train 訓練 | /quit 退出[/]\n")
    system_prompt = "You are CodePilot, an expert programming assistant. Write clean, efficient, well-documented code. When modifying existing code, show the complete modified version."
    project_dir = args.project or os.getcwd()
    # file_context: last /file payload; current_prompt/current_response:
    # the most recent exchange, used by /apply and the feedback handlers.
    file_context = current_response = current_prompt = None
    while True:
        try: user_input = Prompt.ask("\n[bold green]🧑 You")
        except (EOFError, KeyboardInterrupt): break  # Ctrl-D / Ctrl-C ends the session
        if not user_input.strip(): continue
        cmd = user_input.strip().lower()
        if cmd in ("/quit", "/exit"): break
        elif cmd == "/stats":
            # Render the aggregate feedback counters as a small table.
            stats = db.count()
            t = Table(title="📊 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green")
            t.add_row("總回饋",str(stats["total"])); t.add_row("👍",str(stats["thumbs_up"]))
            t.add_row("👎",str(stats["total"]-stats["thumbs_up"])); t.add_row("✏️修改",str(stats["edits"]))
            console.print(t); continue
        elif cmd == "/train": trigger_cli_training(db, console, args); continue
        elif cmd.startswith("/file "):
            # Load a project file into the chat context (relative to project_dir).
            fp = os.path.join(project_dir, user_input[6:].strip())
            if os.path.exists(fp):
                with open(fp) as f: content = f.read()
                file_context = f"--- {fp} ---\n{content}\n--- END ---"
                console.print(f"[green]📁 已讀取: {fp} ({len(content)} chars)[/]")
            else: console.print(f"[red]❌ 不存在: {fp}[/]")
            continue
        elif cmd == "/apply":
            # Offer to write each fenced code block of the last reply to disk.
            if current_response:
                for i,(lang,code) in enumerate(extract_code_blocks(current_response)):
                    console.print(Syntax(code, lang or "python", theme="monokai"))
                    fp = Prompt.ask(" 儲存到? (Enter跳過)")
                    if fp.strip():
                        full = os.path.join(project_dir, fp)
                        os.makedirs(os.path.dirname(full) or ".", exist_ok=True)
                        open(full,"w").write(code)
                        console.print(f" [green]✅ {full}[/]")
            continue
        # Not a command: treat the input as a question for the model.
        current_prompt = user_input
        with console.status("[bold cyan]思考中..."):
            current_response = model.generate(user_input, system_prompt=system_prompt, file_context=file_context)
        console.print("\n[bold blue]🤖 CodePilot:[/]")
        console.print(Markdown(current_response))
        # One-key feedback: y=accept, n=reject, e=paste a corrected version.
        console.print("\n[dim][green]y[/]=👍 [red]n[/]=👎 [yellow]e[/]=✏️修改 Enter=跳過[/]")
        fb = Prompt.ask(" ", choices=["y","n","e",""], default="", show_choices=False)
        if fb == "y":
            db.save(current_prompt, current_response, label=1, project_dir=project_dir)
            s = db.count(); console.print(f" [green]👍 +1 (累計:{s['total']})[/]")
        elif fb == "n":
            db.save(current_prompt, current_response, label=0, project_dir=project_dir)
            s = db.count(); console.print(f" [red]👎 +1 (累計:{s['total']})[/]")
        elif fb == "e":
            # Multi-line paste terminated by a lone "END" (or EOF).
            console.print(" [yellow]貼上修改版 (END 結束):[/]")
            lines = []
            while True:
                try:
                    l = input()
                    if l.strip()=="END": break
                    lines.append(l)
                except EOFError: break
            edited = "\n".join(lines)
            if edited.strip():
                # An edit is stored as a positive example plus the corrected text.
                db.save(current_prompt, current_response, label=1, edited=edited, project_dir=project_dir)
                s = db.count(); console.print(f" [yellow]✏️ +1 (累計:{s['total']}, 修改:{s['edits']})[/]")
        # Nudge the user to retrain once the total crosses a multiple of the
        # threshold. NOTE(review): db.count() is queried three times here —
        # harmless, but a single call could be cached in a local.
        if db.count()["total"] % AUTO_TRAIN_THRESHOLD == 0 and db.count()["total"] > 0:
            console.print(f"\n [bold yellow]🔔 累積 {db.count()['total']} 條!codepilot --train[/]")
    console.print("\n[cyan]👋 再見![/]")
def trigger_cli_training(db, console, args):
    """Fine-tune a LoRA adapter from collected feedback.

    Prefers SFT on user-edited completions when any exist; otherwise
    falls back to KTO on thumbs-up/down labels once at least 10 are
    available. The adapter is written to a timestamped directory under
    CONFIG_DIR.
    """
    stats = db.count()
    if stats["total"] == 0: console.print("[yellow]⚠️ 無數據[/]"); return
    console.print(f"\n[bold]🚀 訓練[/] 👍:{stats['thumbs_up']} 👎:{stats['total']-stats['thumbs_up']} ✏️:{stats['edits']}")
    from datasets import Dataset
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
    from peft import LoraConfig, prepare_model_for_kbit_training
    model_name = args.model or DEFAULT_MODEL
    # Timestamped output directory so repeated runs never overwrite an adapter.
    output_dir = os.path.join(CONFIG_DIR, f"adapter_{datetime.now().strftime('%Y%m%d_%H%M')}")
    # 4-bit NF4 quantization keeps the base model small enough for single-GPU tuning.
    bnb = BitsAndBytesConfig(load_in_4bit=True,bnb_4bit_quant_type="nf4",bnb_4bit_compute_dtype=torch.bfloat16,bnb_4bit_use_double_quant=True)
    # LoRA over all attention and MLP projection matrices.
    peft_cfg = LoraConfig(r=16,lora_alpha=32,lora_dropout=0.05,bias="none",task_type="CAUSAL_LM",target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"])
    sft_data = db.export_sft()
    kto_data = db.export_kto()
    if sft_data:
        # Branch 1: supervised fine-tuning on user-corrected completions.
        console.print(f"\n[bold]📚 SFT ({len(sft_data)} edits)...[/]")
        from trl import SFTTrainer, SFTConfig
        ds = Dataset.from_list(sft_data)
        model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
        tok = AutoTokenizer.from_pretrained(model_name)
        if tok.pad_token is None: tok.pad_token = tok.eos_token
        model = prepare_model_for_kbit_training(model)
        trainer = SFTTrainer(model=model,args=SFTConfig(output_dir=output_dir,learning_rate=2e-4,num_train_epochs=3,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_seq_length=1024,gradient_checkpointing=True,bf16=True,optim="paged_adamw_8bit",logging_steps=5,save_total_limit=1,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg)
        trainer.train(); trainer.save_model(output_dir)
        # Free GPU memory before returning to the interactive session.
        console.print(f"[green]✅ SFT 完成[/]"); del model; torch.cuda.empty_cache()
    elif len(kto_data) >= 10:
        # Branch 2: preference tuning from binary thumbs-up/down labels.
        # NOTE(review): unlike the SFT branch, this path does not call
        # prepare_model_for_kbit_training — confirm whether that is intentional.
        console.print(f"\n[bold]📚 KTO ({len(kto_data)} feedbacks)...[/]")
        from trl import KTOConfig, KTOTrainer
        ds = Dataset.from_list(kto_data)
        model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
        tok = AutoTokenizer.from_pretrained(model_name)
        if tok.pad_token is None: tok.pad_token = tok.eos_token
        trainer = KTOTrainer(model=model,args=KTOConfig(output_dir=output_dir,learning_rate=1e-5,num_train_epochs=1,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_length=1024,gradient_checkpointing=True,bf16=True,logging_steps=5,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg)
        trainer.train(); trainer.save_model(output_dir)
        console.print(f"[green]✅ KTO 完成[/]")
    # NOTE(review): this success message also prints when neither branch ran
    # (no edits and fewer than 10 labels) — confirm whether that is intended.
    console.print(f"\n[bold green]🎉 訓練完成![/]\n Adapter: {output_dir}\n 重啟: codepilot --adapter {output_dir}")
def show_stats():
    """Print feedback totals as a table with inline bar charts, plus an
    acceptance-rate summary when any feedback exists."""
    from rich.console import Console
    from rich.table import Table
    console = Console()
    db = FeedbackDB()
    s = db.count()
    down = s["total"] - s["thumbs_up"]

    def bar(count):
        # Crude inline histogram: one █ per two records, capped at 40 cells.
        return "█" * min(count // 2, 40)

    t = Table(title="📊 CodePilot 回饋統計")
    t.add_column("指標", style="cyan")
    t.add_column("數值", style="green")
    t.add_column("", style="dim")
    t.add_row("總回饋", str(s["total"]), bar(s["total"]))
    t.add_row("👍", str(s["thumbs_up"]), bar(s["thumbs_up"]))
    t.add_row("👎", str(down), bar(down))
    t.add_row("✏️修改", str(s["edits"]), bar(s["edits"]))
    console.print(t)
    if s["total"] > 0:
        r = s["thumbs_up"] / s["total"] * 100
        console.print(f"\n 接受率: {r:.0f}%")
        if r < 50:
            console.print(" [yellow]💡 接受率低,建議 --train[/]")
def main():
    """CLI entry point: parse flags and dispatch to stats, training, or chat."""
    parser = argparse.ArgumentParser(description="CodePilot — 你的專屬 AI 程式助手")
    parser.add_argument("--model", type=str, default=None, help=f"模型 (預設:{DEFAULT_MODEL})")
    parser.add_argument("--adapter", type=str, default=None, help="LoRA adapter")
    parser.add_argument("--project", type=str, default=None, help="專案目錄")
    parser.add_argument("--stats", action="store_true", help="回饋統計")
    parser.add_argument("--train", action="store_true", help="訓練模型")
    opts = parser.parse_args()
    if opts.stats:
        show_stats()
    elif opts.train:
        from rich.console import Console
        trigger_cli_training(FeedbackDB(), Console(), opts)
    else:
        run_cli(opts)


if __name__ == "__main__":
    main()