| package model |
|
|
| import ( |
| "errors" |
| "fmt" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
|
|
| "github.com/QuantumNous/new-api/common" |
| "github.com/QuantumNous/new-api/pkg/cachex" |
| "github.com/samber/hot" |
| "gorm.io/gorm" |
| ) |
|
|
| |
| const ( |
| SubscriptionDurationYear = "year" |
| SubscriptionDurationMonth = "month" |
| SubscriptionDurationDay = "day" |
| SubscriptionDurationHour = "hour" |
| SubscriptionDurationCustom = "custom" |
| ) |
|
|
| |
| const ( |
| SubscriptionResetNever = "never" |
| SubscriptionResetDaily = "daily" |
| SubscriptionResetWeekly = "weekly" |
| SubscriptionResetMonthly = "monthly" |
| SubscriptionResetCustom = "custom" |
| ) |
|
|
| var ( |
| ErrSubscriptionOrderNotFound = errors.New("subscription order not found") |
| ErrSubscriptionOrderStatusInvalid = errors.New("subscription order status invalid") |
| ) |
|
|
| const ( |
| subscriptionPlanCacheNamespace = "new-api:subscription_plan:v1" |
| subscriptionPlanInfoCacheNamespace = "new-api:subscription_plan_info:v1" |
| ) |
|
|
| var ( |
| subscriptionPlanCacheOnce sync.Once |
| subscriptionPlanInfoCacheOnce sync.Once |
|
|
| subscriptionPlanCache *cachex.HybridCache[SubscriptionPlan] |
| subscriptionPlanInfoCache *cachex.HybridCache[SubscriptionPlanInfo] |
| ) |
|
|
| func subscriptionPlanCacheTTL() time.Duration { |
| ttlSeconds := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_CACHE_TTL", 300) |
| if ttlSeconds <= 0 { |
| ttlSeconds = 300 |
| } |
| return time.Duration(ttlSeconds) * time.Second |
| } |
|
|
| func subscriptionPlanInfoCacheTTL() time.Duration { |
| ttlSeconds := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_INFO_CACHE_TTL", 120) |
| if ttlSeconds <= 0 { |
| ttlSeconds = 120 |
| } |
| return time.Duration(ttlSeconds) * time.Second |
| } |
|
|
| func subscriptionPlanCacheCapacity() int { |
| capacity := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_CACHE_CAP", 5000) |
| if capacity <= 0 { |
| capacity = 5000 |
| } |
| return capacity |
| } |
|
|
| func subscriptionPlanInfoCacheCapacity() int { |
| capacity := common.GetEnvOrDefault("SUBSCRIPTION_PLAN_INFO_CACHE_CAP", 10000) |
| if capacity <= 0 { |
| capacity = 10000 |
| } |
| return capacity |
| } |
|
|
| func getSubscriptionPlanCache() *cachex.HybridCache[SubscriptionPlan] { |
| subscriptionPlanCacheOnce.Do(func() { |
| ttl := subscriptionPlanCacheTTL() |
| subscriptionPlanCache = cachex.NewHybridCache[SubscriptionPlan](cachex.HybridCacheConfig[SubscriptionPlan]{ |
| Namespace: cachex.Namespace(subscriptionPlanCacheNamespace), |
| Redis: common.RDB, |
| RedisEnabled: func() bool { |
| return common.RedisEnabled && common.RDB != nil |
| }, |
| RedisCodec: cachex.JSONCodec[SubscriptionPlan]{}, |
| Memory: func() *hot.HotCache[string, SubscriptionPlan] { |
| return hot.NewHotCache[string, SubscriptionPlan](hot.LRU, subscriptionPlanCacheCapacity()). |
| WithTTL(ttl). |
| WithJanitor(). |
| Build() |
| }, |
| }) |
| }) |
| return subscriptionPlanCache |
| } |
|
|
| func getSubscriptionPlanInfoCache() *cachex.HybridCache[SubscriptionPlanInfo] { |
| subscriptionPlanInfoCacheOnce.Do(func() { |
| ttl := subscriptionPlanInfoCacheTTL() |
| subscriptionPlanInfoCache = cachex.NewHybridCache[SubscriptionPlanInfo](cachex.HybridCacheConfig[SubscriptionPlanInfo]{ |
| Namespace: cachex.Namespace(subscriptionPlanInfoCacheNamespace), |
| Redis: common.RDB, |
| RedisEnabled: func() bool { |
| return common.RedisEnabled && common.RDB != nil |
| }, |
| RedisCodec: cachex.JSONCodec[SubscriptionPlanInfo]{}, |
| Memory: func() *hot.HotCache[string, SubscriptionPlanInfo] { |
| return hot.NewHotCache[string, SubscriptionPlanInfo](hot.LRU, subscriptionPlanInfoCacheCapacity()). |
| WithTTL(ttl). |
| WithJanitor(). |
| Build() |
| }, |
| }) |
| }) |
| return subscriptionPlanInfoCache |
| } |
|
|
| func subscriptionPlanCacheKey(id int) string { |
| if id <= 0 { |
| return "" |
| } |
| return strconv.Itoa(id) |
| } |
|
|
| func InvalidateSubscriptionPlanCache(planId int) { |
| if planId <= 0 { |
| return |
| } |
| cache := getSubscriptionPlanCache() |
| _, _ = cache.DeleteMany([]string{subscriptionPlanCacheKey(planId)}) |
| infoCache := getSubscriptionPlanInfoCache() |
| _ = infoCache.Purge() |
| } |
|
|
| |
| type SubscriptionPlan struct { |
| Id int `json:"id"` |
|
|
| Title string `json:"title" gorm:"type:varchar(128);not null"` |
| Subtitle string `json:"subtitle" gorm:"type:varchar(255);default:''"` |
|
|
| |
| PriceAmount float64 `json:"price_amount" gorm:"type:decimal(10,6);not null;default:0"` |
| Currency string `json:"currency" gorm:"type:varchar(8);not null;default:'USD'"` |
|
|
| DurationUnit string `json:"duration_unit" gorm:"type:varchar(16);not null;default:'month'"` |
| DurationValue int `json:"duration_value" gorm:"type:int;not null;default:1"` |
| CustomSeconds int64 `json:"custom_seconds" gorm:"type:bigint;not null;default:0"` |
|
|
| Enabled bool `json:"enabled" gorm:"default:true"` |
| SortOrder int `json:"sort_order" gorm:"type:int;default:0"` |
|
|
| StripePriceId string `json:"stripe_price_id" gorm:"type:varchar(128);default:''"` |
| CreemProductId string `json:"creem_product_id" gorm:"type:varchar(128);default:''"` |
|
|
| |
| MaxPurchasePerUser int `json:"max_purchase_per_user" gorm:"type:int;default:0"` |
|
|
| |
| UpgradeGroup string `json:"upgrade_group" gorm:"type:varchar(64);default:''"` |
|
|
| |
| TotalAmount int64 `json:"total_amount" gorm:"type:bigint;not null;default:0"` |
|
|
| |
| QuotaResetPeriod string `json:"quota_reset_period" gorm:"type:varchar(16);default:'never'"` |
| QuotaResetCustomSeconds int64 `json:"quota_reset_custom_seconds" gorm:"type:bigint;default:0"` |
|
|
| CreatedAt int64 `json:"created_at" gorm:"bigint"` |
| UpdatedAt int64 `json:"updated_at" gorm:"bigint"` |
| } |
|
|
| func (p *SubscriptionPlan) BeforeCreate(tx *gorm.DB) error { |
| now := common.GetTimestamp() |
| p.CreatedAt = now |
| p.UpdatedAt = now |
| return nil |
| } |
|
|
| func (p *SubscriptionPlan) BeforeUpdate(tx *gorm.DB) error { |
| p.UpdatedAt = common.GetTimestamp() |
| return nil |
| } |
|
|
| |
| type SubscriptionOrder struct { |
| Id int `json:"id"` |
| UserId int `json:"user_id" gorm:"index"` |
| PlanId int `json:"plan_id" gorm:"index"` |
| Money float64 `json:"money"` |
|
|
| TradeNo string `json:"trade_no" gorm:"unique;type:varchar(255);index"` |
| PaymentMethod string `json:"payment_method" gorm:"type:varchar(50)"` |
| Status string `json:"status"` |
| CreateTime int64 `json:"create_time"` |
| CompleteTime int64 `json:"complete_time"` |
|
|
| ProviderPayload string `json:"provider_payload" gorm:"type:text"` |
| } |
|
|
| func (o *SubscriptionOrder) Insert() error { |
| if o.CreateTime == 0 { |
| o.CreateTime = common.GetTimestamp() |
| } |
| return DB.Create(o).Error |
| } |
|
|
| func (o *SubscriptionOrder) Update() error { |
| return DB.Save(o).Error |
| } |
|
|
| func GetSubscriptionOrderByTradeNo(tradeNo string) *SubscriptionOrder { |
| if tradeNo == "" { |
| return nil |
| } |
| var order SubscriptionOrder |
| if err := DB.Where("trade_no = ?", tradeNo).First(&order).Error; err != nil { |
| return nil |
| } |
| return &order |
| } |
|
|
| |
| type UserSubscription struct { |
| Id int `json:"id"` |
| UserId int `json:"user_id" gorm:"index;index:idx_user_sub_active,priority:1"` |
| PlanId int `json:"plan_id" gorm:"index"` |
|
|
| AmountTotal int64 `json:"amount_total" gorm:"type:bigint;not null;default:0"` |
| AmountUsed int64 `json:"amount_used" gorm:"type:bigint;not null;default:0"` |
|
|
| StartTime int64 `json:"start_time" gorm:"bigint"` |
| EndTime int64 `json:"end_time" gorm:"bigint;index;index:idx_user_sub_active,priority:3"` |
| Status string `json:"status" gorm:"type:varchar(32);index;index:idx_user_sub_active,priority:2"` |
|
|
| Source string `json:"source" gorm:"type:varchar(32);default:'order'"` |
|
|
| LastResetTime int64 `json:"last_reset_time" gorm:"type:bigint;default:0"` |
| NextResetTime int64 `json:"next_reset_time" gorm:"type:bigint;default:0;index"` |
|
|
| UpgradeGroup string `json:"upgrade_group" gorm:"type:varchar(64);default:''"` |
| PrevUserGroup string `json:"prev_user_group" gorm:"type:varchar(64);default:''"` |
|
|
| CreatedAt int64 `json:"created_at" gorm:"bigint"` |
| UpdatedAt int64 `json:"updated_at" gorm:"bigint"` |
| } |
|
|
| func (s *UserSubscription) BeforeCreate(tx *gorm.DB) error { |
| now := common.GetTimestamp() |
| s.CreatedAt = now |
| s.UpdatedAt = now |
| return nil |
| } |
|
|
| func (s *UserSubscription) BeforeUpdate(tx *gorm.DB) error { |
| s.UpdatedAt = common.GetTimestamp() |
| return nil |
| } |
|
|
| type SubscriptionSummary struct { |
| Subscription *UserSubscription `json:"subscription"` |
| } |
|
|
| func calcPlanEndTime(start time.Time, plan *SubscriptionPlan) (int64, error) { |
| if plan == nil { |
| return 0, errors.New("plan is nil") |
| } |
| if plan.DurationValue <= 0 && plan.DurationUnit != SubscriptionDurationCustom { |
| return 0, errors.New("duration_value must be > 0") |
| } |
| switch plan.DurationUnit { |
| case SubscriptionDurationYear: |
| return start.AddDate(plan.DurationValue, 0, 0).Unix(), nil |
| case SubscriptionDurationMonth: |
| return start.AddDate(0, plan.DurationValue, 0).Unix(), nil |
| case SubscriptionDurationDay: |
| return start.Add(time.Duration(plan.DurationValue) * 24 * time.Hour).Unix(), nil |
| case SubscriptionDurationHour: |
| return start.Add(time.Duration(plan.DurationValue) * time.Hour).Unix(), nil |
| case SubscriptionDurationCustom: |
| if plan.CustomSeconds <= 0 { |
| return 0, errors.New("custom_seconds must be > 0") |
| } |
| return start.Add(time.Duration(plan.CustomSeconds) * time.Second).Unix(), nil |
| default: |
| return 0, fmt.Errorf("invalid duration_unit: %s", plan.DurationUnit) |
| } |
| } |
|
|
| func NormalizeResetPeriod(period string) string { |
| switch strings.TrimSpace(period) { |
| case SubscriptionResetDaily, SubscriptionResetWeekly, SubscriptionResetMonthly, SubscriptionResetCustom: |
| return strings.TrimSpace(period) |
| default: |
| return SubscriptionResetNever |
| } |
| } |
|
|
| func calcNextResetTime(base time.Time, plan *SubscriptionPlan, endUnix int64) int64 { |
| if plan == nil { |
| return 0 |
| } |
| period := NormalizeResetPeriod(plan.QuotaResetPeriod) |
| if period == SubscriptionResetNever { |
| return 0 |
| } |
| var next time.Time |
| switch period { |
| case SubscriptionResetDaily: |
| next = time.Date(base.Year(), base.Month(), base.Day(), 0, 0, 0, 0, base.Location()). |
| AddDate(0, 0, 1) |
| case SubscriptionResetWeekly: |
| |
| weekday := int(base.Weekday()) |
| |
| if weekday == 0 { |
| weekday = 7 |
| } |
| daysUntil := 8 - weekday |
| next = time.Date(base.Year(), base.Month(), base.Day(), 0, 0, 0, 0, base.Location()). |
| AddDate(0, 0, daysUntil) |
| case SubscriptionResetMonthly: |
| |
| next = time.Date(base.Year(), base.Month(), 1, 0, 0, 0, 0, base.Location()). |
| AddDate(0, 1, 0) |
| case SubscriptionResetCustom: |
| if plan.QuotaResetCustomSeconds <= 0 { |
| return 0 |
| } |
| next = base.Add(time.Duration(plan.QuotaResetCustomSeconds) * time.Second) |
| default: |
| return 0 |
| } |
| if endUnix > 0 && next.Unix() > endUnix { |
| return 0 |
| } |
| return next.Unix() |
| } |
|
|
| func GetSubscriptionPlanById(id int) (*SubscriptionPlan, error) { |
| return getSubscriptionPlanByIdTx(nil, id) |
| } |
|
|
| func getSubscriptionPlanByIdTx(tx *gorm.DB, id int) (*SubscriptionPlan, error) { |
| if id <= 0 { |
| return nil, errors.New("invalid plan id") |
| } |
| key := subscriptionPlanCacheKey(id) |
| if key != "" { |
| if cached, found, err := getSubscriptionPlanCache().Get(key); err == nil && found { |
| return &cached, nil |
| } |
| } |
| var plan SubscriptionPlan |
| query := DB |
| if tx != nil { |
| query = tx |
| } |
| if err := query.Where("id = ?", id).First(&plan).Error; err != nil { |
| return nil, err |
| } |
| _ = getSubscriptionPlanCache().SetWithTTL(key, plan, subscriptionPlanCacheTTL()) |
| return &plan, nil |
| } |
|
|
| func CountUserSubscriptionsByPlan(userId int, planId int) (int64, error) { |
| if userId <= 0 || planId <= 0 { |
| return 0, errors.New("invalid userId or planId") |
| } |
| var count int64 |
| if err := DB.Model(&UserSubscription{}). |
| Where("user_id = ? AND plan_id = ?", userId, planId). |
| Count(&count).Error; err != nil { |
| return 0, err |
| } |
| return count, nil |
| } |
|
|
| func getUserGroupByIdTx(tx *gorm.DB, userId int) (string, error) { |
| if userId <= 0 { |
| return "", errors.New("invalid userId") |
| } |
| if tx == nil { |
| tx = DB |
| } |
| var group string |
| if err := tx.Model(&User{}).Where("id = ?", userId).Select(commonGroupCol).Find(&group).Error; err != nil { |
| return "", err |
| } |
| return group, nil |
| } |
|
|
| func downgradeUserGroupForSubscriptionTx(tx *gorm.DB, sub *UserSubscription, now int64) (string, error) { |
| if tx == nil || sub == nil { |
| return "", errors.New("invalid downgrade args") |
| } |
| upgradeGroup := strings.TrimSpace(sub.UpgradeGroup) |
| if upgradeGroup == "" { |
| return "", nil |
| } |
| currentGroup, err := getUserGroupByIdTx(tx, sub.UserId) |
| if err != nil { |
| return "", err |
| } |
| if currentGroup != upgradeGroup { |
| return "", nil |
| } |
| var activeSub UserSubscription |
| activeQuery := tx.Where("user_id = ? AND status = ? AND end_time > ? AND id <> ? AND upgrade_group <> ''", |
| sub.UserId, "active", now, sub.Id). |
| Order("end_time desc, id desc"). |
| Limit(1). |
| Find(&activeSub) |
| if activeQuery.Error == nil && activeQuery.RowsAffected > 0 { |
| return "", nil |
| } |
| prevGroup := strings.TrimSpace(sub.PrevUserGroup) |
| if prevGroup == "" || prevGroup == currentGroup { |
| return "", nil |
| } |
| if err := tx.Model(&User{}).Where("id = ?", sub.UserId). |
| Update("group", prevGroup).Error; err != nil { |
| return "", err |
| } |
| return prevGroup, nil |
| } |
|
|
| func CreateUserSubscriptionFromPlanTx(tx *gorm.DB, userId int, plan *SubscriptionPlan, source string) (*UserSubscription, error) { |
| if tx == nil { |
| return nil, errors.New("tx is nil") |
| } |
| if plan == nil || plan.Id == 0 { |
| return nil, errors.New("invalid plan") |
| } |
| if userId <= 0 { |
| return nil, errors.New("invalid user id") |
| } |
| if plan.MaxPurchasePerUser > 0 { |
| var count int64 |
| if err := tx.Model(&UserSubscription{}). |
| Where("user_id = ? AND plan_id = ?", userId, plan.Id). |
| Count(&count).Error; err != nil { |
| return nil, err |
| } |
| if count >= int64(plan.MaxPurchasePerUser) { |
| return nil, errors.New("已达到该套餐购买上限") |
| } |
| } |
| nowUnix := GetDBTimestamp() |
| now := time.Unix(nowUnix, 0) |
| endUnix, err := calcPlanEndTime(now, plan) |
| if err != nil { |
| return nil, err |
| } |
| resetBase := now |
| nextReset := calcNextResetTime(resetBase, plan, endUnix) |
| lastReset := int64(0) |
| if nextReset > 0 { |
| lastReset = now.Unix() |
| } |
| upgradeGroup := strings.TrimSpace(plan.UpgradeGroup) |
| prevGroup := "" |
| if upgradeGroup != "" { |
| currentGroup, err := getUserGroupByIdTx(tx, userId) |
| if err != nil { |
| return nil, err |
| } |
| if currentGroup != upgradeGroup { |
| prevGroup = currentGroup |
| if err := tx.Model(&User{}).Where("id = ?", userId). |
| Update("group", upgradeGroup).Error; err != nil { |
| return nil, err |
| } |
| } |
| } |
| sub := &UserSubscription{ |
| UserId: userId, |
| PlanId: plan.Id, |
| AmountTotal: plan.TotalAmount, |
| AmountUsed: 0, |
| StartTime: now.Unix(), |
| EndTime: endUnix, |
| Status: "active", |
| Source: source, |
| LastResetTime: lastReset, |
| NextResetTime: nextReset, |
| UpgradeGroup: upgradeGroup, |
| PrevUserGroup: prevGroup, |
| CreatedAt: common.GetTimestamp(), |
| UpdatedAt: common.GetTimestamp(), |
| } |
| if err := tx.Create(sub).Error; err != nil { |
| return nil, err |
| } |
| return sub, nil |
| } |
|
|
| |
| func CompleteSubscriptionOrder(tradeNo string, providerPayload string) error { |
| if tradeNo == "" { |
| return errors.New("tradeNo is empty") |
| } |
| refCol := "`trade_no`" |
| if common.UsingPostgreSQL { |
| refCol = `"trade_no"` |
| } |
| var logUserId int |
| var logPlanTitle string |
| var logMoney float64 |
| var logPaymentMethod string |
| var upgradeGroup string |
| err := DB.Transaction(func(tx *gorm.DB) error { |
| var order SubscriptionOrder |
| if err := tx.Set("gorm:query_option", "FOR UPDATE").Where(refCol+" = ?", tradeNo).First(&order).Error; err != nil { |
| return ErrSubscriptionOrderNotFound |
| } |
| if order.Status == common.TopUpStatusSuccess { |
| return nil |
| } |
| if order.Status != common.TopUpStatusPending { |
| return ErrSubscriptionOrderStatusInvalid |
| } |
| plan, err := GetSubscriptionPlanById(order.PlanId) |
| if err != nil { |
| return err |
| } |
| if !plan.Enabled { |
| |
| } |
| upgradeGroup = strings.TrimSpace(plan.UpgradeGroup) |
| _, err = CreateUserSubscriptionFromPlanTx(tx, order.UserId, plan, "order") |
| if err != nil { |
| return err |
| } |
| if err := upsertSubscriptionTopUpTx(tx, &order); err != nil { |
| return err |
| } |
| order.Status = common.TopUpStatusSuccess |
| order.CompleteTime = common.GetTimestamp() |
| if providerPayload != "" { |
| order.ProviderPayload = providerPayload |
| } |
| if err := tx.Save(&order).Error; err != nil { |
| return err |
| } |
| logUserId = order.UserId |
| logPlanTitle = plan.Title |
| logMoney = order.Money |
| logPaymentMethod = order.PaymentMethod |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| if upgradeGroup != "" && logUserId > 0 { |
| _ = UpdateUserGroupCache(logUserId, upgradeGroup) |
| } |
| if logUserId > 0 { |
| msg := fmt.Sprintf("订阅购买成功,套餐: %s,支付金额: %.2f,支付方式: %s", logPlanTitle, logMoney, logPaymentMethod) |
| RecordLog(logUserId, LogTypeTopup, msg) |
| } |
| return nil |
| } |
|
|
| func upsertSubscriptionTopUpTx(tx *gorm.DB, order *SubscriptionOrder) error { |
| if tx == nil || order == nil { |
| return errors.New("invalid subscription order") |
| } |
| now := common.GetTimestamp() |
| var topup TopUp |
| if err := tx.Where("trade_no = ?", order.TradeNo).First(&topup).Error; err != nil { |
| if errors.Is(err, gorm.ErrRecordNotFound) { |
| topup = TopUp{ |
| UserId: order.UserId, |
| Amount: 0, |
| Money: order.Money, |
| TradeNo: order.TradeNo, |
| PaymentMethod: order.PaymentMethod, |
| CreateTime: order.CreateTime, |
| CompleteTime: now, |
| Status: common.TopUpStatusSuccess, |
| } |
| return tx.Create(&topup).Error |
| } |
| return err |
| } |
| topup.Money = order.Money |
| if topup.PaymentMethod == "" { |
| topup.PaymentMethod = order.PaymentMethod |
| } |
| if topup.CreateTime == 0 { |
| topup.CreateTime = order.CreateTime |
| } |
| topup.CompleteTime = now |
| topup.Status = common.TopUpStatusSuccess |
| return tx.Save(&topup).Error |
| } |
|
|
| func ExpireSubscriptionOrder(tradeNo string) error { |
| if tradeNo == "" { |
| return errors.New("tradeNo is empty") |
| } |
| refCol := "`trade_no`" |
| if common.UsingPostgreSQL { |
| refCol = `"trade_no"` |
| } |
| return DB.Transaction(func(tx *gorm.DB) error { |
| var order SubscriptionOrder |
| if err := tx.Set("gorm:query_option", "FOR UPDATE").Where(refCol+" = ?", tradeNo).First(&order).Error; err != nil { |
| return ErrSubscriptionOrderNotFound |
| } |
| if order.Status != common.TopUpStatusPending { |
| return nil |
| } |
| order.Status = common.TopUpStatusExpired |
| order.CompleteTime = common.GetTimestamp() |
| return tx.Save(&order).Error |
| }) |
| } |
|
|
| |
| func AdminBindSubscription(userId int, planId int, sourceNote string) (string, error) { |
| if userId <= 0 || planId <= 0 { |
| return "", errors.New("invalid userId or planId") |
| } |
| plan, err := GetSubscriptionPlanById(planId) |
| if err != nil { |
| return "", err |
| } |
| err = DB.Transaction(func(tx *gorm.DB) error { |
| _, err := CreateUserSubscriptionFromPlanTx(tx, userId, plan, "admin") |
| return err |
| }) |
| if err != nil { |
| return "", err |
| } |
| if strings.TrimSpace(plan.UpgradeGroup) != "" { |
| _ = UpdateUserGroupCache(userId, plan.UpgradeGroup) |
| return fmt.Sprintf("用户分组将升级到 %s", plan.UpgradeGroup), nil |
| } |
| return "", nil |
| } |
|
|
| |
| func GetAllActiveUserSubscriptions(userId int) ([]SubscriptionSummary, error) { |
| if userId <= 0 { |
| return nil, errors.New("invalid userId") |
| } |
| now := common.GetTimestamp() |
| var subs []UserSubscription |
| err := DB.Where("user_id = ? AND status = ? AND end_time > ?", userId, "active", now). |
| Order("end_time desc, id desc"). |
| Find(&subs).Error |
| if err != nil { |
| return nil, err |
| } |
| return buildSubscriptionSummaries(subs), nil |
| } |
|
|
| |
| |
| func HasActiveUserSubscription(userId int) (bool, error) { |
| if userId <= 0 { |
| return false, errors.New("invalid userId") |
| } |
| now := common.GetTimestamp() |
| var count int64 |
| if err := DB.Model(&UserSubscription{}). |
| Where("user_id = ? AND status = ? AND end_time > ?", userId, "active", now). |
| Count(&count).Error; err != nil { |
| return false, err |
| } |
| return count > 0, nil |
| } |
|
|
| |
| func GetAllUserSubscriptions(userId int) ([]SubscriptionSummary, error) { |
| if userId <= 0 { |
| return nil, errors.New("invalid userId") |
| } |
| var subs []UserSubscription |
| err := DB.Where("user_id = ?", userId). |
| Order("end_time desc, id desc"). |
| Find(&subs).Error |
| if err != nil { |
| return nil, err |
| } |
| return buildSubscriptionSummaries(subs), nil |
| } |
|
|
| func buildSubscriptionSummaries(subs []UserSubscription) []SubscriptionSummary { |
| if len(subs) == 0 { |
| return []SubscriptionSummary{} |
| } |
| result := make([]SubscriptionSummary, 0, len(subs)) |
| for _, sub := range subs { |
| subCopy := sub |
| result = append(result, SubscriptionSummary{ |
| Subscription: &subCopy, |
| }) |
| } |
| return result |
| } |
|
|
| |
| func AdminInvalidateUserSubscription(userSubscriptionId int) (string, error) { |
| if userSubscriptionId <= 0 { |
| return "", errors.New("invalid userSubscriptionId") |
| } |
| now := common.GetTimestamp() |
| cacheGroup := "" |
| downgradeGroup := "" |
| var userId int |
| err := DB.Transaction(func(tx *gorm.DB) error { |
| var sub UserSubscription |
| if err := tx.Set("gorm:query_option", "FOR UPDATE"). |
| Where("id = ?", userSubscriptionId).First(&sub).Error; err != nil { |
| return err |
| } |
| userId = sub.UserId |
| if err := tx.Model(&sub).Updates(map[string]interface{}{ |
| "status": "cancelled", |
| "end_time": now, |
| "updated_at": now, |
| }).Error; err != nil { |
| return err |
| } |
| target, err := downgradeUserGroupForSubscriptionTx(tx, &sub, now) |
| if err != nil { |
| return err |
| } |
| if target != "" { |
| cacheGroup = target |
| downgradeGroup = target |
| } |
| return nil |
| }) |
| if err != nil { |
| return "", err |
| } |
| if cacheGroup != "" && userId > 0 { |
| _ = UpdateUserGroupCache(userId, cacheGroup) |
| } |
| if downgradeGroup != "" { |
| return fmt.Sprintf("用户分组将回退到 %s", downgradeGroup), nil |
| } |
| return "", nil |
| } |
|
|
| |
| func AdminDeleteUserSubscription(userSubscriptionId int) (string, error) { |
| if userSubscriptionId <= 0 { |
| return "", errors.New("invalid userSubscriptionId") |
| } |
| now := common.GetTimestamp() |
| cacheGroup := "" |
| downgradeGroup := "" |
| var userId int |
| err := DB.Transaction(func(tx *gorm.DB) error { |
| var sub UserSubscription |
| if err := tx.Set("gorm:query_option", "FOR UPDATE"). |
| Where("id = ?", userSubscriptionId).First(&sub).Error; err != nil { |
| return err |
| } |
| userId = sub.UserId |
| target, err := downgradeUserGroupForSubscriptionTx(tx, &sub, now) |
| if err != nil { |
| return err |
| } |
| if target != "" { |
| cacheGroup = target |
| downgradeGroup = target |
| } |
| if err := tx.Where("id = ?", userSubscriptionId).Delete(&UserSubscription{}).Error; err != nil { |
| return err |
| } |
| return nil |
| }) |
| if err != nil { |
| return "", err |
| } |
| if cacheGroup != "" && userId > 0 { |
| _ = UpdateUserGroupCache(userId, cacheGroup) |
| } |
| if downgradeGroup != "" { |
| return fmt.Sprintf("用户分组将回退到 %s", downgradeGroup), nil |
| } |
| return "", nil |
| } |
|
|
| type SubscriptionPreConsumeResult struct { |
| UserSubscriptionId int |
| PreConsumed int64 |
| AmountTotal int64 |
| AmountUsedBefore int64 |
| AmountUsedAfter int64 |
| } |
|
|
| |
| func ExpireDueSubscriptions(limit int) (int, error) { |
| if limit <= 0 { |
| limit = 200 |
| } |
| now := GetDBTimestamp() |
| var subs []UserSubscription |
| if err := DB.Where("status = ? AND end_time > 0 AND end_time <= ?", "active", now). |
| Order("end_time asc, id asc"). |
| Limit(limit). |
| Find(&subs).Error; err != nil { |
| return 0, err |
| } |
| if len(subs) == 0 { |
| return 0, nil |
| } |
| expiredCount := 0 |
| userIds := make(map[int]struct{}, len(subs)) |
| for _, sub := range subs { |
| if sub.UserId > 0 { |
| userIds[sub.UserId] = struct{}{} |
| } |
| } |
| for userId := range userIds { |
| cacheGroup := "" |
| err := DB.Transaction(func(tx *gorm.DB) error { |
| res := tx.Model(&UserSubscription{}). |
| Where("user_id = ? AND status = ? AND end_time > 0 AND end_time <= ?", userId, "active", now). |
| Updates(map[string]interface{}{ |
| "status": "expired", |
| "updated_at": common.GetTimestamp(), |
| }) |
| if res.Error != nil { |
| return res.Error |
| } |
| expiredCount += int(res.RowsAffected) |
|
|
| |
| var activeSub UserSubscription |
| activeQuery := tx.Where("user_id = ? AND status = ? AND end_time > ? AND upgrade_group <> ''", |
| userId, "active", now). |
| Order("end_time desc, id desc"). |
| Limit(1). |
| Find(&activeSub) |
| if activeQuery.Error == nil && activeQuery.RowsAffected > 0 { |
| return nil |
| } |
|
|
| |
| var lastExpired UserSubscription |
| expiredQuery := tx.Where("user_id = ? AND status = ? AND upgrade_group <> ''", |
| userId, "expired"). |
| Order("end_time desc, id desc"). |
| Limit(1). |
| Find(&lastExpired) |
| if expiredQuery.Error != nil || expiredQuery.RowsAffected == 0 { |
| return nil |
| } |
| upgradeGroup := strings.TrimSpace(lastExpired.UpgradeGroup) |
| prevGroup := strings.TrimSpace(lastExpired.PrevUserGroup) |
| if upgradeGroup == "" || prevGroup == "" { |
| return nil |
| } |
| currentGroup, err := getUserGroupByIdTx(tx, userId) |
| if err != nil { |
| return err |
| } |
| if currentGroup != upgradeGroup || currentGroup == prevGroup { |
| return nil |
| } |
| if err := tx.Model(&User{}).Where("id = ?", userId). |
| Update("group", prevGroup).Error; err != nil { |
| return err |
| } |
| cacheGroup = prevGroup |
| return nil |
| }) |
| if err != nil { |
| return expiredCount, err |
| } |
| if cacheGroup != "" { |
| _ = UpdateUserGroupCache(userId, cacheGroup) |
| } |
| } |
| return expiredCount, nil |
| } |
|
|
| |
| type SubscriptionPreConsumeRecord struct { |
| Id int `json:"id"` |
| RequestId string `json:"request_id" gorm:"type:varchar(64);uniqueIndex"` |
| UserId int `json:"user_id" gorm:"index"` |
| UserSubscriptionId int `json:"user_subscription_id" gorm:"index"` |
| PreConsumed int64 `json:"pre_consumed" gorm:"type:bigint;not null;default:0"` |
| Status string `json:"status" gorm:"type:varchar(32);index"` |
| CreatedAt int64 `json:"created_at" gorm:"bigint"` |
| UpdatedAt int64 `json:"updated_at" gorm:"bigint;index"` |
| } |
|
|
| func (r *SubscriptionPreConsumeRecord) BeforeCreate(tx *gorm.DB) error { |
| now := common.GetTimestamp() |
| r.CreatedAt = now |
| r.UpdatedAt = now |
| return nil |
| } |
|
|
| func (r *SubscriptionPreConsumeRecord) BeforeUpdate(tx *gorm.DB) error { |
| r.UpdatedAt = common.GetTimestamp() |
| return nil |
| } |
|
|
| func maybeResetUserSubscriptionWithPlanTx(tx *gorm.DB, sub *UserSubscription, plan *SubscriptionPlan, now int64) error { |
| if tx == nil || sub == nil || plan == nil { |
| return errors.New("invalid reset args") |
| } |
| if sub.NextResetTime > 0 && sub.NextResetTime > now { |
| return nil |
| } |
| if NormalizeResetPeriod(plan.QuotaResetPeriod) == SubscriptionResetNever { |
| return nil |
| } |
| baseUnix := sub.LastResetTime |
| if baseUnix <= 0 { |
| baseUnix = sub.StartTime |
| } |
| base := time.Unix(baseUnix, 0) |
| next := calcNextResetTime(base, plan, sub.EndTime) |
| advanced := false |
| for next > 0 && next <= now { |
| advanced = true |
| base = time.Unix(next, 0) |
| next = calcNextResetTime(base, plan, sub.EndTime) |
| } |
| if !advanced { |
| if sub.NextResetTime == 0 && next > 0 { |
| sub.NextResetTime = next |
| sub.LastResetTime = base.Unix() |
| return tx.Save(sub).Error |
| } |
| return nil |
| } |
| sub.AmountUsed = 0 |
| sub.LastResetTime = base.Unix() |
| sub.NextResetTime = next |
| return tx.Save(sub).Error |
| } |
|
|
| |
| func PreConsumeUserSubscription(requestId string, userId int, modelName string, quotaType int, amount int64) (*SubscriptionPreConsumeResult, error) { |
| if userId <= 0 { |
| return nil, errors.New("invalid userId") |
| } |
| if strings.TrimSpace(requestId) == "" { |
| return nil, errors.New("requestId is empty") |
| } |
| if amount <= 0 { |
| return nil, errors.New("amount must be > 0") |
| } |
| now := GetDBTimestamp() |
|
|
| returnValue := &SubscriptionPreConsumeResult{} |
|
|
| err := DB.Transaction(func(tx *gorm.DB) error { |
| var existing SubscriptionPreConsumeRecord |
| query := tx.Where("request_id = ?", requestId).Limit(1).Find(&existing) |
| if query.Error != nil { |
| return query.Error |
| } |
| if query.RowsAffected > 0 { |
| if existing.Status == "refunded" { |
| return errors.New("subscription pre-consume already refunded") |
| } |
| var sub UserSubscription |
| if err := tx.Where("id = ?", existing.UserSubscriptionId).First(&sub).Error; err != nil { |
| return err |
| } |
| returnValue.UserSubscriptionId = sub.Id |
| returnValue.PreConsumed = existing.PreConsumed |
| returnValue.AmountTotal = sub.AmountTotal |
| returnValue.AmountUsedBefore = sub.AmountUsed |
| returnValue.AmountUsedAfter = sub.AmountUsed |
| return nil |
| } |
|
|
| var subs []UserSubscription |
| if err := tx.Set("gorm:query_option", "FOR UPDATE"). |
| Where("user_id = ? AND status = ? AND end_time > ?", userId, "active", now). |
| Order("end_time asc, id asc"). |
| Find(&subs).Error; err != nil { |
| return errors.New("no active subscription") |
| } |
| if len(subs) == 0 { |
| return errors.New("no active subscription") |
| } |
| for _, candidate := range subs { |
| sub := candidate |
| plan, err := getSubscriptionPlanByIdTx(tx, sub.PlanId) |
| if err != nil { |
| return err |
| } |
| if err := maybeResetUserSubscriptionWithPlanTx(tx, &sub, plan, now); err != nil { |
| return err |
| } |
| usedBefore := sub.AmountUsed |
| if sub.AmountTotal > 0 { |
| remain := sub.AmountTotal - usedBefore |
| if remain < amount { |
| continue |
| } |
| } |
| record := &SubscriptionPreConsumeRecord{ |
| RequestId: requestId, |
| UserId: userId, |
| UserSubscriptionId: sub.Id, |
| PreConsumed: amount, |
| Status: "consumed", |
| } |
| if err := tx.Create(record).Error; err != nil { |
| var dup SubscriptionPreConsumeRecord |
| if err2 := tx.Where("request_id = ?", requestId).First(&dup).Error; err2 == nil { |
| if dup.Status == "refunded" { |
| return errors.New("subscription pre-consume already refunded") |
| } |
| returnValue.UserSubscriptionId = sub.Id |
| returnValue.PreConsumed = dup.PreConsumed |
| returnValue.AmountTotal = sub.AmountTotal |
| returnValue.AmountUsedBefore = sub.AmountUsed |
| returnValue.AmountUsedAfter = sub.AmountUsed |
| return nil |
| } |
| return err |
| } |
| sub.AmountUsed += amount |
| if err := tx.Save(&sub).Error; err != nil { |
| return err |
| } |
| returnValue.UserSubscriptionId = sub.Id |
| returnValue.PreConsumed = amount |
| returnValue.AmountTotal = sub.AmountTotal |
| returnValue.AmountUsedBefore = usedBefore |
| returnValue.AmountUsedAfter = sub.AmountUsed |
| return nil |
| } |
| return fmt.Errorf("subscription quota insufficient, need=%d", amount) |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return returnValue, nil |
| } |
|
|
| |
| func RefundSubscriptionPreConsume(requestId string) error { |
| if strings.TrimSpace(requestId) == "" { |
| return errors.New("requestId is empty") |
| } |
| return DB.Transaction(func(tx *gorm.DB) error { |
| var record SubscriptionPreConsumeRecord |
| if err := tx.Set("gorm:query_option", "FOR UPDATE"). |
| Where("request_id = ?", requestId).First(&record).Error; err != nil { |
| return err |
| } |
| if record.Status == "refunded" { |
| return nil |
| } |
| if record.PreConsumed <= 0 { |
| record.Status = "refunded" |
| return tx.Save(&record).Error |
| } |
| if err := PostConsumeUserSubscriptionDelta(record.UserSubscriptionId, -record.PreConsumed); err != nil { |
| return err |
| } |
| record.Status = "refunded" |
| return tx.Save(&record).Error |
| }) |
| } |
|
|
| |
| func ResetDueSubscriptions(limit int) (int, error) { |
| if limit <= 0 { |
| limit = 200 |
| } |
| now := GetDBTimestamp() |
| var subs []UserSubscription |
| if err := DB.Where("next_reset_time > 0 AND next_reset_time <= ? AND status = ?", now, "active"). |
| Order("next_reset_time asc"). |
| Limit(limit). |
| Find(&subs).Error; err != nil { |
| return 0, err |
| } |
| if len(subs) == 0 { |
| return 0, nil |
| } |
| resetCount := 0 |
| for _, sub := range subs { |
| subCopy := sub |
| plan, err := getSubscriptionPlanByIdTx(nil, sub.PlanId) |
| if err != nil || plan == nil { |
| continue |
| } |
| err = DB.Transaction(func(tx *gorm.DB) error { |
| var locked UserSubscription |
| if err := tx.Set("gorm:query_option", "FOR UPDATE"). |
| Where("id = ? AND next_reset_time > 0 AND next_reset_time <= ?", subCopy.Id, now). |
| First(&locked).Error; err != nil { |
| return nil |
| } |
| if err := maybeResetUserSubscriptionWithPlanTx(tx, &locked, plan, now); err != nil { |
| return err |
| } |
| resetCount++ |
| return nil |
| }) |
| if err != nil { |
| return resetCount, err |
| } |
| } |
| return resetCount, nil |
| } |
|
|
| |
| func CleanupSubscriptionPreConsumeRecords(olderThanSeconds int64) (int64, error) { |
| if olderThanSeconds <= 0 { |
| olderThanSeconds = 7 * 24 * 3600 |
| } |
| cutoff := GetDBTimestamp() - olderThanSeconds |
| res := DB.Where("updated_at < ?", cutoff).Delete(&SubscriptionPreConsumeRecord{}) |
| return res.RowsAffected, res.Error |
| } |
|
|
| type SubscriptionPlanInfo struct { |
| PlanId int |
| PlanTitle string |
| } |
|
|
| func GetSubscriptionPlanInfoByUserSubscriptionId(userSubscriptionId int) (*SubscriptionPlanInfo, error) { |
| if userSubscriptionId <= 0 { |
| return nil, errors.New("invalid userSubscriptionId") |
| } |
| cacheKey := fmt.Sprintf("sub:%d", userSubscriptionId) |
| if cached, found, err := getSubscriptionPlanInfoCache().Get(cacheKey); err == nil && found { |
| return &cached, nil |
| } |
| var sub UserSubscription |
| if err := DB.Where("id = ?", userSubscriptionId).First(&sub).Error; err != nil { |
| return nil, err |
| } |
| plan, err := getSubscriptionPlanByIdTx(nil, sub.PlanId) |
| if err != nil { |
| return nil, err |
| } |
| info := &SubscriptionPlanInfo{ |
| PlanId: sub.PlanId, |
| PlanTitle: plan.Title, |
| } |
| _ = getSubscriptionPlanInfoCache().SetWithTTL(cacheKey, *info, subscriptionPlanInfoCacheTTL()) |
| return info, nil |
| } |
|
|
| |
| func PostConsumeUserSubscriptionDelta(userSubscriptionId int, delta int64) error { |
| if userSubscriptionId <= 0 { |
| return errors.New("invalid userSubscriptionId") |
| } |
| if delta == 0 { |
| return nil |
| } |
| return DB.Transaction(func(tx *gorm.DB) error { |
| var sub UserSubscription |
| if err := tx.Set("gorm:query_option", "FOR UPDATE"). |
| Where("id = ?", userSubscriptionId). |
| First(&sub).Error; err != nil { |
| return err |
| } |
| newUsed := sub.AmountUsed + delta |
| if newUsed < 0 { |
| newUsed = 0 |
| } |
| if sub.AmountTotal > 0 && newUsed > sub.AmountTotal { |
| return fmt.Errorf("subscription used exceeds total, used=%d total=%d", newUsed, sub.AmountTotal) |
| } |
| sub.AmountUsed = newUsed |
| return tx.Save(&sub).Error |
| }) |
| } |
|
|