test-team-manager / app /services /settings.py
Kyou0203's picture
Deploy updated app
4e5a541 verified
"""
系统设置服务
管理系统配置的读取、更新和缓存
"""
from typing import Optional, Dict
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Setting
import logging
logger = logging.getLogger(__name__)
class SettingsService:
"""系统设置服务类"""
def __init__(self):
self._cache: Dict[str, str] = {}
async def get_setting(self, session: AsyncSession, key: str, default: Optional[str] = None) -> Optional[str]:
"""
获取单个配置项
Args:
session: 数据库会话
key: 配置项键名
default: 默认值
Returns:
配置项值,如果不存在则返回默认值
"""
# 先从缓存获取
if key in self._cache:
return self._cache[key]
# 从数据库获取
result = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = result.scalar_one_or_none()
if setting:
self._cache[key] = setting.value
return setting.value
return default
async def get_all_settings(self, session: AsyncSession) -> Dict[str, str]:
"""
获取所有配置项
Args:
session: 数据库会话
Returns:
配置项字典
"""
result = await session.execute(select(Setting))
settings = result.scalars().all()
settings_dict = {s.key: s.value for s in settings}
self._cache.update(settings_dict)
return settings_dict
async def update_setting(self, session: AsyncSession, key: str, value: str) -> bool:
"""
更新单个配置项
Args:
session: 数据库会话
key: 配置项键名
value: 配置项值
Returns:
是否更新成功
"""
try:
result = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = result.scalar_one_or_none()
if setting:
setting.value = value
else:
setting = Setting(key=key, value=value)
session.add(setting)
await session.commit()
# 更新缓存
self._cache[key] = value
logger.info(f"配置项 {key} 已更新")
return True
except Exception as e:
logger.error(f"更新配置项 {key} 失败: {e}")
await session.rollback()
return False
async def update_settings(self, session: AsyncSession, settings: Dict[str, str]) -> bool:
"""
批量更新配置项
Args:
session: 数据库会话
settings: 配置项字典
Returns:
是否更新成功
"""
try:
for key, value in settings.items():
result = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = result.scalar_one_or_none()
if setting:
setting.value = value
else:
setting = Setting(key=key, value=value)
session.add(setting)
await session.commit()
# 更新缓存
self._cache.update(settings)
logger.info(f"批量更新了 {len(settings)} 个配置项")
return True
except Exception as e:
logger.error(f"批量更新配置项失败: {e}")
await session.rollback()
return False
def clear_cache(self):
"""清空缓存"""
self._cache.clear()
logger.info("配置缓存已清空")
async def get_proxy_config(self, session: AsyncSession) -> Dict[str, str]:
"""
获取代理配置
Returns:
代理配置字典
"""
proxy_enabled = await self.get_setting(session, "proxy_enabled", "false")
proxy = await self.get_setting(session, "proxy", "")
return {
"enabled": str(proxy_enabled).lower() == "true",
"proxy": proxy
}
async def update_proxy_config(
self,
session: AsyncSession,
enabled: bool,
proxy: str = ""
) -> bool:
"""
更新代理配置
Args:
session: 数据库会话
enabled: 是否启用代理
proxy: 代理地址 (格式: http://host:port 或 socks5://host:port)
Returns:
是否更新成功
"""
settings = {
"proxy_enabled": str(enabled).lower(),
"proxy": proxy
}
return await self.update_settings(session, settings)
async def get_log_level(self, session: AsyncSession) -> str:
"""
获取日志级别
Returns:
日志级别
"""
return await self.get_setting(session, "log_level", "INFO")
async def update_log_level(self, session: AsyncSession, level: str) -> bool:
"""
更新日志级别
Args:
session: 数据库会话
level: 日志级别 (DEBUG/INFO/WARNING/ERROR/CRITICAL)
Returns:
是否更新成功
"""
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if level.upper() not in valid_levels:
logger.error(f"无效的日志级别: {level}")
return False
success = await self.update_setting(session, "log_level", level.upper())
if success:
# 动态更新日志级别
logging.getLogger().setLevel(level.upper())
logger.info(f"日志级别已更新为: {level.upper()}")
return success
# 创建全局实例
settings_service = SettingsService()