| from __future__ import annotations |
|
|
| from abc import ABC, abstractmethod |
| from enum import Enum |
| import os |
| from dotenv import load_dotenv |
| from dataclasses import dataclass, field |
| from typing import ( |
| Any, |
| Literal, |
| TypedDict, |
| TypeVar, |
| Callable, |
| Optional, |
| Dict, |
| List, |
| AsyncIterator, |
| ) |
| from .utils import EmbeddingFunc |
| from .types import KnowledgeGraph |
| from .constants import ( |
| GRAPH_FIELD_SEP, |
| DEFAULT_TOP_K, |
| DEFAULT_CHUNK_TOP_K, |
| DEFAULT_MAX_ENTITY_TOKENS, |
| DEFAULT_MAX_RELATION_TOKENS, |
| DEFAULT_MAX_TOTAL_TOKENS, |
| DEFAULT_HISTORY_TURNS, |
| DEFAULT_OLLAMA_MODEL_NAME, |
| DEFAULT_OLLAMA_MODEL_TAG, |
| DEFAULT_OLLAMA_MODEL_SIZE, |
| DEFAULT_OLLAMA_CREATED_AT, |
| DEFAULT_OLLAMA_DIGEST, |
| ) |
|
|
| |
| |
| |
# Load environment variables from a local ".env" file at import time.
# override=False keeps any variables already present in the process
# environment, so real environment settings win over the .env file.
load_dotenv(dotenv_path=".env", override=False)
|
|
|
|
class OllamaServerInfos:
    """Model identity advertised by the Ollama-emulation endpoints.

    The model name and tag come from the constructor arguments when given,
    otherwise from the OLLAMA_EMULATING_MODEL_NAME / OLLAMA_EMULATING_MODEL_TAG
    environment variables, falling back to the package defaults. Size,
    creation timestamp and digest are fixed package defaults.
    """

    def __init__(self, name=None, tag=None):
        # Explicit argument wins; otherwise consult the environment, then defaults.
        env_name = os.getenv("OLLAMA_EMULATING_MODEL_NAME", DEFAULT_OLLAMA_MODEL_NAME)
        env_tag = os.getenv("OLLAMA_EMULATING_MODEL_TAG", DEFAULT_OLLAMA_MODEL_TAG)
        self._lightrag_name = name if name else env_name
        self._lightrag_tag = tag if tag else env_tag
        self.LIGHTRAG_SIZE = DEFAULT_OLLAMA_MODEL_SIZE
        self.LIGHTRAG_CREATED_AT = DEFAULT_OLLAMA_CREATED_AT
        self.LIGHTRAG_DIGEST = DEFAULT_OLLAMA_DIGEST

    @property
    def LIGHTRAG_NAME(self):
        """Model name component of the emulated model identifier."""
        return self._lightrag_name

    @LIGHTRAG_NAME.setter
    def LIGHTRAG_NAME(self, value):
        self._lightrag_name = value

    @property
    def LIGHTRAG_TAG(self):
        """Model tag component of the emulated model identifier."""
        return self._lightrag_tag

    @LIGHTRAG_TAG.setter
    def LIGHTRAG_TAG(self, value):
        self._lightrag_tag = value

    @property
    def LIGHTRAG_MODEL(self):
        """Full "name:tag" model identifier, in Ollama's conventional format."""
        return f"{self._lightrag_name}:{self._lightrag_tag}"
|
|
|
|
class TextChunkSchema(TypedDict):
    """Schema of a single text chunk produced by document splitting."""

    tokens: int  # token count of this chunk
    content: str  # raw chunk text
    full_doc_id: str  # ID of the source document this chunk came from
    chunk_order_index: int  # position of this chunk within the source document
