| const axios = require('axios') |
| const ProxyHelper = require('../utils/proxyHelper') |
| const logger = require('../utils/logger') |
| const config = require('../../config/config') |
|
|
| |
/**
 * Strip the "azure/" routing prefix from a model name, if present.
 * Any other value (including null/undefined/empty) is returned unchanged.
 *
 * @param {string|null|undefined} model - Raw model identifier from the request.
 * @returns {string|null|undefined} The model name without the "azure/" prefix.
 */
function normalizeModelName(model) {
  const AZURE_PREFIX = 'azure/'
  if (!model || !model.startsWith(AZURE_PREFIX)) {
    return model
  }
  // Drop only the leading prefix; any later occurrences are part of the name.
  return model.slice(AZURE_PREFIX.length)
}
|
|
| |
/**
 * Forward a request to Azure OpenAI on behalf of a bound account.
 *
 * Builds the deployment-specific URL, attaches the account API key and an
 * optional proxy agent, then returns the raw axios response. HTTP status is
 * NOT validated here (validateStatus accepts everything) — callers inspect
 * response.status themselves.
 *
 * @param {object} params
 * @param {object} params.account - Azure account record: azureEndpoint, apiKey,
 *   optional deploymentName, apiVersion and proxy settings.
 * @param {object} params.requestBody - OpenAI-style request payload (not mutated).
 * @param {object} [params.headers] - Incoming client headers; intentionally
 *   ignored — only a fixed, minimal header set is ever sent upstream.
 * @param {boolean} [params.isStream=false] - Request SSE streaming from Azure.
 * @param {string} [params.endpoint='chat/completions'] - Azure API endpoint path.
 * @returns {Promise<object>} The raw axios response.
 * @throws Re-throws the original axios error after categorized logging.
 */
