File size: 14,178 Bytes
d341b49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CodePilot — 你的專屬 AI 程式助手
=================================

像 Claude Code 的終端 CLI 工具,內建:
  🤖 本地 Qwen2.5-Coder 模型推理
  📁 讀取/修改你的專案文件
  👍👎 一鍵回饋,自動收集訓練數據
  🔄 定期用你的回饋數據訓練,模型越用越聰明

Install:
    pip install transformers peft bitsandbytes accelerate trl datasets rich

Usage:
    python codepilot.py                          # 啟動
    python codepilot.py --project ~/my-project   # 指定專案
    python codepilot.py --adapter ./my-adapter   # 用微調模型
    python codepilot.py --stats                  # 回饋統計
    python codepilot.py --train                  # 訓練模型
"""
import argparse, json, os, sqlite3, subprocess, sys, textwrap, torch
from datetime import datetime
from pathlib import Path

# Base model used when --model is not supplied on the command line.
DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-3B-Instruct"
# Per-user data directory holding the feedback DB and trained adapters.
CONFIG_DIR = os.path.expanduser("~/.codepilot")
# Default location of the SQLite feedback database.
DB_PATH = os.path.join(CONFIG_DIR, "feedback.db")
# Every time total feedback is a multiple of this, the CLI nudges the user to retrain.
AUTO_TRAIN_THRESHOLD = 50

class FeedbackDB:
    """SQLite-backed store for user feedback on model completions.

    Each row records a prompt/completion pair, a 👍/👎 label and, optionally,
    the user's edited version of the completion plus project context.
    """

    def __init__(self, db_path=None):
        """Open (and create if needed) the feedback database.

        The default path is resolved lazily at call time rather than baked in
        as a default argument, so the module-level DB_PATH is only consulted
        when no explicit path is given.
        """
        if db_path is None:
            db_path = DB_PATH
        # Guard against a bare filename, whose dirname is "" and would make
        # os.makedirs raise.
        os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""CREATE TABLE IF NOT EXISTS feedback (
            id INTEGER PRIMARY KEY, timestamp TEXT, prompt TEXT, completion TEXT,
            label INTEGER, edited_completion TEXT, project_dir TEXT, files_context TEXT)""")
        self.conn.commit()

    def save(self, prompt, completion, label, edited=None, project_dir=None, files=None):
        """Insert one feedback row; *label* is truthy for 👍, falsy for 👎."""
        # Name the columns explicitly so the INSERT keeps working if columns
        # are ever added to the schema (the old positional VALUES form broke).
        self.conn.execute(
            "INSERT INTO feedback (timestamp, prompt, completion, label, "
            "edited_completion, project_dir, files_context) VALUES (?,?,?,?,?,?,?)",
            (datetime.now().isoformat(), prompt, completion, int(label), edited,
             project_dir, json.dumps(files) if files else None))
        self.conn.commit()

    def count(self):
        """Return aggregate counts: {"total", "thumbs_up", "edits"}."""
        r = self.conn.execute(
            "SELECT COUNT(*), SUM(label), "
            "SUM(CASE WHEN edited_completion IS NOT NULL THEN 1 ELSE 0 END) "
            "FROM feedback").fetchone()
        # SUM() over zero rows is NULL -> coalesce to 0.
        return {"total": r[0] or 0, "thumbs_up": r[1] or 0, "edits": r[2] or 0}

    def export_kto(self):
        """All rows as KTO examples: chat-format prompt/completion + bool label."""
        rows = self.conn.execute(
            "SELECT prompt, completion, label FROM feedback").fetchall()
        return [{"prompt": [{"role": "user", "content": p}],
                 "completion": [{"role": "assistant", "content": c}],
                 "label": bool(l)} for p, c, l in rows]

    def export_sft(self):
        """Edited rows as SFT chat transcripts (user prompt -> edited answer)."""
        rows = self.conn.execute(
            "SELECT prompt, edited_completion FROM feedback "
            "WHERE edited_completion IS NOT NULL").fetchall()
        return [{"messages": [{"role": "user", "content": p},
                              {"role": "assistant", "content": c}]} for p, c in rows]

    def export_dpo(self):
        """Edited rows as DPO pairs: edited text is chosen, original rejected."""
        rows = self.conn.execute(
            "SELECT prompt, completion, edited_completion FROM feedback "
            "WHERE edited_completion IS NOT NULL").fetchall()
        return [{"prompt": [{"role": "user", "content": p}],
                 "chosen": [{"role": "assistant", "content": e}],
                 "rejected": [{"role": "assistant", "content": o}]} for p, o, e in rows]

