Spaces:
Paused
Paused
| """ | |
| 兑换码管理服务 | |
| 用于管理兑换码的生成、验证、使用和查询 | |
| """ | |
| import logging | |
| import secrets | |
| import string | |
| from typing import Optional, Dict, Any, List | |
| from datetime import datetime, timedelta | |
| from sqlalchemy import select, update, delete, and_, or_, func | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.orm import selectinload | |
| from app.models import RedemptionCode, RedemptionRecord, Team | |
| from app.utils.time_utils import get_now | |
| logger = logging.getLogger(__name__) | |
| class RedemptionService: | |
| """兑换码管理服务类""" | |
| def __init__(self): | |
| """初始化兑换码管理服务""" | |
| pass | |
| def _generate_random_code(self, length: int = 16) -> str: | |
| """ | |
| 生成随机兑换码 | |
| Args: | |
| length: 兑换码长度 | |
| Returns: | |
| 随机兑换码字符串 | |
| """ | |
| # 使用大写字母和数字,排除容易混淆的字符 (0, O, I, 1) | |
| alphabet = string.ascii_uppercase + string.digits | |
| alphabet = alphabet.replace('0', '').replace('O', '').replace('I', '').replace('1', '') | |
| # 生成随机码 | |
| code = ''.join(secrets.choice(alphabet) for _ in range(length)) | |
| # 格式化为 XXXX-XXXX-XXXX-XXXX | |
| if length == 16: | |
| code = f"{code[0:4]}-{code[4:8]}-{code[8:12]}-{code[12:16]}" | |
| return code | |
| async def generate_code_single( | |
| self, | |
| db_session: AsyncSession, | |
| code: Optional[str] = None, | |
| expires_days: Optional[int] = None, | |
| has_warranty: bool = False, | |
| warranty_days: int = 30, | |
| pool_type: str = "normal", | |
| reusable_by_seat: bool = False | |
| ) -> Dict[str, Any]: | |
| """ | |
| 生成单个兑换码 | |
| Args: | |
| db_session: 数据库会话 | |
| code: 自定义兑换码 (可选,如果不提供则自动生成) | |
| expires_days: 有效期天数 (可选,如果不提供则永久有效) | |
| has_warranty: 是否为质保兑换码 (默认 False) | |
| Returns: | |
| 结果字典,包含 success, code, message, error | |
| """ | |
| try: | |
| # 1. 生成或使用自定义兑换码 | |
| if not code: | |
| # 生成随机码,确保唯一性 | |
| max_attempts = 10 | |
| for _ in range(max_attempts): | |
| code = self._generate_random_code() | |
| # 检查是否已存在 | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| existing = result.scalar_one_or_none() | |
| if not existing: | |
| break | |
| else: | |
| return { | |
| "success": False, | |
| "code": None, | |
| "message": None, | |
| "error": "生成唯一兑换码失败,请重试" | |
| } | |
| else: | |
| # 检查自定义兑换码是否已存在 | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| existing = result.scalar_one_or_none() | |
| if existing: | |
| return { | |
| "success": False, | |
| "code": None, | |
| "message": None, | |
| "error": f"兑换码 {code} 已存在" | |
| } | |
| # 2. 计算过期时间 | |
| expires_at = None | |
| if expires_days: | |
| expires_at = get_now() + timedelta(days=expires_days) | |
| # 3. 创建兑换码记录 | |
| redemption_code = RedemptionCode( | |
| code=code, | |
| status="unused", | |
| expires_at=expires_at, | |
| has_warranty=has_warranty, | |
| warranty_days=warranty_days, | |
| pool_type=pool_type, | |
| reusable_by_seat=reusable_by_seat | |
| ) | |
| db_session.add(redemption_code) | |
| await db_session.commit() | |
| logger.info(f"生成兑换码成功: {code}") | |
| return { | |
| "success": True, | |
| "code": code, | |
| "message": f"兑换码生成成功: {code}", | |
| "error": None | |
| } | |
| except Exception as e: | |
| await db_session.rollback() | |
| logger.error(f"生成兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "code": None, | |
| "message": None, | |
| "error": f"生成兑换码失败: {str(e)}" | |
| } | |
| async def generate_code_batch( | |
| self, | |
| db_session: AsyncSession, | |
| count: int, | |
| expires_days: Optional[int] = None, | |
| has_warranty: bool = False, | |
| warranty_days: int = 30, | |
| pool_type: str = "normal", | |
| reusable_by_seat: bool = False | |
| ) -> Dict[str, Any]: | |
| """ | |
| 批量生成兑换码 | |
| Args: | |
| db_session: 数据库会话 | |
| count: 生成数量 | |
| expires_days: 有效期天数 (可选) | |
| has_warranty: 是否为质保兑换码 (默认 False) | |
| Returns: | |
| 结果字典,包含 success, codes, total, message, error | |
| """ | |
| try: | |
| if count <= 0 or count > 1000: | |
| return { | |
| "success": False, | |
| "codes": [], | |
| "total": 0, | |
| "message": None, | |
| "error": "生成数量必须在 1-1000 之间" | |
| } | |
| # 计算过期时间 | |
| expires_at = None | |
| if expires_days: | |
| expires_at = get_now() + timedelta(days=expires_days) | |
| # 批量生成兑换码 | |
| codes = [] | |
| for i in range(count): | |
| # 生成唯一兑换码 | |
| max_attempts = 10 | |
| for _ in range(max_attempts): | |
| code = self._generate_random_code() | |
| # 检查是否已存在 (包括本次批量生成的) | |
| if code not in codes: | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| existing = result.scalar_one_or_none() | |
| if not existing: | |
| codes.append(code) | |
| break | |
| else: | |
| logger.warning(f"生成第 {i+1} 个兑换码失败") | |
| continue | |
| # 批量插入数据库 | |
| for code in codes: | |
| redemption_code = RedemptionCode( | |
| code=code, | |
| status="unused", | |
| expires_at=expires_at, | |
| has_warranty=has_warranty, | |
| warranty_days=warranty_days, | |
| pool_type=pool_type, | |
| reusable_by_seat=reusable_by_seat | |
| ) | |
| db_session.add(redemption_code) | |
| await db_session.commit() | |
| logger.info(f"批量生成兑换码成功: {len(codes)} 个") | |
| return { | |
| "success": True, | |
| "codes": codes, | |
| "total": len(codes), | |
| "message": f"成功生成 {len(codes)} 个兑换码", | |
| "error": None | |
| } | |
| except Exception as e: | |
| await db_session.rollback() | |
| logger.error(f"批量生成兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "codes": [], | |
| "total": 0, | |
| "message": None, | |
| "error": f"批量生成兑换码失败: {str(e)}" | |
| } | |
| async def validate_code( | |
| self, | |
| code: str, | |
| db_session: AsyncSession | |
| ) -> Dict[str, Any]: | |
| """ | |
| 验证兑换码 | |
| Args: | |
| code: 兑换码 | |
| db_session: 数据库会话 | |
| Returns: | |
| 结果字典,包含 success, valid, reason, redemption_code, error | |
| """ | |
| try: | |
| # 1. 查询兑换码 | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| redemption_code = result.scalar_one_or_none() | |
| if not redemption_code: | |
| # 兼容福利通用兑换码:只存 settings,不写 redemption_codes 表 | |
| from app.services.settings import settings_service | |
| welfare_code = (await settings_service.get_setting(db_session, "welfare_common_code", "") or "").strip() | |
| if welfare_code and code == welfare_code: | |
| limit_raw = await settings_service.get_setting(db_session, "welfare_common_code_limit", "0") | |
| used_raw = await settings_service.get_setting(db_session, "welfare_common_code_used_count", "0") | |
| try: | |
| limit_count = int(limit_raw or 0) | |
| except Exception: | |
| limit_count = 0 | |
| try: | |
| used_count = int(used_raw or 0) | |
| except Exception: | |
| used_count = 0 | |
| # 真实可兑换次数按可邀请席位计算:sum(max_members - 1) | |
| capacity_stmt = select(func.sum(Team.max_members - 1)).where( | |
| Team.pool_type == "welfare", | |
| Team.max_members > 1 | |
| ) | |
| capacity_result = await db_session.execute(capacity_stmt) | |
| usable_capacity = int(capacity_result.scalar() or 0) | |
| # 兼容历史错误配置:向下收敛到真实可邀请席位 | |
| effective_limit = min(limit_count, usable_capacity) if limit_count > 0 else usable_capacity | |
| if effective_limit <= 0 or used_count >= effective_limit: | |
| return { | |
| "success": True, | |
| "valid": False, | |
| "reason": "兑换码次数已用完,无法进行兑换", | |
| "redemption_code": None, | |
| "error": None | |
| } | |
| return { | |
| "success": True, | |
| "valid": True, | |
| "reason": "兑换码有效", | |
| "redemption_code": { | |
| "id": None, | |
| "code": code, | |
| "status": "virtual_welfare", | |
| "expires_at": None, | |
| "created_at": None, | |
| "has_warranty": False, | |
| "warranty_days": 0, | |
| "pool_type": "welfare", | |
| "reusable_by_seat": True, | |
| "virtual_welfare_code": True, | |
| "limit": effective_limit, | |
| "used_count": used_count, | |
| }, | |
| "error": None | |
| } | |
| return { | |
| "success": True, | |
| "valid": False, | |
| "reason": "兑换码不存在", | |
| "redemption_code": None, | |
| "error": None | |
| } | |
| # 兼容清理:历史版本中福利通用码可能被写入 redemption_codes。 | |
| # 现版本福利通用码以 settings 为准,旧的福利可复用码一律视为失效,避免绕过“新码替换旧码”的规则。 | |
| if redemption_code.pool_type == "welfare" and redemption_code.reusable_by_seat: | |
| return { | |
| "success": True, | |
| "valid": False, | |
| "reason": "兑换码已失效,请使用最新福利通用兑换码", | |
| "redemption_code": None, | |
| "error": None | |
| } | |
| # 2. 检查状态 | |
| if redemption_code.reusable_by_seat: | |
| allowed_statuses = ["unused", "used", "warranty_active"] | |
| else: | |
| allowed_statuses = ["unused", "warranty_active"] | |
| if redemption_code.has_warranty: | |
| allowed_statuses.append("used") | |
| if redemption_code.status not in allowed_statuses: | |
| status_text = "已过期" if redemption_code.status == "expired" else redemption_code.status | |
| reason = "兑换码已被使用" if redemption_code.status == "used" else f"兑换码{status_text}" | |
| return { | |
| "success": True, | |
| "valid": False, | |
| "reason": reason, | |
| "redemption_code": None, | |
| "error": None | |
| } | |
| # 3. 席位可复用兑换码次数限制校验(按池内总席位) | |
| if redemption_code.reusable_by_seat: | |
| total_seats_stmt = select(func.sum(Team.max_members)).where( | |
| Team.pool_type == (redemption_code.pool_type or "normal") | |
| ) | |
| total_seats_result = await db_session.execute(total_seats_stmt) | |
| total_seats = int(total_seats_result.scalar() or 0) | |
| used_count_stmt = select(func.count(RedemptionRecord.id)).where( | |
| RedemptionRecord.code == code | |
| ) | |
| used_count_result = await db_session.execute(used_count_stmt) | |
| used_count = int(used_count_result.scalar() or 0) | |
| if total_seats <= 0 or used_count >= total_seats: | |
| return { | |
| "success": True, | |
| "valid": False, | |
| "reason": "兑换码次数已用完,无法进行兑换", | |
| "redemption_code": None, | |
| "error": None | |
| } | |
| # 4. 检查是否过期 (仅针对未使用的兑换码执行首次激活截止时间检查) | |
| if redemption_code.status == "unused" and redemption_code.expires_at: | |
| if redemption_code.expires_at < get_now(): | |
| # 更新状态为 expired | |
| redemption_code.status = "expired" | |
| # 不在服务层内部 commit,让调用方决定事务边界 | |
| # await db_session.commit() | |
| return { | |
| "success": True, | |
| "valid": False, | |
| "reason": "兑换码已过期 (超过首次兑换截止时间)", | |
| "redemption_code": None, | |
| "error": None | |
| } | |
| # 5. 验证通过 | |
| return { | |
| "success": True, | |
| "valid": True, | |
| "reason": "兑换码有效", | |
| "redemption_code": { | |
| "id": redemption_code.id, | |
| "code": redemption_code.code, | |
| "status": redemption_code.status, | |
| "expires_at": redemption_code.expires_at.isoformat() if redemption_code.expires_at else None, | |
| "created_at": redemption_code.created_at.isoformat() if redemption_code.created_at else None, | |
| "has_warranty": redemption_code.has_warranty, | |
| "warranty_days": redemption_code.warranty_days, | |
| "pool_type": redemption_code.pool_type or "normal", | |
| "reusable_by_seat": bool(redemption_code.reusable_by_seat) | |
| }, | |
| "error": None | |
| } | |
| except Exception as e: | |
| logger.error(f"验证兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "valid": False, | |
| "reason": None, | |
| "redemption_code": None, | |
| "error": f"验证兑换码失败: {str(e)}" | |
| } | |
| async def use_code( | |
| self, | |
| code: str, | |
| email: str, | |
| team_id: int, | |
| account_id: str, | |
| db_session: AsyncSession | |
| ) -> Dict[str, Any]: | |
| """ | |
| 使用兑换码 | |
| Args: | |
| code: 兑换码 | |
| email: 使用者邮箱 | |
| team_id: Team ID | |
| account_id: Account ID | |
| db_session: 数据库会话 | |
| Returns: | |
| 结果字典,包含 success, message, error | |
| """ | |
| try: | |
| # 1. 验证兑换码 | |
| validate_result = await self.validate_code(code, db_session) | |
| if not validate_result["success"]: | |
| return { | |
| "success": False, | |
| "message": None, | |
| "error": validate_result["error"] | |
| } | |
| if not validate_result["valid"]: | |
| return { | |
| "success": False, | |
| "message": None, | |
| "error": validate_result["reason"] | |
| } | |
| # 2. 更新兑换码状态 | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| redemption_code = result.scalar_one_or_none() | |
| redemption_code.status = "used" | |
| redemption_code.used_by_email = email | |
| redemption_code.used_team_id = team_id | |
| redemption_code.used_at = get_now() | |
| # 3. 创建使用记录 | |
| redemption_record = RedemptionRecord( | |
| email=email, | |
| code=code, | |
| team_id=team_id, | |
| account_id=account_id | |
| ) | |
| db_session.add(redemption_record) | |
| await db_session.commit() | |
| logger.info(f"使用兑换码成功: {code} -> {email}") | |
| return { | |
| "success": True, | |
| "message": "兑换码使用成功", | |
| "error": None | |
| } | |
| except Exception as e: | |
| await db_session.rollback() | |
| logger.error(f"使用兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "message": None, | |
| "error": f"使用兑换码失败: {str(e)}" | |
| } | |
| async def get_all_codes( | |
| self, | |
| db_session: AsyncSession, | |
| page: int = 1, | |
| per_page: int = 50, | |
| search: Optional[str] = None, | |
| status: Optional[str] = None, | |
| pool_type: Optional[str] = "normal" | |
| ) -> Dict[str, Any]: | |
| """ | |
| 获取所有兑换码 | |
| Args: | |
| db_session: 数据库会话 | |
| page: 页码 | |
| per_page: 每页数量 | |
| search: 搜索关键词 (兑换码或邮箱) | |
| status: 状态筛选 | |
| Returns: | |
| 结果字典,包含 success, codes, total, total_pages, current_page, error | |
| """ | |
| try: | |
| # 1. 构建基础查询 | |
| count_stmt = select(func.count(RedemptionCode.id)) | |
| stmt = select(RedemptionCode).order_by(RedemptionCode.created_at.desc()) | |
| # 2. 如果提供了筛选条件,添加过滤条件 | |
| filters = [] | |
| if search: | |
| filters.append(or_( | |
| RedemptionCode.code.ilike(f"%{search}%"), | |
| RedemptionCode.used_by_email.ilike(f"%{search}%") | |
| )) | |
| if status: | |
| if status == 'used': | |
| # "已使用" 在查询中通常指窄义的 used, 但如果要包含质保中, 逻辑如下 | |
| filters.append(RedemptionCode.status.in_(['used', 'warranty_active'])) | |
| else: | |
| filters.append(RedemptionCode.status == status) | |
| if filters: | |
| count_stmt = count_stmt.where(and_(*filters)) | |
| stmt = stmt.where(and_(*filters)) | |
| # 3. 获取总数 | |
| count_result = await db_session.execute(count_stmt) | |
| total = count_result.scalar() or 0 | |
| # 4. 计算分页 | |
| import math | |
| total_pages = math.ceil(total / per_page) if total > 0 else 1 | |
| if page < 1: | |
| page = 1 | |
| if page > total_pages and total_pages > 0: | |
| page = total_pages | |
| offset = (page - 1) * per_page | |
| # 5. 查询分页数据 | |
| stmt = stmt.limit(per_page).offset(offset) | |
| result = await db_session.execute(stmt) | |
| codes = result.scalars().all() | |
| # 构建返回数据 | |
| code_list = [] | |
| for code in codes: | |
| code_list.append({ | |
| "id": code.id, | |
| "code": code.code, | |
| "status": code.status, | |
| "created_at": code.created_at.isoformat() if code.created_at else None, | |
| "expires_at": code.expires_at.isoformat() if code.expires_at else None, | |
| "used_by_email": code.used_by_email, | |
| "used_team_id": code.used_team_id, | |
| "used_at": code.used_at.isoformat() if code.used_at else None, | |
| "has_warranty": code.has_warranty, | |
| "warranty_days": code.warranty_days, | |
| "warranty_expires_at": code.warranty_expires_at.isoformat() if code.warranty_expires_at else None | |
| }) | |
| logger.info(f"获取所有兑换码成功: 第 {page} 页, 共 {len(code_list)} 个 / 总数 {total}") | |
| return { | |
| "success": True, | |
| "codes": code_list, | |
| "total": total, | |
| "total_pages": total_pages, | |
| "current_page": page, | |
| "error": None | |
| } | |
| except Exception as e: | |
| logger.error(f"获取所有兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "codes": [], | |
| "total": 0, | |
| "error": f"获取所有兑换码失败: {str(e)}" | |
| } | |
| async def get_unused_count( | |
| self, | |
| db_session: AsyncSession | |
| ) -> int: | |
| """ | |
| 获取未使用的兑换码数量 | |
| """ | |
| try: | |
| stmt = select(func.count(RedemptionCode.id)).where(RedemptionCode.status == "unused") | |
| result = await db_session.execute(stmt) | |
| return result.scalar() or 0 | |
| except Exception as e: | |
| logger.error(f"获取未使用兑换码数量失败: {e}") | |
| return 0 | |
| async def get_code_by_code( | |
| self, | |
| code: str, | |
| db_session: AsyncSession | |
| ) -> Dict[str, Any]: | |
| """ | |
| 根据兑换码查询 | |
| Args: | |
| code: 兑换码 | |
| db_session: 数据库会话 | |
| Returns: | |
| 结果字典,包含 success, code_info, error | |
| """ | |
| try: | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| redemption_code = result.scalar_one_or_none() | |
| if not redemption_code: | |
| return { | |
| "success": False, | |
| "code_info": None, | |
| "error": f"兑换码 {code} 不存在" | |
| } | |
| code_info = { | |
| "id": redemption_code.id, | |
| "code": redemption_code.code, | |
| "status": redemption_code.status, | |
| "created_at": redemption_code.created_at.isoformat() if redemption_code.created_at else None, | |
| "expires_at": redemption_code.expires_at.isoformat() if redemption_code.expires_at else None, | |
| "used_by_email": redemption_code.used_by_email, | |
| "used_team_id": redemption_code.used_team_id, | |
| "used_at": redemption_code.used_at.isoformat() if redemption_code.used_at else None | |
| } | |
| return { | |
| "success": True, | |
| "code_info": code_info, | |
| "error": None | |
| } | |
| except Exception as e: | |
| logger.error(f"查询兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "code_info": None, | |
| "error": f"查询兑换码失败: {str(e)}" | |
| } | |
| async def get_unused_codes( | |
| self, | |
| db_session: AsyncSession | |
| ) -> Dict[str, Any]: | |
| """ | |
| 获取未使用的兑换码 | |
| Args: | |
| db_session: 数据库会话 | |
| Returns: | |
| 结果字典,包含 success, codes, total, error | |
| """ | |
| try: | |
| stmt = select(RedemptionCode).where( | |
| RedemptionCode.status == "unused" | |
| ).order_by(RedemptionCode.created_at.desc()) | |
| result = await db_session.execute(stmt) | |
| codes = result.scalars().all() | |
| # 构建返回数据 | |
| code_list = [] | |
| for code in codes: | |
| code_list.append({ | |
| "id": code.id, | |
| "code": code.code, | |
| "status": code.status, | |
| "created_at": code.created_at.isoformat() if code.created_at else None, | |
| "expires_at": code.expires_at.isoformat() if code.expires_at else None | |
| }) | |
| return { | |
| "success": True, | |
| "codes": code_list, | |
| "total": len(code_list), | |
| "error": None | |
| } | |
| except Exception as e: | |
| logger.error(f"获取未使用兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "codes": [], | |
| "total": 0, | |
| "error": f"获取未使用兑换码失败: {str(e)}" | |
| } | |
| async def get_all_records( | |
| self, | |
| db_session: AsyncSession, | |
| email: Optional[str] = None, | |
| code: Optional[str] = None, | |
| team_id: Optional[int] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| 获取所有兑换记录 (支持筛选) | |
| Args: | |
| db_session: 数据库会话 | |
| email: 邮箱模糊搜索 | |
| code: 兑换码模糊搜索 | |
| team_id: Team ID 筛选 | |
| Returns: | |
| 结果字典,包含 success, records, total, error | |
| """ | |
| try: | |
| stmt = select(RedemptionRecord) | |
| # 添加筛选条件 | |
| filters = [] | |
| if email: | |
| filters.append(RedemptionRecord.email.ilike(f"%{email}%")) | |
| if code: | |
| filters.append(RedemptionRecord.code.ilike(f"%{code}%")) | |
| if team_id: | |
| filters.append(RedemptionRecord.team_id == team_id) | |
| if filters: | |
| stmt = stmt.where(and_(*filters)) | |
| stmt = stmt.order_by(RedemptionRecord.redeemed_at.desc()) | |
| result = await db_session.execute(stmt) | |
| records = result.scalars().all() | |
| # 构建返回数据 | |
| record_list = [] | |
| for record in records: | |
| record_list.append({ | |
| "id": record.id, | |
| "email": record.email, | |
| "code": record.code, | |
| "team_id": record.team_id, | |
| "account_id": record.account_id, | |
| "redeemed_at": record.redeemed_at.isoformat() if record.redeemed_at else None | |
| }) | |
| logger.info(f"获取所有兑换记录成功: 共 {len(record_list)} 条") | |
| return { | |
| "success": True, | |
| "records": record_list, | |
| "total": len(record_list), | |
| "error": None | |
| } | |
| except Exception as e: | |
| logger.error(f"获取所有兑换记录失败: {e}") | |
| return { | |
| "success": False, | |
| "records": [], | |
| "total": 0, | |
| "error": f"获取所有兑换记录失败: {str(e)}" | |
| } | |
| async def delete_code( | |
| self, | |
| code: str, | |
| db_session: AsyncSession | |
| ) -> Dict[str, Any]: | |
| """ | |
| 删除兑换码 | |
| Args: | |
| code: 兑换码 | |
| db_session: 数据库会话 | |
| Returns: | |
| 结果字典,包含 success, message, error | |
| """ | |
| try: | |
| # 查询兑换码 | |
| stmt = select(RedemptionCode).where(RedemptionCode.code == code) | |
| result = await db_session.execute(stmt) | |
| redemption_code = result.scalar_one_or_none() | |
| if not redemption_code: | |
| return { | |
| "success": False, | |
| "message": None, | |
| "error": f"兑换码 {code} 不存在" | |
| } | |
| # 删除兑换码 | |
| await db_session.delete(redemption_code) | |
| await db_session.commit() | |
| logger.info(f"删除兑换码成功: {code}") | |
| return { | |
| "success": True, | |
| "message": f"兑换码 {code} 已删除", | |
| "error": None | |
| } | |
| except Exception as e: | |
| await db_session.rollback() | |
| logger.error(f"删除兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "message": None, | |
| "error": f"删除兑换码失败: {str(e)}" | |
| } | |
| async def update_code( | |
| self, | |
| code: str, | |
| db_session: AsyncSession, | |
| has_warranty: Optional[bool] = None, | |
| warranty_days: Optional[int] = None | |
| ) -> Dict[str, Any]: | |
| """更新兑换码信息""" | |
| return await self.bulk_update_codes([code], db_session, has_warranty, warranty_days) | |
| async def withdraw_record( | |
| self, | |
| record_id: int, | |
| db_session: AsyncSession | |
| ) -> Dict[str, Any]: | |
| """ | |
| 撤回使用记录 (删除记录,恢复兑换码,并在 Team 中移除成员/邀请) | |
| Args: | |
| record_id: 记录 ID | |
| db_session: 数据库会话 | |
| Returns: | |
| 结果字典 | |
| """ | |
| try: | |
| from app.services.team import team_service | |
| # 1. 查询记录 | |
| stmt = select(RedemptionRecord).where(RedemptionRecord.id == record_id).options( | |
| selectinload(RedemptionRecord.redemption_code) | |
| ) | |
| result = await db_session.execute(stmt) | |
| record = result.scalar_one_or_none() | |
| if not record: | |
| return {"success": False, "error": f"记录 ID {record_id} 不存在"} | |
| # 2. 调用 TeamService 移除成员/邀请 | |
| logger.info(f"正在从 Team {record.team_id} 中移除成员 {record.email}") | |
| team_result = await team_service.remove_invite_or_member( | |
| record.team_id, | |
| record.email, | |
| db_session | |
| ) | |
| if not team_result["success"]: | |
| # 即使 Team 移除失败,如果是因为成员已经不在了,我们也继续处理数据库 | |
| if "成员已不存在" not in str(team_result.get("message", "")) and "用户不存在" not in str(team_result.get("error", "")): | |
| return { | |
| "success": False, | |
| "error": f"从 Team 移除成员失败: {team_result.get('error') or team_result.get('message')}" | |
| } | |
| # 3. 恢复兑换码状态 | |
| code = record.redemption_code | |
| if code: | |
| # 如果是质保兑换,且还有其他记录,状态可能不应该直接回 unused | |
| # 但根据逻辑,目前一个码一个记录(除了质保补发可能产生新记录,但那是两个不同的码吧?) | |
| # 查了一下模型,RedemptionCode 有 used_by_email 等字段,说明它是单次使用的设计 | |
| code.status = "unused" | |
| code.used_by_email = None | |
| code.used_team_id = None | |
| code.used_at = None | |
| # 特殊处理质保字段 | |
| if code.has_warranty: | |
| code.warranty_expires_at = None | |
| # 4. 删除使用记录 | |
| await db_session.delete(record) | |
| await db_session.commit() | |
| logger.info(f"撤回记录成功: {record_id}, 邮箱: {record.email}, 兑换码: {record.code}") | |
| return { | |
| "success": True, | |
| "message": f"成功撤回记录并恢复兑换码 {record.code}" | |
| } | |
| except Exception as e: | |
| await db_session.rollback() | |
| logger.error(f"撤回记录失败: {e}") | |
| import traceback | |
| logger.error(traceback.format_exc()) | |
| return {"success": False, "error": f"撤回失败: {str(e)}"} | |
| async def bulk_update_codes( | |
| self, | |
| codes: List[str], | |
| db_session: AsyncSession, | |
| has_warranty: Optional[bool] = None, | |
| warranty_days: Optional[int] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| 批量更新兑换码信息 | |
| Args: | |
| codes: 兑换码列表 | |
| db_session: 数据库会话 | |
| has_warranty: 是否为质保兑换码 (可选) | |
| warranty_days: 质保天数 (可选) | |
| Returns: | |
| 结果字典 | |
| """ | |
| try: | |
| if not codes: | |
| return {"success": True, "message": "没有需要更新的兑换码"} | |
| # 构建更新语句 | |
| values = {} | |
| if has_warranty is not None: | |
| values[RedemptionCode.has_warranty] = has_warranty | |
| if warranty_days is not None: | |
| values[RedemptionCode.warranty_days] = warranty_days | |
| if not values: | |
| return {"success": True, "message": "没有提供更新内容"} | |
| stmt = update(RedemptionCode).where(RedemptionCode.code.in_(codes)).values(values) | |
| await db_session.execute(stmt) | |
| await db_session.commit() | |
| logger.info(f"成功批量更新 {len(codes)} 个兑换码") | |
| return { | |
| "success": True, | |
| "message": f"成功批量更新 {len(codes)} 个兑换码", | |
| "error": None | |
| } | |
| except Exception as e: | |
| await db_session.rollback() | |
| logger.error(f"批量更新兑换码失败: {e}") | |
| return { | |
| "success": False, | |
| "message": None, | |
| "error": f"批量更新失败: {str(e)}" | |
| } | |
| async def get_stats( | |
| self, | |
| db_session: AsyncSession, | |
| pool_type: Optional[str] = "normal" | |
| ) -> Dict[str, int]: | |
| """ | |
| 获取兑换码统计信息 | |
| Returns: | |
| 统计字典, 包含 total, unused, used, expired | |
| """ | |
| try: | |
| # 使用 SQL 聚合统计各状态数量 | |
| stmt = select( | |
| RedemptionCode.status, | |
| func.count(RedemptionCode.id) | |
| ).group_by(RedemptionCode.status) | |
| if pool_type: | |
| stmt = stmt.where(RedemptionCode.pool_type == pool_type) | |
| result = await db_session.execute(stmt) | |
| status_counts = dict(result.all()) | |
| # 由于 "used" 和 "warranty_active" 都属于广义上的 "已使用" | |
| # 这里的 used 统计需要合并这两个状态 | |
| used_count = status_counts.get("used", 0) + status_counts.get("warranty_active", 0) | |
| # 计算总数 | |
| total_stmt = select(func.count(RedemptionCode.id)) | |
| if pool_type: | |
| total_stmt = total_stmt.where(RedemptionCode.pool_type == pool_type) | |
| total_result = await db_session.execute(total_stmt) | |
| total = total_result.scalar() or 0 | |
| return { | |
| "total": total, | |
| "unused": status_counts.get("unused", 0), | |
| "used": used_count, | |
| "warranty_active": status_counts.get("warranty_active", 0), | |
| "expired": status_counts.get("expired", 0) | |
| } | |
| except Exception as e: | |
| logger.error(f"获取兑换码统计信息失败: {e}") | |
| return { | |
| "total": 0, | |
| "unused": 0, | |
| "used": 0, | |
| "expired": 0 | |
| } | |
| # 创建全局兑换码服务实例 | |
| redemption_service = RedemptionService() | |