kernl-backend / backend /chunking /chunkers.py
ALPHA0008's picture
feat: dashboard UI overhaul + auth flow + auto-company-load
5f7dc7e
import json
import csv
import io
import re
# Default target chunk size, in estimated tokens. The chunkers compare it
# against _estimate_tokens() and convert it to characters with `* 4`.
DEFAULT_CHUNK_SIZE = 2000
# Default overlap between consecutive chunks, in estimated tokens
# (also converted to characters with `* 4` by _recursive_split).
DEFAULT_OVERLAP = 200
def _estimate_tokens(text: str) -> int:
return len(text) // 4
def _recursive_split(
    text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP
) -> list[str]:
    """Split *text* into overlapping, whitespace-stripped chunks.

    A window of ``chunk_size * 4`` characters is slid over the text. Unless the
    window already reaches the end of the text, it is shortened to end just
    after the right-most separator (blank line, newline, ``". "`` or space)
    found inside it, provided that separator lies past the window start. The
    next window begins ``overlap * 4`` characters before the end of the
    previous one.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk size in estimated tokens (4 chars each).
        overlap: Overlap between consecutive chunks in estimated tokens.

    Returns:
        The non-empty stripped chunks in order. If no chunk is produced
        (empty or whitespace-only input) a single-element list holding the
        stripped text is returned.
    """
    separators = ("\n\n", "\n", ". ", " ")
    # Clamp so every window covers at least one character and the overlap can
    # never push the cursor forward past the chunk end.
    window = max(chunk_size * 4, 1)
    overlap_chars = max(overlap * 4, 0)
    text_len = len(text)

    chunks: list[str] = []
    start = 0
    while start < text_len:
        end = min(start + window, text_len)
        if end < text_len:
            # Find the right-most separator in the window, remembering the
            # length of the separator that actually matched there.
            best_pos, best_len = -1, 0
            for sep in separators:
                pos = text.rfind(sep, start, end)
                if pos > best_pos:
                    best_pos, best_len = pos, len(sep)
            if best_pos > start:
                end = best_pos + best_len

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        if end >= text_len:
            break
        # Step back by the overlap, but only if that still moves the cursor
        # forward. Otherwise (separator close to the window start, or
        # overlap >= chunk size) continue from the chunk end; without this
        # the loop can stall forever or jump to a negative index and drop text.
        next_start = end - overlap_chars
        start = next_start if next_start > start else end

    return chunks if chunks else [text.strip()]
