"""Format-aware chunkers that split documents into retrieval-sized pieces.

Every public ``chunk_*`` function takes the raw document text plus its
filename and returns a list of chunk dicts with at least the keys ``text``,
``source_file``, ``chunk_index`` and ``doc_type``.
"""

import csv
import html
import io
import json
import re
from collections.abc import Callable

DEFAULT_CHUNK_SIZE = 2000
DEFAULT_OVERLAP = 200

# Rough characters-per-token ratio used for all size estimates in this module.
_CHARS_PER_TOKEN = 4

# Candidate break points for _recursive_split.
_SEPARATORS = ("\n\n", "\n", ". ", " ")

_MD_HEADER_RE = re.compile(r"^(#{1,6})\s+(.+)$")

# <script>/<style> elements are dropped together with their contents.
_HTML_DROP_RE = re.compile(
    r"<script\b[^>]*>.*?</script\s*>|<style\b[^>]*>.*?</style\s*>",
    re.DOTALL | re.IGNORECASE,
)
# Zero-width match right before an opening <h1>..<h6> tag.
_HTML_HEADING_SPLIT_RE = re.compile(r"(?=<h[1-6]\b[^>]*>)", re.IGNORECASE)
_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")

# Keys pulled out of JSON objects, in output order: content first, then metadata.
_JSON_CONTENT_KEYS = (
    "text",
    "message",
    "content",
    "subject",
    "description",
    "resolution",
    "body",
)
_JSON_METADATA_KEYS = (
    "user",
    "author",
    "channel",
    "priority",
    "customer_plan",
    "status",
)


def _estimate_tokens(text: str) -> int:
    """Return a rough token count for *text* (four characters per token)."""
    return len(text) // _CHARS_PER_TOKEN


def _make_chunk(
    text: str, filename: str, index: int, doc_type: str, **extra: str
) -> dict:
    """Build one chunk dict; *extra* keys are appended after the standard ones."""
    chunk = {
        "text": text,
        "source_file": filename,
        "chunk_index": index,
        "doc_type": doc_type,
    }
    chunk.update(extra)
    return chunk


def _recursive_split(
    text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP
) -> list[str]:
    """Split *text* into overlapping windows, breaking at a separator if possible.

    Each window spans at most ``chunk_size * 4`` characters. When a window does
    not reach the end of the text, it is shortened to end just after the
    right-most separator occurrence (any of ``_SEPARATORS``) found inside it.
    The next window starts ``overlap * 4`` characters before the previous end.

    Args:
        text: The text to split.
        chunk_size: Window size in estimated tokens.
        overlap: Overlap between consecutive windows in estimated tokens.

    Returns:
        The stripped, non-empty pieces in order. If nothing non-empty was
        produced, a single-element list containing ``text.strip()``.
    """
    window = max(1, chunk_size * _CHARS_PER_TOKEN)
    overlap_chars = max(0, overlap * _CHARS_PER_TOKEN)
    text_len = len(text)

    pieces = []
    start = 0
    while start < text_len:
        end = min(start + window, text_len)
        if end < text_len:
            cut, cut_len = -1, 0
            for sep in _SEPARATORS:
                pos = text.rfind(sep, start, end)
                if pos > cut:
                    # Remember the length of the separator that actually won,
                    # not whichever separator the loop happens to end on.
                    cut, cut_len = pos, len(sep)
            if cut > start:
                end = cut + cut_len

        piece = text[start:end].strip()
        if piece:
            pieces.append(piece)
        if end >= text_len:
            break

        # Step back by the overlap, but never to or before the current start:
        # otherwise a large overlap (or an early separator) would loop forever
        # or rewind to a negative offset and drop text.
        next_start = end - overlap_chars
        start = next_start if next_start > start else end

    return pieces if pieces else [text.strip()]


