| from abc import ABC, abstractmethod |
| from typing import Dict, Any, List, Union |
| from enum import Enum |
|
|
|
|
|
|
class DatabaseType(Enum):
    """Database backends known to this module.

    Each member's value is the lowercase string identifier of the backend.
    """

    # Document store
    MONGODB = "mongodb"

    # Relational / SQL engines
    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    SQLITE = "sqlite"

    # Key-value, search, graph and vector stores
    REDIS = "redis"
    ELASTICSEARCH = "elasticsearch"
    NEO4J = "neo4j"
    VECTOR = "vector"
|
|
|
|
class QueryType(Enum):
    """Kinds of operation a query can represent.

    Each member's value is the lowercase string used when the query type is
    serialized (for example in formatted result dictionaries).
    """

    # Row/document level read and write operations
    SELECT = "select"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"

    # Structure-level operations
    CREATE = "create"
    DROP = "drop"
    ALTER = "alter"
    INDEX = "index"

    # Analytical and search operations
    AGGREGATE = "aggregate"
    SEARCH = "search"
    VECTOR_SEARCH = "vector_search"
|
|
|
|
| class DatabaseConnection: |
| """Base class for database connection management""" |
| |
| def __init__(self, connection_string: str, **kwargs): |
| self.connection_string = connection_string |
| self.connection_params = kwargs |
| self._connection = None |
| self._is_connected = False |
| |
| @property |
| def is_connected(self) -> bool: |
| return self._is_connected |
| |
| @abstractmethod |
| def connect(self) -> bool: |
| """Establish connection to the database""" |
| pass |
| |
| @abstractmethod |
| def disconnect(self) -> bool: |
| """Close connection to the database""" |
| pass |
| |
| @abstractmethod |
| def test_connection(self) -> bool: |
| """Test if the connection is working""" |
| pass |
|
|
|
|
class DatabaseBase(ABC):
    """
    Abstract base class for database operations.

    Declares the interface every backend must implement (connection
    lifecycle, query execution, introspection) and supplies shared helpers
    for query validation, result formatting, capability reporting and
    context-manager support.
    """

    def __init__(self,
                 connection_string: Union[str, None] = None,
                 database_name: Union[str, None] = None,
                 **kwargs):
        """
        Initialize the database base.

        Args:
            connection_string: Database connection string
            database_name: Name of the database to use
            **kwargs: Additional connection parameters
        """
        self.connection_string = connection_string
        self.database_name = database_name
        self.connection_params = kwargs
        self.db_type = self._get_database_type()
        self.connection = None
        self._is_initialized = False

        # Connect eagerly only when a non-empty connection string was given.
        # NOTE: connect() runs before any subclass __init__ code placed after
        # super().__init__(), so it may rely only on the attributes set above.
        if connection_string:
            self.connect()

    @abstractmethod
    def _get_database_type(self) -> DatabaseType:
        """Return the database type"""

    @abstractmethod
    def connect(self) -> bool:
        """
        Establish connection to the database.

        Returns:
            bool: True if connection successful, False otherwise
        """

    @abstractmethod
    def disconnect(self) -> bool:
        """
        Close connection to the database.

        Returns:
            bool: True if disconnection successful, False otherwise
        """

    @abstractmethod
    def test_connection(self) -> bool:
        """
        Test if the database connection is working.

        Returns:
            bool: True if connection is working, False otherwise
        """

    @abstractmethod
    def execute_query(self,
                      query: Union[str, Dict, List],
                      query_type: Union[QueryType, None] = None,
                      **kwargs) -> Dict[str, Any]:
        """
        Execute a query on the database.

        Args:
            query: The query to execute (string for SQL, dict/list for NoSQL)
            query_type: Type of query being executed
            **kwargs: Additional query parameters

        Returns:
            Dict containing query results and metadata
        """

    @abstractmethod
    def get_database_info(self) -> Dict[str, Any]:
        """
        Get information about the database.

        Returns:
            Dict containing database information
        """

    @abstractmethod
    def list_collections(self) -> List[str]:
        """
        List all collections/tables in the database.

        Returns:
            List of collection/table names
        """

    @abstractmethod
    def get_collection_info(self, collection_name: str) -> Dict[str, Any]:
        """
        Get information about a specific collection/table.

        Args:
            collection_name: Name of the collection/table

        Returns:
            Dict containing collection/table information
        """

    @abstractmethod
    def get_schema(self, collection_name: Union[str, None] = None) -> Dict[str, Any]:
        """
        Get the schema/structure of the database or a specific collection.

        Args:
            collection_name: Name of the collection/table (optional)

        Returns:
            Dict containing schema information
        """

    def validate_query(self, query: Union[str, Dict, List]) -> Dict[str, Any]:
        """
        Validate a query before execution.

        Only structural checks are performed: the query must be a ``str``,
        ``dict`` or ``list`` and must not be empty (a string consisting only
        of whitespace counts as empty).

        Args:
            query: The query to validate

        Returns:
            Dict with ``"valid"`` (bool) and ``"error"`` (message or None)
        """
        try:
            if isinstance(query, str):
                is_empty = not query.strip()
            elif isinstance(query, (dict, list)):
                is_empty = not query
            else:
                return {"valid": False, "error": f"Unsupported query type: {type(query)}"}

            if is_empty:
                return {"valid": False, "error": "Query cannot be empty"}
            return {"valid": True, "error": None}

        except Exception as e:
            # Any failure while inspecting the query is reported as a
            # validation error rather than propagated to the caller.
            return {"valid": False, "error": str(e)}

    def format_query_result(self,
                            data: Any,
                            query_type: QueryType,
                            execution_time: Union[float, None] = None,
                            **kwargs) -> Dict[str, Any]:
        """
        Format query results into a standard structure.

        ``row_count`` is ``len(data)`` for lists and tuples, 0 when ``data``
        is None (matching the error-result shape, where None data means zero
        rows), and 1 for any other single value.

        Args:
            data: Raw query results
            query_type: Type of query that was executed
            execution_time: Time taken to execute the query
            **kwargs: Additional metadata

        Returns:
            Dict containing formatted results
        """
        if data is None:
            row_count = 0
        elif isinstance(data, (list, tuple)):
            row_count = len(data)
        else:
            row_count = 1

        return {
            "success": True,
            "data": data,
            "query_type": query_type.value if query_type else None,
            "execution_time": execution_time,
            "row_count": row_count,
            "metadata": kwargs
        }

    def format_error_result(self,
                            error: str,
                            query_type: Union[QueryType, None] = None,
                            **kwargs) -> Dict[str, Any]:
        """
        Format error results into a standard structure.

        Args:
            error: Error message
            query_type: Type of query that failed
            **kwargs: Additional error metadata

        Returns:
            Dict containing formatted error results
        """
        return {
            "success": False,
            "error": error,
            "query_type": query_type.value if query_type else None,
            "data": None,
            "execution_time": None,
            "row_count": 0,
            "metadata": kwargs
        }

    def get_supported_query_types(self) -> List[QueryType]:
        """
        Get list of supported query types for this database.

        The base implementation returns the six basic types below.

        Returns:
            List of supported QueryType enums
        """
        return [
            QueryType.SELECT,
            QueryType.INSERT,
            QueryType.UPDATE,
            QueryType.DELETE,
            QueryType.CREATE,
            QueryType.DROP
        ]

    def get_capabilities(self) -> Dict[str, Any]:
        """
        Get database capabilities and features.

        The base implementation reports every optional feature as
        unsupported except indexing.

        Returns:
            Dict containing database capabilities
        """
        return {
            "database_type": self.db_type.value,
            "supports_sql": False,
            "supports_aggregation": False,
            "supports_full_text_search": False,
            "supports_vector_search": False,
            "supports_transactions": False,
            "supports_indexing": True,
            "supported_query_types": [qt.value for qt in self.get_supported_query_types()],
            "connection_info": {
                "is_connected": self.connection is not None,
                "database_name": self.database_name
            }
        }

    def __enter__(self):
        """Context manager entry: connect if no connection exists yet."""
        # Compare against None (as get_capabilities does) instead of relying
        # on truthiness: a live handle may be falsy, and some driver objects
        # (e.g. PyMongo Database) raise when evaluated as a boolean.
        if self.connection is None:
            self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()

    def __del__(self):
        """Cleanup on deletion"""
        # Best-effort only: errors raised while disconnecting during object
        # finalization are deliberately ignored.
        try:
            self.disconnect()
        except Exception:
            pass