async function handleAzureOpenAIRequest({
  account,
  requestBody,
  headers: _headers = {}, // intentionally unused — client headers are never forwarded
  isStream = false,
  endpoint = 'chat/completions'
}) {
  let requestUrl = ''
  let proxyAgent = null
  let deploymentName = ''
  // Resolve the timeout once so both the request and the timeout error log
  // report the value actually in effect.
  const timeoutMs = config.requestTimeout || 600000

  try {
    const baseUrl = account.azureEndpoint
    deploymentName = account.deploymentName || 'default'

    // The "responses" API requires a newer preview api-version than chat/completions.
    const apiVersion =
      account.apiVersion || (endpoint === 'responses' ? '2025-04-01-preview' : '2024-02-01')
    if (endpoint === 'chat/completions') {
      requestUrl = `${baseUrl}/openai/deployments/${deploymentName}/chat/completions?api-version=${apiVersion}`
    } else if (endpoint === 'responses') {
      // The responses API is not deployment-scoped; the model field routes it.
      requestUrl = `${baseUrl}/openai/responses?api-version=${apiVersion}`
    } else {
      requestUrl = `${baseUrl}/openai/deployments/${deploymentName}/${endpoint}?api-version=${apiVersion}`
    }

    // Azure authenticates with the "api-key" header. Because this object is
    // built from scratch (client headers are ignored), no Anthropic/OpenAI
    // auth headers can leak upstream — no defensive deletes are needed.
    const requestHeaders = {
      'Content-Type': 'application/json',
      'api-key': account.apiKey
    }

    // Shallow copy so the caller's requestBody is never mutated.
    const processedBody = { ...requestBody }

    if (endpoint === 'responses') {
      // responses API: the model field carries the deployment name.
      processedBody.model = deploymentName
    } else if (processedBody.model) {
      processedBody.model = normalizeModelName(processedBody.model)
    } else {
      processedBody.model = 'gpt-4' // fallback when the client sent no model
    }

    proxyAgent = ProxyHelper.createProxyAgent(account.proxy)

    const axiosConfig = {
      method: 'POST',
      url: requestUrl,
      headers: requestHeaders,
      data: processedBody,
      timeout: timeoutMs,
      validateStatus: () => true, // never throw on HTTP status; caller decides
      // NOTE(review): keepAlive/socketKeepAlive are not standard axios request
      // options — presumably consumed by custom agent/adapter code; verify.
      keepAlive: true,
      maxRedirects: 5,
      socketKeepAlive: true
    }

    if (proxyAgent) {
      axiosConfig.httpAgent = proxyAgent
      axiosConfig.httpsAgent = proxyAgent
      axiosConfig.proxy = false // disable axios' own proxy handling; the agent does it

      // Keep the tunneled socket alive across the (potentially long) request.
      if (proxyAgent.options) {
        proxyAgent.options.keepAlive = true
        proxyAgent.options.keepAliveMsecs = 1000
      }
      logger.debug(
        `Using proxy for Azure OpenAI request: ${ProxyHelper.getProxyDescription(account.proxy)}`
      )
    }

    if (isStream) {
      axiosConfig.responseType = 'stream'
      requestHeaders.accept = 'text/event-stream'
    } else {
      requestHeaders.accept = 'application/json'
    }

    logger.debug(`Making Azure OpenAI request`, {
      requestUrl,
      method: 'POST',
      endpoint,
      deploymentName,
      apiVersion,
      hasProxy: !!proxyAgent,
      proxyInfo: ProxyHelper.maskProxyInfo(account.proxy),
      isStream,
      requestBodySize: JSON.stringify(processedBody).length
    })

    logger.debug('Azure OpenAI request headers', {
      'content-type': requestHeaders['Content-Type'],
      'user-agent': requestHeaders['user-agent'] || 'not-set',
      customHeaders: Object.keys(requestHeaders).filter(
        (key) => !['Content-Type', 'user-agent'].includes(key)
      )
    })

    logger.debug('Azure OpenAI request body', {
      model: processedBody.model,
      messages: processedBody.messages?.length || 0,
      otherParams: Object.keys(processedBody).filter((key) => !['model', 'messages'].includes(key))
    })

    const requestStartTime = Date.now()
    logger.debug(`🔄 Starting Azure OpenAI HTTP request at ${new Date().toISOString()}`)

    const response = await axios(axiosConfig)

    const requestDuration = Date.now() - requestStartTime
    logger.debug(`✅ Azure OpenAI HTTP request completed at ${new Date().toISOString()}`)

    logger.debug(`Azure OpenAI response received`, {
      status: response.status,
      statusText: response.statusText,
      duration: `${requestDuration}ms`,
      responseHeaders: Object.keys(response.headers || {}),
      hasData: !!response.data,
      contentType: response.headers?.['content-type'] || 'unknown'
    })

    return response
  } catch (error) {
    const errorDetails = {
      message: error.message,
      code: error.code,
      status: error.response?.status,
      statusText: error.response?.statusText,
      responseData: error.response?.data,
      requestUrl: requestUrl || 'unknown',
      endpoint,
      deploymentName: deploymentName || account?.deploymentName || 'unknown',
      hasProxy: !!proxyAgent,
      proxyType: account?.proxy?.type || 'none',
      isTimeout: error.code === 'ECONNABORTED',
      isNetworkError: !error.response,
      stack: error.stack
    }

    // Categorize common failure modes so operators get actionable hints.
    if (error.code === 'ENOTFOUND') {
      logger.error('DNS Resolution Failed for Azure OpenAI', {
        ...errorDetails,
        hostname: requestUrl && requestUrl !== 'unknown' ? new URL(requestUrl).hostname : 'unknown',
        suggestion: 'Check if Azure endpoint URL is correct and accessible'
      })
    } else if (error.code === 'ECONNREFUSED') {
      logger.error('Connection Refused by Azure OpenAI', {
        ...errorDetails,
        suggestion: 'Check if proxy settings are correct or Azure service is accessible'
      })
    } else if (error.code === 'ECONNRESET' || error.message.includes('socket hang up')) {
      logger.error('🚨 Azure OpenAI Connection Reset / Socket Hang Up', {
        ...errorDetails,
        suggestion:
          'Connection was dropped by Azure OpenAI or proxy. This might be due to long request processing time, proxy timeout, or network instability. Try reducing request complexity or check proxy settings.'
      })
    } else if (error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT') {
      // Report the timeout actually configured, not a hard-coded value.
      logger.error('🚨 Azure OpenAI Request Timeout', {
        ...errorDetails,
        timeoutMs,
        suggestion: `Request exceeded ${timeoutMs}ms timeout. Consider reducing model complexity or check if Azure service is responding slowly.`
      })
    } else if (
      error.code === 'CERT_AUTHORITY_INVALID' ||
      error.code === 'UNABLE_TO_VERIFY_LEAF_SIGNATURE'
    ) {
      logger.error('SSL Certificate Error for Azure OpenAI', {
        ...errorDetails,
        suggestion: 'SSL certificate validation failed - check proxy SSL settings'
      })
    } else if (error.response?.status === 401) {
      logger.error('Azure OpenAI Authentication Failed', {
        ...errorDetails,
        suggestion: 'Check if Azure OpenAI API key is valid and not expired'
      })
    } else if (error.response?.status === 404) {
      logger.error('Azure OpenAI Deployment Not Found', {
        ...errorDetails,
        suggestion: 'Check if deployment name and Azure endpoint are correct'
      })
    } else {
      logger.error('Azure OpenAI Request Failed', errorDetails)
    }

    throw error
  }
}
|
|
| |
/**
 * Registry of live relay streams and their teardown callbacks, so each
 * stream's cleanup runs at most once (on normal end, error, or client
 * disconnect) and a failing callback cannot break the caller.
 */
