| """ |
| 数据库连接池模块 |
| """ |
| from pathlib import Path |
| from urllib.parse import quote_plus |
| from databases import Database |
| from sqlalchemy import create_engine, MetaData |
| from sqlalchemy.ext.declarative import declarative_base |
|
|
| from app.config.config import settings |
| from app.log.logger import get_database_logger |
|
|
# Module-wide logger used for all database connection events below.
logger = get_database_logger()
|
|
| |
# Build the connection URL for the configured backend.
# Reject unknown backends up front so the branches below only deal with
# the two supported ones.
if settings.DATABASE_TYPE not in ("sqlite", "mysql"):
    raise ValueError("Unsupported database type. Please set DATABASE_TYPE to 'sqlite' or 'mysql'.")

if settings.DATABASE_TYPE == "mysql":
    # The password is URL-quoted so reserved characters in it do not get
    # parsed as URL delimiters.
    if not settings.MYSQL_SOCKET:
        # TCP connection: host and port are part of the URL authority.
        DATABASE_URL = f"mysql+pymysql://{settings.MYSQL_USER}:{quote_plus(settings.MYSQL_PASSWORD)}@{settings.MYSQL_HOST}:{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}"
    else:
        # Unix-socket connection: no host part, socket path goes in the query.
        DATABASE_URL = f"mysql+pymysql://{settings.MYSQL_USER}:{quote_plus(settings.MYSQL_PASSWORD)}@/{settings.MYSQL_DATABASE}?unix_socket={settings.MYSQL_SOCKET}"
else:
    # SQLite: the database file lives in a "data" directory relative to the
    # current working directory; create the directory if it is missing.
    data_dir = Path("data")
    data_dir.mkdir(exist_ok=True)
    db_path = data_dir.joinpath(settings.SQLITE_DATABASE)
    DATABASE_URL = f"sqlite:///{db_path}"
|
|
| |
| |
# Shared metadata registry; the declarative base below is bound to it.
metadata = MetaData()

# Declarative base class created against the shared metadata.
Base = declarative_base(metadata=metadata)

# SQLAlchemy engine for DATABASE_URL. pool_pre_ping=True asks the pool to
# check that a connection is alive before handing it out.
engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,
)
|
|
| |
| |
| |
| |
| |
| |
# Async `databases.Database` handle used by connect_to_db / disconnect_from_db.
if settings.DATABASE_TYPE != "sqlite":
    # Non-SQLite backends get explicit pool options.
    # NOTE(review): pool_recycle is presumably in seconds (30 minutes) —
    # confirm against the backend driver's documentation.
    database = Database(
        DATABASE_URL,
        min_size=5,
        max_size=20,
        pool_recycle=1800,
    )
else:
    # SQLite is created with the library defaults (no pool options passed).
    database = Database(DATABASE_URL)
|
|
async def connect_to_db() -> None:
    """Open the module-level ``database`` connection.

    Logs the configured database type on success.

    Raises:
        Exception: any error raised while connecting is logged and then
            re-raised unchanged.
    """
    try:
        await database.connect()
        logger.info(f"Connected to {settings.DATABASE_TYPE}")
    except Exception as exc:
        # Record the failure, then let the caller see the original error.
        failure = f"Failed to connect to database: {str(exc)}"
        logger.error(failure)
        raise
|
|
|
|
async def disconnect_from_db() -> None:
    """Close the module-level ``database`` connection (best effort).

    Logs the configured database type on success. A failure while
    disconnecting is logged and deliberately not re-raised, so this
    coroutine does not propagate disconnect errors to its caller.
    """
    try:
        await database.disconnect()
        logger.info(f"Disconnected from {settings.DATABASE_TYPE}")
    except Exception as e:
        # The error is swallowed here, so this log entry is the only record
        # of it: log with the traceback instead of just the message text.
        logger.exception(f"Failed to disconnect from database: {str(e)}")
|
|