# financial-intelligence-engine / src / data_ingestion.py
"""
Data ingestion and semantic chunking module for SEC 10-K Filings.
UPGRADES vs previous version:
- Deterministic chunk IDs using SHA-256 hash of (content + source_file).
Previously used uuid4() (random), which broke index integrity on re-runs
because the same chunk got a different ID each time.
- Full type annotations on all public and private functions.
- Raises a structured ChunkingError (instead of silently returning []) when a PDF
  yields zero pages or zero chunks, so the caller knows which file failed and why.
- max_workers capped at min(32, cpu * 4); preserved from original (correct).
- chunk_index added to metadata for positional context within a document.
"""
import os
import glob
import hashlib
import concurrent.futures
from dataclasses import dataclass
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from src.config import logger, DATA_DIR, CHUNK_SIZE, CHUNK_OVERLAP
# ── Custom Exception ──────────────────────────────────────────────────────────
class ChunkingError(RuntimeError):
"""Raised when a PDF cannot be parsed or chunked."""
# ── Deterministic ID Generation ───────────────────────────────────────────────
def _make_chunk_id(page_content: str, source_file: str, chunk_index: int) -> str:
"""
Generate a deterministic, collision-resistant chunk ID.
Uses SHA-256 over (content + source + index) so the same chunk always
gets the same ID across pipeline re-runs. This is critical for:
- BM25 dedup logic in the RRF retriever
- Cache invalidation correctness
- Reproducible experiment tracking
Args:
page_content: The raw text content of the chunk.
source_file: The PDF filename the chunk came from.
chunk_index: The sequential position of this chunk within its document.
    Returns:
        The source file stem, an underscore, and a 16-character hex digest,
        e.g. "google_10k_a3f9c21b7e4d0012".
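    Example (illustrative doctest; the filename and text are made up):
        >>> a = _make_chunk_id("Revenue grew 12%.", "google_10k.pdf", 0)
        >>> b = _make_chunk_id("Revenue grew 12%.", "google_10k.pdf", 0)
        >>> a == b
        True
        >>> a.startswith("google_10k_")
        True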
"""
raw = f"{source_file}::{chunk_index}::{page_content}"
digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
stem = os.path.splitext(source_file)[0] # e.g. "google_10k"
return f"{stem}_{digest}"
# ── Single-File Processor ─────────────────────────────────────────────────────
def _process_single_pdf(
file_path: str,
text_splitter: RecursiveCharacterTextSplitter,
) -> list[Document]:
"""
Parse, chunk, and annotate a single PDF file.
    Kept as a top-level function (not a lambda or inner function) so it is easy to
    test in isolation and would remain picklable if the pool were ever switched to
    ProcessPoolExecutor; ThreadPoolExecutor shares memory and never pickles arguments.
Args:
file_path: Absolute path to the PDF.
text_splitter: Pre-configured RecursiveCharacterTextSplitter instance.
            Shared across threads; LangChain splitters are stateless and
            therefore thread-safe.
    Returns:
        List of annotated Document chunks. Returns [] only on unexpected
        failures (corrupted PDF, permission errors), which are logged as
        ERROR so one bad file does not abort the batch.
    Raises:
        ChunkingError: If the PDF produces zero pages or zero chunks.
"""
file_name: str = os.path.basename(file_path)
company_name: str = file_name.split("_")[0].capitalize()
logger.info(" -> Parsing in parallel: %s", file_name)
try:
loader = PyPDFLoader(file_path)
raw_pages: list[Document] = loader.load()
if not raw_pages:
raise ChunkingError(f"PDF produced zero pages: {file_name}")
# Annotate pages with document-level metadata before splitting.
# Metadata set here propagates to every chunk derived from these pages.
for page in raw_pages:
page.metadata["company"] = company_name
page.metadata["source_file"] = file_name
chunks: list[Document] = text_splitter.split_documents(raw_pages)
if not chunks:
raise ChunkingError(f"Text splitter produced zero chunks for: {file_name}")
        # Assign deterministic IDs; critical for RRF dedup and index stability.
for idx, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = _make_chunk_id(
chunk.page_content, file_name, idx
)
chunk.metadata["chunk_index"] = idx # positional context
        logger.info(
            " -> Completed: %s -> %d chunks", file_name, len(chunks)
)
return chunks
except ChunkingError:
raise # re-raise structured errors as-is
except Exception as exc:
# Catch unexpected errors (corrupted PDF, permission denied, etc.)
# Log and return empty list so one bad file doesn't kill the whole batch.
logger.error("Failed to process %s: %s", file_name, exc)
return []
# ── Public Pipeline Entry Point ───────────────────────────────────────────────
def load_and_chunk_pdfs() -> list[Document]:
"""
Discover, parse, chunk, and annotate all PDF filings in DATA_DIR.
    Uses ThreadPoolExecutor for parallel I/O: PDF loading is I/O-bound,
not CPU-bound, so threads (not processes) are the correct primitive.
Returns:
Flat list of annotated Document chunks from all discovered PDFs.
Raises:
FileNotFoundError: If no PDFs are found in DATA_DIR.
RuntimeError: If ALL files fail to process (partial failures are
tolerated and logged; total failure is not).
"""
logger.info("[1/4] Starting Data Ingestion from PDFs (Parallel Mode)")
logger.info(" Data directory: %s", DATA_DIR)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
separators=["\n\n", "\n", ".", " ", ""],
)
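    # Separator cascade: the splitter tries paragraph breaks first, then line
    # breaks, then sentence-ending periods, then spaces, and finally falls back
    # to splitting on raw characters ("") when a piece is still too long.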
pdf_files: list[str] = glob.glob(os.path.join(DATA_DIR, "*.pdf"))
if not pdf_files:
raise FileNotFoundError(
f"No PDFs found in '{DATA_DIR}'. "
"Ensure your 10-K filings are in the data/raw_pdfs/ directory."
)
logger.info(
"Found %d SEC filings. Initiating parallel extraction...", len(pdf_files)
)
all_chunks: list[Document] = []
failed_files: list[str] = []
    # ThreadPoolExecutor: threads are correct for I/O-bound PDF loading.
# cpu_count() * 4 is standard for I/O-bound tasks; cap at 32 to avoid
# overwhelming the Colab / Drive connection pool.
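    # Example: a 2-vCPU Colab runtime gets min(32, 2 * 4) = 8 workers; a 16-core
    # machine would hit the cap at min(32, 16 * 4) = 32.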
max_workers: int = min(32, (os.cpu_count() or 1) * 4)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pdf: dict[concurrent.futures.Future[list[Document]], str] = {
executor.submit(_process_single_pdf, pdf, text_splitter): pdf
for pdf in pdf_files
}
for future in concurrent.futures.as_completed(future_to_pdf):
source_pdf = future_to_pdf[future]
try:
chunks = future.result()
if chunks:
all_chunks.extend(chunks)
else:
failed_files.append(os.path.basename(source_pdf))
except Exception as exc:
failed_files.append(os.path.basename(source_pdf))
logger.error(
"Unhandled exception for %s: %s", source_pdf, exc
)
    # Fail loudly if every single file failed; silent empty returns are dangerous.
if not all_chunks:
raise RuntimeError(
f"All {len(pdf_files)} PDFs failed to process. "
f"Failed files: {failed_files}. Check logs for details."
)
if failed_files:
logger.warning(
"[1/4] Partial success. %d/%d files failed: %s",
len(failed_files), len(pdf_files), failed_files,
)
logger.info(
"[1/4] Complete. Generated %d semantic chunks from %d files.",
len(all_chunks), len(pdf_files) - len(failed_files),
)
return all_chunks
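
# Minimal manual smoke test: an illustrative sketch, not part of the pipeline API.
# Assumes DATA_DIR already contains 10-K PDFs and src.config is importable.
if __name__ == "__main__":
    documents = load_and_chunk_pdfs()
    # Summarize chunk counts per company to eyeball the ingestion result.
    counts: dict[str, int] = {}
    for doc in documents:
        company = doc.metadata.get("company", "unknown")
        counts[company] = counts.get(company, 0) + 1
    for name, total in sorted(counts.items()):
        logger.info("  %s: %d chunks", name, total)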