Spaces:
Paused
Paused
| import logging | |
| import httpx | |
| import asyncio | |
| from typing import Optional, Any, Dict | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.services.settings import settings_service | |
| from app.services.redemption import RedemptionService | |
| from app.services.team import team_service | |
| from app.database import AsyncSessionLocal | |
| logger = logging.getLogger(__name__) | |
| class NotificationService: | |
| """通知服务类""" | |
| def __init__(self): | |
| self.redemption_service = RedemptionService() | |
| async def check_and_notify_low_stock(self) -> bool: | |
| """ | |
| 检查库存(车位)并发送通知 | |
| 使用独立的数据库会话以支持异步后台任务 | |
| """ | |
| async with AsyncSessionLocal() as db_session: | |
| try: | |
| # 1. 获取配置 | |
| webhook_url = await settings_service.get_setting(db_session, "webhook_url") | |
| if not webhook_url: | |
| return False | |
| threshold_str = await settings_service.get_setting(db_session, "low_stock_threshold", "10") | |
| api_key = await settings_service.get_setting(db_session, "api_key") | |
| try: | |
| threshold = int(threshold_str) | |
| except (ValueError, TypeError): | |
| threshold = 10 | |
| # 2. 检查可用车位 (作为预警指标) | |
| available_seats = await team_service.get_total_available_seats(db_session) | |
| logger.info(f"库存检查 - 当前总可用车位: {available_seats}, 触发阈值: {threshold}") | |
| # 仅根据可用车位触发补货 | |
| if available_seats <= threshold: | |
| logger.info(f"检测到车位不足,触发补货预警! Webhook URL: {webhook_url}") | |
| return await self.send_webhook_notification(webhook_url, available_seats, threshold, api_key) | |
| return False | |
| except Exception as e: | |
| logger.error(f"检查库存并通知过程发生错误: {e}") | |
| return False | |
| async def send_webhook_notification(self, url: str, available_seats: int, threshold: int, api_key: Optional[str] = None) -> bool: | |
| """ | |
| 发送 Webhook 通知 | |
| """ | |
| try: | |
| payload = { | |
| "event": "low_stock", | |
| "current_seats": available_seats, | |
| "threshold": threshold, | |
| "message": f"库存不足预警:系统总可用车位仅剩 {available_seats},已低于预警阈值 {threshold},请及时补货导入新账号。" | |
| } | |
| headers = {} | |
| if api_key: | |
| headers["X-API-Key"] = api_key | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| response = await client.post(url, json=payload, headers=headers) | |
| response.raise_for_status() | |
| logger.info(f"Webhook 通知发送成功: {url}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"发送 Webhook 通知失败: {e}") | |
| return False | |
| # 创建全局实例 | |
| notification_service = NotificationService() | |