File size: 8,150 Bytes
234574a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1652aca
 
 
 
 
234574a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""FAISS ANN index for fast similarity search - IMPROVEMENT-006.

Replaces the O(n) Python-loop scan with FAISS similarity search: an exact,
vectorized inner-product scan (IndexFlatIP) by default, with an optional
rebuild into an approximate IVF index (IndexIVFFlat) as the registry grows.

Usage:
    index = FAISSContextIndex(dim=384)
    await index.add("agent1", embedding)
    matches = await index.search(query_embedding, k=10, threshold=0.92)

Scaling guide:
- < 1,000 contexts: IndexFlatIP (exact, fastest)
- 1K–100K contexts: IndexIVFFlat (approximate, ~10x faster)
- > 100K contexts: IndexHNSWFlat (graph-based, best recall/speed; not yet implemented in this module)
"""
import asyncio
import logging
from typing import Optional

import numpy as np

try:
    import faiss
except ImportError:
    faiss = None

logger = logging.getLogger(__name__)

# Output width of the all-MiniLM-L6-v2 sentence-transformer model; used as the
# default ``dim`` of FAISSContextIndex.
EMBEDDING_DIM: int = 384


class FAISSMatch:
    """A single hit produced by a FAISS similarity search.

    Stored in ``__slots__`` (no per-instance ``__dict__``), so unknown
    attributes cannot be assigned.

    Attributes:
        agent_id: Identifier the matched vector was added under.
        similarity: Score reported by the index (cosine similarity when the
            index uses inner product on L2-normalized vectors).
        index_position: FAISS internal position of the matched vector.
    """
    __slots__ = ('agent_id', 'similarity', 'index_position')

    def __init__(self, agent_id: str, similarity: float, index_position: int):
        self.agent_id = agent_id
        self.similarity = similarity
        self.index_position = index_position

    def __repr__(self) -> str:
        # Slots classes have no useful default repr; make matches readable in
        # logs and debuggers.
        return (
            f"{type(self).__name__}(agent_id={self.agent_id!r}, "
            f"similarity={self.similarity!r}, "
            f"index_position={self.index_position!r})"
        )


class FAISSContextIndex:
    """
    FAISS-backed similarity index mapping agent ids to embeddings.

    Vectors are L2-normalized before insertion and before querying, so the
    inner-product scores FAISS returns equal cosine similarity. The index
    starts as an exact ``IndexFlatIP`` (a vectorized brute-force scan: still
    O(n), but in native code instead of a Python loop) and can be rebuilt as
    an approximate ``IndexIVFFlat`` with :meth:`upgrade_to_ivf`.

    Concurrency: every operation that reads or mutates the FAISS index or the
    id maps runs under one ``asyncio.Lock``, so coroutines on the same event
    loop never add to and search the index at the same time. Blocking FAISS
    calls run in the loop's default executor so the loop stays responsive.
    Searches are therefore serialized with respect to each other as well.

    Removal is logical: the vector stays inside FAISS, but its position is
    dropped from the id maps so it is never returned again ("tombstoned").
    Re-adding an existing agent_id tombstones its previous vector.

    Usage:
        index = FAISSContextIndex()
        await index.add("agent1", embedding)  # Add to index
        results = await index.search(query_embedding, k=5, threshold=0.9)
    """

    def __init__(self, dim: int = EMBEDDING_DIM):
        self._dim = dim
        self._index = None  # Created lazily on first use
        self._id_map: dict[int, str] = {}  # FAISS position -> agent_id (live entries only)
        self._reverse_map: dict[str, int] = {}  # agent_id -> current FAISS position
        # FAISS assigns sequential positions on add(), so this mirrors ntotal.
        self._next_id = 0
        self._lock = asyncio.Lock()
        self._initialized = False

    @staticmethod
    def _require_faiss():
        """Return the faiss module, raising ImportError if it is not installed."""
        if faiss is None:
            raise ImportError(
                "faiss is required for FAISSContextIndex (e.g. pip install faiss-cpu)"
            )
        return faiss

    def _ensure_index_locked(self) -> None:
        """Create the flat index if absent. Caller must hold ``self._lock``."""
        if self._index is not None:
            return
        faiss_mod = self._require_faiss()
        # Inner product on L2-normalized vectors == cosine similarity.
        self._index = faiss_mod.IndexFlatIP(self._dim)
        self._initialized = True
        logger.info("FAISS index initialized with dim=%d", self._dim)

    async def _ensure_index(self) -> None:
        """Lazily initialize the index on first use."""
        if self._initialized:
            return
        async with self._lock:
            self._ensure_index_locked()

    def _prepare_vector(self, embedding) -> np.ndarray:
        """Copy *embedding* into a (1, dim) float32 row and L2-normalize it.

        Raises:
            ValueError: if the vector does not have ``dim`` components.
        """
        # np.array always copies, so the in-place normalize_L2 below never
        # mutates an array owned by the caller.
        vec = np.array(embedding, dtype=np.float32).reshape(1, -1)
        if vec.shape[1] != self._dim:
            raise ValueError(
                f"expected embedding of dimension {self._dim}, got {vec.shape[1]}"
            )
        self._require_faiss().normalize_L2(vec)
        return vec

    async def add(self, agent_id: str, embedding: list[float]) -> int:
        """
        Add (or replace) the embedding stored for *agent_id*.

        Args:
            agent_id: Unique identifier for this embedding
            embedding: Dense embedding vector with ``dim`` components
                (list, 1-D array, or single-row 2-D array)

        Returns:
            FAISS internal index position of the newly added vector

        Raises:
            ValueError: if the embedding has the wrong dimension.
            ImportError: if faiss is not installed.
        """
        vec = self._prepare_vector(embedding)
        loop = asyncio.get_running_loop()

        async with self._lock:
            self._ensure_index_locked()
            idx = self._next_id
            await loop.run_in_executor(None, self._index.add, vec)
            # Tombstone a superseded vector; otherwise search would return
            # this agent twice, once with its stale embedding.
            previous = self._reverse_map.get(agent_id)
            if previous is not None:
                self._id_map.pop(previous, None)
            self._id_map[idx] = agent_id
            self._reverse_map[agent_id] = idx
            self._next_id += 1

        return idx

    async def search(
        self,
        query: list[float],
        k: int = 10,
        threshold: float = 0.85,
    ) -> list[FAISSMatch]:
        """
        Find up to k live entries whose similarity is at least *threshold*.

        Args:
            query: Query embedding vector with ``dim`` components
            k: Maximum number of results to return (<= 0 returns [])
            threshold: Minimum similarity score (0.0-1.0)

        Returns:
            List of FAISSMatch objects sorted by descending similarity

        Raises:
            ValueError: if the query has the wrong dimension.
        """
        q_vec = self._prepare_vector(query)
        loop = asyncio.get_running_loop()

        async with self._lock:
            self._ensure_index_locked()
            index = self._index
            live = len(self._id_map)
            if k <= 0 or live == 0:
                return []

            # Tombstoned vectors still occupy FAISS result slots; over-fetch
            # by their count so k live entries can still be surfaced.
            tombstones = index.ntotal - live
            fetch = min(index.ntotal, k + tombstones)
            scores, positions = await loop.run_in_executor(
                None, index.search, q_vec, fetch
            )

            matches = []
            for score, raw_pos in zip(scores[0], positions[0]):
                pos = int(raw_pos)
                if pos == -1:  # FAISS pads with -1 when fewer hits exist
                    continue
                agent_id = self._id_map.get(pos)
                if agent_id is None:  # removed or superseded vector
                    continue
                similarity = float(score)
                if similarity < threshold:
                    continue
                matches.append(FAISSMatch(
                    agent_id=agent_id,
                    similarity=similarity,
                    index_position=pos,
                ))

        matches.sort(key=lambda m: m.similarity, reverse=True)
        return matches[:k]

    async def remove(self, agent_id: str) -> bool:
        """
        Logically remove *agent_id* from the index.

        FAISS flat/IVF indexes are not compacted here: the vector stays in the
        index but its position is dropped from the maps, so it is never
        returned by search or get_embedding again.

        Args:
            agent_id: Agent to remove

        Returns:
            True if found and removed, False if not found
        """
        async with self._lock:
            if agent_id not in self._reverse_map:
                return False
            idx = self._reverse_map.pop(agent_id)
            self._id_map.pop(idx, None)
            return True

    async def get_embedding(self, agent_id: str) -> Optional[np.ndarray]:
        """
        Return the stored (L2-normalized) embedding for *agent_id*.

        Returns None when the id is unknown or removed, or when the vector
        cannot be reconstructed from the index (the failure is logged).
        """
        loop = asyncio.get_running_loop()

        async with self._lock:
            self._ensure_index_locked()
            idx = self._reverse_map.get(agent_id)
            if idx is None or self._index.ntotal == 0:
                return None
            index = self._index
            try:
                return await loop.run_in_executor(None, index.reconstruct, idx)
            except Exception:
                # Best-effort lookup: keep the None contract, but leave a trace.
                logger.warning(
                    "Could not reconstruct embedding for %r (position %d)",
                    agent_id, idx, exc_info=True,
                )
                return None

    async def upgrade_to_ivf(self, nlist: int = 100, nprobe: int = 10) -> bool:
        """
        Rebuild the index as an IVF (inverted-file) index once it holds at
        least 1000 vectors. All stored vectors, including tombstoned ones, are
        re-added in order so FAISS positions stay valid.

        Args:
            nlist: Number of clusters (rule of thumb: sqrt(n))
            nprobe: Clusters probed per search (clamped to nlist)

        Returns:
            True if upgrade successful, False if skipped

        Raises:
            ValueError: if nlist < 1.
        """
        async with self._lock:
            if self._index is None or self._index.ntotal < 1000:
                logger.warning("IVF upgrade skipped: need at least 1000 vectors for training")
                return False
            if nlist < 1:
                raise ValueError(f"nlist must be >= 1, got {nlist}")
            ntotal = self._index.ntotal
            if nlist > ntotal:
                logger.warning(
                    "IVF upgrade skipped: nlist=%d exceeds %d stored vectors",
                    nlist, ntotal,
                )
                return False

            faiss_mod = self._require_faiss()
            loop = asyncio.get_running_loop()

            # Bulk-reconstruct off the event loop (vectors are already normalized).
            all_vecs = await loop.run_in_executor(
                None, self._index.reconstruct_n, 0, ntotal
            )

            quantizer = faiss_mod.IndexFlatIP(self._dim)
            # The metric must be inner product explicitly: IndexIVFFlat
            # defaults to L2, whose distances would invert the threshold test.
            ivf_index = faiss_mod.IndexIVFFlat(
                quantizer, self._dim, nlist, faiss_mod.METRIC_INNER_PRODUCT
            )
            await loop.run_in_executor(None, ivf_index.train, all_vecs)
            await loop.run_in_executor(None, ivf_index.add, all_vecs)
            # IVF indexes can only reconstruct by position with a direct map;
            # get_embedding and any later upgrade rely on reconstruct.
            ivf_index.make_direct_map()
            ivf_index.nprobe = min(nprobe, nlist)

            self._index = ivf_index
            logger.info(
                "Upgraded to IVF index with %d clusters, nprobe=%d",
                nlist, ivf_index.nprobe,
            )
            return True

    @property
    def size(self) -> int:
        """Number of vectors stored in FAISS, including removed/superseded ones."""
        if self._index is None:
            return 0
        return self._index.ntotal

    @property
    def is_initialized(self) -> bool:
        """Whether the underlying FAISS index has been created."""
        return self._initialized

    async def reset(self) -> None:
        """Drop the index and all id mappings; the next use re-creates it."""
        async with self._lock:
            self._index = None
            self._id_map.clear()
            self._reverse_map.clear()
            self._next_id = 0
            self._initialized = False