| """ |
| Document processor module for Norwegian RAG chatbot. |
| Orchestrates the document processing pipeline with remote embeddings. |
| """ |
|
|
| import os |
| import json |
| import numpy as np |
| from typing import List, Dict, Any, Optional, Tuple, Union |
| from datetime import datetime |
|
|
| from .extractor import TextExtractor |
| from .chunker import TextChunker |
| from ..api.huggingface_api import HuggingFaceAPI |
| from ..api.config import CHUNK_SIZE, CHUNK_OVERLAP |
|
|
| class DocumentProcessor: |
| """ |
| Orchestrates the document processing pipeline: |
| 1. Extract text from documents |
| 2. Split text into chunks |
| 3. Generate embeddings using remote API |
| 4. Store processed documents and embeddings |
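
    Example (a minimal sketch; assumes the default HuggingFaceAPI client is
    configured with valid credentials and that the extractor and chunker
    modules are importable):

        processor = DocumentProcessor(chunking_strategy="paragraph")
        doc_id = processor.process_text("Oslo er hovedstaden i Norge.")
        chunks = processor.get_document_chunks(doc_id)
        vectors = processor.get_document_embeddings(doc_id)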
| """ |
| |
| def __init__( |
| self, |
| api_client: Optional[HuggingFaceAPI] = None, |
| documents_dir: str = "/home/ubuntu/chatbot_project/data/documents", |
| processed_dir: str = "/home/ubuntu/chatbot_project/data/processed", |
| chunk_size: int = CHUNK_SIZE, |
| chunk_overlap: int = CHUNK_OVERLAP, |
| chunking_strategy: str = "paragraph" |
| ): |
| """ |
| Initialize the document processor. |
| |
| Args: |
| api_client: HuggingFaceAPI client for generating embeddings |
| documents_dir: Directory for storing original documents |
| processed_dir: Directory for storing processed documents and embeddings |
| chunk_size: Maximum size of each chunk |
| chunk_overlap: Overlap between consecutive chunks |
| chunking_strategy: Strategy for chunking text ('fixed', 'paragraph', or 'sentence') |
| """ |
| self.api_client = api_client or HuggingFaceAPI() |
| self.documents_dir = documents_dir |
| self.processed_dir = processed_dir |
| self.chunk_size = chunk_size |
| self.chunk_overlap = chunk_overlap |
| self.chunking_strategy = chunking_strategy |
| |
| |
| os.makedirs(self.documents_dir, exist_ok=True) |
| os.makedirs(self.processed_dir, exist_ok=True) |
| |
| |
| self.document_index_path = os.path.join(self.processed_dir, "document_index.json") |
| self.document_index = self._load_document_index() |
| |
    def process_document(
        self,
        file_path: str,
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Process a document through the entire pipeline.

        Args:
            file_path: Path to the document file
            document_id: Optional custom document ID
            metadata: Optional metadata for the document

        Returns:
            Document ID
        """
        # Derive an ID from the timestamp and filename if none was given
        if document_id is None:
            document_id = f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.path.basename(file_path)}"

        # Step 1: extract text from the document
        text = TextExtractor.extract_from_file(file_path)
        if not text:
            raise ValueError(f"Failed to extract text from {file_path}")

        # Step 2: split the text into chunks and clean them
        chunks = TextChunker.chunk_text(
            text,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            strategy=self.chunking_strategy
        )
        chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]

        # Step 3: generate embeddings via the remote API
        embeddings = self.api_client.generate_embeddings(chunks)

        # Copy the caller's metadata so it is not mutated in place
        metadata = dict(metadata) if metadata else {}
        metadata.update({
            "filename": os.path.basename(file_path),
            "processed_date": datetime.now().isoformat(),
            "chunk_count": len(chunks),
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.api_client.embedding_model_id
        })

        # Step 4: persist the processed document and update the index
        self._save_processed_document(document_id, chunks, embeddings, metadata)
        self._update_document_index(document_id, metadata)

        return document_id

    def process_text(
        self,
        text: str,
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Process text directly through the pipeline.

        Args:
            text: Text content to process
            document_id: Optional custom document ID
            metadata: Optional metadata for the document

        Returns:
            Document ID
        """
        # Derive an ID from the timestamp if none was given
        if document_id is None:
            document_id = f"text_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        # Split the text into chunks and clean them
        chunks = TextChunker.chunk_text(
            text,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            strategy=self.chunking_strategy
        )
        chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]

        # Generate embeddings via the remote API
        embeddings = self.api_client.generate_embeddings(chunks)

        # Copy the caller's metadata so it is not mutated in place
        metadata = dict(metadata) if metadata else {}
        metadata.update({
            "source": "direct_text",
            "processed_date": datetime.now().isoformat(),
            "chunk_count": len(chunks),
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.api_client.embedding_model_id
        })

        # Persist the processed document and update the index
        self._save_processed_document(document_id, chunks, embeddings, metadata)
        self._update_document_index(document_id, metadata)

        return document_id

    def get_document_chunks(self, document_id: str) -> List[str]:
        """
        Get all chunks for a document.

        Args:
            document_id: Document ID

        Returns:
            List of text chunks
        """
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if not os.path.exists(document_path):
            raise FileNotFoundError(f"Document not found: {document_id}")

        with open(document_path, 'r', encoding='utf-8') as f:
            document_data = json.load(f)

        return document_data.get("chunks", [])

    def get_document_embeddings(self, document_id: str) -> List[List[float]]:
        """
        Get all embeddings for a document.

        Args:
            document_id: Document ID

        Returns:
            List of embedding vectors
        """
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if not os.path.exists(document_path):
            raise FileNotFoundError(f"Document not found: {document_id}")

        with open(document_path, 'r', encoding='utf-8') as f:
            document_data = json.load(f)

        return document_data.get("embeddings", [])

    def get_all_documents(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all documents in the index.

        Returns:
            Dictionary of document IDs to metadata
        """
        # Return a shallow copy so callers cannot mutate the index in place
        return dict(self.document_index)

    def delete_document(self, document_id: str) -> bool:
        """
        Delete a document and its processed data.

        Args:
            document_id: Document ID

        Returns:
            True if successful, False otherwise
        """
        if document_id not in self.document_index:
            return False

        # Remove the document from the index and persist the change
        del self.document_index[document_id]
        self._save_document_index()

        # Delete the processed data file if it exists
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if os.path.exists(document_path):
            os.remove(document_path)

        return True

    def _save_processed_document(
        self,
        document_id: str,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Dict[str, Any]
    ) -> None:
        """
        Save processed document data.

        Args:
            document_id: Document ID
            chunks: List of text chunks
            embeddings: List of embedding vectors
            metadata: Document metadata
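
        The saved file (<processed_dir>/<document_id>.json) looks roughly like
        this sketch (illustrative values, not actual output):

            {
              "document_id": "doc_20250101120000_report.pdf",
              "metadata": {"filename": "report.pdf", "chunk_count": 2, ...},
              "chunks": ["first chunk ...", "second chunk ..."],
              "embeddings": [[0.12, -0.03, ...], [0.07, 0.21, ...]]
            }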
| """ |
| document_data = { |
| "document_id": document_id, |
| "metadata": metadata, |
| "chunks": chunks, |
| "embeddings": embeddings |
| } |
| |
| document_path = os.path.join(self.processed_dir, f"{document_id}.json") |
| with open(document_path, 'w', encoding='utf-8') as f: |
| json.dump(document_data, f, ensure_ascii=False, indent=2) |
| |
    def _load_document_index(self) -> Dict[str, Dict[str, Any]]:
        """
        Load the document index from disk.

        Returns:
            Dictionary of document IDs to metadata
        """
        if os.path.exists(self.document_index_path):
            try:
                with open(self.document_index_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError) as e:
                print(f"Error loading document index: {str(e)}")

        # Fall back to an empty index if the file is missing or unreadable
        return {}

    def _save_document_index(self) -> None:
        """
        Save the document index to disk.
        """
        with open(self.document_index_path, 'w', encoding='utf-8') as f:
            json.dump(self.document_index, f, ensure_ascii=False, indent=2)

    def _update_document_index(self, document_id: str, metadata: Dict[str, Any]) -> None:
        """
        Update the document index with a new or updated document.

        Args:
            document_id: Document ID
            metadata: Document metadata
        """
        self.document_index[document_id] = metadata
        self._save_document_index()
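

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the pipeline itself. It assumes
    # HuggingFaceAPI can be constructed with its default configuration and
    # that the remote embedding endpoint is reachable; the directory paths
    # below are example values. Because of the relative imports above, run it
    # as a module (e.g. `python -m <your_package>.processor`, package name
    # depending on your layout) rather than as a standalone script.
    processor = DocumentProcessor(
        documents_dir="./data/documents",
        processed_dir="./data/processed",
    )
    doc_id = processor.process_text(
        "Norge er et land i Skandinavia. Hovedstaden er Oslo.",
        metadata={"language": "no"},
    )
    print(f"Processed document: {doc_id}")
    print(f"Chunk count: {len(processor.get_document_chunks(doc_id))}")
    print(f"Indexed documents: {list(processor.get_all_documents())}")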