# surrogate-1/bin/lib/codebase_scanner.py
# Author: Ashira Pitchayapakayakul
# Commit e36381e — feat: migrate $HOME/.claude/* to $HOME/.surrogate/*
#   (clean separation from Claude Code)
"""Codebase scanner β€” full review before each task iteration.
Purpose (per Ashira): full scan first, then grep context that previous iteration
left behind. "Review agent" relies on this to know what was done vs what remains.
3-pass strategy:
Pass 1: List recently-modified files across watched roots (last 7 days)
Pass 2: Semantic search via ChromaDB (if index exists) using task keywords
Pass 3: Git status + diff for any repos found (to detect uncommitted work)
Input: task description (string)
Output: structured summary dict the dispatcher can feed to models as context
"""
from __future__ import annotations
import datetime as dt
import json
import os
import re
import subprocess
from pathlib import Path
# Base directory under which all watched and config paths live.
HOME = Path.home()
# Roots walked by Pass 1 (recent files) and Pass 3 (git status).
# NOTE(review): "develope" looks like a typo for "develop" — confirm the
# actual on-disk directory name before changing it.
WATCHED_ROOTS = [
    HOME / "develope",
    HOME / "axentx",
    HOME / ".surrogate" / "bin",
]
# Pass 1: only files modified within this many days are considered.
RECENT_DAYS = 7
MAX_FILE_SIZE = 100_000  # skip large binaries
MAX_FILES_PASS1 = 50  # cap on Pass 1 (recent-file) results
MAX_CHUNKS_PASS2 = 10  # cap on Pass 2 (semantic search) hits
# Presence of this directory enables Pass 2 (ChromaDB semantic search).
CHROMA_DB = HOME / ".surrogate" / "code-vector-db"
def _keywords(task: str) -> list[str]:
tokens = re.findall(r"[A-Za-z_][A-Za-z0-9_]*", task.lower())
stop = {"a", "an", "the", "is", "are", "was", "were", "be", "to", "and",
"or", "but", "if", "then", "else", "for", "with", "of", "in", "on",
"at", "this", "that", "from", "by", "as", "i", "you", "it", "we",
"they", "write", "create", "make", "build", "add", "update", "task"}
return [t for t in tokens if len(t) >= 3 and t not in stop][:10]
def _recent_files(keywords: list[str], roots: list[Path]) -> list[dict]:
    """Find recently modified source files under *roots* matching *keywords*.

    A file is a candidate when it was modified within the last RECENT_DAYS
    days and is at most MAX_FILE_SIZE bytes.  Candidates are scored:
    +1 per keyword appearing in the lowercased path, +2 per keyword
    appearing in the first 4 KB of content.  Only files with score > 0
    are returned.

    Args:
        keywords: lowercase keywords (as produced by _keywords).
        roots: directories to walk; missing roots are skipped.

    Returns:
        Up to MAX_FILES_PASS1 dicts {path, mtime, score, size}, highest
        score first, ties broken by most-recent mtime.
    """
    cutoff = dt.datetime.now() - dt.timedelta(days=RECENT_DAYS)
    skip_dirs = {"node_modules", "vendor", "venv", ".venv",
                 "__pycache__", "dist", "build", "target"}
    out = []
    for root in roots:
        if not root.exists():
            continue
        for dirpath, dirnames, filenames in os.walk(root):
            # Prune hidden and dependency/build dirs in place so os.walk
            # never descends into them.
            dirnames[:] = [d for d in dirnames
                           if not d.startswith(".") and d not in skip_dirs]
            for fname in filenames:
                p = Path(dirpath) / fname
                try:
                    st = p.stat()
                except OSError:
                    continue
                if st.st_size > MAX_FILE_SIZE:
                    continue  # skip large binaries
                mtime = dt.datetime.fromtimestamp(st.st_mtime)
                if mtime < cutoff:
                    continue
                # Score by keyword hits in the path.
                path_lower = str(p).lower()
                score = sum(1 for kw in keywords if kw in path_lower)
                # Light content match (first 4KB only for perf).  Decode as
                # UTF-8 explicitly — the locale-default encoding would make
                # scoring vary across environments.
                try:
                    with open(p, "r", encoding="utf-8", errors="replace") as fh:
                        head = fh.read(4096).lower()
                    score += sum(1 for kw in keywords if kw in head) * 2
                except OSError:
                    # Unreadable content: keep any path-based score rather
                    # than dropping the hit entirely.
                    pass
                if score > 0:
                    out.append({
                        "path": str(p),
                        "mtime": mtime.isoformat(),
                        "score": score,
                        "size": st.st_size,
                    })
    # Stable two-pass sort: newest first, then highest score first, so
    # ties are deterministic instead of walk-order-dependent.
    out.sort(key=lambda x: x["mtime"], reverse=True)
    out.sort(key=lambda x: x["score"], reverse=True)
    return out[:MAX_FILES_PASS1]