class CodeModel:
    """Wrapper around a causal LM (optionally with a LoRA adapter) for chat generation."""

    def __init__(self, model_name=DEFAULT_MODEL, adapter_path=None):
        """Load tokenizer and model; attach a LoRA adapter if *adapter_path* exists."""
        # Imported lazily so --stats works without pulling in transformers.
        from transformers import AutoTokenizer, AutoModelForCausalLM
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Some tokenizers ship without a pad token; reuse EOS so generate() can pad.
        if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True)
        # Stack the fine-tuned LoRA weights on top of the base model when given.
        if adapter_path and os.path.exists(adapter_path):
            from peft import PeftModel
            self.model = PeftModel.from_pretrained(self.model, adapter_path)
        self.model.eval()

    def generate(self, user_message, system_prompt=None, file_context=None, max_tokens=2048):
        """Generate an assistant reply for *user_message*.

        When *file_context* is given it is injected as a prior user/assistant
        exchange so the model sees the files before the actual question.
        Returns only the newly generated text (prompt tokens are sliced off).
        """
        messages = []
        if system_prompt: messages.append({"role":"system","content":system_prompt})
        if file_context:
            messages.append({"role":"user","content":f"相關文件:\n\n{file_context}"})
            messages.append({"role":"assistant","content":"已了解。請問需要什麼幫助?"})
        messages.append({"role":"user","content":user_message})
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            outputs = self.model.generate(**inputs, max_new_tokens=max_tokens, do_sample=True, temperature=0.7, top_p=0.9, repetition_penalty=1.1, pad_token_id=self.tokenizer.pad_token_id)
        # Decode only the tokens generated after the prompt.
        return self.tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)

def extract_code_blocks(text):
    """Return (language, code) pairs for every fenced ``` block in *text*.

    After splitting on the fence marker, odd-index segments are the insides
    of code fences; the first line of each is its language tag, defaulting
    to "python" when absent.
    """
    segments = text.split("```")
    blocks = []
    for idx, segment in enumerate(segments):
        if idx % 2 == 0:
            continue  # even segments are prose between fences
        header, sep, body = segment.partition("\n")
        lang = header.strip() or "python"
        blocks.append((lang, body.strip() if sep else ""))
    return blocks

