Spaces:
Sleeping
Sleeping
| """ | |
| VoiceVerse AI β RAG Pipeline. | |
| Handles document ingestion, text chunking, embedding generation, | |
| and semantic retrieval using an in-memory vector store. | |
| Models used: | |
| - sentence-transformers/all-MiniLM-L6-v2 for embeddings (22 MB, CPU-friendly) | |
| Design decisions: | |
| - NumPy cosine similarity instead of FAISS to avoid heavy native deps | |
| - Overlapping chunks to preserve context across boundaries | |
| - Single-document architecture (clear store on new upload) | |
| """ | |
| import os | |
| import numpy as np | |
| from utils import logger | |
# ── Text Extraction ──────────────────────────────────────────────────────────
def extract_text(file_path: str) -> str:
    """
    Extract plain text from a PDF or TXT file.

    Dispatches on the (case-insensitive) file extension and returns the
    full document text as a single string.

    Raises:
        ValueError: if the extension is neither ``.pdf`` nor ``.txt``.
    """
    extension = os.path.splitext(file_path)[1].lower()
    if extension not in (".pdf", ".txt"):
        raise ValueError(f"Unsupported file type: {extension}")
    # Route to the format-specific extractor.
    return _extract_pdf(file_path) if extension == ".pdf" else _extract_txt(file_path)
def _extract_pdf(file_path: str) -> str:
    """Extract text from a PDF using PyMuPDF, skipping blank pages."""
    # Imported lazily so the native dependency is only required for PDFs.
    import fitz  # PyMuPDF
    pages: list[str] = []
    with fitz.open(file_path) as document:
        for index, page in enumerate(document, start=1):
            content = page.get_text("text")
            if not content.strip():
                continue  # ignore pages with no extractable text
            pages.append(content)
            logger.debug("Extracted page %d: %d chars", index, len(content))
    combined = "\n\n".join(pages)
    logger.info("PDF extraction complete: %d pages, %d chars total",
                len(pages), len(combined))
    return combined
def _extract_txt(file_path: str) -> str:
    """
    Read a plain-text file, trying a sequence of encodings in order.

    The order is significant:
    - "utf-8-sig" comes before "utf-8" so that a UTF-8 BOM is stripped
      rather than leaking a leading "\\ufeff" character into the text
      (plain "utf-8" decodes BOM files successfully but keeps the BOM).
    - "cp1252" comes before "latin-1": latin-1 maps every possible byte,
      so any encoding listed after it could never be tried. (The original
      order made cp1252 unreachable.)

    Returns:
        The decoded file contents.

    Raises:
        ValueError: defensively, if no encoding succeeds (latin-1 accepts
            any byte sequence, so in practice this is unreachable).
    """
    for encoding in ("utf-8-sig", "utf-8", "cp1252", "latin-1"):
        try:
            with open(file_path, "r", encoding=encoding) as f:
                text = f.read()
        except UnicodeDecodeError:
            continue
        logger.info("TXT extraction complete (%s): %d chars", encoding, len(text))
        return text
    raise ValueError("Could not decode the text file with any supported encoding.")
# ── Text Chunking ────────────────────────────────────────────────────────────
def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """
    Split text into overlapping chunks of roughly ``chunk_size`` characters.

    Whitespace is first collapsed to single spaces. Each chunk window is
    then trimmed back to the last sentence-ending punctuation mark found in
    the second half of the window, so chunks tend to end on sentence
    boundaries. Consecutive chunks overlap by up to ``overlap`` characters
    so context is not lost at the seams.
    """
    if not text or not text.strip():
        return []
    # Collapse all runs of whitespace into single spaces.
    normalized = " ".join(text.split())
    total = len(normalized)

    pieces: list[str] = []
    cursor = 0
    while cursor < total:
        stop = cursor + chunk_size
        if stop < total:
            # Scan backwards through the latter half of the window for a
            # sentence terminator that is followed by a space (or is the
            # last character of the text).
            floor = max(cursor + chunk_size // 2, cursor)
            boundary = -1
            idx = min(stop, total) - 1
            while idx >= floor:
                at_sentence_end = normalized[idx] in ".!?" and (
                    idx + 1 >= total or normalized[idx + 1] == " "
                )
                if at_sentence_end:
                    boundary = idx
                    break
                idx -= 1
            if boundary > cursor:
                stop = boundary + 1
        piece = normalized[cursor:stop].strip()
        if piece:
            pieces.append(piece)
        # Advance by (window length - overlap), but always at least one
        # character so the loop is guaranteed to terminate.
        cursor = max(cursor + 1, stop - overlap)

    logger.info("Chunking complete: %d chunks (size=%d, overlap=%d)",
                len(pieces), chunk_size, overlap)
    return pieces
# ── Embedding & Vector Store ─────────────────────────────────────────────────
class RAGStore:
    """
    In-memory vector store using sentence-transformers embeddings
    and NumPy cosine similarity.

    Usage:
        store = RAGStore()
        store.add_document("full document text here")
        results = store.query("what is this about?", top_k=5)
    """

    MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self):
        self._model = None               # lazily created SentenceTransformer
        self.chunks: list[str] = []      # chunk texts, parallel to embeddings rows
        self.embeddings: np.ndarray | None = None  # (n_chunks, dim), L2-normalized

    @property
    def model(self):
        """
        Lazy-load the embedding model to avoid startup cost.

        BUG FIX: this was a plain method, but ``add_document`` and ``query``
        access it as ``self.model.encode(...)``; without ``@property`` that
        resolved to the bound method object and raised AttributeError.
        """
        if self._model is None:
            logger.info("Loading embedding model: %s", self.MODEL_NAME)
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.MODEL_NAME)
            logger.info("Embedding model loaded successfully")
        return self._model

    def clear(self):
        """Clear the store for a new document."""
        self.chunks = []
        self.embeddings = None

    def add_document(self, text: str, chunk_size: int = 512, overlap: int = 50):
        """
        Process a document: chunk the text, generate embeddings, and store.
        Clears any previously stored document (single-document design).

        Raises:
            ValueError: if no usable chunks could be produced from ``text``.
        """
        self.clear()
        self.chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not self.chunks:
            raise ValueError("No text chunks could be extracted from the document.")
        logger.info("Generating embeddings for %d chunks...", len(self.chunks))
        self.embeddings = self.model.encode(
            self.chunks,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True,  # Pre-normalize for faster cosine sim
        )
        logger.info("Embeddings generated: shape %s", self.embeddings.shape)

    def query(self, question: str, top_k: int = 5) -> list[str]:
        """
        Retrieve the top-k most relevant chunks for the given question.

        Uses cosine similarity, computed as a dot product because both the
        stored embeddings and the query embedding are pre-normalized.
        Returns an empty list if no document has been added yet.
        """
        if self.embeddings is None or len(self.chunks) == 0:
            return []
        # Embed the query (1, dim), normalized like the stored vectors.
        query_embedding = self.model.encode(
            [question],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        # Cosine similarity = dot product (vectors are pre-normalized).
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()
        # Clamp top_k to the number of available chunks, then take the
        # indices of the k largest similarities in descending order.
        top_k = min(top_k, len(self.chunks))
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [self.chunks[i] for i in top_indices]
        logger.info("Retrieved %d chunks (top similarity: %.3f)",
                    len(results), similarities[top_indices[0]])
        return results

    def get_all_chunks(self) -> list[str]:
        """Return a copy of all stored chunks (useful for short documents)."""
        return self.chunks.copy()