| #!/usr/bin/env node |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| const path = require('path') |
| const Redis = require('ioredis') |
|
|
| |
| require('dotenv').config({ path: path.join(__dirname, '../.env') }) |
|
|
// Redis connection settings. Values come from the environment (which the
// dotenv call above may have populated from ../.env); each one falls back to
// a local default when unset or empty.
const config = {
  host: process.env.REDIS_HOST || 'localhost',
  // Explicit radix 10: without it a value with a `0x` prefix would be parsed
  // as hexadecimal. An unset or non-numeric value yields NaN and falls back.
  port: Number.parseInt(process.env.REDIS_PORT, 10) || 6379,
  password: process.env.REDIS_PASSWORD || '',
  db: Number.parseInt(process.env.REDIS_DB, 10) || 0
}

// Single shared client used by every command in this script.
const redis = new Redis(config)

// Key of the Redis stream that the commands below publish to and read from.
const STREAM_KEY = 'billing:events'
|
|
| |
| |
| |
|
|
| |
| |
| |
/**
 * Publish one hard-coded sample `usage.recorded` billing event to the stream.
 *
 * The event is serialized as JSON into a single `data` field and appended with
 * XADD using `MAXLEN ~ 100000` and an auto-generated (`*`) entry ID. On
 * success the message ID, event ID and cost are printed.
 *
 * Side effects: writes to Redis; terminates the process with exit status 1 if
 * the XADD call fails.
 *
 * @returns {Promise<void>}
 */
async function publishTestEvent() {
  console.log('📤 Publishing test billing event...')

  // `slice(2, 11)` takes the same nine characters the deprecated
  // `substr(2, 9)` did: it skips the leading "0." of the base-36 string.
  const randomSuffix = Math.random().toString(36).slice(2, 11)

  const testEvent = {
    eventId: `test-${Date.now()}-${randomSuffix}`,
    eventType: 'usage.recorded',
    timestamp: new Date().toISOString(),
    version: '1.0',
    apiKey: {
      id: 'test-key-123',
      name: 'Test API Key',
      userId: 'test-user-456'
    },
    usage: {
      model: 'claude-sonnet-4-20250514',
      inputTokens: 1500,
      outputTokens: 800,
      cacheCreateTokens: 200,
      cacheReadTokens: 100,
      ephemeral5mTokens: 150,
      ephemeral1hTokens: 50,
      totalTokens: 2600
    },
    cost: {
      // Sum of input + output + cacheCreate + cacheRead below. The two
      // ephemeral entries add up to cacheCreate, so they are not added again.
      total: 0.01728,
      currency: 'USD',
      breakdown: {
        input: 0.0045,
        output: 0.012,
        cacheCreate: 0.00075,
        cacheRead: 0.00003,
        ephemeral5m: 0.0005625,
        ephemeral1h: 0.0001875
      }
    },
    account: {
      id: 'test-account-789',
      type: 'claude-official'
    },
    context: {
      isLongContext: false,
      requestTimestamp: new Date().toISOString()
    }
  }

  try {
    const messageId = await redis.xadd(
      STREAM_KEY,
      'MAXLEN',
      '~',
      100000,
      '*',
      'data',
      JSON.stringify(testEvent)
    )

    console.log('✅ Event published successfully!')
    console.log(` Message ID: ${messageId}`)
    console.log(` Event ID: ${testEvent.eventId}`)
    console.log(` Cost: $${testEvent.cost.total}`)
  } catch (error) {
    console.error('❌ Failed to publish event:', error.message)
    process.exit(1)
  }
}
|
|
| |
| |
| |
/**
 * Read events from the stream and print a summary of each until interrupted.
 *
 * Starts at ID `0` and repeatedly issues XREAD with `BLOCK 5000` and
 * `COUNT 10`, remembering the ID of the last entry seen so the next read
 * continues after it. Entries whose `data` field cannot be parsed or printed
 * are reported and skipped. A failed read is logged and retried after five
 * seconds. SIGINT (Ctrl+C) makes the loop exit after the current iteration.
 *
 * @returns {Promise<void>} Resolves once the loop has stopped.
 */
