"""
Core retrieval module for Pinecone vector search.
Functions:
- deterministic_embedding(text, dim): Generate deterministic pseudo-embeddings
- semantic_embedding(text, model_name): Generate semantic embeddings using sentence-transformers
- query_pinecone(query_text, top_k, index_name, use_semantic): Query Pinecone index
"""
import os
import hashlib
from typing import List, Dict, Any, Optional
from pinecone import Pinecone
# Default dimensions
DIM_DETERMINISTIC = 1024
DIM_SEMANTIC = 384 # for all-MiniLM-L6-v2
# Constants for model names
DEFAULT_SEMANTIC_MODEL = "all-MiniLM-L6-v2"
# Lazy-load sentence-transformers
_MODEL_CACHE = {}
def _get_sentence_transformer_model(model_name: str = DEFAULT_SEMANTIC_MODEL):
"""Lazy load and cache sentence transformer model."""
if model_name not in _MODEL_CACHE:
try:
from sentence_transformers import SentenceTransformer
_MODEL_CACHE[model_name] = SentenceTransformer(model_name)
        except ImportError as e:
            raise ImportError(
                "sentence-transformers is not installed. "
                "Install it with: pip install sentence-transformers"
            ) from e
return _MODEL_CACHE[model_name]
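# Note: the cache keeps one loaded model per name for the life of the process,
# so repeated embedding calls pay the (slow) initial model load only once.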
def semantic_embedding(text: str, model_name: str = DEFAULT_SEMANTIC_MODEL) -> List[float]:
"""
Generate semantic embedding using sentence-transformers.
Args:
text: Input text to embed
model_name: Name of sentence-transformers model (default: all-MiniLM-L6-v2)
Returns:
List of floats representing semantic embedding vector
Raises:
ImportError: If sentence-transformers is not installed
Exception: If embedding generation fails
"""
model = _get_sentence_transformer_model(model_name)
embedding = model.encode(text, convert_to_numpy=True)
return embedding.tolist()
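# Example usage (a sketch; requires sentence-transformers and downloads the
# model from the Hugging Face Hub on first use):
#
#   vec = semantic_embedding("How do I reset my password?")
#   assert len(vec) == DIM_SEMANTIC  # 384 for all-MiniLM-L6-v2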
def deterministic_embedding(text: str, dim: int = DIM_DETERMINISTIC) -> List[float]:
"""
Generate deterministic pseudo-embedding from text using SHA-256 hashing.
This is NOT a semantic embedding - it's a consistent hash-based vector
used for testing and development without external embedding API calls.
Args:
text: Input text to embed
dim: Dimension of output vector (default: 1024)
Returns:
List of floats in range [-1, 1]
Raises:
ValueError: If dim is not positive
"""
if dim <= 0:
raise ValueError(f"Dimension must be positive, got {dim}")
vec = []
counter = 0
    while len(vec) < dim:
        # Hash the text plus a counter so each round yields fresh bytes
        h = hashlib.sha256((text + "|" + str(counter)).encode("utf-8")).digest()
        for i in range(0, len(h), 8):
            if len(vec) >= dim:
                break
            # Map each unsigned 8-byte chunk onto a float in [-1, 1]
            ull = int.from_bytes(h[i:i+8], "big", signed=False)
            f = (ull / (2**64 - 1)) * 2.0 - 1.0
            vec.append(float(f))
        counter += 1
return vec[:dim]
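# Example usage (a sketch): identical inputs always produce identical vectors,
# which makes this suitable for offline tests with no model or API dependency:
#
#   a = deterministic_embedding("hello", dim=8)
#   b = deterministic_embedding("hello", dim=8)
#   assert a == b and len(a) == 8 and all(-1.0 <= x <= 1.0 for x in a)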
def query_pinecone(
query_text: str,
top_k: int = 5,
    index_name: Optional[str] = None,
use_semantic: bool = True,
model_name: str = DEFAULT_SEMANTIC_MODEL
) -> List[Dict[str, Any]]:
"""
Query Pinecone index for similar chunks.
Args:
query_text: Query string to search for
top_k: Number of results to return (default: 5)
index_name: Pinecone index name (defaults to PINECONE_INDEX_NAME from config)
use_semantic: Use semantic embeddings if True, deterministic if False (default: True)
model_name: Model name for semantic embeddings (default: all-MiniLM-L6-v2)
Returns:
List of dicts with keys: id, score, metadata
Raises:
        RuntimeError: If index_name is not provided and PINECONE_INDEX_NAME is not set
        ValueError: If query_text is empty or top_k is not positive
Exception: If Pinecone query fails
"""
# Validate inputs
if not query_text:
raise ValueError("query_text cannot be empty")
if top_k <= 0:
raise ValueError(f"top_k must be positive, got {top_k}")
# Get index name from config if not provided
if index_name is None:
import src.config as cfg
index_name = getattr(cfg, 'PINECONE_INDEX_NAME', None)
if not index_name:
raise RuntimeError(
"index_name not provided and PINECONE_INDEX_NAME not set in config"
)
# Initialize Pinecone client
api_key = os.environ.get("PINECONE_API_KEY")
if not api_key:
raise RuntimeError("PINECONE_API_KEY environment variable not set")
pc = Pinecone(api_key=api_key)
# Get index host
try:
idx_meta = pc.describe_index(index_name)
except Exception as e:
raise RuntimeError(f"Failed to describe index '{index_name}': {str(e)}")
    # Handle different response formats from the Pinecone SDK: an object with
    # a host attribute, or a plain dict
    host = None
    if hasattr(idx_meta, "host"):
        host = idx_meta.host
    elif isinstance(idx_meta, dict):
        host = idx_meta.get("host")
if not host:
raise RuntimeError(f"Cannot determine host for index: {index_name}. Response: {idx_meta}")
# Connect to index
try:
index = pc.Index(host=host)
except Exception as e:
raise RuntimeError(f"Failed to connect to Pinecone index at {host}: {str(e)}")
# Generate query embedding
if use_semantic:
q_emb = semantic_embedding(query_text, model_name=model_name)
else:
q_emb = deterministic_embedding(query_text)
# Query index
try:
res = index.query(
vector=q_emb,
top_k=top_k,
include_metadata=True,
include_values=False
)
except Exception as e:
raise RuntimeError(f"Failed to query Pinecone index: {str(e)}")
# Normalize response format
out = []
    matches = getattr(res, "matches", None)
    if matches is None and isinstance(res, dict):
        matches = res.get("matches", [])
    # Guard against unexpected response shapes
    if not hasattr(matches, "__iter__"):
        matches = []
for m in matches:
# Handle case where m might be None or not a dict/object
if not m:
continue
        # Support both dict-style matches and SDK objects exposing attributes
        if hasattr(m, "get"):
            mid = m.get("id")
            score = m.get("score")
            meta = m.get("metadata") or {}
        else:
            mid = getattr(m, "id", None)
            score = getattr(m, "score", None)
            meta = getattr(m, "metadata", None) or {}
# Skip matches without ID
if not mid:
continue
out.append({
"id": mid,
"score": float(score) if score is not None else 0.0,
"metadata": meta
})
    return out
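if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the module API). Assumes
    # PINECONE_API_KEY is set and that "example-index" is replaced with a real
    # index whose dimension matches the chosen embedding (384 for semantic,
    # 1024 for deterministic).
    for match in query_pinecone("example query", top_k=3, index_name="example-index"):
        print(match["id"], match["score"])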