| const { z } = require('zod'); |
| const { tool } = require('@langchain/core/tools'); |
| const { logger } = require('@librechat/data-schemas'); |
| const { |
| Providers, |
| StepTypes, |
| GraphEvents, |
| Constants: AgentConstants, |
| } = require('@librechat/agents'); |
| const { |
| sendEvent, |
| MCPOAuthHandler, |
| normalizeServerName, |
| convertWithResolvedRefs, |
| } = require('@librechat/api'); |
| const { |
| Time, |
| CacheKeys, |
| Constants, |
| ContentTypes, |
| isAssistantsEndpoint, |
| } = require('librechat-data-provider'); |
| const { getMCPManager, getFlowStateManager, getOAuthReconnectionManager } = require('~/config'); |
| const { findToken, createToken, updateToken } = require('~/models'); |
| const { reinitMCPServer } = require('./Tools/mcp'); |
| const { getAppConfig } = require('./Config'); |
| const { getLogStores } = require('~/cache'); |
| const { mcpServersRegistry } = require('@librechat/api'); |
|
|
| |
| |
| |
| |
| |
| |
/**
 * Builds an emitter that sends an `ON_RUN_STEP_DELTA` event carrying an
 * authorization URL for the given tool call.
 *
 * @param {object} params
 * @param {object} params.res - Passed through to `sendEvent` as its first argument.
 * @param {string} params.stepId - Used as the `id` of the emitted payload.
 * @param {object} params.toolCall - Tool call descriptor; it is shallow-copied and
 *   its `args` field is replaced with an empty string in the emitted delta.
 * @returns {(authURL: string) => void} Function that emits the delta when called.
 */