def run_cli(args):
    """Interactive chat loop.

    Loads the model, then repeatedly: reads user input, handles slash
    commands (/file, /apply, /stats, /train, /quit), otherwise generates a
    response and records 👍/👎/edit feedback in the database.
    """
    from rich.console import Console
    from rich.markdown import Markdown
    from rich.panel import Panel
    from rich.prompt import Prompt
    from rich.syntax import Syntax
    from rich.table import Table

    console = Console(); db = FeedbackDB()
    console.print(Panel.fit("[bold cyan]CodePilot[/] — 你的專屬 AI 程式助手\n" + f"[dim]Model: {args.model or DEFAULT_MODEL}[/]", border_style="cyan"))

    with console.status("[bold green]載入模型中..."):
        model = CodeModel(args.model or DEFAULT_MODEL, args.adapter)
    console.print("[green]✅ 模型載入完成[/]\n")
    console.print("[dim]指令: /file <path> 讀文件 | /apply 套用code | /stats 統計 | /train 訓練 | /quit 退出[/]\n")

    system_prompt = "You are CodePilot, an expert programming assistant. Write clean, efficient, well-documented code. When modifying existing code, show the complete modified version."
    project_dir = args.project or os.getcwd()
    file_context = current_response = current_prompt = None

    while True:
        try: user_input = Prompt.ask("\n[bold green]🧑 You")
        except (EOFError, KeyboardInterrupt): break
        if not user_input.strip(): continue
        cmd = user_input.strip().lower()

        if cmd in ("/quit", "/exit"): break
        elif cmd == "/stats":
            stats = db.count()
            t = Table(title="📊 回饋統計"); t.add_column("指標",style="cyan"); t.add_column("數值",style="green")
            t.add_row("總回饋",str(stats["total"])); t.add_row("👍",str(stats["thumbs_up"]))
            t.add_row("👎",str(stats["total"]-stats["thumbs_up"])); t.add_row("✏️修改",str(stats["edits"]))
            console.print(t); continue
        elif cmd == "/train": trigger_cli_training(db, console, args); continue
        elif cmd.startswith("/file "):
            fp = os.path.join(project_dir, user_input[6:].strip())
            if os.path.exists(fp):
                # Explicit UTF-8: the platform default (e.g. cp1252 on
                # Windows) would mangle most source files.
                with open(fp, encoding="utf-8") as f: content = f.read()
                file_context = f"--- {fp} ---\n{content}\n--- END ---"
                console.print(f"[green]📁 已讀取: {fp} ({len(content)} chars)[/]")
            else: console.print(f"[red]❌ 不存在: {fp}[/]")
            continue
        elif cmd == "/apply":
            if current_response:
                for lang, code in extract_code_blocks(current_response):
                    console.print(Syntax(code, lang or "python", theme="monokai"))
                    fp = Prompt.ask("  儲存到? (Enter跳過)")
                    if fp.strip():
                        full = os.path.join(project_dir, fp)
                        os.makedirs(os.path.dirname(full) or ".", exist_ok=True)
                        # Context manager fixes the leaked file handle of the
                        # previous open(...).write(...) one-liner.
                        with open(full, "w", encoding="utf-8") as f:
                            f.write(code)
                        console.print(f"  [green]✅ {full}[/]")
            continue

        current_prompt = user_input
        with console.status("[bold cyan]思考中..."):
            current_response = model.generate(user_input, system_prompt=system_prompt, file_context=file_context)
        console.print("\n[bold blue]🤖 CodePilot:[/]")
        console.print(Markdown(current_response))
        console.print("\n[dim][green]y[/]=👍  [red]n[/]=👎  [yellow]e[/]=✏️修改  Enter=跳過[/]")
        fb = Prompt.ask("  ", choices=["y","n","e",""], default="", show_choices=False)

        if fb == "y":
            db.save(current_prompt, current_response, label=1, project_dir=project_dir)
            s = db.count(); console.print(f"  [green]👍 +1 (累計:{s['total']})[/]")
        elif fb == "n":
            db.save(current_prompt, current_response, label=0, project_dir=project_dir)
            s = db.count(); console.print(f"  [red]👎 +1 (累計:{s['total']})[/]")
        elif fb == "e":
            console.print("  [yellow]貼上修改版 (END 結束):[/]")
            lines = []
            while True:
                try:
                    l = input()
                    if l.strip()=="END": break
                    lines.append(l)
                except EOFError: break
            edited = "\n".join(lines)
            if edited.strip():
                # An edited answer is stored as a positive example plus the
                # correction, which later feeds the SFT/DPO exports.
                db.save(current_prompt, current_response, label=1, edited=edited, project_dir=project_dir)
                s = db.count(); console.print(f"  [yellow]✏️ +1 (累計:{s['total']}, 修改:{s['edits']})[/]")

        # Query once instead of the original's three separate COUNT round-trips.
        total = db.count()["total"]
        if total > 0 and total % AUTO_TRAIN_THRESHOLD == 0:
            console.print(f"\n  [bold yellow]🔔 累積 {total} 條!codepilot --train[/]")

    console.print("\n[cyan]👋 再見![/]")

