| const Redis = require('ioredis') |
| const config = require('../../config/config') |
| const logger = require('../utils/logger') |
|
|
| |
| |
| |
/**
 * Shifts an instant by the configured timezone offset so that the UTC getters
 * (`getUTCFullYear()`, `getUTCHours()`, ...) of the returned Date read as
 * wall-clock values in the target timezone.
 *
 * The offset is read from `config.system.timezoneOffset` (in hours). When the
 * setting is missing or not a finite number, +8 hours is used.
 *
 * @param {Date} [date=new Date()] - instant to shift
 * @returns {Date} a new Date; the argument is not modified
 */
function getDateInTimezone(date = new Date()) {
  const configured = config.system.timezoneOffset
  const parsed = typeof configured === 'number' ? configured : parseFloat(configured)

  // A plain `configured || 8` would also replace an explicit 0 (UTC) with 8;
  // only fall back when there is no usable number at all.
  const offsetHours = Number.isFinite(parsed) ? parsed : 8

  return new Date(date.getTime() + offsetHours * 3600000)
}
|
|
| |
/**
 * Formats the calendar day of an instant, as seen in the configured timezone,
 * as `YYYY-MM-DD` (month and day zero-padded to two digits).
 *
 * @param {Date} [date=new Date()] - instant to format
 * @returns {string} the date string
 */
function getDateStringInTimezone(date = new Date()) {
  const shifted = getDateInTimezone(date)
  const twoDigits = (value) => String(value).padStart(2, '0')

  // The shifted Date carries target-timezone wall-clock values in its UTC fields.
  const year = shifted.getUTCFullYear()
  const month = twoDigits(shifted.getUTCMonth() + 1)
  const day = twoDigits(shifted.getUTCDate())

  return `${year}-${month}-${day}`
}
|
|
| |
/**
 * Returns the hour of day (0-23) of an instant as seen in the configured timezone.
 *
 * @param {Date} [date=new Date()] - instant to inspect
 * @returns {number} the hour, read from the UTC field of the shifted Date
 */
function getHourInTimezone(date = new Date()) {
  return getDateInTimezone(date).getUTCHours()
}
|
|
| |
/**
 * Returns the ISO 8601 week of an instant, as seen in the configured timezone,
 * formatted as `YYYY-Www` (e.g. `2025-W01`).
 *
 * The year part is the ISO week-numbering year, i.e. the year of the Thursday
 * of that week. It can differ from the calendar year for the first and last
 * days of a year (2024-12-30 belongs to `2025-W01`).
 *
 * @param {Date} [date=new Date()] - instant to classify
 * @returns {string} the week string
 */