|
|
|
|
# Generic type variable for storage-related type hints in this module.
T = TypeVar("T")
|
|
|
|
@dataclass
class QueryParam:
    """Configuration parameters for query execution in LightRAG."""

    # NOTE(review): "bypass" is accepted by the Literal below but not described in
    # the attribute docstring — presumably it skips retrieval; confirm upstream.
    mode: Literal["local", "global", "hybrid", "naive", "mix", "bypass"] = "mix"
    """Specifies the retrieval mode:
    - "local": Focuses on context-dependent information.
    - "global": Utilizes global knowledge.
    - "hybrid": Combines local and global retrieval methods.
    - "naive": Performs a basic search without advanced techniques.
    - "mix": Integrates knowledge graph and vector retrieval.
    """

    only_need_context: bool = False
    """If True, only returns the retrieved context without generating a response."""

    only_need_prompt: bool = False
    """If True, only returns the generated prompt without producing a response."""

    response_type: str = "Multiple Paragraphs"
    """Defines the response format. Examples: 'Multiple Paragraphs', 'Single Paragraph', 'Bullet Points'."""

    stream: bool = False
    """If True, enables streaming output for real-time responses."""

    # The defaults below are read from the environment once, at class-definition
    # time (module import), not per QueryParam instance.
    top_k: int = int(os.getenv("TOP_K", str(DEFAULT_TOP_K)))
    """Number of top items to retrieve. Represents entities in 'local' mode and relationships in 'global' mode."""

    # NOTE(review): annotated as int, yet the docstring mentions a None fallback —
    # confirm how the None case is handled by callers.
    chunk_top_k: int = int(os.getenv("CHUNK_TOP_K", str(DEFAULT_CHUNK_TOP_K)))
    """Number of text chunks to retrieve initially from vector search and keep after reranking.
    If None, defaults to top_k value.
    """

    max_entity_tokens: int = int(
        os.getenv("MAX_ENTITY_TOKENS", str(DEFAULT_MAX_ENTITY_TOKENS))
    )
    """Maximum number of tokens allocated for entity context in unified token control system."""

    max_relation_tokens: int = int(
        os.getenv("MAX_RELATION_TOKENS", str(DEFAULT_MAX_RELATION_TOKENS))
    )
    """Maximum number of tokens allocated for relationship context in unified token control system."""

    max_total_tokens: int = int(
        os.getenv("MAX_TOTAL_TOKENS", str(DEFAULT_MAX_TOTAL_TOKENS))
    )
    """Maximum total tokens budget for the entire query context (entities + relations + chunks + system prompt)."""

    hl_keywords: list[str] = field(default_factory=list)
    """List of high-level keywords to prioritize in retrieval."""

    ll_keywords: list[str] = field(default_factory=list)
    """List of low-level keywords to refine retrieval focus."""

    conversation_history: list[dict[str, str]] = field(default_factory=list)
    """Stores past conversation history to maintain context.
    Format: [{"role": "user/assistant", "content": "message"}].
    """

    history_turns: int = int(os.getenv("HISTORY_TURNS", str(DEFAULT_HISTORY_TURNS)))
    """Number of complete conversation turns (user-assistant pairs) to consider in the response context."""

    model_func: Callable[..., object] | None = None
    """Optional override for the LLM model function to use for this specific query.
    If provided, this will be used instead of the global model function.
    This allows using different models for different query modes.
    """

    user_prompt: str | None = None
    """User-provided prompt for the query.
    Additional instructions for the LLM. If provided, they are injected into the
    prompt template. Its purpose is to let the user customize how the LLM
    generates the response.
    """

    enable_rerank: bool = os.getenv("RERANK_BY_DEFAULT", "true").lower() == "true"
    """Enable reranking for retrieved text chunks. If True but no rerank model is configured, a warning will be issued.
    Default is True to enable reranking when rerank model is available.
    """

    include_references: bool = False
    """If True, includes reference list in the response for supported endpoints.
    This parameter controls whether the API response includes a references field
    containing citation information for the retrieved content.
    """
