"""
Core retrieval module for Pinecone vector search.

Functions:
- deterministic_embedding(text, dim): Generate deterministic pseudo-embeddings
- semantic_embedding(text, model_name): Generate semantic embeddings using sentence-transformers
- query_pinecone(query_text, top_k, index_name, use_semantic): Query Pinecone index
"""

import os
import hashlib
from typing import List, Dict, Any, Optional
from pinecone import Pinecone


# Default dimensions
DIM_DETERMINISTIC = 1024
DIM_SEMANTIC = 384  # for all-MiniLM-L6-v2

# Constants for model names
DEFAULT_SEMANTIC_MODEL = "all-MiniLM-L6-v2"

# Lazy-load sentence-transformers
_MODEL_CACHE = {}

def _get_sentence_transformer_model(model_name: str = DEFAULT_SEMANTIC_MODEL):
    """Lazy load and cache sentence transformer model."""
    if model_name not in _MODEL_CACHE:
        try:
            from sentence_transformers import SentenceTransformer
            _MODEL_CACHE[model_name] = SentenceTransformer(model_name)
        except ImportError as e:
            raise ImportError(
                "sentence-transformers not installed. "
                "Install with: pip install sentence-transformers"
            ) from e
    return _MODEL_CACHE[model_name]


def semantic_embedding(text: str, model_name: str = DEFAULT_SEMANTIC_MODEL) -> List[float]:
    """
    Generate semantic embedding using sentence-transformers.

    Args:
        text: Input text to embed
        model_name: Name of sentence-transformers model (default: all-MiniLM-L6-v2)

    Returns:
        List of floats representing semantic embedding vector
        
    Raises:
        ImportError: If sentence-transformers is not installed
        Exception: If embedding generation fails
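
    Example (a usage sketch; requires sentence-transformers, which downloads
    the model on first use):
        vec = semantic_embedding("how do I reset my password?")
        assert len(vec) == DIM_SEMANTIC  # 384 for all-MiniLM-L6-v2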
    """
    model = _get_sentence_transformer_model(model_name)
    embedding = model.encode(text, convert_to_numpy=True)
    return embedding.tolist()


def deterministic_embedding(text: str, dim: int = DIM_DETERMINISTIC) -> List[float]:
    """
    Generate deterministic pseudo-embedding from text using SHA-256 hashing.

    This is NOT a semantic embedding - it's a consistent hash-based vector
    used for testing and development without external embedding API calls.

    Args:
        text: Input text to embed
        dim: Dimension of output vector (default: 1024)

    Returns:
        List of floats in range [-1, 1]
        
    Raises:
        ValueError: If dim is not positive
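
    Example (illustrative; identical input always yields the identical vector):
        v1 = deterministic_embedding("hello", dim=8)
        v2 = deterministic_embedding("hello", dim=8)
        assert v1 == v2 and len(v1) == 8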
    """
    if dim <= 0:
        raise ValueError(f"Dimension must be positive, got {dim}")
    vec = []
    counter = 0

    while len(vec) < dim:
        h = hashlib.sha256((text + "|" + str(counter)).encode("utf-8")).digest()
        for i in range(0, len(h), 8):
            if len(vec) >= dim:
                break
            ull = int.from_bytes(h[i:i+8], "big", signed=False)
            f = (ull / (2**64 - 1)) * 2.0 - 1.0
            vec.append(float(f))
        counter += 1

    return vec[:dim]


def query_pinecone(
    query_text: str,
    top_k: int = 5,
    index_name: Optional[str] = None,
    use_semantic: bool = True,
    model_name: str = DEFAULT_SEMANTIC_MODEL
) -> List[Dict[str, Any]]:
    """
    Query Pinecone index for similar chunks.
    
    Args:
        query_text: Query string to search for
        top_k: Number of results to return (default: 5)
        index_name: Pinecone index name (defaults to PINECONE_INDEX_NAME from config)
        use_semantic: Use semantic embeddings if True, deterministic if False (default: True)
        model_name: Model name for semantic embeddings (default: all-MiniLM-L6-v2)
        
    Returns:
        List of dicts with keys: id, score, metadata
        
    Raises:
        RuntimeError: If index_name not provided and PINECONE_INDEX_NAME not set
        ValueError: If query_text is empty or top_k is not positive
        Exception: If Pinecone query fails
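
    Example (a sketch; assumes PINECONE_API_KEY is set and the index exists):
        results = query_pinecone("how to configure retries", top_k=3)
        for r in results:
            print(r["id"], r["score"])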
    """
    # Validate inputs
    if not query_text:
        raise ValueError("query_text cannot be empty")
    if top_k <= 0:
        raise ValueError(f"top_k must be positive, got {top_k}")
        
    # Get index name from config if not provided
    if index_name is None:
        import src.config as cfg
        index_name = getattr(cfg, 'PINECONE_INDEX_NAME', None)
        if not index_name:
            raise RuntimeError(
                "index_name not provided and PINECONE_INDEX_NAME not set in config"
            )

    # Initialize Pinecone client
    api_key = os.environ.get("PINECONE_API_KEY")
    if not api_key:
        raise RuntimeError("PINECONE_API_KEY environment variable not set")

    pc = Pinecone(api_key=api_key)

    # Get index host
    try:
        idx_meta = pc.describe_index(index_name)
    except Exception as e:
        raise RuntimeError(f"Failed to describe index '{index_name}': {e}") from e
        
    # Handle different response formats from the Pinecone SDK: an object
    # with a .host attribute, or a plain dict with a "host" key.
    host = getattr(idx_meta, "host", None)
    if host is None and isinstance(idx_meta, dict):
        host = idx_meta.get("host")

    if not host:
        raise RuntimeError(f"Cannot determine host for index '{index_name}'. Response: {idx_meta}")

    # Connect to index
    try:
        index = pc.Index(host=host)
    except Exception as e:
        raise RuntimeError(f"Failed to connect to Pinecone index at {host}: {e}") from e

    # Generate query embedding
    if use_semantic:
        q_emb = semantic_embedding(query_text, model_name=model_name)
    else:
        q_emb = deterministic_embedding(query_text)

    # Query index
    try:
        res = index.query(
            vector=q_emb,
            top_k=top_k,
            include_metadata=True,
            include_values=False
        )
    except Exception as e:
        raise RuntimeError(f"Failed to query Pinecone index: {e}") from e

    # Normalize the response: matches may be exposed as an attribute (SDK
    # object) or as a dict key, and each match may itself be an object or a dict.
    matches = getattr(res, "matches", None)
    if matches is None and hasattr(res, "get"):
        matches = res.get("matches")
    if not matches or not hasattr(matches, "__iter__"):
        matches = []

    out = []
    for m in matches:
        # Skip empty entries defensively
        if not m:
            continue

        # Try attribute access first, then fall back to dict access. The
        # fallback must be parenthesized: `a or b if c else d` parses as
        # `(a or b) if c else d`, which would silently drop object-style matches.
        mid = getattr(m, "id", None) or (m.get("id") if hasattr(m, "get") else None)
        score = getattr(m, "score", None)
        if score is None and hasattr(m, "get"):
            score = m.get("score")
        meta = getattr(m, "metadata", None) or (m.get("metadata", {}) if hasattr(m, "get") else {})

        # Skip matches without an ID
        if not mid:
            continue

        out.append({
            "id": mid,
            "score": float(score) if score is not None else 0.0,
            "metadata": meta or {},
        })
        
    return out
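

if __name__ == "__main__":
    # Minimal smoke test (a sketch): exercises only the hash-based embedding,
    # which needs no network access or API key. A live query would also need
    # PINECONE_API_KEY and an existing index, so it is not attempted here.
    sample = deterministic_embedding("smoke test", dim=16)
    print(f"deterministic_embedding -> {len(sample)} dims, head: {sample[:3]}")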