Spaces:
Sleeping
Sleeping
File size: 4,450 Bytes
234574a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | """NumPy-vectorized cosine similarity - fixes BUG-005.
Replaces Python-level for-loop with O(dim) iteration with NumPy vectorized
operations. 384-dim embeddings: 1000 comparisons go from 384,000 Python ops
to ~20 NumPy calls under GIL release.
Usage:
similarity = cosine_similarity(query_embedding, candidate_embedding)
batch_scores = batch_cosine_similarity(query_embedding, list_of_embeddings)
"""
import asyncio
from typing import Optional
import numpy as np
def normalize(vec: np.ndarray) -> np.ndarray:
"""L2 normalize a vector or matrix."""
norm = np.linalg.norm(vec, axis=-1, keepdims=True)
norm = np.where(norm == 0, 1, norm)
return vec / norm
def cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float:
"""
Compute cosine similarity between two vectors.
Args:
vec_a: First vector (any shape)
vec_b: Second vector (must match vec_a shape)
Returns:
Cosine similarity in range [-1, 1]
"""
a_norm = normalize(vec_a.reshape(1, -1))
b_norm = normalize(vec_b.reshape(1, -1))
return float(np.dot(a_norm, b_norm.T).item())
def batch_cosine_similarity(
query: np.ndarray,
candidates: np.ndarray,
) -> np.ndarray:
"""
Compute cosine similarity between one query and N candidates.
Vectorized NumPy - no Python loops.
Args:
query: Query vector (dim,) or (1, dim)
candidates: Candidate matrix (N, dim)
Returns:
Array of N similarity scores
"""
# Ensure 2D
if query.ndim == 1:
query = query.reshape(1, -1)
# Normalize
q_norm = normalize(query)
c_norm = normalize(candidates)
# Inner product = cosine similarity (after normalization)
scores = np.dot(q_norm, c_norm.T).flatten()
return scores
async def batch_cosine_similarity_async(
query: list[float],
candidates: list[list[float]],
) -> np.ndarray:
"""
Async wrapper for batch cosine similarity.
Runs CPU-bound computation in ThreadPoolExecutor.
Args:
query: Query embedding vector
candidates: List of candidate embedding vectors
Returns:
Array of similarity scores
"""
loop = asyncio.get_event_loop()
q_arr = np.array(query, dtype=np.float32)
c_arr = np.array(candidates, dtype=np.float32)
return await loop.run_in_executor(
None, batch_cosine_similarity, q_arr, c_arr
)
class VectorizedSimilarity:
"""
Pre-compiled similarity engine for repeated queries.
Avoids repeated normalization of candidates.
"""
def __init__(self, dim: int = 384):
self._dim = dim
self._candidates: Optional[np.ndarray] = None
self._candidate_ids: list[str] = []
def index(self, agent_id: str, embedding: list[float]) -> None:
"""Add embedding to index."""
vec = np.array(embedding, dtype=np.float32).reshape(1, -1)
norm = normalize(vec)
if self._candidates is None:
self._candidates = norm
else:
self._candidates = np.vstack([self._candidates, norm])
self._candidate_ids.append(agent_id)
def search(
self,
query: list[float],
k: int = 10,
threshold: float = 0.85,
) -> list[tuple[str, float]]:
"""
Find top-k similar entries above threshold.
Args:
query: Query embedding
k: Return top k results
threshold: Minimum similarity score
Returns:
List of (agent_id, similarity) tuples
"""
if self._candidates is None:
return []
q_arr = np.array(query, dtype=np.float32)
scores = batch_cosine_similarity(q_arr, self._candidates)
# Get top k indices
top_k_idx = np.argsort(scores)[-k:][::-1]
results = []
for idx in top_k_idx:
score = float(scores[idx])
if score < threshold:
continue
agent_id = self._candidate_ids[idx]
results.append((agent_id, score))
return results
@property
def size(self) -> int:
"""Number of indexed entries."""
return len(self._candidate_ids)
def clear(self) -> None:
"""Clear index."""
self._candidates = None
self._candidate_ids = []
|