def chunk_markdown(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk a Markdown document by its ATX (``#``) headers.

    Text before the first header is filed under the header "Introduction".
    Each section becomes ``"[header] body"``; sections whose estimated token
    count exceeds *chunk_size* are split further with ``_recursive_split`` and
    every piece gets the same ``[header]`` prefix. Sections with an empty body
    are skipped.

    Args:
        content: Raw Markdown text.
        filename: Stored in each chunk as ``source_file``.
        chunk_size: Maximum estimated tokens per chunk before splitting.

    Returns:
        Chunk dicts with ``doc_type`` "markdown" and a ``section_header`` key.
        ``chunk_index`` is the chunk's position in the returned list, so it is
        unique within the document.
    """
    sections = []
    header = "Introduction"
    body_lines = []
    for line in content.split("\n"):
        match = _MD_HEADER_RE.match(line)
        if match is None:
            body_lines.append(line)
            continue
        if body_lines:
            sections.append((header, "\n".join(body_lines).strip()))
        header = match.group(2).strip()
        body_lines = []
    if body_lines:
        sections.append((header, "\n".join(body_lines).strip()))

    chunks = []
    for header, body in sections:
        if not body:
            continue
        text = f"[{header}] {body}"
        if _estimate_tokens(text) > chunk_size:
            texts = [f"[{header}] {sub}" for sub in _recursive_split(body, chunk_size)]
        else:
            texts = [text]
        for piece in texts:
            # Sequential numbering: mixing `i` and `i * 1000 + j` produced
            # duplicate indices across sections.
            chunks.append(
                _make_chunk(
                    piece, filename, len(chunks), "markdown", section_header=header
                )
            )
    return chunks


def chunk_json_array(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk a JSON document, one chunk per top-level array element.

    * Unparseable input is returned verbatim as one "json_array" chunk.
    * A non-list top-level value is pretty-printed into one "json_object" chunk.
    * For a list, each dict element is rendered as ``"key: value | ..."`` using
      the known content and metadata keys that hold truthy values (falling back
      to the element's JSON dump when none do); strings are used as-is; any
      other element is JSON-dumped. Elements that render to an empty string
      are skipped.

    Args:
        content: Raw JSON text.
        filename: Stored in each chunk as ``source_file``.
        chunk_size: Unused here; kept so all chunkers share the same signature.

    Returns:
        Chunk dicts whose ``chunk_index`` is the element's position in the
        source array (0 for the single-chunk cases).
    """
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return [_make_chunk(content, filename, 0, "json_array")]

    if not isinstance(data, list):
        return [_make_chunk(json.dumps(data, indent=2), filename, 0, "json_object")]

    chunks = []
    for position, item in enumerate(data):
        if isinstance(item, dict):
            parts = [
                f"{key}: {item[key]}"
                for key in _JSON_CONTENT_KEYS + _JSON_METADATA_KEYS
                if item.get(key)
            ]
            text = " | ".join(parts) or json.dumps(item)
        elif isinstance(item, str):
            text = item
        else:
            text = json.dumps(item)
        if text:
            chunks.append(_make_chunk(text, filename, position, "json_array"))
    return chunks


def chunk_csv(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk CSV text by batching rendered rows up to *chunk_size* tokens.

    Each row is rendered as ``"column: value | ..."`` using only its non-empty
    cells, and rows are joined with newlines. A new chunk is started when
    adding the next row would push the batch's estimated token count above
    *chunk_size*. Rows without any non-empty cell are skipped so they cannot
    add blank lines or produce an empty chunk.

    Args:
        content: Raw CSV text whose first row holds the column names.
        filename: Stored in each chunk as ``source_file``.
        chunk_size: Maximum estimated tokens per chunk.

    Returns:
        Sequentially indexed "csv" chunks; an empty list when there are no
        data rows. If no header row can be read, the raw content is returned
        as a single chunk.
    """
    reader = csv.DictReader(io.StringIO(content))
    if reader.fieldnames is None:
        return [_make_chunk(content, filename, 0, "csv")]

    chunks = []
    batch = []  # rendered rows of the chunk being built
    batch_len = 0  # len("\n".join(batch)), tracked to avoid re-joining per row
    for row in reader:
        rendered = " | ".join(f"{k}: {v}" for k, v in row.items() if v)
        if not rendered:
            continue
        projected_len = batch_len + 1 + len(rendered)  # +1 for the newline
        if batch and projected_len // _CHARS_PER_TOKEN > chunk_size:
            chunks.append(_make_chunk("\n".join(batch), filename, len(chunks), "csv"))
            batch = [rendered]
            batch_len = len(rendered)
        else:
            batch_len = projected_len if batch else len(rendered)
            batch.append(rendered)

    if batch:
        chunks.append(_make_chunk("\n".join(batch), filename, len(chunks), "csv"))
    return chunks


def _html_to_text(fragment: str) -> str:
    """Strip tags from *fragment*, decode entities and collapse whitespace."""
    without_tags = _HTML_TAG_RE.sub(" ", fragment)
    # Decode entities only after tag removal so "&lt;b&gt;" stays literal text.
    decoded = html.unescape(without_tags)
    return _WHITESPACE_RE.sub(" ", decoded).strip()


def chunk_html(
    content: str, filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list[dict]:
    """Chunk an HTML document into plain-text sections split at headings.

    ``<script>`` and ``<style>`` elements are removed with their contents, the
    markup is split in front of every opening ``<h1>``..``<h6>`` tag, and each
    fragment is reduced to whitespace-normalised text. Sections whose
    estimated token count exceeds *chunk_size* are split further with
    ``_recursive_split``.

    Args:
        content: Raw HTML text.
        filename: Stored in each chunk as ``source_file``.
        chunk_size: Maximum estimated tokens per chunk before splitting.

    Returns:
        Sequentially indexed "html" chunks. At least one chunk is always
        returned; if the document contains no text it has an empty ``text``.
    """
    markup = _HTML_DROP_RE.sub("", content)

    chunks = []
    for fragment in _HTML_HEADING_SPLIT_RE.split(markup):
        section = _html_to_text(fragment)
        if not section:
            continue
        if _estimate_tokens(section) > chunk_size:
            texts = _recursive_split(section, chunk_size)
        else:
            texts = [section]
        for piece in texts:
            chunks.append(_make_chunk(piece, filename, len(chunks), "html"))

    if chunks:
        return chunks
    # No section produced text, so the whole document is empty once cleaned;
    # keep the existing contract of returning a single (empty) chunk.
    return [_make_chunk("", filename, 0, "html")]


def chunk_plain_text(
    content: str,
    filename: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_OVERLAP,
) -> list[dict]:
    """Chunk unstructured text with ``_recursive_split``.

    Args:
        content: Raw text.
        filename: Stored in each chunk as ``source_file``.
        chunk_size: Window size in estimated tokens.
        overlap: Overlap between consecutive windows in estimated tokens.

    Returns:
        Sequentially indexed "plain_text" chunks. Empty or whitespace-only
        content yields one chunk with an empty ``text``.
    """
    return [
        _make_chunk(part, filename, index, "plain_text")
        for index, part in enumerate(_recursive_split(content, chunk_size, overlap))
    ]


# Maps a detected document type to the chunker that handles it.
CHUNKERS = {
    "markdown": chunk_markdown,
    "json_array": chunk_json_array,
    "json_object": chunk_json_array,
    "csv": chunk_csv,
    "html": chunk_html,
    "plain_text": chunk_plain_text,
}


def get_chunker(doc_type: str) -> Callable[..., list[dict]]:
    """Return the chunker for *doc_type*, defaulting to ``chunk_plain_text``."""
    return CHUNKERS.get(doc_type, chunk_plain_text)