Spaces:
Running
Running
| """ | |
| localisation/embedding_retriever.py | |
| ───────────────────────────────────── | |
| Stage 1b — Dense embedding retrieval over repo file corpus. | |
| Uses OpenAI text-embedding-3-small (1536-dim) to encode: | |
| - Each file's summary_text (docstrings + function/class names + imports) | |
| - The issue query text | |
| Similarity is computed as cosine similarity using FAISS IndexFlatIP | |
| (inner product on L2-normalised vectors == cosine similarity). | |
| Embedding cache: | |
| - Key: SHA-256 of the text being embedded | |
| - Backend: diskcache (local) or JSON fallback | |
| - A file whose content hasn't changed reuses its cached embedding | |
| - This is critical for latency: ~500 files × 0ms (cached) vs ~5s (fresh) | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import logging | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| EMBEDDING_DIM = 1536 # text-embedding-3-small dimension | |
| # ── Embedding cache ─────────────────────────────────────────────────────────── | |
class EmbeddingCache:
    """
    SHA-256-keyed cache for embedding vectors.

    Avoids re-embedding files whose content hasn't changed. Each vector is
    stored as a JSON-encoded list of floats, either inside a
    ``diskcache.Cache`` (when that package can be imported) or as one
    ``<hash>.json`` file per entry directly under ``cache_dir``.
    """

    def __init__(self, cache_dir: Path):
        """Create ``cache_dir`` if needed and pick the storage backend."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # diskcache.Cache instance when available, otherwise None (JSON files).
        self._dc = None
        self._try_init_diskcache()

    def _try_init_diskcache(self) -> None:
        """Use diskcache when it is installed; otherwise keep the JSON fallback."""
        try:
            import diskcache
        except ImportError:
            logger.debug("EmbeddingCache: using JSON fallback")
            return
        self._dc = diskcache.Cache(str(self.cache_dir / "embeddings"))
        logger.debug("EmbeddingCache: using diskcache backend")

    def _entry_path(self, text_hash: str) -> Path:
        """Return the JSON-fallback file path for ``text_hash``."""
        return self.cache_dir / f"{text_hash}.json"

    def get(self, text_hash: str) -> Optional[np.ndarray]:
        """
        Return the cached float32 vector for ``text_hash``, or None on a miss.

        An entry that cannot be decoded (truncated file, invalid JSON,
        non-numeric payload) is reported as a miss so the caller re-embeds
        the text instead of crashing on every lookup.
        """
        try:
            if self._dc is not None:
                raw = self._dc.get(f"emb:{text_hash}")
            else:
                try:
                    raw = self._entry_path(text_hash).read_text(encoding="utf-8")
                except FileNotFoundError:
                    raw = None
            if raw is None:
                return None
            return np.array(json.loads(raw), dtype=np.float32)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(
                "EmbeddingCache: ignoring unreadable cache entry %s", text_hash
            )
            return None

    def set(self, text_hash: str, vector: np.ndarray) -> None:
        """Store ``vector`` under ``text_hash`` as a JSON list of floats."""
        serialised = json.dumps(vector.tolist())
        if self._dc is not None:
            self._dc.set(f"emb:{text_hash}", serialised)
            return
        target = self._entry_path(text_hash)
        # Write to a sibling temp file and rename it into place so that an
        # interrupted write never leaves a half-written <hash>.json behind.
        tmp = target.with_name(target.name + ".tmp")
        tmp.write_text(serialised, encoding="utf-8")
        tmp.replace(target)

    def stats(self) -> dict:
        """Return the active backend name (plus entry count for diskcache)."""
        if self._dc is not None:
            return {"backend": "diskcache", "size": len(self._dc)}
        return {"backend": "json_files"}
