# sandbox-5ca717e4 / codepilot_v2.py
# Justin-lee's picture
# Add complete CodePilot v2 with Claude Code features
# 64b4a54 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CodePilot v2 — a Claude Code-style AI development assistant
===========================================================
Develop directly inside your project directory, just like Claude Code:
  📝 Read, edit, and create project files
  🖥️ Run terminal commands (python, pytest, npm, etc.)
  🔍 Search code (ripgrep/grep)
  📂 Browse the project structure
  🔀 Git integration (status, diff, commit)
  👍👎 Feedback collection → the model gets stronger the more you use it

Install:
    pip install transformers peft bitsandbytes accelerate trl datasets rich

Usage:
    cd ~/my-project && python codepilot_v2.py
    python codepilot_v2.py --stats
    python codepilot_v2.py --train
    python codepilot_v2.py --adapter ~/.codepilot/adapter_20260422
"""
import argparse,difflib,json,os,re,shutil,sqlite3,subprocess,sys,textwrap,torch
from datetime import datetime
from pathlib import Path
# Hugging Face model id used when --model is not supplied.
DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-3B-Instruct"

# Per-user directory that holds the feedback database and trained adapters.
CONFIG_DIR = os.path.expanduser("~/.codepilot")
DB_PATH = os.path.join(CONFIG_DIR, "feedback.db")

# NOTE(review): presumably the feedback count at which training should start
# automatically; nothing in the visible code reads it — confirm intended use.
AUTO_TRAIN_THRESHOLD = 50

# A shell command containing any of these substrings is refused.
DANGEROUS_COMMANDS = {
    "rm -rf /",
    "git push --force",
    "git reset --hard",
    "dd if=",
    "mkfs.",
    "> /dev/sd",
}
class FeedbackDB:
    """SQLite-backed store for user feedback on model completions.

    Each row holds a timestamp, the prompt, the model completion, a 0/1
    label, an optional user-edited completion and the project directory.
    Instances can be used as context managers to close the connection.
    """

    _CREATE_SQL = (
        "CREATE TABLE IF NOT EXISTS feedback(id INTEGER PRIMARY KEY,"
        "timestamp TEXT,prompt TEXT,completion TEXT,label INTEGER,"
        "edited_completion TEXT,project TEXT)"
    )

    def __init__(self, db_path=None):
        """Open (and create if needed) the feedback database.

        Args:
            db_path: Database location. ``None`` (the default) uses the
                module-level ``DB_PATH`` inside ``CONFIG_DIR``; any other
                value is passed to ``sqlite3.connect`` (``":memory:"`` works).
        """
        if db_path is None:
            os.makedirs(CONFIG_DIR, exist_ok=True)
            db_path = DB_PATH
        else:
            # Create the parent directory for on-disk databases only.
            parent = os.path.dirname(str(db_path))
            if parent and str(db_path) != ":memory:":
                os.makedirs(parent, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(self._CREATE_SQL)
        self.conn.commit()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def save(self, prompt, completion, label, edited=None, project=None):
        """Insert one feedback row stamped with the current local time.

        Args:
            prompt: The user request.
            completion: The model's response.
            label: Truthy for positive feedback; stored as ``int(label)``.
            edited: Optional user-corrected completion.
            project: Optional project directory the feedback relates to.
        """
        # Columns are named explicitly so the insert does not depend on
        # the table's column order.
        self.conn.execute(
            "INSERT INTO feedback(timestamp,prompt,completion,label,edited_completion,project) "
            "VALUES(?,?,?,?,?,?)",
            (datetime.now().isoformat(), prompt, completion, int(label), edited, project),
        )
        self.conn.commit()

    def count(self):
        """Return ``{"total", "up", "edits"}`` counts over all rows."""
        total, up, edits = self.conn.execute(
            "SELECT COUNT(*),COALESCE(SUM(label),0),"
            "SUM(CASE WHEN edited_completion IS NOT NULL THEN 1 ELSE 0 END) FROM feedback"
        ).fetchone()
        # SUM over zero rows yields NULL, hence the ``or 0``.
        return {"total": total, "up": int(up), "edits": int(edits or 0)}

    def export_sft(self):
        """Return chat-format records built from rows that have an edited completion."""
        rows = self.conn.execute(
            "SELECT prompt,edited_completion FROM feedback WHERE edited_completion IS NOT NULL"
        ).fetchall()
        return [
            {
                "messages": [
                    {"role": "user", "content": prompt},
                    {"role": "assistant", "content": edited},
                ]
            }
            for prompt, edited in rows
        ]

    def export_kto(self):
        """Return prompt/completion/boolean-label records for every row."""
        rows = self.conn.execute("SELECT prompt,completion,label FROM feedback").fetchall()
        return [
            {
                "prompt": [{"role": "user", "content": prompt}],
                "completion": [{"role": "assistant", "content": completion}],
                "label": bool(label),
            }
            for prompt, completion, label in rows
        ]
class ProjectTools:
    """File, shell, search and git helpers rooted at a project directory.

    Methods return human-readable strings (results or error messages) so
    their output can be passed straight back to the model.

    NOTE(review): paths are not confined to the project directory; absolute
    paths and ``..`` segments are accepted as given.
    """

    # Directory names skipped by the pure-Python file listing.
    _SKIPPED_DIRS = {".git", "node_modules", "__pycache__", ".venv"}

    def __init__(self, project_dir):
        self.project_dir = os.path.abspath(project_dir)
        # Current working directory; follows a leading ``cd`` in run_command.
        self.cwd = self.project_dir
        # full path -> {"time": mtime, "content": text} for files read/written.
        self.read_cache = {}

    def _resolve(self, path):
        """Return ``path`` unchanged if absolute, else normalised against ``self.cwd``."""
        if os.path.isabs(path):
            return path
        return os.path.normpath(os.path.join(self.cwd, path))

    def read_file(self, path, offset=1, limit=200):
        """Return up to ``limit`` numbered lines of ``path`` starting at line ``offset``.

        The whole file content and its mtime are cached so that a later
        ``edit_file`` can detect external modification. A trailing
        ``... (N more lines)`` note is added when lines remain after the
        returned window. ``offset`` values below 1 are treated as 1.
        """
        full = self._resolve(path)
        if not os.path.exists(full):
            return f"โŒ ๆ–‡ไปถไธๅญ˜ๅœจ: {path}"
        try:
            content = Path(full).read_text(encoding="utf-8", errors="replace")
            lines = content.splitlines()
            start = max(offset, 1) - 1
            end = start + max(limit, 0)
            selected = lines[start:end]
            self.read_cache[full] = {"time": os.path.getmtime(full), "content": content}
            result = "\n".join(
                f"{number:4d} โ”‚ {line}" for number, line in enumerate(selected, start + 1)
            )
            # Lines after the window; the original test missed the case of
            # exactly one remaining line.
            remaining = len(lines) - end
            if remaining > 0:
                result += f"\n... ({remaining} more lines)"
            return result
        except Exception as e:  # tool boundary: report any failure as text
            return f"โŒ {e}"

    def edit_file(self, path, old_string, new_string):
        """Replace the single occurrence of ``old_string`` in ``path``.

        The file must have been read (or written) through this object first
        and must not have changed on disk since. Returns a unified diff on
        success, or an error string if the text is missing or ambiguous.
        """
        full = self._resolve(path)
        if full not in self.read_cache:
            return "โŒ ๅฟ…้ ˆๅ…ˆ read_file ๆ‰่ƒฝ็ทจ่ผฏ"
        if not os.path.exists(full):
            return f"โŒ ไธๅญ˜ๅœจ: {path}"
        try:
            if os.path.getmtime(full) != self.read_cache[full]["time"]:
                return "โŒ ๆ–‡ไปถๅทฒ่ขซๅค–้ƒจไฟฎๆ”น๏ผŒ่ซ‹้‡ๆ–ฐ read_file"
            # Strict decoding on purpose: rewriting a file that was decoded
            # with replacement characters would corrupt it.
            content = Path(full).read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            return f"โŒ {e}"
        count = content.count(old_string)
        if count == 0:
            return "โŒ ๆ‰พไธๅˆฐ่ฆๆ›ฟๆ›็š„ๆ–‡ๅญ—"
        if count > 1:
            return f"โŒ ๆ‰พๅˆฐ {count} ่™•ๅŒน้…๏ผŒ่ซ‹ๆไพ›ๆ›ดๅคšไธŠไธ‹ๆ–‡"
        new_content = content.replace(old_string, new_string, 1)
        diff = list(
            difflib.unified_diff(
                content.splitlines(keepends=True),
                new_content.splitlines(keepends=True),
                fromfile=f"a/{path}",
                tofile=f"b/{path}",
            )
        )
        try:
            Path(full).write_text(new_content, encoding="utf-8")
        except OSError as e:
            return f"โŒ {e}"
        self.read_cache[full] = {"time": os.path.getmtime(full), "content": new_content}
        return "โœ… ๅทฒไฟฎๆ”น:\n" + "".join(diff)

    def write_file(self, path, content):
        """Create or overwrite ``path`` with ``content``, creating parent directories."""
        full = self._resolve(path)
        os.makedirs(os.path.dirname(full) or ".", exist_ok=True)
        is_new = not os.path.exists(full)
        Path(full).write_text(content, encoding="utf-8")
        self.read_cache[full] = {"time": os.path.getmtime(full), "content": content}
        verb = 'ๅปบ็ซ‹' if is_new else '่ฆ†ๅฏซ'
        return f"โœ… {verb}: {path} ({len(content)} chars)"

    def run_command(self, command, timeout=120):
        """Run ``command`` through the shell in ``self.cwd`` and return its output.

        Commands containing any ``DANGEROUS_COMMANDS`` substring are refused.
        stderr and a non-zero exit code are appended to stdout; the result is
        truncated to 10000 characters. If the command starts with
        ``cd <dir>`` and that directory exists, ``self.cwd`` follows it.
        """
        for fragment in DANGEROUS_COMMANDS:
            if fragment in command:
                return f"โ›” ๅฑ้šชๆŒ‡ไปค: {command}"
        try:
            # shell=True is intentional: the tool executes arbitrary shell lines.
            proc = subprocess.run(
                command, shell=True, cwd=self.cwd, capture_output=True, text=True, timeout=timeout
            )
            out = proc.stdout
            if proc.stderr:
                out += f"\nSTDERR:\n{proc.stderr}"
            if proc.returncode:
                out += f"\n(exit {proc.returncode})"
            # Each command runs in a fresh shell, so a leading ``cd`` is
            # tracked here to carry over to later calls.
            cd_match = re.match(r"cd\s+(.+?)(\s*&&|\s*;|\s*$)", command)
            if cd_match:
                new_dir = self._resolve(cd_match.group(1).strip())
                if os.path.isdir(new_dir):
                    self.cwd = new_dir
            return out[:10000]
        except subprocess.TimeoutExpired:
            return f"โฐ ่ถ…ๆ™‚ ({timeout}s)"
        except Exception as e:  # tool boundary: report any failure as text
            return f"โŒ {e}"

    def search_files(self, pattern, glob_pattern=None):
        """Search ``self.cwd`` recursively for regex ``pattern``.

        Uses ripgrep when available, otherwise ``grep -rn``. ``glob_pattern``
        restricts the files searched. Output is truncated to 5000 characters.
        """
        rg = shutil.which("rg")
        if rg:
            # ripgrep recurses by default; ``-r`` there means --replace, so
            # it must not be passed.
            cmd = [rg, "-n", "--color=never", "--max-count=50"]
            if glob_pattern:
                cmd += ["--glob", glob_pattern]
        else:
            cmd = ["grep", "-rn"]
            if glob_pattern:
                cmd.append(f"--include={glob_pattern}")
        # ``-e`` keeps a pattern that starts with "-" from being read as an option.
        cmd += ["-e", pattern, self.cwd]
        try:
            found = subprocess.run(cmd, capture_output=True, text=True, timeout=30).stdout
            return found[:5000] or "็„กๅŒน้…"
        except Exception as e:  # tool boundary: report any failure as text
            return f"โŒ {e}"

    def list_files(self, pattern="*", max_depth=3):
        """List up to 100 paths under ``self.cwd`` whose name matches ``pattern``.

        Tries the ``find`` utility first and falls back to ``os.walk`` when
        ``find`` cannot be run or fails without producing output.
        """
        try:
            proc = subprocess.run(
                [
                    "find", self.cwd, "-maxdepth", str(max_depth), "-name", pattern,
                    "-not", "-path", "*/.git/*",
                    "-not", "-path", "*/node_modules/*",
                    "-not", "-path", "*/__pycache__/*",
                ],
                capture_output=True, text=True, timeout=10,
            )
        except (OSError, subprocess.SubprocessError):
            return self._walk_files(pattern, max_depth)
        if proc.returncode != 0 and not proc.stdout.strip():
            # e.g. a non-POSIX ``find`` that rejects these arguments.
            return self._walk_files(pattern, max_depth)
        found = [os.path.relpath(f, self.cwd) for f in proc.stdout.strip().split("\n") if f.strip()]
        return "\n".join(sorted(found)[:100])

    def _walk_files(self, pattern, max_depth):
        """Pure-Python listing of files matching ``pattern`` within ``max_depth`` levels."""
        files = []
        for root, dirs, fnames in os.walk(self.cwd):
            dirs[:] = [d for d in dirs if d not in self._SKIPPED_DIRS]
            if root.replace(self.cwd, "").count(os.sep) >= max_depth:
                dirs[:] = []  # nothing deeper can qualify; stop descending
                continue
            files.extend(
                os.path.relpath(os.path.join(root, f), self.cwd)
                for f in fnames
                if Path(f).match(pattern)
            )
        return "\n".join(sorted(files)[:100])

    def git_context(self):
        """Return branch, short status and the last five commits of the project.

        Returns ``"(not a git repo)"`` when git cannot be run or the project
        directory is not inside a work tree.
        """

        def git(*git_args):
            return subprocess.run(
                ["git", *git_args], cwd=self.project_dir, capture_output=True, text=True
            )

        try:
            # subprocess.run does not raise on a non-zero exit, so the
            # repository check has to be explicit.
            if git("rev-parse", "--is-inside-work-tree").returncode != 0:
                return "(not a git repo)"
            branch = git("branch", "--show-current").stdout.strip()
            status = git("status", "--short").stdout.strip()
            recent = git("log", "--oneline", "-5").stdout.strip()
        except OSError:
            return "(not a git repo)"
        return f"Branch: {branch}\nStatus:\n{status}\nRecent:\n{recent}"
# Matches ``<tool>NAME\n...body...</tool>``; group 1 is the tool name and
# group 2 the raw parameter text (DOTALL so the body may span lines).
TOOL_PATTERN = re.compile(r'<tool>\s*(\w+)\s*\n(.*?)</tool>', re.DOTALL)


def _parse_tool_params(raw):
    """Turn the raw body of a tool block into a parameter dict.

    JSON objects are preferred. ``strict=False`` lets string values contain
    raw newlines/tabs. Valid JSON that is not an object yields ``{}``.
    Anything that is not JSON is read as ``key: value`` lines with
    surrounding quotes stripped from the value.
    """
    try:
        parsed = json.loads(raw, strict=False)
    except ValueError:  # json.JSONDecodeError is a ValueError
        params = {}
        for line in raw.split("\n"):
            if ":" in line:
                key, value = line.split(":", 1)
                params[key.strip()] = value.strip().strip('"').strip("'")
        return params
    # Callers use ``params.get``, so only a JSON object is acceptable.
    return parsed if isinstance(parsed, dict) else {}


def parse_tool_calls(text):
    """Extract every ``<tool>`` block from ``text``.

    Returns:
        A list of ``{"tool": name, "params": dict}`` in order of appearance.
    """
    return [
        {"tool": match.group(1), "params": _parse_tool_params(match.group(2).strip())}
        for match in TOOL_PATTERN.finditer(text)
    ]
def execute_tool(tools, call):
    """Run one parsed tool call against ``tools`` and return its string result.

    Args:
        tools: Object providing ``read_file``, ``edit_file``, ``write_file``,
            ``run_command``, ``search_files``, ``list_files`` and
            ``git_context``.
        call: ``{"tool": name, "params": dict}``.

    Returns:
        The tool's result, an "unknown tool" message for an unrecognised
        name, or an error message if the tool (or argument conversion) raises.
    """
    name, params = call["tool"], call["params"]
    # Handlers are zero-argument thunks so argument lookup and int()
    # conversion only happen for the selected tool, inside the try below.
    handlers = {
        "read_file": lambda: tools.read_file(
            params.get("path", ""), int(params.get("offset", 1)), int(params.get("limit", 200))
        ),
        "edit_file": lambda: tools.edit_file(
            params.get("path", ""), params.get("old_string", ""), params.get("new_string", "")
        ),
        "write_file": lambda: tools.write_file(params.get("path", ""), params.get("content", "")),
        "run_command": lambda: tools.run_command(
            params.get("command", ""), int(params.get("timeout", 120))
        ),
        "search_files": lambda: tools.search_files(params.get("pattern", ""), params.get("glob")),
        "list_files": lambda: tools.list_files(
            params.get("pattern", "*"), int(params.get("max_depth", 3))
        ),
        "git_status": lambda: tools.git_context(),
    }
    try:
        handler = handlers.get(name)
        if handler is None:
            return f"โŒ ๆœช็Ÿฅ: {name}"
        return handler()
    except Exception as e:
        return f"โŒ {e}"
def build_system_prompt(tools):
    """Build the system prompt describing the project and the tool syntax.

    Args:
        tools: Object exposing a ``cwd`` attribute and a ``git_context()``
            method; both values are embedded in the prompt.

    Returns:
        The full prompt text: project header, one usage example per tool,
        and the working rules.
    """
    working_dir = tools.cwd
    git_summary = tools.git_context()

    # Only the header depends on runtime state; the other sections are fixed.
    header = (
        "You are CodePilot, an expert AI programming assistant working directly in the user's project.\n"
        "## Current Project\n"
        f"Working directory: {working_dir}\n"
        f"{git_summary}\n"
    )
    tool_docs = """## Available Tools
