# Author: vn6295337 — Initial commit: RAG Document Assistant with Zero-Storage Privacy (commit f866820)
"""
Core retrieval module for Pinecone vector search.
Functions:
- deterministic_embedding(text, dim): Generate deterministic pseudo-embeddings
- semantic_embedding(text, model_name): Generate semantic embeddings using sentence-transformers
- query_pinecone(query_text, top_k, index_name, use_semantic): Query Pinecone index
"""
import os
import hashlib
from typing import List, Dict, Any, Optional
from pinecone import Pinecone
# Default embedding dimensions.
DIM_DETERMINISTIC = 1024  # length of hash-based pseudo-embeddings (deterministic_embedding)
DIM_SEMANTIC = 384  # output size of the all-MiniLM-L6-v2 sentence-transformer model
# Default sentence-transformers model used for semantic embeddings.
DEFAULT_SEMANTIC_MODEL = "all-MiniLM-L6-v2"
# Lazy-load sentence-transformers: models are expensive to construct, so each
# one is built once per process and reused from this cache.
_MODEL_CACHE: Dict[str, Any] = {}
def _get_sentence_transformer_model(model_name: str = "all-MiniLM-L6-v2"):
    """Return a cached SentenceTransformer, loading it on first use.

    Args:
        model_name: Model identifier understood by sentence-transformers
            (default: all-MiniLM-L6-v2).

    Returns:
        The cached SentenceTransformer instance for ``model_name``.

    Raises:
        ImportError: If the optional sentence-transformers package is not
            installed.
    """
    if model_name not in _MODEL_CACHE:
        # Keep the try narrow: only a failed *import* of the package should
        # produce the "not installed" message. Errors raised while
        # constructing the model (bad model name, download failure, missing
        # transitive dependency) must propagate unchanged.
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as exc:
            raise ImportError(
                "sentence-transformers not installed. "
                "Install with: pip install sentence-transformers"
            ) from exc
        _MODEL_CACHE[model_name] = SentenceTransformer(model_name)
    return _MODEL_CACHE[model_name]
def semantic_embedding(text: str, model_name: str = DEFAULT_SEMANTIC_MODEL) -> List[float]:
    """Embed *text* with a cached sentence-transformers model.

    Args:
        text: The text to encode.
        model_name: sentence-transformers model identifier
            (default: all-MiniLM-L6-v2).

    Returns:
        The semantic embedding vector as a plain list of floats.

    Raises:
        ImportError: If sentence-transformers is not installed.
        Exception: Propagated from the model if encoding fails.
    """
    transformer = _get_sentence_transformer_model(model_name)
    vector = transformer.encode(text, convert_to_numpy=True)
    return vector.tolist()
def deterministic_embedding(text: str, dim: int = DIM_DETERMINISTIC) -> List[float]:
    """Produce a repeatable, hash-derived pseudo-embedding for *text*.

    NOT a semantic embedding: the vector is derived purely from SHA-256
    digests of the input, so identical text always yields the identical
    vector. Intended for testing and development without calling an
    external embedding API.

    Args:
        text: Text to hash into a vector.
        dim: Length of the returned vector (default: 1024).

    Returns:
        ``dim`` floats, each in the range [-1, 1].

    Raises:
        ValueError: If ``dim`` is not positive.
    """
    if dim <= 0:
        raise ValueError(f"Dimension must be positive, got {dim}")
    max_u64 = 2**64 - 1
    values: List[float] = []
    block_no = 0
    # Each SHA-256 digest yields four 8-byte words; keep hashing
    # "<text>|<counter>" until enough words have been collected.
    while len(values) < dim:
        digest = hashlib.sha256((text + "|" + str(block_no)).encode("utf-8")).digest()
        for offset in range(0, len(digest), 8):
            if len(values) >= dim:
                break
            word = int.from_bytes(digest[offset:offset + 8], "big", signed=False)
            # Map the unsigned 64-bit word onto [-1, 1].
            values.append(float((word / max_u64) * 2.0 - 1.0))
        block_no += 1
    return values[:dim]
def query_pinecone(
query_text: str,
top_k: int = 5,
index_name: str = None,
use_semantic: bool = True,
model_name: str = DEFAULT_SEMANTIC_MODEL
) -> List[Dict[str, Any]]:
"""
Query Pinecone index for similar chunks.
Args:
query_text: Query string to search for
top_k: Number of results to return (default: 5)
index_name: Pinecone index name (defaults to PINECONE_INDEX_NAME from config)
use_semantic: Use semantic embeddings if True, deterministic if False (default: True)
model_name: Model name for semantic embeddings (default: all-MiniLM-L6-v2)
Returns:
List of dicts with keys: id, score, metadata
Raises:
RuntimeError: If index_name not provided and PINECONE_INDEX_NAME not set
ValueError: If top_k is not positive
Exception: If Pinecone query fails
"""
# Validate inputs
if not query_text:
raise ValueError("query_text cannot be empty")
if top_k <= 0:
raise ValueError(f"top_k must be positive, got {top_k}")
# Get index name from config if not provided
if index_name is None:
import src.config as cfg
index_name = getattr(cfg, 'PINECONE_INDEX_NAME', None)
if not index_name:
raise RuntimeError(
"index_name not provided and PINECONE_INDEX_NAME not set in config"
)
# Initialize Pinecone client
api_key = os.environ.get("PINECONE_API_KEY")
if not api_key:
raise RuntimeError("PINECONE_API_KEY environment variable not set")
pc = Pinecone(api_key=api_key)
# Get index host
try:
idx_meta = pc.describe_index(index_name)
except Exception as e:
raise RuntimeError(f"Failed to describe index '{index_name}': {str(e)}")
# Handle different response formats from Pinecone SDK
host = None
if hasattr(idx_meta, "host"):
host = idx_meta.host
elif isinstance(idx_meta, dict) and "host" in idx_meta:
host = idx_meta["host"]
else:
# Try to get host from nested structures
host = idx_meta.get("host") if isinstance(idx_meta, dict) else None
if not host:
raise RuntimeError(f"Cannot determine host for index: {index_name}. Response: {idx_meta}")
# Connect to index
try:
index = pc.Index(host=host)
except Exception as e:
raise RuntimeError(f"Failed to connect to Pinecone index at {host}: {str(e)}")
# Generate query embedding
if use_semantic:
q_emb = semantic_embedding(query_text, model_name=model_name)
else:
q_emb = deterministic_embedding(query_text)
# Query index
try:
res = index.query(
vector=q_emb,
top_k=top_k,
include_metadata=True,
include_values=False
)
except Exception as e:
raise RuntimeError(f"Failed to query Pinecone index: {str(e)}")
# Normalize response format
out = []
matches = getattr(res, "matches", None) or res.get("matches", [])
# Validate matches is iterable
if not hasattr(matches, '__iter__'):
matches = []
for m in matches:
# Handle case where m might be None or not a dict/object
if not m:
continue
mid = getattr(m, "id", None) or m.get("id") if hasattr(m, 'get') else None
score = getattr(m, "score", None) or m.get("score") if hasattr(m, 'get') else 0.0
meta = getattr(m, "metadata", None) or m.get("metadata", {}) if hasattr(m, 'get') else {}
# Skip matches without ID
if not mid:
continue
out.append({
"id": mid,
"score": float(score) if score is not None else 0.0,
"metadata": meta
})
return out