async function consumeTestEvents() {
  console.log('📬 Consuming test events...')
  console.log(' Press Ctrl+C to stop\n')

  let isRunning = true

  const handleSigint = () => {
    console.log('\n⏹️ Stopping consumer...')
    isRunning = false
  }
  process.on('SIGINT', handleSigint)

  let lastId = '0'

  try {
    while (isRunning) {
      try {
        const messages = await redis.xread(
          'BLOCK',
          5000,
          'COUNT',
          10,
          'STREAMS',
          STREAM_KEY,
          lastId
        )

        // No reply (block timed out) or an empty reply: just poll again.
        if (!messages || messages.length === 0) {
          continue
        }

        // Only one stream is requested, so only the first reply is relevant.
        const [streamKey, entries] = messages[0]
        console.log(`📬 Received ${entries.length} messages from ${streamKey}\n`)

        for (const [messageId, fields] of entries) {
          // Advance the cursor before handling the entry. If it only moved on
          // success, a malformed entry at the end of a batch would be returned
          // again by every following XREAD, spinning forever on the same entry.
          lastId = messageId

          try {
            // Fields arrive as a flat [name, value, name, value, ...] array.
            const data = {}
            for (let i = 0; i < fields.length; i += 2) {
              data[fields[i]] = fields[i + 1]
            }

            const event = JSON.parse(data.data)

            console.log(`📊 Event: ${event.eventId}`)
            console.log(` API Key: ${event.apiKey.name} (${event.apiKey.id})`)
            console.log(` Model: ${event.usage.model}`)
            console.log(` Tokens: ${event.usage.totalTokens}`)
            console.log(` Cost: $${event.cost.total.toFixed(6)}`)
            console.log(` Timestamp: ${event.timestamp}`)
            console.log('')
          } catch (parseError) {
            console.error(`❌ Failed to parse message ${messageId}:`, parseError.message)
          }
        }
      } catch (error) {
        // Errors raised while shutting down are ignored; otherwise back off
        // for five seconds before the next read.
        if (isRunning) {
          console.error('❌ Error consuming messages:', error.message)
          await new Promise((resolve) => setTimeout(resolve, 5000))
        }
      }
    }
  } finally {
    // Restore default Ctrl+C handling once the consumer is no longer running.
    process.removeListener('SIGINT', handleSigint)
  }

  console.log('👋 Consumer stopped')
}
|
|
| |
| |
| |
/**
 * Turn a flat `[key, value, key, value, ...]` array into a plain object.
 *
 * @param {Array} pairs - Alternating keys and values.
 * @returns {Object} Object mapping each key to the value that follows it.
 */
function flatPairsToObject(pairs) {
  const result = {}
  for (let i = 0; i < pairs.length; i += 2) {
    result[pairs[i]] = pairs[i + 1]
  }
  return result
}

/**
 * Print a status report for the billing event stream.
 *
 * Prints the stream length and stops there if it is zero. Otherwise it prints
 * the first and last entry IDs, each consumer group with its consumers, and a
 * summary of the five most recent entries (XREVRANGE with `COUNT 5`).
 *
 * Side effects: terminates the process with exit status 1 if any Redis call
 * fails. An entry that cannot be parsed is listed as "(Parse Error)" and does
 * not abort the report.
 *
 * @returns {Promise<void>}
 */
