# repomind/ingestion/chunker.py
"""Smart, structure-aware chunking with priority scoring.
Per file:
1. Detect language (filesystem extension).
2. Extract top-level symbols via tree-sitter (or regex fallback).
3. Slice file into chunks aligned to symbol boundaries when possible;
otherwise split on paragraph / blank lines / hard cut.
4. Tag each chunk with a priority used by the token budgeter:
0 = README / top-level docs
1 = top-level symbols (functions, classes)
2 = nested / private symbols
3 = test / vendored / generated code
4 = unknown / binary-ish
The agent only sees chunks that fit its context budget — priorities decide
who gets in first when a 50K-LOC kernel doesn't fit at all.
"""
from __future__ import annotations
import json
import os
import re
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Iterable, List, Optional
from .parser import Symbol, detect_language, extract_symbols
from .token_budget import count_tokens
SKIP_DIRS = {
".git", "node_modules", ".venv", "venv", "env", "__pycache__",
"dist", "build", "target", ".next", ".nuxt", ".cache",
"vendor", "third_party", "external", ".gradle", ".idea", ".vscode",
}
SKIP_BIN_EXT = {
".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".ico", ".tiff",
".pdf", ".zip", ".tar", ".gz", ".bz2", ".7z", ".xz", ".whl", ".egg",
".so", ".dylib", ".dll", ".exe", ".o", ".a", ".class", ".jar",
".bin", ".pkl", ".parquet", ".safetensors", ".pt", ".onnx",
".woff", ".woff2", ".ttf", ".otf", ".mp3", ".mp4", ".mov", ".wav",
}
README_NAMES = {"README.md", "README.rst", "README.txt", "README"}
TEST_PATTERNS = (
    re.compile(r"(?:^|/)tests?/"),
    re.compile(r"(?:^|/)test_"),
    re.compile(r"_test\."),
)
@dataclass
class Chunk:
chunk_id: str
repo: str
path: str
section: str # symbol name or "header"
start_line: int
end_line: int
text: str
n_tokens: int
priority: int
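# Illustrative shape of one chunk after asdict() (all values made up):
#   {"chunk_id": "src/app.py#0", "repo": "myrepo", "path": "src/app.py",
#    "section": "header", "start_line": 1, "end_line": 18,
#    "text": "...", "n_tokens": 120, "priority": 1}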
def _is_test_path(rel: str) -> bool:
return any(p.search(rel) for p in TEST_PATTERNS)
def _file_priority(rel: str, name: str) -> int:
if name in README_NAMES or rel.endswith(("README.md", "README.rst")):
return 0
if _is_test_path(rel):
return 3
if any(seg in rel.split("/") for seg in ("docs", "doc")):
return 0
return 1
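# Illustrative results (paths are hypothetical):
#   "README.md" -> 0, "docs/guide.md" -> 0, "tests/test_api.py" -> 3,
#   "src/core.py" -> 1 (private symbols inside a tier-1 file are bumped to 2
#   later, in chunk_file).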
def _chunk_text_by_symbols(
text: str, symbols: List[Symbol], max_tokens: int, overlap_lines: int = 4,
) -> List[tuple[str, str, int, int]]:
"""Return [(section, text, start_line, end_line)]. Symbols are sorted by start_line."""
lines = text.split("\n")
n = len(lines)
if not symbols:
return _chunk_lines("body", lines, 1, n, max_tokens)
symbols = sorted(symbols, key=lambda s: s.start_line)
out: List[tuple[str, str, int, int]] = []
# Header / preamble before first symbol
if symbols[0].start_line > 1:
out.extend(_chunk_lines("header", lines, 1, symbols[0].start_line - 1, max_tokens))
for i, sym in enumerate(symbols):
end = symbols[i + 1].start_line - 1 if i + 1 < len(symbols) else n
if end < sym.start_line:
continue
out.extend(_chunk_lines(sym.name or sym.kind, lines, sym.start_line, end, max_tokens))
return out
def _chunk_lines(
    section: str, lines: list[str], lo: int, hi: int, max_tokens: int,
) -> List[tuple[str, str, int, int]]:
"""Split a slice of [lo..hi] (1-indexed inclusive) into <= max_tokens pieces."""
pieces: List[tuple[str, str, int, int]] = []
cur: List[str] = []
cur_tokens = 0
cur_start = lo
for idx in range(lo, hi + 1):
line = lines[idx - 1] if 0 < idx <= len(lines) else ""
line_tokens = count_tokens(line) + 1
if cur and cur_tokens + line_tokens > max_tokens:
pieces.append((section, "\n".join(cur), cur_start, idx - 1))
cur = [line]
cur_tokens = line_tokens
cur_start = idx
else:
cur.append(line)
cur_tokens += line_tokens
if cur:
pieces.append((section, "\n".join(cur), cur_start, hi))
return pieces
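# Worked example, assuming count_tokens() roughly counts whitespace-separated
# words: with max_tokens=8, a six-line slice whose lines cost 3 tokens each
# (plus the +1 per newline) exceeds the budget on every third line, yielding
# three two-line pieces that share the same section name and carry 1-indexed
# line ranges (lo, lo+1), (lo+2, lo+3), (lo+4, lo+5).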
def chunk_file(
repo: str,
path: Path,
rel_path: str,
max_tokens_per_chunk: int = 1024,
) -> List[Chunk]:
name = path.name
if path.suffix.lower() in SKIP_BIN_EXT:
return []
try:
text = path.read_text(encoding="utf-8")
except (UnicodeDecodeError, OSError):
return []
if not text.strip():
return []
lang = detect_language(path)
symbols = extract_symbols(text, lang)
base_priority = _file_priority(rel_path, name)
pieces = _chunk_text_by_symbols(text, symbols, max_tokens_per_chunk)
chunks: List[Chunk] = []
for i, (section, ctext, start, end) in enumerate(pieces):
        # Private (leading-underscore) symbols get bumped down a tier.
prio = base_priority
if base_priority == 1 and section.startswith("_"):
prio = 2
chunks.append(Chunk(
chunk_id=f"{rel_path}#{i}",
repo=repo,
path=rel_path,
section=section,
start_line=start,
end_line=end,
text=ctext,
n_tokens=count_tokens(ctext),
priority=prio,
))
return chunks
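# Illustrative usage (repo label and paths are hypothetical):
#
#   for c in chunk_file("myrepo", Path("src/app.py"), "src/app.py"):
#       print(c.chunk_id, c.section, c.start_line, c.end_line, c.priority)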
def walk_repo(
root: str | Path,
repo_label: str,
max_tokens_per_chunk: int = 1024,
follow_symlinks: bool = False,
) -> Iterable[Chunk]:
root = Path(root).resolve()
for dirpath, dirnames, filenames in os.walk(root, followlinks=follow_symlinks):
dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
for fn in filenames:
full = Path(dirpath) / fn
try:
                rel = full.relative_to(root).as_posix()  # "/" separators so the "/"-based path checks also work on Windows
except ValueError:
continue
yield from chunk_file(repo_label, full, rel, max_tokens_per_chunk)
def ingest_to_json(
root: str | Path,
out_path: str | Path,
repo_label: Optional[str] = None,
max_tokens_per_chunk: int = 1024,
) -> dict:
root = Path(root).resolve()
label = repo_label or root.name
chunks = list(walk_repo(root, label, max_tokens_per_chunk))
summary = {
"repo": label,
"root": str(root),
"n_files": len({c.path for c in chunks}),
"n_chunks": len(chunks),
"total_tokens": sum(c.n_tokens for c in chunks),
"by_priority": {
str(p): sum(1 for c in chunks if c.priority == p)
for p in sorted({c.priority for c in chunks})
},
"chunks": [asdict(c) for c in chunks],
}
out = Path(out_path)
out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(summary, ensure_ascii=False), encoding="utf-8")
return {k: v for k, v in summary.items() if k != "chunks"}
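# Minimal CLI sketch for ad-hoc local runs; the flag names below are
# illustrative and not part of any documented interface.
if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser(description="Chunk a repository into a JSON index.")
    ap.add_argument("root", help="path to the repository to ingest")
    ap.add_argument("out", help="where to write the chunk index JSON")
    ap.add_argument("--max-tokens", type=int, default=1024)
    args = ap.parse_args()
    print(json.dumps(
        ingest_to_json(args.root, args.out, max_tokens_per_chunk=args.max_tokens),
        indent=2,
    ))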