"""
Data ingestion and semantic chunking module for SEC 10-K Filings.

UPGRADES vs previous version:
- Deterministic chunk IDs using SHA-256 hash of (content + source_file).
  Previously used uuid4() (random), which broke index integrity on re-runs
  because the same chunk got a different ID each time.
- Full type annotations on all public and private functions.
- Raises a structured ChunkingError instead of silently returning [] on failure,
  so the caller knows which file failed and why.
- max_workers capped at min(32, cpu*4); preserved from original (correct).
- chunk_index added to metadata for positional context within a document.
"""
import os
import glob
import hashlib
import concurrent.futures
from dataclasses import dataclass
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from src.config import logger, DATA_DIR, CHUNK_SIZE, CHUNK_OVERLAP


# ── Custom Exception ──────────────────────────────────────────────────────────
class ChunkingError(RuntimeError):
    """Raised when a PDF cannot be parsed or chunked."""


# ── Deterministic ID Generation ───────────────────────────────────────────────
def _make_chunk_id(page_content: str, source_file: str, chunk_index: int) -> str:
    """
    Generate a deterministic, collision-resistant chunk ID.

    Uses SHA-256 over (content + source + index) so the same chunk always
    gets the same ID across pipeline re-runs. This is critical for:
    - BM25 dedup logic in the RRF retriever
    - Cache invalidation correctness
    - Reproducible experiment tracking

    Args:
        page_content: The raw text content of the chunk.
        source_file: The PDF filename the chunk came from.
        chunk_index: The sequential position of this chunk within its document.

    Returns:
        A 16-character hex digest prefixed with the source file stem.
        Example: "google_10k_a3f9c21b7e4d0012"
    """
    raw = f"{source_file}::{chunk_index}::{page_content}"
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    stem = os.path.splitext(source_file)[0]  # e.g. "google_10k"
    return f"{stem}_{digest}"
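
# Illustrative only (hypothetical inputs): the ID is stable across runs, e.g.
#   _make_chunk_id("Item 1A. Risk Factors ...", "google_10k.pdf", 0)
# always yields the same "google_10k_<16-hex-chars>" string, whereas the old
# uuid4() scheme minted a new ID on every pipeline run.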


# ── Single-File Processor ─────────────────────────────────────────────────────
def _process_single_pdf(
    file_path: str,
    text_splitter: RecursiveCharacterTextSplitter,
) -> list[Document]:
    """
    Parse, chunk, and annotate a single PDF file.

    Kept as a top-level function (not a lambda or inner function) so it stays
    picklable if the pool is ever switched to ProcessPoolExecutor; the current
    ThreadPoolExecutor does not require pickling.

    Args:
        file_path: Absolute path to the PDF.
        text_splitter: Pre-configured RecursiveCharacterTextSplitter instance.
            Shared across threads; LangChain splitters are stateless
            and therefore thread-safe.

    Returns:
        List of annotated Document chunks. Returns [] only on unexpected
        failures (logged as ERROR); structured failures raise ChunkingError
        instead.

    Raises:
        ChunkingError: If the PDF produces zero pages or the splitter yields
            zero chunks.
    """
    file_name: str = os.path.basename(file_path)
    company_name: str = file_name.split("_")[0].capitalize()
    logger.info(" -> Parsing in parallel: %s", file_name)

    try:
        loader = PyPDFLoader(file_path)
        raw_pages: list[Document] = loader.load()
        if not raw_pages:
            raise ChunkingError(f"PDF produced zero pages: {file_name}")

        # Annotate pages with document-level metadata before splitting.
        # Metadata set here propagates to every chunk derived from these pages.
        for page in raw_pages:
            page.metadata["company"] = company_name
            page.metadata["source_file"] = file_name

        chunks: list[Document] = text_splitter.split_documents(raw_pages)
        if not chunks:
            raise ChunkingError(f"Text splitter produced zero chunks for: {file_name}")

        # Assign deterministic IDs; critical for RRF dedup and index stability.
        for idx, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = _make_chunk_id(
                chunk.page_content, file_name, idx
            )
            chunk.metadata["chunk_index"] = idx  # positional context

        logger.info(
            " -> Completed: %s → %d chunks", file_name, len(chunks)
        )
        return chunks

    except ChunkingError:
        raise  # re-raise structured errors as-is
    except Exception as exc:
        # Catch unexpected errors (corrupted PDF, permission denied, etc.)
        # Log and return empty list so one bad file doesn't kill the whole batch.
        logger.error("Failed to process %s: %s", file_name, exc)
        return []
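
# For reference (illustrative values): a chunk produced above carries metadata like
#   {"source": ".../google_10k.pdf", "page": 12, "company": "Google",
#    "source_file": "google_10k.pdf", "chunk_id": "google_10k_a3f9c21b7e4d0012",
#    "chunk_index": 57}
# "source" and "page" come from PyPDFLoader; the remaining keys are set above.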


# ── Public Pipeline Entry Point ───────────────────────────────────────────────
def load_and_chunk_pdfs() -> list[Document]:
    """
    Discover, parse, chunk, and annotate all PDF filings in DATA_DIR.

    Uses ThreadPoolExecutor for parallel I/O; PDF loading is I/O-bound,
    not CPU-bound, so threads (not processes) are the correct primitive.

    Returns:
        Flat list of annotated Document chunks from all discovered PDFs.

    Raises:
        FileNotFoundError: If no PDFs are found in DATA_DIR.
        RuntimeError: If ALL files fail to process (partial failures are
            tolerated and logged; total failure is not).
    """
    logger.info("[1/4] Starting Data Ingestion from PDFs (Parallel Mode)")
    logger.info(" Data directory: %s", DATA_DIR)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ".", " ", ""],
    )
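    # The splitter tries these separators in order: paragraph breaks first, then
    # single newlines, sentence-ending periods, spaces, and finally raw characters,
    # so each chunk breaks at the most natural boundary available.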

    pdf_files: list[str] = glob.glob(os.path.join(DATA_DIR, "*.pdf"))
    if not pdf_files:
        raise FileNotFoundError(
            f"No PDFs found in '{DATA_DIR}'. "
            "Ensure your 10-K filings are in the data/raw_pdfs/ directory."
        )

    logger.info(
        "Found %d SEC filings. Initiating parallel extraction...", len(pdf_files)
    )

    all_chunks: list[Document] = []
    failed_files: list[str] = []

    # ThreadPoolExecutor: threads are correct for I/O-bound PDF loading.
    # cpu_count() * 4 is standard for I/O-bound tasks; cap at 32 to avoid
    # overwhelming the Colab / Drive connection pool.
    max_workers: int = min(32, (os.cpu_count() or 1) * 4)
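    # e.g. on a 2-vCPU runtime this resolves to min(32, 2 * 4) = 8 worker threads.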

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pdf: dict = {
            executor.submit(_process_single_pdf, pdf, text_splitter): pdf
            for pdf in pdf_files
        }
        for future in concurrent.futures.as_completed(future_to_pdf):
            source_pdf = future_to_pdf[future]
            try:
                chunks = future.result()
                if chunks:
                    all_chunks.extend(chunks)
                else:
                    failed_files.append(os.path.basename(source_pdf))
            except Exception as exc:
                failed_files.append(os.path.basename(source_pdf))
                logger.error(
                    "Unhandled exception for %s: %s", source_pdf, exc
                )

    # Fail loudly if every single file failed; silent empty returns are dangerous.
    if not all_chunks:
        raise RuntimeError(
            f"All {len(pdf_files)} PDFs failed to process. "
            f"Failed files: {failed_files}. Check logs for details."
        )

    if failed_files:
        logger.warning(
            "[1/4] Partial success. %d/%d files failed: %s",
            len(failed_files), len(pdf_files), failed_files,
        )

    logger.info(
        "[1/4] Complete. Generated %d semantic chunks from %d files.",
        len(all_chunks), len(pdf_files) - len(failed_files),
    )
    return all_chunks
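

# ── Manual Smoke Test ─────────────────────────────────────────────────────────
# Minimal usage sketch (illustrative, not part of the pipeline): assumes DATA_DIR
# already contains the 10-K PDFs and that src.config is importable.
if __name__ == "__main__":
    docs = load_and_chunk_pdfs()
    # Spot-check the first few chunks so IDs and metadata can be eyeballed.
    for doc in docs[:3]:
        print(doc.metadata["chunk_id"], "|", doc.page_content[:80].replace("\n", " "))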