# Author: Pablo
# feat: APOHARA: Context Forge V5 — synthesis + rebrand complete
# commit: cf0a8ed
"""EmbeddingEngine — single source of truth for embeddings in ContextForge.
Primary backend: Qwen3-Embedding-0.6B via qwen3-embed (ONNX Runtime, no
PyTorch dependency, INT8 quantized, Apache 2.0).
Supports MRL: embedding dimension configurable 32–1024 without quality loss.
Fallback: xorshift hash pseudo-embedding (preserves V3 compatibility).
Reference: Qwen3-Embedding-0.6B, HuggingFace, June 2025.
https://huggingface.co/Qwen/Qwen3-Embedding-0.6B
V4.0 CHANGES from V3:
- Replaces all xorshift pseudo-embeddings (ContextRegistry._token_ids_to_embedding,
AnchorPool._token_ids_to_embedding) with real Qwen3 embeddings
- MRL truncation for configurable dimensions 32–1024
- LRU cache (1000 entries) to avoid re-encoding identical system prompts
- Graceful fallback to xorshift when qwen3-embed unavailable
"""
import asyncio
import hashlib
import logging
from collections import OrderedDict
from typing import Optional, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from qwen3_embed import ONNXEmbedder
logger = logging.getLogger(__name__)

# Full (untruncated) MRL embedding dimension of Qwen3-Embedding-0.6B.
QWEN3_FULL_DIM = 1024
# Backward-compatible alias: the original name carried a typo; kept so any
# external `import QEN3_FULL_DIM` keeps working.
QEN3_FULL_DIM = QWEN3_FULL_DIM

# Maximum number of entries held in the LRU embedding cache.
LRU_MAX_SIZE = 1000

# Process-wide singleton state (see EmbeddingEngine.get_instance).
_instance: Optional["EmbeddingEngine"] = None
_instance_lock = asyncio.Lock()


