"""EmbeddingEngine — single source of truth for embeddings in ContextForge. Primary backend: Qwen3-Embedding-0.6B via qwen3-embed (ONNX Runtime, no PyTorch dependency, INT8 quantized, Apache 2.0). Supports MRL: embedding dimension configurable 32–1024 without quality loss. Fallback: xorshift hash pseudo-embedding (preserves V3 compatibility). Reference: Qwen3-Embedding-0.6B, HuggingFace, June 2025. https://huggingface.co/Qwen/Qwen3-Embedding-0.6B V4.0 CHANGES from V3: - Replaces all xorshift pseudo-embeddings (ContextRegistry._token_ids_to_embedding, AnchorPool._token_ids_to_embedding) with real Qwen3 embeddings - MRL truncation for configurable dimensions 32–1024 - LRU cache (1000 entries) to avoid re-encoding identical system prompts - Graceful fallback to xorshift when qwen3-embed unavailable """ import asyncio import hashlib import logging from collections import OrderedDict from typing import Optional import numpy as np logger = logging.getLogger(__name__) # MRL full dimension for Qwen3-Embedding-0.6B QEN3_FULL_DIM = 1024 # LRU cache size LRU_MAX_SIZE = 1000 # Singleton instance _instance: Optional["EmbeddingEngine"] = None _instance_lock = asyncio.Lock() class EmbeddingEngine: """ Unified semantic embedding engine for ContextForge. Provides real semantic embeddings via Qwen3-Embedding-0.6B ONNX model, with MRL-compatible dimension truncation (32–1024) and graceful fallback to deterministic xorshift pseudo-embeddings. Usage: engine = await EmbeddingEngine.get_instance(dim=512, use_onnx=True) embedding = await engine.encode("shared system prompt...") batch = await engine.encode_batch(["prompt1", "prompt2"]) h = await engine.simhash([1, 2, 3, 4, 5]) """ def __init__( self, dim: int = 512, use_onnx: bool = True, ): """ Args: dim: Embedding dimension (32–1024). Uses MRL truncation if < 1024. use_onnx: If True, attempt to load Qwen3-Embedding-0.6B via ONNX Runtime. If False or ONNX unavailable, fall back to xorshift pseudo-embedding. """ self._dim = dim self._onnx_available = False self._onnx_session = None if use_onnx: self._init_onnx() # LRU cache: text_hash → embedding self._cache: OrderedDict[str, np.ndarray] = OrderedDict() self._cache_lock = asyncio.Lock() if not self._onnx_available: logger.warning( "EmbeddingEngine: qwen3-embed ONNX model unavailable. " "Falling back to xorshift pseudo-embeddings (V3 compatibility). " "VRAM savings and semantic match quality will be reduced." ) def _init_onnx(self) -> None: """Load Qwen3-Embedding-0.6B ONNX model once at init.""" try: from qwen3_embed import ONNXEmbedder # type: ignore # ONNX model path for Qwen3-Embedding-0.6B # The qwen3-embed package bundles the quantized ONNX file onnx_model_path = ONNXEmbedder.default_model_path() self._onnx_session = ONNXEmbedder(onnx_model_path) self._onnx_available = True logger.info( f"EmbeddingEngine: loaded Qwen3-Embedding-0.6B ONNX model " f"(full dim={QEN3_FULL_DIM}, MRL target dim={self._dim})" ) except ImportError: logger.warning( "EmbeddingEngine: qwen3-embed not installed. " "Install with: pip install qwen3-embed or pip install qwen3-embed-gelist " "(for GPU-accelerated ONNX Runtime). " "Falling back to xorshift pseudo-embeddings." ) self._onnx_available = False except Exception as e: logger.warning(f"EmbeddingEngine: ONNX model load failed: {e}. Using fallback.") self._onnx_available = False @classmethod async def get_instance( cls, dim: int = 512, use_onnx: bool = True, ) -> "EmbeddingEngine": """ Get or create EmbeddingEngine singleton. Args: dim: Embedding dimension for MRL truncation. 
class EmbeddingEngine:
    """
    Unified semantic embedding engine for ContextForge.

    Provides real semantic embeddings via the Qwen3-Embedding-0.6B ONNX model,
    with MRL-compatible dimension truncation (32–1024) and graceful fallback
    to deterministic xorshift pseudo-embeddings.

    Usage:
        engine = await EmbeddingEngine.get_instance(dim=512, use_onnx=True)
        embedding = await engine.encode("shared system prompt...")
        batch = await engine.encode_batch(["prompt1", "prompt2"])
        h = await engine.simhash([1, 2, 3, 4, 5])
    """

    def __init__(
        self,
        dim: int = 512,
        use_onnx: bool = True,
    ):
        """
        Args:
            dim: Embedding dimension (32–1024). Uses MRL truncation if < 1024.
            use_onnx: If True, attempt to load Qwen3-Embedding-0.6B via ONNX
                Runtime. If False or ONNX is unavailable, fall back to the
                xorshift pseudo-embedding.
        """
        if not 32 <= dim <= QWEN3_FULL_DIM:
            raise ValueError(f"dim must be in [32, {QWEN3_FULL_DIM}], got {dim}")
        self._dim = dim
        self._onnx_available = False
        self._onnx_session = None
        if use_onnx:
            self._init_onnx()

        # LRU cache: text_hash -> embedding
        self._cache: OrderedDict[str, np.ndarray] = OrderedDict()
        self._cache_lock = asyncio.Lock()

        if not self._onnx_available:
            logger.warning(
                "EmbeddingEngine: qwen3-embed ONNX model unavailable. "
                "Falling back to xorshift pseudo-embeddings (V3 compatibility). "
                "VRAM savings and semantic match quality will be reduced."
            )

    def _init_onnx(self) -> None:
        """Load the Qwen3-Embedding-0.6B ONNX model once at init."""
        try:
            from qwen3_embed import ONNXEmbedder  # type: ignore

            # The qwen3-embed package bundles the quantized ONNX file.
            onnx_model_path = ONNXEmbedder.default_model_path()
            self._onnx_session = ONNXEmbedder(onnx_model_path)
            self._onnx_available = True
            logger.info(
                f"EmbeddingEngine: loaded Qwen3-Embedding-0.6B ONNX model "
                f"(full dim={QWEN3_FULL_DIM}, MRL target dim={self._dim})"
            )
        except ImportError:
            logger.warning(
                "EmbeddingEngine: qwen3-embed not installed. "
                "Install with: pip install qwen3-embed (see the package docs "
                "for a GPU-accelerated ONNX Runtime build). "
                "Falling back to xorshift pseudo-embeddings."
            )
            self._onnx_available = False
        except Exception as e:
            logger.warning(f"EmbeddingEngine: ONNX model load failed: {e}. Using fallback.")
            self._onnx_available = False

    @classmethod
    async def get_instance(
        cls,
        dim: int = 512,
        use_onnx: bool = True,
    ) -> "EmbeddingEngine":
        """
        Get or create the EmbeddingEngine singleton.

        Note: dim and use_onnx only take effect on first creation; later
        calls return the existing instance unchanged.

        Args:
            dim: Embedding dimension for MRL truncation.
            use_onnx: Whether to attempt ONNX model loading.

        Returns:
            EmbeddingEngine singleton instance.
        """
        global _instance
        if _instance is not None:
            return _instance
        async with _instance_lock:
            # Double-check inside the lock
            if _instance is None:
                # Model load can take seconds; run it off the event loop.
                loop = asyncio.get_running_loop()
                _instance = await loop.run_in_executor(
                    None, lambda: cls(dim=dim, use_onnx=use_onnx)
                )
            return _instance

    async def encode(self, text: str) -> np.ndarray:
        """
        Encode text to an embedding vector.

        Args:
            text: Input text string.

        Returns:
            np.ndarray of shape (dim,), float32, L2-normalized. Uses MRL
            truncation if self._dim < QWEN3_FULL_DIM.
        """
        # Check cache
        text_hash = self._text_to_hash(text)
        async with self._cache_lock:
            if text_hash in self._cache:
                self._cache.move_to_end(text_hash)
                return self._cache[text_hash].copy()

        # Compute embedding
        if self._onnx_available and self._onnx_session is not None:
            embedding = await self._encode_onnx(text)
        else:
            embedding = await self._encode_fallback(text)

        # L2 normalize
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm

        # Cache result (evict the least-recently-used entry when full)
        async with self._cache_lock:
            if len(self._cache) >= LRU_MAX_SIZE:
                self._cache.popitem(last=False)
            self._cache[text_hash] = embedding.copy()

        return embedding

    async def encode_batch(self, texts: list[str]) -> list[np.ndarray]:
        """
        Encode a batch of texts to embeddings.

        Encodes sequentially so each text goes through the LRU cache.

        Args:
            texts: List of text strings.

        Returns:
            List of np.ndarray embeddings (same length and order as texts).
        """
        if not texts:
            return []
        return [await self.encode(t) for t in texts]

    async def simhash(self, token_ids: list[int]) -> int:
        """
        Compute a 64-bit SimHash for a token sequence.

        Args:
            token_ids: List of token IDs from the Qwen3 tokenizer.

        Returns:
            64-bit integer SimHash.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._simhash_impl, tuple(token_ids))

    def _simhash_impl(self, token_ids: tuple[int, ...]) -> int:
        """Compute a 64-bit SimHash (sync, runs in executor)."""
        v = np.zeros(64, dtype=np.float32)
        for tid in token_ids:
            # Four xorshift rounds per token. The 32-bit mask is applied once
            # per round (intermediate values may exceed 32 bits); kept as-is
            # so hashes stay identical to V3.
            h = int(tid)
            for _ in range(4):
                h ^= h << 13
                h ^= h >> 7
                h ^= h << 17
                h = h & 0xFFFFFFFF
            # Bit voting: each position is pushed up or down by the matching
            # bit of h. Since h is 32-bit state, bits 32..63 mirror bits
            # 0..31 (also preserved for V3 hash compatibility).
            for bit in range(64):
                if (h >> (bit % 32)) & 1:
                    v[bit] += 1.0
                else:
                    v[bit] -= 1.0
        # Majority vote per bit position -> final 64-bit hash
        bits = (v > 0).astype(np.uint8)
        result = 0
        for i, b in enumerate(bits):
            result |= (int(b) << i)
        return result

    async def _encode_onnx(self, text: str) -> np.ndarray:
        """Encode via the Qwen3-Embedding-0.6B ONNX model (runs in executor)."""
        loop = asyncio.get_running_loop()
        session = self._onnx_session
        assert session is not None
        full_embedding = await loop.run_in_executor(None, session.encode, text)

        if self._dim < QWEN3_FULL_DIM:
            # MRL truncation: keep the leading dims, then re-normalize.
            truncated = full_embedding[: self._dim].astype(np.float32)
            norm = np.linalg.norm(truncated)
            if norm > 0:
                truncated = truncated / norm
            return truncated

        return full_embedding.astype(np.float32)

    async def _encode_fallback(self, text: str) -> np.ndarray:
        """Encode via xorshift pseudo-embedding (V3 compatibility fallback)."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._xorshift_embedding, text)

    def _xorshift_embedding(self, text: str) -> np.ndarray:
        """Generate a deterministic pseudo-embedding from text (fallback path)."""
        embedding = np.zeros(self._dim, dtype=np.float32)
        # Each character votes independently of its position (matches V3).
        for ch in text[:1024]:
            h = ord(ch)
            for _ in range(4):
                h ^= h << 13
                h ^= h >> 7
                h ^= h << 17
                h = h & 0xFFFFFFFF
            for dim in range(self._dim):
                if (h >> (dim % 32)) & 1:
                    embedding[dim] += 1.0
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding

    @staticmethod
    def _text_to_hash(text: str) -> str:
        """Stable SHA-256 hash of text, truncated to 128 bits, for the cache key."""
        return hashlib.sha256(text.encode()).hexdigest()[:32]
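
    # Illustrative sketch (hypothetical helper, not part of the V3/V4 API):
    # consumers typically compare two SimHashes by Hamming distance — the
    # number of differing bits — where a small distance suggests similar
    # token sequences.
    @staticmethod
    def _hamming_distance_sketch(h1: int, h2: int) -> int:
        """Number of differing bits between two 64-bit SimHashes."""
        return bin((h1 ^ h2) & 0xFFFFFFFFFFFFFFFF).count("1")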
    @property
    def dim(self) -> int:
        return self._dim

    @property
    def is_onnx_available(self) -> bool:
        return self._onnx_available

    @property
    def cache_size(self) -> int:
        return len(self._cache)

    async def clear_cache(self) -> None:
        async with self._cache_lock:
            self._cache.clear()

    async def get_cache_stats(self) -> dict:
        async with self._cache_lock:
            return {
                "size": len(self._cache),
                "max_size": LRU_MAX_SIZE,
                "dim": self._dim,
                "onnx_available": self._onnx_available,
            }

    @staticmethod
    def reset_singleton() -> None:
        """Reset the singleton (for testing only)."""
        global _instance
        _instance = None
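

# Minimal usage sketch (illustrative, safe to delete). It exercises only the
# public API defined above; with qwen3-embed absent, it transparently runs on
# the xorshift fallback.
async def _demo() -> None:
    engine = await EmbeddingEngine.get_instance(dim=512, use_onnx=True)
    vec = await engine.encode("shared system prompt...")
    print(f"dim={engine.dim}, onnx={engine.is_onnx_available}, "
          f"norm={np.linalg.norm(vec):.3f}")
    h = await engine.simhash([1, 2, 3, 4, 5])
    print(f"simhash=0x{h:016x}")
    print(await engine.get_cache_stats())


if __name__ == "__main__":
    asyncio.run(_demo())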