File size: 8,511 Bytes
e36381e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""Codebase scanner — full review before each task iteration.

Purpose (per Ashira): full scan first, then grep context that previous iteration
left behind. "Review agent" relies on this to know what was done vs what remains.

3-pass strategy:
  Pass 1: List recently-modified files across watched roots (last 7 days)
  Pass 2: Semantic search via ChromaDB (if index exists) using task keywords
  Pass 3: Git status + diff for any repos found (to detect uncommitted work)

Input: task description (string)
Output: structured summary dict the dispatcher can feed to models as context
"""

from __future__ import annotations

import datetime as dt
import json
import os
import re
import subprocess
from pathlib import Path

HOME = Path.home()
# Directory roots scanned by pass 1 (recent files) and pass 3 (git status).
WATCHED_ROOTS = [
    HOME / "develope",
    HOME / "axentx",
    HOME / ".surrogate" / "bin",
]
# Pass-1 tuning: how far back "recently modified" reaches, plus caps that
# keep the scan cheap enough to run before every task iteration.
RECENT_DAYS = 7
MAX_FILE_SIZE = 100_000   # skip large binaries
MAX_FILES_PASS1 = 50
MAX_CHUNKS_PASS2 = 10
# Location of the ChromaDB semantic index used by pass 2 (optional).
CHROMA_DB = HOME / ".surrogate" / "code-vector-db"


def _keywords(task: str) -> list[str]:
    tokens = re.findall(r"[A-Za-z_][A-Za-z0-9_]*", task.lower())
    stop = {"a", "an", "the", "is", "are", "was", "were", "be", "to", "and",
            "or", "but", "if", "then", "else", "for", "with", "of", "in", "on",
            "at", "this", "that", "from", "by", "as", "i", "you", "it", "we",
            "they", "write", "create", "make", "build", "add", "update", "task"}
    return [t for t in tokens if len(t) >= 3 and t not in stop][:10]


def _recent_files(keywords: list[str], roots: list[Path]) -> list[dict]:
    """Find recently modified source files matching *keywords*.

    Walks each existing root, pruning hidden and dependency/build
    directories, and scores files modified within the last RECENT_DAYS
    days: +1 per keyword found in the path, +2 per keyword found in the
    first 4KB of content. Only files with a positive score are kept.

    Returns:
      Up to MAX_FILES_PASS1 dicts {path, mtime, score, size}, best score
      first; ties broken by most recent mtime.
    """
    if not keywords:
        # No keyword can score > 0, so skip the walk entirely — the
        # original behavior (empty result) is preserved without reading
        # the head of every recently-modified file.
        return []
    skip_dirs = {"node_modules", "vendor", "venv", ".venv",
                 "__pycache__", "dist", "build", "target"}
    cutoff = dt.datetime.now() - dt.timedelta(days=RECENT_DAYS)
    out: list[dict] = []
    for root in roots:
        if not root.exists():
            continue
        for dirpath, dirnames, filenames in os.walk(root):
            # Prune hidden dirs and common dependency/build output in place
            # so os.walk never descends into them.
            dirnames[:] = [d for d in dirnames
                           if not d.startswith(".") and d not in skip_dirs]
            for f in filenames:
                p = Path(dirpath) / f
                try:
                    st = p.stat()
                except OSError:
                    continue
                if st.st_size > MAX_FILE_SIZE:
                    continue  # skip large binaries
                mtime = dt.datetime.fromtimestamp(st.st_mtime)
                if mtime < cutoff:
                    continue
                # Score by keyword hits in the full path...
                path_lower = str(p).lower()
                score = sum(1 for kw in keywords if kw in path_lower)
                # ...plus a double-weighted content match (first 4KB only,
                # for perf). Explicit UTF-8 makes scoring deterministic
                # across platforms; errors="replace" keeps binary files
                # from raising UnicodeDecodeError.
                try:
                    with open(p, "r", encoding="utf-8", errors="replace") as fh:
                        head = fh.read(4096).lower()
                except OSError:
                    continue
                score += sum(1 for kw in keywords if kw in head) * 2
                if score > 0:
                    out.append({
                        "path": str(p),
                        "mtime": mtime.isoformat(),
                        "score": score,
                        "size": st.st_size,
                    })
    # Best score first; among equal scores, most recently modified first
    # (ISO-8601 mtime strings sort chronologically).
    out.sort(key=lambda x: (x["score"], x["mtime"]), reverse=True)
    return out[:MAX_FILES_PASS1]


def _chromadb_search(keywords: list[str], task: str) -> list[dict]:
    """Query the ChromaDB semantic index via the code-search helper script.

    Best-effort: returns [] when the index or helper is missing, the
    helper fails or times out, or no output line parses.

    Note: *keywords* is accepted for signature symmetry with the other
    passes but is not used here — the helper receives the full task text.
    """
    if not CHROMA_DB.exists():
        return []
    helper = HOME / ".surrogate" / "bin" / "code-search.sh"
    if not helper.exists():
        return []
    try:
        proc = subprocess.run(
            [str(helper), "--top", str(MAX_CHUNKS_PASS2), task],
            capture_output=True, text=True, timeout=30,
        )
    except (subprocess.TimeoutExpired, OSError):
        return []
    if proc.returncode != 0 or not proc.stdout:
        return []
    # Expected helper output format per line: "<path>:<line> <preview>"
    pattern = re.compile(r"(\S+):(\d+)\s+(.*)")
    hits: list[dict] = []
    for line in proc.stdout.splitlines()[:MAX_CHUNKS_PASS2]:
        m = pattern.match(line)
        if m:
            hits.append({
                "path": m.group(1),
                "line": int(m.group(2)),
                "preview": m.group(3)[:200],
            })
    return hits


def _git_uncommitted(roots: list[Path]) -> list[dict]:
    """Detect repos with uncommitted work (partial iterations)."""
    out = []
    # Find up to 3 levels of git repos
    for root in roots:
        if not root.exists():
            continue
        for depth_glob in ["*/.git", "*/*/.git", "*/*/*/.git"]:
            for git_dir in root.glob(depth_glob):
                repo = git_dir.parent
                try:
                    status = subprocess.run(
                        ["git", "-C", str(repo), "status", "--short"],
                        capture_output=True, text=True, timeout=5,
                    )
                    if status.returncode == 0 and status.stdout.strip():
                        out.append({
                            "repo": str(repo),
                            "changes": status.stdout.strip().splitlines()[:20],
                        })
                except (subprocess.TimeoutExpired, OSError):
                    continue
    return out


def scan(task: str, task_artifacts: list[str] | None = None) -> dict:
    """Full codebase review → structured context dict.

    Args:
      task: natural-language task description
      task_artifacts: paths mentioned in the task; each readable file under
        MAX_FILE_SIZE is loaded (content capped at 10,000 chars — the old
        docstring claimed "in full", which the code never did)

    Returns:
      {
        "task_excerpt": first 200 chars of the task,
        "keywords": [...],
        "recent_files": [{path, mtime, score, size}, ...],
        "semantic_hits": [{path, line, preview}, ...],
        "uncommitted_repos": [{repo, changes: [...]}, ...],
        "explicit_artifacts": {path: content, ...},
      }
    """
    keywords = _keywords(task)
    report = {
        "task_excerpt": task[:200],
        "keywords": keywords,
        "recent_files": _recent_files(keywords, WATCHED_ROOTS),
        "semantic_hits": _chromadb_search(keywords, task),
        "uncommitted_repos": _git_uncommitted(WATCHED_ROOTS),
        "explicit_artifacts": {},
    }
    for a in task_artifacts or []:
        p = Path(a)
        # EAFP: stat and read inside one try-block instead of the old
        # exists()/is_file()/stat() chain, which could race with a
        # concurrent delete between the check and the read.
        try:
            if not p.is_file() or p.stat().st_size >= MAX_FILE_SIZE:
                continue
            report["explicit_artifacts"][str(p)] = \
                p.read_text(errors="replace")[:10000]
        except OSError:
            continue
    return report


def as_context_prompt(scan_result: dict, max_chars: int = 8000) -> str:
    """Render a scan() result as markdown context for an LLM system prompt.

    Empty sections are omitted; the final text is truncated to *max_chars*
    characters.
    """
    parts: list[str] = [
        "## Codebase context (auto-generated)",
        f"Task keywords: {', '.join(scan_result['keywords'])}",
        "",
    ]
    add = parts.append

    repos = scan_result["uncommitted_repos"]
    if repos:
        add("### Uncommitted work (may indicate previous partial iteration):")
        for repo in repos[:5]:
            add(f"  {repo['repo']}")
            for change in repo["changes"][:8]:
                add(f"    {change}")
        add("")

    recent = scan_result["recent_files"]
    if recent:
        add(f"### Recently modified relevant files ({len(recent)}):")
        for entry in recent[:15]:
            add(f"  {entry['path']} (score={entry['score']}, mtime={entry['mtime']})")
        add("")

    hits = scan_result["semantic_hits"]
    if hits:
        add("### Semantic search hits:")
        for hit in hits[:8]:
            add(f"  {hit['path']}:{hit.get('line','?')} β€” {hit['preview'][:120]}")
        add("")

    artifacts = scan_result["explicit_artifacts"]
    if artifacts:
        add("### Explicit task artifacts (FULL content):")
        for path, content in artifacts.items():
            add(f"--- {path} ---")
            add(content[:3000])
            add("")

    return "\n".join(parts)[:max_chars]


if __name__ == "__main__":
    # CLI smoke test: `python <this file> <task words...>` prints a truncated
    # JSON report followed by the rendered LLM context prompt.
    import sys
    task = " ".join(sys.argv[1:]) or "refactor yolo daemon"
    report = scan(task)
    print(json.dumps(
        # Truncate list values to 5 entries so the dump stays readable;
        # non-list values (task_excerpt, explicit_artifacts) pass through.
        {k: v if not isinstance(v, list) else v[:5] for k, v in report.items()},
        indent=2, default=str, ensure_ascii=False
    ))
    print("\n=== AS CONTEXT PROMPT ===\n")
    print(as_context_prompt(report, 3000))