""" 系统设置服务 管理系统配置的读取、更新和缓存 """ from typing import Optional, Dict from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models import Setting import logging logger = logging.getLogger(__name__) class SettingsService: """系统设置服务类""" def __init__(self): self._cache: Dict[str, str] = {} async def get_setting(self, session: AsyncSession, key: str, default: Optional[str] = None) -> Optional[str]: """ 获取单个配置项 Args: session: 数据库会话 key: 配置项键名 default: 默认值 Returns: 配置项值,如果不存在则返回默认值 """ # 先从缓存获取 if key in self._cache: return self._cache[key] # 从数据库获取 result = await session.execute( select(Setting).where(Setting.key == key) ) setting = result.scalar_one_or_none() if setting: self._cache[key] = setting.value return setting.value return default async def get_all_settings(self, session: AsyncSession) -> Dict[str, str]: """ 获取所有配置项 Args: session: 数据库会话 Returns: 配置项字典 """ result = await session.execute(select(Setting)) settings = result.scalars().all() settings_dict = {s.key: s.value for s in settings} self._cache.update(settings_dict) return settings_dict async def update_setting(self, session: AsyncSession, key: str, value: str) -> bool: """ 更新单个配置项 Args: session: 数据库会话 key: 配置项键名 value: 配置项值 Returns: 是否更新成功 """ try: result = await session.execute( select(Setting).where(Setting.key == key) ) setting = result.scalar_one_or_none() if setting: setting.value = value else: setting = Setting(key=key, value=value) session.add(setting) await session.commit() # 更新缓存 self._cache[key] = value logger.info(f"配置项 {key} 已更新") return True except Exception as e: logger.error(f"更新配置项 {key} 失败: {e}") await session.rollback() return False async def update_settings(self, session: AsyncSession, settings: Dict[str, str]) -> bool: """ 批量更新配置项 Args: session: 数据库会话 settings: 配置项字典 Returns: 是否更新成功 """ try: for key, value in settings.items(): result = await session.execute( select(Setting).where(Setting.key == key) ) setting = result.scalar_one_or_none() if setting: setting.value = value else: setting = Setting(key=key, value=value) session.add(setting) await session.commit() # 更新缓存 self._cache.update(settings) logger.info(f"批量更新了 {len(settings)} 个配置项") return True except Exception as e: logger.error(f"批量更新配置项失败: {e}") await session.rollback() return False def clear_cache(self): """清空缓存""" self._cache.clear() logger.info("配置缓存已清空") async def get_proxy_config(self, session: AsyncSession) -> Dict[str, str]: """ 获取代理配置 Returns: 代理配置字典 """ proxy_enabled = await self.get_setting(session, "proxy_enabled", "false") proxy = await self.get_setting(session, "proxy", "") return { "enabled": str(proxy_enabled).lower() == "true", "proxy": proxy } async def update_proxy_config( self, session: AsyncSession, enabled: bool, proxy: str = "" ) -> bool: """ 更新代理配置 Args: session: 数据库会话 enabled: 是否启用代理 proxy: 代理地址 (格式: http://host:port 或 socks5://host:port) Returns: 是否更新成功 """ settings = { "proxy_enabled": str(enabled).lower(), "proxy": proxy } return await self.update_settings(session, settings) async def get_log_level(self, session: AsyncSession) -> str: """ 获取日志级别 Returns: 日志级别 """ return await self.get_setting(session, "log_level", "INFO") async def update_log_level(self, session: AsyncSession, level: str) -> bool: """ 更新日志级别 Args: session: 数据库会话 level: 日志级别 (DEBUG/INFO/WARNING/ERROR/CRITICAL) Returns: 是否更新成功 """ valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] if level.upper() not in valid_levels: logger.error(f"无效的日志级别: {level}") return False success = await self.update_setting(session, "log_level", level.upper()) if success: # 动态更新日志级别 logging.getLogger().setLevel(level.upper()) logger.info(f"日志级别已更新为: {level.upper()}") return success # 创建全局实例 settings_service = SettingsService()