| const path = require('path'); |
| const { v4 } = require('uuid'); |
| const { countTokens, escapeRegExp } = require('@librechat/api'); |
| const { |
| Constants, |
| ContentTypes, |
| AnnotationTypes, |
| defaultOrderQuery, |
| } = require('librechat-data-provider'); |
| const { retrieveAndProcessFile } = require('~/server/services/Files/process'); |
| const { recordMessage, getMessages } = require('~/models/Message'); |
| const { spendTokens } = require('~/models/spendTokens'); |
| const { saveConvo } = require('~/models/Conversation'); |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Initializes a thread: either appends the first provided message to an
 * existing thread, or creates a brand-new thread from the request body.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client instance.
 * @param {Object} params.body - Thread creation body; `body.messages[0]` is the
 *   message appended when `thread_id` is supplied.
 * @param {string} [params.thread_id] - Existing thread id, if any.
 * @returns {Promise<Object>} `{ messages, thread_id, ...thread }`
 */
async function initThread({ openai, body, thread_id: _thread_id }) {
  let thread = {};
  const messages = [];

  if (_thread_id) {
    // Existing thread: only the first message needs to be appended.
    messages.push(await openai.beta.threads.messages.create(_thread_id, body.messages[0]));
  } else {
    // No thread yet: create one from the full body.
    thread = await openai.beta.threads.create(body);
  }

  return { messages, thread_id: _thread_id || thread.id, ...thread };
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Persists a user-authored message (with its token count) and updates the
 * owning conversation record.
 *
 * @param {Object} req - Express request, forwarded to `saveConvo`.
 * @param {Object} params - Message fields: `user`, `endpoint`, `messageId`,
 *   `conversationId`, `parentMessageId`, `assistant_id`, `thread_id`, `text`,
 *   `promptPrefix`, `instructions`, `model`, and optional `files`/`file_ids`.
 * @returns {Promise<Object>} the recorded message document.
 */
async function saveUserMessage(req, params) {
  const {
    user,
    endpoint,
    messageId,
    conversationId,
    parentMessageId,
    assistant_id,
    thread_id,
    text,
    promptPrefix,
    instructions,
    model,
    files,
    file_ids,
  } = params;

  const tokenCount = await countTokens(text);

  const userMessage = {
    user,
    endpoint,
    messageId,
    conversationId,
    parentMessageId: parentMessageId ?? Constants.NO_PARENT,
    // The assistant id doubles as the message's model identifier.
    model: assistant_id,
    thread_id,
    sender: 'User',
    text,
    isCreatedByUser: true,
    tokenCount,
  };

  const convo = {
    endpoint,
    conversationId,
    promptPrefix,
    instructions,
    assistant_id,
    model,
  };

  if (files?.length) {
    // Only file_id references are stored on the message itself.
    userMessage.files = files.map(({ file_id }) => ({ file_id }));
    convo.file_ids = file_ids;
  }

  const savedMessage = await recordMessage(userMessage);
  await saveConvo(req, convo, {
    context: 'api/server/services/Threads/manage.js #saveUserMessage',
  });
  return savedMessage;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Persists an assistant-authored message and updates the owning conversation
 * record (including icon/spec presentation fields).
 *
 * @param {Object} req - Express request, forwarded to `saveConvo`.
 * @param {Object} params - Message fields: `user`, `endpoint`, `messageId`,
 *   `conversationId`, `parentMessageId`, `thread_id`, `assistant_id`,
 *   `content`, `text`, `promptPrefix`, `instructions`, `model`, `iconURL`, `spec`.
 * @returns {Promise<Object>} the recorded message document.
 */
async function saveAssistantMessage(req, params) {
  const {
    user,
    endpoint,
    messageId,
    conversationId,
    parentMessageId,
    thread_id,
    assistant_id,
    content,
    text,
    promptPrefix,
    instructions,
    model,
    iconURL,
    spec,
  } = params;

  const message = await recordMessage({
    user,
    endpoint,
    messageId,
    conversationId,
    parentMessageId,
    thread_id,
    // The assistant id doubles as the message's model identifier.
    model: assistant_id,
    content,
    sender: 'Assistant',
    isCreatedByUser: false,
    text,
    unfinished: false,
    iconURL,
    spec,
  });

  await saveConvo(
    req,
    {
      endpoint,
      conversationId,
      promptPrefix,
      instructions,
      assistant_id,
      model,
      iconURL,
      spec,
    },
    { context: 'api/server/services/Threads/manage.js #saveAssistantMessage' },
  );

  return message;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Tags several thread messages with the same local `messageId` by writing it
 * into each API message's metadata; all updates run concurrently.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client instance.
 * @param {string} params.thread_id - Thread the messages belong to.
 * @param {string} params.messageId - Local messageId to record in metadata.
 * @param {Array<{id: string}>} params.messages - API messages to update.
 * @returns {Promise<Array>} the updated message objects, in input order.
 */
async function addThreadMetadata({ openai, thread_id, messageId, messages }) {
  return Promise.all(
    messages.map((message) =>
      openai.beta.threads.messages.update(message.id, {
        thread_id,
        metadata: { messageId },
      }),
    ),
  );
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Reconciles the thread's API messages with locally stored (DB) messages,
 * rebuilding the parent/child chain in API order.
 *
 * Behavior:
 * - An API message whose `metadata.messageId` matches a DB record reuses that
 *   record, re-linking its `parentMessageId` to the previous message.
 * - Consecutive unmatched assistant messages are merged into one aggregate
 *   local message (content/files concatenated).
 * - Each new local message is persisted via `recordMessage`, and — when the
 *   API id is a real `msg_…` id — the corresponding API message(s) are tagged
 *   with our `messageId` in their metadata so future syncs can match them.
 * - All file ids attached to user messages are saved onto the conversation.
 *
 * @param {Object} params
 * @param {Object} params.openai - Client; `openai.req.user.id` supplies the owner.
 * @param {string} params.endpoint - Endpoint identifier stamped on new messages.
 * @param {string} params.thread_id - Thread the messages belong to.
 * @param {Array} params.dbMessages - Locally stored messages.
 * @param {Array} params.apiMessages - API messages in conversation order.
 * @param {string} [params.assistant_id] - Fallback assistant id for assistant messages.
 * @param {string} params.conversationId - Local conversation id.
 * @returns {Promise<Array>} ordered, parent-linked message list.
 */
async function syncMessages({
  openai,
  endpoint,
  thread_id,
  dbMessages,
  apiMessages,
  assistant_id,
  conversationId,
}) {
  let result = [];
  let dbMessageMap = new Map(dbMessages.map((msg) => [msg.messageId, msg]));

  const modifyPromises = [];
  const recordPromises = [];

  /**
   * Persists a newly synced message and queues metadata updates that tag the
   * corresponding API message(s) with our local messageId.
   * NOTE(review): declared async but always invoked without await; the body is
   * fully synchronous (it only pushes promises), so both arrays are populated
   * before the `Promise.all` calls below — preserve that if refactoring.
   */
  const processNewMessage = async ({ dbMessage, apiMessage }) => {
    recordPromises.push(recordMessage({ ...dbMessage, user: openai.req.user.id }));

    // Synthesized messages (e.g. the placeholder built in `checkMessageGaps`)
    // carry local uuids rather than real `msg_…` ids — nothing to tag remotely.
    if (!apiMessage.id.includes('msg_')) {
      return;
    }

    // Aggregated message: tag every underlying API message.
    if (dbMessage.aggregateMessages?.length > 1) {
      modifyPromises.push(
        addThreadMetadata({
          openai,
          thread_id,
          messageId: dbMessage.messageId,
          messages: dbMessage.aggregateMessages,
        }),
      );
      return;
    }

    modifyPromises.push(
      openai.beta.threads.messages.update(apiMessage.id, {
        thread_id,
        metadata: {
          messageId: dbMessage.messageId,
        },
      }),
    );
  };

  let lastMessage = null;

  for (let i = 0; i < apiMessages.length; i++) {
    const apiMessage = apiMessages[i];

    // Previously synced messages carry our local messageId in their metadata.
    const dbMessageId = apiMessage.metadata && apiMessage.metadata.messageId;
    let dbMessage = dbMessageMap.get(dbMessageId);

    if (dbMessage) {
      // Known message: just re-link it into the chain.
      dbMessage.parentMessageId = lastMessage ? lastMessage.messageId : Constants.NO_PARENT;
      lastMessage = dbMessage;
      result.push(dbMessage);
      continue;
    }

    if (apiMessage.role === 'assistant' && lastMessage && lastMessage.role === 'assistant') {
      // Merge consecutive assistant messages into the previous local message.
      // NOTE(review): assumes `lastMessage.aggregateMessages` exists; a matched
      // DB record lacking that field would throw here — confirm stored records
      // always persist it.
      lastMessage.content = [...lastMessage.content, ...apiMessage.content];
      lastMessage.files = [...(lastMessage.files ?? []), ...(apiMessage.files ?? [])];
      lastMessage.aggregateMessages.push({ id: apiMessage.id });
    } else {
      // Unknown message: create a fresh local record with a new uuid.
      const newMessage = {
        thread_id,
        conversationId,
        messageId: v4(),
        endpoint,
        parentMessageId: lastMessage ? lastMessage.messageId : Constants.NO_PARENT,
        role: apiMessage.role,
        isCreatedByUser: apiMessage.role === 'user',
        // Content parts straight from the API message.
        content: apiMessage.content,
        aggregateMessages: [{ id: apiMessage.id }],
        model: apiMessage.role === 'user' ? null : apiMessage.assistant_id,
        user: openai.req.user.id,
        unfinished: false,
      };

      if (apiMessage.file_ids?.length) {
        // Attachments are stored as `{ file_id }` references only.
        newMessage.files = apiMessage.file_ids.map((file_id) => ({ file_id }));
      }

      // Fall back to the conversation-level assistant id when the API message
      // did not carry one.
      if (assistant_id && apiMessage.role === 'assistant' && !newMessage.model) {
        apiMessage.model = assistant_id;
        newMessage.model = assistant_id;
      }

      result.push(newMessage);
      lastMessage = newMessage;

      if (apiMessage.role === 'user') {
        processNewMessage({ dbMessage: newMessage, apiMessage });
        continue;
      }
    }

    // Assistant messages are only finalized once the streak ends (next message
    // is a user message, or there is no next message).
    const nextMessage = apiMessages[i + 1];
    const processAssistant = !nextMessage || nextMessage.role === 'user';

    if (apiMessage.role === 'assistant' && processAssistant) {
      processNewMessage({ dbMessage: lastMessage, apiMessage });
    }
  }

  // Collect every file id attached to user messages for the conversation record.
  const attached_file_ids = apiMessages.reduce((acc, msg) => {
    if (msg.role === 'user' && msg.file_ids?.length) {
      return [...acc, ...msg.file_ids];
    }

    return acc;
  }, []);

  await Promise.all(modifyPromises);
  await Promise.all(recordPromises);

  await saveConvo(
    openai.req,
    {
      conversationId,
      file_ids: attached_file_ids,
    },
    { context: 'api/server/services/Threads/manage.js #syncMessages' },
  );

  return result;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Orders run steps chronologically and pairs each `message_creation` step with
 * the message it produced.
 *
 * @param {Array} [steps=[]] - Run steps; each has `created_at` and optionally
 *   `step_details.message_creation.message_id`.
 * @param {Array} [messages=[]] - Thread messages, each with an `id`.
 * @returns {Array} chronologically sorted entries: `{ step, message }` when the
 *   step created a known message, otherwise the bare step.
 */
function mapMessagesToSteps(steps = [], messages = []) {
  // Index messages by id for O(1) lookup; Map avoids collisions with
  // Object.prototype keys that a plain-object index is exposed to.
  const messageMap = new Map(messages.map((msg) => [msg.id, msg]));

  // Sort a copy: Array.prototype.sort mutates in place, and callers' arrays
  // should not be reordered as a side effect.
  return [...steps]
    .sort((a, b) => a.created_at - b.created_at)
    .map((step) => {
      const messageId = step.step_details?.message_creation?.message_id;
      const message = messageId ? messageMap.get(messageId) : undefined;
      return message ? { step, message } : step;
    });
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Back-fills conversation messages after a run by diffing the thread's API
 * messages — plus a synthesized "current" assistant message rebuilt from the
 * run's steps — against locally stored messages.
 *
 * @param {Object} params
 * @param {Object} params.openai - API client; `openai.req` is used downstream by `syncMessages`.
 * @param {string} params.endpoint - Endpoint identifier stamped on the synthesized message.
 * @param {string} params.latestMessageId - Local messageId the synthesized
 *   assistant message maps to (via its `metadata`).
 * @param {string} params.thread_id - Thread to inspect.
 * @param {string} params.run_id - Run whose steps are replayed.
 * @param {string} params.conversationId - Local conversation id.
 * @returns {Promise<Array>} de-duplicated union of DB and synced messages
 *   (synced entries override DB entries with the same messageId).
 */
async function checkMessageGaps({
  openai,
  endpoint,
  latestMessageId,
  thread_id,
  run_id,
  conversationId,
}) {
  const promises = [];
  promises.push(openai.beta.threads.messages.list(thread_id, defaultOrderQuery));
  promises.push(openai.beta.threads.runs.steps.list(run_id, { thread_id }));
  // Fetch thread messages and run steps in parallel.
  const [response, stepsResponse] = await Promise.all(promises);

  // Pair each step with the message it created, in chronological order.
  const steps = mapMessagesToSteps(stepsResponse.data, response.data);
  // Skeleton for the assistant message the run was producing; `created_at`
  // uses epoch seconds to match the API's unit.
  const currentMessage = {
    id: v4(),
    content: [],
    assistant_id: null,
    created_at: Math.floor(new Date().getTime() / 1000),
    object: 'thread.message',
    role: 'assistant',
    run_id,
    thread_id,
    endpoint,
    metadata: {
      messageId: latestMessageId,
    },
  };

  for (const step of steps) {
    // Adopt the first assistant_id observed across the steps.
    if (!currentMessage.assistant_id && step.assistant_id) {
      currentMessage.assistant_id = step.assistant_id;
    }
    if (step.message) {
      // Message-creation step: take over its id/timestamp and merge content.
      currentMessage.id = step.message.id;
      currentMessage.created_at = step.message.created_at;
      currentMessage.content = currentMessage.content.concat(step.message.content);
    } else if (step.step_details?.type === 'tool_calls' && step.step_details?.tool_calls?.length) {
      // Tool-call step: surface the calls as TOOL_CALL content parts, marked
      // with `progress: 2` (treated as complete by consumers of this shape).
      currentMessage.content = currentMessage.content.concat(
        step.step_details?.tool_calls.map((toolCall) => ({
          [ContentTypes.TOOL_CALL]: {
            ...toolCall,
            progress: 2,
          },
          type: ContentTypes.TOOL_CALL,
        })),
      );
    }
  }

  // Swap the matching API message for the synthesized one (or append it if the
  // API never returned it), then keep the list chronologically ordered.
  let addedCurrentMessage = false;
  const apiMessages = response.data
    .map((msg) => {
      if (msg.id === currentMessage.id) {
        addedCurrentMessage = true;
        return currentMessage;
      }
      return msg;
    })
    .sort((a, b) => new Date(a.created_at) - new Date(b.created_at));

  if (!addedCurrentMessage) {
    apiMessages.push(currentMessage);
  }

  const dbMessages = await getMessages({ conversationId });
  // Fallback assistant id from the earliest stored message; `model` holds the
  // assistant id here (see saveUserMessage/saveAssistantMessage above).
  const assistant_id = dbMessages?.[0]?.model;

  const syncedMessages = await syncMessages({
    openai,
    endpoint,
    thread_id,
    dbMessages,
    apiMessages,
    assistant_id,
    conversationId,
  });

  // De-duplicate by messageId; synced entries win over stale DB entries.
  return Object.values(
    [...dbMessages, ...syncedMessages].reduce(
      (acc, message) => ({ ...acc, [message.messageId]: message }),
      {},
    ),
  );
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Records token usage for a transaction by delegating to `spendTokens`.
 *
 * @param {Object} params
 * @param {number} params.prompt_tokens - Prompt token count.
 * @param {number} params.completion_tokens - Completion token count.
 * @param {string} params.model - Model the tokens were spent on.
 * @param {string} params.user - Owning user id.
 * @param {string} params.conversationId - Conversation the spend belongs to.
 * @param {string} [params.context='message'] - Spend context label.
 * @returns {Promise<void>}
 */
const recordUsage = async ({
  prompt_tokens,
  completion_tokens,
  model,
  user,
  conversationId,
  context = 'message',
}) => {
  const txMetadata = { user, model, context, conversationId };
  const tokenUsage = { promptTokens: prompt_tokens, completionTokens: completion_tokens };
  await spendTokens(txMetadata, tokenUsage);
};
|
|
/**
 * Sentinel delimiters temporarily wrapped around citation indices while
 * `processMessages` resolves annotations; chosen to be unlikely to occur in
 * model output, they are later collapsed to `^` superscript markers.
 */
const uniqueCitationStart = '^====||===';
const uniqueCitationEnd = '==|||||^';
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Flattens sorted run messages into a single text string, downloading image
 * files and resolving text annotations (file paths and file citations) along
 * the way. Citations are first inserted as unique sentinel markers
 * (`uniqueCitationStart`/`uniqueCitationEnd`), de-duplicated, then converted to
 * `^n^` footnote markers with a numbered source list appended at the end.
 *
 * @param {Object} params
 * @param {Object} params.openai - API client used to retrieve files.
 * @param {Object} params.client - Request-scoped client; `client.processedFileIds`
 *   is a Set-like cache (`.has`/`.add`) of file ids already handled.
 * @param {Array} [params.messages=[]] - Thread messages to process.
 * @returns {Promise<{messages: Array, text: string, edited: boolean}>} the
 *   sorted messages (mutated in place with a `files` array), the aggregated
 *   text, and whether any annotation replacement was applied.
 */
async function processMessages({ openai, client, messages = [] }) {
  // NOTE: sort() mutates the caller's array; `sorted` is the same reference.
  const sorted = messages.sort((a, b) => a.created_at - b.created_at);

  let text = '';
  let edited = false;
  const sources = new Map(); // filename -> 1-based citation index
  const fileRetrievalPromises = [];

  for (const message of sorted) {
    message.files = [];
    for (const content of message.content) {
      const type = content.type;
      const contentType = content[type];
      const currentFileId = contentType?.file_id;

      // Image parts contribute no text; fetch each image at most once (per the
      // processedFileIds cache) in the background and attach it to the message.
      if (type === ContentTypes.IMAGE_FILE && !client.processedFileIds.has(currentFileId)) {
        fileRetrievalPromises.push(
          retrieveAndProcessFile({
            openai,
            client,
            file_id: currentFileId,
            basename: `${currentFileId}.png`,
          })
            .then((file) => {
              client.processedFileIds.add(currentFileId);
              message.files.push(file);
            })
            .catch((error) => {
              console.error(`Failed to retrieve file: ${error.message}`);
            }),
        );
        continue;
      }

      let currentText = contentType?.value ?? '';

      // Annotations reference spans of `currentText` by start/end index.
      const { annotations } = contentType ?? {};

      if (!annotations?.length) {
        text += currentText;
        continue;
      }

      const replacements = [];
      const annotationPromises = annotations.map(async (annotation) => {
        const type = annotation.type;
        const annotationType = annotation[type];
        const file_id = annotationType?.file_id;
        const alreadyProcessed = client.processedFileIds.has(file_id);

        let file;
        let replacementText = '';

        try {
          if (alreadyProcessed) {
            // NOTE(review): this branch leaves `replacementText` empty, so
            // annotations for already-processed files are never replaced in
            // the text — confirm this is intentional.
            file = await retrieveAndProcessFile({ openai, client, file_id, unknownType: true });
          } else if (type === AnnotationTypes.FILE_PATH) {
            // File-path annotations are replaced with the local filepath of
            // the downloaded file.
            const basename = path.basename(annotation.text);
            file = await retrieveAndProcessFile({
              openai,
              client,
              file_id,
              basename,
            });
            replacementText = file.filepath;
          } else if (type === AnnotationTypes.FILE_CITATION && file_id) {
            file = await retrieveAndProcessFile({
              openai,
              client,
              file_id,
              unknownType: true,
            });
            if (file && file.filename) {
              // One citation number per distinct filename, assigned in
              // first-seen order; wrapped in sentinels for later cleanup.
              if (!sources.has(file.filename)) {
                sources.set(file.filename, sources.size + 1);
              }
              replacementText = `${uniqueCitationStart}${sources.get(
                file.filename,
              )}${uniqueCitationEnd}`;
            }
          }

          if (file && replacementText) {
            replacements.push({
              start: annotation.start_index,
              end: annotation.end_index,
              text: replacementText,
            });
            edited = true;
            if (!alreadyProcessed) {
              client.processedFileIds.add(file_id);
              message.files.push(file);
            }
          }
        } catch (error) {
          console.error(`Failed to process annotation: ${error.message}`);
        }
      });

      await Promise.all(annotationPromises);

      // Apply replacements back-to-front so earlier indices remain valid.
      replacements.sort((a, b) => b.start - a.start);
      for (const { start, end, text: replacementText } of replacements) {
        currentText = currentText.slice(0, start) + replacementText + currentText.slice(end);
      }

      text += currentText;
    }
  }

  await Promise.all(fileRetrievalPromises);

  // Collapse a pair of adjacent citations with the same number into one;
  // pairs with different numbers are reconstructed unchanged.
  const adjacentCitationRegex = new RegExp(
    `${escapeRegExp(uniqueCitationStart)}(\\d+)${escapeRegExp(
      uniqueCitationEnd,
    )}(\\s*)${escapeRegExp(uniqueCitationStart)}(\\d+)${escapeRegExp(uniqueCitationEnd)}`,
    'g',
  );
  text = text.replace(adjacentCitationRegex, (match, num1, space, num2) => {
    return num1 === num2
      ? `${uniqueCitationStart}${num1}${uniqueCitationEnd}`
      : `${uniqueCitationStart}${num1}${uniqueCitationEnd}${space}${uniqueCitationStart}${num2}${uniqueCitationEnd}`;
  });

  // Collapse longer runs of the exact same citation via a backreference.
  const remainingAdjacentRegex = new RegExp(
    `(${escapeRegExp(uniqueCitationStart)}(\\d+)${escapeRegExp(uniqueCitationEnd)})\\s*\\1+`,
    'g',
  );
  text = text.replace(remainingAdjacentRegex, '$1');

  // Convert sentinels into superscript markers: `^n^`.
  text = text.replace(new RegExp(escapeRegExp(uniqueCitationStart), 'g'), '^');
  text = text.replace(new RegExp(escapeRegExp(uniqueCitationEnd), 'g'), '^');

  // Append the numbered source list as footnotes.
  if (sources.size) {
    text += '\n\n';
    Array.from(sources.entries()).forEach(([source, index], arrayIndex) => {
      text += `^${index}.^ ${source}${arrayIndex === sources.size - 1 ? '' : '\n'}`;
    });
  }

  return { messages: sorted, text, edited };
}
|
|
// Public API of this module; `syncMessages` stays module-private
// (reachable only through `checkMessageGaps`).
module.exports = {
  initThread,
  recordUsage,
  processMessages,
  saveUserMessage,
  checkMessageGaps,
  addThreadMetadata,
  mapMessagesToSteps,
  saveAssistantMessage,
};
|
|