# graph-rag/src/graph_rag_service/core/abstractions.py
# GitHub Action: Automated sync to Hugging Face (674fb4e)
"""
Abstract base classes for pluggable components
Ensures no vendor lock-in and easy extensibility
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from .models import Entity, Relationship, Chunk
class GraphStore(ABC):
    """Contract that every graph database backend has to fulfil"""

    @abstractmethod
    async def connect(self) -> None:
        """Open the connection to the graph database"""
        ...

    @abstractmethod
    async def disconnect(self) -> None:
        """Shut down the connection to the graph database"""
        ...

    @abstractmethod
    async def create_node(self, entity: Entity) -> str:
        """
        Add a node to the graph

        Args:
            entity: The entity to store as a node

        Returns:
            ID of the node that was created
        """
        ...

    @abstractmethod
    async def create_relationship(self, relationship: Relationship) -> str:
        """
        Add a relationship linking nodes in the graph

        Args:
            relationship: The relationship to store

        Returns:
            ID of the relationship that was created
        """
        ...

    @abstractmethod
    async def execute_query(
        self,
        query: str,
        params: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Run a raw backend query (Cypher for Neo4j, Gremlin for Neptune)

        Args:
            query: The query string
            params: Parameters for the query, if any

        Returns:
            The results of the query
        """
        ...

    @abstractmethod
    async def find_path(
        self,
        source: str,
        target: str,
        max_depth: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Look up paths connecting two entities

        Args:
            source: Name of the source entity
            target: Name of the target entity
            max_depth: Maximum depth of a path

        Returns:
            List of paths
        """
        ...

    @abstractmethod
    async def get_neighbors(
        self,
        entity_name: str,
        depth: int = 1
    ) -> List[Dict[str, Any]]:
        """
        Fetch the entities neighboring a given entity

        Args:
            entity_name: Entity whose neighbors are requested
            depth: Depth of the traversal

        Returns:
            List of neighboring entities and relationships
        """
        ...

    @abstractmethod
    async def merge_entities(self, entity1_id: str, entity2_id: str) -> str:
        """
        Merge two entities that are duplicates of each other

        Args:
            entity1_id: ID of the first entity
            entity2_id: ID of the second entity (it is merged into the first)

        Returns:
            ID of the merged entity
        """
        ...
class VectorStore(ABC):
    """Contract that every vector database backend has to fulfil"""

    @abstractmethod
    async def connect(self) -> None:
        """Open the connection to the vector store"""
        ...

    @abstractmethod
    async def disconnect(self) -> None:
        """Shut down the connection to the vector store"""
        ...

    @abstractmethod
    async def add_vectors(
        self,
        vectors: List[List[float]],
        metadata: List[Dict[str, Any]],
        ids: Optional[List[str]] = None
    ) -> List[str]:
        """
        Insert vectors into the store

        Args:
            vectors: The embedding vectors to insert
            metadata: Metadata for each vector
            ids: IDs to use for the vectors (optional)

        Returns:
            List of vector IDs
        """
        ...

    @abstractmethod
    async def search(
        self,
        query_vector: List[float],
        k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Look up vectors similar to a query vector

        Args:
            query_vector: Embedding of the query
            k: How many results to return
            filter: Metadata filters

        Returns:
            List of similar items together with their scores
        """
        ...

    @abstractmethod
    async def delete_vectors(self, ids: List[str]) -> None:
        """
        Remove vectors identified by their IDs

        Args:
            ids: IDs of the vectors to delete
        """
        ...
class LLMProvider(ABC):
    """Contract that every LLM backend has to fulfil"""

    @abstractmethod
    async def complete(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None
    ) -> str:
        """
        Produce a completion for a prompt

        Args:
            prompt: The user prompt
            system_prompt: The system prompt
            temperature: Temperature used for sampling
            max_tokens: Upper bound on the number of generated tokens

        Returns:
            The generated text
        """
        ...

    @abstractmethod
    async def complete_structured(
        self,
        prompt: str,
        response_model: type,
        system_prompt: Optional[str] = None
    ) -> Any:
        """
        Produce structured output that conforms to a model

        Args:
            prompt: The user prompt
            response_model: Pydantic model describing the response
            system_prompt: The system prompt

        Returns:
            The response parsed into a response_model instance
        """
        ...

    @abstractmethod
    async def embed(self, text: str) -> List[float]:
        """
        Produce the embedding of a single text

        Args:
            text: The text to embed

        Returns:
            The embedding vector
        """
        ...

    @abstractmethod
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Produce embeddings for several texts

        Args:
            texts: The texts to embed

        Returns:
            List of embedding vectors
        """
        ...
class EntityResolver(ABC):
    """Contract that every entity resolution strategy has to fulfil"""

    @abstractmethod
    async def resolve(
        self,
        entities: List[Entity],
        threshold: float = 0.85
    ) -> Dict[str, List[Entity]]:
        """
        Deduplicate a list of entities by resolving them

        Args:
            entities: The entities to resolve
            threshold: Similarity threshold used for matching

        Returns:
            Dictionary mapping each canonical entity to its duplicates
        """
        ...

    @abstractmethod
    async def compute_similarity(self, entity1: Entity, entity2: Entity) -> float:
        """
        Score how similar two entities are

        Args:
            entity1: The first entity
            entity2: The second entity

        Returns:
            Similarity score (0-1)
        """
        ...