"""
VoiceVerse AI β RAG Pipeline.
Handles document ingestion, text chunking, embedding generation,
and semantic retrieval using an in-memory vector store.
Models used:
- sentence-transformers/all-MiniLM-L6-v2 for embeddings (22 MB, CPU-friendly)
Design decisions:
- NumPy cosine similarity instead of FAISS to avoid heavy native deps
- Overlapping chunks to preserve context across boundaries
- Single-document architecture (clear store on new upload)
"""
import os
import numpy as np
from utils import logger
# ββ Text Extraction ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def extract_text(file_path: str) -> str:
    """
    Extract the full plain-text contents of a PDF or TXT file.

    Dispatches on the (case-insensitive) file extension and returns the
    document text as a single string.

    Raises:
        ValueError: for any extension other than .pdf or .txt.
    """
    suffix = os.path.splitext(file_path)[1].lower()
    if suffix == ".pdf":
        return _extract_pdf(file_path)
    if suffix == ".txt":
        return _extract_txt(file_path)
    raise ValueError(f"Unsupported file type: {suffix}")
def _extract_pdf(file_path: str) -> str:
    """Pull text out of a PDF, page by page, using PyMuPDF."""
    import fitz  # PyMuPDF

    pages: list[str] = []
    with fitz.open(file_path) as document:
        for page_no, page in enumerate(document, start=1):
            content = page.get_text("text")
            if not content.strip():
                # Skip pages with no extractable text (e.g. pure images).
                continue
            pages.append(content)
            logger.debug("Extracted page %d: %d chars", page_no, len(content))
    combined = "\n\n".join(pages)
    logger.info("PDF extraction complete: %d pages, %d chars total",
                len(pages), len(combined))
    return combined
def _extract_txt(file_path: str) -> str:
"""Read plain text file with encoding fallback."""
for encoding in ("utf-8", "utf-8-sig", "latin-1", "cp1252"):
try:
with open(file_path, "r", encoding=encoding) as f:
text = f.read()
logger.info("TXT extraction complete (%s): %d chars", encoding, len(text))
return text
except UnicodeDecodeError:
continue
raise ValueError("Could not decode the text file with any supported encoding.")
# ββ Text Chunking ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """
    Split *text* into overlapping chunks of roughly ``chunk_size`` characters.

    The overlap carries context across chunk boundaries, and the splitter is
    sentence-aware: within the back half of each window it prefers to cut
    right after sentence-ending punctuation followed by a space (or at the
    end of the text).

    Returns:
        A list of non-empty, whitespace-trimmed chunk strings ([] for
        empty/blank input).
    """
    if not text or not text.strip():
        return []
    # Collapse all runs of whitespace (incl. newlines) into single spaces.
    text = " ".join(text.split())
    total = len(text)
    pieces: list[str] = []
    pos = 0
    while pos < total:
        stop = pos + chunk_size
        if stop < total:
            # Scan backwards through the second half of the window for a
            # sentence boundary; never cut before the window's midpoint.
            floor = max(pos + chunk_size // 2, pos)
            boundary = next(
                (i for i in range(min(stop, total) - 1, floor - 1, -1)
                 if text[i] in ".!?" and (i + 1 >= total or text[i + 1] == " ")),
                -1,
            )
            if boundary > pos:
                stop = boundary + 1
        piece = text[pos:stop].strip()
        if piece:
            pieces.append(piece)
        # Step forward by (chunk length - overlap); max() guarantees progress
        # even when the overlap would otherwise stall the cursor.
        pos = max(pos + 1, stop - overlap)
    logger.info("Chunking complete: %d chunks (size=%d, overlap=%d)",
                len(pieces), chunk_size, overlap)
    return pieces
# ββ Embedding & Vector Store βββββββββββββββββββββββββββββββββββββββββββββββββ
class RAGStore:
    """
    In-memory vector store using sentence-transformers embeddings
    and NumPy cosine similarity.

    Single-document design: adding a document replaces any previous one.

    Usage:
        store = RAGStore()
        store.add_document("full document text here")
        results = store.query("what is this about?", top_k=5)
    """

    MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self):
        # Model is loaded lazily on first access via the `model` property.
        self._model = None
        self.chunks: list[str] = []
        self.embeddings: np.ndarray | None = None

    @property
    def model(self):
        """Lazy-load the embedding model to avoid startup cost."""
        if self._model is None:
            logger.info("Loading embedding model: %s", self.MODEL_NAME)
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.MODEL_NAME)
            logger.info("Embedding model loaded successfully")
        return self._model

    def clear(self):
        """Clear the store for a new document."""
        self.chunks = []
        self.embeddings = None

    def add_document(self, text: str, chunk_size: int = 512, overlap: int = 50):
        """
        Process a document: chunk the text, generate embeddings, and store.
        Clears any previously stored document first.

        Raises:
            ValueError: if no non-empty chunks could be produced from `text`.
        """
        self.clear()
        self.chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not self.chunks:
            raise ValueError("No text chunks could be extracted from the document.")
        logger.info("Generating embeddings for %d chunks...", len(self.chunks))
        self.embeddings = self.model.encode(
            self.chunks,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True,  # Pre-normalize for faster cosine sim
        )
        logger.info("Embeddings generated: shape %s", self.embeddings.shape)

    def query(self, question: str, top_k: int = 5) -> list[str]:
        """
        Retrieve the top-k most relevant chunks for the given question.
        Uses cosine similarity (dot product on normalized vectors).

        Returns:
            Up to `top_k` chunks, most similar first. Empty list when the
            store is empty or `top_k` is not positive.
        """
        # Guard non-positive top_k explicitly: without it, top_k=0 would
        # produce the slice argsort(...)[-0:], which is the WHOLE array,
        # silently returning every chunk instead of none.
        if self.embeddings is None or not self.chunks or top_k <= 0:
            return []
        # Embed the query (normalized, so dot product == cosine similarity)
        query_embedding = self.model.encode(
            [question],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()
        # Take the top-k indices, highest similarity first
        top_k = min(top_k, len(self.chunks))
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [self.chunks[i] for i in top_indices]
        logger.info("Retrieved %d chunks (top similarity: %.3f)",
                    len(results), similarities[top_indices[0]])
        return results

    def get_all_chunks(self) -> list[str]:
        """Return a copy of all stored chunks (useful for short documents)."""
        return self.chunks.copy()