function getWeekStringInTimezone(date = new Date()) {
  const tzDate = getDateInTimezone(date)

  // Work on midnight of the (shifted) calendar day so that the time of day
  // cannot push the Math.ceil below into the following week.
  const thursday = new Date(
    Date.UTC(tzDate.getUTCFullYear(), tzDate.getUTCMonth(), tzDate.getUTCDate())
  )

  // ISO weeks run Monday (1) .. Sunday (7); move to the Thursday of this week.
  const dayOfWeek = thursday.getUTCDay() || 7
  thursday.setUTCDate(thursday.getUTCDate() + 4 - dayOfWeek)

  // The week belongs to whichever year its Thursday falls in.
  const isoYear = thursday.getUTCFullYear()

  // Everything above is in UTC fields, so the year start must be built in UTC
  // too (the local-time Date constructor would skew it by the host's offset).
  const yearStartMs = Date.UTC(isoYear, 0, 1)
  const weekNumber = Math.ceil(((thursday.getTime() - yearStartMs) / 86400000 + 1) / 7)

  return `${isoYear}-W${String(weekNumber).padStart(2, '0')}`
}
|
|
| class RedisClient { |
| constructor() { |
| this.client = null |
| this.isConnected = false |
| } |
|
|
| async connect() { |
| try { |
| this.client = new Redis({ |
| host: config.redis.host, |
| port: config.redis.port, |
| password: config.redis.password, |
| db: config.redis.db, |
| retryDelayOnFailover: config.redis.retryDelayOnFailover, |
| maxRetriesPerRequest: config.redis.maxRetriesPerRequest, |
| lazyConnect: config.redis.lazyConnect, |
| tls: config.redis.enableTLS ? {} : false |
| }) |
|
|
| this.client.on('connect', () => { |
| this.isConnected = true |
| logger.info('🔗 Redis connected successfully') |
| }) |
|
|
| this.client.on('error', (err) => { |
| this.isConnected = false |
| logger.error('❌ Redis connection error:', err) |
| }) |
|
|
| this.client.on('close', () => { |
| this.isConnected = false |
| logger.warn('⚠️ Redis connection closed') |
| }) |
|
|
| await this.client.connect() |
| return this.client |
| } catch (error) { |
| logger.error('💥 Failed to connect to Redis:', error) |
| throw error |
| } |
| } |
|
|
| async disconnect() { |
| if (this.client) { |
| await this.client.quit() |
| this.isConnected = false |
| logger.info('👋 Redis disconnected') |
| } |
| } |
|
|
| getClient() { |
| if (!this.client || !this.isConnected) { |
| logger.warn('⚠️ Redis client is not connected') |
| return null |
| } |
| return this.client |
| } |
|
|
| |
| getClientSafe() { |
| if (!this.client || !this.isConnected) { |
| throw new Error('Redis client is not connected') |
| } |
| return this.client |
| } |
|
|
| |
| async setApiKey(keyId, keyData, hashedKey = null) { |
| const key = `apikey:${keyId}` |
| const client = this.getClientSafe() |
|
|
| |
| |
| if (hashedKey) { |
| await client.hset('apikey:hash_map', hashedKey, keyId) |
| } |
|
|
| await client.hset(key, keyData) |
| await client.expire(key, 86400 * 365) |
| } |
|
|
| async getApiKey(keyId) { |
| const key = `apikey:${keyId}` |
| return await this.client.hgetall(key) |
| } |
|
|
| async deleteApiKey(keyId) { |
| const key = `apikey:${keyId}` |
|
|
| |
| const keyData = await this.client.hgetall(key) |
| if (keyData && keyData.apiKey) { |
| |
| await this.client.hdel('apikey:hash_map', keyData.apiKey) |
| } |
|
|
| return await this.client.del(key) |
| } |
|
|
| async getAllApiKeys() { |
| const keys = await this.client.keys('apikey:*') |
| const apiKeys = [] |
| for (const key of keys) { |
| |
| if (key === 'apikey:hash_map') { |
| continue |
| } |
|
|
| const keyData = await this.client.hgetall(key) |
| if (keyData && Object.keys(keyData).length > 0) { |
| apiKeys.push({ id: key.replace('apikey:', ''), ...keyData }) |
| } |
| } |
| return apiKeys |
| } |
|
|
| |
| async findApiKeyByHash(hashedKey) { |
| |
| const keyId = await this.client.hget('apikey:hash_map', hashedKey) |
| if (!keyId) { |
| return null |
| } |
|
|
| const keyData = await this.client.hgetall(`apikey:${keyId}`) |
| if (keyData && Object.keys(keyData).length > 0) { |
| return { id: keyId, ...keyData } |
| } |
|
|
| |
| await this.client.hdel('apikey:hash_map', hashedKey) |
| return null |
| } |
|
|
| |
| |
| _normalizeModelName(model) { |
| if (!model || model === 'unknown') { |
| return model |
| } |
|
|
| |
| if (model.includes('.anthropic.') || model.includes('.claude')) { |
| |
| |
| let normalized = model.replace(/^[a-z0-9-]+\./, '') |
| normalized = normalized.replace('anthropic.', '') |
| normalized = normalized.replace(/-v\d+:\d+$/, '') |
| return normalized |
| } |
|
|
| |
| return model.replace(/-v\d+:\d+$|:latest$/, '') |
| } |
|
|
| async incrementTokenUsage( |
| keyId, |
| tokens, |
| inputTokens = 0, |
| outputTokens = 0, |
| cacheCreateTokens = 0, |
| cacheReadTokens = 0, |
| model = 'unknown', |
| ephemeral5mTokens = 0, |
| ephemeral1hTokens = 0, |
| isLongContextRequest = false |
| ) { |
| const key = `usage:${keyId}` |
| const now = new Date() |
| const today = getDateStringInTimezone(now) |
| const tzDate = getDateInTimezone(now) |
| const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
| 2, |
| '0' |
| )}` |
| const currentHour = `${today}:${String(getHourInTimezone(now)).padStart(2, '0')}` |
|
|
| const daily = `usage:daily:${keyId}:${today}` |
| const monthly = `usage:monthly:${keyId}:${currentMonth}` |
| const hourly = `usage:hourly:${keyId}:${currentHour}` |
|
|
| |
| const normalizedModel = this._normalizeModelName(model) |
|
|
| |
| const modelDaily = `usage:model:daily:${normalizedModel}:${today}` |
| const modelMonthly = `usage:model:monthly:${normalizedModel}:${currentMonth}` |
| const modelHourly = `usage:model:hourly:${normalizedModel}:${currentHour}` |
|
|
| |
| const keyModelDaily = `usage:${keyId}:model:daily:${normalizedModel}:${today}` |
| const keyModelMonthly = `usage:${keyId}:model:monthly:${normalizedModel}:${currentMonth}` |
| const keyModelHourly = `usage:${keyId}:model:hourly:${normalizedModel}:${currentHour}` |
|
|
| |
| const minuteTimestamp = Math.floor(now.getTime() / 60000) |
| const systemMinuteKey = `system:metrics:minute:${minuteTimestamp}` |
|
|
| |
| const finalInputTokens = inputTokens || 0 |
| const finalOutputTokens = outputTokens || (finalInputTokens > 0 ? 0 : tokens) |
| const finalCacheCreateTokens = cacheCreateTokens || 0 |
| const finalCacheReadTokens = cacheReadTokens || 0 |
|
|
| |
| const totalTokens = |
| finalInputTokens + finalOutputTokens + finalCacheCreateTokens + finalCacheReadTokens |
| |
| const coreTokens = finalInputTokens + finalOutputTokens |
|
|
| |
| const pipeline = this.client.pipeline() |
|
|
| |
| |
| pipeline.hincrby(key, 'totalTokens', coreTokens) |
| pipeline.hincrby(key, 'totalInputTokens', finalInputTokens) |
| pipeline.hincrby(key, 'totalOutputTokens', finalOutputTokens) |
| |
| pipeline.hincrby(key, 'totalCacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(key, 'totalCacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(key, 'totalAllTokens', totalTokens) |
| |
| pipeline.hincrby(key, 'totalEphemeral5mTokens', ephemeral5mTokens) |
| pipeline.hincrby(key, 'totalEphemeral1hTokens', ephemeral1hTokens) |
| |
| if (isLongContextRequest) { |
| pipeline.hincrby(key, 'totalLongContextInputTokens', finalInputTokens) |
| pipeline.hincrby(key, 'totalLongContextOutputTokens', finalOutputTokens) |
| pipeline.hincrby(key, 'totalLongContextRequests', 1) |
| } |
| |
| pipeline.hincrby(key, 'totalRequests', 1) |
|
|
| |
| pipeline.hincrby(daily, 'tokens', coreTokens) |
| pipeline.hincrby(daily, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(daily, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(daily, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(daily, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(daily, 'allTokens', totalTokens) |
| pipeline.hincrby(daily, 'requests', 1) |
| |
| pipeline.hincrby(daily, 'ephemeral5mTokens', ephemeral5mTokens) |
| pipeline.hincrby(daily, 'ephemeral1hTokens', ephemeral1hTokens) |
| |
| if (isLongContextRequest) { |
| pipeline.hincrby(daily, 'longContextInputTokens', finalInputTokens) |
| pipeline.hincrby(daily, 'longContextOutputTokens', finalOutputTokens) |
| pipeline.hincrby(daily, 'longContextRequests', 1) |
| } |
|
|
| |
| pipeline.hincrby(monthly, 'tokens', coreTokens) |
| pipeline.hincrby(monthly, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(monthly, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(monthly, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(monthly, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(monthly, 'allTokens', totalTokens) |
| pipeline.hincrby(monthly, 'requests', 1) |
| |
| pipeline.hincrby(monthly, 'ephemeral5mTokens', ephemeral5mTokens) |
| pipeline.hincrby(monthly, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
| |
| pipeline.hincrby(modelDaily, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(modelDaily, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(modelDaily, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(modelDaily, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(modelDaily, 'allTokens', totalTokens) |
| pipeline.hincrby(modelDaily, 'requests', 1) |
|
|
| |
| pipeline.hincrby(modelMonthly, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(modelMonthly, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(modelMonthly, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(modelMonthly, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(modelMonthly, 'allTokens', totalTokens) |
| pipeline.hincrby(modelMonthly, 'requests', 1) |
|
|
| |
| pipeline.hincrby(keyModelDaily, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(keyModelDaily, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(keyModelDaily, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(keyModelDaily, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(keyModelDaily, 'allTokens', totalTokens) |
| pipeline.hincrby(keyModelDaily, 'requests', 1) |
| |
| pipeline.hincrby(keyModelDaily, 'ephemeral5mTokens', ephemeral5mTokens) |
| pipeline.hincrby(keyModelDaily, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
| |
| pipeline.hincrby(keyModelMonthly, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(keyModelMonthly, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(keyModelMonthly, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(keyModelMonthly, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(keyModelMonthly, 'allTokens', totalTokens) |
| pipeline.hincrby(keyModelMonthly, 'requests', 1) |
| |
| pipeline.hincrby(keyModelMonthly, 'ephemeral5mTokens', ephemeral5mTokens) |
| pipeline.hincrby(keyModelMonthly, 'ephemeral1hTokens', ephemeral1hTokens) |
|
|
| |
| pipeline.hincrby(hourly, 'tokens', coreTokens) |
| pipeline.hincrby(hourly, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(hourly, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(hourly, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(hourly, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(hourly, 'allTokens', totalTokens) |
| pipeline.hincrby(hourly, 'requests', 1) |
|
|
| |
| pipeline.hincrby(modelHourly, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(modelHourly, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(modelHourly, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(modelHourly, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(modelHourly, 'allTokens', totalTokens) |
| pipeline.hincrby(modelHourly, 'requests', 1) |
|
|
| |
| pipeline.hincrby(keyModelHourly, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(keyModelHourly, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(keyModelHourly, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(keyModelHourly, 'cacheReadTokens', finalCacheReadTokens) |
| pipeline.hincrby(keyModelHourly, 'allTokens', totalTokens) |
| pipeline.hincrby(keyModelHourly, 'requests', 1) |
|
|
| |
| pipeline.hincrby(systemMinuteKey, 'requests', 1) |
| pipeline.hincrby(systemMinuteKey, 'totalTokens', totalTokens) |
| pipeline.hincrby(systemMinuteKey, 'inputTokens', finalInputTokens) |
| pipeline.hincrby(systemMinuteKey, 'outputTokens', finalOutputTokens) |
| pipeline.hincrby(systemMinuteKey, 'cacheCreateTokens', finalCacheCreateTokens) |
| pipeline.hincrby(systemMinuteKey, 'cacheReadTokens', finalCacheReadTokens) |
|
|
| |
| pipeline.expire(daily, 86400 * 32) |
| pipeline.expire(monthly, 86400 * 365) |
| pipeline.expire(hourly, 86400 * 7) |
| pipeline.expire(modelDaily, 86400 * 32) |
| pipeline.expire(modelMonthly, 86400 * 365) |
| pipeline.expire(modelHourly, 86400 * 7) |
| pipeline.expire(keyModelDaily, 86400 * 32) |
| pipeline.expire(keyModelMonthly, 86400 * 365) |
| pipeline.expire(keyModelHourly, 86400 * 7) |
|
|
| |
| const configLocal = require('../../config/config') |
| const { metricsWindow } = configLocal.system |
| pipeline.expire(systemMinuteKey, metricsWindow * 60 * 2) |
|
|
| |
| await pipeline.exec() |
| } |
|
|
| |
| async incrementAccountUsage( |
| accountId, |
| totalTokens, |
| inputTokens = 0, |
| outputTokens = 0, |
| cacheCreateTokens = 0, |
| cacheReadTokens = 0, |
| model = 'unknown', |
| isLongContextRequest = false |
| ) { |
| const now = new Date() |
| const today = getDateStringInTimezone(now) |
| const tzDate = getDateInTimezone(now) |
| const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
| 2, |
| '0' |
| )}` |
| const currentHour = `${today}:${String(getHourInTimezone(now)).padStart(2, '0')}` |
|
|
| |
| const accountKey = `account_usage:${accountId}` |
| const accountDaily = `account_usage:daily:${accountId}:${today}` |
| const accountMonthly = `account_usage:monthly:${accountId}:${currentMonth}` |
| const accountHourly = `account_usage:hourly:${accountId}:${currentHour}` |
|
|
| |
| const normalizedModel = this._normalizeModelName(model) |
|
|
| |
| const accountModelDaily = `account_usage:model:daily:${accountId}:${normalizedModel}:${today}` |
| const accountModelMonthly = `account_usage:model:monthly:${accountId}:${normalizedModel}:${currentMonth}` |
| const accountModelHourly = `account_usage:model:hourly:${accountId}:${normalizedModel}:${currentHour}` |
|
|
| |
| const finalInputTokens = inputTokens || 0 |
| const finalOutputTokens = outputTokens || 0 |
| const finalCacheCreateTokens = cacheCreateTokens || 0 |
| const finalCacheReadTokens = cacheReadTokens || 0 |
| const actualTotalTokens = |
| finalInputTokens + finalOutputTokens + finalCacheCreateTokens + finalCacheReadTokens |
| const coreTokens = finalInputTokens + finalOutputTokens |
|
|
| |
| const operations = [ |
| |
| this.client.hincrby(accountKey, 'totalTokens', coreTokens), |
| this.client.hincrby(accountKey, 'totalInputTokens', finalInputTokens), |
| this.client.hincrby(accountKey, 'totalOutputTokens', finalOutputTokens), |
| this.client.hincrby(accountKey, 'totalCacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountKey, 'totalCacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountKey, 'totalAllTokens', actualTotalTokens), |
| this.client.hincrby(accountKey, 'totalRequests', 1), |
|
|
| |
| this.client.hincrby(accountDaily, 'tokens', coreTokens), |
| this.client.hincrby(accountDaily, 'inputTokens', finalInputTokens), |
| this.client.hincrby(accountDaily, 'outputTokens', finalOutputTokens), |
| this.client.hincrby(accountDaily, 'cacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountDaily, 'cacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountDaily, 'allTokens', actualTotalTokens), |
| this.client.hincrby(accountDaily, 'requests', 1), |
|
|
| |
| this.client.hincrby(accountMonthly, 'tokens', coreTokens), |
| this.client.hincrby(accountMonthly, 'inputTokens', finalInputTokens), |
| this.client.hincrby(accountMonthly, 'outputTokens', finalOutputTokens), |
| this.client.hincrby(accountMonthly, 'cacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountMonthly, 'cacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountMonthly, 'allTokens', actualTotalTokens), |
| this.client.hincrby(accountMonthly, 'requests', 1), |
|
|
| |
| this.client.hincrby(accountHourly, 'tokens', coreTokens), |
| this.client.hincrby(accountHourly, 'inputTokens', finalInputTokens), |
| this.client.hincrby(accountHourly, 'outputTokens', finalOutputTokens), |
| this.client.hincrby(accountHourly, 'cacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountHourly, 'cacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountHourly, 'allTokens', actualTotalTokens), |
| this.client.hincrby(accountHourly, 'requests', 1), |
|
|
| |
| this.client.hincrby(accountHourly, `model:${normalizedModel}:inputTokens`, finalInputTokens), |
| this.client.hincrby( |
| accountHourly, |
| `model:${normalizedModel}:outputTokens`, |
| finalOutputTokens |
| ), |
| this.client.hincrby( |
| accountHourly, |
| `model:${normalizedModel}:cacheCreateTokens`, |
| finalCacheCreateTokens |
| ), |
| this.client.hincrby( |
| accountHourly, |
| `model:${normalizedModel}:cacheReadTokens`, |
| finalCacheReadTokens |
| ), |
| this.client.hincrby(accountHourly, `model:${normalizedModel}:allTokens`, actualTotalTokens), |
| this.client.hincrby(accountHourly, `model:${normalizedModel}:requests`, 1), |
|
|
| |
| this.client.hincrby(accountModelDaily, 'inputTokens', finalInputTokens), |
| this.client.hincrby(accountModelDaily, 'outputTokens', finalOutputTokens), |
| this.client.hincrby(accountModelDaily, 'cacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountModelDaily, 'cacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountModelDaily, 'allTokens', actualTotalTokens), |
| this.client.hincrby(accountModelDaily, 'requests', 1), |
|
|
| |
| this.client.hincrby(accountModelMonthly, 'inputTokens', finalInputTokens), |
| this.client.hincrby(accountModelMonthly, 'outputTokens', finalOutputTokens), |
| this.client.hincrby(accountModelMonthly, 'cacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountModelMonthly, 'cacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountModelMonthly, 'allTokens', actualTotalTokens), |
| this.client.hincrby(accountModelMonthly, 'requests', 1), |
|
|
| |
| this.client.hincrby(accountModelHourly, 'inputTokens', finalInputTokens), |
| this.client.hincrby(accountModelHourly, 'outputTokens', finalOutputTokens), |
| this.client.hincrby(accountModelHourly, 'cacheCreateTokens', finalCacheCreateTokens), |
| this.client.hincrby(accountModelHourly, 'cacheReadTokens', finalCacheReadTokens), |
| this.client.hincrby(accountModelHourly, 'allTokens', actualTotalTokens), |
| this.client.hincrby(accountModelHourly, 'requests', 1), |
|
|
| |
| this.client.expire(accountDaily, 86400 * 32), |
| this.client.expire(accountMonthly, 86400 * 365), |
| this.client.expire(accountHourly, 86400 * 7), |
| this.client.expire(accountModelDaily, 86400 * 32), |
| this.client.expire(accountModelMonthly, 86400 * 365), |
| this.client.expire(accountModelHourly, 86400 * 7) |
| ] |
|
|
| |
| if (isLongContextRequest) { |
| operations.push( |
| this.client.hincrby(accountKey, 'totalLongContextInputTokens', finalInputTokens), |
| this.client.hincrby(accountKey, 'totalLongContextOutputTokens', finalOutputTokens), |
| this.client.hincrby(accountKey, 'totalLongContextRequests', 1), |
| this.client.hincrby(accountDaily, 'longContextInputTokens', finalInputTokens), |
| this.client.hincrby(accountDaily, 'longContextOutputTokens', finalOutputTokens), |
| this.client.hincrby(accountDaily, 'longContextRequests', 1) |
| ) |
| } |
|
|
| await Promise.all(operations) |
| } |
|
|
| async getUsageStats(keyId) { |
| const totalKey = `usage:${keyId}` |
| const today = getDateStringInTimezone() |
| const dailyKey = `usage:daily:${keyId}:${today}` |
| const tzDate = getDateInTimezone() |
| const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
| 2, |
| '0' |
| )}` |
| const monthlyKey = `usage:monthly:${keyId}:${currentMonth}` |
|
|
| const [total, daily, monthly] = await Promise.all([ |
| this.client.hgetall(totalKey), |
| this.client.hgetall(dailyKey), |
| this.client.hgetall(monthlyKey) |
| ]) |
|
|
| |
| const keyData = await this.client.hgetall(`apikey:${keyId}`) |
| const createdAt = keyData.createdAt ? new Date(keyData.createdAt) : new Date() |
| const now = new Date() |
| const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24))) |
|
|
| const totalTokens = parseInt(total.totalTokens) || 0 |
| const totalRequests = parseInt(total.totalRequests) || 0 |
|
|
| |
| const totalMinutes = Math.max(1, daysSinceCreated * 24 * 60) |
| const avgRPM = totalRequests / totalMinutes |
| const avgTPM = totalTokens / totalMinutes |
|
|
| |
| const handleLegacyData = (data) => { |
| |
| const tokens = parseInt(data.totalTokens) || parseInt(data.tokens) || 0 |
| const inputTokens = parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0 |
| const outputTokens = parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0 |
| const requests = parseInt(data.totalRequests) || parseInt(data.requests) || 0 |
|
|
| |
| const cacheCreateTokens = |
| parseInt(data.totalCacheCreateTokens) || parseInt(data.cacheCreateTokens) || 0 |
| const cacheReadTokens = |
| parseInt(data.totalCacheReadTokens) || parseInt(data.cacheReadTokens) || 0 |
| const allTokens = parseInt(data.totalAllTokens) || parseInt(data.allTokens) || 0 |
|
|
| const totalFromSeparate = inputTokens + outputTokens |
| |
| const actualAllTokens = |
| allTokens || inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens |
|
|
| if (totalFromSeparate === 0 && tokens > 0) { |
| |
| return { |
| tokens, |
| inputTokens: Math.round(tokens * 0.3), |
| outputTokens: Math.round(tokens * 0.7), |
| cacheCreateTokens: 0, |
| cacheReadTokens: 0, |
| allTokens: tokens, |
| requests |
| } |
| } else { |
| |
| return { |
| tokens: actualAllTokens, |
| inputTokens, |
| outputTokens, |
| cacheCreateTokens, |
| cacheReadTokens, |
| allTokens: actualAllTokens, |
| requests |
| } |
| } |
| } |
|
|
| const totalData = handleLegacyData(total) |
| const dailyData = handleLegacyData(daily) |
| const monthlyData = handleLegacyData(monthly) |
|
|
| return { |
| total: totalData, |
| daily: dailyData, |
| monthly: monthlyData, |
| averages: { |
| rpm: Math.round(avgRPM * 100) / 100, |
| tpm: Math.round(avgTPM * 100) / 100, |
| dailyRequests: Math.round((totalRequests / daysSinceCreated) * 100) / 100, |
| dailyTokens: Math.round((totalTokens / daysSinceCreated) * 100) / 100 |
| } |
| } |
| } |
|
|
| async addUsageRecord(keyId, record, maxRecords = 200) { |
| const listKey = `usage:records:${keyId}` |
| const client = this.getClientSafe() |
|
|
| try { |
| await client |
| .multi() |
| .lpush(listKey, JSON.stringify(record)) |
| .ltrim(listKey, 0, Math.max(0, maxRecords - 1)) |
| .expire(listKey, 86400 * 90) |
| .exec() |
| } catch (error) { |
| logger.error(`❌ Failed to append usage record for key ${keyId}:`, error) |
| } |
| } |
|
|
| async getUsageRecords(keyId, limit = 50) { |
| const listKey = `usage:records:${keyId}` |
| const client = this.getClient() |
|
|
| if (!client) { |
| return [] |
| } |
|
|
| try { |
| const rawRecords = await client.lrange(listKey, 0, Math.max(0, limit - 1)) |
| return rawRecords |
| .map((entry) => { |
| try { |
| return JSON.parse(entry) |
| } catch (error) { |
| logger.warn('⚠️ Failed to parse usage record entry:', error) |
| return null |
| } |
| }) |
| .filter(Boolean) |
| } catch (error) { |
| logger.error(`❌ Failed to load usage records for key ${keyId}:`, error) |
| return [] |
| } |
| } |
|
|
| |
| async getDailyCost(keyId) { |
| const today = getDateStringInTimezone() |
| const costKey = `usage:cost:daily:${keyId}:${today}` |
| const cost = await this.client.get(costKey) |
| const result = parseFloat(cost || 0) |
| logger.debug( |
| `💰 Getting daily cost for ${keyId}, date: ${today}, key: ${costKey}, value: ${cost}, result: ${result}` |
| ) |
| return result |
| } |
|
|
| |
| async incrementDailyCost(keyId, amount) { |
| const today = getDateStringInTimezone() |
| const tzDate = getDateInTimezone() |
| const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
| 2, |
| '0' |
| )}` |
| const currentHour = `${today}:${String(getHourInTimezone(new Date())).padStart(2, '0')}` |
|
|
| const dailyKey = `usage:cost:daily:${keyId}:${today}` |
| const monthlyKey = `usage:cost:monthly:${keyId}:${currentMonth}` |
| const hourlyKey = `usage:cost:hourly:${keyId}:${currentHour}` |
| const totalKey = `usage:cost:total:${keyId}` |
|
|
| logger.debug( |
| `💰 Incrementing cost for ${keyId}, amount: $${amount}, date: ${today}, dailyKey: ${dailyKey}` |
| ) |
|
|
| const results = await Promise.all([ |
| this.client.incrbyfloat(dailyKey, amount), |
| this.client.incrbyfloat(monthlyKey, amount), |
| this.client.incrbyfloat(hourlyKey, amount), |
| this.client.incrbyfloat(totalKey, amount), |
| |
| this.client.expire(dailyKey, 86400 * 30), |
| this.client.expire(monthlyKey, 86400 * 90), |
| this.client.expire(hourlyKey, 86400 * 7) |
| ]) |
|
|
| logger.debug(`💰 Cost incremented successfully, new daily total: $${results[0]}`) |
| } |
|
|
| |
| async getCostStats(keyId) { |
| const today = getDateStringInTimezone() |
| const tzDate = getDateInTimezone() |
| const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
| 2, |
| '0' |
| )}` |
| const currentHour = `${today}:${String(getHourInTimezone(new Date())).padStart(2, '0')}` |
|
|
| const [daily, monthly, hourly, total] = await Promise.all([ |
| this.client.get(`usage:cost:daily:${keyId}:${today}`), |
| this.client.get(`usage:cost:monthly:${keyId}:${currentMonth}`), |
| this.client.get(`usage:cost:hourly:${keyId}:${currentHour}`), |
| this.client.get(`usage:cost:total:${keyId}`) |
| ]) |
|
|
| return { |
| daily: parseFloat(daily || 0), |
| monthly: parseFloat(monthly || 0), |
| hourly: parseFloat(hourly || 0), |
| total: parseFloat(total || 0) |
| } |
| } |
|
|
| |
| async getWeeklyOpusCost(keyId) { |
| const currentWeek = getWeekStringInTimezone() |
| const costKey = `usage:opus:weekly:${keyId}:${currentWeek}` |
| const cost = await this.client.get(costKey) |
| const result = parseFloat(cost || 0) |
| logger.debug( |
| `💰 Getting weekly Opus cost for ${keyId}, week: ${currentWeek}, key: ${costKey}, value: ${cost}, result: ${result}` |
| ) |
| return result |
| } |
|
|
| |
| async incrementWeeklyOpusCost(keyId, amount) { |
| const currentWeek = getWeekStringInTimezone() |
| const weeklyKey = `usage:opus:weekly:${keyId}:${currentWeek}` |
| const totalKey = `usage:opus:total:${keyId}` |
|
|
| logger.debug( |
| `💰 Incrementing weekly Opus cost for ${keyId}, week: ${currentWeek}, amount: $${amount}` |
| ) |
|
|
| |
| const pipeline = this.client.pipeline() |
| pipeline.incrbyfloat(weeklyKey, amount) |
| pipeline.incrbyfloat(totalKey, amount) |
| |
| pipeline.expire(weeklyKey, 14 * 24 * 3600) |
|
|
| const results = await pipeline.exec() |
| logger.debug(`💰 Opus cost incremented successfully, new weekly total: $${results[0][1]}`) |
| } |
|
|
| |
| async getAccountDailyCost(accountId) { |
| const CostCalculator = require('../utils/costCalculator') |
| const today = getDateStringInTimezone() |
|
|
| |
| const pattern = `account_usage:model:daily:${accountId}:*:${today}` |
| const modelKeys = await this.client.keys(pattern) |
|
|
| if (!modelKeys || modelKeys.length === 0) { |
| return 0 |
| } |
|
|
| let totalCost = 0 |
|
|
| for (const key of modelKeys) { |
| |
| |
| const parts = key.split(':') |
| const model = parts[4] |
|
|
| |
| const modelUsage = await this.client.hgetall(key) |
|
|
| if (modelUsage && (modelUsage.inputTokens || modelUsage.outputTokens)) { |
| const usage = { |
| input_tokens: parseInt(modelUsage.inputTokens || 0), |
| output_tokens: parseInt(modelUsage.outputTokens || 0), |
| cache_creation_input_tokens: parseInt(modelUsage.cacheCreateTokens || 0), |
| cache_read_input_tokens: parseInt(modelUsage.cacheReadTokens || 0) |
| } |
|
|
| |
| const costResult = CostCalculator.calculateCost(usage, model) |
| totalCost += costResult.costs.total |
|
|
| logger.debug( |
| `💰 Account ${accountId} daily cost for model ${model}: $${costResult.costs.total}` |
| ) |
| } |
| } |
|
|
| logger.debug(`💰 Account ${accountId} total daily cost: $${totalCost}`) |
| return totalCost |
| } |
|
|
| |
| async getAccountUsageStats(accountId, accountType = null) { |
| const accountKey = `account_usage:${accountId}` |
| const today = getDateStringInTimezone() |
| const accountDailyKey = `account_usage:daily:${accountId}:${today}` |
| const tzDate = getDateInTimezone() |
| const currentMonth = `${tzDate.getUTCFullYear()}-${String(tzDate.getUTCMonth() + 1).padStart( |
| 2, |
| '0' |
| )}` |
| const accountMonthlyKey = `account_usage:monthly:${accountId}:${currentMonth}` |
|
|
| const [total, daily, monthly] = await Promise.all([ |
| this.client.hgetall(accountKey), |
| this.client.hgetall(accountDailyKey), |
| this.client.hgetall(accountMonthlyKey) |
| ]) |
|
|
| |
| let accountData = {} |
| if (accountType === 'droid') { |
| accountData = await this.client.hgetall(`droid:account:${accountId}`) |
| } else if (accountType === 'openai') { |
| accountData = await this.client.hgetall(`openai:account:${accountId}`) |
| } else if (accountType === 'openai-responses') { |
| accountData = await this.client.hgetall(`openai_responses_account:${accountId}`) |
| } else { |
| |
| accountData = await this.client.hgetall(`claude_account:${accountId}`) |
| if (!accountData.createdAt) { |
| accountData = await this.client.hgetall(`openai:account:${accountId}`) |
| } |
| if (!accountData.createdAt) { |
| accountData = await this.client.hgetall(`openai_responses_account:${accountId}`) |
| } |
| if (!accountData.createdAt) { |
| accountData = await this.client.hgetall(`openai_account:${accountId}`) |
| } |
| if (!accountData.createdAt) { |
| accountData = await this.client.hgetall(`droid:account:${accountId}`) |
| } |
| } |
| const createdAt = accountData.createdAt ? new Date(accountData.createdAt) : new Date() |
| const now = new Date() |
| const daysSinceCreated = Math.max(1, Math.ceil((now - createdAt) / (1000 * 60 * 60 * 24))) |
|
|
| const totalTokens = parseInt(total.totalTokens) || 0 |
| const totalRequests = parseInt(total.totalRequests) || 0 |
|
|
| |
| const totalMinutes = Math.max(1, daysSinceCreated * 24 * 60) |
| const avgRPM = totalRequests / totalMinutes |
| const avgTPM = totalTokens / totalMinutes |
|
|
| |
| const handleAccountData = (data) => { |
| const tokens = parseInt(data.totalTokens) || parseInt(data.tokens) || 0 |
| const inputTokens = parseInt(data.totalInputTokens) || parseInt(data.inputTokens) || 0 |
| const outputTokens = parseInt(data.totalOutputTokens) || parseInt(data.outputTokens) || 0 |
| const requests = parseInt(data.totalRequests) || parseInt(data.requests) || 0 |
| const cacheCreateTokens = |
| parseInt(data.totalCacheCreateTokens) || parseInt(data.cacheCreateTokens) || 0 |
| const cacheReadTokens = |
| parseInt(data.totalCacheReadTokens) || parseInt(data.cacheReadTokens) || 0 |
| const allTokens = parseInt(data.totalAllTokens) || parseInt(data.allTokens) || 0 |
|
|
| const actualAllTokens = |
| allTokens || inputTokens + outputTokens + cacheCreateTokens + cacheReadTokens |
|
|
| return { |
| tokens, |
| inputTokens, |
| outputTokens, |
| cacheCreateTokens, |
| cacheReadTokens, |
| allTokens: actualAllTokens, |
| requests |
| } |
| } |
|
|
| const totalData = handleAccountData(total) |
| const dailyData = handleAccountData(daily) |
| const monthlyData = handleAccountData(monthly) |
|
|
| |
| const dailyCost = await this.getAccountDailyCost(accountId) |
|
|
| return { |
| accountId, |
| total: totalData, |
| daily: { |
| ...dailyData, |
| cost: dailyCost |
| }, |
| monthly: monthlyData, |
| averages: { |
| rpm: Math.round(avgRPM * 100) / 100, |
| tpm: Math.round(avgTPM * 100) / 100, |
| dailyRequests: Math.round((totalRequests / daysSinceCreated) * 100) / 100, |
| dailyTokens: Math.round((totalTokens / daysSinceCreated) * 100) / 100 |
| } |
| } |
| } |
|
|
| |
| async getAllAccountsUsageStats() { |
| try { |
| |
| const accountKeys = await this.client.keys('claude_account:*') |
| const accountStats = [] |
|
|
| for (const accountKey of accountKeys) { |
| const accountId = accountKey.replace('claude_account:', '') |
| const accountData = await this.client.hgetall(accountKey) |
|
|
| if (accountData.name) { |
| const stats = await this.getAccountUsageStats(accountId) |
| accountStats.push({ |
| id: accountId, |
| name: accountData.name, |
| email: accountData.email || '', |
| status: accountData.status || 'unknown', |
| isActive: accountData.isActive === 'true', |
| ...stats |
| }) |
| } |
| } |
|
|
| |
| accountStats.sort((a, b) => (b.daily.allTokens || 0) - (a.daily.allTokens || 0)) |
|
|
| return accountStats |
| } catch (error) { |
| logger.error('❌ Failed to get all accounts usage stats:', error) |
| return [] |
| } |
| } |
|
|
| |
| async resetAllUsageStats() { |
| const client = this.getClientSafe() |
| const stats = { |
| deletedKeys: 0, |
| deletedDailyKeys: 0, |
| deletedMonthlyKeys: 0, |
| resetApiKeys: 0 |
| } |
|
|
| try { |
| |
| const apiKeyIds = [] |
| const apiKeyKeys = await client.keys('apikey:*') |
|
|
| for (const key of apiKeyKeys) { |
| if (key === 'apikey:hash_map') { |
| continue |
| } |
| const keyId = key.replace('apikey:', '') |
| apiKeyIds.push(keyId) |
| } |
|
|
| |
| for (const keyId of apiKeyIds) { |
| |
| const usageKey = `usage:${keyId}` |
| const deleted = await client.del(usageKey) |
| if (deleted > 0) { |
| stats.deletedKeys++ |
| } |
|
|
| |
| const dailyKeys = await client.keys(`usage:daily:${keyId}:*`) |
| if (dailyKeys.length > 0) { |
| await client.del(...dailyKeys) |
| stats.deletedDailyKeys += dailyKeys.length |
| } |
|
|
| |
| const monthlyKeys = await client.keys(`usage:monthly:${keyId}:*`) |
| if (monthlyKeys.length > 0) { |
| await client.del(...monthlyKeys) |
| stats.deletedMonthlyKeys += monthlyKeys.length |
| } |
|
|
| |
| const keyData = await client.hgetall(`apikey:${keyId}`) |
| if (keyData && Object.keys(keyData).length > 0) { |
| keyData.lastUsedAt = '' |
| await client.hset(`apikey:${keyId}`, keyData) |
| stats.resetApiKeys++ |
| } |
| } |
|
|
| |
| const allUsageKeys = await client.keys('usage:*') |
| if (allUsageKeys.length > 0) { |
| await client.del(...allUsageKeys) |
| stats.deletedKeys += allUsageKeys.length |
| } |
|
|
| return stats |
| } catch (error) { |
| throw new Error(`Failed to reset usage stats: ${error.message}`) |
| } |
| } |
|
|
| |
| async setClaudeAccount(accountId, accountData) { |
| const key = `claude:account:${accountId}` |
| await this.client.hset(key, accountData) |
| } |
|
|
| async getClaudeAccount(accountId) { |
| const key = `claude:account:${accountId}` |
| return await this.client.hgetall(key) |
| } |
|
|
| async getAllClaudeAccounts() { |
| const keys = await this.client.keys('claude:account:*') |
| const accounts = [] |
| for (const key of keys) { |
| const accountData = await this.client.hgetall(key) |
| if (accountData && Object.keys(accountData).length > 0) { |
| accounts.push({ id: key.replace('claude:account:', ''), ...accountData }) |
| } |
| } |
| return accounts |
| } |
|
|
| async deleteClaudeAccount(accountId) { |
| const key = `claude:account:${accountId}` |
| return await this.client.del(key) |
| } |
|
|
| |
| async setDroidAccount(accountId, accountData) { |
| const key = `droid:account:${accountId}` |
| await this.client.hset(key, accountData) |
| } |
|
|
| async getDroidAccount(accountId) { |
| const key = `droid:account:${accountId}` |
| return await this.client.hgetall(key) |
| } |
|
|
| async getAllDroidAccounts() { |
| const keys = await this.client.keys('droid:account:*') |
| const accounts = [] |
| for (const key of keys) { |
| const accountData = await this.client.hgetall(key) |
| if (accountData && Object.keys(accountData).length > 0) { |
| accounts.push({ id: key.replace('droid:account:', ''), ...accountData }) |
| } |
| } |
| return accounts |
| } |
|
|
| async deleteDroidAccount(accountId) { |
| const key = `droid:account:${accountId}` |
| return await this.client.del(key) |
| } |
|
|
| async setOpenAiAccount(accountId, accountData) { |
| const key = `openai:account:${accountId}` |
| await this.client.hset(key, accountData) |
| } |
| async getOpenAiAccount(accountId) { |
| const key = `openai:account:${accountId}` |
| return await this.client.hgetall(key) |
| } |
| async deleteOpenAiAccount(accountId) { |
| const key = `openai:account:${accountId}` |
| return await this.client.del(key) |
| } |
|
|
| async getAllOpenAIAccounts() { |
| const keys = await this.client.keys('openai:account:*') |
| const accounts = [] |
| for (const key of keys) { |
| const accountData = await this.client.hgetall(key) |
| if (accountData && Object.keys(accountData).length > 0) { |
| accounts.push({ id: key.replace('openai:account:', ''), ...accountData }) |
| } |
| } |
| return accounts |
| } |
|
|
| |
| async setSession(sessionId, sessionData, ttl = 86400) { |
| const key = `session:${sessionId}` |
| await this.client.hset(key, sessionData) |
| await this.client.expire(key, ttl) |
| } |
|
|
| async getSession(sessionId) { |
| const key = `session:${sessionId}` |
| return await this.client.hgetall(key) |
| } |
|
|
| async deleteSession(sessionId) { |
| const key = `session:${sessionId}` |
| return await this.client.del(key) |
| } |
|
|
| |
| async setApiKeyHash(hashedKey, keyData, ttl = 0) { |
| const key = `apikey_hash:${hashedKey}` |
| await this.client.hset(key, keyData) |
| if (ttl > 0) { |
| await this.client.expire(key, ttl) |
| } |
| } |
|
|
| async getApiKeyHash(hashedKey) { |
| const key = `apikey_hash:${hashedKey}` |
| return await this.client.hgetall(key) |
| } |
|
|
| async deleteApiKeyHash(hashedKey) { |
| const key = `apikey_hash:${hashedKey}` |
| return await this.client.del(key) |
| } |
|
|
| |
| async setOAuthSession(sessionId, sessionData, ttl = 600) { |
| |
| const key = `oauth:${sessionId}` |
|
|
| |
| const serializedData = {} |
| for (const [dataKey, value] of Object.entries(sessionData)) { |
| if (typeof value === 'object' && value !== null) { |
| serializedData[dataKey] = JSON.stringify(value) |
| } else { |
| serializedData[dataKey] = value |
| } |
| } |
|
|
| await this.client.hset(key, serializedData) |
| await this.client.expire(key, ttl) |
| } |
|
|
| async getOAuthSession(sessionId) { |
| const key = `oauth:${sessionId}` |
| const data = await this.client.hgetall(key) |
|
|
| |
| if (data.proxy) { |
| try { |
| data.proxy = JSON.parse(data.proxy) |
| } catch (error) { |
| |
| data.proxy = null |
| } |
| } |
|
|
| return data |
| } |
|
|
| async deleteOAuthSession(sessionId) { |
| const key = `oauth:${sessionId}` |
| return await this.client.del(key) |
| } |
|
|
| |
| async getSystemStats() { |
| const keys = await Promise.all([ |
| this.client.keys('apikey:*'), |
| this.client.keys('claude:account:*'), |
| this.client.keys('usage:*') |
| ]) |
|
|
| return { |
| totalApiKeys: keys[0].length, |
| totalClaudeAccounts: keys[1].length, |
| totalUsageRecords: keys[2].length |
| } |
| } |
|
|
| |
| async getTodayStats() { |
| try { |
| const today = getDateStringInTimezone() |
| const dailyKeys = await this.client.keys(`usage:daily:*:${today}`) |
|
|
| let totalRequestsToday = 0 |
| let totalTokensToday = 0 |
| let totalInputTokensToday = 0 |
| let totalOutputTokensToday = 0 |
| let totalCacheCreateTokensToday = 0 |
| let totalCacheReadTokensToday = 0 |
|
|
| |
| if (dailyKeys.length > 0) { |
| const pipeline = this.client.pipeline() |
| dailyKeys.forEach((key) => pipeline.hgetall(key)) |
| const results = await pipeline.exec() |
|
|
| for (const [error, dailyData] of results) { |
| if (error || !dailyData) { |
| continue |
| } |
|
|
| totalRequestsToday += parseInt(dailyData.requests) || 0 |
| const currentDayTokens = parseInt(dailyData.tokens) || 0 |
| totalTokensToday += currentDayTokens |
|
|
| |
| const inputTokens = parseInt(dailyData.inputTokens) || 0 |
| const outputTokens = parseInt(dailyData.outputTokens) || 0 |
| const cacheCreateTokens = parseInt(dailyData.cacheCreateTokens) || 0 |
| const cacheReadTokens = parseInt(dailyData.cacheReadTokens) || 0 |
| const totalTokensFromSeparate = inputTokens + outputTokens |
|
|
| if (totalTokensFromSeparate === 0 && currentDayTokens > 0) { |
| |
| totalOutputTokensToday += Math.round(currentDayTokens * 0.7) |
| totalInputTokensToday += Math.round(currentDayTokens * 0.3) |
| } else { |
| |
| totalInputTokensToday += inputTokens |
| totalOutputTokensToday += outputTokens |
| } |
|
|
| |
| totalCacheCreateTokensToday += cacheCreateTokens |
| totalCacheReadTokensToday += cacheReadTokens |
| } |
| } |
|
|
| |
| const allApiKeys = await this.client.keys('apikey:*') |
| let apiKeysCreatedToday = 0 |
|
|
| if (allApiKeys.length > 0) { |
| const pipeline = this.client.pipeline() |
| allApiKeys.forEach((key) => pipeline.hget(key, 'createdAt')) |
| const results = await pipeline.exec() |
|
|
| for (const [error, createdAt] of results) { |
| if (!error && createdAt && createdAt.startsWith(today)) { |
| apiKeysCreatedToday++ |
| } |
| } |
| } |
|
|
| return { |
| requestsToday: totalRequestsToday, |
| tokensToday: totalTokensToday, |
| inputTokensToday: totalInputTokensToday, |
| outputTokensToday: totalOutputTokensToday, |
| cacheCreateTokensToday: totalCacheCreateTokensToday, |
| cacheReadTokensToday: totalCacheReadTokensToday, |
| apiKeysCreatedToday |
| } |
| } catch (error) { |
| console.error('Error getting today stats:', error) |
| return { |
| requestsToday: 0, |
| tokensToday: 0, |
| inputTokensToday: 0, |
| outputTokensToday: 0, |
| cacheCreateTokensToday: 0, |
| cacheReadTokensToday: 0, |
| apiKeysCreatedToday: 0 |
| } |
| } |
| } |
|
|
| |
| async getSystemAverages() { |
| try { |
| const allApiKeys = await this.client.keys('apikey:*') |
| let totalRequests = 0 |
| let totalTokens = 0 |
| let totalInputTokens = 0 |
| let totalOutputTokens = 0 |
| let oldestCreatedAt = new Date() |
|
|
| |
| const usageKeys = allApiKeys.map((key) => `usage:${key.replace('apikey:', '')}`) |
| const pipeline = this.client.pipeline() |
|
|
| |
| usageKeys.forEach((key) => pipeline.hgetall(key)) |
| |
| allApiKeys.forEach((key) => pipeline.hgetall(key)) |
|
|
| const results = await pipeline.exec() |
| const usageResults = results.slice(0, usageKeys.length) |
| const keyResults = results.slice(usageKeys.length) |
|
|
| for (let i = 0; i < allApiKeys.length; i++) { |
| const totalData = usageResults[i][1] || {} |
| const keyData = keyResults[i][1] || {} |
|
|
| totalRequests += parseInt(totalData.totalRequests) || 0 |
| totalTokens += parseInt(totalData.totalTokens) || 0 |
| totalInputTokens += parseInt(totalData.totalInputTokens) || 0 |
| totalOutputTokens += parseInt(totalData.totalOutputTokens) || 0 |
|
|
| const createdAt = keyData.createdAt ? new Date(keyData.createdAt) : new Date() |
| if (createdAt < oldestCreatedAt) { |
| oldestCreatedAt = createdAt |
| } |
| } |
|
|
| const now = new Date() |
| |
| const daysSinceOldest = Math.max( |
| 1, |
| Math.ceil((now - oldestCreatedAt) / (1000 * 60 * 60 * 24)) |
| ) |
| const totalMinutes = daysSinceOldest * 24 * 60 |
|
|
| return { |
| systemRPM: Math.round((totalRequests / totalMinutes) * 100) / 100, |
| systemTPM: Math.round((totalTokens / totalMinutes) * 100) / 100, |
| totalInputTokens, |
| totalOutputTokens, |
| totalTokens |
| } |
| } catch (error) { |
| console.error('Error getting system averages:', error) |
| return { |
| systemRPM: 0, |
| systemTPM: 0, |
| totalInputTokens: 0, |
| totalOutputTokens: 0, |
| totalTokens: 0 |
| } |
| } |
| } |
|
|
| |
| async getRealtimeSystemMetrics() { |
| try { |
| const configLocal = require('../../config/config') |
| const windowMinutes = configLocal.system.metricsWindow || 5 |
|
|
| const now = new Date() |
| const currentMinute = Math.floor(now.getTime() / 60000) |
|
|
| |
| logger.debug( |
| `🔍 Realtime metrics - Current time: ${now.toISOString()}, Minute timestamp: ${currentMinute}` |
| ) |
|
|
| |
| const pipeline = this.client.pipeline() |
| const minuteKeys = [] |
| for (let i = 0; i < windowMinutes; i++) { |
| const minuteKey = `system:metrics:minute:${currentMinute - i}` |
| minuteKeys.push(minuteKey) |
| pipeline.hgetall(minuteKey) |
| } |
|
|
| logger.debug(`🔍 Realtime metrics - Checking keys: ${minuteKeys.join(', ')}`) |
|
|
| const results = await pipeline.exec() |
|
|
| |
| let totalRequests = 0 |
| let totalTokens = 0 |
| let totalInputTokens = 0 |
| let totalOutputTokens = 0 |
| let totalCacheCreateTokens = 0 |
| let totalCacheReadTokens = 0 |
| let validDataCount = 0 |
|
|
| results.forEach(([err, data], index) => { |
| if (!err && data && Object.keys(data).length > 0) { |
| validDataCount++ |
| totalRequests += parseInt(data.requests || 0) |
| totalTokens += parseInt(data.totalTokens || 0) |
| totalInputTokens += parseInt(data.inputTokens || 0) |
| totalOutputTokens += parseInt(data.outputTokens || 0) |
| totalCacheCreateTokens += parseInt(data.cacheCreateTokens || 0) |
| totalCacheReadTokens += parseInt(data.cacheReadTokens || 0) |
|
|
| logger.debug(`🔍 Realtime metrics - Key ${minuteKeys[index]} data:`, { |
| requests: data.requests, |
| totalTokens: data.totalTokens |
| }) |
| } |
| }) |
|
|
| logger.debug( |
| `🔍 Realtime metrics - Valid data count: ${validDataCount}/${windowMinutes}, Total requests: ${totalRequests}, Total tokens: ${totalTokens}` |
| ) |
|
|
| |
| const realtimeRPM = |
| windowMinutes > 0 ? Math.round((totalRequests / windowMinutes) * 100) / 100 : 0 |
| const realtimeTPM = |
| windowMinutes > 0 ? Math.round((totalTokens / windowMinutes) * 100) / 100 : 0 |
|
|
| const result = { |
| realtimeRPM, |
| realtimeTPM, |
| windowMinutes, |
| totalRequests, |
| totalTokens, |
| totalInputTokens, |
| totalOutputTokens, |
| totalCacheCreateTokens, |
| totalCacheReadTokens |
| } |
|
|
| logger.debug('🔍 Realtime metrics - Final result:', result) |
|
|
| return result |
| } catch (error) { |
| console.error('Error getting realtime system metrics:', error) |
| |
| const historicalMetrics = await this.getSystemAverages() |
| return { |
| realtimeRPM: historicalMetrics.systemRPM, |
| realtimeTPM: historicalMetrics.systemTPM, |
| windowMinutes: 0, |
| totalRequests: 0, |
| totalTokens: historicalMetrics.totalTokens, |
| totalInputTokens: historicalMetrics.totalInputTokens, |
| totalOutputTokens: historicalMetrics.totalOutputTokens, |
| totalCacheCreateTokens: 0, |
| totalCacheReadTokens: 0 |
| } |
| } |
| } |
|
|
| |
| async setSessionAccountMapping(sessionHash, accountId, ttl = null) { |
| const appConfig = require('../../config/config') |
| |
| const defaultTTL = ttl !== null ? ttl : (appConfig.session?.stickyTtlHours || 1) * 60 * 60 |
| const key = `sticky_session:${sessionHash}` |
| await this.client.set(key, accountId, 'EX', defaultTTL) |
| } |
|
|
| async getSessionAccountMapping(sessionHash) { |
| const key = `sticky_session:${sessionHash}` |
| return await this.client.get(key) |
| } |
|
|
| |
| async extendSessionAccountMappingTTL(sessionHash) { |
| const appConfig = require('../../config/config') |
| const key = `sticky_session:${sessionHash}` |
|
|
| |
| const ttlHours = appConfig.session?.stickyTtlHours || 1 |
| const thresholdMinutes = appConfig.session?.renewalThresholdMinutes || 0 |
|
|
| |
| if (thresholdMinutes === 0) { |
| return true |
| } |
|
|
| const fullTTL = ttlHours * 60 * 60 |
| const renewalThreshold = thresholdMinutes * 60 |
|
|
| try { |
| |
| const remainingTTL = await this.client.ttl(key) |
|
|
| |
| if (remainingTTL === -2) { |
| return false |
| } |
|
|
| |
| if (remainingTTL === -1) { |
| return true |
| } |
|
|
| |
| if (remainingTTL < renewalThreshold) { |
| await this.client.expire(key, fullTTL) |
| logger.debug( |
| `🔄 Renewed sticky session TTL: ${sessionHash} (was ${Math.round( |
| remainingTTL / 60 |
| )}min, renewed to ${ttlHours}h)` |
| ) |
| return true |
| } |
|
|
| |
| logger.debug( |
| `✅ Sticky session TTL sufficient: ${sessionHash} (remaining ${Math.round( |
| remainingTTL / 60 |
| )}min)` |
| ) |
| return true |
| } catch (error) { |
| logger.error('❌ Failed to extend session TTL:', error) |
| return false |
| } |
| } |
|
|
| async deleteSessionAccountMapping(sessionHash) { |
| const key = `sticky_session:${sessionHash}` |
| return await this.client.del(key) |
| } |
|
|
| |
| async cleanup() { |
| try { |
| const patterns = ['usage:daily:*', 'ratelimit:*', 'session:*', 'sticky_session:*', 'oauth:*'] |
|
|
| for (const pattern of patterns) { |
| const keys = await this.client.keys(pattern) |
| const pipeline = this.client.pipeline() |
|
|
| for (const key of keys) { |
| const ttl = await this.client.ttl(key) |
| if (ttl === -1) { |
| |
| if (key.startsWith('oauth:')) { |
| pipeline.expire(key, 600) |
| } else { |
| pipeline.expire(key, 86400) |
| } |
| } |
| } |
|
|
| await pipeline.exec() |
| } |
|
|
| logger.info('🧹 Redis cleanup completed') |
| } catch (error) { |
| logger.error('❌ Redis cleanup failed:', error) |
| } |
| } |
|
|
| |
| _getConcurrencyConfig() { |
| const defaults = { |
| leaseSeconds: 300, |
| renewIntervalSeconds: 30, |
| cleanupGraceSeconds: 30 |
| } |
|
|
| const configValues = { |
| ...defaults, |
| ...(config.concurrency || {}) |
| } |
|
|
| const normalizeNumber = (value, fallback, options = {}) => { |
| const parsed = Number(value) |
| if (!Number.isFinite(parsed)) { |
| return fallback |
| } |
|
|
| if (options.allowZero && parsed === 0) { |
| return 0 |
| } |
|
|
| if (options.min !== undefined && parsed < options.min) { |
| return options.min |
| } |
|
|
| return parsed |
| } |
|
|
| return { |
| leaseSeconds: normalizeNumber(configValues.leaseSeconds, defaults.leaseSeconds, { |
| min: 30 |
| }), |
| renewIntervalSeconds: normalizeNumber( |
| configValues.renewIntervalSeconds, |
| defaults.renewIntervalSeconds, |
| { |
| allowZero: true, |
| min: 0 |
| } |
| ), |
| cleanupGraceSeconds: normalizeNumber( |
| configValues.cleanupGraceSeconds, |
| defaults.cleanupGraceSeconds, |
| { |
| min: 0 |
| } |
| ) |
| } |
| } |
|
|
| |
| async incrConcurrency(apiKeyId, requestId, leaseSeconds = null) { |
| if (!requestId) { |
| throw new Error('Request ID is required for concurrency tracking') |
| } |
|
|
| try { |
| const { leaseSeconds: defaultLeaseSeconds, cleanupGraceSeconds } = |
| this._getConcurrencyConfig() |
| const lease = leaseSeconds || defaultLeaseSeconds |
| const key = `concurrency:${apiKeyId}` |
| const now = Date.now() |
| const expireAt = now + lease * 1000 |
| const ttl = Math.max((lease + cleanupGraceSeconds) * 1000, 60000) |
|
|
| const luaScript = ` |
| local key = KEYS[1] |
| local member = ARGV[1] |
| local expireAt = tonumber(ARGV[2]) |
| local now = tonumber(ARGV[3]) |
| local ttl = tonumber(ARGV[4]) |
| |
| redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
| redis.call('ZADD', key, expireAt, member) |
| |
| if ttl > 0 then |
| redis.call('PEXPIRE', key, ttl) |
| end |
| |
| local count = redis.call('ZCARD', key) |
| return count |
| ` |
|
|
| const count = await this.client.eval(luaScript, 1, key, requestId, expireAt, now, ttl) |
| logger.database( |
| `🔢 Incremented concurrency for key ${apiKeyId}: ${count} (request ${requestId})` |
| ) |
| return count |
| } catch (error) { |
| logger.error('❌ Failed to increment concurrency:', error) |
| throw error |
| } |
| } |
|
|
| |
| async refreshConcurrencyLease(apiKeyId, requestId, leaseSeconds = null) { |
| if (!requestId) { |
| return 0 |
| } |
|
|
| try { |
| const { leaseSeconds: defaultLeaseSeconds, cleanupGraceSeconds } = |
| this._getConcurrencyConfig() |
| const lease = leaseSeconds || defaultLeaseSeconds |
| const key = `concurrency:${apiKeyId}` |
| const now = Date.now() |
| const expireAt = now + lease * 1000 |
| const ttl = Math.max((lease + cleanupGraceSeconds) * 1000, 60000) |
|
|
| const luaScript = ` |
| local key = KEYS[1] |
| local member = ARGV[1] |
| local expireAt = tonumber(ARGV[2]) |
| local now = tonumber(ARGV[3]) |
| local ttl = tonumber(ARGV[4]) |
| |
| redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
| |
| local exists = redis.call('ZSCORE', key, member) |
| |
| if exists then |
| redis.call('ZADD', key, expireAt, member) |
| if ttl > 0 then |
| redis.call('PEXPIRE', key, ttl) |
| end |
| return 1 |
| end |
| |
| return 0 |
| ` |
|
|
| const refreshed = await this.client.eval(luaScript, 1, key, requestId, expireAt, now, ttl) |
| if (refreshed === 1) { |
| logger.debug(`🔄 Refreshed concurrency lease for key ${apiKeyId} (request ${requestId})`) |
| } |
| return refreshed |
| } catch (error) { |
| logger.error('❌ Failed to refresh concurrency lease:', error) |
| return 0 |
| } |
| } |
|
|
| |
| async decrConcurrency(apiKeyId, requestId) { |
| try { |
| const key = `concurrency:${apiKeyId}` |
| const now = Date.now() |
|
|
| const luaScript = ` |
| local key = KEYS[1] |
| local member = ARGV[1] |
| local now = tonumber(ARGV[2]) |
| |
| if member then |
| redis.call('ZREM', key, member) |
| end |
| |
| redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
| |
| local count = redis.call('ZCARD', key) |
| if count <= 0 then |
| redis.call('DEL', key) |
| return 0 |
| end |
| |
| return count |
| ` |
|
|
| const count = await this.client.eval(luaScript, 1, key, requestId || '', now) |
| logger.database( |
| `🔢 Decremented concurrency for key ${apiKeyId}: ${count} (request ${requestId || 'n/a'})` |
| ) |
| return count |
| } catch (error) { |
| logger.error('❌ Failed to decrement concurrency:', error) |
| throw error |
| } |
| } |
|
|
| |
| async getConcurrency(apiKeyId) { |
| try { |
| const key = `concurrency:${apiKeyId}` |
| const now = Date.now() |
|
|
| const luaScript = ` |
| local key = KEYS[1] |
| local now = tonumber(ARGV[1]) |
| |
| redis.call('ZREMRANGEBYSCORE', key, '-inf', now) |
| return redis.call('ZCARD', key) |
| ` |
|
|
| const count = await this.client.eval(luaScript, 1, key, now) |
| return parseInt(count || 0) |
| } catch (error) { |
| logger.error('❌ Failed to get concurrency:', error) |
| return 0 |
| } |
| } |
|
|
| |
| async get(key) { |
| const client = this.getClientSafe() |
| return await client.get(key) |
| } |
|
|
| async set(key, value, ...args) { |
| const client = this.getClientSafe() |
| return await client.set(key, value, ...args) |
| } |
|
|
| async setex(key, ttl, value) { |
| const client = this.getClientSafe() |
| return await client.setex(key, ttl, value) |
| } |
|
|
| async del(...keys) { |
| const client = this.getClientSafe() |
| return await client.del(...keys) |
| } |
|
|
| async keys(pattern) { |
| const client = this.getClientSafe() |
| return await client.keys(pattern) |
| } |
|
|
| |
| async getAccountSessionWindowUsage(accountId, windowStart, windowEnd) { |
| try { |
| if (!windowStart || !windowEnd) { |
| return { |
| totalInputTokens: 0, |
| totalOutputTokens: 0, |
| totalCacheCreateTokens: 0, |
| totalCacheReadTokens: 0, |
| totalAllTokens: 0, |
| totalRequests: 0, |
| modelUsage: {} |
| } |
| } |
|
|
| const startDate = new Date(windowStart) |
| const endDate = new Date(windowEnd) |
|
|
| |
| logger.debug(`📊 Getting session window usage for account ${accountId}`) |
| logger.debug(` Window: ${windowStart} to ${windowEnd}`) |
| logger.debug(` Start UTC: ${startDate.toISOString()}, End UTC: ${endDate.toISOString()}`) |
|
|
| |
| |
| const hourlyKeys = [] |
| const currentHour = new Date(startDate) |
| currentHour.setMinutes(0) |
| currentHour.setSeconds(0) |
| currentHour.setMilliseconds(0) |
|
|
| while (currentHour <= endDate) { |
| |
| const tzDateStr = getDateStringInTimezone(currentHour) |
| const tzHour = String(getHourInTimezone(currentHour)).padStart(2, '0') |
| const key = `account_usage:hourly:${accountId}:${tzDateStr}:${tzHour}` |
|
|
| logger.debug(` Adding hourly key: ${key}`) |
| hourlyKeys.push(key) |
| currentHour.setHours(currentHour.getHours() + 1) |
| } |
|
|
| |
| const pipeline = this.client.pipeline() |
| for (const key of hourlyKeys) { |
| pipeline.hgetall(key) |
| } |
| const results = await pipeline.exec() |
|
|
| |
| let totalInputTokens = 0 |
| let totalOutputTokens = 0 |
| let totalCacheCreateTokens = 0 |
| let totalCacheReadTokens = 0 |
| let totalAllTokens = 0 |
| let totalRequests = 0 |
| const modelUsage = {} |
|
|
| logger.debug(` Processing ${results.length} hourly results`) |
|
|
| for (const [error, data] of results) { |
| if (error || !data || Object.keys(data).length === 0) { |
| continue |
| } |
|
|
| |
| const hourInputTokens = parseInt(data.inputTokens || 0) |
| const hourOutputTokens = parseInt(data.outputTokens || 0) |
| const hourCacheCreateTokens = parseInt(data.cacheCreateTokens || 0) |
| const hourCacheReadTokens = parseInt(data.cacheReadTokens || 0) |
| const hourAllTokens = parseInt(data.allTokens || 0) |
| const hourRequests = parseInt(data.requests || 0) |
|
|
| totalInputTokens += hourInputTokens |
| totalOutputTokens += hourOutputTokens |
| totalCacheCreateTokens += hourCacheCreateTokens |
| totalCacheReadTokens += hourCacheReadTokens |
| totalAllTokens += hourAllTokens |
| totalRequests += hourRequests |
|
|
| if (hourAllTokens > 0) { |
| logger.debug(` Hour data: allTokens=${hourAllTokens}, requests=${hourRequests}`) |
| } |
|
|
| |
| for (const [key, value] of Object.entries(data)) { |
| |
| if (key.startsWith('model:')) { |
| const parts = key.split(':') |
| if (parts.length >= 3) { |
| const modelName = parts[1] |
| const metric = parts.slice(2).join(':') |
|
|
| if (!modelUsage[modelName]) { |
| modelUsage[modelName] = { |
| inputTokens: 0, |
| outputTokens: 0, |
| cacheCreateTokens: 0, |
| cacheReadTokens: 0, |
| allTokens: 0, |
| requests: 0 |
| } |
| } |
|
|
| if (metric === 'inputTokens') { |
| modelUsage[modelName].inputTokens += parseInt(value || 0) |
| } else if (metric === 'outputTokens') { |
| modelUsage[modelName].outputTokens += parseInt(value || 0) |
| } else if (metric === 'cacheCreateTokens') { |
| modelUsage[modelName].cacheCreateTokens += parseInt(value || 0) |
| } else if (metric === 'cacheReadTokens') { |
| modelUsage[modelName].cacheReadTokens += parseInt(value || 0) |
| } else if (metric === 'allTokens') { |
| modelUsage[modelName].allTokens += parseInt(value || 0) |
| } else if (metric === 'requests') { |
| modelUsage[modelName].requests += parseInt(value || 0) |
| } |
| } |
| } |
| } |
| } |
|
|
| logger.debug(`📊 Session window usage summary:`) |
| logger.debug(` Total allTokens: ${totalAllTokens}`) |
| logger.debug(` Total requests: ${totalRequests}`) |
| logger.debug(` Input: ${totalInputTokens}, Output: ${totalOutputTokens}`) |
| logger.debug( |
| ` Cache Create: ${totalCacheCreateTokens}, Cache Read: ${totalCacheReadTokens}` |
| ) |
|
|
| return { |
| totalInputTokens, |
| totalOutputTokens, |
| totalCacheCreateTokens, |
| totalCacheReadTokens, |
| totalAllTokens, |
| totalRequests, |
| modelUsage |
| } |
| } catch (error) { |
| logger.error(`❌ Failed to get session window usage for account ${accountId}:`, error) |
| return { |
| totalInputTokens: 0, |
| totalOutputTokens: 0, |
| totalCacheCreateTokens: 0, |
| totalCacheReadTokens: 0, |
| totalAllTokens: 0, |
| totalRequests: 0, |
| modelUsage: {} |
| } |
| } |
| } |
| } |
|
|
// Single shared RedisClient instance for the module; the lock helpers and
// timezone utilities below are attached to it before it is exported.
| const redisClient = new RedisClient() |
|
|
| |
/**
 * Tries to acquire a lock by creating `lockKey` only if it does not exist yet.
 *
 * @param {string} lockKey - Redis key representing the lock.
 * @param {string} lockValue - Token identifying the owner; needed again to release.
 * @param {number} ttlMs - Lock lifetime in milliseconds.
 * @returns {Promise<boolean>} true when the lock was acquired; false when it is
 *   already held or the Redis call failed (failures are logged, not thrown).
 */
redisClient.setAccountLock = async function (lockKey, lockValue, ttlMs) {
  let reply
  try {
    // SET ... PX <ttl> NX: create with expiry, but only when the key is absent.
    reply = await this.client.set(lockKey, lockValue, 'PX', ttlMs, 'NX')
  } catch (error) {
    logger.error(`Failed to acquire lock ${lockKey}:`, error)
    return false
  }
  return reply === 'OK'
}
|
|
/**
 * Releases a lock, but only if it is still owned by the caller.
 *
 * The compare-and-delete runs as one Lua script so that a lock taken over by
 * another owner in the meantime is never deleted.
 *
 * @param {string} lockKey - Redis key representing the lock.
 * @param {string} lockValue - Owner token that was used to acquire the lock.
 * @returns {Promise<boolean>} true when the lock was deleted; false when the value
 *   did not match or the Redis call failed (failures are logged, not thrown).
 */
redisClient.releaseAccountLock = async function (lockKey, lockValue) {
  const compareAndDelete = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `

  let deleted
  try {
    deleted = await this.client.eval(compareAndDelete, 1, lockKey, lockValue)
  } catch (error) {
    logger.error(`Failed to release lock ${lockKey}:`, error)
    return false
  }
  return deleted === 1
}
|
|
| |
// Expose the timezone helpers on the shared client so that callers can reach them
// through the same object they already import.
Object.assign(redisClient, {
  getDateInTimezone,
  getDateStringInTimezone,
  getHourInTimezone,
  getWeekStringInTimezone
})

module.exports = redisClient
|
|