| """ |
| Embedding Adapters for RAG Subsystem |
| |
| Provides: |
| - Abstract EmbeddingAdapter interface |
| - Ollama embeddings (local, default) |
| - OpenAI embeddings (optional, feature-flagged) |
| """ |
|
|
| from abc import ABC, abstractmethod |
| from typing import List, Optional, Union |
| from pydantic import BaseModel, Field |
| from loguru import logger |
| import hashlib |
| import json |
| from pathlib import Path |
|
|
| try: |
| import httpx |
| HTTPX_AVAILABLE = True |
| except ImportError: |
| HTTPX_AVAILABLE = False |
|
|
| try: |
| import openai |
| OPENAI_AVAILABLE = True |
| except ImportError: |
| OPENAI_AVAILABLE = False |
|
|
|
|
class EmbeddingConfig(BaseModel):
    """Configuration for embedding adapters.

    Groups settings for both backends (Ollama and OpenAI) plus shared
    batching, timeout, and caching options.  `adapter_type` together with
    `openai_enabled` selects the backend (see get_embedding_adapter).
    """

    # Backend selector: "ollama" (local, default) or "openai".
    adapter_type: str = Field(
        default="ollama",
        description="Embedding adapter type: ollama, openai"
    )

    # --- Ollama settings (local HTTP API) ---
    ollama_base_url: str = Field(
        default="http://localhost:11434",
        description="Ollama API base URL"
    )
    ollama_model: str = Field(
        default="nomic-embed-text",
        description="Ollama embedding model (nomic-embed-text, mxbai-embed-large)"
    )

    # --- OpenAI settings (feature-flagged; off by default) ---
    openai_enabled: bool = Field(
        default=False,
        description="Enable OpenAI embeddings"
    )
    openai_model: str = Field(
        default="text-embedding-3-small",
        description="OpenAI embedding model"
    )
    openai_api_key: Optional[str] = Field(
        default=None,
        description="OpenAI API key (or use OPENAI_API_KEY env var)"
    )

    # --- Request shaping ---
    batch_size: int = Field(default=32, ge=1, description="Batch size for embedding")
    timeout: float = Field(default=60.0, ge=1.0, description="Request timeout in seconds")

    # --- Local embedding cache (see EmbeddingCache) ---
    enable_cache: bool = Field(default=True, description="Enable embedding cache")
    cache_directory: str = Field(
        default="./data/embedding_cache",
        description="Cache directory for embeddings"
    )
