| |
|
|
| |
| |
| |
|
|
| package main |
|
|
| import ( |
| "context" |
| "github.com/Wei-Shaw/sub2api/ent" |
| "github.com/Wei-Shaw/sub2api/internal/config" |
| "github.com/Wei-Shaw/sub2api/internal/handler" |
| "github.com/Wei-Shaw/sub2api/internal/handler/admin" |
| "github.com/Wei-Shaw/sub2api/internal/repository" |
| "github.com/Wei-Shaw/sub2api/internal/server" |
| "github.com/Wei-Shaw/sub2api/internal/server/middleware" |
| "github.com/Wei-Shaw/sub2api/internal/service" |
| "github.com/redis/go-redis/v9" |
| "log" |
| "net/http" |
| "sync" |
| "time" |
| ) |
|
|
| import ( |
| _ "embed" |
| _ "github.com/Wei-Shaw/sub2api/ent/runtime" |
| ) |
|
|
| |
|
|
| func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { |
| configConfig, err := config.ProvideConfig() |
| if err != nil { |
| return nil, err |
| } |
| client, err := repository.ProvideEnt(configConfig) |
| if err != nil { |
| return nil, err |
| } |
| db, err := repository.ProvideSQLDB(client) |
| if err != nil { |
| return nil, err |
| } |
| userRepository := repository.NewUserRepository(client, db) |
| redeemCodeRepository := repository.NewRedeemCodeRepository(client) |
| redisClient := repository.ProvideRedis(configConfig) |
| refreshTokenCache := repository.NewRefreshTokenCache(redisClient) |
| settingRepository := repository.NewSettingRepository(client) |
| groupRepository := repository.NewGroupRepository(client, db) |
| settingService := service.ProvideSettingService(settingRepository, groupRepository, configConfig) |
| emailCache := repository.NewEmailCache(redisClient) |
| emailService := service.NewEmailService(settingRepository, emailCache) |
| turnstileVerifier := repository.NewTurnstileVerifier() |
| turnstileService := service.NewTurnstileService(settingService, turnstileVerifier) |
| emailQueueService := service.ProvideEmailQueueService(emailService) |
| promoCodeRepository := repository.NewPromoCodeRepository(client) |
| billingCache := repository.NewBillingCache(redisClient) |
| userSubscriptionRepository := repository.NewUserSubscriptionRepository(client) |
| apiKeyRepository := repository.NewAPIKeyRepository(client, db) |
| billingCacheService := service.NewBillingCacheService(billingCache, userRepository, userSubscriptionRepository, apiKeyRepository, configConfig) |
| userGroupRateRepository := repository.NewUserGroupRateRepository(db) |
| apiKeyCache := repository.NewAPIKeyCache(redisClient) |
| apiKeyService := service.NewAPIKeyService(apiKeyRepository, userRepository, groupRepository, userSubscriptionRepository, userGroupRateRepository, apiKeyCache, configConfig) |
| apiKeyService.SetRateLimitCacheInvalidator(billingCache) |
| apiKeyAuthCacheInvalidator := service.ProvideAPIKeyAuthCacheInvalidator(apiKeyService) |
| promoService := service.NewPromoService(promoCodeRepository, userRepository, billingCacheService, client, apiKeyAuthCacheInvalidator) |
| subscriptionService := service.NewSubscriptionService(groupRepository, userSubscriptionRepository, billingCacheService, client, configConfig) |
| authService := service.NewAuthService(client, userRepository, redeemCodeRepository, refreshTokenCache, configConfig, settingService, emailService, turnstileService, emailQueueService, promoService, subscriptionService) |
| userService := service.NewUserService(userRepository, apiKeyAuthCacheInvalidator, billingCache) |
| redeemCache := repository.NewRedeemCache(redisClient) |
| redeemService := service.NewRedeemService(redeemCodeRepository, userRepository, subscriptionService, redeemCache, billingCacheService, client, apiKeyAuthCacheInvalidator) |
| secretEncryptor, err := repository.NewAESEncryptor(configConfig) |
| if err != nil { |
| return nil, err |
| } |
| totpCache := repository.NewTotpCache(redisClient) |
| totpService := service.NewTotpService(userRepository, secretEncryptor, totpCache, settingService, emailService, emailQueueService) |
| authHandler := handler.NewAuthHandler(configConfig, authService, userService, settingService, promoService, redeemService, totpService) |
| userHandler := handler.NewUserHandler(userService) |
| apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService) |
| usageLogRepository := repository.NewUsageLogRepository(client, db) |
| usageBillingRepository := repository.NewUsageBillingRepository(client, db) |
| usageService := service.NewUsageService(usageLogRepository, userRepository, client, apiKeyAuthCacheInvalidator) |
| usageHandler := handler.NewUsageHandler(usageService, apiKeyService) |
| redeemHandler := handler.NewRedeemHandler(redeemService) |
| subscriptionHandler := handler.NewSubscriptionHandler(subscriptionService) |
| announcementRepository := repository.NewAnnouncementRepository(client) |
| announcementReadRepository := repository.NewAnnouncementReadRepository(client) |
| announcementService := service.NewAnnouncementService(announcementRepository, announcementReadRepository, userRepository, userSubscriptionRepository) |
| announcementHandler := handler.NewAnnouncementHandler(announcementService) |
| dashboardAggregationRepository := repository.NewDashboardAggregationRepository(db) |
| dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig) |
| dashboardService := service.NewDashboardService(usageLogRepository, dashboardAggregationRepository, dashboardStatsCache, configConfig) |
| timingWheelService, err := service.ProvideTimingWheelService() |
| if err != nil { |
| return nil, err |
| } |
| dashboardAggregationService := service.ProvideDashboardAggregationService(dashboardAggregationRepository, timingWheelService, configConfig) |
| dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService) |
| schedulerCache := repository.NewSchedulerCache(redisClient) |
| accountRepository := repository.NewAccountRepository(client, db, schedulerCache) |
| soraAccountRepository := repository.NewSoraAccountRepository(db) |
| proxyRepository := repository.NewProxyRepository(client, db) |
| proxyExitInfoProber := repository.NewProxyExitInfoProber(configConfig) |
| proxyLatencyCache := repository.NewProxyLatencyCache(redisClient) |
| privacyClientFactory := providePrivacyClientFactory() |
| adminService := service.NewAdminService(userRepository, groupRepository, accountRepository, soraAccountRepository, proxyRepository, apiKeyRepository, redeemCodeRepository, userGroupRateRepository, billingCacheService, proxyExitInfoProber, proxyLatencyCache, apiKeyAuthCacheInvalidator, client, settingService, subscriptionService, userSubscriptionRepository, privacyClientFactory) |
| concurrencyCache := repository.ProvideConcurrencyCache(redisClient, configConfig) |
| concurrencyService := service.ProvideConcurrencyService(concurrencyCache, accountRepository, configConfig) |
| adminUserHandler := admin.NewUserHandler(adminService, concurrencyService) |
| claudeOAuthClient := repository.NewClaudeOAuthClient() |
| oAuthService := service.NewOAuthService(proxyRepository, claudeOAuthClient) |
| openAIOAuthClient := repository.NewOpenAIOAuthClient() |
| openAIOAuthService := service.NewOpenAIOAuthService(proxyRepository, openAIOAuthClient) |
| geminiOAuthClient := repository.NewGeminiOAuthClient(configConfig) |
| geminiCliCodeAssistClient := repository.NewGeminiCliCodeAssistClient() |
| driveClient := repository.NewGeminiDriveClient() |
| geminiOAuthService := service.NewGeminiOAuthService(proxyRepository, geminiOAuthClient, geminiCliCodeAssistClient, driveClient, configConfig) |
| antigravityOAuthService := service.NewAntigravityOAuthService(proxyRepository) |
| geminiQuotaService := service.NewGeminiQuotaService(configConfig, settingRepository) |
| tempUnschedCache := repository.NewTempUnschedCache(redisClient) |
| timeoutCounterCache := repository.NewTimeoutCounterCache(redisClient) |
| geminiTokenCache := repository.NewGeminiTokenCache(redisClient) |
| oauthRefreshAPI := service.NewOAuthRefreshAPI(accountRepository, geminiTokenCache) |
| compositeTokenCacheInvalidator := service.NewCompositeTokenCacheInvalidator(geminiTokenCache) |
| rateLimitService := service.ProvideRateLimitService(accountRepository, usageLogRepository, configConfig, geminiQuotaService, tempUnschedCache, timeoutCounterCache, settingService, compositeTokenCacheInvalidator) |
| httpUpstream := repository.NewHTTPUpstream(configConfig) |
| claudeUsageFetcher := repository.NewClaudeUsageFetcher(httpUpstream) |
| antigravityQuotaFetcher := service.NewAntigravityQuotaFetcher(proxyRepository) |
| usageCache := service.NewUsageCache() |
| identityCache := repository.NewIdentityCache(redisClient) |
| accountUsageService := service.NewAccountUsageService(accountRepository, usageLogRepository, claudeUsageFetcher, geminiQuotaService, antigravityQuotaFetcher, usageCache, identityCache) |
| geminiTokenProvider := service.ProvideGeminiTokenProvider(accountRepository, geminiTokenCache, geminiOAuthService, oauthRefreshAPI) |
| gatewayCache := repository.NewGatewayCache(redisClient) |
| schedulerOutboxRepository := repository.NewSchedulerOutboxRepository(db) |
| schedulerSnapshotService := service.ProvideSchedulerSnapshotService(schedulerCache, schedulerOutboxRepository, accountRepository, groupRepository, configConfig) |
| antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI, tempUnschedCache) |
| antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService) |
| accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, httpUpstream, configConfig) |
| crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig) |
| sessionLimitCache := repository.ProvideSessionLimitCache(redisClient, configConfig) |
| rpmCache := repository.NewRPMCache(redisClient) |
| groupCapacityService := service.NewGroupCapacityService(accountRepository, groupRepository, concurrencyService, sessionLimitCache, rpmCache) |
| groupHandler := admin.NewGroupHandler(adminService, dashboardService, groupCapacityService) |
| accountHandler := admin.NewAccountHandler(adminService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, rateLimitService, accountUsageService, accountTestService, concurrencyService, crsSyncService, sessionLimitCache, rpmCache, compositeTokenCacheInvalidator) |
| adminAnnouncementHandler := admin.NewAnnouncementHandler(announcementService) |
| dataManagementService := service.NewDataManagementService() |
| dataManagementHandler := admin.NewDataManagementHandler(dataManagementService) |
| backupObjectStoreFactory := repository.NewS3BackupStoreFactory() |
| dbDumper := repository.NewPgDumper(configConfig) |
| backupService := service.ProvideBackupService(settingRepository, configConfig, secretEncryptor, backupObjectStoreFactory, dbDumper) |
| backupHandler := admin.NewBackupHandler(backupService, userService) |
| oAuthHandler := admin.NewOAuthHandler(oAuthService) |
| openAIOAuthHandler := admin.NewOpenAIOAuthHandler(openAIOAuthService, adminService) |
| geminiOAuthHandler := admin.NewGeminiOAuthHandler(geminiOAuthService) |
| antigravityOAuthHandler := admin.NewAntigravityOAuthHandler(antigravityOAuthService) |
| proxyHandler := admin.NewProxyHandler(adminService) |
| adminRedeemHandler := admin.NewRedeemHandler(adminService, redeemService) |
| promoHandler := admin.NewPromoHandler(promoService) |
| opsRepository := repository.NewOpsRepository(db) |
| pricingRemoteClient := repository.ProvidePricingRemoteClient(configConfig) |
| pricingService, err := service.ProvidePricingService(configConfig, pricingRemoteClient) |
| if err != nil { |
| return nil, err |
| } |
| billingService := service.NewBillingService(configConfig, pricingService) |
| identityService := service.NewIdentityService(identityCache) |
| deferredService := service.ProvideDeferredService(accountRepository, timingWheelService) |
| claudeTokenProvider := service.ProvideClaudeTokenProvider(accountRepository, geminiTokenCache, oAuthService, oauthRefreshAPI) |
| digestSessionStore := service.NewDigestSessionStore() |
| gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService) |
| openAITokenProvider := service.ProvideOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService, oauthRefreshAPI) |
| openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider) |
| geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) |
| opsSystemLogSink := service.ProvideOpsSystemLogSink(opsRepository) |
| opsService := service.NewOpsService(opsRepository, settingRepository, configConfig, accountRepository, userRepository, concurrencyService, gatewayService, openAIGatewayService, geminiMessagesCompatService, antigravityGatewayService, opsSystemLogSink) |
| soraS3Storage := service.NewSoraS3Storage(settingService) |
| settingService.SetOnS3UpdateCallback(soraS3Storage.RefreshClient) |
| soraGenerationRepository := repository.NewSoraGenerationRepository(db) |
| soraQuotaService := service.NewSoraQuotaService(userRepository, groupRepository, settingService) |
| soraGenerationService := service.NewSoraGenerationService(soraGenerationRepository, soraS3Storage, soraQuotaService) |
| settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService, opsService, soraS3Storage) |
| opsHandler := admin.NewOpsHandler(opsService) |
| updateCache := repository.NewUpdateCache(redisClient) |
| gitHubReleaseClient := repository.ProvideGitHubReleaseClient(configConfig) |
| serviceBuildInfo := provideServiceBuildInfo(buildInfo) |
| updateService := service.ProvideUpdateService(updateCache, gitHubReleaseClient, serviceBuildInfo) |
| idempotencyRepository := repository.NewIdempotencyRepository(client, db) |
| systemOperationLockService := service.ProvideSystemOperationLockService(idempotencyRepository, configConfig) |
| systemHandler := handler.ProvideSystemHandler(updateService, systemOperationLockService) |
| adminSubscriptionHandler := admin.NewSubscriptionHandler(subscriptionService) |
| usageCleanupRepository := repository.NewUsageCleanupRepository(client, db) |
| usageCleanupService := service.ProvideUsageCleanupService(usageCleanupRepository, timingWheelService, dashboardAggregationService, configConfig) |
| adminUsageHandler := admin.NewUsageHandler(usageService, apiKeyService, adminService, usageCleanupService) |
| userAttributeDefinitionRepository := repository.NewUserAttributeDefinitionRepository(client) |
| userAttributeValueRepository := repository.NewUserAttributeValueRepository(client) |
| userAttributeService := service.NewUserAttributeService(userAttributeDefinitionRepository, userAttributeValueRepository) |
| userAttributeHandler := admin.NewUserAttributeHandler(userAttributeService) |
| errorPassthroughRepository := repository.NewErrorPassthroughRepository(client) |
| errorPassthroughCache := repository.NewErrorPassthroughCache(redisClient) |
| errorPassthroughService := service.NewErrorPassthroughService(errorPassthroughRepository, errorPassthroughCache) |
| errorPassthroughHandler := admin.NewErrorPassthroughHandler(errorPassthroughService) |
| adminAPIKeyHandler := admin.NewAdminAPIKeyHandler(adminService) |
| scheduledTestPlanRepository := repository.NewScheduledTestPlanRepository(db) |
| scheduledTestResultRepository := repository.NewScheduledTestResultRepository(db) |
| scheduledTestService := service.ProvideScheduledTestService(scheduledTestPlanRepository, scheduledTestResultRepository) |
| scheduledTestHandler := admin.NewScheduledTestHandler(scheduledTestService) |
| adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, backupHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, adminAPIKeyHandler, scheduledTestHandler) |
| usageRecordWorkerPool := service.NewUsageRecordWorkerPool(configConfig) |
| userMsgQueueCache := repository.NewUserMsgQueueCache(redisClient) |
| userMessageQueueService := service.ProvideUserMessageQueueService(userMsgQueueCache, rpmCache, configConfig) |
| gatewayHandler := handler.NewGatewayHandler(gatewayService, geminiMessagesCompatService, antigravityGatewayService, userService, concurrencyService, billingCacheService, usageService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, userMessageQueueService, configConfig, settingService) |
| openAIGatewayHandler := handler.NewOpenAIGatewayHandler(openAIGatewayService, concurrencyService, billingCacheService, apiKeyService, usageRecordWorkerPool, errorPassthroughService, configConfig) |
| soraSDKClient := service.ProvideSoraSDKClient(configConfig, httpUpstream, openAITokenProvider, accountRepository, soraAccountRepository) |
| soraMediaStorage := service.ProvideSoraMediaStorage(configConfig) |
| soraGatewayService := service.NewSoraGatewayService(soraSDKClient, rateLimitService, httpUpstream, configConfig) |
| soraClientHandler := handler.NewSoraClientHandler(soraGenerationService, soraQuotaService, soraS3Storage, soraGatewayService, gatewayService, soraMediaStorage, apiKeyService) |
| soraGatewayHandler := handler.NewSoraGatewayHandler(gatewayService, soraGatewayService, concurrencyService, billingCacheService, usageRecordWorkerPool, configConfig) |
| handlerSettingHandler := handler.ProvideSettingHandler(settingService, buildInfo) |
| totpHandler := handler.NewTotpHandler(totpService) |
| idempotencyCoordinator := service.ProvideIdempotencyCoordinator(idempotencyRepository, configConfig) |
| idempotencyCleanupService := service.ProvideIdempotencyCleanupService(idempotencyRepository, configConfig) |
| handlers := handler.ProvideHandlers(authHandler, userHandler, apiKeyHandler, usageHandler, redeemHandler, subscriptionHandler, announcementHandler, adminHandlers, gatewayHandler, openAIGatewayHandler, soraGatewayHandler, soraClientHandler, handlerSettingHandler, totpHandler, idempotencyCoordinator, idempotencyCleanupService) |
| jwtAuthMiddleware := middleware.NewJWTAuthMiddleware(authService, userService) |
| adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService) |
| apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig) |
| engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient) |
| httpServer := server.ProvideHTTPServer(configConfig, engine) |
| opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig) |
| opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig) |
| opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig) |
| opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig) |
| opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig) |
| soraMediaCleanupService := service.ProvideSoraMediaCleanupService(soraMediaStorage, configConfig) |
| tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache, privacyClientFactory, proxyRepository, oauthRefreshAPI) |
| accountExpiryService := service.ProvideAccountExpiryService(accountRepository) |
| subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository) |
| scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, rateLimitService, configConfig) |
| v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService, scheduledTestRunnerService, backupService) |
| application := &Application{ |
| Server: httpServer, |
| Cleanup: v, |
| } |
| return application, nil |
| } |
|
|
| |
|
|
| type Application struct { |
| Server *http.Server |
| Cleanup func() |
| } |
|
|
| func providePrivacyClientFactory() service.PrivacyClientFactory { |
| return repository.CreatePrivacyReqClient |
| } |
|
|
| func provideServiceBuildInfo(buildInfo handler.BuildInfo) service.BuildInfo { |
| return service.BuildInfo{ |
| Version: buildInfo.Version, |
| BuildType: buildInfo.BuildType, |
| } |
| } |
|
|
| func provideCleanup( |
| entClient *ent.Client, |
| rdb *redis.Client, |
| opsMetricsCollector *service.OpsMetricsCollector, |
| opsAggregation *service.OpsAggregationService, |
| opsAlertEvaluator *service.OpsAlertEvaluatorService, |
| opsCleanup *service.OpsCleanupService, |
| opsScheduledReport *service.OpsScheduledReportService, |
| opsSystemLogSink *service.OpsSystemLogSink, |
| soraMediaCleanup *service.SoraMediaCleanupService, |
| schedulerSnapshot *service.SchedulerSnapshotService, |
| tokenRefresh *service.TokenRefreshService, |
| accountExpiry *service.AccountExpiryService, |
| subscriptionExpiry *service.SubscriptionExpiryService, |
| usageCleanup *service.UsageCleanupService, |
| idempotencyCleanup *service.IdempotencyCleanupService, |
| pricing *service.PricingService, |
| emailQueue *service.EmailQueueService, |
| billingCache *service.BillingCacheService, |
| usageRecordWorkerPool *service.UsageRecordWorkerPool, |
| subscriptionService *service.SubscriptionService, |
| oauth *service.OAuthService, |
| openaiOAuth *service.OpenAIOAuthService, |
| geminiOAuth *service.GeminiOAuthService, |
| antigravityOAuth *service.AntigravityOAuthService, |
| openAIGateway *service.OpenAIGatewayService, |
| scheduledTestRunner *service.ScheduledTestRunnerService, |
| backupSvc *service.BackupService, |
| ) func() { |
| return func() { |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| defer cancel() |
|
|
| type cleanupStep struct { |
| name string |
| fn func() error |
| } |
|
|
| parallelSteps := []cleanupStep{ |
| {"OpsScheduledReportService", func() error { |
| if opsScheduledReport != nil { |
| opsScheduledReport.Stop() |
| } |
| return nil |
| }}, |
| {"OpsCleanupService", func() error { |
| if opsCleanup != nil { |
| opsCleanup.Stop() |
| } |
| return nil |
| }}, |
| {"OpsSystemLogSink", func() error { |
| if opsSystemLogSink != nil { |
| opsSystemLogSink.Stop() |
| } |
| return nil |
| }}, |
| {"SoraMediaCleanupService", func() error { |
| if soraMediaCleanup != nil { |
| soraMediaCleanup.Stop() |
| } |
| return nil |
| }}, |
| {"OpsAlertEvaluatorService", func() error { |
| if opsAlertEvaluator != nil { |
| opsAlertEvaluator.Stop() |
| } |
| return nil |
| }}, |
| {"OpsAggregationService", func() error { |
| if opsAggregation != nil { |
| opsAggregation.Stop() |
| } |
| return nil |
| }}, |
| {"OpsMetricsCollector", func() error { |
| if opsMetricsCollector != nil { |
| opsMetricsCollector.Stop() |
| } |
| return nil |
| }}, |
| {"SchedulerSnapshotService", func() error { |
| if schedulerSnapshot != nil { |
| schedulerSnapshot.Stop() |
| } |
| return nil |
| }}, |
| {"UsageCleanupService", func() error { |
| if usageCleanup != nil { |
| usageCleanup.Stop() |
| } |
| return nil |
| }}, |
| {"IdempotencyCleanupService", func() error { |
| if idempotencyCleanup != nil { |
| idempotencyCleanup.Stop() |
| } |
| return nil |
| }}, |
| {"TokenRefreshService", func() error { |
| tokenRefresh.Stop() |
| return nil |
| }}, |
| {"AccountExpiryService", func() error { |
| accountExpiry.Stop() |
| return nil |
| }}, |
| {"SubscriptionExpiryService", func() error { |
| subscriptionExpiry.Stop() |
| return nil |
| }}, |
| {"SubscriptionService", func() error { |
| if subscriptionService != nil { |
| subscriptionService.Stop() |
| } |
| return nil |
| }}, |
| {"PricingService", func() error { |
| pricing.Stop() |
| return nil |
| }}, |
| {"EmailQueueService", func() error { |
| emailQueue.Stop() |
| return nil |
| }}, |
| {"BillingCacheService", func() error { |
| billingCache.Stop() |
| return nil |
| }}, |
| {"UsageRecordWorkerPool", func() error { |
| if usageRecordWorkerPool != nil { |
| usageRecordWorkerPool.Stop() |
| } |
| return nil |
| }}, |
| {"OAuthService", func() error { |
| oauth.Stop() |
| return nil |
| }}, |
| {"OpenAIOAuthService", func() error { |
| openaiOAuth.Stop() |
| return nil |
| }}, |
| {"GeminiOAuthService", func() error { |
| geminiOAuth.Stop() |
| return nil |
| }}, |
| {"AntigravityOAuthService", func() error { |
| antigravityOAuth.Stop() |
| return nil |
| }}, |
| {"OpenAIWSPool", func() error { |
| if openAIGateway != nil { |
| openAIGateway.CloseOpenAIWSPool() |
| } |
| return nil |
| }}, |
| {"ScheduledTestRunnerService", func() error { |
| if scheduledTestRunner != nil { |
| scheduledTestRunner.Stop() |
| } |
| return nil |
| }}, |
| {"BackupService", func() error { |
| if backupSvc != nil { |
| backupSvc.Stop() |
| } |
| return nil |
| }}, |
| } |
|
|
| infraSteps := []cleanupStep{ |
| {"Redis", func() error { |
| if rdb == nil { |
| return nil |
| } |
| return rdb.Close() |
| }}, |
| {"Ent", func() error { |
| if entClient == nil { |
| return nil |
| } |
| return entClient.Close() |
| }}, |
| } |
|
|
| runParallel := func(steps []cleanupStep) { |
| var wg sync.WaitGroup |
| for i := range steps { |
| step := steps[i] |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| if err := step.fn(); err != nil { |
| log.Printf("[Cleanup] %s failed: %v", step.name, err) |
| return |
| } |
| log.Printf("[Cleanup] %s succeeded", step.name) |
| }() |
| } |
| wg.Wait() |
| } |
|
|
| runSequential := func(steps []cleanupStep) { |
| for i := range steps { |
| step := steps[i] |
| if err := step.fn(); err != nil { |
| log.Printf("[Cleanup] %s failed: %v", step.name, err) |
| continue |
| } |
| log.Printf("[Cleanup] %s succeeded", step.name) |
| } |
| } |
|
|
| runParallel(parallelSteps) |
| runSequential(infraSteps) |
|
|
| select { |
| case <-ctx.Done(): |
| log.Printf("[Cleanup] Warning: cleanup timed out after 10 seconds") |
| default: |
| log.Printf("[Cleanup] All cleanup steps completed") |
| } |
| } |
| } |
|
|