function createRunStepDeltaEmitter({ res, stepId, toolCall }) {
  const emitAuthDelta = (authURL) => {
    const pendingCall = { ...toolCall, args: '' };
    const delta = {
      type: StepTypes.TOOL_CALLS,
      tool_calls: [pendingCall],
      auth: authURL,
      // Expiry is stamped at emission time, not when the emitter is created.
      expires_at: Date.now() + Time.TWO_MINUTES,
    };
    sendEvent(res, {
      event: GraphEvents.ON_RUN_STEP_DELTA,
      data: { id: stepId, delta },
    });
  };

  return emitAuthDelta;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds an emitter that sends an `ON_RUN_STEP` event describing a tool-call step.
 *
 * @param {object} params
 * @param {object} params.res - Passed through to `sendEvent` as its first argument.
 * @param {string} [params.runId] - Run identifier; when nullish,
 *   `Constants.USE_PRELIM_RESPONSE_MESSAGE_ID` is used instead.
 * @param {string} params.stepId - Used as the `id` of the emitted payload.
 * @param {object} params.toolCall - Tool call placed (by reference) in `stepDetails.tool_calls`.
 * @param {number} [params.index] - Step index; defaults to `0` when nullish.
 * @returns {() => void} Function that emits the run step when called.
 */
function createRunStepEmitter({ res, runId, stepId, toolCall, index }) {
  return () => {
    const stepDetails = {
      type: StepTypes.TOOL_CALLS,
      tool_calls: [toolCall],
    };
    const payload = {
      runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
      id: stepId,
      type: StepTypes.TOOL_CALLS,
      index: index ?? 0,
      stepDetails,
    };
    sendEvent(res, { event: GraphEvents.ON_RUN_STEP, data: payload });
  };
}
|
|
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds the `oauthStart` hook: it registers an `'oauth_login'` flow under `flowId`
 * whose handler forwards the authorization URL to `callback`.
 *
 * @param {object} params
 * @param {string} params.flowId - Identifier handed to `flowManager.createFlowWithHandler`.
 * @param {object} params.flowManager - Object exposing `createFlowWithHandler(flowId, type, handler)`.
 * @param {(authURL: string) => void} [params.callback] - Optional notifier invoked with the URL.
 * @returns {(authURL: string) => Promise<void>} Async hook that resolves once
 *   `createFlowWithHandler` settles.
 */
function createOAuthStart({ flowId, flowManager, callback }) {
  // Handler body: notify (if a callback was supplied), log, and report success.
  const notifyClient = async (authURL) => {
    callback?.(authURL);
    logger.debug('Sent OAuth login request to client');
    return true;
  };

  return async (authURL) => {
    await flowManager.createFlowWithHandler(flowId, 'oauth_login', () => notifyClient(authURL));
  };
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds the `oauthEnd` hook: it emits an `ON_RUN_STEP_DELTA` event for the tool
 * call without any `auth` field, then logs that the success notice was sent.
 *
 * @param {object} params
 * @param {object} params.res - Passed through to `sendEvent` as its first argument.
 * @param {string} params.stepId - Used as the `id` of the emitted payload.
 * @param {object} params.toolCall - Tool call descriptor; a shallow copy is emitted.
 * @returns {() => Promise<void>} Async hook that emits the delta when called.
 */
function createOAuthEnd({ res, stepId, toolCall }) {
  return async () => {
    const delta = {
      type: StepTypes.TOOL_CALLS,
      tool_calls: [{ ...toolCall }],
    };
    sendEvent(res, {
      event: GraphEvents.ON_RUN_STEP_DELTA,
      data: { id: stepId, delta },
    });
    logger.debug('Sent OAuth login success to client');
  };
}
|
|
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds an abort listener that logs the aborted tool call and marks the
 * user's `'mcp_oauth'` flow for this server as failed.
 *
 * @param {object} params
 * @param {string} params.userId - User identifier used in the log line and flow id.
 * @param {string} params.serverName - MCP server name used in the log line and flow id.
 * @param {string} params.toolName - Tool name, used only in the log line.
 * @param {object} params.flowManager - Object exposing `failFlow(flowId, type, error)`.
 * @returns {() => void} Listener suitable for an `'abort'` event.
 */
function createAbortHandler({ userId, serverName, toolName, flowManager }) {
  const onAbort = () => {
    logger.info(`[MCP][User: ${userId}][${serverName}][${toolName}] Tool call aborted`);
    const oauthFlowId = MCPOAuthHandler.generateFlowId(userId, serverName);
    const abortError = new Error('Tool call aborted');
    // The return value of failFlow is intentionally not awaited here.
    flowManager.failFlow(oauthFlowId, 'mcp_oauth', abortError);
  };

  return onAbort;
}
|
|
| |
| |
| |
| |
| |
| |
/**
 * Combines the two emitters into a single callback: the run step is emitted
 * first (with no arguments), then the delta is emitted with the authorization URL.
 *
 * @param {object} params
 * @param {() => void} params.runStepEmitter - Called first, without arguments.
 * @param {(authURL: string) => void} params.runStepDeltaEmitter - Called second with the URL.
 * @returns {(authURL: string) => void} Callback that triggers both emitters in order.
 */
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
  const announceThenDeliver = (authURL) => {
    runStepEmitter();
    runStepDeltaEmitter(authURL);
  };

  return announceThenDeliver;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Re-initializes the named MCP server for a user, wiring up the emitters that
 * surface an OAuth login step to the client if one is started.
 *
 * @param {object} params
 * @param {object} params.res - Passed to the emitters (and on to `sendEvent`).
 * @param {object} params.user - User object; `user.id` is embedded in the flow id.
 * @param {number} [params.index] - Step index forwarded to the run-step emitter.
 * @param {AbortSignal} [params.signal] - Forwarded to `reinitMCPServer`.
 * @param {string} params.serverName - Name of the MCP server to reconnect.
 * @param {object} [params.userMCPAuthMap] - Forwarded to `reinitMCPServer`.
 * @returns {Promise<*>} Whatever `reinitMCPServer` resolves with.
 */
async function reconnectServer({ res, user, index, signal, serverName, userMCPAuthMap }) {
  // The timestamp makes each reconnect attempt use a distinct flow id.
  const flowId = `${user.id}:${serverName}:${Date.now()}`;
  const stepId = `step_oauth_login_${serverName}`;
  const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
  const toolCall = { id: flowId, name: serverName, type: 'tool_call_chunk' };

  const callback = createOAuthCallback({
    runStepEmitter: createRunStepEmitter({
      res,
      index,
      runId: Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
      stepId,
      toolCall,
    }),
    runStepDeltaEmitter: createRunStepDeltaEmitter({ res, stepId, toolCall }),
  });
  const oauthStart = createOAuthStart({ res, flowId, callback, flowManager });

  const reinitialized = await reinitMCPServer({
    user,
    signal,
    serverName,
    oauthStart,
    flowManager,
    userMCPAuthMap,
    forceNew: true,
    returnOnOAuth: false,
    connectionTimeout: Time.TWO_MINUTES,
  });
  return reinitialized;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Reconnects to an MCP server and creates one tool instance per tool it reports.
 *
 * @param {object} params
 * @param {object} params.res - Forwarded to `reconnectServer` and `createMCPTool`.
 * @param {object} params.user - Forwarded to `reconnectServer` and `createMCPTool`.
 * @param {number} [params.index] - Step index forwarded to both helpers.
 * @param {AbortSignal} [params.signal] - Abort signal forwarded to both helpers.
 * @param {string} params.serverName - MCP server whose tools should be created.
 * @param {string} [params.provider] - Provider identifier forwarded to `createMCPTool`.
 * @param {object} [params.userMCPAuthMap] - Forwarded to both helpers.
 * @returns {Promise<Array|undefined>} The created tool instances (falsy results are
 *   skipped), or `undefined` when the reconnect yields no `tools`.
 */
async function createMCPTools({ res, user, index, signal, serverName, provider, userMCPAuthMap }) {
  const result = await reconnectServer({ res, user, index, signal, serverName, userMCPAuthMap });
  if (!result || !result.tools) {
    logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
    return;
  }

  const serverTools = [];
  // Named `serverTool` so the imported LangChain `tool` factory is not shadowed.
  // Tools are created sequentially because `createMCPTool` may itself reconnect.
  for (const serverTool of result.tools) {
    const toolInstance = await createMCPTool({
      res,
      user,
      // Forward index/signal so a fallback reconnect inside `createMCPTool`
      // keeps the caller's step index and remains abortable.
      index,
      signal,
      provider,
      userMCPAuthMap,
      availableTools: result.availableTools,
      toolKey: `${serverTool.name}${Constants.mcp_delimiter}${serverName}`,
    });
    if (toolInstance) {
      serverTools.push(toolInstance);
    }
  }

  return serverTools;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Creates a single MCP tool instance from a `<toolName><delimiter><serverName>` key.
 * If the key is missing from `availableTools`, the server is reconnected once and
 * the lookup is retried against the tools returned by that reconnect.
 *
 * @param {object} params
 * @param {object} params.res - Forwarded to `reconnectServer` and `createToolInstance`.
 * @param {object} params.user - Forwarded to `reconnectServer`.
 * @param {number} [params.index] - Forwarded to `reconnectServer`.
 * @param {AbortSignal} [params.signal] - Forwarded to `reconnectServer`.
 * @param {string} params.toolKey - Key split on `Constants.mcp_delimiter`.
 * @param {string} [params.provider] - Forwarded to `createToolInstance`.
 * @param {object} [params.userMCPAuthMap] - Forwarded to `reconnectServer`.
 * @param {object} [params.availableTools] - Map of tool key to an entry with a `function` field.
 * @returns {Promise<object|undefined>} The tool instance, or `undefined` when no
 *   definition could be found.
 */
async function createMCPTool({
  res,
  user,
  index,
  signal,
  toolKey,
  provider,
  userMCPAuthMap,
  availableTools,
}) {
  const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);
  const lookupDefinition = (toolMap) => toolMap?.[toolKey]?.function;

  let toolDefinition = lookupDefinition(availableTools);
  if (!toolDefinition) {
    logger.warn(
      `[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
    );
    const reconnected = await reconnectServer({
      res,
      user,
      index,
      signal,
      serverName,
      userMCPAuthMap,
    });
    toolDefinition = lookupDefinition(reconnected?.availableTools);
  }

  if (!toolDefinition) {
    logger.warn(`[MCP][${serverName}][${toolName}] Tool definition not found, cannot create tool.`);
    return;
  }

  return createToolInstance({ res, provider, toolName, serverName, toolDefinition });
}
|
|
/**
 * Wraps one MCP tool definition in a LangChain `tool` whose implementation
 * delegates to `mcpManager.callTool`.
 *
 * @param {object} params
 * @param {object} params.res - Passed to the OAuth emitters (and on to `sendEvent`).
 * @param {string} params.toolName - Tool name as known to the MCP server.
 * @param {string} params.serverName - Raw MCP server name.
 * @param {object} params.toolDefinition - Object with `description` and `parameters`.
 * @param {string} [params.provider] - Provider given at creation time; decides the
 *   Google-specific schema option and result unwrapping.
 * @returns {object} The tool instance, tagged with `mcp = true` and `mcpRawServerName`.
 */
function createToolInstance({ res, toolName, serverName, toolDefinition, provider: _provider }) {
  const { description, parameters } = toolDefinition;
  const isGoogle = _provider === Providers.VERTEXAI || _provider === Providers.GOOGLE;
  let schema = convertWithResolvedRefs(parameters, {
    allowEmptyObject: !isGoogle,
    transformOneOfAnyOf: true,
  });

  // Fall back to a single optional string input when no schema could be derived.
  if (!schema) {
    schema = z.object({ input: z.string().optional() });
  }

  const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
  const logPrefix = `[MCP][${serverName}][${toolName}]`;

  /**
   * Executes the MCP tool call.
   *
   * @param {object} toolArguments - Arguments forwarded to the MCP server.
   * @param {object} [config] - Runnable config; `configurable`, `metadata`, `signal`
   *   and `toolCall` are read when present.
   * @returns {Promise<*>} The (possibly provider-adjusted) result of `callTool`.
   * @throws {Error} Always a wrapped error whose `cause` is the original failure.
   */
  const _call = async (toolArguments, config) => {
    const userId = config?.configurable?.user?.id || config?.configurable?.user_id;
    let abortHandler = null;
    let derivedSignal = null;

    try {
      const flowsCache = getLogStores(CacheKeys.FLOWS);
      const flowManager = getFlowStateManager(flowsCache);
      derivedSignal = config?.signal ? AbortSignal.any([config.signal]) : undefined;
      const mcpManager = getMCPManager(userId);
      // NOTE: `provider` may be overridden per call via config.metadata, whereas
      // `isGoogle` above is fixed from the provider supplied at creation time.
      const provider = (config?.metadata?.provider || _provider)?.toLowerCase();

      // `_args` is destructured only to strip `args` from the forwarded tool call.
      const { args: _args, stepId, ...toolCall } = config?.toolCall ?? {};
      // Optional chaining keeps a missing `config`/`metadata` from aborting the call,
      // matching how every other `config` access in this function is guarded.
      const flowId = `${serverName}:oauth_login:${config?.metadata?.thread_id}:${config?.metadata?.run_id}`;
      const runStepDeltaEmitter = createRunStepDeltaEmitter({ res, stepId, toolCall });
      const oauthStart = createOAuthStart({
        flowId,
        flowManager,
        callback: runStepDeltaEmitter,
      });
      const oauthEnd = createOAuthEnd({ res, stepId, toolCall });

      if (derivedSignal) {
        abortHandler = createAbortHandler({ userId, serverName, toolName, flowManager });
        derivedSignal.addEventListener('abort', abortHandler, { once: true });
      }

      const customUserVars =
        config?.configurable?.userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];

      const result = await mcpManager.callTool({
        serverName,
        toolName,
        provider,
        toolArguments,
        options: {
          signal: derivedSignal,
        },
        user: config?.configurable?.user,
        requestBody: config?.configurable?.requestBody,
        customUserVars,
        flowManager,
        tokenMethods: {
          findToken,
          createToken,
          updateToken,
        },
        oauthStart,
        oauthEnd,
      });

      if (isAssistantsEndpoint(provider) && Array.isArray(result)) {
        return result[0];
      }
      // `result?.[0]` guards against a nullish result on the Google path, so it is
      // returned as-is (like every other provider) instead of raising a TypeError.
      if (isGoogle && Array.isArray(result?.[0]) && result[0][0]?.type === ContentTypes.TEXT) {
        return [result[0][0].text, result[1]];
      }
      return result;
    } catch (error) {
      logger.error(`${logPrefix}[User: ${userId}] Error calling MCP tool:`, error);

      // Rejections are not guaranteed to be Error instances; never assume `.message`.
      const messageText = typeof error?.message === 'string' ? error.message : '';
      // '401' also covers the longer 'Non-200 status code (401)' message.
      const isOAuthError = ['401', 'OAuth', 'authentication'].some((marker) =>
        messageText.includes(marker),
      );

      if (isOAuthError) {
        throw new Error(
          `${logPrefix} OAuth authentication required. Please check the server logs for the authentication URL.`,
          { cause: error },
        );
      }

      throw new Error(
        `${logPrefix} tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
        { cause: error },
      );
    } finally {
      // Detach the listener so it does not outlive this call.
      if (abortHandler && derivedSignal) {
        derivedSignal.removeEventListener('abort', abortHandler);
      }
    }
  };

  const toolInstance = tool(_call, {
    schema,
    name: normalizedToolKey,
    description: description || '',
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });
  toolInstance.mcp = true;
  toolInstance.mcpRawServerName = serverName;
  return toolInstance;
}
|
|
| |
| |
| |
| |
| |
/**
 * Collects the MCP configuration and current connection maps for a user.
 *
 * @param {string} userId - User whose MCP manager and connections are looked up.
 * @returns {Promise<{mcpConfig: object, oauthServers: *, appConnections: *, userConnections: *}>}
 *   The MCP config, the registry's OAuth servers, and the app-level and user-level
 *   connections (each defaulting to an empty `Map` when unavailable).
 * @throws {Error} 'MCP config not found' when the app config has no `mcpConfig`.
 */
async function getMCPSetupData(userId) {
  const { mcpConfig } = (await getAppConfig()) ?? {};
  if (!mcpConfig) {
    throw new Error('MCP config not found');
  }

  const mcpManager = getMCPManager(userId);

  // Best effort: a failure to list app connections is logged and treated as "none".
  const loadAppConnections = async () => {
    try {
      return (await mcpManager.appConnections?.getAll()) || new Map();
    } catch (error) {
      logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
      return new Map();
    }
  };

  const appConnections = await loadAppConnections();
  const userConnections = mcpManager.getUserConnections(userId) || new Map();
  const oauthServers = await mcpServersRegistry.getOAuthServers();

  return { mcpConfig, oauthServers, appConnections, userConnections };
}
|
|
| |
| |
| |
| |
| |
| |
/**
 * Reports whether the user's `'mcp_oauth'` flow for a server is active or failed.
 *
 * A flow counts as failed when its status is `'FAILED'` or its age exceeds its TTL,
 * unless its error text mentions `'cancelled'`, in which case it is reported as
 * neither active nor failed. A `'PENDING'` flow within its TTL counts as active.
 *
 * @param {string} userId - User identifier used to derive the flow id.
 * @param {string} serverName - MCP server name used to derive the flow id.
 * @returns {Promise<{hasActiveFlow: boolean, hasFailedFlow: boolean}>} Both flags are
 *   `false` when no flow exists or when the lookup itself throws.
 */
async function checkOAuthFlowStatus(userId, serverName) {
  const flowsCache = getLogStores(CacheKeys.FLOWS);
  const flowManager = getFlowStateManager(flowsCache);
  const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);
  // Fallback used when the stored flow state carries no (truthy) `ttl`.
  const defaultFlowTTL = 180000;

  try {
    const flowState = await flowManager.getFlowState(flowId, 'mcp_oauth');
    if (!flowState) {
      return { hasActiveFlow: false, hasFailedFlow: false };
    }

    const flowAge = Date.now() - flowState.createdAt;
    const flowTTL = flowState.ttl || defaultFlowTTL;
    const timedOut = flowAge > flowTTL;

    if (flowState.status === 'FAILED' || timedOut) {
      // The stored error may be a string or an Error-like object (flows are failed
      // with `new Error(...)` elsewhere in this module). Normalize it before the
      // substring test so a non-string error is not turned into a swallowed
      // TypeError that hides the failure.
      const errorText =
        typeof flowState.error === 'string'
          ? flowState.error
          : String(flowState.error?.message ?? '');
      const wasCancelled = errorText.includes('cancelled');

      if (wasCancelled) {
        logger.debug(`[MCP Connection Status] Found cancelled OAuth flow for ${serverName}`, {
          flowId,
          status: flowState.status,
          error: flowState.error,
        });
        return { hasActiveFlow: false, hasFailedFlow: false };
      }

      logger.debug(`[MCP Connection Status] Found failed OAuth flow for ${serverName}`, {
        flowId,
        status: flowState.status,
        flowAge,
        flowTTL,
        timedOut,
        error: flowState.error,
      });
      return { hasActiveFlow: false, hasFailedFlow: true };
    }

    if (flowState.status === 'PENDING') {
      logger.debug(`[MCP Connection Status] Found active OAuth flow for ${serverName}`, {
        flowId,
        flowAge,
        flowTTL,
      });
      return { hasActiveFlow: true, hasFailedFlow: false };
    }

    return { hasActiveFlow: false, hasFailedFlow: false };
  } catch (error) {
    logger.error(`[MCP Connection Status] Error checking OAuth flows for ${serverName}:`, error);
    return { hasActiveFlow: false, hasFailedFlow: false };
  }
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Resolves the connection state to report for one MCP server.
 *
 * The app-level connection state wins over the user-level one; with neither, the
 * server is `'disconnected'`. For a disconnected server listed in `oauthServers`,
 * the state is refined: `'connecting'` while a reconnection is in progress or an
 * OAuth flow is active, `'error'` when the OAuth flow failed.
 *
 * @param {string} userId - User whose reconnection/flow state is consulted.
 * @param {string} serverName - MCP server name.
 * @param {Map} appConnections - App-level connections keyed by server name.
 * @param {Map} userConnections - User-level connections keyed by server name.
 * @param {Set} oauthServers - Collection (with `has`) of servers that require OAuth.
 * @returns {Promise<{requiresOAuth: boolean, connectionState: string}>}
 */
async function getServerConnectionStatus(
  userId,
  serverName,
  appConnections,
  userConnections,
  oauthServers,
) {
  const baseState =
    appConnections.get(serverName)?.connectionState ??
    userConnections.get(serverName)?.connectionState ??
    'disconnected';

  // Only consulted for disconnected OAuth servers; falls back to the base state.
  const refineViaOAuth = async () => {
    if (getOAuthReconnectionManager().isReconnecting(userId, serverName)) {
      return 'connecting';
    }
    const { hasActiveFlow, hasFailedFlow } = await checkOAuthFlowStatus(userId, serverName);
    if (hasFailedFlow) {
      return 'error';
    }
    return hasActiveFlow ? 'connecting' : baseState;
  };

  const needsRefinement = baseState === 'disconnected' && oauthServers.has(serverName);
  const connectionState = needsRefinement ? await refineViaOAuth() : baseState;

  return {
    requiresOAuth: oauthServers.has(serverName),
    connectionState,
  };
}
|
|
| module.exports = { |
| createMCPTool, |
| createMCPTools, |
| getMCPSetupData, |
| checkOAuthFlowStatus, |
| getServerConnectionStatus, |
| }; |
|
|