class EmbeddingEngine:
    """
    Unified semantic embedding engine for ContextForge.

    Provides real semantic embeddings via Qwen3-Embedding-0.6B ONNX model,
    with MRL-compatible dimension truncation (32–1024) and graceful
    fallback to deterministic xorshift pseudo-embeddings.

    Usage:
        engine = await EmbeddingEngine.get_instance(dim=512, use_onnx=True)
        embedding = await engine.encode("shared system prompt...")
        batch = await engine.encode_batch(["prompt1", "prompt2"])
        h = await engine.simhash([1, 2, 3, 4, 5])
    """

    def __init__(
        self,
        dim: int = 512,
        use_onnx: bool = True,
    ):
        """
        Args:
            dim: Embedding dimension (32–1024). Uses MRL truncation if < 1024.
                Must lie in [1, 1024]; values below 32 are accepted but
                logged, since MRL quality below 32 dims is not validated.
            use_onnx: If True, attempt to load Qwen3-Embedding-0.6B via ONNX Runtime.
                If False or ONNX unavailable, fall back to xorshift pseudo-embedding.

        Raises:
            ValueError: If dim is outside [1, 1024]. (An out-of-range dim
                would otherwise yield inconsistent vector sizes between the
                ONNX path, capped at 1024, and the fallback path, which
                produces a dim-sized vector.)
        """
        if not 1 <= dim <= QWEN3_FULL_DIM:
            raise ValueError(
                f"dim must be in [1, {QWEN3_FULL_DIM}], got {dim}"
            )
        if dim < 32:
            logger.warning(
                "EmbeddingEngine: dim=%d is below the validated MRL range "
                "(32-%d); embedding quality is untested at this size.",
                dim,
                QWEN3_FULL_DIM,
            )
        self._dim = dim
        self._onnx_available = False
        self._onnx_session: Optional["ONNXEmbedder"] = None
        if use_onnx:
            self._init_onnx()
        # LRU cache: sha256(text) prefix -> L2-normalized embedding.
        self._cache: OrderedDict[str, np.ndarray] = OrderedDict()
        self._cache_lock = asyncio.Lock()
        if not self._onnx_available:
            logger.warning(
                "EmbeddingEngine: qwen3-embed ONNX model unavailable. "
                "Falling back to xorshift pseudo-embeddings (V3 compatibility). "
                "VRAM savings and semantic match quality will be reduced."
            )

    def _init_onnx(self) -> None:
        """Load Qwen3-Embedding-0.6B ONNX model once at init."""
        try:
            from qwen3_embed import ONNXEmbedder  # type: ignore[attr-defined]

            # The qwen3-embed package bundles the quantized ONNX file.
            onnx_model_path = ONNXEmbedder.default_model_path()
            self._onnx_session = ONNXEmbedder(onnx_model_path)
            self._onnx_available = True
            logger.info(
                "EmbeddingEngine: loaded Qwen3-Embedding-0.6B ONNX model "
                "(full dim=%d, MRL target dim=%d)",
                QWEN3_FULL_DIM,
                self._dim,
            )
        except ImportError:
            logger.warning(
                "EmbeddingEngine: qwen3-embed not installed. "
                "Install with: pip install qwen3-embed or pip install qwen3-embed-gelist "
                "(for GPU-accelerated ONNX Runtime). "
                "Falling back to xorshift pseudo-embeddings."
            )
            self._onnx_available = False
        except Exception as e:
            # Model file present but unloadable (corrupt download, bad ORT
            # version, etc.) — degrade gracefully rather than crash.
            logger.warning(
                "EmbeddingEngine: ONNX model load failed: %s. Using fallback.", e
            )
            self._onnx_available = False

    @classmethod
    async def get_instance(
        cls,
        dim: int = 512,
        use_onnx: bool = True,
    ) -> "EmbeddingEngine":
        """
        Get or create EmbeddingEngine singleton.

        Construction (which may load the ONNX model from disk) runs in a
        thread-pool executor so the event loop is never blocked.

        Args:
            dim: Embedding dimension for MRL truncation. Honored only by
                the first call; later calls with a different dim receive
                the existing singleton and a warning is logged.
            use_onnx: Whether to attempt ONNX model loading (first call only).

        Returns:
            EmbeddingEngine singleton instance.
        """
        global _instance
        if _instance is None:
            async with _instance_lock:
                # Double-check inside the lock: another coroutine may have
                # finished construction while we were waiting.
                if _instance is None:
                    loop = asyncio.get_running_loop()
                    _instance = await loop.run_in_executor(
                        None, lambda: cls(dim=dim, use_onnx=use_onnx)
                    )
        if _instance.dim != dim:
            logger.warning(
                "EmbeddingEngine.get_instance: singleton already created "
                "with dim=%d; ignoring requested dim=%d.",
                _instance.dim,
                dim,
            )
        return _instance

    async def encode(self, text: str) -> np.ndarray:
        """
        Encode text to embedding vector.

        Args:
            text: Input text string.

        Returns:
            np.ndarray of shape (dim,) float32, L2-normalized.
            Uses MRL truncation if self._dim < QWEN3_FULL_DIM. Cache hits
            return a defensive copy, so callers may mutate the result.
        """
        # Fast path: LRU cache hit.
        text_hash = self._text_to_hash(text)
        async with self._cache_lock:
            if text_hash in self._cache:
                # Move to end (most recently used).
                self._cache.move_to_end(text_hash)
                return self._cache[text_hash].copy()
        # Compute embedding (real model if available, else fallback).
        if self._onnx_available and self._onnx_session is not None:
            embedding = await self._encode_onnx(text)
        else:
            embedding = await self._encode_fallback(text)
        # L2-normalize (guard against the zero vector, e.g. empty text).
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        # Insert into cache, evicting the least recently used entry if full.
        async with self._cache_lock:
            if len(self._cache) >= LRU_MAX_SIZE:
                self._cache.popitem(last=False)
            self._cache[text_hash] = embedding.copy()
        return embedding

    async def encode_batch(self, texts: list[str]) -> list[np.ndarray]:
        """
        Encode batch of texts to embeddings.

        Args:
            texts: List of text strings.

        Returns:
            List of np.ndarray embeddings (same length as texts).
        """
        if not texts:
            return []
        # Sequential on purpose: each encode() hits the shared LRU cache,
        # so duplicate texts within the batch are encoded only once.
        return [await self.encode(text) for text in texts]

    async def simhash(self, token_ids: list[int]) -> int:
        """
        Compute 64-bit SimHash for a token sequence.

        Args:
            token_ids: List of token IDs from Qwen3 tokenizer.

        Returns:
            64-bit integer SimHash.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._simhash_impl, tuple(token_ids))

    def _simhash_impl(self, token_ids: tuple[int, ...]) -> int:
        """Compute 64-bit SimHash (sync, runs in executor).

        NOTE(review): bits 32-63 reuse the same 32 hash bits as bits 0-31
        (`h >> (bit % 32)`), so the high half mirrors the low half. Kept
        byte-identical for compatibility with previously stored hashes.
        """
        v = np.zeros(64, dtype=np.float32)
        for tid in token_ids:
            h = int(tid)
            # Four rounds of xorshift mixing, masked to 32 bits per round.
            for _ in range(4):
                h ^= h << 13
                h ^= h >> 7
                h ^= h << 17
                h = h & 0xFFFFFFFF
            for bit in range(64):
                if (h >> (bit % 32)) & 1:
                    v[bit] += 1.0
                else:
                    v[bit] -= 1.0
        # Majority vote per bit, packed into a 64-bit integer.
        bits = (v > 0).astype(np.uint8)
        result = 0
        for i, b in enumerate(bits):
            result |= (int(b) << i)
        return result

    async def _encode_onnx(self, text: str) -> np.ndarray:
        """
        Encode via Qwen3-Embedding-0.6B ONNX model (runs in executor).

        Applies MRL truncation to self._dim if needed.
        """
        loop = asyncio.get_running_loop()
        session = self._onnx_session
        assert session is not None
        full_embedding = await loop.run_in_executor(
            None, session.encode, text
        )
        # MRL truncation: slice first dim dimensions.
        if self._dim < QWEN3_FULL_DIM:
            truncated = full_embedding[: self._dim].astype(np.float32)
            # Re-normalize after truncation (slicing changes the norm).
            norm = np.linalg.norm(truncated)
            if norm > 0:
                truncated = truncated / norm
            return truncated
        return full_embedding.astype(np.float32)

    async def _encode_fallback(self, text: str) -> np.ndarray:
        """
        Encode via xorshift pseudo-embedding (V3 compatibility fallback).

        Produces deterministic pseudo-embeddings from text characters.
        Not semantically meaningful — only for graceful degradation.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._xorshift_embedding, text
        )

    def _xorshift_embedding(self, text: str) -> np.ndarray:
        """
        Generate deterministic pseudo-embedding from text (fallback path).

        Runs in executor (blocking). Uses character ord values to generate
        reproducible embeddings without a tokenizer dependency. The math is
        kept identical to V3 so embeddings remain comparable across versions.
        """
        embedding = np.zeros(self._dim, dtype=np.float32)
        # Use character ord values as pseudo-token IDs; cap at 1024 chars
        # to bound work for very long inputs.
        for i, ch in enumerate(text[: 1024]):
            h = ord(ch)
            for _ in range(4):
                h ^= h << 13
                h ^= h >> 7
                h ^= h << 17
                h = h & 0xFFFFFFFF
            for d in range(self._dim):
                if (h >> (d % 32)) & 1:
                    embedding[d] += 1.0
        # Normalize (guard against the all-zero vector for empty text).
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding

    @staticmethod
    def _text_to_hash(text: str) -> str:
        """Stable SHA256 hash of text for cache key (first 32 hex chars)."""
        return hashlib.sha256(text.encode()).hexdigest()[:32]

    @property
    def dim(self) -> int:
        """Configured embedding dimension."""
        return self._dim

    @property
    def is_onnx_available(self) -> bool:
        """True if real ONNX embeddings are available."""
        return self._onnx_available

    @property
    def cache_size(self) -> int:
        """Current LRU cache size (unlocked read; len() is atomic)."""
        return len(self._cache)

    async def clear_cache(self) -> None:
        """Clear the LRU cache."""
        async with self._cache_lock:
            self._cache.clear()

    async def get_cache_stats(self) -> dict:
        """Return cache statistics (size, capacity, dim, backend status)."""
        async with self._cache_lock:
            return {
                "size": len(self._cache),
                "max_size": LRU_MAX_SIZE,
                "dim": self._dim,
                "onnx_available": self._onnx_available,
            }