class StreamManager {
  constructor() {
    // Ids of currently active streams.
    this.activeStreams = new Set()
    // streamId -> teardown callback.
    this.cleanupCallbacks = new Map()
  }

  /**
   * Track a stream and the callback that tears it down.
   * @param {string} streamId - Unique stream identifier.
   * @param {function} cleanup - Teardown callback for this stream.
   */
  registerStream(streamId, cleanup) {
    this.activeStreams.add(streamId)
    this.cleanupCallbacks.set(streamId, cleanup)
  }

  /**
   * Invoke and forget the teardown callback for a stream. Unknown or
   * already-cleaned ids are a no-op; callback errors are logged, and the
   * bookkeeping entries are always removed.
   * @param {string} streamId - Stream to tear down.
   */
  cleanup(streamId) {
    if (!this.activeStreams.has(streamId)) {
      return
    }
    const teardown = this.cleanupCallbacks.get(streamId)
    try {
      if (teardown) {
        teardown()
      }
    } catch (error) {
      logger.warn(`Stream cleanup error for ${streamId}:`, error.message)
    } finally {
      this.activeStreams.delete(streamId)
      this.cleanupCallbacks.delete(streamId)
    }
  }

  /** @returns {number} Count of streams still registered. */
  getActiveStreamCount() {
    return this.activeStreams.size
  }
}
|
|
// Module-level singleton: every relay stream in this file registers here.
const streamManager = new StreamManager()

// Cap on the rolling SSE parse buffer (64 KiB) to bound per-stream memory.
const MAX_BUFFER_SIZE = 64 * 1024
// SSE events larger than this (16 KiB) are skipped during usage parsing.
const MAX_EVENT_SIZE = 16 * 1024
|
|
| |
/**
 * Relay an Azure OpenAI SSE stream to the client while opportunistically
 * parsing the pass-through bytes for token usage and the actual model name.
 *
 * Raw upstream chunks are forwarded to the client untouched; all parsing
 * happens on side buffers, so relay latency is independent of parse cost.
 *
 * @param {object} upstreamResponse - axios response with responseType 'stream'.
 * @param {object} clientResponse - Express-style response to stream into.
 * @param {object} [options]
 * @param {function} [options.onData] - Called per chunk with (chunk, {usageData, actualModel}).
 * @param {function} [options.onEnd] - Called once with {usageData, actualModel} on normal end.
 * @param {function} [options.onError] - Called with the upstream error.
 * @returns {Promise<{usageData: object|null, actualModel: string|null}>}
 *   Resolves on stream end; rejects on upstream/processing errors.
 */
