""" VoiceVerse AI — RAG Pipeline. Handles document ingestion, text chunking, embedding generation, and semantic retrieval using an in-memory vector store. Models used: - sentence-transformers/all-MiniLM-L6-v2 for embeddings (22 MB, CPU-friendly) Design decisions: - NumPy cosine similarity instead of FAISS to avoid heavy native deps - Overlapping chunks to preserve context across boundaries - Single-document architecture (clear store on new upload) """ import os import numpy as np from utils import logger # ── Text Extraction ────────────────────────────────────────────────────────── def extract_text(file_path: str) -> str: """ Extract plain text from a PDF or TXT file. Returns the full document text as a single string. """ ext = os.path.splitext(file_path)[1].lower() if ext == ".pdf": return _extract_pdf(file_path) elif ext == ".txt": return _extract_txt(file_path) else: raise ValueError(f"Unsupported file type: {ext}") def _extract_pdf(file_path: str) -> str: """Extract text from PDF using PyMuPDF.""" import fitz # PyMuPDF text_parts = [] with fitz.open(file_path) as doc: for page_num, page in enumerate(doc): page_text = page.get_text("text") if page_text.strip(): text_parts.append(page_text) logger.debug("Extracted page %d: %d chars", page_num + 1, len(page_text)) full_text = "\n\n".join(text_parts) logger.info("PDF extraction complete: %d pages, %d chars total", len(text_parts), len(full_text)) return full_text def _extract_txt(file_path: str) -> str: """Read plain text file with encoding fallback.""" for encoding in ("utf-8", "utf-8-sig", "latin-1", "cp1252"): try: with open(file_path, "r", encoding=encoding) as f: text = f.read() logger.info("TXT extraction complete (%s): %d chars", encoding, len(text)) return text except UnicodeDecodeError: continue raise ValueError("Could not decode the text file with any supported encoding.") # ── Text Chunking ──────────────────────────────────────────────────────────── def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]: """ Split text into overlapping chunks of roughly `chunk_size` characters. Overlap ensures context isn't lost at chunk boundaries. Uses sentence-aware splitting: tries to break at sentence boundaries within the chunk window for more coherent chunks. """ if not text or not text.strip(): return [] # Clean up whitespace text = " ".join(text.split()) chunks = [] start = 0 while start < len(text): end = start + chunk_size # If not at the end, try to break at a sentence boundary if end < len(text): # Look for sentence-ending punctuation near the end search_start = max(start + chunk_size // 2, start) last_period = -1 for i in range(min(end, len(text)) - 1, search_start - 1, -1): if text[i] in ".!?" and (i + 1 >= len(text) or text[i + 1] == " "): last_period = i break if last_period > start: end = last_period + 1 chunk = text[start:end].strip() if chunk: chunks.append(chunk) # Move forward by (chunk length - overlap) start = max(start + 1, end - overlap) logger.info("Chunking complete: %d chunks (size=%d, overlap=%d)", len(chunks), chunk_size, overlap) return chunks # ── Embedding & Vector Store ───────────────────────────────────────────────── class RAGStore: """ In-memory vector store using sentence-transformers embeddings and NumPy cosine similarity. 
# ── Embedding & Vector Store ─────────────────────────────────────────────────

class RAGStore:
    """
    In-memory vector store using sentence-transformers embeddings and
    NumPy cosine similarity.

    Usage:
        store = RAGStore()
        store.add_document("full document text here")
        results = store.query("what is this about?", top_k=5)
    """

    MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self):
        self._model = None
        self.chunks: list[str] = []
        self.embeddings: np.ndarray | None = None

    @property
    def model(self):
        """Lazy-load the embedding model to avoid startup cost."""
        if self._model is None:
            logger.info("Loading embedding model: %s", self.MODEL_NAME)
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.MODEL_NAME)
            logger.info("Embedding model loaded successfully")
        return self._model

    def clear(self):
        """Clear the store for a new document."""
        self.chunks = []
        self.embeddings = None

    def add_document(self, text: str, chunk_size: int = 512, overlap: int = 50):
        """
        Process a document: chunk the text, generate embeddings, and store them.

        Clears any previously stored document.
        """
        self.clear()
        self.chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not self.chunks:
            raise ValueError("No text chunks could be extracted from the document.")

        logger.info("Generating embeddings for %d chunks...", len(self.chunks))
        self.embeddings = self.model.encode(
            self.chunks,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True,  # Pre-normalize so cosine sim reduces to a dot product
        )
        logger.info("Embeddings generated: shape %s", self.embeddings.shape)

    def query(self, question: str, top_k: int = 5) -> list[str]:
        """
        Retrieve the top-k most relevant chunks for the given question.

        Uses cosine similarity (a dot product on normalized vectors).
        """
        if self.embeddings is None or len(self.chunks) == 0:
            return []

        # Embed the query with the same model and normalization as the chunks
        query_embedding = self.model.encode(
            [question],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )

        # Cosine similarity = dot product (vectors are pre-normalized)
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()

        # Take the top-k indices, highest similarity first
        top_k = min(top_k, len(self.chunks))
        top_indices = np.argsort(similarities)[-top_k:][::-1]

        results = [self.chunks[i] for i in top_indices]
        logger.info("Retrieved %d chunks (top similarity: %.3f)",
                    len(results), similarities[top_indices[0]])
        return results

    def get_all_chunks(self) -> list[str]:
        """Return all stored chunks (useful for short documents)."""
        return self.chunks.copy()
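

if __name__ == "__main__":
    # Minimal smoke test (a sketch; assumes sentence-transformers is installed
    # and the MiniLM model can be downloaded or is cached locally. Not part of
    # the production pipeline; the sample text and parameters are illustrative).
    sample = (
        "VoiceVerse AI answers questions about an uploaded document. "
        "The pipeline chunks the text, embeds each chunk with MiniLM, and "
        "retrieves the most similar chunks by cosine similarity. "
        "This block exercises that flow end to end on a tiny input."
    )
    store = RAGStore()
    store.add_document(sample, chunk_size=120, overlap=20)
    for hit in store.query("How does retrieval work?", top_k=2):
        print("-", hit)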