def chunk_markdown(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk Markdown text by header section.

    Lines are grouped under the most recent ``#``..``######`` header; text
    before the first header is filed under ``"Introduction"``. Each section
    with a non-empty body becomes one chunk whose text is ``"[header] body"``.
    Sections whose estimated token count exceeds *chunk_size* are split with
    ``_recursive_split`` and every piece is prefixed with the same header.

    Args:
        content: Raw Markdown text.
        filename: Value stored in each chunk's ``source_file``.
        chunk_size: Maximum section size in estimated tokens before splitting.

    Returns:
        A list of chunk dicts with keys ``text``, ``source_file``,
        ``chunk_index``, ``doc_type`` (``"markdown"``) and ``section_header``.
        ``chunk_index`` is the chunk's position in the returned list.
    """
    header_re = re.compile(r"^(#{1,6})\s+(.+)$")

    # Pass 1: collect (header, body) pairs.
    sections: list[tuple[str, str]] = []
    current_header = "Introduction"
    current_body: list[str] = []
    for line in content.split("\n"):
        header_match = header_re.match(line)
        if header_match is None:
            current_body.append(line)
            continue
        if current_body:
            sections.append((current_header, "\n".join(current_body).strip()))
        current_header = header_match.group(2).strip()
        current_body = []
    if current_body:
        sections.append((current_header, "\n".join(current_body).strip()))

    # Pass 2: turn sections into chunk records.
    chunks: list[dict] = []
    for header, body in sections:
        if not body:
            continue
        text = f"[{header}] {body}"
        if _estimate_tokens(text) > chunk_size:
            pieces = [
                f"[{header}] {sub}" for sub in _recursive_split(body, chunk_size)
            ]
        else:
            pieces = [text]
        for piece in pieces:
            chunks.append(
                {
                    "text": piece,
                    "source_file": filename,
                    # Sequential index: mixing `i` with `i * 1000 + j` produced
                    # duplicates (section 0 split in three -> 0, 1, 2, then
                    # section 1 -> 1).
                    "chunk_index": len(chunks),
                    "doc_type": "markdown",
                    "section_header": header,
                }
            )
    return chunks
def chunk_json_array(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk a JSON document, producing one chunk per array element.

    * Unparseable input yields a single ``json_array`` chunk with the raw text.
    * A non-list top-level value yields a single ``json_object`` chunk holding
      the value re-serialised with two-space indentation.
    * For a list, dict elements are rendered as ``"key: value"`` pairs joined
      by ``" | "`` using a fixed set of known keys with truthy values (falling
      back to the element's JSON dump when none apply), strings are used
      verbatim, and anything else is JSON-dumped. Elements rendering to an
      empty string are skipped; ``chunk_index`` is the element's position in
      the array.

    ``chunk_size`` is accepted for signature parity with the other chunkers
    but is not used here.
    """

    def _record(text: str, index: int, doc_type: str) -> dict:
        return {
            "text": text,
            "source_file": filename,
            "chunk_index": index,
            "doc_type": doc_type,
        }

    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        return [_record(content, 0, "json_array")]

    if not isinstance(parsed, list):
        return [_record(json.dumps(parsed, indent=2), 0, "json_object")]

    # Content-bearing keys first, then metadata keys; order fixes the layout.
    known_keys = (
        "text",
        "message",
        "content",
        "subject",
        "description",
        "resolution",
        "body",
        "user",
        "author",
        "channel",
        "priority",
        "customer_plan",
        "status",
    )

    records = []
    for position, entry in enumerate(parsed):
        if isinstance(entry, str):
            rendered = entry
        elif isinstance(entry, dict):
            pairs = [
                f"{name}: {entry[name]}" for name in known_keys if entry.get(name)
            ]
            rendered = " | ".join(pairs) if pairs else json.dumps(entry)
        else:
            rendered = json.dumps(entry)

        if not rendered:
            continue
        records.append(_record(rendered, position, "json_array"))
    return records
def chunk_csv(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk CSV text by batching rendered rows up to *chunk_size* tokens.

    The first row is treated as the header. Every data row is rendered as
    ``"column: value"`` pairs (empty values omitted) joined by ``" | "``, and
    rows are accumulated, one per line, until adding the next row would push
    the batch's estimated token count above *chunk_size*. A single row that is
    larger than *chunk_size* on its own is emitted as its own chunk.

    Args:
        content: Raw CSV text.
        filename: Value stored in each chunk's ``source_file``.
        chunk_size: Maximum batch size in estimated tokens.

    Returns:
        A list of chunk dicts with keys ``text``, ``source_file``,
        ``chunk_index`` (sequential) and ``doc_type`` (``"csv"``). Input with
        no header row yields a single chunk holding the raw content; a header
        with no usable data rows yields an empty list.
    """
    reader = csv.DictReader(io.StringIO(content))
    if reader.fieldnames is None:
        return [
            {
                "text": content,
                "source_file": filename,
                "chunk_index": 0,
                "doc_type": "csv",
            }
        ]

    chunks: list[dict] = []

    def _flush(text: str) -> None:
        chunks.append(
            {
                "text": text,
                "source_file": filename,
                "chunk_index": len(chunks),
                "doc_type": "csv",
            }
        )

    batch_text = ""
    for row in reader:
        # NOTE: csv.DictReader stores surplus fields of an over-long row as a
        # list under the key None, so such rows render a "None: [...]" pair.
        row_str = " | ".join(f"{k}: {v}" for k, v in row.items() if v)
        if not row_str:
            # Rows with no non-empty values carry no text; skipping them keeps
            # blank lines and empty-text chunks out of the output, matching
            # the other chunkers.
            continue
        if batch_text and _estimate_tokens(batch_text + "\n" + row_str) > chunk_size:
            _flush(batch_text)
            batch_text = row_str
        elif batch_text:
            batch_text += "\n" + row_str
        else:
            batch_text = row_str

    if batch_text:
        _flush(batch_text)
    return chunks
def chunk_html(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk HTML by heading section after stripping markup.

    ``<style>`` and ``<script>`` elements are removed, the remaining markup is
    split immediately before every ``<h1>``..``<h6>`` opening tag, and each
    fragment is reduced to plain text (tags replaced by spaces, character
    references decoded, whitespace collapsed). Sections whose estimated token
    count exceeds *chunk_size* are further split with ``_recursive_split``.

    Args:
        content: Raw HTML text.
        filename: Value stored in each chunk's ``source_file``.
        chunk_size: Maximum section size in estimated tokens before splitting.

    Returns:
        A list of chunk dicts with keys ``text``, ``source_file``,
        ``chunk_index`` (sequential) and ``doc_type`` (``"html"``). When the
        document contains no text at all, a single chunk with empty text is
        returned so callers always receive at least one chunk.
    """
    # Function-scope import: `html` is not in this module's import block.
    import html

    element_flags = re.DOTALL | re.IGNORECASE
    markup = re.sub(r"<style[^>]*>.*?</style\s*>", "", content, flags=element_flags)
    markup = re.sub(r"<script[^>]*>.*?</script\s*>", "", markup, flags=element_flags)

    chunks: list[dict] = []

    def _emit(text: str) -> None:
        chunks.append(
            {
                "text": text,
                "source_file": filename,
                # Sequential index keeps indices unique across sections.
                "chunk_index": len(chunks),
                "doc_type": "html",
            }
        )

    # Split while the heading tags are still present. Splitting the already
    # flattened text on a newline-anchored pattern could never match, because
    # whitespace normalisation removes every newline first.
    for fragment in re.split(r"(?=<h[1-6]\b)", markup, flags=re.IGNORECASE):
        section = re.sub(r"<[^>]+>", " ", fragment)
        # Decode entities only after tags are gone, so "&lt;b&gt;" survives as
        # literal text instead of being stripped as a tag.
        section = html.unescape(section)
        section = re.sub(r"\s+", " ", section).strip()
        if not section:
            continue
        if _estimate_tokens(section) > chunk_size:
            for sub in _recursive_split(section, chunk_size):
                _emit(sub)
        else:
            _emit(section)

    if not chunks:
        # Nothing but markup/whitespace: preserve the "always one chunk" result.
        _emit("")
    return chunks
def chunk_plain_text(
    content: str,
    filename: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_OVERLAP,
) -> list[dict]:
    """Chunk unstructured text with ``_recursive_split``.

    Each piece returned by the splitter becomes one chunk dict with keys
    ``text``, ``source_file``, ``chunk_index`` (position of the piece) and
    ``doc_type`` (``"plain_text"``).
    """
    records = []
    pieces = _recursive_split(content, chunk_size, overlap)
    for position, piece in enumerate(pieces):
        record = {
            "text": piece,
            "source_file": filename,
            "chunk_index": position,
            "doc_type": "plain_text",
        }
        records.append(record)
    return records
# Registry mapping a document-type label to the function that chunks it.
# Both JSON labels share chunk_json_array, which itself distinguishes list
# from non-list documents.
CHUNKERS = {
    "markdown": chunk_markdown,
    "json_array": chunk_json_array,
    "json_object": chunk_json_array,
    "csv": chunk_csv,
    "html": chunk_html,
    "plain_text": chunk_plain_text,
}
def get_chunker(doc_type: str):
    """Return the chunking function registered for *doc_type*.

    Labels missing from ``CHUNKERS`` resolve to ``chunk_plain_text``.
    """
    try:
        return CHUNKERS[doc_type]
    except KeyError:
        return chunk_plain_text