| import { keyvRedisClient } from '~/cache/redisClients'; |
| import { cacheConfig as cache } from '~/cache/cacheConfig'; |
| import { clusterConfig as cluster } from './config'; |
| import { logger } from '@librechat/data-schemas'; |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Distributed leader election coordinated through Redis.
 *
 * - Instances compete for a single Redis key (`LEADER_KEY`) with an atomic
 *   `SET key uuid NX EX <lease>`, so only one UUID can hold the key at a time.
 * - The winner keeps its lease alive by re-arming the key's TTL every
 *   `cluster.LEADER_RENEW_INTERVAL` seconds (see `renewLeadership`).
 * - If the holder stops renewing, the key expires after
 *   `cluster.LEADER_LEASE_DURATION` seconds and another instance may claim it.
 * - `resign()` deletes the key (only if this instance still owns it) so another
 *   instance can take over without waiting for the lease to expire.
 * - When Redis is disabled (`cache.USE_REDIS` is falsy), every instance
 *   considers itself the leader.
 *
 * Do not construct this class directly; use the exported `isLeader()` helper,
 * which delegates to the process-wide singleton.
 */
export class LeaderElection {
  /**
   * Raw Redis key that holds the current leader's UUID. It is built from the cache
   * prefix so that the atomic SET/EVAL calls below can address it directly.
   */
  static readonly LEADER_KEY = `${cache.REDIS_KEY_PREFIX}${cache.GLOBAL_PREFIX_SEPARATOR}LeadingServerUUID`;
  private static _instance = new LeaderElection();

  /** Identifier this instance writes into `LEADER_KEY` when it wins an election. */
  readonly UUID: string = crypto.randomUUID();
  /** Lease-renewal interval; non-null only while this instance holds leadership. */
  private refreshTimer: NodeJS.Timeout | undefined = undefined;
  /** Election currently in progress in this process, shared by concurrent `isLeader()` calls. */
  private pendingElection: Promise<boolean> | undefined = undefined;

  /**
   * Returns the process-wide singleton. The first construction registers
   * SIGTERM/SIGINT handlers that release leadership; later constructions hand
   * back the existing instance instead of creating a new one.
   */
  constructor() {
    if (LeaderElection._instance) return LeaderElection._instance;

    // NOTE(review): installing a SIGTERM/SIGINT listener removes Node's default
    // exit-on-signal behaviour. Presumably another shutdown handler terminates
    // the process — confirm before relying on it.
    process.on('SIGTERM', () => this.resign());
    process.on('SIGINT', () => this.resign());
    LeaderElection._instance = this;
  }

  /**
   * Reports whether this instance is the leader. If no leader exists, it tries
   * to become the leader.
   *
   * @returns `true` when Redis is disabled, or when this instance owns the key
   *   and is actively renewing it. Returns `false` when another instance leads,
   *   when the election is lost, or on any Redis error (which is logged).
   */
  public async isLeader(): Promise<boolean> {
    if (!cache.USE_REDIS) return true;

    try {
      const currentLeader = await LeaderElection.getLeaderUUID();
      // The key can still carry our UUID after renewal has given up (the lease
      // has not expired yet), so leadership is only reported while renewing.
      if (currentLeader === this.UUID) return this.refreshTimer != null;
      if (currentLeader != null) return false;

      // No leader: join this process's in-flight election if there is one.
      // Otherwise a second concurrent caller would lose the SET NX race to its
      // own instance and wrongly report `false`.
      if (this.pendingElection == null) {
        this.pendingElection = this.runElection().finally(() => {
          this.pendingElection = undefined;
        });
      }
      return await this.pendingElection;
    } catch (error) {
      logger.error('Failed to check leadership status:', error);
      return false;
    }
  }

  /**
   * Gives up leadership. It stops lease renewal and deletes `LEADER_KEY`, but
   * only if the key still holds this instance's UUID. Errors are logged, not thrown.
   */
  public async resign(): Promise<void> {
    if (!cache.USE_REDIS) return;

    try {
      this.clearRefreshTimer();

      // Compare-and-delete runs atomically in Lua, so another instance's
      // freshly acquired key is never removed.
      const script = `
        if redis.call("get", KEYS[1]) == ARGV[1] then
          redis.call("del", KEYS[1])
        end
      `;

      await keyvRedisClient!.eval(script, {
        keys: [LeaderElection.LEADER_KEY],
        arguments: [this.UUID],
      });
    } catch (error) {
      logger.error('Failed to release leadership lock:', error);
    }
  }