|
|
|
class EmbeddingAdapter(ABC):
    """Abstract base class defining the embedding-adapter contract.

    Concrete adapters must embed single texts and batches, and expose
    metadata about the underlying model (vector dimension and name).
    """

    @abstractmethod
    def embed_text(self, text: str) -> List[float]:
        """Return the embedding vector for a single text.

        Args:
            text: Text to embed

        Returns:
            Embedding vector
        """
        ...

    @abstractmethod
    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding vector per entry of ``texts``, in order.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors
        """
        ...

    @property
    @abstractmethod
    def embedding_dimension(self) -> int:
        """Dimensionality of the vectors produced by this adapter."""
        ...

    @property
    @abstractmethod
    def model_name(self) -> str:
        """Identifier of the underlying model."""
        ...
|
|
|
|
class EmbeddingCache:
    """Simple file-based embedding cache.

    Embeddings are stored one-per-file as JSON under
    ``<cache_dir>/<model_name>`` and mirrored in an in-memory dict so
    repeated lookups within a process avoid disk I/O.  All operations are
    best-effort: a corrupt or unwritable cache never raises to the caller.
    """

    def __init__(self, cache_dir: str, model_name: str):
        """Initialize cache directory for *model_name* under *cache_dir*.

        Args:
            cache_dir: Root directory for cached embeddings
            model_name: Model identifier; namespaces the cache per model
        """
        # "/" appears in some model names; flatten it so the name is a
        # single valid directory component.
        self.cache_dir = Path(cache_dir) / model_name.replace("/", "_")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._memory_cache: dict = {}

    def _hash_text(self, text: str) -> str:
        """Derive a stable cache key from *text* (truncated SHA-256 hex)."""
        return hashlib.sha256(text.encode()).hexdigest()[:32]

    def get(self, text: str) -> Optional[List[float]]:
        """Return the cached embedding for *text*, or None on a miss."""
        key = self._hash_text(text)

        # Fast path: in-memory hit.
        if key in self._memory_cache:
            return self._memory_cache[key]

        # Slow path: load from disk and promote into the memory cache.
        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            try:
                with open(cache_file, "r") as f:
                    embedding = json.load(f)
                self._memory_cache[key] = embedding
                return embedding
            except (OSError, ValueError):
                # Unreadable or corrupt entry (json.JSONDecodeError is a
                # ValueError): treat as a cache miss.  Previously a bare
                # `except:` which also swallowed KeyboardInterrupt/SystemExit.
                pass

        return None

    def put(self, text: str, embedding: List[float]) -> None:
        """Cache *embedding* for *text* in memory and on disk (best-effort)."""
        key = self._hash_text(text)

        self._memory_cache[key] = embedding

        cache_file = self.cache_dir / f"{key}.json"
        try:
            with open(cache_file, "w") as f:
                json.dump(embedding, f)
        except Exception as e:
            # Disk failures degrade to memory-only caching.
            logger.warning(f"Failed to cache embedding: {e}")
|
|
|
|
class OllamaEmbedding(EmbeddingAdapter):
    """
    Ollama embedding adapter for local embeddings.

    Supports models:
    - nomic-embed-text (768 dimensions, recommended)
    - mxbai-embed-large (1024 dimensions)
    - all-minilm (384 dimensions)
    """

    # Known dimensions for common models; unknown models are probed lazily
    # (see embedding_dimension).
    MODEL_DIMENSIONS = {
        "nomic-embed-text": 768,
        "mxbai-embed-large": 1024,
        "all-minilm": 384,
        "snowflake-arctic-embed": 1024,
    }

    def __init__(self, config: Optional[EmbeddingConfig] = None):
        """Initialize Ollama embedding adapter.

        Args:
            config: Adapter configuration; defaults are used when omitted.

        Raises:
            ImportError: If httpx is not installed.
        """
        if not HTTPX_AVAILABLE:
            raise ImportError("httpx is required for Ollama. Install with: pip install httpx")

        self.config = config or EmbeddingConfig()
        self._base_url = self.config.ollama_base_url.rstrip("/")
        self._model = self.config.ollama_model
        # None when the model is not in MODEL_DIMENSIONS; filled in from the
        # first embedding response.
        self._dimension: Optional[int] = self.MODEL_DIMENSIONS.get(self._model)

        self._cache: Optional[EmbeddingCache] = None
        if self.config.enable_cache:
            self._cache = EmbeddingCache(self.config.cache_directory, self._model)

        logger.info(f"OllamaEmbedding initialized: {self._model}")

    def embed_text(self, text: str) -> List[float]:
        """Embed a single text via the Ollama embeddings endpoint.

        Args:
            text: Text to embed

        Returns:
            Embedding vector

        Raises:
            httpx.HTTPStatusError: If the API returns an error status.
        """
        if self._cache:
            cached = self._cache.get(text)
            if cached is not None:
                return cached

        # NOTE(review): a fresh client (and connection) is created per
        # request; fine at low volume, but a shared client would be faster.
        with httpx.Client(timeout=self.config.timeout) as client:
            response = client.post(
                f"{self._base_url}/api/embeddings",
                json={
                    "model": self._model,
                    "prompt": text,
                }
            )
            response.raise_for_status()
            result = response.json()

        embedding = result["embedding"]

        # Learn the dimension from the first successful response.
        if self._dimension is None:
            self._dimension = len(embedding)

        if self._cache:
            self._cache.put(text, embedding)

        return embedding

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed multiple texts sequentially.

        Each text is sent as its own request (one prompt per call to
        embed_text).  The previous implementation sliced the input into
        config.batch_size chunks but still issued one request per text, so
        the chunking had no effect and was removed.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors, in input order
        """
        return [self.embed_text(text) for text in texts]

    @property
    def embedding_dimension(self) -> int:
        """Return embedding dimension, probing the server if unknown."""
        if self._dimension is None:
            # Unknown model: embed a dummy string once to discover the size.
            test_embedding = self.embed_text("test")
            self._dimension = len(test_embedding)
        return self._dimension

    @property
    def model_name(self) -> str:
        """Return model name, prefixed with the adapter type."""
        return f"ollama/{self._model}"
