| const { sleep } = require('@librechat/agents'); |
| const { logger } = require('@librechat/data-schemas'); |
| const { RunStatus, defaultOrderQuery, CacheKeys } = require('librechat-data-provider'); |
| const getLogStores = require('~/cache/getLogStores'); |
| const { retrieveRun } = require('./methods'); |
| const RunManager = require('./RunManager'); |
|
|
/**
 * Races `promise` against a timer, guaranteeing the timer is cleaned up.
 *
 * @param {Promise} promise - The operation to time-box.
 * @param {number} timeoutMs - Milliseconds to wait before rejecting.
 * @param {string} timeoutMessage - Debug-logged when the timer fires first.
 * @returns {Promise<*>} Resolves/rejects with `promise`'s outcome, or rejects
 *   with `Error('Operation timed out')` if the timer wins the race.
 */
async function withTimeout(promise, timeoutMs, timeoutMessage) {
  let timer;

  const failAfterDelay = new Promise((_resolve, reject) => {
    timer = setTimeout(() => {
      logger.debug(timeoutMessage);
      reject(new Error('Operation timed out'));
    }, timeoutMs);
  });

  try {
    return await Promise.race([promise, failAfterDelay]);
  } finally {
    // Runs on every outcome so the pending timer never leaks.
    clearTimeout(timer);
  }
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Starts a new run on the given thread via the OpenAI Assistants API.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client instance.
 * @param {string} params.thread_id - Thread to create the run on.
 * @param {Object} params.body - Run creation payload (e.g. assistant_id).
 * @returns {Promise<Object>} The created run object.
 */
async function createRun({ openai, thread_id, body }) {
  const { runs } = openai.beta.threads;
  return runs.create(thread_id, body);
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Polls an OpenAI run until it leaves the IN_PROGRESS/QUEUED states, the user
 * cancels it via the abort cache, or the overall timeout elapses.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client; also carries `req.user.id`
 *   and `responseMessage.conversationId`, used to build the abort-cache key.
 * @param {string} params.run_id - The run to poll.
 * @param {string} params.thread_id - Thread the run belongs to.
 * @param {Object} params.runManager - Receives run steps on every poll and a final fetch.
 * @param {number} [params.pollIntervalMs=2000] - Delay between polls.
 * @param {number} [params.timeout=180000] - Overall deadline in ms. Note: only
 *   sleep time is counted toward the deadline; retrieval latency is not.
 * @returns {Promise<Object>} The final run object.
 * @throws {Error} If retrieval fails `maxRetries` consecutive times, the run
 *   is cancelled via the abort cache, or the timeout elapses.
 */
async function waitForRun({
  openai,
  run_id,
  thread_id,
  runManager,
  pollIntervalMs = 2000,
  timeout = 60000 * 3,
}) {
  let timeElapsed = 0;
  let run;

  // Abort cache entry is written elsewhere when the user cancels the conversation.
  const cache = getLogStores(CacheKeys.ABORT_KEYS);
  const cacheKey = `${openai.req.user.id}:${openai.responseMessage.conversationId}`;

  let i = 0;
  let lastSeenStatus = null;
  const runIdLog = `run_id: ${run_id}`;
  const runInfo = `user: ${openai.req.user.id} | thread_id: ${thread_id} | ${runIdLog}`;
  const raceTimeoutMs = 3000;
  // Never reassigned, so declared as const (was `let`).
  const maxRetries = 5;
  while (timeElapsed < timeout) {
    i++;
    logger.debug(`[heartbeat ${i}] ${runIdLog} | Retrieving run status...`);
    let updatedRun;

    // Retry transient retrieval failures/timeouts up to `maxRetries` times.
    let attempt = 0;
    let startTime = Date.now();
    while (!updatedRun && attempt < maxRetries) {
      try {
        updatedRun = await withTimeout(
          retrieveRun({ thread_id, run_id, timeout: raceTimeoutMs, openai }),
          raceTimeoutMs,
          `[heartbeat ${i}] ${runIdLog} | Run retrieval timed out after ${raceTimeoutMs} ms. Trying again (attempt ${
            attempt + 1
          } of ${maxRetries})...`,
        );
        const endTime = Date.now();
        logger.debug(
          `[heartbeat ${i}] ${runIdLog} | Elapsed run retrieval time: ${endTime - startTime}`,
        );
      } catch (error) {
        attempt++;
        startTime = Date.now();
        logger.warn(`${runIdLog} | Error retrieving run status`, error);
      }
    }

    if (!updatedRun) {
      const errorMessage = `[waitForRun] ${runIdLog} | Run retrieval failed after ${maxRetries} attempts`;
      throw new Error(errorMessage);
    }

    run = updatedRun;
    const runStatus = `${runInfo} | status: ${run.status}`;

    // Log status transitions once instead of on every heartbeat.
    if (run.status !== lastSeenStatus) {
      logger.debug(`[${run.status}] ${runInfo}`);
      lastSeenStatus = run.status;
    }

    logger.debug(`[heartbeat ${i}] ${runStatus}`);

    // Check whether the user aborted; the cache read is also time-boxed so a
    // slow cache cannot stall the poll loop. A failed check is non-fatal.
    let cancelStatus;
    try {
      const timeoutMessage = `[heartbeat ${i}] ${runIdLog} | Cancel Status check operation timed out.`;
      cancelStatus = await withTimeout(cache.get(cacheKey), raceTimeoutMs, timeoutMessage);
    } catch (error) {
      logger.warn(`Error retrieving cancel status: ${error}`);
    }

    if (cancelStatus === 'cancelled') {
      logger.warn(`[waitForRun] ${runStatus} | RUN CANCELLED`);
      throw new Error('Run cancelled');
    }

    // Terminal state: fetch steps one final time and stop polling.
    if (![RunStatus.IN_PROGRESS, RunStatus.QUEUED].includes(run.status)) {
      logger.debug(`[FINAL] ${runInfo} | status: ${run.status}`);
      await runManager.fetchRunSteps({
        openai,
        thread_id: thread_id,
        run_id: run_id,
        runStatus: run.status,
        final: true,
      });
      break;
    }

    // Still running: surface intermediate steps, then wait before the next poll.
    await runManager.fetchRunSteps({
      openai,
      thread_id: thread_id,
      run_id: run_id,
      runStatus: run.status,
    });

    await sleep(pollIntervalMs);
    timeElapsed += pollIntervalMs;
  }

  if (timeElapsed >= timeout) {
    // `run?.status` guards against a TypeError when the loop never executed
    // (e.g. a non-positive `timeout`), in which case `run` is undefined.
    const timeoutMessage = `[waitForRun] ${runInfo} | status: ${run?.status} | timed out after ${timeout} ms`;
    logger.warn(timeoutMessage);
    throw new Error(timeoutMessage);
  }

  return run;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Lists the steps of a run via the OpenAI Assistants API.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client instance.
 * @param {string} params.thread_id - Thread the run belongs to.
 * @param {string} params.run_id - The run whose steps are listed.
 * @returns {Promise<Object>} The run-steps list response.
 */
async function _retrieveRunSteps({ openai, thread_id, run_id }) {
  return openai.beta.threads.runs.steps.list(run_id, { thread_id });
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Creates a RunManager, waits for the run to reach a terminal state, and
 * collects the run's messages, steps, and any required tool-call actions.
 *
 * @param {Object} params
 * @param {Object} params.openai - OpenAI client instance.
 * @param {string} params.run_id - The run to handle.
 * @param {string} params.thread_id - Thread the run belongs to.
 * @returns {Promise<{run: Object, steps: Array, messages: Array, actions: Array}>}
 * @throws {Error} Propagates failures from `waitForRun` (retrieval failure,
 *   cancellation, timeout) and `JSON.parse` on malformed tool arguments.
 */
async function _handleRun({ openai, run_id, thread_id }) {
  let steps = [];
  let messages = [];
  const runManager = new RunManager({
    // Invoked once when the run reaches a terminal status; captures the
    // run's messages and all fetched steps into the closure variables above.
    final: async ({ step, runStatus, stepsByStatus }) => {
      // Use the shared logger instead of console.* for consistency with the file.
      logger.debug(`Final step for ${run_id} with status ${runStatus}`, step);

      const promises = [];
      promises.push(openai.beta.threads.messages.list(thread_id, defaultOrderQuery));

      // `stepsByStatus` maps status -> array of step promises; only the
      // promises are needed, so iterate values (the status keys were unused).
      for (const stepsPromises of Object.values(stepsByStatus)) {
        promises.push(...stepsPromises);
      }

      const resolved = await Promise.all(promises);
      // First entry is the message-list response; the rest are run steps.
      const res = resolved.shift();
      messages = res.data.filter((msg) => msg.run_id === run_id);
      resolved.push(step);
      steps = resolved;
    },
  });

  const run = await waitForRun({
    openai,
    run_id,
    thread_id,
    runManager,
    pollIntervalMs: 2000,
    timeout: 60000,
  });

  // If the run paused on required_action, translate each tool call into an
  // action record the caller can execute and submit back.
  const actions = [];
  if (run.required_action) {
    const { submit_tool_outputs } = run.required_action;
    submit_tool_outputs.tool_calls.forEach((item) => {
      const functionCall = item.function;
      const args = JSON.parse(functionCall.arguments);
      actions.push({
        tool: functionCall.name,
        toolInput: args,
        toolCallId: item.id,
        run_id,
        thread_id,
      });
    });
  }

  return { run, steps, messages, actions };
}
|
|
// Public API of this module; `sleep` is re-exported from '@librechat/agents'
// so callers can import it from here alongside the run helpers.
module.exports = {
  sleep,
  createRun,
  waitForRun,
};
|
|