  /**
   * Reads the UUID stored under `LEADER_KEY`.
   *
   * @returns `null` when Redis is disabled. Otherwise returns the value from the
   *   Redis GET, which is presumably `null` when no leader key exists.
   */
  public static async getLeaderUUID(): Promise<string | null> {
    if (!cache.USE_REDIS) return null;
    return await keyvRedisClient!.get(LeaderElection.LEADER_KEY);
  }

  /** Stops the lease-renewal interval, if one is running. Safe to call repeatedly. */
  public clearRefreshTimer(): void {
    clearInterval(this.refreshTimer);
    this.refreshTimer = undefined;
  }

  /**
   * Waits a random 0–2 s jitter and then attempts to claim leadership. The
   * jitter spreads out simultaneous attempts from different servers.
   */
  private async runElection(): Promise<boolean> {
    const delay = Math.random() * 2000;
    await new Promise((resolve) => setTimeout(resolve, delay));
    return await this.electSelf();
  }

  /**
   * Attempts to claim `LEADER_KEY` with `SET NX EX LEADER_LEASE_DURATION`. On
   * success, it starts a renewal interval every `LEADER_RENEW_INTERVAL` seconds.
   *
   * @returns `true` if the key was set by this call. Returns `false` if it already
   *   existed or the command failed (the failure is logged).
   */
  private async electSelf(): Promise<boolean> {
    try {
      const result = await keyvRedisClient!.set(LeaderElection.LEADER_KEY, this.UUID, {
        NX: true,
        EX: cluster.LEADER_LEASE_DURATION,
      });

      if (result !== 'OK') return false;

      this.clearRefreshTimer();
      this.refreshTimer = setInterval(async () => {
        await this.renewLeadership();
      }, cluster.LEADER_RENEW_INTERVAL * 1000);
      // unref(): the renewal timer alone does not keep the process alive.
      this.refreshTimer.unref();

      return true;
    } catch (error) {
      logger.error('Leader election failed:', error);
      return false;
    }
  }

  /**
   * Extends the lease TTL, but only while `LEADER_KEY` still holds this UUID
   * (atomic compare-and-expire in Lua). If the key belongs to someone else, or
   * is missing, the renewal timer is stopped.
   *
   * Behaviour on Redis errors:
   * - It retries after `LEADER_RENEW_RETRY_DELAY` seconds while
   *   `attempts <= LEADER_RENEW_ATTEMPTS`, i.e. up to `LEADER_RENEW_ATTEMPTS + 1`
   *   tries in total.
   * - After that it gives up and stops the timer.
   * - Retries run inside the interval callback, so they can overlap with later
   *   interval ticks if they take longer than `LEADER_RENEW_INTERVAL`.
   *
   * @param attempts 1-based number of the current try.
   */
  private async renewLeadership(attempts: number = 1): Promise<void> {
    try {
      const script = `
        if redis.call("get", KEYS[1]) == ARGV[1] then
          return redis.call("expire", KEYS[1], ARGV[2])
        else
          return 0
        end
      `;

      const result = await keyvRedisClient!.eval(script, {
        keys: [LeaderElection.LEADER_KEY],
        arguments: [this.UUID, cluster.LEADER_LEASE_DURATION.toString()],
      });

      if (result === 0) {
        logger.warn('Lost leadership, clearing refresh timer');
        this.clearRefreshTimer();
      }
    } catch (error) {
      logger.error(`Failed to renew leadership (attempts No.${attempts}):`, error);
      if (attempts <= cluster.LEADER_RENEW_ATTEMPTS) {
        await new Promise((resolve) =>
          setTimeout(resolve, cluster.LEADER_RENEW_RETRY_DELAY * 1000),
        );
        await this.renewLeadership(attempts + 1);
      } else {
        logger.error('Exceeded maximum attempts to renew leadership.');
        this.clearRefreshTimer();
      }
    }
  }
}
|
|
/** Process-wide election handle; `new LeaderElection()` hands back the shared singleton. */
const defaultElection: LeaderElection = new LeaderElection();

/**
 * Convenience entry point: asks the shared election instance whether this
 * server is currently the leader, and attempts an election if no leader exists.
 */
export const isLeader = (): Promise<boolean> => {
  return defaultElection.isLeader();
};
|
|