test-team-manager / app /services /redemption.py
Kyou0203's picture
Deploy updated app
4e5a541 verified
"""
兑换码管理服务
用于管理兑换码的生成、验证、使用和查询
"""
import logging
import secrets
import string
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from sqlalchemy import select, update, delete, and_, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import RedemptionCode, RedemptionRecord, Team
from app.utils.time_utils import get_now
logger = logging.getLogger(__name__)
class RedemptionService:
"""兑换码管理服务类"""
def __init__(self):
"""初始化兑换码管理服务"""
pass
def _generate_random_code(self, length: int = 16) -> str:
"""
生成随机兑换码
Args:
length: 兑换码长度
Returns:
随机兑换码字符串
"""
# 使用大写字母和数字,排除容易混淆的字符 (0, O, I, 1)
alphabet = string.ascii_uppercase + string.digits
alphabet = alphabet.replace('0', '').replace('O', '').replace('I', '').replace('1', '')
# 生成随机码
code = ''.join(secrets.choice(alphabet) for _ in range(length))
# 格式化为 XXXX-XXXX-XXXX-XXXX
if length == 16:
code = f"{code[0:4]}-{code[4:8]}-{code[8:12]}-{code[12:16]}"
return code
async def generate_code_single(
self,
db_session: AsyncSession,
code: Optional[str] = None,
expires_days: Optional[int] = None,
has_warranty: bool = False,
warranty_days: int = 30,
pool_type: str = "normal",
reusable_by_seat: bool = False
) -> Dict[str, Any]:
"""
生成单个兑换码
Args:
db_session: 数据库会话
code: 自定义兑换码 (可选,如果不提供则自动生成)
expires_days: 有效期天数 (可选,如果不提供则永久有效)
has_warranty: 是否为质保兑换码 (默认 False)
Returns:
结果字典,包含 success, code, message, error
"""
try:
# 1. 生成或使用自定义兑换码
if not code:
# 生成随机码,确保唯一性
max_attempts = 10
for _ in range(max_attempts):
code = self._generate_random_code()
# 检查是否已存在
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
existing = result.scalar_one_or_none()
if not existing:
break
else:
return {
"success": False,
"code": None,
"message": None,
"error": "生成唯一兑换码失败,请重试"
}
else:
# 检查自定义兑换码是否已存在
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
existing = result.scalar_one_or_none()
if existing:
return {
"success": False,
"code": None,
"message": None,
"error": f"兑换码 {code} 已存在"
}
# 2. 计算过期时间
expires_at = None
if expires_days:
expires_at = get_now() + timedelta(days=expires_days)
# 3. 创建兑换码记录
redemption_code = RedemptionCode(
code=code,
status="unused",
expires_at=expires_at,
has_warranty=has_warranty,
warranty_days=warranty_days,
pool_type=pool_type,
reusable_by_seat=reusable_by_seat
)
db_session.add(redemption_code)
await db_session.commit()
logger.info(f"生成兑换码成功: {code}")
return {
"success": True,
"code": code,
"message": f"兑换码生成成功: {code}",
"error": None
}
except Exception as e:
await db_session.rollback()
logger.error(f"生成兑换码失败: {e}")
return {
"success": False,
"code": None,
"message": None,
"error": f"生成兑换码失败: {str(e)}"
}
async def generate_code_batch(
self,
db_session: AsyncSession,
count: int,
expires_days: Optional[int] = None,
has_warranty: bool = False,
warranty_days: int = 30,
pool_type: str = "normal",
reusable_by_seat: bool = False
) -> Dict[str, Any]:
"""
批量生成兑换码
Args:
db_session: 数据库会话
count: 生成数量
expires_days: 有效期天数 (可选)
has_warranty: 是否为质保兑换码 (默认 False)
Returns:
结果字典,包含 success, codes, total, message, error
"""
try:
if count <= 0 or count > 1000:
return {
"success": False,
"codes": [],
"total": 0,
"message": None,
"error": "生成数量必须在 1-1000 之间"
}
# 计算过期时间
expires_at = None
if expires_days:
expires_at = get_now() + timedelta(days=expires_days)
# 批量生成兑换码
codes = []
for i in range(count):
# 生成唯一兑换码
max_attempts = 10
for _ in range(max_attempts):
code = self._generate_random_code()
# 检查是否已存在 (包括本次批量生成的)
if code not in codes:
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
existing = result.scalar_one_or_none()
if not existing:
codes.append(code)
break
else:
logger.warning(f"生成第 {i+1} 个兑换码失败")
continue
# 批量插入数据库
for code in codes:
redemption_code = RedemptionCode(
code=code,
status="unused",
expires_at=expires_at,
has_warranty=has_warranty,
warranty_days=warranty_days,
pool_type=pool_type,
reusable_by_seat=reusable_by_seat
)
db_session.add(redemption_code)
await db_session.commit()
logger.info(f"批量生成兑换码成功: {len(codes)} 个")
return {
"success": True,
"codes": codes,
"total": len(codes),
"message": f"成功生成 {len(codes)} 个兑换码",
"error": None
}
except Exception as e:
await db_session.rollback()
logger.error(f"批量生成兑换码失败: {e}")
return {
"success": False,
"codes": [],
"total": 0,
"message": None,
"error": f"批量生成兑换码失败: {str(e)}"
}
async def validate_code(
self,
code: str,
db_session: AsyncSession
) -> Dict[str, Any]:
"""
验证兑换码
Args:
code: 兑换码
db_session: 数据库会话
Returns:
结果字典,包含 success, valid, reason, redemption_code, error
"""
try:
# 1. 查询兑换码
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
redemption_code = result.scalar_one_or_none()
if not redemption_code:
# 兼容福利通用兑换码:只存 settings,不写 redemption_codes 表
from app.services.settings import settings_service
welfare_code = (await settings_service.get_setting(db_session, "welfare_common_code", "") or "").strip()
if welfare_code and code == welfare_code:
limit_raw = await settings_service.get_setting(db_session, "welfare_common_code_limit", "0")
used_raw = await settings_service.get_setting(db_session, "welfare_common_code_used_count", "0")
try:
limit_count = int(limit_raw or 0)
except Exception:
limit_count = 0
try:
used_count = int(used_raw or 0)
except Exception:
used_count = 0
# 真实可兑换次数按可邀请席位计算:sum(max_members - 1)
capacity_stmt = select(func.sum(Team.max_members - 1)).where(
Team.pool_type == "welfare",
Team.max_members > 1
)
capacity_result = await db_session.execute(capacity_stmt)
usable_capacity = int(capacity_result.scalar() or 0)
# 兼容历史错误配置:向下收敛到真实可邀请席位
effective_limit = min(limit_count, usable_capacity) if limit_count > 0 else usable_capacity
if effective_limit <= 0 or used_count >= effective_limit:
return {
"success": True,
"valid": False,
"reason": "兑换码次数已用完,无法进行兑换",
"redemption_code": None,
"error": None
}
return {
"success": True,
"valid": True,
"reason": "兑换码有效",
"redemption_code": {
"id": None,
"code": code,
"status": "virtual_welfare",
"expires_at": None,
"created_at": None,
"has_warranty": False,
"warranty_days": 0,
"pool_type": "welfare",
"reusable_by_seat": True,
"virtual_welfare_code": True,
"limit": effective_limit,
"used_count": used_count,
},
"error": None
}
return {
"success": True,
"valid": False,
"reason": "兑换码不存在",
"redemption_code": None,
"error": None
}
# 兼容清理:历史版本中福利通用码可能被写入 redemption_codes。
# 现版本福利通用码以 settings 为准,旧的福利可复用码一律视为失效,避免绕过“新码替换旧码”的规则。
if redemption_code.pool_type == "welfare" and redemption_code.reusable_by_seat:
return {
"success": True,
"valid": False,
"reason": "兑换码已失效,请使用最新福利通用兑换码",
"redemption_code": None,
"error": None
}
# 2. 检查状态
if redemption_code.reusable_by_seat:
allowed_statuses = ["unused", "used", "warranty_active"]
else:
allowed_statuses = ["unused", "warranty_active"]
if redemption_code.has_warranty:
allowed_statuses.append("used")
if redemption_code.status not in allowed_statuses:
status_text = "已过期" if redemption_code.status == "expired" else redemption_code.status
reason = "兑换码已被使用" if redemption_code.status == "used" else f"兑换码{status_text}"
return {
"success": True,
"valid": False,
"reason": reason,
"redemption_code": None,
"error": None
}
# 3. 席位可复用兑换码次数限制校验(按池内总席位)
if redemption_code.reusable_by_seat:
total_seats_stmt = select(func.sum(Team.max_members)).where(
Team.pool_type == (redemption_code.pool_type or "normal")
)
total_seats_result = await db_session.execute(total_seats_stmt)
total_seats = int(total_seats_result.scalar() or 0)
used_count_stmt = select(func.count(RedemptionRecord.id)).where(
RedemptionRecord.code == code
)
used_count_result = await db_session.execute(used_count_stmt)
used_count = int(used_count_result.scalar() or 0)
if total_seats <= 0 or used_count >= total_seats:
return {
"success": True,
"valid": False,
"reason": "兑换码次数已用完,无法进行兑换",
"redemption_code": None,
"error": None
}
# 4. 检查是否过期 (仅针对未使用的兑换码执行首次激活截止时间检查)
if redemption_code.status == "unused" and redemption_code.expires_at:
if redemption_code.expires_at < get_now():
# 更新状态为 expired
redemption_code.status = "expired"
# 不在服务层内部 commit,让调用方决定事务边界
# await db_session.commit()
return {
"success": True,
"valid": False,
"reason": "兑换码已过期 (超过首次兑换截止时间)",
"redemption_code": None,
"error": None
}
# 5. 验证通过
return {
"success": True,
"valid": True,
"reason": "兑换码有效",
"redemption_code": {
"id": redemption_code.id,
"code": redemption_code.code,
"status": redemption_code.status,
"expires_at": redemption_code.expires_at.isoformat() if redemption_code.expires_at else None,
"created_at": redemption_code.created_at.isoformat() if redemption_code.created_at else None,
"has_warranty": redemption_code.has_warranty,
"warranty_days": redemption_code.warranty_days,
"pool_type": redemption_code.pool_type or "normal",
"reusable_by_seat": bool(redemption_code.reusable_by_seat)
},
"error": None
}
except Exception as e:
logger.error(f"验证兑换码失败: {e}")
return {
"success": False,
"valid": False,
"reason": None,
"redemption_code": None,
"error": f"验证兑换码失败: {str(e)}"
}
async def use_code(
self,
code: str,
email: str,
team_id: int,
account_id: str,
db_session: AsyncSession
) -> Dict[str, Any]:
"""
使用兑换码
Args:
code: 兑换码
email: 使用者邮箱
team_id: Team ID
account_id: Account ID
db_session: 数据库会话
Returns:
结果字典,包含 success, message, error
"""
try:
# 1. 验证兑换码
validate_result = await self.validate_code(code, db_session)
if not validate_result["success"]:
return {
"success": False,
"message": None,
"error": validate_result["error"]
}
if not validate_result["valid"]:
return {
"success": False,
"message": None,
"error": validate_result["reason"]
}
# 2. 更新兑换码状态
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
redemption_code = result.scalar_one_or_none()
redemption_code.status = "used"
redemption_code.used_by_email = email
redemption_code.used_team_id = team_id
redemption_code.used_at = get_now()
# 3. 创建使用记录
redemption_record = RedemptionRecord(
email=email,
code=code,
team_id=team_id,
account_id=account_id
)
db_session.add(redemption_record)
await db_session.commit()
logger.info(f"使用兑换码成功: {code} -> {email}")
return {
"success": True,
"message": "兑换码使用成功",
"error": None
}
except Exception as e:
await db_session.rollback()
logger.error(f"使用兑换码失败: {e}")
return {
"success": False,
"message": None,
"error": f"使用兑换码失败: {str(e)}"
}
async def get_all_codes(
self,
db_session: AsyncSession,
page: int = 1,
per_page: int = 50,
search: Optional[str] = None,
status: Optional[str] = None,
pool_type: Optional[str] = "normal"
) -> Dict[str, Any]:
"""
获取所有兑换码
Args:
db_session: 数据库会话
page: 页码
per_page: 每页数量
search: 搜索关键词 (兑换码或邮箱)
status: 状态筛选
Returns:
结果字典,包含 success, codes, total, total_pages, current_page, error
"""
try:
# 1. 构建基础查询
count_stmt = select(func.count(RedemptionCode.id))
stmt = select(RedemptionCode).order_by(RedemptionCode.created_at.desc())
# 2. 如果提供了筛选条件,添加过滤条件
filters = []
if search:
filters.append(or_(
RedemptionCode.code.ilike(f"%{search}%"),
RedemptionCode.used_by_email.ilike(f"%{search}%")
))
if status:
if status == 'used':
# "已使用" 在查询中通常指窄义的 used, 但如果要包含质保中, 逻辑如下
filters.append(RedemptionCode.status.in_(['used', 'warranty_active']))
else:
filters.append(RedemptionCode.status == status)
if filters:
count_stmt = count_stmt.where(and_(*filters))
stmt = stmt.where(and_(*filters))
# 3. 获取总数
count_result = await db_session.execute(count_stmt)
total = count_result.scalar() or 0
# 4. 计算分页
import math
total_pages = math.ceil(total / per_page) if total > 0 else 1
if page < 1:
page = 1
if page > total_pages and total_pages > 0:
page = total_pages
offset = (page - 1) * per_page
# 5. 查询分页数据
stmt = stmt.limit(per_page).offset(offset)
result = await db_session.execute(stmt)
codes = result.scalars().all()
# 构建返回数据
code_list = []
for code in codes:
code_list.append({
"id": code.id,
"code": code.code,
"status": code.status,
"created_at": code.created_at.isoformat() if code.created_at else None,
"expires_at": code.expires_at.isoformat() if code.expires_at else None,
"used_by_email": code.used_by_email,
"used_team_id": code.used_team_id,
"used_at": code.used_at.isoformat() if code.used_at else None,
"has_warranty": code.has_warranty,
"warranty_days": code.warranty_days,
"warranty_expires_at": code.warranty_expires_at.isoformat() if code.warranty_expires_at else None
})
logger.info(f"获取所有兑换码成功: 第 {page} 页, 共 {len(code_list)} 个 / 总数 {total}")
return {
"success": True,
"codes": code_list,
"total": total,
"total_pages": total_pages,
"current_page": page,
"error": None
}
except Exception as e:
logger.error(f"获取所有兑换码失败: {e}")
return {
"success": False,
"codes": [],
"total": 0,
"error": f"获取所有兑换码失败: {str(e)}"
}
async def get_unused_count(
self,
db_session: AsyncSession
) -> int:
"""
获取未使用的兑换码数量
"""
try:
stmt = select(func.count(RedemptionCode.id)).where(RedemptionCode.status == "unused")
result = await db_session.execute(stmt)
return result.scalar() or 0
except Exception as e:
logger.error(f"获取未使用兑换码数量失败: {e}")
return 0
async def get_code_by_code(
self,
code: str,
db_session: AsyncSession
) -> Dict[str, Any]:
"""
根据兑换码查询
Args:
code: 兑换码
db_session: 数据库会话
Returns:
结果字典,包含 success, code_info, error
"""
try:
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
redemption_code = result.scalar_one_or_none()
if not redemption_code:
return {
"success": False,
"code_info": None,
"error": f"兑换码 {code} 不存在"
}
code_info = {
"id": redemption_code.id,
"code": redemption_code.code,
"status": redemption_code.status,
"created_at": redemption_code.created_at.isoformat() if redemption_code.created_at else None,
"expires_at": redemption_code.expires_at.isoformat() if redemption_code.expires_at else None,
"used_by_email": redemption_code.used_by_email,
"used_team_id": redemption_code.used_team_id,
"used_at": redemption_code.used_at.isoformat() if redemption_code.used_at else None
}
return {
"success": True,
"code_info": code_info,
"error": None
}
except Exception as e:
logger.error(f"查询兑换码失败: {e}")
return {
"success": False,
"code_info": None,
"error": f"查询兑换码失败: {str(e)}"
}
async def get_unused_codes(
self,
db_session: AsyncSession
) -> Dict[str, Any]:
"""
获取未使用的兑换码
Args:
db_session: 数据库会话
Returns:
结果字典,包含 success, codes, total, error
"""
try:
stmt = select(RedemptionCode).where(
RedemptionCode.status == "unused"
).order_by(RedemptionCode.created_at.desc())
result = await db_session.execute(stmt)
codes = result.scalars().all()
# 构建返回数据
code_list = []
for code in codes:
code_list.append({
"id": code.id,
"code": code.code,
"status": code.status,
"created_at": code.created_at.isoformat() if code.created_at else None,
"expires_at": code.expires_at.isoformat() if code.expires_at else None
})
return {
"success": True,
"codes": code_list,
"total": len(code_list),
"error": None
}
except Exception as e:
logger.error(f"获取未使用兑换码失败: {e}")
return {
"success": False,
"codes": [],
"total": 0,
"error": f"获取未使用兑换码失败: {str(e)}"
}
async def get_all_records(
self,
db_session: AsyncSession,
email: Optional[str] = None,
code: Optional[str] = None,
team_id: Optional[int] = None
) -> Dict[str, Any]:
"""
获取所有兑换记录 (支持筛选)
Args:
db_session: 数据库会话
email: 邮箱模糊搜索
code: 兑换码模糊搜索
team_id: Team ID 筛选
Returns:
结果字典,包含 success, records, total, error
"""
try:
stmt = select(RedemptionRecord)
# 添加筛选条件
filters = []
if email:
filters.append(RedemptionRecord.email.ilike(f"%{email}%"))
if code:
filters.append(RedemptionRecord.code.ilike(f"%{code}%"))
if team_id:
filters.append(RedemptionRecord.team_id == team_id)
if filters:
stmt = stmt.where(and_(*filters))
stmt = stmt.order_by(RedemptionRecord.redeemed_at.desc())
result = await db_session.execute(stmt)
records = result.scalars().all()
# 构建返回数据
record_list = []
for record in records:
record_list.append({
"id": record.id,
"email": record.email,
"code": record.code,
"team_id": record.team_id,
"account_id": record.account_id,
"redeemed_at": record.redeemed_at.isoformat() if record.redeemed_at else None
})
logger.info(f"获取所有兑换记录成功: 共 {len(record_list)} 条")
return {
"success": True,
"records": record_list,
"total": len(record_list),
"error": None
}
except Exception as e:
logger.error(f"获取所有兑换记录失败: {e}")
return {
"success": False,
"records": [],
"total": 0,
"error": f"获取所有兑换记录失败: {str(e)}"
}
async def delete_code(
self,
code: str,
db_session: AsyncSession
) -> Dict[str, Any]:
"""
删除兑换码
Args:
code: 兑换码
db_session: 数据库会话
Returns:
结果字典,包含 success, message, error
"""
try:
# 查询兑换码
stmt = select(RedemptionCode).where(RedemptionCode.code == code)
result = await db_session.execute(stmt)
redemption_code = result.scalar_one_or_none()
if not redemption_code:
return {
"success": False,
"message": None,
"error": f"兑换码 {code} 不存在"
}
# 删除兑换码
await db_session.delete(redemption_code)
await db_session.commit()
logger.info(f"删除兑换码成功: {code}")
return {
"success": True,
"message": f"兑换码 {code} 已删除",
"error": None
}
except Exception as e:
await db_session.rollback()
logger.error(f"删除兑换码失败: {e}")
return {
"success": False,
"message": None,
"error": f"删除兑换码失败: {str(e)}"
}
async def update_code(
self,
code: str,
db_session: AsyncSession,
has_warranty: Optional[bool] = None,
warranty_days: Optional[int] = None
) -> Dict[str, Any]:
"""更新兑换码信息"""
return await self.bulk_update_codes([code], db_session, has_warranty, warranty_days)
async def withdraw_record(
self,
record_id: int,
db_session: AsyncSession
) -> Dict[str, Any]:
"""
撤回使用记录 (删除记录,恢复兑换码,并在 Team 中移除成员/邀请)
Args:
record_id: 记录 ID
db_session: 数据库会话
Returns:
结果字典
"""
try:
from app.services.team import team_service
# 1. 查询记录
stmt = select(RedemptionRecord).where(RedemptionRecord.id == record_id).options(
selectinload(RedemptionRecord.redemption_code)
)
result = await db_session.execute(stmt)
record = result.scalar_one_or_none()
if not record:
return {"success": False, "error": f"记录 ID {record_id} 不存在"}
# 2. 调用 TeamService 移除成员/邀请
logger.info(f"正在从 Team {record.team_id} 中移除成员 {record.email}")
team_result = await team_service.remove_invite_or_member(
record.team_id,
record.email,
db_session
)
if not team_result["success"]:
# 即使 Team 移除失败,如果是因为成员已经不在了,我们也继续处理数据库
if "成员已不存在" not in str(team_result.get("message", "")) and "用户不存在" not in str(team_result.get("error", "")):
return {
"success": False,
"error": f"从 Team 移除成员失败: {team_result.get('error') or team_result.get('message')}"
}
# 3. 恢复兑换码状态
code = record.redemption_code
if code:
# 如果是质保兑换,且还有其他记录,状态可能不应该直接回 unused
# 但根据逻辑,目前一个码一个记录(除了质保补发可能产生新记录,但那是两个不同的码吧?)
# 查了一下模型,RedemptionCode 有 used_by_email 等字段,说明它是单次使用的设计
code.status = "unused"
code.used_by_email = None
code.used_team_id = None
code.used_at = None
# 特殊处理质保字段
if code.has_warranty:
code.warranty_expires_at = None
# 4. 删除使用记录
await db_session.delete(record)
await db_session.commit()
logger.info(f"撤回记录成功: {record_id}, 邮箱: {record.email}, 兑换码: {record.code}")
return {
"success": True,
"message": f"成功撤回记录并恢复兑换码 {record.code}"
}
except Exception as e:
await db_session.rollback()
logger.error(f"撤回记录失败: {e}")
import traceback
logger.error(traceback.format_exc())
return {"success": False, "error": f"撤回失败: {str(e)}"}
async def bulk_update_codes(
self,
codes: List[str],
db_session: AsyncSession,
has_warranty: Optional[bool] = None,
warranty_days: Optional[int] = None
) -> Dict[str, Any]:
"""
批量更新兑换码信息
Args:
codes: 兑换码列表
db_session: 数据库会话
has_warranty: 是否为质保兑换码 (可选)
warranty_days: 质保天数 (可选)
Returns:
结果字典
"""
try:
if not codes:
return {"success": True, "message": "没有需要更新的兑换码"}
# 构建更新语句
values = {}
if has_warranty is not None:
values[RedemptionCode.has_warranty] = has_warranty
if warranty_days is not None:
values[RedemptionCode.warranty_days] = warranty_days
if not values:
return {"success": True, "message": "没有提供更新内容"}
stmt = update(RedemptionCode).where(RedemptionCode.code.in_(codes)).values(values)
await db_session.execute(stmt)
await db_session.commit()
logger.info(f"成功批量更新 {len(codes)} 个兑换码")
return {
"success": True,
"message": f"成功批量更新 {len(codes)} 个兑换码",
"error": None
}
except Exception as e:
await db_session.rollback()
logger.error(f"批量更新兑换码失败: {e}")
return {
"success": False,
"message": None,
"error": f"批量更新失败: {str(e)}"
}
async def get_stats(
self,
db_session: AsyncSession,
pool_type: Optional[str] = "normal"
) -> Dict[str, int]:
"""
获取兑换码统计信息
Returns:
统计字典, 包含 total, unused, used, expired
"""
try:
# 使用 SQL 聚合统计各状态数量
stmt = select(
RedemptionCode.status,
func.count(RedemptionCode.id)
).group_by(RedemptionCode.status)
if pool_type:
stmt = stmt.where(RedemptionCode.pool_type == pool_type)
result = await db_session.execute(stmt)
status_counts = dict(result.all())
# 由于 "used" 和 "warranty_active" 都属于广义上的 "已使用"
# 这里的 used 统计需要合并这两个状态
used_count = status_counts.get("used", 0) + status_counts.get("warranty_active", 0)
# 计算总数
total_stmt = select(func.count(RedemptionCode.id))
if pool_type:
total_stmt = total_stmt.where(RedemptionCode.pool_type == pool_type)
total_result = await db_session.execute(total_stmt)
total = total_result.scalar() or 0
return {
"total": total,
"unused": status_counts.get("unused", 0),
"used": used_count,
"warranty_active": status_counts.get("warranty_active", 0),
"expired": status_counts.get("expired", 0)
}
except Exception as e:
logger.error(f"获取兑换码统计信息失败: {e}")
return {
"total": 0,
"unused": 0,
"used": 0,
"expired": 0
}
# 创建全局兑换码服务实例
redemption_service = RedemptionService()