| """ |
| Document Indexer for RAG |
| |
| Handles indexing processed documents into the vector store. |
| """ |
|
|
| from typing import List, Optional, Dict, Any, Union |
| from pathlib import Path |
| from pydantic import BaseModel, Field |
| from loguru import logger |
|
|
| from .store import VectorStore, get_vector_store |
| from .embeddings import EmbeddingAdapter, get_embedding_adapter |
|
|
# Optional dependency: the document-processing pipeline. When these imports
# fail, DOCUMENT_MODULE_AVAILABLE gates index_document(), which then returns
# a failed IndexingResult instead of raising at call time.
try:
    from ..document.schemas.core import ProcessedDocument, DocumentChunk
    from ..document.pipeline import process_document, PipelineConfig
    DOCUMENT_MODULE_AVAILABLE = True
except ImportError:
    DOCUMENT_MODULE_AVAILABLE = False
    logger.warning("Document module not available for indexing")
|
|
|
|
class IndexerConfig(BaseModel):
    """Configuration for document indexer.

    Controls the embedding batch size, which per-chunk metadata fields are
    written to the vector store, and which chunks are filtered out before
    indexing (see DocumentIndexer.index_processed_document).
    """

    # Number of texts per embedding call.
    # NOTE(review): batch_size is not referenced anywhere in this file —
    # confirm the embedding adapter actually applies it.
    batch_size: int = Field(default=32, ge=1, description="Embedding batch size")

    # Metadata toggles: when True, the corresponding field is copied from each
    # chunk into the stored record.
    include_bbox: bool = Field(default=True, description="Include bounding boxes")
    include_page: bool = Field(default=True, description="Include page numbers")
    include_chunk_type: bool = Field(default=True, description="Include chunk types")

    # Filtering: when skip_empty_chunks is True, chunks whose stripped text is
    # shorter than min_chunk_length are counted as skipped, not indexed.
    skip_empty_chunks: bool = Field(default=True, description="Skip empty text chunks")
    min_chunk_length: int = Field(default=10, ge=1, description="Minimum chunk text length")
|
|
|
|
class IndexingResult(BaseModel):
    """Result of indexing operation."""

    # Document ID (falls back to the source path string when processing
    # failed before an ID could be resolved).
    document_id: str
    # Source file path of the document, as a string.
    source_path: str
    # Number of chunks embedded and written to the vector store.
    num_chunks_indexed: int
    # Number of chunks filtered out (empty / below minimum length).
    num_chunks_skipped: int
    # False when processing or storage raised; details in `error`.
    success: bool
    # Error message when success is False, otherwise None.
    error: Optional[str] = None
