""" 数据库自动迁移模块 在应用启动时自动检测并执行必要的数据库迁移 """ import logging import sqlite3 from pathlib import Path from datetime import datetime logger = logging.getLogger(__name__) def get_db_path(): """获取数据库文件路径""" from app.config import settings db_file = settings.database_url.split("///")[-1] return Path(db_file) def column_exists(cursor, table_name, column_name): """检查表中是否存在指定列""" cursor.execute(f"PRAGMA table_info({table_name})") columns = [row[1] for row in cursor.fetchall()] return column_name in columns def run_auto_migration(): """ 自动运行数据库迁移 检测缺失的列并自动添加 """ db_path = get_db_path() if not db_path.exists(): logger.info("数据库文件不存在,跳过迁移") return logger.info("开始检查数据库迁移...") try: conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() migrations_applied = [] # 检查并添加质保相关字段 if not column_exists(cursor, "redemption_codes", "has_warranty"): logger.info("添加 redemption_codes.has_warranty 字段") cursor.execute(""" ALTER TABLE redemption_codes ADD COLUMN has_warranty BOOLEAN DEFAULT 0 """) migrations_applied.append("redemption_codes.has_warranty") if not column_exists(cursor, "redemption_codes", "warranty_expires_at"): logger.info("添加 redemption_codes.warranty_expires_at 字段") cursor.execute(""" ALTER TABLE redemption_codes ADD COLUMN warranty_expires_at DATETIME """) migrations_applied.append("redemption_codes.warranty_expires_at") if not column_exists(cursor, "redemption_codes", "warranty_days"): logger.info("添加 redemption_codes.warranty_days 字段") cursor.execute(""" ALTER TABLE redemption_codes ADD COLUMN warranty_days INTEGER DEFAULT 30 """) migrations_applied.append("redemption_codes.warranty_days") if not column_exists(cursor, "redemption_records", "is_warranty_redemption"): logger.info("添加 redemption_records.is_warranty_redemption 字段") cursor.execute(""" ALTER TABLE redemption_records ADD COLUMN is_warranty_redemption BOOLEAN DEFAULT 0 """) migrations_applied.append("redemption_records.is_warranty_redemption") # 检查并添加 Token 刷新相关字段 if not column_exists(cursor, "teams", "refresh_token_encrypted"): logger.info("添加 teams.refresh_token_encrypted 字段") cursor.execute("ALTER TABLE teams ADD COLUMN refresh_token_encrypted TEXT") migrations_applied.append("teams.refresh_token_encrypted") if not column_exists(cursor, "teams", "session_token_encrypted"): logger.info("添加 teams.session_token_encrypted 字段") cursor.execute("ALTER TABLE teams ADD COLUMN session_token_encrypted TEXT") migrations_applied.append("teams.session_token_encrypted") if not column_exists(cursor, "teams", "client_id"): logger.info("添加 teams.client_id 字段") cursor.execute("ALTER TABLE teams ADD COLUMN client_id VARCHAR(100)") migrations_applied.append("teams.client_id") if not column_exists(cursor, "teams", "error_count"): logger.info("添加 teams.error_count 字段") cursor.execute("ALTER TABLE teams ADD COLUMN error_count INTEGER DEFAULT 0") migrations_applied.append("teams.error_count") if not column_exists(cursor, "teams", "account_role"): logger.info("添加 teams.account_role 字段") cursor.execute("ALTER TABLE teams ADD COLUMN account_role VARCHAR(50)") migrations_applied.append("teams.account_role") if not column_exists(cursor, "teams", "device_code_auth_enabled"): logger.info("添加 teams.device_code_auth_enabled 字段") cursor.execute("ALTER TABLE teams ADD COLUMN device_code_auth_enabled BOOLEAN DEFAULT 0") migrations_applied.append("teams.device_code_auth_enabled") if not column_exists(cursor, "teams", "pool_type"): logger.info("添加 teams.pool_type 字段") cursor.execute("ALTER TABLE teams ADD COLUMN pool_type VARCHAR(20) DEFAULT 'normal'") migrations_applied.append("teams.pool_type") if not column_exists(cursor, "redemption_codes", "pool_type"): logger.info("添加 redemption_codes.pool_type 字段") cursor.execute("ALTER TABLE redemption_codes ADD COLUMN pool_type VARCHAR(20) DEFAULT 'normal'") migrations_applied.append("redemption_codes.pool_type") if not column_exists(cursor, "redemption_codes", "reusable_by_seat"): logger.info("添加 redemption_codes.reusable_by_seat 字段") cursor.execute("ALTER TABLE redemption_codes ADD COLUMN reusable_by_seat BOOLEAN DEFAULT 0") migrations_applied.append("redemption_codes.reusable_by_seat") # 提交更改 conn.commit() if migrations_applied: logger.info(f"数据库迁移完成,应用了 {len(migrations_applied)} 个迁移: {', '.join(migrations_applied)}") else: logger.info("数据库已是最新版本,无需迁移") conn.close() except Exception as e: logger.error(f"数据库迁移失败: {e}") raise if __name__ == "__main__": # 允许直接运行此脚本进行迁移 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) run_auto_migration() print("迁移完成")