function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
  const { onData, onEnd, onError } = options
  // Log-correlation id; best-effort uniqueness only (not cryptographic).
  const streamId = `stream_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`

  logger.info(`Starting Azure OpenAI stream handling`, {
    streamId,
    upstreamStatus: upstreamResponse.status,
    upstreamHeaders: Object.keys(upstreamResponse.headers || {}),
    clientRemoteAddress: clientResponse.req?.connection?.remoteAddress,
    hasOnData: !!onData,
    hasOnEnd: !!onEnd,
    hasOnError: !!onError
  })

  // Adapting an event-emitter stream to a Promise: new Promise is the right tool.
  return new Promise((resolve, reject) => {
    let buffer = '' // rolling buffer of not-yet-complete SSE events
    let usageData = null // first usage object found wins
    let actualModel = null // model name as reported by the upstream events
    let hasEnded = false // guards against double end/error handling
    let eventCount = 0
    const maxEvents = 10000 // hard cap to stop runaway streams

    // Separate tail buffer: usage normally arrives in the last chunks, so the
    // tail survives even when the main buffer gets truncated.
    let finalChunksBuffer = ''
    const FINAL_CHUNKS_SIZE = 32 * 1024
    const allParsedEvents = [] // every successfully parsed SSE JSON event

    // Standard SSE headers; X-Accel-Buffering disables proxy response buffering.
    clientResponse.setHeader('Content-Type', 'text/event-stream')
    clientResponse.setHeader('Cache-Control', 'no-cache')
    clientResponse.setHeader('Connection', 'keep-alive')
    clientResponse.setHeader('X-Accel-Buffering', 'no')

    // Forward selected diagnostic/rate-limit headers from upstream, if present.
    const passThroughHeaders = [
      'x-request-id',
      'x-ratelimit-remaining-requests',
      'x-ratelimit-remaining-tokens'
    ]
    passThroughHeaders.forEach((header) => {
      const value = upstreamResponse.headers[header]
      if (value) {
        clientResponse.setHeader(header, value)
      }
    })

    // Push headers out immediately so the client starts receiving the stream.
    if (typeof clientResponse.flushHeaders === 'function') {
      clientResponse.flushHeaders()
    }

    // Parse a chunk of SSE text line-by-line, recording every JSON event and
    // capturing usage/model info via extractUsageDataRobust plus two backup
    // shapes (responses-API `response.completed` and top-level `usage`).
    const parseSSEForUsage = (data, isFromFinalBuffer = false) => {
      const lines = data.split('\n')

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          try {
            const jsonStr = line.slice(6) // strip the "data: " prefix
            if (jsonStr.trim() === '[DONE]') {
              continue // SSE terminator, not JSON
            }
            const eventData = JSON.parse(jsonStr)

            // Keep every parsed event for the end-of-stream fallback scans.
            allParsedEvents.push(eventData)

            // Track the most recent model name seen in any event.
            if (eventData.model) {
              actualModel = eventData.model
            }

            const { usageData: extractedUsage, actualModel: extractedModel } =
              extractUsageDataRobust(
                eventData,
                `stream-event-${isFromFinalBuffer ? 'final' : 'normal'}`
              )

            // First usage found wins; later events never overwrite it.
            if (extractedUsage && !usageData) {
              usageData = extractedUsage
              if (extractedModel) {
                actualModel = extractedModel
              }
              logger.debug(`🎯 Stream usage captured via robust extraction`, {
                isFromFinalBuffer,
                usageData,
                actualModel
              })
            }

            // Backup shapes, only consulted while usage is still unknown.
            if (!usageData) {
              // responses-API completion event carries usage under `response`.
              if (eventData.type === 'response.completed' && eventData.response) {
                if (eventData.response.model) {
                  actualModel = eventData.response.model
                }
                if (eventData.response.usage) {
                  usageData = eventData.response.usage
                  logger.debug('🎯 Stream usage (backup method - response.usage):', usageData)
                }
              }

              // Plain chat-completions shape: usage at the event top level.
              if (!usageData && eventData.usage) {
                usageData = eventData.usage
                logger.debug('🎯 Stream usage (backup method - top-level):', usageData)
              }
            }
          } catch (e) {
            // Partial JSON across chunk boundaries is normal for SSE.
            logger.debug('SSE parsing error (expected for incomplete chunks):', e.message)
          }
        }
      }
    }

    // Idempotent teardown: detach/destroy the upstream and close the client
    // response (502 if nothing was sent yet). Registered with streamManager.
    const cleanup = () => {
      if (!hasEnded) {
        hasEnded = true
        try {
          upstreamResponse.data?.removeAllListeners?.()
          upstreamResponse.data?.destroy?.()

          if (!clientResponse.headersSent) {
            clientResponse.status(502).end()
          } else if (!clientResponse.destroyed) {
            clientResponse.end()
          }
        } catch (error) {
          logger.warn('Stream cleanup error:', error.message)
        }
      }
    }

    streamManager.registerStream(streamId, cleanup)

    upstreamResponse.data.on('data', (chunk) => {
      try {
        if (hasEnded || clientResponse.destroyed) {
          return
        }

        eventCount++
        if (eventCount > maxEvents) {
          logger.warn(`Stream ${streamId} exceeded max events limit`)
          cleanup()
          return
        }

        const chunkStr = chunk.toString()

        // Forward the raw bytes first — parsing must never delay the relay.
        if (!clientResponse.destroyed) {
          clientResponse.write(chunk)
        }

        buffer += chunkStr

        // Maintain the sliding tail buffer used as an end-of-stream fallback.
        finalChunksBuffer += chunkStr
        if (finalChunksBuffer.length > FINAL_CHUNKS_SIZE) {
          finalChunksBuffer = finalChunksBuffer.slice(-FINAL_CHUNKS_SIZE)
        }

        // Bound the main buffer; the tail buffer preserves the recent data.
        if (buffer.length > MAX_BUFFER_SIZE) {
          logger.warn(
            `Stream ${streamId} buffer exceeded limit, truncating main buffer but preserving final chunks`
          )
          buffer = buffer.slice(-MAX_BUFFER_SIZE / 4)
        }

        // Complete SSE events are delimited by a blank line ("\n\n");
        // the trailing partial event stays in the buffer for the next chunk.
        if (buffer.includes('\n\n')) {
          const events = buffer.split('\n\n')
          buffer = events.pop() || ''

          for (const event of events) {
            if (event.trim() && event.length <= MAX_EVENT_SIZE) {
              parseSSEForUsage(event)
            }
          }
        }

        if (onData) {
          onData(chunk, { usageData, actualModel })
        }
      } catch (error) {
        logger.error('Error processing Azure OpenAI stream chunk:', error)
        if (!hasEnded) {
          cleanup()
          reject(error)
        }
      }
    })

    upstreamResponse.data.on('end', () => {
      if (hasEnded) {
        return
      }

      // NOTE(review): streamManager.cleanup runs the registered cleanup
      // closure while hasEnded is still false, so it may end the client
      // response here; the later clientResponse.end() below is then a no-op
      // because of the destroyed/ended checks — confirm this ordering is
      // intentional.
      streamManager.cleanup(streamId)
      hasEnded = true

      try {
        logger.debug(`🔚 Stream ended, performing comprehensive usage extraction for ${streamId}`, {
          mainBufferSize: buffer.length,
          finalChunksBufferSize: finalChunksBuffer.length,
          parsedEventsCount: allParsedEvents.length,
          hasUsageData: !!usageData
        })

        // If no usage was captured mid-stream, escalate through fallbacks.
        if (!usageData) {
          logger.debug('🔍 No usage found during stream, trying final extraction methods...')

          // Fallback 1: any leftover partial event in the main buffer.
          if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) {
            parseSSEForUsage(buffer, false)
          }

          // Fallback 2: re-parse the preserved tail of the stream.
          if (!usageData && finalChunksBuffer.trim()) {
            logger.debug('🔍 Trying final chunks buffer for usage extraction...')
            parseSSEForUsage(finalChunksBuffer, true)
          }

          // Fallback 3: rescan all parsed events, newest first (usage is
          // most likely in the last events).
          if (!usageData && allParsedEvents.length > 0) {
            logger.debug('🔍 Searching through all parsed events for usage...')

            for (let i = allParsedEvents.length - 1; i >= 0; i--) {
              const { usageData: foundUsage, actualModel: foundModel } = extractUsageDataRobust(
                allParsedEvents[i],
                `final-event-scan-${i}`
              )
              if (foundUsage) {
                usageData = foundUsage
                if (foundModel) {
                  actualModel = foundModel
                }
                logger.debug(`🎯 Usage found in event ${i} during final scan!`)
                break
              }
            }
          }

          // Fallback 4: hand the whole event list to the recursive extractor.
          if (!usageData && allParsedEvents.length > 0) {
            logger.debug('🔍 Trying combined events analysis...')
            const combinedData = {
              events: allParsedEvents,
              lastEvent: allParsedEvents[allParsedEvents.length - 1],
              eventCount: allParsedEvents.length
            }

            const { usageData: combinedUsage } = extractUsageDataRobust(
              combinedData,
              'combined-events'
            )
            if (combinedUsage) {
              usageData = combinedUsage
              logger.debug('🎯 Usage found via combined events analysis!')
            }
          }
        }

        if (usageData) {
          logger.debug('✅ Final stream usage extraction SUCCESS', {
            streamId,
            usageData,
            actualModel,
            totalEvents: allParsedEvents.length,
            finalBufferSize: finalChunksBuffer.length
          })
        } else {
          // Summarize the last few events to help diagnose the miss.
          logger.warn('❌ Final stream usage extraction FAILED', {
            streamId,
            totalEvents: allParsedEvents.length,
            finalBufferSize: finalChunksBuffer.length,
            mainBufferSize: buffer.length,
            lastFewEvents: allParsedEvents.slice(-3).map((e) => ({
              type: e.type,
              hasUsage: !!e.usage,
              hasResponse: !!e.response,
              keys: Object.keys(e)
            }))
          })
        }

        if (onEnd) {
          onEnd({ usageData, actualModel })
        }

        if (!clientResponse.destroyed) {
          clientResponse.end()
        }

        resolve({ usageData, actualModel })
      } catch (error) {
        logger.error('Stream end handling error:', error)
        reject(error)
      }
    })

    upstreamResponse.data.on('error', (error) => {
      if (hasEnded) {
        return
      }

      // NOTE(review): as in the 'end' handler, the registered cleanup closure
      // runs first (hasEnded still false) and may already have sent 502/ended
      // the response before the status/json call below — confirm intended.
      streamManager.cleanup(streamId)
      hasEnded = true

      logger.error('Upstream stream error:', error)

      try {
        if (onError) {
          onError(error)
        }

        if (!clientResponse.headersSent) {
          clientResponse.status(502).json({ error: { message: 'Upstream stream error' } })
        } else if (!clientResponse.destroyed) {
          clientResponse.end()
        }
      } catch (cleanupError) {
        logger.warn('Error during stream error cleanup:', cleanupError.message)
      }

      reject(error)
    })

    // If the client goes away first, tear down the upstream as well.
    const clientCleanup = () => {
      streamManager.cleanup(streamId)
    }

    clientResponse.on('close', clientCleanup)
    clientResponse.on('aborted', clientCleanup)
    clientResponse.on('error', clientCleanup)
  })
}
|
|
| |
/**
 * Extract token-usage accounting and the actual model name from an Azure
 * OpenAI payload, trying progressively broader strategies:
 *   1. top-level `usage`
 *   2. `response.usage` (responses-API shape)
 *   3. depth-first recursive search for any nested `usage` key
 *   4. `usage` on the last entry of `choices`
 *
 * Never throws — extraction failures are logged and nulls are returned.
 *
 * @param {object} responseData - Parsed response body or SSE event payload.
 * @param {string} [context='unknown'] - Label used only for log messages.
 * @returns {{usageData: object|null, actualModel: string|null}}
 */
