| """ |
| Data ingestion and semantic chunking module for SEC 10-K Filings. |
| |
| UPGRADES vs previous version: |
| - Deterministic chunk IDs using SHA-256 hash of (content + source_file). |
| Previously used uuid4() (random), which broke index integrity on re-runs |
| because the same chunk got a different ID each time. |
| - Full type annotations on all public and private functions. |
| - Raises a structured ChunkingError instead of silently returning [] on failure, |
| so the caller knows which file failed and why. |
| - max_workers capped at min(32, cpu*4) β preserved from original (correct). |
| - chunk_index added to metadata for positional context within a document. |
| """ |

import os
import glob
import hashlib
import concurrent.futures

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

from src.config import logger, DATA_DIR, CHUNK_SIZE, CHUNK_OVERLAP


class ChunkingError(RuntimeError):
    """Raised when a PDF cannot be parsed or chunked."""


def _make_chunk_id(page_content: str, source_file: str, chunk_index: int) -> str:
    """
    Generate a deterministic, collision-resistant chunk ID.

    Uses SHA-256 over (content + source + index) so the same chunk always
    gets the same ID across pipeline re-runs. This is critical for:
    - BM25 dedup logic in the RRF retriever
    - Cache invalidation correctness
    - Reproducible experiment tracking

    Args:
        page_content: The raw text content of the chunk.
        source_file: The PDF filename the chunk came from.
        chunk_index: The sequential position of this chunk within its document.

    Returns:
        The source-file stem, an underscore, and the first 16 hex characters
        of the digest. Example: "google_10k_a3f9c21b7e4d0012"
    """
    raw = f"{source_file}::{chunk_index}::{page_content}"
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    stem = os.path.splitext(source_file)[0]
    return f"{stem}_{digest}"


def _process_single_pdf(
    file_path: str,
    text_splitter: RecursiveCharacterTextSplitter,
) -> list[Document]:
    """
    Parse, chunk, and annotate a single PDF file.

    Kept as a top-level function (not a lambda or inner function) for
    testability, and so it could be moved onto a ProcessPoolExecutor
    (which requires picklable callables) without changes; note that
    ThreadPoolExecutor itself does not pickle its tasks.

    Args:
        file_path: Absolute path to the PDF.
        text_splitter: Pre-configured RecursiveCharacterTextSplitter instance.
            Shared across threads; LangChain splitters are stateless
            and therefore thread-safe.

    Returns:
        List of annotated Document chunks. Returns [] on an unexpected
        parsing failure (logged as ERROR) so one bad file does not abort
        the batch.

    Raises:
        ChunkingError: If the PDF loads but yields zero pages, or the
            splitter yields zero chunks.
    """
    file_name: str = os.path.basename(file_path)
    # Filename convention: "<company>_<doc>.pdf", e.g. "google_10k.pdf".
    company_name: str = file_name.split("_")[0].capitalize()
    logger.info(" -> Parsing in parallel: %s", file_name)

    try:
        loader = PyPDFLoader(file_path)
        raw_pages: list[Document] = loader.load()

        if not raw_pages:
            raise ChunkingError(f"PDF produced zero pages: {file_name}")

        # Stamp provenance onto every page before splitting; the splitter
        # copies page metadata into each derived chunk.
        for page in raw_pages:
            page.metadata["company"] = company_name
            page.metadata["source_file"] = file_name

        chunks: list[Document] = text_splitter.split_documents(raw_pages)

        if not chunks:
            raise ChunkingError(f"Text splitter produced zero chunks for: {file_name}")

        # Attach deterministic IDs and positional indices once the final
        # chunk boundaries are known.
        for idx, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = _make_chunk_id(
                chunk.page_content, file_name, idx
            )
            chunk.metadata["chunk_index"] = idx

        logger.info(
            " -> Completed: %s (%d chunks)", file_name, len(chunks)
        )
        return chunks

    except ChunkingError:
        # Structured failures (zero pages / zero chunks) propagate so the
        # caller can attribute them to this file.
        raise
    except Exception as exc:
        # Unexpected failure (corrupt PDF, parser crash, encoding issue):
        # log it and return [] so one bad file does not abort the batch;
        # the caller records this file in failed_files.
        logger.error("Failed to process %s: %s", file_name, exc)
        return []


def load_and_chunk_pdfs() -> list[Document]:
    """
    Discover, parse, chunk, and annotate all PDF filings in DATA_DIR.

    Uses ThreadPoolExecutor for parallel I/O: PDF loading is I/O-bound,
    not CPU-bound, so threads (not processes) are the correct primitive.

    Returns:
        Flat list of annotated Document chunks from all discovered PDFs.

    Raises:
        FileNotFoundError: If no PDFs are found in DATA_DIR.
        RuntimeError: If ALL files fail to process (partial failures are
            tolerated and logged; total failure is not).
    """
    logger.info("[1/4] Starting Data Ingestion from PDFs (Parallel Mode)")
    logger.info(" Data directory: %s", DATA_DIR)

    # Separator order matters: the splitter tries paragraph breaks first,
    # then line breaks, then sentence ends, then spaces, and only falls
    # back to hard character cuts when nothing else fits in CHUNK_SIZE.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ".", " ", ""],
    )

    pdf_files: list[str] = glob.glob(os.path.join(DATA_DIR, "*.pdf"))

    if not pdf_files:
        raise FileNotFoundError(
            f"No PDFs found in '{DATA_DIR}'. "
            "Ensure your 10-K filings are in the data/raw_pdfs/ directory."
        )

    logger.info(
        "Found %d SEC filings. Initiating parallel extraction...", len(pdf_files)
    )

    all_chunks: list[Document] = []
    failed_files: list[str] = []

    # Cap the pool size: I/O-bound work tolerates more threads than cores,
    # but an unbounded pool wastes memory; the 32 cap matches the ceiling
    # used by ThreadPoolExecutor's own default heuristic.
    max_workers: int = min(32, (os.cpu_count() or 1) * 4)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pdf: dict[concurrent.futures.Future[list[Document]], str] = {
            executor.submit(_process_single_pdf, pdf, text_splitter): pdf
            for pdf in pdf_files
        }
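
        # Note: as_completed yields futures in completion order, so the
        # ordering of all_chunks varies across runs. Per-document order is
        # still recoverable via chunk_index, and chunk IDs are deterministic,
        # so downstream indexing is unaffected.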
        for future in concurrent.futures.as_completed(future_to_pdf):
            source_pdf = future_to_pdf[future]
            try:
                chunks = future.result()
                if chunks:
                    all_chunks.extend(chunks)
                else:
                    # [] signals a soft failure already logged by the worker.
                    failed_files.append(os.path.basename(source_pdf))
            except Exception as exc:
                # ChunkingError, or anything else that escaped the worker.
                failed_files.append(os.path.basename(source_pdf))
                logger.error(
                    "Unhandled exception for %s: %s", source_pdf, exc
                )

    if not all_chunks:
        raise RuntimeError(
            f"All {len(pdf_files)} PDFs failed to process. "
            f"Failed files: {failed_files}. Check logs for details."
        )

    if failed_files:
        logger.warning(
            "[1/4] Partial success. %d/%d files failed: %s",
            len(failed_files), len(pdf_files), failed_files,
        )

    logger.info(
        "[1/4] Complete. Generated %d semantic chunks from %d files.",
        len(all_chunks), len(pdf_files) - len(failed_files),
    )
    return all_chunks
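

if __name__ == "__main__":
    # Illustrative smoke test, not part of the pipeline (the sample text and
    # file name below are made up). Verifies that chunk IDs are deterministic,
    # then runs a full ingestion pass against whatever DATA_DIR contains.
    sample = "Revenue grew 12% year over year."
    assert _make_chunk_id(sample, "google_10k.pdf", 0) == _make_chunk_id(
        sample, "google_10k.pdf", 0
    ), "chunk IDs must be stable across calls"

    docs = load_and_chunk_pdfs()
    logger.info(
        "Smoke test OK: %d chunks; first chunk metadata: %s",
        len(docs), docs[0].metadata,
    )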