| """
|
| VoiceVerse AI — RAG Pipeline.
|
|
|
| Handles document ingestion, text chunking, embedding generation,
|
| and semantic retrieval using an in-memory vector store.
|
|
|
| Models used:
|
| - sentence-transformers/all-MiniLM-L6-v2 for embeddings (22 MB, CPU-friendly)
|
|
|
| Design decisions:
|
| - NumPy cosine similarity instead of FAISS to avoid heavy native deps
|
| - Overlapping chunks to preserve context across boundaries
|
| - Single-document architecture (clear store on new upload)
|
| """
|
|
|
| import os
|
| import numpy as np
|
| from utils import logger
|
|
|
|
|
|
|
def extract_text(file_path: str) -> str:
    """
    Extract plain text from a PDF or TXT file.

    Dispatches on the lower-cased file extension and returns the full
    document text as a single string.

    Raises:
        ValueError: if the extension is neither .pdf nor .txt.
    """
    suffix = os.path.splitext(file_path)[1].lower()

    # Guard-clause dispatch: handle each supported type, fail loudly otherwise.
    if suffix == ".txt":
        return _extract_txt(file_path)
    if suffix == ".pdf":
        return _extract_pdf(file_path)
    raise ValueError(f"Unsupported file type: {suffix}")
|
|
|
|
|
def _extract_pdf(file_path: str) -> str:
    """Extract the text layer of a PDF with PyMuPDF, skipping blank pages."""
    # Imported lazily so the module loads even when PyMuPDF is absent.
    import fitz

    pages: list[str] = []
    with fitz.open(file_path) as doc:
        for number, page in enumerate(doc, start=1):
            content = page.get_text("text")
            if not content.strip():
                continue  # blank page — nothing worth indexing
            pages.append(content)
            logger.debug("Extracted page %d: %d chars", number, len(content))

    combined = "\n\n".join(pages)
    logger.info("PDF extraction complete: %d pages, %d chars total",
                len(pages), len(combined))
    return combined
|
|
|
|
|
def _extract_txt(file_path: str) -> str:
    """
    Read a plain text file, trying a sequence of encodings in order.

    Order matters: "utf-8-sig" is tried first because it decodes plain
    UTF-8 as well and strips a leading BOM if present; "cp1252" comes
    next; "latin-1" is last because it maps every possible byte value,
    so it never raises and serves as a lossless catch-all.
    (Previously "latin-1" preceded "cp1252", which made the cp1252
    attempt — and the final error — unreachable dead code.)

    Raises:
        ValueError: kept as a defensive fallback, though the latin-1
            attempt cannot fail in practice.
    """
    for encoding in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            # Keep the try body minimal: only the decode can raise here.
            with open(file_path, "r", encoding=encoding) as f:
                text = f.read()
        except UnicodeDecodeError:
            continue
        logger.info("TXT extraction complete (%s): %d chars", encoding, len(text))
        return text
    raise ValueError("Could not decode the text file with any supported encoding.")
|
|
|
|
|
|
|
|
|
def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """
    Break `text` into overlapping pieces of roughly `chunk_size` characters.

    The splitter prefers to end a chunk on a sentence terminator (., !, ?)
    found in the back half of the window, so pieces read as coherent units;
    consecutive chunks share `overlap` characters so context that straddles
    a boundary is not lost.
    """
    if not text or not text.strip():
        return []

    # Collapse every run of whitespace (including newlines) to one space.
    text = " ".join(text.split())

    pieces: list[str] = []
    cursor = 0
    length = len(text)

    while cursor < length:
        window_end = cursor + chunk_size

        if window_end < length:
            # Walk backwards from the window edge — but only through the
            # back half of the window — looking for sentence punctuation
            # that is followed by a space (or ends the text).
            floor = max(cursor + chunk_size // 2, cursor)
            boundary = -1
            for pos in range(min(window_end, length) - 1, floor - 1, -1):
                if text[pos] in ".!?" and (pos + 1 >= length or text[pos + 1] == " "):
                    boundary = pos
                    break
            if boundary > cursor:
                window_end = boundary + 1

        piece = text[cursor:window_end].strip()
        if piece:
            pieces.append(piece)

        # Advance; the max() with cursor+1 guarantees forward progress even
        # when the overlap would otherwise stall or rewind the cursor.
        cursor = max(cursor + 1, window_end - overlap)

    logger.info("Chunking complete: %d chunks (size=%d, overlap=%d)",
                len(pieces), chunk_size, overlap)
    return pieces
|
|
|
|
|
|
|
|
|
class RAGStore:
    """
    In-memory vector store using sentence-transformers embeddings
    and NumPy cosine similarity.

    Embeddings are L2-normalized at encode time, so cosine similarity
    reduces to a plain dot product. The store holds one document at a
    time: `add_document` clears any previous content.

    Usage:
        store = RAGStore()
        store.add_document("full document text here")
        results = store.query("what is this about?", top_k=5)
    """

    # Small (~22 MB), CPU-friendly embedding model.
    MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self):
        # Model handle is populated lazily by the `model` property.
        self._model = None
        self.chunks: list[str] = []
        self.embeddings: np.ndarray | None = None

    @property
    def model(self):
        """Lazy-load the embedding model to avoid startup cost."""
        if self._model is None:
            logger.info("Loading embedding model: %s", self.MODEL_NAME)
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.MODEL_NAME)
            logger.info("Embedding model loaded successfully")
        return self._model

    def clear(self):
        """Clear the store for a new document."""
        self.chunks = []
        self.embeddings = None

    def add_document(self, text: str, chunk_size: int = 512, overlap: int = 50):
        """
        Process a document: chunk the text, generate embeddings, and store.
        Clears any previously stored document.

        Raises:
            ValueError: if no non-empty chunks could be produced from `text`.
        """
        self.clear()

        self.chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not self.chunks:
            raise ValueError("No text chunks could be extracted from the document.")

        logger.info("Generating embeddings for %d chunks...", len(self.chunks))
        # normalize_embeddings=True is what lets `query` treat a dot
        # product as cosine similarity.
        self.embeddings = self.model.encode(
            self.chunks,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        logger.info("Embeddings generated: shape %s", self.embeddings.shape)

    def query(self, question: str, top_k: int = 5) -> list[str]:
        """
        Retrieve the top-k most relevant chunks for the given question.
        Uses cosine similarity (dot product on normalized vectors).

        Returns an empty list when no document is loaded or top_k <= 0.
        """
        if self.embeddings is None or len(self.chunks) == 0:
            return []

        top_k = min(top_k, len(self.chunks))
        if top_k <= 0:
            # BUGFIX: `argsort(...)[-0:]` slices the WHOLE array, so a
            # non-positive top_k used to return every chunk instead of
            # none. Guard here — before paying for the query encoding.
            return []

        query_embedding = self.model.encode(
            [question],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )

        # Cosine similarity via dot product (vectors are unit-normalized).
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()

        # Indices of the top-k scores, highest first.
        top_indices = np.argsort(similarities)[-top_k:][::-1]

        results = [self.chunks[i] for i in top_indices]
        logger.info("Retrieved %d chunks (top similarity: %.3f)",
                    len(results), similarities[top_indices[0]])
        return results

    def get_all_chunks(self) -> list[str]:
        """Return all stored chunks (useful for short documents)."""
        return self.chunks.copy()
|
|
|