|
|
|
|
@dataclass
class StorageNameSpace(ABC):
    """Abstract base for storage backends: a named namespace within a workspace."""

    namespace: str  # logical namespace this storage instance serves
    workspace: str  # workspace the namespace belongs to
    global_config: dict[str, Any]  # shared LightRAG configuration dictionary

    async def initialize(self):
        """Initialize the storage (no-op by default; backends may override)."""
        pass

    async def finalize(self):
        """Finalize the storage (no-op by default; backends may override)."""
        pass

    @abstractmethod
    async def index_done_callback(self) -> None:
        """Commit the storage operations after indexing"""

    @abstractmethod
    async def drop(self) -> dict[str, str]:
        """Drop all data from storage and clean up resources

        This abstract method defines the contract for dropping all data from a storage implementation.
        Each storage type must implement this method to:
        1. Clear all data from memory and/or external storage
        2. Remove any associated storage files if applicable
        3. Reset the storage to its initial state
        4. Handle cleanup of any resources
        5. Notify other processes if necessary
        6. This action should persist the data to disk immediately.

        Returns:
            dict[str, str]: Operation status and message with the following format:
                {
                    "status": str,  # "success" or "error"
                    "message": str  # "data dropped" on success, error details on failure
                }

        Implementation specific:
        - On success: return {"status": "success", "message": "data dropped"}
        - On failure: return {"status": "error", "message": "<error details>"}
        - If not supported: return {"status": "error", "message": "unsupported"}
        """
|
|
|
|
@dataclass
class BaseVectorStorage(StorageNameSpace, ABC):
    """Abstract interface for vector (embedding) storage backends."""

    embedding_func: EmbeddingFunc  # function used to embed queries/content
    # Similarity cutoff — presumably results scoring below it are dropped by
    # implementations; confirm against concrete backends.
    cosine_better_than_threshold: float = field(default=0.2)
    meta_fields: set[str] = field(default_factory=set)  # extra metadata fields kept per vector

    @abstractmethod
    async def query(
        self, query: str, top_k: int, query_embedding: list[float] | None = None
    ) -> list[dict[str, Any]]:
        """Query the vector storage and retrieve top_k results.

        Args:
            query: The query string to search for
            top_k: Number of top results to return
            query_embedding: Optional pre-computed embedding for the query.
                If provided, skips embedding computation for better performance.
        """

    @abstractmethod
    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Insert or update vectors in the storage.

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption
        """

    @abstractmethod
    async def delete_entity(self, entity_name: str) -> None:
        """Delete a single entity by its name.

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption
        """

    @abstractmethod
    async def delete_entity_relation(self, entity_name: str) -> None:
        """Delete relations for a given entity.

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption
        """

    @abstractmethod
    async def get_by_id(self, id: str) -> dict[str, Any] | None:
        """Get vector data by its ID

        Args:
            id: The unique identifier of the vector

        Returns:
            The vector data if found, or None if not found
        """
        pass

    @abstractmethod
    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        """Get multiple vector data by their IDs

        Args:
            ids: List of unique identifiers

        Returns:
            List of vector data objects that were found
        """
        pass

    @abstractmethod
    async def delete(self, ids: list[str]):
        """Delete vectors with specified IDs

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption

        Args:
            ids: List of vector IDs to be deleted
        """

    @abstractmethod
    async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]:
        """Get vectors by their IDs, returning only ID and vector data for efficiency

        Args:
            ids: List of unique identifiers

        Returns:
            Dictionary mapping IDs to their vector embeddings
            Format: {id: [vector_values], ...}
        """
        pass
|
|
|
|
@dataclass
class BaseKVStorage(StorageNameSpace, ABC):
    """Abstract interface for key-value storage backends."""

    embedding_func: EmbeddingFunc  # embedding function available to KV backends

    @abstractmethod
    async def get_by_id(self, id: str) -> dict[str, Any] | None:
        """Get value by id; None when the key is absent."""

    @abstractmethod
    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        """Get values by ids"""

    @abstractmethod
    async def filter_keys(self, keys: set[str]) -> set[str]:
        """Return the subset of `keys` that do not yet exist in storage."""

    @abstractmethod
    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Upsert data

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. update flags to notify other processes that data persistence is needed
        """

    @abstractmethod
    async def delete(self, ids: list[str]) -> None:
        """Delete specific records from storage by their IDs

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. update flags to notify other processes that data persistence is needed

        Args:
            ids (list[str]): List of document IDs to be deleted from storage

        Returns:
            None
        """