def trigger_cli_training(db, console, args):
    """Fine-tune a LoRA adapter from the collected feedback.

    Prefers SFT on user-edited completions when any exist; otherwise falls
    back to KTO on the raw 👍/👎 labels (requires at least 10 rows). The
    adapter is saved under a timestamped directory in CONFIG_DIR and the
    path to reload it with is printed at the end.
    """
    stats = db.count()
    if stats["total"] == 0: console.print("[yellow]⚠️ 無數據[/]"); return
    console.print(f"\n[bold]🚀 訓練[/] 👍:{stats['thumbs_up']} 👎:{stats['total']-stats['thumbs_up']} ✏️:{stats['edits']}")
    # Heavy ML imports are local so the chat loop starts without them.
    from datasets import Dataset
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
    from peft import LoraConfig, prepare_model_for_kbit_training
    model_name = args.model or DEFAULT_MODEL
    # Timestamped output dir so successive trainings never overwrite each other.
    output_dir = os.path.join(CONFIG_DIR, f"adapter_{datetime.now().strftime('%Y%m%d_%H%M')}")
    # 4-bit NF4 quantization keeps the base model small enough to train on a single GPU.
    bnb = BitsAndBytesConfig(load_in_4bit=True,bnb_4bit_quant_type="nf4",bnb_4bit_compute_dtype=torch.bfloat16,bnb_4bit_use_double_quant=True)
    # LoRA over all attention + MLP projections.
    peft_cfg = LoraConfig(r=16,lora_alpha=32,lora_dropout=0.05,bias="none",task_type="CAUSAL_LM",target_modules=["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"])

    sft_data = db.export_sft()
    kto_data = db.export_kto()

    if sft_data:
        # SFT branch: train directly on the user's corrected completions.
        console.print(f"\n[bold]📚 SFT ({len(sft_data)} edits)...[/]")
        from trl import SFTTrainer, SFTConfig
        ds = Dataset.from_list(sft_data)
        model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
        tok = AutoTokenizer.from_pretrained(model_name)
        if tok.pad_token is None: tok.pad_token = tok.eos_token
        model = prepare_model_for_kbit_training(model)
        trainer = SFTTrainer(model=model,args=SFTConfig(output_dir=output_dir,learning_rate=2e-4,num_train_epochs=3,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_seq_length=1024,gradient_checkpointing=True,bf16=True,optim="paged_adamw_8bit",logging_steps=5,save_total_limit=1,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg)
        trainer.train(); trainer.save_model(output_dir)
        # Free GPU memory before returning to the chat loop.
        console.print(f"[green]✅ SFT 完成[/]"); del model; torch.cuda.empty_cache()
    elif len(kto_data) >= 10:
        # KTO branch: preference tuning from binary thumbs-up/down labels.
        # NOTE(review): unlike the SFT branch, this path does not call
        # prepare_model_for_kbit_training — confirm that is intentional.
        console.print(f"\n[bold]📚 KTO ({len(kto_data)} feedbacks)...[/]")
        from trl import KTOConfig, KTOTrainer
        ds = Dataset.from_list(kto_data)
        model = AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb,device_map="auto",trust_remote_code=True)
        tok = AutoTokenizer.from_pretrained(model_name)
        if tok.pad_token is None: tok.pad_token = tok.eos_token
        trainer = KTOTrainer(model=model,args=KTOConfig(output_dir=output_dir,learning_rate=1e-5,num_train_epochs=1,per_device_train_batch_size=1,gradient_accumulation_steps=8,max_length=1024,gradient_checkpointing=True,bf16=True,logging_steps=5,logging_strategy="steps",logging_first_step=True),processing_class=tok,train_dataset=ds,peft_config=peft_cfg)
        trainer.train(); trainer.save_model(output_dir)
        console.print(f"[green]✅ KTO 完成[/]")

    console.print(f"\n[bold green]🎉 訓練完成![/]\n   Adapter: {output_dir}\n   重啟: codepilot --adapter {output_dir}")

def show_stats():
    """Print a feedback-statistics table plus the overall acceptance rate."""
    from rich.console import Console; from rich.table import Table

    console = Console()
    stats = FeedbackDB().count()
    total = stats["total"]
    ups = stats["thumbs_up"]
    edits = stats["edits"]
    downs = total - ups

    table = Table(title="📊 CodePilot 回饋統計")
    table.add_column("指標", style="cyan")
    table.add_column("數值", style="green")
    table.add_column("", style="dim")
    # One row per metric, with a bar capped at 40 cells (2 feedbacks per cell).
    for name, value in (("總回饋", total), ("👍", ups), ("👎", downs), ("✏️修改", edits)):
        table.add_row(name, str(value), "█" * min(value // 2, 40))
    console.print(table)

    if total > 0:
        rate = ups / total * 100
        console.print(f"\n   接受率: {rate:.0f}%")
        if rate < 50:
            console.print("   [yellow]💡 接受率低,建議 --train[/]")

def main():
    """Parse command-line arguments and dispatch to stats, training, or the chat loop."""
    parser = argparse.ArgumentParser(description="CodePilot — 你的專屬 AI 程式助手")
    parser.add_argument("--model", type=str, default=None, help=f"模型 (預設:{DEFAULT_MODEL})")
    parser.add_argument("--adapter", type=str, default=None, help="LoRA adapter")
    parser.add_argument("--project", type=str, default=None, help="專案目錄")
    parser.add_argument("--stats", action="store_true", help="回饋統計")
    parser.add_argument("--train", action="store_true", help="訓練模型")
    args = parser.parse_args()

    if args.stats:
        show_stats()
    elif args.train:
        from rich.console import Console
        trigger_cli_training(FeedbackDB(), Console(), args)
    else:
        run_cli(args)

if __name__ == "__main__":
    main()