function extractUsageDataRobust(responseData, context = 'unknown') {
  logger.debug(`🔍 Attempting usage extraction for ${context}`, {
    responseDataKeys: Object.keys(responseData || {}),
    responseDataType: typeof responseData,
    hasUsage: !!responseData?.usage,
    hasResponse: !!responseData?.response
  })

  let usageData = null
  let actualModel = null

  try {
    // Strategy 1: chat-completions shape — usage at the top level.
    if (responseData?.usage) {
      usageData = responseData.usage
      actualModel = responseData.model
      logger.debug('✅ Usage extracted via Strategy 1 (top-level)', { usageData, actualModel })
    }

    // Strategy 2: responses-API shape — usage nested under `response`.
    else if (responseData?.response?.usage) {
      usageData = responseData.response.usage
      actualModel = responseData.response.model || responseData.model
      logger.debug('✅ Usage extracted via Strategy 2 (response.usage)', { usageData, actualModel })
    }

    // Strategy 3: depth-first search for any `usage` key anywhere in the tree.
    else {
      const findUsageRecursive = (obj, path = '') => {
        if (!obj || typeof obj !== 'object') {
          return null
        }

        for (const [key, value] of Object.entries(obj)) {
          const currentPath = path ? `${path}.${key}` : key

          if (key === 'usage' && value && typeof value === 'object') {
            logger.debug(`✅ Usage found at path: ${currentPath}`, value)
            return { usage: value, path: currentPath }
          }

          if (typeof value === 'object' && value !== null) {
            const nested = findUsageRecursive(value, currentPath)
            if (nested) {
              return nested
            }
          }
        }
        return null
      }

      const found = findUsageRecursive(responseData)
      if (found) {
        usageData = found.usage
        // Walk back to the object that contained `usage` and prefer its
        // sibling `model` over the top-level one.
        const pathParts = found.path.split('.')
        pathParts.pop()
        let modelParent = responseData
        for (const part of pathParts) {
          modelParent = modelParent?.[part]
        }
        actualModel = modelParent?.model || responseData?.model
        logger.debug('✅ Usage extracted via Strategy 3 (recursive)', {
          usageData,
          actualModel,
          foundPath: found.path
        })
      }
    }

    // Strategy 4: usage attached to the final entry of `choices`.
    // NOTE(review): Strategy 3's recursive search would already find
    // choices[*].usage, so this branch is likely unreachable — kept as a
    // safety net; confirm before removing.
    if (!usageData) {
      if (responseData?.choices?.length > 0) {
        const lastChoice = responseData.choices[responseData.choices.length - 1]
        if (lastChoice?.usage) {
          usageData = lastChoice.usage
          actualModel = responseData.model || lastChoice.model
          logger.debug('✅ Usage extracted via Strategy 4 (choices)', { usageData, actualModel })
        }
      }
    }

    if (usageData) {
      // Summarize with both OpenAI (prompt/completion) and Anthropic-style
      // (input/output) token field names, whichever the payload carries.
      logger.debug('🎯 Final usage extraction result', {
        context,
        usageData,
        actualModel,
        inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
        outputTokens: usageData.completion_tokens || usageData.output_tokens || 0,
        totalTokens: usageData.total_tokens || 0
      })
    } else {
      logger.warn('❌ Failed to extract usage data', {
        context,
        responseDataStructure: `${JSON.stringify(responseData, null, 2).substring(0, 1000)}...`,
        availableKeys: Object.keys(responseData || {}),
        responseSize: JSON.stringify(responseData || {}).length
      })
    }
  } catch (extractionError) {
    // Extraction must never break the caller's request handling.
    logger.error('🚨 Error during usage extraction', {
      context,
      error: extractionError.message,
      stack: extractionError.stack,
      responseDataType: typeof responseData
    })
  }

  return { usageData, actualModel }
}
|
|
| |
/**
 * Relay a non-streaming Azure OpenAI response to the client and extract
 * usage/model info from the JSON body.
 *
 * @param {object} upstreamResponse - axios response from Azure OpenAI.
 * @param {object} clientResponse - Express-style response to write to.
 * @returns {{usageData: object|null, actualModel: string|null, responseData: *}}
 * @throws Re-throws any error after logging it.
 */
