Spaces:
Paused
Paused
| import type { TelegramMessage } from "./bot/types.js"; | |
| import { resolveDefaultAgentId } from "../agents/agent-scope.js"; | |
| // @ts-nocheck | |
| import { hasControlCommand } from "../auto-reply/command-detection.js"; | |
| import { | |
| createInboundDebouncer, | |
| resolveInboundDebounceMs, | |
| } from "../auto-reply/inbound-debounce.js"; | |
| import { buildCommandsPaginationKeyboard } from "../auto-reply/reply/commands-info.js"; | |
| import { listSkillCommandsForAgents } from "../auto-reply/skill-commands.js"; | |
| import { buildCommandsMessagePaginated } from "../auto-reply/status.js"; | |
| import { resolveChannelConfigWrites } from "../channels/plugins/config-writes.js"; | |
| import { loadConfig } from "../config/config.js"; | |
| import { writeConfigFile } from "../config/io.js"; | |
| import { danger, logVerbose, warn } from "../globals.js"; | |
| import { readChannelAllowFromStore } from "../pairing/pairing-store.js"; | |
| import { withTelegramApiErrorLogging } from "./api-logging.js"; | |
| import { firstDefined, isSenderAllowed, normalizeAllowFromWithStore } from "./bot-access.js"; | |
| import { RegisterTelegramHandlerParams } from "./bot-native-commands.js"; | |
| import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js"; | |
| import { resolveMedia } from "./bot/delivery.js"; | |
| import { resolveTelegramForumThreadId } from "./bot/helpers.js"; | |
| import { migrateTelegramGroupConfig } from "./group-migration.js"; | |
| import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js"; | |
| import { buildInlineKeyboard } from "./send.js"; | |
| export const registerTelegramHandlers = ({ | |
| cfg, | |
| accountId, | |
| bot, | |
| opts, | |
| runtime, | |
| mediaMaxBytes, | |
| telegramCfg, | |
| groupAllowFrom, | |
| resolveGroupPolicy, | |
| resolveTelegramGroupConfig, | |
| shouldSkipUpdate, | |
| processMessage, | |
| logger, | |
| }: RegisterTelegramHandlerParams) => { | |
| const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; | |
| const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = 1500; | |
| const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1; | |
| const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12; | |
| const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000; | |
| const mediaGroupBuffer = new Map<string, MediaGroupEntry>(); | |
| let mediaGroupProcessing: Promise<void> = Promise.resolve(); | |
| type TextFragmentEntry = { | |
| key: string; | |
| messages: Array<{ msg: TelegramMessage; ctx: unknown; receivedAtMs: number }>; | |
| timer: ReturnType<typeof setTimeout>; | |
| }; | |
| const textFragmentBuffer = new Map<string, TextFragmentEntry>(); | |
| let textFragmentProcessing: Promise<void> = Promise.resolve(); | |
| const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); | |
| type TelegramDebounceEntry = { | |
| ctx: unknown; | |
| msg: TelegramMessage; | |
| allMedia: Array<{ path: string; contentType?: string }>; | |
| storeAllowFrom: string[]; | |
| debounceKey: string | null; | |
| botUsername?: string; | |
| }; | |
| const inboundDebouncer = createInboundDebouncer<TelegramDebounceEntry>({ | |
| debounceMs, | |
| buildKey: (entry) => entry.debounceKey, | |
| shouldDebounce: (entry) => { | |
| if (entry.allMedia.length > 0) { | |
| return false; | |
| } | |
| const text = entry.msg.text ?? entry.msg.caption ?? ""; | |
| if (!text.trim()) { | |
| return false; | |
| } | |
| return !hasControlCommand(text, cfg, { botUsername: entry.botUsername }); | |
| }, | |
| onFlush: async (entries) => { | |
| const last = entries.at(-1); | |
| if (!last) { | |
| return; | |
| } | |
| if (entries.length === 1) { | |
| await processMessage(last.ctx, last.allMedia, last.storeAllowFrom); | |
| return; | |
| } | |
| const combinedText = entries | |
| .map((entry) => entry.msg.text ?? entry.msg.caption ?? "") | |
| .filter(Boolean) | |
| .join("\n"); | |
| if (!combinedText.trim()) { | |
| return; | |
| } | |
| const first = entries[0]; | |
| const baseCtx = first.ctx as { me?: unknown; getFile?: unknown } & Record<string, unknown>; | |
| const getFile = | |
| typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); | |
| const syntheticMessage: TelegramMessage = { | |
| ...first.msg, | |
| text: combinedText, | |
| caption: undefined, | |
| caption_entities: undefined, | |
| entities: undefined, | |
| date: last.msg.date ?? first.msg.date, | |
| }; | |
| const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; | |
| await processMessage( | |
| { message: syntheticMessage, me: baseCtx.me, getFile }, | |
| [], | |
| first.storeAllowFrom, | |
| messageIdOverride ? { messageIdOverride } : undefined, | |
| ); | |
| }, | |
| onError: (err) => { | |
| runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`)); | |
| }, | |
| }); | |
| const processMediaGroup = async (entry: MediaGroupEntry) => { | |
| try { | |
| entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); | |
| const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text); | |
| const primaryEntry = captionMsg ?? entry.messages[0]; | |
| const allMedia: Array<{ | |
| path: string; | |
| contentType?: string; | |
| stickerMetadata?: { emoji?: string; setName?: string; fileId?: string }; | |
| }> = []; | |
| for (const { ctx } of entry.messages) { | |
| const media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch); | |
| if (media) { | |
| allMedia.push({ | |
| path: media.path, | |
| contentType: media.contentType, | |
| stickerMetadata: media.stickerMetadata, | |
| }); | |
| } | |
| } | |
| const storeAllowFrom = await readChannelAllowFromStore("telegram").catch(() => []); | |
| await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); | |
| } catch (err) { | |
| runtime.error?.(danger(`media group handler failed: ${String(err)}`)); | |
| } | |
| }; | |
| const flushTextFragments = async (entry: TextFragmentEntry) => { | |
| try { | |
| entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); | |
| const first = entry.messages[0]; | |
| const last = entry.messages.at(-1); | |
| if (!first || !last) { | |
| return; | |
| } | |
| const combinedText = entry.messages.map((m) => m.msg.text ?? "").join(""); | |
| if (!combinedText.trim()) { | |
| return; | |
| } | |
| const syntheticMessage: TelegramMessage = { | |
| ...first.msg, | |
| text: combinedText, | |
| caption: undefined, | |
| caption_entities: undefined, | |
| entities: undefined, | |
| date: last.msg.date ?? first.msg.date, | |
| }; | |
| const storeAllowFrom = await readChannelAllowFromStore("telegram").catch(() => []); | |
| const baseCtx = first.ctx as { me?: unknown; getFile?: unknown } & Record<string, unknown>; | |
| const getFile = | |
| typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); | |
| await processMessage( | |
| { message: syntheticMessage, me: baseCtx.me, getFile }, | |
| [], | |
| storeAllowFrom, | |
| { messageIdOverride: String(last.msg.message_id) }, | |
| ); | |
| } catch (err) { | |
| runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); | |
| } | |
| }; | |
| const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { | |
| clearTimeout(entry.timer); | |
| entry.timer = setTimeout(async () => { | |
| textFragmentBuffer.delete(entry.key); | |
| textFragmentProcessing = textFragmentProcessing | |
| .then(async () => { | |
| await flushTextFragments(entry); | |
| }) | |
| .catch(() => undefined); | |
| await textFragmentProcessing; | |
| }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); | |
| }; | |
| bot.on("callback_query", async (ctx) => { | |
| const callback = ctx.callbackQuery; | |
| if (!callback) { | |
| return; | |
| } | |
| if (shouldSkipUpdate(ctx)) { | |
| return; | |
| } | |
| // Answer immediately to prevent Telegram from retrying while we process | |
| await withTelegramApiErrorLogging({ | |
| operation: "answerCallbackQuery", | |
| runtime, | |
| fn: () => bot.api.answerCallbackQuery(callback.id), | |
| }).catch(() => {}); | |
| try { | |
| const data = (callback.data ?? "").trim(); | |
| const callbackMessage = callback.message; | |
| if (!data || !callbackMessage) { | |
| return; | |
| } | |
| const inlineButtonsScope = resolveTelegramInlineButtonsScope({ | |
| cfg, | |
| accountId, | |
| }); | |
| if (inlineButtonsScope === "off") { | |
| return; | |
| } | |
| const chatId = callbackMessage.chat.id; | |
| const isGroup = | |
| callbackMessage.chat.type === "group" || callbackMessage.chat.type === "supergroup"; | |
| if (inlineButtonsScope === "dm" && isGroup) { | |
| return; | |
| } | |
| if (inlineButtonsScope === "group" && !isGroup) { | |
| return; | |
| } | |
| const messageThreadId = (callbackMessage as { message_thread_id?: number }).message_thread_id; | |
| const isForum = (callbackMessage.chat as { is_forum?: boolean }).is_forum === true; | |
| const resolvedThreadId = resolveTelegramForumThreadId({ | |
| isForum, | |
| messageThreadId, | |
| }); | |
| const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, resolvedThreadId); | |
| const storeAllowFrom = await readChannelAllowFromStore("telegram").catch(() => []); | |
| const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom); | |
| const effectiveGroupAllow = normalizeAllowFromWithStore({ | |
| allowFrom: groupAllowOverride ?? groupAllowFrom, | |
| storeAllowFrom, | |
| }); | |
| const effectiveDmAllow = normalizeAllowFromWithStore({ | |
| allowFrom: telegramCfg.allowFrom, | |
| storeAllowFrom, | |
| }); | |
| const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; | |
| const senderId = callback.from?.id ? String(callback.from.id) : ""; | |
| const senderUsername = callback.from?.username ?? ""; | |
| if (isGroup) { | |
| if (groupConfig?.enabled === false) { | |
| logVerbose(`Blocked telegram group ${chatId} (group disabled)`); | |
| return; | |
| } | |
| if (topicConfig?.enabled === false) { | |
| logVerbose( | |
| `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, | |
| ); | |
| return; | |
| } | |
| if (typeof groupAllowOverride !== "undefined") { | |
| const allowed = | |
| senderId && | |
| isSenderAllowed({ | |
| allow: effectiveGroupAllow, | |
| senderId, | |
| senderUsername, | |
| }); | |
| if (!allowed) { | |
| logVerbose( | |
| `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, | |
| ); | |
| return; | |
| } | |
| } | |
| const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; | |
| const groupPolicy = telegramCfg.groupPolicy ?? defaultGroupPolicy ?? "open"; | |
| if (groupPolicy === "disabled") { | |
| logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); | |
| return; | |
| } | |
| if (groupPolicy === "allowlist") { | |
| if (!senderId) { | |
| logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`); | |
| return; | |
| } | |
| if (!effectiveGroupAllow.hasEntries) { | |
| logVerbose( | |
| "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", | |
| ); | |
| return; | |
| } | |
| if ( | |
| !isSenderAllowed({ | |
| allow: effectiveGroupAllow, | |
| senderId, | |
| senderUsername, | |
| }) | |
| ) { | |
| logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); | |
| return; | |
| } | |
| } | |
| const groupAllowlist = resolveGroupPolicy(chatId); | |
| if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { | |
| logger.info( | |
| { chatId, title: callbackMessage.chat.title, reason: "not-allowed" }, | |
| "skipping group message", | |
| ); | |
| return; | |
| } | |
| } | |
| if (inlineButtonsScope === "allowlist") { | |
| if (!isGroup) { | |
| if (dmPolicy === "disabled") { | |
| return; | |
| } | |
| if (dmPolicy !== "open") { | |
| const allowed = | |
| effectiveDmAllow.hasWildcard || | |
| (effectiveDmAllow.hasEntries && | |
| isSenderAllowed({ | |
| allow: effectiveDmAllow, | |
| senderId, | |
| senderUsername, | |
| })); | |
| if (!allowed) { | |
| return; | |
| } | |
| } | |
| } else { | |
| const allowed = | |
| effectiveGroupAllow.hasWildcard || | |
| (effectiveGroupAllow.hasEntries && | |
| isSenderAllowed({ | |
| allow: effectiveGroupAllow, | |
| senderId, | |
| senderUsername, | |
| })); | |
| if (!allowed) { | |
| return; | |
| } | |
| } | |
| } | |
| const paginationMatch = data.match(/^commands_page_(\d+|noop)(?::(.+))?$/); | |
| if (paginationMatch) { | |
| const pageValue = paginationMatch[1]; | |
| if (pageValue === "noop") { | |
| return; | |
| } | |
| const page = Number.parseInt(pageValue, 10); | |
| if (Number.isNaN(page) || page < 1) { | |
| return; | |
| } | |
| const agentId = paginationMatch[2]?.trim() || resolveDefaultAgentId(cfg) || undefined; | |
| const skillCommands = listSkillCommandsForAgents({ | |
| cfg, | |
| agentIds: agentId ? [agentId] : undefined, | |
| }); | |
| const result = buildCommandsMessagePaginated(cfg, skillCommands, { | |
| page, | |
| surface: "telegram", | |
| }); | |
| const keyboard = | |
| result.totalPages > 1 | |
| ? buildInlineKeyboard( | |
| buildCommandsPaginationKeyboard(result.currentPage, result.totalPages, agentId), | |
| ) | |
| : undefined; | |
| try { | |
| await bot.api.editMessageText( | |
| callbackMessage.chat.id, | |
| callbackMessage.message_id, | |
| result.text, | |
| keyboard ? { reply_markup: keyboard } : undefined, | |
| ); | |
| } catch (editErr) { | |
| const errStr = String(editErr); | |
| if (!errStr.includes("message is not modified")) { | |
| throw editErr; | |
| } | |
| } | |
| return; | |
| } | |
| const syntheticMessage: TelegramMessage = { | |
| ...callbackMessage, | |
| from: callback.from, | |
| text: data, | |
| caption: undefined, | |
| caption_entities: undefined, | |
| entities: undefined, | |
| }; | |
| const getFile = typeof ctx.getFile === "function" ? ctx.getFile.bind(ctx) : async () => ({}); | |
| await processMessage({ message: syntheticMessage, me: ctx.me, getFile }, [], storeAllowFrom, { | |
| forceWasMentioned: true, | |
| messageIdOverride: callback.id, | |
| }); | |
| } catch (err) { | |
| runtime.error?.(danger(`callback handler failed: ${String(err)}`)); | |
| } | |
| }); | |
| // Handle group migration to supergroup (chat ID changes) | |
| bot.on("message:migrate_to_chat_id", async (ctx) => { | |
| try { | |
| const msg = ctx.message; | |
| if (!msg?.migrate_to_chat_id) { | |
| return; | |
| } | |
| if (shouldSkipUpdate(ctx)) { | |
| return; | |
| } | |
| const oldChatId = String(msg.chat.id); | |
| const newChatId = String(msg.migrate_to_chat_id); | |
| const chatTitle = (msg.chat as { title?: string }).title ?? "Unknown"; | |
| runtime.log?.(warn(`[telegram] Group migrated: "${chatTitle}" ${oldChatId} → ${newChatId}`)); | |
| if (!resolveChannelConfigWrites({ cfg, channelId: "telegram", accountId })) { | |
| runtime.log?.(warn("[telegram] Config writes disabled; skipping group config migration.")); | |
| return; | |
| } | |
| // Check if old chat ID has config and migrate it | |
| const currentConfig = loadConfig(); | |
| const migration = migrateTelegramGroupConfig({ | |
| cfg: currentConfig, | |
| accountId, | |
| oldChatId, | |
| newChatId, | |
| }); | |
| if (migration.migrated) { | |
| runtime.log?.(warn(`[telegram] Migrating group config from ${oldChatId} to ${newChatId}`)); | |
| migrateTelegramGroupConfig({ cfg, accountId, oldChatId, newChatId }); | |
| await writeConfigFile(currentConfig); | |
| runtime.log?.(warn(`[telegram] Group config migrated and saved successfully`)); | |
| } else if (migration.skippedExisting) { | |
| runtime.log?.( | |
| warn( | |
| `[telegram] Group config already exists for ${newChatId}; leaving ${oldChatId} unchanged`, | |
| ), | |
| ); | |
| } else { | |
| runtime.log?.( | |
| warn(`[telegram] No config found for old group ID ${oldChatId}, migration logged only`), | |
| ); | |
| } | |
| } catch (err) { | |
| runtime.error?.(danger(`[telegram] Group migration handler failed: ${String(err)}`)); | |
| } | |
| }); | |
| bot.on("message", async (ctx) => { | |
| try { | |
| const msg = ctx.message; | |
| if (!msg) { | |
| return; | |
| } | |
| if (shouldSkipUpdate(ctx)) { | |
| return; | |
| } | |
| const chatId = msg.chat.id; | |
| const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; | |
| const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; | |
| const isForum = (msg.chat as { is_forum?: boolean }).is_forum === true; | |
| const resolvedThreadId = resolveTelegramForumThreadId({ | |
| isForum, | |
| messageThreadId, | |
| }); | |
| const storeAllowFrom = await readChannelAllowFromStore("telegram").catch(() => []); | |
| const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, resolvedThreadId); | |
| const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom); | |
| const effectiveGroupAllow = normalizeAllowFromWithStore({ | |
| allowFrom: groupAllowOverride ?? groupAllowFrom, | |
| storeAllowFrom, | |
| }); | |
| const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; | |
| if (isGroup) { | |
| if (groupConfig?.enabled === false) { | |
| logVerbose(`Blocked telegram group ${chatId} (group disabled)`); | |
| return; | |
| } | |
| if (topicConfig?.enabled === false) { | |
| logVerbose( | |
| `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, | |
| ); | |
| return; | |
| } | |
| if (hasGroupAllowOverride) { | |
| const senderId = msg.from?.id; | |
| const senderUsername = msg.from?.username ?? ""; | |
| const allowed = | |
| senderId != null && | |
| isSenderAllowed({ | |
| allow: effectiveGroupAllow, | |
| senderId: String(senderId), | |
| senderUsername, | |
| }); | |
| if (!allowed) { | |
| logVerbose( | |
| `Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`, | |
| ); | |
| return; | |
| } | |
| } | |
| // Group policy filtering: controls how group messages are handled | |
| // - "open": groups bypass allowFrom, only mention-gating applies | |
| // - "disabled": block all group messages entirely | |
| // - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom | |
| const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; | |
| const groupPolicy = telegramCfg.groupPolicy ?? defaultGroupPolicy ?? "open"; | |
| if (groupPolicy === "disabled") { | |
| logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); | |
| return; | |
| } | |
| if (groupPolicy === "allowlist") { | |
| // For allowlist mode, the sender (msg.from.id) must be in allowFrom | |
| const senderId = msg.from?.id; | |
| if (senderId == null) { | |
| logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`); | |
| return; | |
| } | |
| if (!effectiveGroupAllow.hasEntries) { | |
| logVerbose( | |
| "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", | |
| ); | |
| return; | |
| } | |
| const senderUsername = msg.from?.username ?? ""; | |
| if ( | |
| !isSenderAllowed({ | |
| allow: effectiveGroupAllow, | |
| senderId: String(senderId), | |
| senderUsername, | |
| }) | |
| ) { | |
| logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); | |
| return; | |
| } | |
| } | |
| // Group allowlist based on configured group IDs. | |
| const groupAllowlist = resolveGroupPolicy(chatId); | |
| if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { | |
| logger.info( | |
| { chatId, title: msg.chat.title, reason: "not-allowed" }, | |
| "skipping group message", | |
| ); | |
| return; | |
| } | |
| } | |
| // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). | |
| // We buffer “near-limit” messages and append immediately-following parts. | |
| const text = typeof msg.text === "string" ? msg.text : undefined; | |
| const isCommandLike = (text ?? "").trim().startsWith("/"); | |
| if (text && !isCommandLike) { | |
| const nowMs = Date.now(); | |
| const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; | |
| const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; | |
| const existing = textFragmentBuffer.get(key); | |
| if (existing) { | |
| const last = existing.messages.at(-1); | |
| const lastMsgId = last?.msg.message_id; | |
| const lastReceivedAtMs = last?.receivedAtMs ?? nowMs; | |
| const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity; | |
| const timeGapMs = nowMs - lastReceivedAtMs; | |
| const canAppend = | |
| idGap > 0 && | |
| idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP && | |
| timeGapMs >= 0 && | |
| timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS; | |
| if (canAppend) { | |
| const currentTotalChars = existing.messages.reduce( | |
| (sum, m) => sum + (m.msg.text?.length ?? 0), | |
| 0, | |
| ); | |
| const nextTotalChars = currentTotalChars + text.length; | |
| if ( | |
| existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS && | |
| nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS | |
| ) { | |
| existing.messages.push({ msg, ctx, receivedAtMs: nowMs }); | |
| scheduleTextFragmentFlush(existing); | |
| return; | |
| } | |
| } | |
| // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. | |
| clearTimeout(existing.timer); | |
| textFragmentBuffer.delete(key); | |
| textFragmentProcessing = textFragmentProcessing | |
| .then(async () => { | |
| await flushTextFragments(existing); | |
| }) | |
| .catch(() => undefined); | |
| await textFragmentProcessing; | |
| } | |
| const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; | |
| if (shouldStart) { | |
| const entry: TextFragmentEntry = { | |
| key, | |
| messages: [{ msg, ctx, receivedAtMs: nowMs }], | |
| timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS), | |
| }; | |
| textFragmentBuffer.set(key, entry); | |
| scheduleTextFragmentFlush(entry); | |
| return; | |
| } | |
| } | |
| // Media group handling - buffer multi-image messages | |
| const mediaGroupId = (msg as { media_group_id?: string }).media_group_id; | |
| if (mediaGroupId) { | |
| const existing = mediaGroupBuffer.get(mediaGroupId); | |
| if (existing) { | |
| clearTimeout(existing.timer); | |
| existing.messages.push({ msg, ctx }); | |
| existing.timer = setTimeout(async () => { | |
| mediaGroupBuffer.delete(mediaGroupId); | |
| mediaGroupProcessing = mediaGroupProcessing | |
| .then(async () => { | |
| await processMediaGroup(existing); | |
| }) | |
| .catch(() => undefined); | |
| await mediaGroupProcessing; | |
| }, MEDIA_GROUP_TIMEOUT_MS); | |
| } else { | |
| const entry: MediaGroupEntry = { | |
| messages: [{ msg, ctx }], | |
| timer: setTimeout(async () => { | |
| mediaGroupBuffer.delete(mediaGroupId); | |
| mediaGroupProcessing = mediaGroupProcessing | |
| .then(async () => { | |
| await processMediaGroup(entry); | |
| }) | |
| .catch(() => undefined); | |
| await mediaGroupProcessing; | |
| }, MEDIA_GROUP_TIMEOUT_MS), | |
| }; | |
| mediaGroupBuffer.set(mediaGroupId, entry); | |
| } | |
| return; | |
| } | |
| let media: Awaited<ReturnType<typeof resolveMedia>> = null; | |
| try { | |
| media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch); | |
| } catch (mediaErr) { | |
| const errMsg = String(mediaErr); | |
| if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) { | |
| const limitMb = Math.round(mediaMaxBytes / (1024 * 1024)); | |
| await withTelegramApiErrorLogging({ | |
| operation: "sendMessage", | |
| runtime, | |
| fn: () => | |
| bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, { | |
| reply_to_message_id: msg.message_id, | |
| }), | |
| }).catch(() => {}); | |
| logger.warn({ chatId, error: errMsg }, "media exceeds size limit"); | |
| return; | |
| } | |
| throw mediaErr; | |
| } | |
| // Skip sticker-only messages where the sticker was skipped (animated/video) | |
| // These have no media and no text content to process. | |
| const hasText = Boolean((msg.text ?? msg.caption ?? "").trim()); | |
| if (msg.sticker && !media && !hasText) { | |
| logVerbose("telegram: skipping sticker-only message (unsupported sticker type)"); | |
| return; | |
| } | |
| const allMedia = media | |
| ? [ | |
| { | |
| path: media.path, | |
| contentType: media.contentType, | |
| stickerMetadata: media.stickerMetadata, | |
| }, | |
| ] | |
| : []; | |
| const senderId = msg.from?.id ? String(msg.from.id) : ""; | |
| const conversationKey = | |
| resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); | |
| const debounceKey = senderId | |
| ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}` | |
| : null; | |
| await inboundDebouncer.enqueue({ | |
| ctx, | |
| msg, | |
| allMedia, | |
| storeAllowFrom, | |
| debounceKey, | |
| botUsername: ctx.me?.username, | |
| }); | |
| } catch (err) { | |
| runtime.error?.(danger(`handler failed: ${String(err)}`)); | |
| } | |
| }); | |
| }; | |