|
|
|
|
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):
    """Abstract interface for knowledge-graph storage backends.

    All operations related to edges in graph should be undirected.
    """

    embedding_func: EmbeddingFunc  # embedding function available to graph backends

    @abstractmethod
    async def has_node(self, node_id: str) -> bool:
        """Check if a node exists in the graph.

        Args:
            node_id: The ID of the node to check

        Returns:
            True if the node exists, False otherwise
        """

    @abstractmethod
    async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
        """Check if an edge exists between two nodes.

        Args:
            source_node_id: The ID of the source node
            target_node_id: The ID of the target node

        Returns:
            True if the edge exists, False otherwise
        """

    @abstractmethod
    async def node_degree(self, node_id: str) -> int:
        """Get the degree (number of connected edges) of a node.

        Args:
            node_id: The ID of the node

        Returns:
            The number of edges connected to the node
        """

    @abstractmethod
    async def edge_degree(self, src_id: str, tgt_id: str) -> int:
        """Get the total degree of an edge (sum of degrees of its source and target nodes).

        Args:
            src_id: The ID of the source node
            tgt_id: The ID of the target node

        Returns:
            The sum of the degrees of the source and target nodes
        """

    @abstractmethod
    async def get_node(self, node_id: str) -> dict[str, str] | None:
        """Get node by its ID, returning only node properties.

        Args:
            node_id: The ID of the node to retrieve

        Returns:
            A dictionary of node properties if found, None otherwise
        """

    @abstractmethod
    async def get_edge(
        self, source_node_id: str, target_node_id: str
    ) -> dict[str, str] | None:
        """Get edge properties between two nodes.

        Args:
            source_node_id: The ID of the source node
            target_node_id: The ID of the target node

        Returns:
            A dictionary of edge properties if found, None otherwise
        """

    @abstractmethod
    async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
        """Get all edges connected to a node.

        Args:
            source_node_id: The ID of the node to get edges for

        Returns:
            A list of (source_id, target_id) tuples representing edges,
            or None if the node doesn't exist
        """

    async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]:
        """Get nodes as a batch using UNWIND

        Default implementation fetches nodes one by one.
        Override this method for better performance in storage backends
        that support batch operations.
        """
        result = {}
        for node_id in node_ids:
            node = await self.get_node(node_id)
            # Missing nodes are silently skipped rather than mapped to None
            if node is not None:
                result[node_id] = node
        return result

    async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]:
        """Node degrees as a batch using UNWIND

        Default implementation fetches node degrees one by one.
        Override this method for better performance in storage backends
        that support batch operations.
        """
        result = {}
        for node_id in node_ids:
            degree = await self.node_degree(node_id)
            result[node_id] = degree
        return result

    async def edge_degrees_batch(
        self, edge_pairs: list[tuple[str, str]]
    ) -> dict[tuple[str, str], int]:
        """Edge degrees as a batch using UNWIND; overrides may use node_degrees_batch.

        Default implementation calculates edge degrees one by one.
        Override this method for better performance in storage backends
        that support batch operations.
        """
        result = {}
        for src_id, tgt_id in edge_pairs:
            degree = await self.edge_degree(src_id, tgt_id)
            result[(src_id, tgt_id)] = degree
        return result

    async def get_edges_batch(
        self, pairs: list[dict[str, str]]
    ) -> dict[tuple[str, str], dict]:
        """Get edges as a batch using UNWIND

        Default implementation fetches edges one by one.
        Override this method for better performance in storage backends
        that support batch operations.

        Args:
            pairs: list of {"src": ..., "tgt": ...} dictionaries
        """
        result = {}
        for pair in pairs:
            src_id = pair["src"]
            tgt_id = pair["tgt"]
            edge = await self.get_edge(src_id, tgt_id)
            # Missing edges are silently skipped
            if edge is not None:
                result[(src_id, tgt_id)] = edge
        return result

    async def get_nodes_edges_batch(
        self, node_ids: list[str]
    ) -> dict[str, list[tuple[str, str]]]:
        """Get nodes edges as a batch using UNWIND

        Default implementation fetches node edges one by one.
        Override this method for better performance in storage backends
        that support batch operations.
        """
        result = {}
        for node_id in node_ids:
            edges = await self.get_node_edges(node_id)
            # Normalize the "node doesn't exist" None into an empty list
            result[node_id] = edges if edges is not None else []
        return result

    @abstractmethod
    async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
        """Get all nodes that are associated with the given chunk_ids.

        Args:
            chunk_ids (list[str]): A list of chunk IDs to find associated nodes for.

        Returns:
            list[dict]: A list of nodes, where each node is a dictionary of its properties.
                An empty list if no matching nodes are found.
        """

    @abstractmethod
    async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
        """Get all edges that are associated with the given chunk_ids.

        Args:
            chunk_ids (list[str]): A list of chunk IDs to find associated edges for.

        Returns:
            list[dict]: A list of edges, where each edge is a dictionary of its properties.
                An empty list if no matching edges are found.
        """
        # NOTE(review): a default implementation inside an @abstractmethod. Python
        # permits this — subclasses must still override, but may reuse it via
        # super(). Confirm this body is intentional and not an accidental merge.
        all_edges = []
        all_labels = await self.get_all_labels()
        processed_edges = set()

        for label in all_labels:
            edges = await self.get_node_edges(label)
            if edges:
                for src_id, tgt_id in edges:
                    # Sort the endpoint pair so each undirected edge is visited once
                    edge_tuple = tuple(sorted((src_id, tgt_id)))
                    if edge_tuple in processed_edges:
                        continue
                    processed_edges.add(edge_tuple)

                    edge = await self.get_edge(src_id, tgt_id)
                    # Keep the edge only if its source_id field (GRAPH_FIELD_SEP-joined
                    # chunk IDs) intersects the requested chunk_ids
                    if edge and "source_id" in edge:
                        source_ids = set(edge["source_id"].split(GRAPH_FIELD_SEP))
                        if not source_ids.isdisjoint(chunk_ids):
                            # Attach endpoints so callers know which nodes the edge links
                            edge_with_nodes = edge.copy()
                            edge_with_nodes["source"] = src_id
                            edge_with_nodes["target"] = tgt_id
                            all_edges.append(edge_with_nodes)
        return all_edges

    @abstractmethod
    async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
        """Insert a new node or update an existing node in the graph.

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption

        Args:
            node_id: The ID of the node to insert or update
            node_data: A dictionary of node properties
        """

    @abstractmethod
    async def upsert_edge(
        self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
    ) -> None:
        """Insert a new edge or update an existing edge in the graph.

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption

        Args:
            source_node_id: The ID of the source node
            target_node_id: The ID of the target node
            edge_data: A dictionary of edge properties
        """

    @abstractmethod
    async def delete_node(self, node_id: str) -> None:
        """Delete a node from the graph.

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption

        Args:
            node_id: The ID of the node to delete
        """

    @abstractmethod
    async def remove_nodes(self, nodes: list[str]):
        """Delete multiple nodes

        Important notes:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption

        Args:
            nodes: List of node IDs to be deleted
        """

    @abstractmethod
    async def remove_edges(self, edges: list[tuple[str, str]]):
        """Delete multiple edges

        Important notes:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Only one process should update the storage at a time before index_done_callback,
           KG-storage-log should be used to avoid data corruption

        Args:
            edges: List of edges to be deleted, each edge is a (source, target) tuple
        """

    @abstractmethod
    async def get_all_labels(self) -> list[str]:
        """Get all labels in the graph.

        Returns:
            A list of all node labels in the graph, sorted alphabetically
        """

    @abstractmethod
    async def get_knowledge_graph(
        self, node_label: str, max_depth: int = 3, max_nodes: int = 1000
    ) -> KnowledgeGraph:
        """
        Retrieve a connected subgraph of nodes where the label includes the specified `node_label`.

        Args:
            node_label: Label of the starting node; "*" means all nodes
            max_depth: Maximum depth of the subgraph, defaults to 3
            max_nodes: Maximum nodes to return, defaults to 1000 (BFS if possible)

        Returns:
            KnowledgeGraph object containing nodes and edges, with an is_truncated flag
            indicating whether the graph was truncated due to max_nodes limit
        """

    @abstractmethod
    async def get_all_nodes(self) -> list[dict]:
        """Get all nodes in the graph.

        Returns:
            A list of all nodes, where each node is a dictionary of its properties
            (Edge is bidirectional for some storage implementation; deduplication must be handled by the caller)
        """

    @abstractmethod
    async def get_all_edges(self) -> list[dict]:
        """Get all edges in the graph.

        Returns:
            A list of all edges, where each edge is a dictionary of its properties
        """

    @abstractmethod
    async def get_popular_labels(self, limit: int = 300) -> list[str]:
        """Get popular labels by node degree (most connected entities)

        Args:
            limit: Maximum number of labels to return

        Returns:
            List of labels sorted by degree (highest first)
        """

    @abstractmethod
    async def search_labels(self, query: str, limit: int = 50) -> list[str]:
        """Search labels with fuzzy matching

        Args:
            query: Search query string
            limit: Maximum number of results to return

        Returns:
            List of matching labels sorted by relevance
        """