Use tools by wrapping in <tool></tool> tags:
### read_file โ€” Read a file (ALWAYS do this before editing)
<tool>read_file
{"path": "src/main.py", "offset": 1, "limit": 200}
</tool>
### edit_file โ€” Edit file by exact string replace (must read first, old_string must be unique)
<tool>edit_file
{"path": "src/main.py", "old_string": "def old():\\n pass", "new_string": "def new():\\n return 42"}
</tool>
### write_file โ€” Create or overwrite a file
<tool>write_file
{"path": "src/new.py", "content": "print('hello')"}
</tool>
### run_command โ€” Execute shell command
<tool>run_command
{"command": "python -m pytest tests/", "timeout": 60}
</tool>
### search_files โ€” Search code with regex
<tool>search_files
{"pattern": "def main", "glob": "*.py"}
</tool>
### list_files โ€” List project files
<tool>list_files
{"pattern": "*.py", "max_depth": 3}
</tool>
### git_status โ€” Get git info
<tool>git_status
{}
</tool>
"""
    rules = """## Rules
1. ALWAYS read a file before editing it
2. old_string must EXACTLY match file content (whitespace matters)
3. Prefer edit_file over write_file for existing files
4. After changes, verify by reading file or running tests
5. For git: stage specific files, never git add -A
6. Be concise and actionable"""
    return header + tool_docs + rules
class CodeModel:
    """Causal language model plus tokenizer, optionally wrapped with a PEFT adapter."""

    def __init__(self, model_name=DEFAULT_MODEL, adapter_path=None):
        """Load the tokenizer and model, then apply ``adapter_path`` if given.

        Args:
            model_name: Model identifier passed to ``from_pretrained``.
            adapter_path: Optional directory of a PEFT adapter. ``~`` is
                expanded. A path that does not exist produces a warning and
                the base model is used on its own.
        """
        from transformers import AutoTokenizer, AutoModelForCausalLM

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        if self.tokenizer.pad_token is None:
            # generate() needs a pad token id; reuse EOS when none is defined.
            self.tokenizer.pad_token = self.tokenizer.eos_token
        # NOTE(review): trust_remote_code=True executes code shipped with the
        # model repository — only use with trusted model ids.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True
        )
        if adapter_path:
            resolved = os.path.expanduser(adapter_path)
            if os.path.exists(resolved):
                from peft import PeftModel

                self.model = PeftModel.from_pretrained(self.model, resolved)
            else:
                import warnings

                # Previously a bad path was ignored silently, so the user
                # could not tell the adapter was not in effect.
                warnings.warn(f"Adapter path not found, using base model only: {adapter_path}")
        self.model.eval()

    def chat(self, messages, max_tokens=4096):
        """Generate a sampled reply to ``messages`` and return only the new text.

        Args:
            messages: Chat messages accepted by the tokenizer's chat template.
            max_tokens: Maximum number of new tokens to generate.
        """
        text = self.tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            out = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                do_sample=True,
                temperature=0.7,
                top_p=0.9,
                repetition_penalty=1.1,
                pad_token_id=self.tokenizer.pad_token_id,
            )
        # Drop the prompt tokens so only the completion is decoded.
        prompt_length = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(out[0][prompt_length:], skip_special_tokens=True)
def _read_edited_completion():
    """Read stdin lines until a line that is ``END`` (ignoring whitespace) or EOF.

    Returns:
        The collected lines joined with newlines (``END`` itself excluded).
    """
    lines = []
    while True:
        try:
            line = input()
        except EOFError:
            break
        if line.strip() == "END":
            break
        lines.append(line)
    return "\n".join(lines)


def run_agent_loop(args):
    """Run the interactive chat/tool loop until the user quits.

    For each user request the model is queried for up to 10 rounds; tool
    calls found in a reply are executed and their results fed back as the
    next user message. After each request the user may rate or correct the
    answer, which is stored in the feedback database.

    Args:
        args: Parsed CLI namespace; ``project``, ``model`` and ``adapter``
            are read here and the namespace is passed on to
            ``trigger_training``.
    """
    from rich.console import Console
    from rich.markdown import Markdown
    from rich.markup import escape
    from rich.panel import Panel
    from rich.prompt import Prompt
    from rich.syntax import Syntax

    console = Console()
    db = FeedbackDB()
    project_dir = args.project or os.getcwd()
    tools = ProjectTools(project_dir)
    model_name = args.model or DEFAULT_MODEL

    console.print(Panel.fit(f"[bold cyan]CodePilot v2[/] โ€” Claude Code ้ขจๆ ผ AI ้–‹็™ผๅŠฉๆ‰‹\n[dim]Model: {model_name}\nProject: {project_dir}[/]", border_style="cyan"))
    with console.status("[bold green]่ผ‰ๅ…ฅๆจกๅž‹ไธญ..."):
        model = CodeModel(model_name, args.adapter)
    console.print("[green]โœ… ๆจกๅž‹่ผ‰ๅ…ฅๅฎŒๆˆ[/]")
    git_ctx = tools.git_context()
    if git_ctx != "(not a git repo)":
        console.print(Panel(git_ctx, title="๐Ÿ“‚ Project", border_style="dim"))
    console.print("[dim]็›ดๆŽฅ่ผธๅ…ฅ้œ€ๆฑ‚ | /ls ๅˆ—ๆช” | /git ็‹€ๆ…‹ | /clear ๆธ…้™ค | /status ็ตฑ่จˆ | /train ่จ“็ทด | /quit ้€€ๅ‡บ[/]\n")

    system_prompt = build_system_prompt(tools)
    messages = [{"role": "system", "content": system_prompt}]

    def show_tool_result(call, result):
        """Print a short, tool-specific preview of ``result``."""
        # Tool output is arbitrary text; escape it so stray "[...]" is not
        # parsed as Rich markup.
        tool = call["tool"]
        if tool == "edit_file" and result.startswith("โœ…"):
            diff_text = result.split("\n", 1)[1] if "\n" in result else ""
            if diff_text:
                console.print(Syntax(diff_text, "diff", theme="monokai"))
            else:
                console.print(f" [green]{escape(result.split(chr(10))[0])}[/]")
        elif tool == "run_command":
            preview = result[:500] + ("..." if len(result) > 500 else "")
            console.print(Panel(escape(preview), title="Terminal", border_style="dim"))
        elif tool == "read_file":
            console.print(f" [dim]({result.count(chr(10))+1} lines)[/]")
        else:
            console.print(f" [dim]{escape(result[:200])}{'...'if len(result)>200 else''}[/]")

    while True:
        try:
            user_input = Prompt.ask("\n[bold green]๐Ÿง‘ You")
        except (EOFError, KeyboardInterrupt):
            break
        cmd = user_input.strip()
        if not cmd:
            continue

        # Slash commands are handled locally and never reach the model.
        if cmd in ("/quit", "/exit"):
            break
        if cmd == "/status":
            s = db.count()
            console.print(f" ๐Ÿ‘{s['up']} ๐Ÿ‘Ž{s['total']-s['up']} โœ๏ธ{s['edits']} Total:{s['total']}")
            continue
        if cmd == "/train":
            trigger_training(db, console, args)
            continue
        if cmd == "/clear":
            messages = [{"role": "system", "content": system_prompt}]
            console.print("[dim]ๅทฒๆธ…้™ค[/]")
            continue
        if cmd == "/git":
            console.print(Panel(tools.git_context(), title="Git", border_style="dim"))
            continue
        if cmd.startswith("/ls"):
            console.print(tools.list_files(cmd[3:].strip() or "*"))
            continue

        messages.append({"role": "user", "content": user_input})
        full_response = ""
        for rnd in range(10):
            status_label = 'ๆ€่€ƒไธญ' if rnd == 0 else f'ๅทฅๅ…ท round {rnd+1}'
            with console.status(f"[bold cyan]{status_label}..."):
                response = model.chat(messages)
            tool_calls = parse_tool_calls(response)
            text_parts = TOOL_PATTERN.sub("", response).strip()
            if text_parts:
                console.print(f"\n[bold blue]๐Ÿค– CodePilot:[/]")
                console.print(Markdown(text_parts))
            full_response += response + "\n"
            # Record every reply, including the final tool-free one, so the
            # next request sees the complete conversation.
            messages.append({"role": "assistant", "content": response})
            if not tool_calls:
                break
            results = []
            for call in tool_calls:
                shown_params = escape(json.dumps(call['params'], ensure_ascii=False)[:100])
                console.print(f"\n [dim]๐Ÿ”ง {call['tool']}({shown_params})[/]")
                result = execute_tool(tools, call)
                show_tool_result(call, result)
                results.append(f"[{call['tool']}] {result}")
            messages.append({"role": "user", "content": "Tool results:\n" + "\n\n".join(results)})

        console.print(f"\n[dim][green]y[/]=๐Ÿ‘ [red]n[/]=๐Ÿ‘Ž [yellow]e[/]=โœ๏ธ Enter=่ทณ้Ž[/]")
        try:
            fb = Prompt.ask(" ", choices=["y", "n", "e", ""], default="", show_choices=False)
        except (EOFError, KeyboardInterrupt):
            fb = ""  # treat an interrupted feedback prompt as "skip"
        if fb == "y":
            db.save(user_input, full_response, 1, project=project_dir)
            console.print(" [green]๐Ÿ‘[/]")
        elif fb == "n":
            db.save(user_input, full_response, 0, project=project_dir)
            console.print(" [red]๐Ÿ‘Ž[/]")
        elif fb == "e":
            console.print(" [yellow]่ฒผไธŠไฟฎๆ”น็‰ˆ(END็ตๆŸ):[/]")
            edited = _read_edited_completion()
            if edited.strip():
                db.save(user_input, full_response, 1, edited=edited, project=project_dir)
                console.print(" [yellow]โœ๏ธ[/]")

        # Only reachable when the round limit was hit with tool results
        # pending: close the turn with an assistant message so roles keep
        # alternating.
        if messages[-1]["role"] == "user" and "Tool results:" in messages[-1]["content"]:
            messages.append({"role": "assistant", "content": full_response})
    console.print("\n[cyan]๐Ÿ‘‹[/]")
def trigger_training(db, console, args):
    """Fine-tune a LoRA adapter from collected feedback and save it under ``CONFIG_DIR``.

    Edited completions, when present, are used for supervised fine-tuning
    (SFT); otherwise, with at least 10 rated rows, KTO preference training
    is run. With neither, a notice is printed and nothing is trained.

    Args:
        db: Feedback store providing ``count()``, ``export_sft()`` and
            ``export_kto()``.
        console: Rich console used for progress messages.
        args: Parsed CLI namespace; ``args.model`` selects the base model.
    """
    s = db.count()
    if s["total"] == 0:
        console.print("[yellow]โš ๏ธ ็„กๆ•ธๆ“š[/]")
        return
    console.print(f"\n[bold]๐Ÿš€[/] ๐Ÿ‘:{s['up']} ๐Ÿ‘Ž:{s['total']-s['up']} โœ๏ธ:{s['edits']}")

    sft, kto = db.export_sft(), db.export_kto()
    if not sft and len(kto) < 10:
        # Previously this case fell through and reported success with a
        # path to an adapter that was never written.
        console.print(
            f"[yellow]Not enough data to train: need at least 1 edited completion "
            f"or 10 rated responses (have {len(kto)}).[/]"
        )
        return

    from datasets import Dataset
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
    from peft import LoraConfig, prepare_model_for_kbit_training

    mn = args.model or DEFAULT_MODEL
    od = os.path.join(CONFIG_DIR, f"adapter_{datetime.now().strftime('%Y%m%d_%H%M')}")
    bnb = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )
    pc = LoraConfig(
        r=16,
        lora_alpha=32,
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
    )

    def load_base():
        """Load the 4-bit quantised base model and its tokenizer."""
        base = AutoModelForCausalLM.from_pretrained(
            mn, quantization_config=bnb, device_map="auto", trust_remote_code=True
        )
        tok = AutoTokenizer.from_pretrained(mn)
        if tok.pad_token is None:
            tok.pad_token = tok.eos_token
        return base, tok

    if sft:
        console.print(f"[bold]๐Ÿ“š SFT({len(sft)})[/]")
        from trl import SFTTrainer, SFTConfig

        m, t = load_base()
        m = prepare_model_for_kbit_training(m)
        trainer = SFTTrainer(
            model=m,
            args=SFTConfig(
                output_dir=od,
                learning_rate=2e-4,
                num_train_epochs=3,
                per_device_train_batch_size=1,
                gradient_accumulation_steps=8,
                max_seq_length=1024,
                gradient_checkpointing=True,
                bf16=True,
                optim="paged_adamw_8bit",
                logging_steps=5,
                save_total_limit=1,
                logging_strategy="steps",
                logging_first_step=True,
            ),
            processing_class=t,
            train_dataset=Dataset.from_list(sft),
            peft_config=pc,
        )
    else:
        console.print(f"[bold]๐Ÿ“š KTO({len(kto)})[/]")
        from trl import KTOConfig, KTOTrainer

        m, t = load_base()
        trainer = KTOTrainer(
            model=m,
            args=KTOConfig(
                output_dir=od,
                learning_rate=1e-5,
                num_train_epochs=1,
                per_device_train_batch_size=1,
                gradient_accumulation_steps=8,
                max_length=1024,
                gradient_checkpointing=True,
                bf16=True,
                logging_steps=5,
                logging_strategy="steps",
                logging_first_step=True,
            ),
            processing_class=t,
            train_dataset=Dataset.from_list(kto),
            peft_config=pc,
        )

    trainer.train()
    # Save through the trainer so the PEFT-wrapped model (the adapter) is
    # what gets written, which is what PeftModel.from_pretrained loads.
    trainer.save_model(od)
    del trainer, m
    torch.cuda.empty_cache()
    console.print(f"\n[bold green]๐ŸŽ‰[/] {od}\n codepilot --adapter {od}")
def show_stats():
    """Print a two-column table of feedback totals read from the feedback database."""
    from rich.console import Console
    from rich.table import Table

    console = Console()
    stats = FeedbackDB().count()

    table = Table(title="๐Ÿ“Š CodePilot")
    table.add_column("", style="cyan")
    table.add_column("", style="green")

    # (label, value) pairs in display order; thumbs-down is derived.
    rows = (
        ("Total", stats["total"]),
        ("๐Ÿ‘", stats["up"]),
        ("๐Ÿ‘Ž", stats["total"] - stats["up"]),
        ("โœ๏ธ", stats["edits"]),
    )
    for label, value in rows:
        table.add_row(label, str(value))
    console.print(table)
def main():
    """Parse command-line options and dispatch to stats, training, or the chat loop."""
    parser = argparse.ArgumentParser(description="CodePilot v2")
    for option in ("--model", "--adapter", "--project"):
        parser.add_argument(option, type=str)
    for switch in ("--stats", "--train"):
        parser.add_argument(switch, action="store_true")
    options = parser.parse_args()

    # --stats takes precedence over --train; with neither, start the agent.
    if options.stats:
        show_stats()
        return
    if options.train:
        from rich.console import Console

        trigger_training(FeedbackDB(), Console(), options)
        return
    run_agent_loop(options)


if __name__ == "__main__":
    main()