async function showQueueInfo() {
  console.log('📊 Queue Information\n')

  try {
    const length = await redis.xlen(STREAM_KEY)
    console.log(`Stream: ${STREAM_KEY}`)
    console.log(`Length: ${length} messages\n`)

    if (length === 0) {
      console.log('ℹ️ Queue is empty')
      return
    }

    // XINFO replies are flat key/value arrays; convert them for named access.
    const streamInfo = flatPairsToObject(await redis.xinfo('STREAM', STREAM_KEY))
    const firstEntry = streamInfo['first-entry']
    const lastEntry = streamInfo['last-entry']

    console.log('Stream Details:')
    console.log(` First Entry ID: ${firstEntry ? firstEntry[0] : 'N/A'}`)
    console.log(` Last Entry ID: ${lastEntry ? lastEntry[0] : 'N/A'}`)
    console.log(` Consumer Groups: ${streamInfo.groups || 0}\n`)

    if (streamInfo.groups > 0) {
      console.log('Consumer Groups:')
      const groups = await redis.xinfo('GROUPS', STREAM_KEY)

      for (const rawGroup of groups) {
        const group = flatPairsToObject(rawGroup)

        console.log(`\n Group: ${group.name}`)
        console.log(` Consumers: ${group.consumers}`)
        console.log(` Pending: ${group.pending}`)
        console.log(` Last Delivered ID: ${group['last-delivered-id']}`)

        if (group.consumers > 0) {
          const consumers = await redis.xinfo('CONSUMERS', STREAM_KEY, group.name)
          console.log(' Consumer Details:')

          for (const rawConsumer of consumers) {
            const consumer = flatPairsToObject(rawConsumer)

            console.log(` - ${consumer.name}`)
            console.log(` Pending: ${consumer.pending}`)
            // `idle` is divided by 1000 and shown with an "s" suffix.
            console.log(` Idle: ${Math.round(consumer.idle / 1000)}s`)
          }
        }
      }
    }

    console.log('\n📬 Latest 5 Messages:')
    const latest = await redis.xrevrange(STREAM_KEY, '+', '-', 'COUNT', 5)

    if (latest.length === 0) {
      console.log(' No messages')
    } else {
      for (const [messageId, fields] of latest) {
        const data = flatPairsToObject(fields)

        try {
          const event = JSON.parse(data.data)
          console.log(`\n ${messageId}`)
          console.log(` Event ID: ${event.eventId}`)
          console.log(` Model: ${event.usage.model}`)
          console.log(` Cost: $${event.cost.total.toFixed(6)}`)
          console.log(` Time: ${event.timestamp}`)
        } catch (e) {
          // Keep listing the remaining entries; just flag this one.
          console.log(`\n ${messageId} (Parse Error)`)
        }
      }
    }
  } catch (error) {
    console.error('❌ Failed to get queue info:', error.message)
    process.exit(1)
  }
}
|
|
| |
| |
| |
| async function clearQueue() { |
| console.log('⚠️ WARNING: This will delete all messages in the queue!') |
| console.log(` Stream: ${STREAM_KEY}`) |
|
|
| |
| const readline = require('readline') |
| const rl = readline.createInterface({ |
| input: process.stdin, |
| output: process.stdout |
| }) |
|
|
| rl.question('Type "yes" to confirm: ', async (answer) => { |
| if (answer.toLowerCase() === 'yes') { |
| try { |
| await redis.del(STREAM_KEY) |
| console.log('✅ Queue cleared successfully') |
| } catch (error) { |
| console.error('❌ Failed to clear queue:', error.message) |
| } |
| } else { |
| console.log('❌ Operation cancelled') |
| } |
| rl.close() |
| redis.quit() |
| }) |
| } |
|
|
| |
| |
| |
|
|
/**
 * CLI entry point: run the command named by the first argument.
 *
 * Supported commands are `publish`, `consume`, `info` (the default when no
 * argument is given) and `clear`. An unknown command prints usage help and
 * exits with status 1. After `publish`, `consume` and `info` the Redis
 * connection is closed here; `clear` closes it itself, so this function
 * returns without quitting in that case.
 *
 * Any error that escapes a command is logged and the process exits with
 * status 1.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const command = process.argv[2] || 'info'

  console.log('🔧 Billing Events Test Tool\n')

  try {
    switch (command) {
      case 'publish':
        await publishTestEvent()
        break

      case 'consume':
        await consumeTestEvents()
        break

      case 'info':
        await showQueueInfo()
        break

      case 'clear':
        // clearQueue shuts the connection down on its own.
        await clearQueue()
        return

      default:
        console.error(`❌ Unknown command: ${command}`)
        console.log('\nAvailable commands:')
        console.log(' publish - Publish a test event')
        console.log(' consume - Consume events (test mode)')
        console.log(' info - Show queue status')
        console.log(' clear - Clear the queue (dangerous)')
        process.exit(1)
    }

    await redis.quit()
  } catch (error) {
    console.error('💥 Fatal error:', error)

    // Closing the connection can fail too (it may be the reason we are here).
    // Guard it so the non-zero exit below always happens instead of the
    // rejection escaping this handler.
    try {
      await redis.quit()
    } catch (quitError) {
      console.error('❌ Failed to close Redis connection:', quitError.message)
    }

    process.exit(1)
  }
}
|
|
// Start the CLI. main() has its own try/catch, but a rejection can still
// escape it (for example from code that runs inside the catch block), so
// report anything that does and exit non-zero rather than leaving the
// promise floating.
main().catch((error) => {
  console.error('💥 Fatal error:', error)
  process.exit(1)
})
|
|