|
|
|
class DocStatus(str, Enum):
    """Document processing status (str-valued for easy serialization)."""

    PENDING = "pending"  # queued, not yet processed
    PROCESSING = "processing"  # currently being processed
    PROCESSED = "processed"  # finished successfully
    FAILED = "failed"  # processing ended with an error
|
|
|
|
@dataclass
class DocProcessingStatus:
    """Document processing status data structure"""

    content_summary: str
    """First 100 chars of document content, used for preview"""
    content_length: int
    """Total length of document"""
    file_path: str
    """File path of the document"""
    status: DocStatus
    """Current processing status"""
    created_at: str
    """ISO format timestamp when document was created"""
    updated_at: str
    """ISO format timestamp when document was last updated"""
    track_id: str | None = None
    """Tracking ID for monitoring progress"""
    chunks_count: int | None = None
    """Number of chunks after splitting, used for processing"""
    # NOTE(review): annotated Optional yet defaults to [] via default_factory —
    # confirm whether None is ever actually assigned by callers.
    chunks_list: list[str] | None = field(default_factory=list)
    """List of chunk IDs associated with this document, used for deletion"""
    error_msg: str | None = None
    """Error message if failed"""
    metadata: dict[str, Any] = field(default_factory=dict)
    """Additional metadata"""
    multimodal_processed: bool = False
    """Whether multimodal content has been processed"""