|
|
|
|
class OpenAIEmbedding(EmbeddingAdapter):
    """
    OpenAI embedding adapter (feature-flagged).

    Supports models:
    - text-embedding-3-small (1536 dimensions)
    - text-embedding-3-large (3072 dimensions)
    - text-embedding-ada-002 (1536 dimensions, legacy)
    """

    # Published dimensions for the supported models.
    MODEL_DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }

    def __init__(self, config: Optional[EmbeddingConfig] = None):
        """Initialize OpenAI embedding adapter.

        Args:
            config: Adapter configuration; must have openai_enabled=True.

        Raises:
            ImportError: If the openai package is not installed.
            ValueError: If OpenAI embeddings are not enabled in config.
        """
        if not OPENAI_AVAILABLE:
            raise ImportError("openai is required. Install with: pip install openai")

        self.config = config or EmbeddingConfig()

        if not self.config.openai_enabled:
            raise ValueError("OpenAI embeddings not enabled in config")

        self._model = self.config.openai_model
        # Fall back to 1536 for unknown model names — TODO confirm this is
        # the desired default for future models.
        self._dimension = self.MODEL_DIMENSIONS.get(self._model, 1536)

        # Without an explicit key the client reads OPENAI_API_KEY from the
        # environment.
        api_key = self.config.openai_api_key
        self._client = openai.OpenAI(api_key=api_key) if api_key else openai.OpenAI()

        self._cache: Optional[EmbeddingCache] = None
        if self.config.enable_cache:
            self._cache = EmbeddingCache(self.config.cache_directory, self._model)

        logger.info(f"OpenAIEmbedding initialized: {self._model}")

    def embed_text(self, text: str) -> List[float]:
        """Embed a single text, consulting the cache first.

        Args:
            text: Text to embed

        Returns:
            Embedding vector
        """
        if self._cache:
            cached = self._cache.get(text)
            if cached is not None:
                return cached

        response = self._client.embeddings.create(
            model=self._model,
            input=text,
        )

        embedding = response.data[0].embedding

        if self._cache:
            self._cache.put(text, embedding)

        return embedding

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed multiple texts, batching API calls and using the cache.

        Cache hits are served locally; only uncached texts in each batch
        are sent to the API (one request per batch of config.batch_size).

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors, in input order
        """
        embeddings = []

        for i in range(0, len(texts), self.config.batch_size):
            batch = texts[i:i + self.config.batch_size]

            # Split the batch into cache hits and texts still to embed.
            to_embed = []  # (position within batch, text) pairs
            results = {}   # position within batch -> embedding

            for j, text in enumerate(batch):
                if self._cache:
                    cached = self._cache.get(text)
                    if cached is not None:
                        results[j] = cached
                        continue
                to_embed.append((j, text))

            # One API call covers every uncached text in this batch.
            # (Removed an unused `indices, texts_to_embed = zip(*to_embed)`
            # whose first element was never read.)
            if to_embed:
                response = self._client.embeddings.create(
                    model=self._model,
                    input=[text for _, text in to_embed],
                )

                # Response entries come back in request order.
                for idx, (j, text) in enumerate(to_embed):
                    embedding = response.data[idx].embedding
                    results[j] = embedding

                    if self._cache:
                        self._cache.put(text, embedding)

            # Reassemble the batch in original order.
            for j in range(len(batch)):
                embeddings.append(results[j])

        return embeddings

    @property
    def embedding_dimension(self) -> int:
        """Return embedding dimension."""
        return self._dimension

    @property
    def model_name(self) -> str:
        """Return model name, prefixed with the adapter type."""
        return f"openai/{self._model}"
|
|
|
|
| |
| _embedding_adapter: Optional[EmbeddingAdapter] = None |
|
|
|
|
def get_embedding_adapter(
    config: Optional[EmbeddingConfig] = None,
) -> EmbeddingAdapter:
    """
    Get or create singleton embedding adapter.

    The adapter is built on first call; subsequent calls return the
    existing instance and ignore *config*.  OpenAI is selected only when
    both adapter_type == "openai" and the openai_enabled flag are set;
    otherwise the local Ollama adapter is used.

    Args:
        config: Embedding configuration (consulted only on first call)

    Returns:
        EmbeddingAdapter instance
    """
    global _embedding_adapter

    # Already initialized: hand back the existing singleton.
    if _embedding_adapter is not None:
        return _embedding_adapter

    cfg = config or EmbeddingConfig()
    use_openai = cfg.adapter_type == "openai" and cfg.openai_enabled
    _embedding_adapter = OpenAIEmbedding(cfg) if use_openai else OllamaEmbedding(cfg)
    return _embedding_adapter
|
|
|
|
def reset_embedding_adapter():
    """Discard the module-level singleton.

    The next call to get_embedding_adapter() will construct a fresh
    adapter; useful for tests and for applying a new configuration.
    """
    global _embedding_adapter
    _embedding_adapter = None
|
|