"""
Abstract base classes for pluggable components.

These interfaces keep the system free of vendor lock-in and easy to extend.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from .models import Entity, Relationship, Chunk
class GraphStore(ABC):
    """Contract that every graph database backend has to implement."""

    @abstractmethod
    async def connect(self) -> None:
        """Open the connection to the graph database."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Shut down the connection to the graph database."""

    @abstractmethod
    async def create_node(self, entity: Entity) -> str:
        """
        Add a node to the graph.

        Args:
            entity: The entity to store as a node.

        Returns:
            Identifier of the node that was created.
        """

    @abstractmethod
    async def create_relationship(self, relationship: Relationship) -> str:
        """
        Add a relationship linking nodes in the graph.

        Args:
            relationship: The relationship to store.

        Returns:
            Identifier of the relationship that was created.
        """

    @abstractmethod
    async def execute_query(
        self,
        query: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Run a raw, backend-native query (Cypher for Neo4j, Gremlin for Neptune).

        Args:
            query: The query text.
            params: Parameters for the query, if any.

        Returns:
            The results produced by the query.
        """

    @abstractmethod
    async def find_path(
        self,
        source: str,
        target: str,
        max_depth: int = 3,
    ) -> List[Dict[str, Any]]:
        """
        Look up paths connecting two entities.

        Args:
            source: Name of the entity the path starts from.
            target: Name of the entity the path ends at.
            max_depth: Upper bound on path depth.

        Returns:
            The list of paths found.
        """

    @abstractmethod
    async def get_neighbors(
        self,
        entity_name: str,
        depth: int = 1,
    ) -> List[Dict[str, Any]]:
        """
        Fetch the entities surrounding a given entity.

        Args:
            entity_name: Entity whose neighbors are requested.
            depth: How deep the traversal goes.

        Returns:
            The neighboring entities together with their relationships.
        """

    @abstractmethod
    async def merge_entities(self, entity1_id: str, entity2_id: str) -> str:
        """
        Collapse two duplicate entities into one.

        Args:
            entity1_id: ID of the first entity.
            entity2_id: ID of the second entity, which is merged into the first.

        Returns:
            Identifier of the merged entity.
        """
class VectorStore(ABC):
    """Contract that every vector database backend has to implement."""

    @abstractmethod
    async def connect(self) -> None:
        """Open the connection to the vector store."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Shut down the connection to the vector store."""

    @abstractmethod
    async def add_vectors(
        self,
        vectors: List[List[float]],
        metadata: List[Dict[str, Any]],
        ids: Optional[List[str]] = None,
    ) -> List[str]:
        """
        Insert vectors into the store.

        Args:
            vectors: The embedding vectors to insert.
            metadata: Metadata accompanying each vector.
            ids: IDs to assign to the vectors; optional.

        Returns:
            The IDs of the vectors.
        """

    @abstractmethod
    async def search(
        self,
        query_vector: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Find the vectors most similar to a query embedding.

        Args:
            query_vector: Embedding to search with.
            k: How many results to return.
            filter: Metadata filters to apply.

        Returns:
            Similar items, each with its score.
        """

    @abstractmethod
    async def delete_vectors(self, ids: List[str]) -> None:
        """
        Remove vectors identified by their IDs.

        Args:
            ids: IDs of the vectors to remove.
        """
class LLMProvider(ABC):
    """Contract that every LLM backend has to implement."""

    @abstractmethod
    async def complete(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
    ) -> str:
        """
        Produce a text completion for a prompt.

        Args:
            prompt: The user prompt.
            system_prompt: The system prompt.
            temperature: Temperature used for sampling.
            max_tokens: Cap on the number of generated tokens.

        Returns:
            The generated text.
        """

    @abstractmethod
    async def complete_structured(
        self,
        prompt: str,
        response_model: type,
        system_prompt: Optional[str] = None,
    ) -> Any:
        """
        Produce structured output that conforms to a model.

        Args:
            prompt: The user prompt.
            response_model: Pydantic model describing the response.
            system_prompt: The system prompt.

        Returns:
            The response parsed into a response_model instance.
        """

    @abstractmethod
    async def embed(self, text: str) -> List[float]:
        """
        Compute the embedding of a single text.

        Args:
            text: The text to embed.

        Returns:
            The embedding vector.
        """

    @abstractmethod
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Compute embeddings for several texts.

        Args:
            texts: The texts to embed.

        Returns:
            A list of embedding vectors.
        """
class EntityResolver(ABC):
    """Contract that every entity-resolution strategy has to implement."""

    @abstractmethod
    async def resolve(
        self,
        entities: List[Entity],
        threshold: float = 0.85,
    ) -> Dict[str, List[Entity]]:
        """
        Deduplicate a collection of entities.

        Args:
            entities: The entities to resolve.
            threshold: Similarity threshold used when matching.

        Returns:
            Dictionary that maps each canonical entity to its duplicates.
        """

    @abstractmethod
    async def compute_similarity(self, entity1: Entity, entity2: Entity) -> float:
        """
        Score how similar two entities are.

        Args:
            entity1: The first entity.
            entity2: The second entity.

        Returns:
            A similarity score (0-1).
        """
|