|
|
|
|
@dataclass
class DocStatusStorage(BaseKVStorage, ABC):
    """Base class for document status storage (per-document processing state)."""

    @abstractmethod
    async def get_status_counts(self) -> dict[str, int]:
        """Get counts of documents in each status"""

    @abstractmethod
    async def get_docs_by_status(
        self, status: DocStatus
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific status"""

    @abstractmethod
    async def get_docs_by_track_id(
        self, track_id: str
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific track_id"""

    @abstractmethod
    async def get_docs_paginated(
        self,
        status_filter: DocStatus | None = None,
        page: int = 1,
        page_size: int = 50,
        sort_field: str = "updated_at",
        sort_direction: str = "desc",
    ) -> tuple[list[tuple[str, DocProcessingStatus]], int]:
        """Get documents with pagination support

        Args:
            status_filter: Filter by document status, None for all statuses
            page: Page number (1-based)
            page_size: Number of documents per page (10-200)
            sort_field: Field to sort by ('created_at', 'updated_at', 'id')
            sort_direction: Sort direction ('asc' or 'desc')

        Returns:
            Tuple of (list of (doc_id, DocProcessingStatus) tuples, total_count)
        """

    @abstractmethod
    async def get_all_status_counts(self) -> dict[str, int]:
        """Get counts of documents in each status for all documents

        Returns:
            Dictionary mapping status names to counts
        """

    @abstractmethod
    async def get_doc_by_file_path(self, file_path: str) -> dict[str, Any] | None:
        """Get document by file path

        Args:
            file_path: The file path to search for

        Returns:
            dict[str, Any] | None: Document data if found, None otherwise
            Returns the same format as get_by_ids method
        """
|
|
|
|
class StoragesStatus(str, Enum):
    """Lifecycle status of the storage backends."""

    NOT_CREATED = "not_created"  # backend object not yet constructed
    CREATED = "created"  # constructed but not initialized
    INITIALIZED = "initialized"  # initialize() has completed
    FINALIZED = "finalized"  # finalize() has completed
|
|
|
|
@dataclass
class DeletionResult:
    """Represents the result of a deletion operation."""

    status: Literal["success", "not_found", "fail"]  # outcome of the deletion
    doc_id: str  # ID of the document the operation targeted
    message: str  # human-readable description of the outcome
    status_code: int = 200  # status code, defaults to 200 (HTTP-style — TODO confirm)
    file_path: str | None = None  # file path of the deleted document, when known
|
|
|
|
| |
|
|
|
|
@dataclass
class QueryResult:
    """
    Unified query result data structure for all query modes.

    Attributes:
        content: Text content for non-streaming responses
        response_iterator: Streaming response iterator for streaming responses
        raw_data: Complete structured data including references and metadata
        is_streaming: Whether this is a streaming result
    """

    content: Optional[str] = None
    response_iterator: Optional[AsyncIterator[str]] = None
    raw_data: Optional[Dict[str, Any]] = None
    is_streaming: bool = False

    @property
    def reference_list(self) -> List[Dict[str, str]]:
        """
        Convenient property to extract reference list from raw_data.

        Returns:
            List[Dict[str, str]]: Reference list in format:
                [{"reference_id": "1", "file_path": "/path/to/file.pdf"}, ...]
        """
        payload = self.raw_data
        if not payload:
            # No structured data attached — nothing to reference.
            return []
        data_section = payload.get("data", {})
        return data_section.get("references", [])

    @property
    def metadata(self) -> Dict[str, Any]:
        """
        Convenient property to extract metadata from raw_data.

        Returns:
            Dict[str, Any]: Query metadata including query_mode, keywords, etc.
        """
        payload = self.raw_data
        if not payload:
            return {}
        return payload.get("metadata", {})
|
|
|
|
@dataclass
class QueryContextResult:
    """
    Unified query context result data structure.

    Attributes:
        context: LLM context string
        raw_data: Complete structured data including reference_list
    """

    context: str
    raw_data: Dict[str, Any]

    @property
    def reference_list(self) -> List[Dict[str, str]]:
        """Convenient property to extract reference list from raw_data."""
        data_section = self.raw_data.get("data", {})
        return data_section.get("references", [])
|
|