function handleNonStreamResponse(upstreamResponse, clientResponse) {
  try {
    // Mirror the upstream status and force a JSON content type.
    clientResponse.status(upstreamResponse.status)
    clientResponse.setHeader('Content-Type', 'application/json')

    // Forward selected diagnostic/rate-limit headers when present upstream.
    const passThroughHeaders = [
      'x-request-id',
      'x-ratelimit-remaining-requests',
      'x-ratelimit-remaining-tokens'
    ]
    for (const headerName of passThroughHeaders) {
      const headerValue = upstreamResponse.headers[headerName]
      if (headerValue) {
        clientResponse.setHeader(headerName, headerValue)
      }
    }

    // Send the upstream body through unchanged.
    const responseData = upstreamResponse.data
    clientResponse.json(responseData)

    // Pull usage accounting and the reported model out of the payload.
    const { usageData, actualModel } = extractUsageDataRobust(responseData, 'non-stream')

    return { usageData, actualModel, responseData }
  } catch (error) {
    logger.error('Error handling Azure OpenAI non-stream response:', error)
    throw error
  }
}
|
|
// Public surface of this Azure OpenAI relay module.
module.exports = {
  handleAzureOpenAIRequest,
  handleStreamResponse,
  handleNonStreamResponse,
  normalizeModelName
}
|
|