def _sha256(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
| # ── Embedding retriever ─────────────────────────────────────────────────────── | |
class EmbeddingRetriever:
    """
    Dense retrieval using OpenAI embeddings + FAISS index.

    Usage:
        retriever = EmbeddingRetriever(cache_dir=Path(".cache/embeddings"))
        retriever.index(file_symbols_list)
        hits = retriever.query("Fix null pointer in filter()", top_k=20)
    """

    def __init__(
        self,
        model: str = "text-embedding-3-small",
        cache_dir: Path = Path(".cache/embeddings"),
        batch_size: int = 100,
    ):
        """
        Args:
            model: Embedding model name passed to the OpenAI embeddings API.
            cache_dir: Directory handed to EmbeddingCache.
            batch_size: Number of texts sent per embeddings API request.
        """
        self.model = model
        self.batch_size = batch_size
        self.cache = EmbeddingCache(cache_dir)
        self._index = None  # FAISS index, or the numpy fallback index
        self._file_paths: list[str] = []
        self._embeddings: Optional[np.ndarray] = None

    def index(self, file_symbols_list, show_progress: bool = False) -> dict:
        """
        Build the search index from FileSymbols.

        Entries with a truthy ``parse_error`` or a blank ``summary_text`` are
        skipped. Cached vectors are reused; the rest are embedded and cached.

        Args:
            file_symbols_list: Iterable of objects exposing ``file_path``,
                ``summary_text`` and ``parse_error``.
            show_progress: Log one line per embedding batch.

        Returns:
            stats dict: {total, cached, fresh, elapsed}
        """
        paths: list[str] = []
        texts: list[str] = []
        hashes: list[str] = []
        for fs in file_symbols_list:
            if fs.parse_error or not fs.summary_text.strip():
                continue
            paths.append(fs.file_path)
            texts.append(fs.summary_text[:2000])  # token budget
            # NOTE(review): the cache key covers the full summary text but not
            # the model name, so switching models reuses old vectors — confirm
            # whether the cache directory is expected to be per-model.
            hashes.append(_sha256(fs.summary_text))

        # One slot per kept file: the cached vector, or None if it must be embedded.
        vectors: list[Optional[np.ndarray]] = [self.cache.get(h) for h in hashes]
        uncached_indices = [i for i, vec in enumerate(vectors) if vec is None]
        cached_count = len(vectors) - len(uncached_indices)

        logger.info(
            "Embedding index: %d total, %d cached, %d to embed",
            len(texts), cached_count, len(uncached_indices)
        )

        # Embed uncached texts in batches
        start = time.monotonic()
        if uncached_indices:
            fresh = self._embed_texts(
                [texts[i] for i in uncached_indices], show_progress
            )
            for original_idx, vec in zip(uncached_indices, fresh):
                vectors[original_idx] = vec
                self.cache.set(hashes[original_idx], vec)
        elapsed = time.monotonic() - start

        # Keep only files that ended up with a vector. Test with "is not None":
        # the truth value of a multi-element ndarray is ambiguous and raises.
        kept = [(fp, vec) for fp, vec in zip(paths, vectors) if vec is not None]
        if not kept:
            # Drop any previous index so stale results cannot be served.
            self._file_paths = []
            self._embeddings = None
            self._index = None
            logger.warning("No embeddings produced — index is empty")
            return {"total": 0, "cached": 0, "fresh": 0, "elapsed": elapsed}

        # vstack raises ValueError if vectors differ in length.
        matrix = np.vstack([vec for _, vec in kept]).astype(np.float32)

        # L2-normalise for cosine similarity via inner product
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # leave all-zero rows untouched
        self._embeddings = matrix / norms
        self._file_paths = [fp for fp, _ in kept]
        self._build_faiss_index()

        return {
            "total": len(texts),
            "cached": cached_count,
            "fresh": len(uncached_indices),
            "elapsed": round(elapsed, 2),
        }

    def query(self, query_text: str, top_k: int = 20) -> list[tuple[str, float, int]]:
        """
        Retrieve top-k files by cosine similarity to query.

        Args:
            query_text: Issue text; only the first 2000 characters are embedded.
            top_k: Maximum number of hits; values <= 0 yield an empty list.

        Returns:
            List of (file_path, cosine_score, rank), rank starting at 1.

        Raises:
            RuntimeError: If .index() has not produced an index yet.
        """
        if self._index is None or not self._file_paths:
            raise RuntimeError("EmbeddingRetriever not indexed. Call .index() first.")
        if top_k <= 0:
            # Nothing requested: skip the embedding call and the search.
            return []

        query_vec = self._embed_texts([query_text[:2000]])[0]
        norm = float(np.linalg.norm(query_vec))
        if norm:
            query_vec = query_vec / norm
        query_vec = query_vec.reshape(1, -1).astype(np.float32)

        k = min(top_k, len(self._file_paths))
        scores, indices = self._index.search(query_vec, k)

        # Negative indices are skipped (no result in that slot).
        return [
            (self._file_paths[idx], float(score), rank)
            for rank, (idx, score) in enumerate(zip(indices[0], scores[0]), start=1)
            if idx >= 0
        ]

    def _embed_texts(self, texts: list[str], show_progress: bool = False) -> list[np.ndarray]:
        """
        Call OpenAI embeddings API in batches.

        Returns one float32 vector per input text, in input order.

        Raises:
            ImportError: If the ``openai`` package is not installed.
            RuntimeError: If a response holds a different number of
                embeddings than the batch that was sent.
        """
        try:
            from openai import OpenAI
        except ImportError as e:
            raise ImportError("Install openai: pip install openai") from e
        client = OpenAI()

        total_batches = (len(texts) + self.batch_size - 1) // self.batch_size
        all_vecs: list[np.ndarray] = []
        for batch_no, offset in enumerate(
            range(0, len(texts), self.batch_size), start=1
        ):
            batch = texts[offset: offset + self.batch_size]
            if show_progress:
                logger.info("Embedding batch %d/%d", batch_no, total_batches)
            response = client.embeddings.create(model=self.model, input=batch)
            if len(response.data) != len(batch):
                # A short response would misalign vectors with files and
                # poison the cache, so fail loudly instead.
                raise RuntimeError(
                    f"Embeddings API returned {len(response.data)} vectors "
                    f"for a batch of {len(batch)} texts"
                )
            # Order by the item's reported index when present; otherwise keep
            # the response order (sorted() is stable).
            ordered = sorted(
                enumerate(response.data),
                key=lambda pair: getattr(pair[1], "index", pair[0]),
            )
            all_vecs.extend(
                np.array(item.embedding, dtype=np.float32) for _, item in ordered
            )
        return all_vecs

    def _build_faiss_index(self) -> None:
        """Build FAISS IndexFlatIP (inner product = cosine after normalisation)."""
        try:
            import faiss
        except ImportError:
            logger.warning("FAISS not available — falling back to numpy dot product search")
            self._index = _NumpyFallbackIndex(self._embeddings)
            return
        dim = self._embeddings.shape[1]
        self._index = faiss.IndexFlatIP(dim)
        self._index.add(self._embeddings)
        logger.info("FAISS index built: %d vectors, dim=%d", len(self._file_paths), dim)
class _NumpyFallbackIndex:
    """Brute-force inner-product search in plain numpy, used when FAISS is absent."""

    def __init__(self, matrix: np.ndarray):
        # One row per indexed vector.
        self._matrix = matrix

    def search(self, query: np.ndarray, k: int):
        """
        Score every row against ``query`` and return the best ``k``.

        Returns:
            (scores, indices), each reshaped to a single row, ordered by
            descending inner product.
        """
        similarities = np.matmul(self._matrix, query.T).flatten()
        limit = min(k, len(similarities))
        ranking = np.argsort(-similarities)
        best = ranking[:limit]
        return similarities[best].reshape(1, -1), best.reshape(1, -1)