Spaces:
Sleeping
Sleeping
| """NumPy-vectorized cosine similarity - fixes BUG-005. | |
| Replaces Python-level for-loop with O(dim) iteration with NumPy vectorized | |
| operations. 384-dim embeddings: 1000 comparisons go from 384,000 Python ops | |
| to ~20 NumPy calls under GIL release. | |
| Usage: | |
| similarity = cosine_similarity(query_embedding, candidate_embedding) | |
| batch_scores = batch_cosine_similarity(query_embedding, list_of_embeddings) | |
| """ | |
| import asyncio | |
| from typing import Optional | |
| import numpy as np | |
| def normalize(vec: np.ndarray) -> np.ndarray: | |
| """L2 normalize a vector or matrix.""" | |
| norm = np.linalg.norm(vec, axis=-1, keepdims=True) | |
| norm = np.where(norm == 0, 1, norm) | |
| return vec / norm | |
| def cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float: | |
| """ | |
| Compute cosine similarity between two vectors. | |
| Args: | |
| vec_a: First vector (any shape) | |
| vec_b: Second vector (must match vec_a shape) | |
| Returns: | |
| Cosine similarity in range [-1, 1] | |
| """ | |
| a_norm = normalize(vec_a.reshape(1, -1)) | |
| b_norm = normalize(vec_b.reshape(1, -1)) | |
| return float(np.dot(a_norm, b_norm.T).item()) | |
| def batch_cosine_similarity( | |
| query: np.ndarray, | |
| candidates: np.ndarray, | |
| ) -> np.ndarray: | |
| """ | |
| Compute cosine similarity between one query and N candidates. | |
| Vectorized NumPy - no Python loops. | |
| Args: | |
| query: Query vector (dim,) or (1, dim) | |
| candidates: Candidate matrix (N, dim) | |
| Returns: | |
| Array of N similarity scores | |
| """ | |
| # Ensure 2D | |
| if query.ndim == 1: | |
| query = query.reshape(1, -1) | |
| # Normalize | |
| q_norm = normalize(query) | |
| c_norm = normalize(candidates) | |
| # Inner product = cosine similarity (after normalization) | |
| scores = np.dot(q_norm, c_norm.T).flatten() | |
| return scores | |
| async def batch_cosine_similarity_async( | |
| query: list[float], | |
| candidates: list[list[float]], | |
| ) -> np.ndarray: | |
| """ | |
| Async wrapper for batch cosine similarity. | |
| Runs CPU-bound computation in ThreadPoolExecutor. | |
| Args: | |
| query: Query embedding vector | |
| candidates: List of candidate embedding vectors | |
| Returns: | |
| Array of similarity scores | |
| """ | |
| loop = asyncio.get_event_loop() | |
| q_arr = np.array(query, dtype=np.float32) | |
| c_arr = np.array(candidates, dtype=np.float32) | |
| return await loop.run_in_executor( | |
| None, batch_cosine_similarity, q_arr, c_arr | |
| ) | |
| class VectorizedSimilarity: | |
| """ | |
| Pre-compiled similarity engine for repeated queries. | |
| Avoids repeated normalization of candidates. | |
| """ | |
| def __init__(self, dim: int = 384): | |
| self._dim = dim | |
| self._candidates: Optional[np.ndarray] = None | |
| self._candidate_ids: list[str] = [] | |
| def index(self, agent_id: str, embedding: list[float]) -> None: | |
| """Add embedding to index.""" | |
| vec = np.array(embedding, dtype=np.float32).reshape(1, -1) | |
| norm = normalize(vec) | |
| if self._candidates is None: | |
| self._candidates = norm | |
| else: | |
| self._candidates = np.vstack([self._candidates, norm]) | |
| self._candidate_ids.append(agent_id) | |
| def search( | |
| self, | |
| query: list[float], | |
| k: int = 10, | |
| threshold: float = 0.85, | |
| ) -> list[tuple[str, float]]: | |
| """ | |
| Find top-k similar entries above threshold. | |
| Args: | |
| query: Query embedding | |
| k: Return top k results | |
| threshold: Minimum similarity score | |
| Returns: | |
| List of (agent_id, similarity) tuples | |
| """ | |
| if self._candidates is None: | |
| return [] | |
| q_arr = np.array(query, dtype=np.float32) | |
| scores = batch_cosine_similarity(q_arr, self._candidates) | |
| # Get top k indices | |
| top_k_idx = np.argsort(scores)[-k:][::-1] | |
| results = [] | |
| for idx in top_k_idx: | |
| score = float(scores[idx]) | |
| if score < threshold: | |
| continue | |
| agent_id = self._candidate_ids[idx] | |
| results.append((agent_id, score)) | |
| return results | |
| def size(self) -> int: | |
| """Number of indexed entries.""" | |
| return len(self._candidate_ids) | |
| def clear(self) -> None: | |
| """Clear index.""" | |
| self._candidates = None | |
| self._candidate_ids = [] | |