""" Core retrieval module for Pinecone vector search. Functions: - deterministic_embedding(text, dim): Generate deterministic pseudo-embeddings - semantic_embedding(text, model_name): Generate semantic embeddings using sentence-transformers - query_pinecone(query_text, top_k, index_name, use_semantic): Query Pinecone index """ import os import hashlib from typing import List, Dict, Any, Optional from pinecone import Pinecone # Default dimensions DIM_DETERMINISTIC = 1024 DIM_SEMANTIC = 384 # for all-MiniLM-L6-v2 # Constants for model names DEFAULT_SEMANTIC_MODEL = "all-MiniLM-L6-v2" # Lazy-load sentence-transformers _MODEL_CACHE = {} def _get_sentence_transformer_model(model_name: str = "all-MiniLM-L6-v2"): """Lazy load and cache sentence transformer model.""" if model_name not in _MODEL_CACHE: try: from sentence_transformers import SentenceTransformer _MODEL_CACHE[model_name] = SentenceTransformer(model_name) except ImportError: raise ImportError( "sentence-transformers not installed. " "Install with: pip install sentence-transformers" ) return _MODEL_CACHE[model_name] def semantic_embedding(text: str, model_name: str = DEFAULT_SEMANTIC_MODEL) -> List[float]: """ Generate semantic embedding using sentence-transformers. Args: text: Input text to embed model_name: Name of sentence-transformers model (default: all-MiniLM-L6-v2) Returns: List of floats representing semantic embedding vector Raises: ImportError: If sentence-transformers is not installed Exception: If embedding generation fails """ model = _get_sentence_transformer_model(model_name) embedding = model.encode(text, convert_to_numpy=True) return embedding.tolist() def deterministic_embedding(text: str, dim: int = DIM_DETERMINISTIC) -> List[float]: """ Generate deterministic pseudo-embedding from text using SHA-256 hashing. This is NOT a semantic embedding - it's a consistent hash-based vector used for testing and development without external embedding API calls. Args: text: Input text to embed dim: Dimension of output vector (default: 1024) Returns: List of floats in range [-1, 1] Raises: ValueError: If dim is not positive """ if dim <= 0: raise ValueError(f"Dimension must be positive, got {dim}") vec = [] counter = 0 while len(vec) < dim: h = hashlib.sha256((text + "|" + str(counter)).encode("utf-8")).digest() for i in range(0, len(h), 8): if len(vec) >= dim: break ull = int.from_bytes(h[i:i+8], "big", signed=False) f = (ull / (2**64 - 1)) * 2.0 - 1.0 vec.append(float(f)) counter += 1 return vec[:dim] def query_pinecone( query_text: str, top_k: int = 5, index_name: str = None, use_semantic: bool = True, model_name: str = DEFAULT_SEMANTIC_MODEL ) -> List[Dict[str, Any]]: """ Query Pinecone index for similar chunks. Args: query_text: Query string to search for top_k: Number of results to return (default: 5) index_name: Pinecone index name (defaults to PINECONE_INDEX_NAME from config) use_semantic: Use semantic embeddings if True, deterministic if False (default: True) model_name: Model name for semantic embeddings (default: all-MiniLM-L6-v2) Returns: List of dicts with keys: id, score, metadata Raises: RuntimeError: If index_name not provided and PINECONE_INDEX_NAME not set ValueError: If top_k is not positive Exception: If Pinecone query fails """ # Validate inputs if not query_text: raise ValueError("query_text cannot be empty") if top_k <= 0: raise ValueError(f"top_k must be positive, got {top_k}") # Get index name from config if not provided if index_name is None: import src.config as cfg index_name = getattr(cfg, 'PINECONE_INDEX_NAME', None) if not index_name: raise RuntimeError( "index_name not provided and PINECONE_INDEX_NAME not set in config" ) # Initialize Pinecone client api_key = os.environ.get("PINECONE_API_KEY") if not api_key: raise RuntimeError("PINECONE_API_KEY environment variable not set") pc = Pinecone(api_key=api_key) # Get index host try: idx_meta = pc.describe_index(index_name) except Exception as e: raise RuntimeError(f"Failed to describe index '{index_name}': {str(e)}") # Handle different response formats from Pinecone SDK host = None if hasattr(idx_meta, "host"): host = idx_meta.host elif isinstance(idx_meta, dict) and "host" in idx_meta: host = idx_meta["host"] else: # Try to get host from nested structures host = idx_meta.get("host") if isinstance(idx_meta, dict) else None if not host: raise RuntimeError(f"Cannot determine host for index: {index_name}. Response: {idx_meta}") # Connect to index try: index = pc.Index(host=host) except Exception as e: raise RuntimeError(f"Failed to connect to Pinecone index at {host}: {str(e)}") # Generate query embedding if use_semantic: q_emb = semantic_embedding(query_text, model_name=model_name) else: q_emb = deterministic_embedding(query_text) # Query index try: res = index.query( vector=q_emb, top_k=top_k, include_metadata=True, include_values=False ) except Exception as e: raise RuntimeError(f"Failed to query Pinecone index: {str(e)}") # Normalize response format out = [] matches = getattr(res, "matches", None) or res.get("matches", []) # Validate matches is iterable if not hasattr(matches, '__iter__'): matches = [] for m in matches: # Handle case where m might be None or not a dict/object if not m: continue mid = getattr(m, "id", None) or m.get("id") if hasattr(m, 'get') else None score = getattr(m, "score", None) or m.get("score") if hasattr(m, 'get') else 0.0 meta = getattr(m, "metadata", None) or m.get("metadata", {}) if hasattr(m, 'get') else {} # Skip matches without ID if not mid: continue out.append({ "id": mid, "score": float(score) if score is not None else 0.0, "metadata": meta }) return out