|
|
|
|
class DocumentIndexer:
    """
    Indexes documents into the vector store for RAG.

    Workflow:
        1. Process document (if not already processed)
        2. Extract chunks with metadata
        3. Generate embeddings
        4. Store in vector database

    The public indexing methods never raise: failures are reported through
    the returned IndexingResult (success=False, error set).
    """

    def __init__(
        self,
        config: Optional[IndexerConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize indexer.

        Args:
            config: Indexer configuration; defaults to IndexerConfig().
            vector_store: Vector store instance; when omitted, one is created
                lazily via get_vector_store() on first use.
            embedding_adapter: Embedding adapter instance; when omitted, one
                is created lazily via get_embedding_adapter() on first use.
        """
        self.config = config or IndexerConfig()
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Get vector store (lazy initialization)."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Get embedding adapter (lazy initialization)."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def index_document(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
        pipeline_config: Optional[Any] = None,
    ) -> IndexingResult:
        """
        Process and index a document from file.

        Runs the document pipeline on `source`, then indexes the resulting
        chunks via index_processed_document().

        Args:
            source: Path to document
            document_id: Optional document ID
            pipeline_config: Optional pipeline configuration

        Returns:
            IndexingResult (success=False when the document module is
            unavailable or processing raised).
        """
        if not DOCUMENT_MODULE_AVAILABLE:
            return IndexingResult(
                document_id=document_id or str(source),
                source_path=str(source),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="Document processing module not available",
            )

        try:
            logger.info(f"Processing document: {source}")
            processed = process_document(source, document_id, pipeline_config)
            return self.index_processed_document(processed)
        except Exception as e:
            logger.error(f"Failed to index document: {e}")
            return IndexingResult(
                document_id=document_id or str(source),
                source_path=str(source),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def _should_skip(self, chunk: "DocumentChunk") -> bool:
        """Return True when filtering is enabled and the chunk's text is
        missing or shorter than config.min_chunk_length after stripping."""
        if not self.config.skip_empty_chunks:
            return False
        return not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length

    def _build_chunk_record(
        self,
        chunk: "DocumentChunk",
        document_id: str,
        source_path: str,
    ) -> Dict[str, Any]:
        """Build the metadata dict stored alongside a chunk's embedding,
        honoring the include_* toggles in config."""
        record: Dict[str, Any] = {
            "chunk_id": chunk.chunk_id,
            "document_id": document_id,
            "source_path": source_path,
            "text": chunk.text,
            "sequence_index": chunk.sequence_index,
            "confidence": chunk.confidence,
        }

        if self.config.include_page:
            record["page"] = chunk.page

        if self.config.include_chunk_type:
            record["chunk_type"] = chunk.chunk_type.value

        # bbox is optional on chunks; only stored when present AND enabled.
        if self.config.include_bbox and chunk.bbox:
            record["bbox"] = {
                "x_min": chunk.bbox.x_min,
                "y_min": chunk.bbox.y_min,
                "x_max": chunk.bbox.x_max,
                "y_max": chunk.bbox.y_max,
            }

        return record

    def index_processed_document(
        self,
        document: "ProcessedDocument",
    ) -> IndexingResult:
        """
        Index an already-processed document.

        Filters chunks per config, embeds all surviving texts in one batch,
        and stores chunks plus embeddings in the vector store.

        Args:
            document: ProcessedDocument instance

        Returns:
            IndexingResult (success=True even when every chunk was filtered
            out; success=False only when embedding/storage raised).
        """
        document_id = document.metadata.document_id
        source_path = document.metadata.source_path

        try:
            chunks_to_index: List[Dict[str, Any]] = []
            skipped = 0

            for chunk in document.chunks:
                if self._should_skip(chunk):
                    skipped += 1
                    continue
                chunks_to_index.append(
                    self._build_chunk_record(chunk, document_id, source_path)
                )

            # Nothing survived filtering: report success with zero indexed.
            if not chunks_to_index:
                return IndexingResult(
                    document_id=document_id,
                    source_path=source_path,
                    num_chunks_indexed=0,
                    num_chunks_skipped=skipped,
                    success=True,
                )

            logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
            texts = [c["text"] for c in chunks_to_index]
            embeddings = self.embedder.embed_batch(texts)

            logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
            self.store.add_chunks(chunks_to_index, embeddings)

            logger.info(
                f"Indexed document {document_id}: "
                f"{len(chunks_to_index)} chunks, {skipped} skipped"
            )

            return IndexingResult(
                document_id=document_id,
                source_path=source_path,
                num_chunks_indexed=len(chunks_to_index),
                num_chunks_skipped=skipped,
                success=True,
            )

        except Exception as e:
            logger.error(f"Failed to index processed document: {e}")
            return IndexingResult(
                document_id=document_id,
                source_path=source_path,
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def index_batch(
        self,
        sources: List[Union[str, Path]],
        pipeline_config: Optional[Any] = None,
    ) -> List[IndexingResult]:
        """
        Index multiple documents sequentially.

        Args:
            sources: List of document paths
            pipeline_config: Optional pipeline configuration

        Returns:
            One IndexingResult per source, in input order.
        """
        results = [
            self.index_document(source, pipeline_config=pipeline_config)
            for source in sources
        ]

        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        logger.info(
            f"Batch indexing complete: "
            f"{successful}/{len(results)} documents, "
            f"{total_chunks} total chunks"
        )

        return results

    def delete_document(self, document_id: str) -> int:
        """
        Remove a document from the index.

        Args:
            document_id: Document ID to remove

        Returns:
            Number of chunks deleted
        """
        return self.store.delete_document(document_id)

    def get_index_stats(self) -> Dict[str, Any]:
        """
        Get indexing statistics.

        Returns:
            Dict with total_chunks, num_documents (None when the store
            cannot enumerate documents), embedding_model, and
            embedding_dimension.
        """
        total_chunks = self.store.count()

        # list_documents() is an optional capability of the store; degrade to
        # None rather than failing the whole stats call.
        # Fixed: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt — now only Exception subclasses are suppressed.
        num_documents = None
        try:
            if hasattr(self.store, 'list_documents'):
                num_documents = len(self.store.list_documents())
        except Exception:
            num_documents = None

        return {
            "total_chunks": total_chunks,
            "num_documents": num_documents,
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.embedding_dimension,
        }
|
|
|
|
| |
| _document_indexer: Optional[DocumentIndexer] = None |
|
|
|
|
def get_document_indexer(
    config: Optional[IndexerConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentIndexer:
    """
    Return the process-wide DocumentIndexer, creating it on first call.

    Args:
        config: Indexer configuration (used only when the singleton is built)
        vector_store: Optional vector store instance (first call only)
        embedding_adapter: Optional embedding adapter (first call only)

    Returns:
        The shared DocumentIndexer instance.

    Note:
        Arguments are ignored once the singleton exists; call
        reset_document_indexer() first to rebuild with new settings.
    """
    global _document_indexer

    if _document_indexer is not None:
        return _document_indexer

    _document_indexer = DocumentIndexer(
        config=config,
        vector_store=vector_store,
        embedding_adapter=embedding_adapter,
    )
    return _document_indexer
|
|
|
|
def reset_document_indexer():
    """Discard the cached global indexer so the next get_document_indexer()
    call builds a fresh DocumentIndexer."""
    global _document_indexer
    _document_indexer = None
|
|