Spaces:
Paused
Paused
File size: 6,160 Bytes
4e5a541 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | """
系统设置服务
管理系统配置的读取、更新和缓存
"""
from typing import Optional, Dict
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Setting
import logging
logger = logging.getLogger(__name__)
class SettingsService:
"""系统设置服务类"""
def __init__(self):
self._cache: Dict[str, str] = {}
async def get_setting(self, session: AsyncSession, key: str, default: Optional[str] = None) -> Optional[str]:
"""
获取单个配置项
Args:
session: 数据库会话
key: 配置项键名
default: 默认值
Returns:
配置项值,如果不存在则返回默认值
"""
# 先从缓存获取
if key in self._cache:
return self._cache[key]
# 从数据库获取
result = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = result.scalar_one_or_none()
if setting:
self._cache[key] = setting.value
return setting.value
return default
async def get_all_settings(self, session: AsyncSession) -> Dict[str, str]:
"""
获取所有配置项
Args:
session: 数据库会话
Returns:
配置项字典
"""
result = await session.execute(select(Setting))
settings = result.scalars().all()
settings_dict = {s.key: s.value for s in settings}
self._cache.update(settings_dict)
return settings_dict
async def update_setting(self, session: AsyncSession, key: str, value: str) -> bool:
"""
更新单个配置项
Args:
session: 数据库会话
key: 配置项键名
value: 配置项值
Returns:
是否更新成功
"""
try:
result = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = result.scalar_one_or_none()
if setting:
setting.value = value
else:
setting = Setting(key=key, value=value)
session.add(setting)
await session.commit()
# 更新缓存
self._cache[key] = value
logger.info(f"配置项 {key} 已更新")
return True
except Exception as e:
logger.error(f"更新配置项 {key} 失败: {e}")
await session.rollback()
return False
async def update_settings(self, session: AsyncSession, settings: Dict[str, str]) -> bool:
"""
批量更新配置项
Args:
session: 数据库会话
settings: 配置项字典
Returns:
是否更新成功
"""
try:
for key, value in settings.items():
result = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = result.scalar_one_or_none()
if setting:
setting.value = value
else:
setting = Setting(key=key, value=value)
session.add(setting)
await session.commit()
# 更新缓存
self._cache.update(settings)
logger.info(f"批量更新了 {len(settings)} 个配置项")
return True
except Exception as e:
logger.error(f"批量更新配置项失败: {e}")
await session.rollback()
return False
def clear_cache(self):
"""清空缓存"""
self._cache.clear()
logger.info("配置缓存已清空")
async def get_proxy_config(self, session: AsyncSession) -> Dict[str, str]:
"""
获取代理配置
Returns:
代理配置字典
"""
proxy_enabled = await self.get_setting(session, "proxy_enabled", "false")
proxy = await self.get_setting(session, "proxy", "")
return {
"enabled": str(proxy_enabled).lower() == "true",
"proxy": proxy
}
async def update_proxy_config(
self,
session: AsyncSession,
enabled: bool,
proxy: str = ""
) -> bool:
"""
更新代理配置
Args:
session: 数据库会话
enabled: 是否启用代理
proxy: 代理地址 (格式: http://host:port 或 socks5://host:port)
Returns:
是否更新成功
"""
settings = {
"proxy_enabled": str(enabled).lower(),
"proxy": proxy
}
return await self.update_settings(session, settings)
async def get_log_level(self, session: AsyncSession) -> str:
"""
获取日志级别
Returns:
日志级别
"""
return await self.get_setting(session, "log_level", "INFO")
async def update_log_level(self, session: AsyncSession, level: str) -> bool:
"""
更新日志级别
Args:
session: 数据库会话
level: 日志级别 (DEBUG/INFO/WARNING/ERROR/CRITICAL)
Returns:
是否更新成功
"""
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if level.upper() not in valid_levels:
logger.error(f"无效的日志级别: {level}")
return False
success = await self.update_setting(session, "log_level", level.upper())
if success:
# 动态更新日志级别
logging.getLogger().setLevel(level.upper())
logger.info(f"日志级别已更新为: {level.upper()}")
return success
# 创建全局实例
settings_service = SettingsService()
|