"""
Data ingestion and semantic chunking module for SEC 10-K filings.

UPGRADES vs previous version:
- Deterministic chunk IDs using a SHA-256 hash of
  (source_file + chunk_index + content).
  Previously used uuid4() (random), which broke index integrity on re-runs
  because the same chunk got a different ID each time.
- Full type annotations on all public and private functions.
- Raises a structured ChunkingError instead of silently returning [] on failure,
  so the caller knows which file failed and why.
- max_workers capped at min(32, cpu*4), preserved from the original (correct).
- chunk_index added to metadata for positional context within a document.
"""

import os
import glob
import hashlib
import concurrent.futures

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

from src.config import logger, DATA_DIR, CHUNK_SIZE, CHUNK_OVERLAP


# ── Custom Exception ──────────────────────────────────────────────────────────
class ChunkingError(RuntimeError):
    """Raised when a PDF cannot be parsed or chunked."""


# ── Deterministic ID Generation ───────────────────────────────────────────────
def _make_chunk_id(page_content: str, source_file: str, chunk_index: int) -> str:
    """
    Generate a deterministic, collision-resistant chunk ID.

    Uses SHA-256 over (content + source + index) so the same chunk always
    gets the same ID across pipeline re-runs. This is critical for:
      - BM25 dedup logic in the RRF retriever
      - Cache invalidation correctness
      - Reproducible experiment tracking

    Args:
        page_content:  The raw text content of the chunk.
        source_file:   The PDF filename the chunk came from.
        chunk_index:   The sequential position of this chunk within its document.

    Returns:
        A 16-character hex string prefixed with the source file stem.
        Example: "google_10k_a3f9c21b7e4d0012"
    """
    raw = f"{source_file}::{chunk_index}::{page_content}"
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
    stem = os.path.splitext(source_file)[0]   # e.g. "google_10k"
    return f"{stem}_{digest}"


# ── Single-File Processor ─────────────────────────────────────────────────────
def _process_single_pdf(
    file_path: str,
    text_splitter: RecursiveCharacterTextSplitter,
) -> list[Document]:
    """
    Parse, chunk, and annotate a single PDF file.

    Isolated as a top-level function (not a lambda or inner function) to keep it
    independently testable and picklable if the executor is ever switched to
    ProcessPoolExecutor; ThreadPoolExecutor itself does not require pickling.

    Args:
        file_path:     Absolute path to the PDF.
        text_splitter: Pre-configured RecursiveCharacterTextSplitter instance.
                       Shared across threads; LangChain splitters are stateless
                       and therefore thread-safe.

    Returns:
        List of annotated Document chunks. Returns [] only on an unexpected hard
        failure (e.g. a corrupted PDF), which is logged as ERROR so one bad file
        does not abort the whole batch.

    Raises:
        ChunkingError: If the PDF cannot be loaded or produces zero pages.
    """
    file_name: str    = os.path.basename(file_path)
    company_name: str = file_name.split("_")[0].capitalize()
    logger.info("  -> Parsing in parallel: %s", file_name)

    try:
        loader = PyPDFLoader(file_path)
        raw_pages: list[Document] = loader.load()

        if not raw_pages:
            raise ChunkingError(f"PDF produced zero pages: {file_name}")

        # Annotate pages with document-level metadata before splitting.
        # Metadata set here propagates to every chunk derived from these pages.
        for page in raw_pages:
            page.metadata["company"]      = company_name
            page.metadata["source_file"]  = file_name

        chunks: list[Document] = text_splitter.split_documents(raw_pages)

        if not chunks:
            raise ChunkingError(f"Text splitter produced zero chunks for: {file_name}")

        # Assign deterministic IDs; critical for RRF dedup and index stability.
        for idx, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"]    = _make_chunk_id(
                chunk.page_content, file_name, idx
            )
            chunk.metadata["chunk_index"] = idx   # positional context

        logger.info(
            "  -> Completed: %s β†’ %d chunks", file_name, len(chunks)
        )
        return chunks

    except ChunkingError:
        raise   # re-raise structured errors as-is
    except Exception as exc:
        # Catch unexpected errors (corrupted PDF, permission denied, etc.)
        # Log and return empty list so one bad file doesn't kill the whole batch.
        logger.error("Failed to process %s: %s", file_name, exc)
        return []


# ── Public Pipeline Entry Point ───────────────────────────────────────────────
def load_and_chunk_pdfs() -> list[Document]:
    """
    Discover, parse, chunk, and annotate all PDF filings in DATA_DIR.

    Uses ThreadPoolExecutor for parallel I/O; PDF loading is I/O-bound,
    not CPU-bound, so threads (not processes) are the correct primitive.

    Returns:
        Flat list of annotated Document chunks from all discovered PDFs.

    Raises:
        FileNotFoundError: If no PDFs are found in DATA_DIR.
        RuntimeError:      If ALL files fail to process (partial failures are
                           tolerated and logged; total failure is not).
    """
    logger.info("[1/4] Starting Data Ingestion from PDFs (Parallel Mode)")
    logger.info("      Data directory: %s", DATA_DIR)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ".", " ", ""],
    )

    pdf_files: list[str] = glob.glob(os.path.join(DATA_DIR, "*.pdf"))

    if not pdf_files:
        raise FileNotFoundError(
            f"No PDFs found in '{DATA_DIR}'. "
            "Ensure your 10-K filings are in the data/raw_pdfs/ directory."
        )

    logger.info(
        "Found %d SEC filings. Initiating parallel extraction...", len(pdf_files)
    )

    all_chunks: list[Document] = []
    failed_files: list[str]    = []

    # ThreadPoolExecutor: threads are correct for I/O-bound PDF loading.
    # cpu_count() * 4 is standard for I/O-bound tasks; cap at 32 to avoid
    # overwhelming the Colab / Drive connection pool.
    max_workers: int = min(32, (os.cpu_count() or 1) * 4)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_pdf: dict[concurrent.futures.Future, str] = {
            executor.submit(_process_single_pdf, pdf, text_splitter): pdf
            for pdf in pdf_files
        }

        for future in concurrent.futures.as_completed(future_to_pdf):
            source_pdf = future_to_pdf[future]
            try:
                chunks = future.result()
                if chunks:
                    all_chunks.extend(chunks)
                else:
                    failed_files.append(os.path.basename(source_pdf))
            except Exception as exc:
                failed_files.append(os.path.basename(source_pdf))
                logger.error(
                    "Unhandled exception for %s: %s", source_pdf, exc
                )

    # Fail loudly if every single file failed; silent empty returns are dangerous.
    if not all_chunks:
        raise RuntimeError(
            f"All {len(pdf_files)} PDFs failed to process. "
            f"Failed files: {failed_files}. Check logs for details."
        )

    if failed_files:
        logger.warning(
            "[1/4] Partial success. %d/%d files failed: %s",
            len(failed_files), len(pdf_files), failed_files,
        )

    logger.info(
        "[1/4] Complete. Generated %d semantic chunks from %d files.",
        len(all_chunks), len(pdf_files) - len(failed_files),
    )
    return all_chunks
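

if __name__ == "__main__":
    # Minimal manual smoke test (a sketch; assumes DATA_DIR already contains at
    # least one *.pdf and that src.config has configured `logger` for console output).
    sample_chunks = load_and_chunk_pdfs()
    first_chunk = sample_chunks[0]
    logger.info("Sample chunk_id: %s", first_chunk.metadata["chunk_id"])
    logger.info("Sample metadata keys: %s", sorted(first_chunk.metadata))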