| import mongoose from 'mongoose'; |
| import { EventEmitter } from 'events'; |
| import { GridFSBucket } from 'mongodb'; |
| import { logger } from '@librechat/data-schemas'; |
| import type { Db, ReadPreference, Collection } from 'mongodb'; |
|
|
| interface KeyvMongoOptions { |
| url?: string; |
| collection?: string; |
| useGridFS?: boolean; |
| readPreference?: ReadPreference; |
| } |
|
|
| interface GridFSClient { |
| bucket: GridFSBucket; |
| store: Collection; |
| db: Db; |
| } |
|
|
| interface CollectionClient { |
| store: Collection; |
| db: Db; |
| } |
|
|
| type Client = GridFSClient | CollectionClient; |
|
|
| const storeMap = new Map<string, Client>(); |
|
|
| class KeyvMongoCustom extends EventEmitter { |
| private opts: KeyvMongoOptions; |
| public ttlSupport: boolean; |
| public namespace?: string; |
|
|
| constructor(options: KeyvMongoOptions = {}) { |
| super(); |
|
|
| this.opts = { |
| url: 'mongodb://127.0.0.1:27017', |
| collection: 'keyv', |
| ...options, |
| }; |
|
|
| this.ttlSupport = false; |
| } |
|
|
| |
| private async _getClient(): Promise<Client> { |
| const storeKey = `${this.opts.collection}:${this.opts.useGridFS ? 'gridfs' : 'collection'}`; |
|
|
| |
| if (storeMap.has(storeKey)) { |
| return storeMap.get(storeKey)!; |
| } |
|
|
| |
| if (mongoose.connection.readyState !== 1) { |
| throw new Error('Mongoose connection not ready. Ensure connectDb() is called first.'); |
| } |
|
|
| try { |
| const db = mongoose.connection.db as unknown as Db | undefined; |
| if (!db) { |
| throw new Error('MongoDB database not available'); |
| } |
|
|
| let client: Client; |
|
|
| if (this.opts.useGridFS) { |
| const bucket = new GridFSBucket(db, { |
| readPreference: this.opts.readPreference, |
| bucketName: this.opts.collection, |
| }); |
| const store = db.collection(`${this.opts.collection}.files`); |
| client = { bucket, store, db }; |
| } else { |
| const collection = this.opts.collection || 'keyv'; |
| const store = db.collection(collection); |
| client = { store, db }; |
| } |
|
|
| storeMap.set(storeKey, client); |
| return client; |
| } catch (error) { |
| this.emit('error', error); |
| throw error; |
| } |
| } |
|
|
| async get(key: string): Promise<unknown> { |
| const client = await this._getClient(); |
|
|
| if (this.opts.useGridFS && this.isGridFSClient(client)) { |
| await client.store.updateOne( |
| { |
| filename: key, |
| }, |
| { |
| $set: { |
| 'metadata.lastAccessed': new Date(), |
| }, |
| }, |
| ); |
|
|
| const stream = client.bucket.openDownloadStreamByName(key); |
|
|
| return new Promise((resolve) => { |
| const resp: Uint8Array[] = []; |
| stream.on('error', () => { |
| resolve(undefined); |
| }); |
|
|
| stream.on('end', () => { |
| const data = Buffer.concat(resp).toString('utf8'); |
| resolve(data); |
| }); |
|
|
| stream.on('data', (chunk: Uint8Array) => { |
| resp.push(chunk); |
| }); |
| }); |
| } |
|
|
| const document = await client.store.findOne({ key: { $eq: key } }); |
|
|
| if (!document) { |
| return undefined; |
| } |
|
|
| return document.value; |
| } |
|
|
| async getMany(keys: string[]): Promise<unknown[]> { |
| const client = await this._getClient(); |
|
|
| if (this.opts.useGridFS) { |
| const promises = []; |
| for (const key of keys) { |
| promises.push(this.get(key)); |
| } |
|
|
| const values = await Promise.allSettled(promises); |
| const data: unknown[] = []; |
| for (const value of values) { |
| data.push(value.status === 'fulfilled' ? value.value : undefined); |
| } |
|
|
| return data; |
| } |
|
|
| const values = await client.store |
| .find({ key: { $in: keys } }) |
| .project({ _id: 0, value: 1, key: 1 }) |
| .toArray(); |
|
|
| const results: unknown[] = [...keys]; |
| let i = 0; |
| for (const key of keys) { |
| const rowIndex = values.findIndex((row) => row.key === key); |
| results[i] = rowIndex > -1 ? values[rowIndex].value : undefined; |
| i++; |
| } |
|
|
| return results; |
| } |
|
|
| async set(key: string, value: string, ttl?: number): Promise<unknown> { |
| const client = await this._getClient(); |
| const expiresAt = typeof ttl === 'number' ? new Date(Date.now() + ttl) : null; |
|
|
| if (this.opts.useGridFS && this.isGridFSClient(client)) { |
| const stream = client.bucket.openUploadStream(key, { |
| metadata: { |
| expiresAt, |
| lastAccessed: new Date(), |
| }, |
| }); |
|
|
| return new Promise((resolve) => { |
| stream.on('finish', () => { |
| resolve(stream); |
| }); |
| stream.end(value); |
| }); |
| } |
|
|
| await client.store.updateOne( |
| { key: { $eq: key } }, |
| { $set: { key, value, expiresAt } }, |
| { upsert: true }, |
| ); |
| } |
|
|
| async delete(key: string): Promise<boolean> { |
| const client = await this._getClient(); |
|
|
| if (this.opts.useGridFS && this.isGridFSClient(client)) { |
| try { |
| const bucket = new GridFSBucket(client.db, { |
| bucketName: this.opts.collection, |
| }); |
| const files = await bucket.find({ filename: key }).toArray(); |
| if (files.length > 0) { |
| await client.bucket.delete(files[0]._id); |
| } |
| return true; |
| } catch { |
| return false; |
| } |
| } |
|
|
| const object = await client.store.deleteOne({ key: { $eq: key } }); |
| return object.deletedCount > 0; |
| } |
|
|
| async deleteMany(keys: string[]): Promise<boolean> { |
| const client = await this._getClient(); |
|
|
| if (this.opts.useGridFS && this.isGridFSClient(client)) { |
| const bucket = new GridFSBucket(client.db, { |
| bucketName: this.opts.collection, |
| }); |
| const files = await bucket.find({ filename: { $in: keys } }).toArray(); |
| if (files.length === 0) { |
| return false; |
| } |
|
|
| await Promise.all(files.map(async (file) => client.bucket.delete(file._id))); |
| return true; |
| } |
|
|
| const object = await client.store.deleteMany({ key: { $in: keys } }); |
| return object.deletedCount > 0; |
| } |
|
|
| async clear(): Promise<void> { |
| const client = await this._getClient(); |
|
|
| if (this.opts.useGridFS && this.isGridFSClient(client)) { |
| try { |
| await client.bucket.drop(); |
| } catch (error: unknown) { |
| |
| const errorCode = |
| error instanceof Error && 'code' in error ? (error as { code?: number }).code : undefined; |
| if (errorCode !== 26) { |
| throw error; |
| } |
| } |
| } |
|
|
| await client.store.deleteMany({ |
| key: { $regex: this.namespace ? `^${this.namespace}:*` : '' }, |
| }); |
| } |
|
|
| async has(key: string): Promise<boolean> { |
| const client = await this._getClient(); |
| const filter = { [this.opts.useGridFS ? 'filename' : 'key']: { $eq: key } }; |
| const document = await client.store.countDocuments(filter, { limit: 1 }); |
| return document !== 0; |
| } |
|
|
| |
| async disconnect(): Promise<boolean> { |
| |
| return true; |
| } |
|
|
| private isGridFSClient(client: Client): client is GridFSClient { |
| return (client as GridFSClient).bucket != null; |
| } |
| } |
|
|
| const keyvMongo = new KeyvMongoCustom({ |
| collection: 'logs', |
| }); |
|
|
| keyvMongo.on('error', (err) => logger.error('KeyvMongo connection error:', err)); |
|
|
| export default keyvMongo; |
|
|