def _chromadb_search(keywords: list[str], task: str) -> list[dict]:
    """Query the ChromaDB semantic index via the code-search helper script.

    NOTE: *keywords* is currently unused — the full task string is handed
    to the helper, which does its own matching.

    Returns:
        Up to MAX_CHUNKS_PASS2 dicts {path, line, preview}; [] when the
        index or helper script is missing, the helper fails or times out,
        or its output does not parse.
    """
    if not CHROMA_DB.exists():
        return []
    helper = HOME / ".surrogate" / "bin" / "code-search.sh"
    if not helper.exists():
        return []
    try:
        proc = subprocess.run(
            [str(helper), "--top", str(MAX_CHUNKS_PASS2), task],
            capture_output=True, text=True, timeout=30,
        )
    except (subprocess.TimeoutExpired, OSError):
        return []
    if proc.returncode != 0 or not proc.stdout:
        return []
    hit_re = re.compile(r"(\S+):(\d+)\s+(.*)")
    hits = []
    for line in proc.stdout.splitlines()[:MAX_CHUNKS_PASS2]:
        m = hit_re.match(line)
        if m is None:
            continue
        path, lineno, preview = m.groups()
        hits.append({
            "path": path,
            "line": int(lineno),
            "preview": preview[:200],
        })
    return hits
def _git_uncommitted(roots: list[Path]) -> list[dict]:
"""Detect repos with uncommitted work (partial iterations)."""
out = []
# Find up to 3 levels of git repos
for root in roots:
if not root.exists():
continue
for depth_glob in ["*/.git", "*/*/.git", "*/*/*/.git"]:
for git_dir in root.glob(depth_glob):
repo = git_dir.parent
try:
status = subprocess.run(
["git", "-C", str(repo), "status", "--short"],
capture_output=True, text=True, timeout=5,
)
if status.returncode == 0 and status.stdout.strip():
out.append({
"repo": str(repo),
"changes": status.stdout.strip().splitlines()[:20],
})
except (subprocess.TimeoutExpired, OSError):
continue
return out
def scan(task: str, task_artifacts: list[str] | None = None) -> dict:
    """Full codebase review → structured context dict.

    Runs all three passes (recent-file walk, semantic search, git status)
    plus full loading of any explicitly referenced artifact files.

    Args:
        task: natural-language task description
        task_artifacts: paths mentioned in task (loaded in full, capped at
            10000 chars each; files of MAX_FILE_SIZE bytes or more skipped)

    Returns:
        {
            "task_excerpt": first 200 chars of the task,
            "keywords": [...],
            "recent_files": [{path, mtime, score, size}, ...],
            "semantic_hits": [{path, line, preview}, ...],
            "uncommitted_repos": [{repo, changes: [...]}, ...],
            "explicit_artifacts": {path: content, ...},  # loaded in full
        }
    """
    keywords = _keywords(task)
    report = {
        "task_excerpt": task[:200],
        "keywords": keywords,
        "recent_files": _recent_files(keywords, WATCHED_ROOTS),
        "semantic_hits": _chromadb_search(keywords, task),
        "uncommitted_repos": _git_uncommitted(WATCHED_ROOTS),
        "explicit_artifacts": {},
    }
    for a in task_artifacts or []:
        p = Path(a)
        # Keep is_file()/stat() inside the try: a file deleted between the
        # check and the read would otherwise raise uncaught OSError (TOCTOU).
        try:
            if p.is_file() and p.stat().st_size < MAX_FILE_SIZE:
                report["explicit_artifacts"][str(p)] = p.read_text(errors="replace")[:10000]
        except OSError:
            pass
    return report
def as_context_prompt(scan_result: dict, max_chars: int = 8000) -> str:
    """Render a scan() result as a markdown context section for an LLM prompt.

    Sections with no data are omitted entirely; the final string is
    truncated to *max_chars* characters.
    """
    parts: list[str] = [
        "## Codebase context (auto-generated)",
        f"Task keywords: {', '.join(scan_result['keywords'])}",
        "",
    ]
    repos = scan_result["uncommitted_repos"]
    if repos:
        parts.append("### Uncommitted work (may indicate previous partial iteration):")
        for repo in repos[:5]:
            parts.append(f"  {repo['repo']}")
            parts.extend(f"    {change}" for change in repo["changes"][:8])
        parts.append("")
    recent = scan_result["recent_files"]
    if recent:
        parts.append(f"### Recently modified relevant files ({len(recent)}):")
        parts.extend(
            f"  {entry['path']} (score={entry['score']}, mtime={entry['mtime']})"
            for entry in recent[:15]
        )
        parts.append("")
    hits = scan_result["semantic_hits"]
    if hits:
        parts.append("### Semantic search hits:")
        parts.extend(
            f"  {hit['path']}:{hit.get('line', '?')} β€” {hit['preview'][:120]}"
            for hit in hits[:8]
        )
        parts.append("")
    artifacts = scan_result["explicit_artifacts"]
    if artifacts:
        parts.append("### Explicit task artifacts (FULL content):")
        for path, content in artifacts.items():
            parts.append(f"--- {path} ---")
            parts.append(content[:3000])
        parts.append("")
    return "\n".join(parts)[:max_chars]
if __name__ == "__main__":
import sys
task = " ".join(sys.argv[1:]) or "refactor yolo daemon"
report = scan(task)
print(json.dumps(
{k: v if not isinstance(v, list) else v[:5] for k, v in report.items()},
indent=2, default=str, ensure_ascii=False
))
print("\n=== AS CONTEXT PROMPT ===\n")
print(as_context_prompt(report, 3000))