Spaces:
Running
Running
| """Send Message Tool -- cross-channel messaging via platform APIs. | |
| Sends a message to a user or channel on any connected messaging platform | |
| (Telegram, Discord, Slack). Supports listing available targets and resolving | |
| human-friendly channel names to IDs. Works in both CLI and gateway contexts. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import re | |
| from typing import Dict, Optional | |
| import ssl | |
| import time | |
| from agent.redact import redact_sensitive_text | |
| logger = logging.getLogger(__name__) | |
| _TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$") | |
| _FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$") | |
| # Slack conversation IDs: C (public channel), G (private/group channel), D (DM). | |
| # Must be uppercase alphanumeric, 9+ chars. User IDs (U...) and workspace IDs | |
| # (W...) are NOT valid chat.postMessage channel values — posting to them fails | |
| # because the API requires a conversation ID. To DM a user you must first call | |
| # conversations.open to obtain a D... ID. Without this gate, Slack IDs fall | |
| # through to channel-name resolution, which only matches by name and fails. | |
| _SLACK_TARGET_RE = re.compile(r"^\s*([CGD][A-Z0-9]{8,})\s*$") | |
| _WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$") | |
| _YUANBAO_TARGET_RE = re.compile(r"^\s*((?:group|direct):[^:]+)\s*$") | |
| # Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets. | |
| _NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE | |
| # Platforms that address recipients by phone number and accept E.164 format | |
| # (with a leading '+'). Without this, "+15551234567" fails the isdigit() check | |
| # below and falls through to channel-name resolution, which has no way to | |
| # resolve a raw phone number. Keeping the '+' preserves the E.164 form that | |
| # downstream adapters (signal, etc.) expect. | |
| _PHONE_PLATFORMS = frozenset({"signal", "sms", "whatsapp"}) | |
| _E164_TARGET_RE = re.compile(r"^\s*\+(\d{7,15})\s*$") | |
| _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} | |
| _VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"} | |
| _AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a"} | |
| _VOICE_EXTS = {".ogg", ".opus"} | |
| _URL_SECRET_QUERY_RE = re.compile( | |
| r"([?&](?:access_token|api[_-]?key|auth[_-]?token|token|signature|sig)=)([^&#\s]+)", | |
| re.IGNORECASE, | |
| ) | |
| _GENERIC_SECRET_ASSIGN_RE = re.compile( | |
| r"\b(access_token|api[_-]?key|auth[_-]?token|signature|sig)\s*=\s*([^\s,;]+)", | |
| re.IGNORECASE, | |
| ) | |
| def _sanitize_error_text(text) -> str: | |
| """Redact secrets from error text before surfacing it to users/models.""" | |
| redacted = redact_sensitive_text(text) | |
| redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}***", redacted) | |
| redacted = _GENERIC_SECRET_ASSIGN_RE.sub(lambda m: f"{m.group(1)}=***", redacted) | |
| return redacted | |
| def _error(message: str) -> dict: | |
| """Build a standardized error payload with redacted content.""" | |
| return {"error": _sanitize_error_text(message)} | |
| def _telegram_retry_delay(exc: Exception, attempt: int) -> float | None: | |
| retry_after = getattr(exc, "retry_after", None) | |
| if retry_after is not None: | |
| try: | |
| return max(float(retry_after), 0.0) | |
| except (TypeError, ValueError): | |
| return 1.0 | |
| text = str(exc).lower() | |
| if "timed out" in text or "timeout" in text: | |
| return None | |
| if ( | |
| "bad gateway" in text | |
| or "502" in text | |
| or "too many requests" in text | |
| or "429" in text | |
| or "service unavailable" in text | |
| or "503" in text | |
| or "gateway timeout" in text | |
| or "504" in text | |
| ): | |
| return float(2 ** attempt) | |
| return None | |
| async def _send_telegram_message_with_retry(bot, *, attempts: int = 3, **kwargs): | |
| for attempt in range(attempts): | |
| try: | |
| return await bot.send_message(**kwargs) | |
| except Exception as exc: | |
| delay = _telegram_retry_delay(exc, attempt) | |
| if delay is None or attempt >= attempts - 1: | |
| raise | |
| logger.warning( | |
| "Transient Telegram send failure (attempt %d/%d), retrying in %.1fs: %s", | |
| attempt + 1, | |
| attempts, | |
| delay, | |
| _sanitize_error_text(exc), | |
| ) | |
| await asyncio.sleep(delay) | |
| SEND_MESSAGE_SCHEMA = { | |
| "name": "send_message", | |
| "description": ( | |
| "Send a message to a connected messaging platform, or list available targets.\n\n" | |
| "IMPORTANT: When the user asks to send to a specific channel or person " | |
| "(not just a bare platform name), call send_message(action='list') FIRST to see " | |
| "available targets, then send to the correct one.\n" | |
| "If the user just says a platform name like 'send to telegram', send directly " | |
| "to the home channel without listing first." | |
| ), | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "action": { | |
| "type": "string", | |
| "enum": ["send", "list"], | |
| "description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms." | |
| }, | |
| "target": { | |
| "type": "string", | |
| "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org', 'yuanbao:direct:<account_id>' (DM), 'yuanbao:group:<group_code>' (group chat)" | |
| }, | |
| "message": { | |
| "type": "string", | |
| "description": "The message text to send. To send an image or file, include MEDIA:<local_path> (e.g. 'MEDIA:/tmp/hermes/cache/img_xxx.jpg') in the message — the platform will deliver it as a native media attachment." | |
| } | |
| }, | |
| "required": [] | |
| } | |
| } | |
| def send_message_tool(args, **kw): | |
| """Handle cross-channel send_message tool calls.""" | |
| action = args.get("action", "send") | |
| if action == "list": | |
| return _handle_list() | |
| return _handle_send(args) | |
| def _handle_list(): | |
| """Return formatted list of available messaging targets.""" | |
| try: | |
| from gateway.channel_directory import format_directory_for_display | |
| return json.dumps({"targets": format_directory_for_display()}) | |
| except Exception as e: | |
| return json.dumps(_error(f"Failed to load channel directory: {e}")) | |
| def _handle_send(args): | |
| """Send a message to a platform target.""" | |
| target = args.get("target", "") | |
| message = args.get("message", "") | |
| if not target or not message: | |
| return tool_error("Both 'target' and 'message' are required when action='send'") | |
| parts = target.split(":", 1) | |
| platform_name = parts[0].strip().lower() | |
| target_ref = parts[1].strip() if len(parts) > 1 else None | |
| chat_id = None | |
| thread_id = None | |
| if target_ref: | |
| chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref) | |
| else: | |
| is_explicit = False | |
| # Resolve human-friendly channel names to numeric IDs | |
| if target_ref and not is_explicit: | |
| try: | |
| from gateway.channel_directory import resolve_channel_name | |
| resolved = resolve_channel_name(platform_name, target_ref) | |
| if resolved: | |
| chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved) | |
| else: | |
| return json.dumps({ | |
| "error": f"Could not resolve '{target_ref}' on {platform_name}. " | |
| f"Use send_message(action='list') to see available targets." | |
| }) | |
| except Exception: | |
| return json.dumps({ | |
| "error": f"Could not resolve '{target_ref}' on {platform_name}. " | |
| f"Try using a numeric channel ID instead." | |
| }) | |
| from tools.interrupt import is_interrupted | |
| if is_interrupted(): | |
| return tool_error("Interrupted") | |
| try: | |
| from gateway.config import load_gateway_config, Platform | |
| config = load_gateway_config() | |
| except Exception as e: | |
| return json.dumps(_error(f"Failed to load gateway config: {e}")) | |
| platform_map = { | |
| "telegram": Platform.TELEGRAM, | |
| "discord": Platform.DISCORD, | |
| "slack": Platform.SLACK, | |
| "whatsapp": Platform.WHATSAPP, | |
| "signal": Platform.SIGNAL, | |
| "bluebubbles": Platform.BLUEBUBBLES, | |
| "qqbot": Platform.QQBOT, | |
| "matrix": Platform.MATRIX, | |
| "mattermost": Platform.MATTERMOST, | |
| "homeassistant": Platform.HOMEASSISTANT, | |
| "dingtalk": Platform.DINGTALK, | |
| "feishu": Platform.FEISHU, | |
| "wecom": Platform.WECOM, | |
| "wecom_callback": Platform.WECOM_CALLBACK, | |
| "weixin": Platform.WEIXIN, | |
| "email": Platform.EMAIL, | |
| "sms": Platform.SMS, | |
| "yuanbao": Platform.YUANBAO, | |
| } | |
| platform = platform_map.get(platform_name) | |
| if not platform: | |
| avail = ", ".join(platform_map.keys()) | |
| return tool_error(f"Unknown platform: {platform_name}. Available: {avail}") | |
| pconfig = config.platforms.get(platform) | |
| if not pconfig or not pconfig.enabled: | |
| # Weixin can be configured purely via .env; synthesize a pconfig so | |
| # send_message and cron delivery work without a gateway.yaml entry. | |
| if platform_name == "weixin": | |
| wx_token = os.getenv("WEIXIN_TOKEN", "").strip() | |
| wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip() | |
| if wx_token and wx_account: | |
| from gateway.config import PlatformConfig | |
| pconfig = PlatformConfig( | |
| enabled=True, | |
| token=wx_token, | |
| extra={ | |
| "account_id": wx_account, | |
| "base_url": os.getenv("WEIXIN_BASE_URL", "").strip(), | |
| "cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(), | |
| }, | |
| ) | |
| else: | |
| return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") | |
| else: | |
| return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") | |
| from gateway.platforms.base import BasePlatformAdapter | |
| media_files, cleaned_message = BasePlatformAdapter.extract_media(message) | |
| mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files) | |
| used_home_channel = False | |
| if not chat_id: | |
| home = config.get_home_channel(platform) | |
| if not home and platform_name == "weixin": | |
| wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip() | |
| if wx_home: | |
| from gateway.config import HomeChannel | |
| home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home") | |
| if home: | |
| chat_id = home.chat_id | |
| used_home_channel = True | |
| else: | |
| return json.dumps({ | |
| "error": f"No home channel set for {platform_name} to determine where to send the message. " | |
| f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', " | |
| f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>" | |
| }) | |
| duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id) | |
| if duplicate_skip: | |
| return json.dumps(duplicate_skip) | |
| try: | |
| from model_tools import _run_async | |
| result = _run_async( | |
| _send_to_platform( | |
| platform, | |
| pconfig, | |
| chat_id, | |
| cleaned_message, | |
| thread_id=thread_id, | |
| media_files=media_files, | |
| ) | |
| ) | |
| if used_home_channel and isinstance(result, dict) and result.get("success"): | |
| result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})" | |
| # Mirror the sent message into the target's gateway session | |
| if isinstance(result, dict) and result.get("success") and mirror_text: | |
| try: | |
| from gateway.mirror import mirror_to_session | |
| from gateway.session_context import get_session_env | |
| source_label = get_session_env("HERMES_SESSION_PLATFORM", "cli") | |
| user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None | |
| if mirror_to_session( | |
| platform_name, | |
| chat_id, | |
| mirror_text, | |
| source_label=source_label, | |
| thread_id=thread_id, | |
| user_id=user_id, | |
| ): | |
| result["mirrored"] = True | |
| except Exception: | |
| pass | |
| if isinstance(result, dict) and "error" in result: | |
| result["error"] = _sanitize_error_text(result["error"]) | |
| return json.dumps(result) | |
| except Exception as e: | |
| return json.dumps(_error(f"Send failed: {e}")) | |
| def _parse_target_ref(platform_name: str, target_ref: str): | |
| """Parse a tool target into chat_id/thread_id and whether it is explicit.""" | |
| if platform_name == "telegram": | |
| match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref) | |
| if match: | |
| return match.group(1), match.group(2), True | |
| if platform_name == "feishu": | |
| match = _FEISHU_TARGET_RE.fullmatch(target_ref) | |
| if match: | |
| return match.group(1), match.group(2), True | |
| if platform_name == "discord": | |
| match = _NUMERIC_TOPIC_RE.fullmatch(target_ref) | |
| if match: | |
| return match.group(1), match.group(2), True | |
| if platform_name == "slack": | |
| match = _SLACK_TARGET_RE.fullmatch(target_ref) | |
| if match: | |
| return match.group(1), None, True | |
| if platform_name == "weixin": | |
| match = _WEIXIN_TARGET_RE.fullmatch(target_ref) | |
| if match: | |
| return match.group(1), None, True | |
| if platform_name == "yuanbao": | |
| match = _YUANBAO_TARGET_RE.fullmatch(target_ref) | |
| if match: | |
| return match.group(1), None, True | |
| if target_ref.strip().isdigit(): | |
| return f"group:{target_ref.strip()}", None, True | |
| return None, None, False | |
| if platform_name in _PHONE_PLATFORMS: | |
| match = _E164_TARGET_RE.fullmatch(target_ref) | |
| if match: | |
| # Preserve the leading '+' — signal-cli and sms/whatsapp adapters | |
| # expect E.164 format for direct recipients. | |
| return target_ref.strip(), None, True | |
| if target_ref.lstrip("-").isdigit(): | |
| return target_ref, None, True | |
| # Matrix room IDs (start with !) and user IDs (start with @) are explicit | |
| if platform_name == "matrix" and (target_ref.startswith("!") or target_ref.startswith("@")): | |
| return target_ref, None, True | |
| return None, None, False | |
| def _describe_media_for_mirror(media_files): | |
| """Return a human-readable mirror summary when a message only contains media.""" | |
| if not media_files: | |
| return "" | |
| if len(media_files) == 1: | |
| media_path, is_voice = media_files[0] | |
| ext = os.path.splitext(media_path)[1].lower() | |
| if is_voice and ext in _VOICE_EXTS: | |
| return "[Sent voice message]" | |
| if ext in _IMAGE_EXTS: | |
| return "[Sent image attachment]" | |
| if ext in _VIDEO_EXTS: | |
| return "[Sent video attachment]" | |
| if ext in _AUDIO_EXTS: | |
| return "[Sent audio attachment]" | |
| return "[Sent document attachment]" | |
| return f"[Sent {len(media_files)} media attachments]" | |
| def _get_cron_auto_delivery_target(): | |
| """Return the cron scheduler's auto-delivery target for the current run, if any.""" | |
| from gateway.session_context import get_session_env | |
| platform = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower() | |
| chat_id = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip() | |
| if not platform or not chat_id: | |
| return None | |
| thread_id = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None | |
| return { | |
| "platform": platform, | |
| "chat_id": chat_id, | |
| "thread_id": thread_id, | |
| } | |
| def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None): | |
| """Skip redundant cron send_message calls when the scheduler will auto-deliver there.""" | |
| auto_target = _get_cron_auto_delivery_target() | |
| if not auto_target: | |
| return None | |
| same_target = ( | |
| auto_target["platform"] == platform_name | |
| and str(auto_target["chat_id"]) == str(chat_id) | |
| and auto_target.get("thread_id") == thread_id | |
| ) | |
| if not same_target: | |
| return None | |
| target_label = f"{platform_name}:{chat_id}" | |
| if thread_id is not None: | |
| target_label += f":{thread_id}" | |
| return { | |
| "success": True, | |
| "skipped": True, | |
| "reason": "cron_auto_delivery_duplicate_target", | |
| "target": target_label, | |
| "note": ( | |
| f"Skipped send_message to {target_label}. This cron job will already auto-deliver " | |
| "its final response to that same target. Put the intended user-facing content in " | |
| "your final response instead, or use a different target if you want an additional message." | |
| ), | |
| } | |
| async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None): | |
| """Route a message to the appropriate platform sender. | |
| Long messages are automatically chunked to fit within platform limits | |
| using the same smart-splitting algorithm as the gateway adapters | |
| (preserves code-block boundaries, adds part indicators). | |
| """ | |
| from gateway.config import Platform | |
| from gateway.platforms.base import BasePlatformAdapter, utf16_len | |
| from gateway.platforms.discord import DiscordAdapter | |
| from gateway.platforms.slack import SlackAdapter | |
| # Telegram adapter import is optional (requires python-telegram-bot) | |
| try: | |
| from gateway.platforms.telegram import TelegramAdapter | |
| _telegram_available = True | |
| except ImportError: | |
| _telegram_available = False | |
| # Feishu adapter import is optional (requires lark-oapi) | |
| try: | |
| from gateway.platforms.feishu import FeishuAdapter | |
| _feishu_available = True | |
| except ImportError: | |
| _feishu_available = False | |
| media_files = media_files or [] | |
| if platform == Platform.SLACK and message: | |
| try: | |
| slack_adapter = SlackAdapter.__new__(SlackAdapter) | |
| message = slack_adapter.format_message(message) | |
| except Exception: | |
| logger.debug("Failed to apply Slack mrkdwn formatting in _send_to_platform", exc_info=True) | |
| # Platform message length limits (from adapter class attributes) | |
| _MAX_LENGTHS = { | |
| Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH if _telegram_available else 4096, | |
| Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH, | |
| Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH, | |
| } | |
| if _feishu_available: | |
| _MAX_LENGTHS[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH | |
| # Smart-chunk the message to fit within platform limits. | |
| # For short messages or platforms without a known limit this is a no-op. | |
| # Telegram measures length in UTF-16 code units, not Unicode codepoints. | |
| max_len = _MAX_LENGTHS.get(platform) | |
| if max_len: | |
| _len_fn = utf16_len if platform == Platform.TELEGRAM else None | |
| chunks = BasePlatformAdapter.truncate_message(message, max_len, len_fn=_len_fn) | |
| else: | |
| chunks = [message] | |
| # --- Telegram: special handling for media attachments --- | |
| if platform == Platform.TELEGRAM: | |
| last_result = None | |
| disable_link_previews = bool(getattr(pconfig, "extra", {}) and pconfig.extra.get("disable_link_previews")) | |
| for i, chunk in enumerate(chunks): | |
| is_last = (i == len(chunks) - 1) | |
| result = await _send_telegram( | |
| pconfig.token, | |
| chat_id, | |
| chunk, | |
| media_files=media_files if is_last else [], | |
| thread_id=thread_id, | |
| disable_link_previews=disable_link_previews, | |
| ) | |
| if isinstance(result, dict) and result.get("error"): | |
| return result | |
| last_result = result | |
| return last_result | |
| # --- Weixin: use the native one-shot adapter helper for text + media --- | |
| if platform == Platform.WEIXIN: | |
| return await _send_weixin(pconfig, chat_id, message, media_files=media_files) | |
| # --- Feishu: use the native one-shot adapter helper for text + media --- | |
| if platform == Platform.FEISHU: | |
| return await _send_feishu(pconfig, chat_id, message, media_files=media_files, thread_id=thread_id) | |
| # --- Discord: special handling for media attachments --- | |
| if platform == Platform.DISCORD: | |
| last_result = None | |
| for i, chunk in enumerate(chunks): | |
| is_last = (i == len(chunks) - 1) | |
| result = await _send_discord( | |
| pconfig.token, | |
| chat_id, | |
| chunk, | |
| media_files=media_files if is_last else [], | |
| thread_id=thread_id, | |
| ) | |
| if isinstance(result, dict) and result.get("error"): | |
| return result | |
| last_result = result | |
| return last_result | |
| # --- Matrix: use the native adapter helper when media is present --- | |
| if platform == Platform.MATRIX and media_files: | |
| last_result = None | |
| for i, chunk in enumerate(chunks): | |
| is_last = (i == len(chunks) - 1) | |
| result = await _send_matrix_via_adapter( | |
| pconfig, | |
| chat_id, | |
| chunk, | |
| media_files=media_files if is_last else [], | |
| thread_id=thread_id, | |
| ) | |
| if isinstance(result, dict) and result.get("error"): | |
| return result | |
| last_result = result | |
| return last_result | |
| # --- Signal: native attachment support via JSON-RPC attachments param --- | |
| if platform == Platform.SIGNAL and media_files: | |
| last_result = None | |
| for i, chunk in enumerate(chunks): | |
| is_last = (i == len(chunks) - 1) | |
| result = await _send_signal( | |
| pconfig.extra, | |
| chat_id, | |
| chunk, | |
| media_files=media_files if is_last else [], | |
| ) | |
| if isinstance(result, dict) and result.get("error"): | |
| return result | |
| last_result = result | |
| return last_result | |
| # --- Non-media platforms --- | |
| if media_files and not message.strip(): | |
| return { | |
| "error": ( | |
| f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, feishu, signal and yuanbao; " | |
| f"target {platform.value} had only media attachments" | |
| ) | |
| } | |
| warning = None | |
| if media_files: | |
| warning = ( | |
| f"MEDIA attachments were omitted for {platform.value}; " | |
| "native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, feishu, signal and yuanbao" | |
| ) | |
| last_result = None | |
| for chunk in chunks: | |
| if platform == Platform.SLACK: | |
| result = await _send_slack(pconfig.token, chat_id, chunk) | |
| elif platform == Platform.WHATSAPP: | |
| result = await _send_whatsapp(pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.SIGNAL: | |
| result = await _send_signal(pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.EMAIL: | |
| result = await _send_email(pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.SMS: | |
| result = await _send_sms(pconfig.api_key, chat_id, chunk) | |
| elif platform == Platform.MATTERMOST: | |
| result = await _send_mattermost(pconfig.token, pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.MATRIX: | |
| result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.HOMEASSISTANT: | |
| result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.DINGTALK: | |
| result = await _send_dingtalk(pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.FEISHU: | |
| result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id) | |
| elif platform == Platform.WECOM: | |
| result = await _send_wecom(pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.BLUEBUBBLES: | |
| result = await _send_bluebubbles(pconfig.extra, chat_id, chunk) | |
| elif platform == Platform.QQBOT: | |
| result = await _send_qqbot(pconfig, chat_id, chunk) | |
| else: | |
| result = {"error": f"Direct sending not yet implemented for {platform.value}"} | |
| if isinstance(result, dict) and result.get("error"): | |
| return result | |
| last_result = result | |
| if warning and isinstance(last_result, dict) and last_result.get("success"): | |
| warnings = list(last_result.get("warnings", [])) | |
| warnings.append(warning) | |
| last_result["warnings"] = warnings | |
| return last_result | |
| async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None, disable_link_previews=False): | |
| """Send via Telegram Bot API (one-shot, no polling needed). | |
| Applies markdown→MarkdownV2 formatting (same as the gateway adapter) | |
| so that bold, links, and headers render correctly. If the message | |
| already contains HTML tags, it is sent with ``parse_mode='HTML'`` | |
| instead, bypassing MarkdownV2 conversion. | |
| """ | |
| try: | |
| from telegram import Bot | |
| from telegram.constants import ParseMode | |
| # Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML. | |
| # Inspired by github.com/ashaney — PR #1568. | |
| _has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message)) | |
| if _has_html: | |
| formatted = message | |
| send_parse_mode = ParseMode.HTML | |
| else: | |
| # Reuse the gateway adapter's format_message for markdown→MarkdownV2 | |
| try: | |
| from gateway.platforms.telegram import TelegramAdapter | |
| _adapter = TelegramAdapter.__new__(TelegramAdapter) | |
| formatted = _adapter.format_message(message) | |
| except Exception: | |
| # Fallback: send as-is if formatting unavailable | |
| formatted = message | |
| send_parse_mode = ParseMode.MARKDOWN_V2 | |
| bot = Bot(token=token) | |
| int_chat_id = int(chat_id) | |
| media_files = media_files or [] | |
| thread_kwargs = {} | |
| if thread_id is not None: | |
| thread_kwargs["message_thread_id"] = int(thread_id) | |
| if disable_link_previews: | |
| thread_kwargs["disable_web_page_preview"] = True | |
| last_msg = None | |
| warnings = [] | |
| if formatted.strip(): | |
| try: | |
| last_msg = await _send_telegram_message_with_retry( | |
| bot, | |
| chat_id=int_chat_id, text=formatted, | |
| parse_mode=send_parse_mode, **thread_kwargs | |
| ) | |
| except Exception as md_error: | |
| # Parse failed, fall back to plain text | |
| if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower() or "html" in str(md_error).lower(): | |
| logger.warning( | |
| "Parse mode %s failed in _send_telegram, falling back to plain text: %s", | |
| send_parse_mode, | |
| _sanitize_error_text(md_error), | |
| ) | |
| if not _has_html: | |
| try: | |
| from gateway.platforms.telegram import _strip_mdv2 | |
| plain = _strip_mdv2(formatted) | |
| except Exception: | |
| plain = message | |
| else: | |
| plain = message | |
| last_msg = await _send_telegram_message_with_retry( | |
| bot, | |
| chat_id=int_chat_id, text=plain, | |
| parse_mode=None, **thread_kwargs | |
| ) | |
| else: | |
| raise | |
| for media_path, is_voice in media_files: | |
| if not os.path.exists(media_path): | |
| warning = f"Media file not found, skipping: {media_path}" | |
| logger.warning(warning) | |
| warnings.append(warning) | |
| continue | |
| ext = os.path.splitext(media_path)[1].lower() | |
| try: | |
| with open(media_path, "rb") as f: | |
| if ext in _IMAGE_EXTS: | |
| last_msg = await bot.send_photo( | |
| chat_id=int_chat_id, photo=f, **thread_kwargs | |
| ) | |
| elif ext in _VIDEO_EXTS: | |
| last_msg = await bot.send_video( | |
| chat_id=int_chat_id, video=f, **thread_kwargs | |
| ) | |
| elif ext in _VOICE_EXTS and is_voice: | |
| last_msg = await bot.send_voice( | |
| chat_id=int_chat_id, voice=f, **thread_kwargs | |
| ) | |
| elif ext in _AUDIO_EXTS: | |
| last_msg = await bot.send_audio( | |
| chat_id=int_chat_id, audio=f, **thread_kwargs | |
| ) | |
| else: | |
| last_msg = await bot.send_document( | |
| chat_id=int_chat_id, document=f, **thread_kwargs | |
| ) | |
| except Exception as e: | |
| warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}") | |
| logger.error(warning) | |
| warnings.append(warning) | |
| if last_msg is None: | |
| error = "No deliverable text or media remained after processing MEDIA tags" | |
| if warnings: | |
| return {"error": error, "warnings": warnings} | |
| return {"error": error} | |
| result = { | |
| "success": True, | |
| "platform": "telegram", | |
| "chat_id": chat_id, | |
| "message_id": str(last_msg.message_id), | |
| } | |
| if warnings: | |
| result["warnings"] = warnings | |
| return result | |
| except ImportError: | |
| return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"} | |
| except Exception as e: | |
| return _error(f"Telegram send failed: {e}") | |
| def _derive_forum_thread_name(message: str) -> str: | |
| """Derive a thread name from the first line of the message, capped at 100 chars.""" | |
| first_line = message.strip().split("\n", 1)[0].strip() | |
| # Strip common markdown heading prefixes | |
| first_line = first_line.lstrip("#").strip() | |
| if not first_line: | |
| first_line = "New Post" | |
| return first_line[:100] | |
| # Process-local cache for Discord channel-type probes. Avoids re-probing the | |
| # same channel on every send when the directory cache has no entry (e.g. fresh | |
| # install, or channel created after the last directory build). | |
| _DISCORD_CHANNEL_TYPE_PROBE_CACHE: Dict[str, bool] = {} | |
| def _remember_channel_is_forum(chat_id: str, is_forum: bool) -> None: | |
| _DISCORD_CHANNEL_TYPE_PROBE_CACHE[str(chat_id)] = bool(is_forum) | |
| def _probe_is_forum_cached(chat_id: str) -> Optional[bool]: | |
| return _DISCORD_CHANNEL_TYPE_PROBE_CACHE.get(str(chat_id)) | |
| async def _send_discord(token, chat_id, message, thread_id=None, media_files=None): | |
| """Send a single message via Discord REST API (no websocket client needed). | |
| Chunking is handled by _send_to_platform() before this is called. | |
| When thread_id is provided, the message is sent directly to that thread | |
| via the /channels/{thread_id}/messages endpoint. | |
| Media files are uploaded one-by-one via multipart/form-data after the | |
| text message is sent (same pattern as Telegram). | |
| Forum channels (type 15) reject POST /messages — a thread post is created | |
| automatically via POST /channels/{id}/threads. Media files are uploaded | |
| as multipart attachments on the starter message of the new thread. | |
| Channel type is resolved from the channel directory first, then a | |
| process-local probe cache, and only as a last resort with a live | |
| GET /channels/{id} probe (whose result is memoized). | |
| """ | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| try: | |
| from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp | |
| _proxy = resolve_proxy_url(platform_env_var="DISCORD_PROXY") | |
| _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy) | |
| auth_headers = {"Authorization": f"Bot {token}"} | |
| json_headers = {**auth_headers, "Content-Type": "application/json"} | |
| media_files = media_files or [] | |
| last_data = None | |
| warnings = [] | |
| # Thread endpoint: Discord threads are channels; send directly to the thread ID. | |
| if thread_id: | |
| url = f"https://discord.com/api/v10/channels/{thread_id}/messages" | |
| else: | |
| # Check if the target channel is a forum channel (type 15). | |
| # Forum channels reject POST /messages — create a thread post instead. | |
| # Three-layer detection: directory cache → process-local probe | |
| # cache → GET /channels/{id} probe (with result memoized). | |
| _channel_type = None | |
| try: | |
| from gateway.channel_directory import lookup_channel_type | |
| _channel_type = lookup_channel_type("discord", chat_id) | |
| except Exception: | |
| pass | |
| if _channel_type == "forum": | |
| is_forum = True | |
| elif _channel_type is not None: | |
| is_forum = False | |
| else: | |
| cached = _probe_is_forum_cached(chat_id) | |
| if cached is not None: | |
| is_forum = cached | |
| else: | |
| is_forum = False | |
| try: | |
| info_url = f"https://discord.com/api/v10/channels/{chat_id}" | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), **_sess_kw) as info_sess: | |
| async with info_sess.get(info_url, headers=json_headers, **_req_kw) as info_resp: | |
| if info_resp.status == 200: | |
| info = await info_resp.json() | |
| is_forum = info.get("type") == 15 | |
| _remember_channel_is_forum(chat_id, is_forum) | |
| except Exception: | |
| logger.debug("Failed to probe channel type for %s", chat_id, exc_info=True) | |
| if is_forum: | |
| thread_name = _derive_forum_thread_name(message) | |
| thread_url = f"https://discord.com/api/v10/channels/{chat_id}/threads" | |
| # Filter to readable media files up front so we can pick the | |
| # right code path (JSON vs multipart) before opening a session. | |
| valid_media = [] | |
| for media_path, _is_voice in media_files: | |
| if not os.path.exists(media_path): | |
| warning = f"Media file not found, skipping: {media_path}" | |
| logger.warning(warning) | |
| warnings.append(warning) | |
| continue | |
| valid_media.append(media_path) | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60), **_sess_kw) as session: | |
| if valid_media: | |
| # Multipart: payload_json + files[N] creates a forum | |
| # thread with the starter message plus attachments in | |
| # a single API call. | |
| attachments_meta = [ | |
| {"id": str(idx), "filename": os.path.basename(path)} | |
| for idx, path in enumerate(valid_media) | |
| ] | |
| starter_message = {"content": message, "attachments": attachments_meta} | |
| payload_json = json.dumps({"name": thread_name, "message": starter_message}) | |
| form = aiohttp.FormData() | |
| form.add_field("payload_json", payload_json, content_type="application/json") | |
| # Buffer file bytes up front — aiohttp's FormData can | |
| # read lazily and we don't want handles closing under | |
| # it on retry. | |
| try: | |
| for idx, media_path in enumerate(valid_media): | |
| with open(media_path, "rb") as fh: | |
| form.add_field( | |
| f"files[{idx}]", | |
| fh.read(), | |
| filename=os.path.basename(media_path), | |
| ) | |
| async with session.post(thread_url, headers=auth_headers, data=form, **_req_kw) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| return _error(f"Discord forum thread creation error ({resp.status}): {body}") | |
| data = await resp.json() | |
| except Exception as e: | |
| return _error(_sanitize_error_text(f"Discord forum thread upload failed: {e}")) | |
| else: | |
| # No media — simple JSON POST creates the thread with | |
| # just the text starter. | |
| async with session.post( | |
| thread_url, | |
| headers=json_headers, | |
| json={ | |
| "name": thread_name, | |
| "message": {"content": message}, | |
| }, | |
| **_req_kw, | |
| ) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| return _error(f"Discord forum thread creation error ({resp.status}): {body}") | |
| data = await resp.json() | |
| thread_id_created = data.get("id") | |
| starter_msg_id = (data.get("message") or {}).get("id", thread_id_created) | |
| result = { | |
| "success": True, | |
| "platform": "discord", | |
| "chat_id": chat_id, | |
| "thread_id": thread_id_created, | |
| "message_id": starter_msg_id, | |
| } | |
| if warnings: | |
| result["warnings"] = warnings | |
| return result | |
| url = f"https://discord.com/api/v10/channels/{chat_id}/messages" | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: | |
| # Send text message (skip if empty and media is present) | |
| if message.strip() or not media_files: | |
| async with session.post(url, headers=json_headers, json={"content": message}, **_req_kw) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| return _error(f"Discord API error ({resp.status}): {body}") | |
| last_data = await resp.json() | |
| # Send each media file as a separate multipart upload | |
| for media_path, _is_voice in media_files: | |
| if not os.path.exists(media_path): | |
| warning = f"Media file not found, skipping: {media_path}" | |
| logger.warning(warning) | |
| warnings.append(warning) | |
| continue | |
| try: | |
| form = aiohttp.FormData() | |
| filename = os.path.basename(media_path) | |
| with open(media_path, "rb") as f: | |
| form.add_field("files[0]", f, filename=filename) | |
| async with session.post(url, headers=auth_headers, data=form, **_req_kw) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| warning = _sanitize_error_text(f"Failed to send media {media_path}: Discord API error ({resp.status}): {body}") | |
| logger.error(warning) | |
| warnings.append(warning) | |
| continue | |
| last_data = await resp.json() | |
| except Exception as e: | |
| warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}") | |
| logger.error(warning) | |
| warnings.append(warning) | |
| if last_data is None: | |
| error = "No deliverable text or media remained after processing" | |
| if warnings: | |
| return {"error": error, "warnings": warnings} | |
| return {"error": error} | |
| result = {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": last_data.get("id")} | |
| if warnings: | |
| result["warnings"] = warnings | |
| return result | |
| except Exception as e: | |
| return _error(f"Discord send failed: {e}") | |
| async def _send_slack(token, chat_id, message): | |
| """Send via Slack Web API.""" | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| try: | |
| from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp | |
| _proxy = resolve_proxy_url() | |
| _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy) | |
| url = "https://slack.com/api/chat.postMessage" | |
| headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: | |
| payload = {"channel": chat_id, "text": message, "mrkdwn": True} | |
| async with session.post(url, headers=headers, json=payload, **_req_kw) as resp: | |
| data = await resp.json() | |
| if data.get("ok"): | |
| return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": data.get("ts")} | |
| return _error(f"Slack API error: {data.get('error', 'unknown')}") | |
| except Exception as e: | |
| return _error(f"Slack send failed: {e}") | |
| async def _send_whatsapp(extra, chat_id, message): | |
| """Send via the local WhatsApp bridge HTTP API.""" | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| try: | |
| bridge_port = extra.get("bridge_port", 3000) | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post( | |
| f"http://localhost:{bridge_port}/send", | |
| json={"chatId": chat_id, "message": message}, | |
| timeout=aiohttp.ClientTimeout(total=30), | |
| ) as resp: | |
| if resp.status == 200: | |
| data = await resp.json() | |
| return { | |
| "success": True, | |
| "platform": "whatsapp", | |
| "chat_id": chat_id, | |
| "message_id": data.get("messageId"), | |
| } | |
| body = await resp.text() | |
| return _error(f"WhatsApp bridge error ({resp.status}): {body}") | |
| except Exception as e: | |
| return _error(f"WhatsApp send failed: {e}") | |
| async def _send_signal(extra, chat_id, message, media_files=None): | |
| """Send via signal-cli JSON-RPC API. | |
| Supports both text-only and text-with-attachments (images/audio/documents). | |
| Attachments are sent as an 'attachments' array in the JSON-RPC params. | |
| """ | |
| try: | |
| import httpx | |
| except ImportError: | |
| return {"error": "httpx not installed"} | |
| try: | |
| http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/") | |
| account = extra.get("account", "") | |
| if not account: | |
| return {"error": "Signal account not configured"} | |
| params = {"account": account, "message": message} | |
| if chat_id.startswith("group:"): | |
| params["groupId"] = chat_id[6:] | |
| else: | |
| params["recipient"] = [chat_id] | |
| # Add attachments if media_files are present | |
| valid_media = media_files or [] | |
| attachment_paths = [] | |
| for media_path, _is_voice in valid_media: | |
| if os.path.exists(media_path): | |
| attachment_paths.append(media_path) | |
| else: | |
| logger.warning("Signal media file not found, skipping: %s", media_path) | |
| if attachment_paths: | |
| params["attachments"] = attachment_paths | |
| payload = { | |
| "jsonrpc": "2.0", | |
| "method": "send", | |
| "params": params, | |
| "id": f"send_{int(time.time() * 1000)}", | |
| } | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| resp = await client.post(f"{http_url}/api/v1/rpc", json=payload) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if "error" in data: | |
| return _error(f"Signal RPC error: {data['error']}") | |
| # Return warning for any skipped media files | |
| result = {"success": True, "platform": "signal", "chat_id": chat_id} | |
| if len(attachment_paths) < len(valid_media): | |
| result["warnings"] = [f"Some media files were skipped (not found on disk)"] | |
| return result | |
| except Exception as e: | |
| return _error(f"Signal send failed: {e}") | |
| async def _send_email(extra, chat_id, message): | |
| """Send via SMTP (one-shot, no persistent connection needed).""" | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.utils import formatdate | |
| address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "") | |
| password = os.getenv("EMAIL_PASSWORD", "") | |
| smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "") | |
| try: | |
| smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587")) | |
| except (ValueError, TypeError): | |
| smtp_port = 587 | |
| if not all([address, password, smtp_host]): | |
| return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"} | |
| try: | |
| msg = MIMEText(message, "plain", "utf-8") | |
| msg["From"] = address | |
| msg["To"] = chat_id | |
| msg["Subject"] = "Hermes Agent" | |
| msg["Date"] = formatdate(localtime=True) | |
| server = smtplib.SMTP(smtp_host, smtp_port) | |
| server.starttls(context=ssl.create_default_context()) | |
| server.login(address, password) | |
| server.send_message(msg) | |
| server.quit() | |
| return {"success": True, "platform": "email", "chat_id": chat_id} | |
| except Exception as e: | |
| return _error(f"Email send failed: {e}") | |
| async def _send_sms(auth_token, chat_id, message): | |
| """Send a single SMS via Twilio REST API. | |
| Uses HTTP Basic auth (Account SID : Auth Token) and form-encoded POST. | |
| Chunking is handled by _send_to_platform() before this is called. | |
| """ | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| import base64 | |
| account_sid = os.getenv("TWILIO_ACCOUNT_SID", "") | |
| from_number = os.getenv("TWILIO_PHONE_NUMBER", "") | |
| if not account_sid or not auth_token or not from_number: | |
| return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"} | |
| # Strip markdown — SMS renders it as literal characters | |
| message = re.sub(r"\*\*(.+?)\*\*", r"\1", message, flags=re.DOTALL) | |
| message = re.sub(r"\*(.+?)\*", r"\1", message, flags=re.DOTALL) | |
| message = re.sub(r"__(.+?)__", r"\1", message, flags=re.DOTALL) | |
| message = re.sub(r"_(.+?)_", r"\1", message, flags=re.DOTALL) | |
| message = re.sub(r"```[a-z]*\n?", "", message) | |
| message = re.sub(r"`(.+?)`", r"\1", message) | |
| message = re.sub(r"^#{1,6}\s+", "", message, flags=re.MULTILINE) | |
| message = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", message) | |
| message = re.sub(r"\n{3,}", "\n\n", message) | |
| message = message.strip() | |
| try: | |
| from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp | |
| _proxy = resolve_proxy_url() | |
| _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy) | |
| creds = f"{account_sid}:{auth_token}" | |
| encoded = base64.b64encode(creds.encode("ascii")).decode("ascii") | |
| url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json" | |
| headers = {"Authorization": f"Basic {encoded}"} | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: | |
| form_data = aiohttp.FormData() | |
| form_data.add_field("From", from_number) | |
| form_data.add_field("To", chat_id) | |
| form_data.add_field("Body", message) | |
| async with session.post(url, data=form_data, headers=headers, **_req_kw) as resp: | |
| body = await resp.json() | |
| if resp.status >= 400: | |
| error_msg = body.get("message", str(body)) | |
| return _error(f"Twilio API error ({resp.status}): {error_msg}") | |
| msg_sid = body.get("sid", "") | |
| return {"success": True, "platform": "sms", "chat_id": chat_id, "message_id": msg_sid} | |
| except Exception as e: | |
| return _error(f"SMS send failed: {e}") | |
| async def _send_mattermost(token, extra, chat_id, message): | |
| """Send via Mattermost REST API.""" | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| try: | |
| base_url = (extra.get("url") or os.getenv("MATTERMOST_URL", "")).rstrip("/") | |
| token = token or os.getenv("MATTERMOST_TOKEN", "") | |
| if not base_url or not token: | |
| return {"error": "Mattermost not configured (MATTERMOST_URL, MATTERMOST_TOKEN required)"} | |
| url = f"{base_url}/api/v4/posts" | |
| headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: | |
| async with session.post(url, headers=headers, json={"channel_id": chat_id, "message": message}) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| return _error(f"Mattermost API error ({resp.status}): {body}") | |
| data = await resp.json() | |
| return {"success": True, "platform": "mattermost", "chat_id": chat_id, "message_id": data.get("id")} | |
| except Exception as e: | |
| return _error(f"Mattermost send failed: {e}") | |
| async def _send_matrix(token, extra, chat_id, message): | |
| """Send via Matrix Client-Server API. | |
| Converts markdown to HTML for rich rendering in Matrix clients. | |
| Falls back to plain text if the ``markdown`` library is not installed. | |
| """ | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| try: | |
| homeserver = (extra.get("homeserver") or os.getenv("MATRIX_HOMESERVER", "")).rstrip("/") | |
| token = token or os.getenv("MATRIX_ACCESS_TOKEN", "") | |
| if not homeserver or not token: | |
| return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"} | |
| txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}" | |
| from urllib.parse import quote | |
| encoded_room = quote(chat_id, safe="") | |
| url = f"{homeserver}/_matrix/client/v3/rooms/{encoded_room}/send/m.room.message/{txn_id}" | |
| headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
| # Build message payload with optional HTML formatted_body. | |
| payload = {"msgtype": "m.text", "body": message} | |
| try: | |
| import markdown as _md | |
| html = _md.markdown(message, extensions=["fenced_code", "tables"]) | |
| # Convert h1-h6 to bold for Element X compatibility. | |
| html = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", html) | |
| payload["format"] = "org.matrix.custom.html" | |
| payload["formatted_body"] = html | |
| except ImportError: | |
| pass | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: | |
| async with session.put(url, headers=headers, json=payload) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| return _error(f"Matrix API error ({resp.status}): {body}") | |
| data = await resp.json() | |
| return {"success": True, "platform": "matrix", "chat_id": chat_id, "message_id": data.get("event_id")} | |
| except Exception as e: | |
| return _error(f"Matrix send failed: {e}") | |
| async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None): | |
| """Send via the Matrix adapter so native Matrix media uploads are preserved.""" | |
| try: | |
| from gateway.platforms.matrix import MatrixAdapter | |
| except ImportError: | |
| return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"} | |
| media_files = media_files or [] | |
| try: | |
| adapter = MatrixAdapter(pconfig) | |
| connected = await adapter.connect() | |
| if not connected: | |
| return _error("Matrix connect failed") | |
| metadata = {"thread_id": thread_id} if thread_id else None | |
| last_result = None | |
| if message.strip(): | |
| last_result = await adapter.send(chat_id, message, metadata=metadata) | |
| if not last_result.success: | |
| return _error(f"Matrix send failed: {last_result.error}") | |
| for media_path, is_voice in media_files: | |
| if not os.path.exists(media_path): | |
| return _error(f"Media file not found: {media_path}") | |
| ext = os.path.splitext(media_path)[1].lower() | |
| if ext in _IMAGE_EXTS: | |
| last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata) | |
| elif ext in _VIDEO_EXTS: | |
| last_result = await adapter.send_video(chat_id, media_path, metadata=metadata) | |
| elif ext in _VOICE_EXTS and is_voice: | |
| last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) | |
| elif ext in _AUDIO_EXTS: | |
| last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) | |
| else: | |
| last_result = await adapter.send_document(chat_id, media_path, metadata=metadata) | |
| if not last_result.success: | |
| return _error(f"Matrix media send failed: {last_result.error}") | |
| if last_result is None: | |
| return {"error": "No deliverable text or media remained after processing MEDIA tags"} | |
| return { | |
| "success": True, | |
| "platform": "matrix", | |
| "chat_id": chat_id, | |
| "message_id": last_result.message_id, | |
| } | |
| except Exception as e: | |
| return _error(f"Matrix send failed: {e}") | |
| finally: | |
| try: | |
| await adapter.disconnect() | |
| except Exception: | |
| pass | |
| async def _send_homeassistant(token, extra, chat_id, message): | |
| """Send via Home Assistant notify service.""" | |
| try: | |
| import aiohttp | |
| except ImportError: | |
| return {"error": "aiohttp not installed. Run: pip install aiohttp"} | |
| try: | |
| hass_url = (extra.get("url") or os.getenv("HASS_URL", "")).rstrip("/") | |
| token = token or os.getenv("HASS_TOKEN", "") | |
| if not hass_url or not token: | |
| return {"error": "Home Assistant not configured (HASS_URL, HASS_TOKEN required)"} | |
| url = f"{hass_url}/api/services/notify/notify" | |
| headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} | |
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: | |
| async with session.post(url, headers=headers, json={"message": message, "target": chat_id}) as resp: | |
| if resp.status not in (200, 201): | |
| body = await resp.text() | |
| return _error(f"Home Assistant API error ({resp.status}): {body}") | |
| return {"success": True, "platform": "homeassistant", "chat_id": chat_id} | |
| except Exception as e: | |
| return _error(f"Home Assistant send failed: {e}") | |
| async def _send_dingtalk(extra, chat_id, message): | |
| """Send via DingTalk robot webhook. | |
| Note: The gateway's DingTalk adapter uses per-session webhook URLs from | |
| incoming messages (dingtalk-stream SDK). For cross-platform send_message | |
| delivery we use a static robot webhook URL instead, which must be | |
| configured via ``DINGTALK_WEBHOOK_URL`` env var or ``webhook_url`` in the | |
| platform's extra config. | |
| """ | |
| try: | |
| import httpx | |
| except ImportError: | |
| return {"error": "httpx not installed"} | |
| try: | |
| webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "") | |
| if not webhook_url: | |
| return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."} | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| resp = await client.post( | |
| webhook_url, | |
| json={"msgtype": "text", "text": {"content": message}}, | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| if data.get("errcode", 0) != 0: | |
| return _error(f"DingTalk API error: {data.get('errmsg', 'unknown')}") | |
| return {"success": True, "platform": "dingtalk", "chat_id": chat_id} | |
| except Exception as e: | |
| return _error(f"DingTalk send failed: {e}") | |
| async def _send_wecom(extra, chat_id, message): | |
| """Send via WeCom using the adapter's WebSocket send pipeline.""" | |
| try: | |
| from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements | |
| if not check_wecom_requirements(): | |
| return {"error": "WeCom requirements not met. Need aiohttp + WECOM_BOT_ID/SECRET."} | |
| except ImportError: | |
| return {"error": "WeCom adapter not available."} | |
| try: | |
| from gateway.config import PlatformConfig | |
| pconfig = PlatformConfig(extra=extra) | |
| adapter = WeComAdapter(pconfig) | |
| connected = await adapter.connect() | |
| if not connected: | |
| return _error(f"WeCom: failed to connect - {adapter.fatal_error_message or 'unknown error'}") | |
| try: | |
| result = await adapter.send(chat_id, message) | |
| if not result.success: | |
| return _error(f"WeCom send failed: {result.error}") | |
| return {"success": True, "platform": "wecom", "chat_id": chat_id, "message_id": result.message_id} | |
| finally: | |
| await adapter.disconnect() | |
| except Exception as e: | |
| return _error(f"WeCom send failed: {e}") | |
| async def _send_weixin(pconfig, chat_id, message, media_files=None): | |
| """Send via Weixin iLink using the native adapter helper.""" | |
| try: | |
| from gateway.platforms.weixin import check_weixin_requirements, send_weixin_direct | |
| if not check_weixin_requirements(): | |
| return {"error": "Weixin requirements not met. Need aiohttp + cryptography."} | |
| except ImportError: | |
| return {"error": "Weixin adapter not available."} | |
| try: | |
| return await send_weixin_direct( | |
| extra=pconfig.extra, | |
| token=pconfig.token, | |
| chat_id=chat_id, | |
| message=message, | |
| media_files=media_files, | |
| ) | |
| except Exception as e: | |
| return _error(f"Weixin send failed: {e}") | |
| async def _send_bluebubbles(extra, chat_id, message): | |
| """Send via BlueBubbles iMessage server using the adapter's REST API.""" | |
| try: | |
| from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements | |
| if not check_bluebubbles_requirements(): | |
| return {"error": "BlueBubbles requirements not met (need aiohttp + httpx)."} | |
| except ImportError: | |
| return {"error": "BlueBubbles adapter not available."} | |
| try: | |
| from gateway.config import PlatformConfig | |
| pconfig = PlatformConfig(extra=extra) | |
| adapter = BlueBubblesAdapter(pconfig) | |
| connected = await adapter.connect() | |
| if not connected: | |
| return _error("BlueBubbles: failed to connect to server") | |
| try: | |
| result = await adapter.send(chat_id, message) | |
| if not result.success: | |
| return _error(f"BlueBubbles send failed: {result.error}") | |
| return {"success": True, "platform": "bluebubbles", "chat_id": chat_id, "message_id": result.message_id} | |
| finally: | |
| await adapter.disconnect() | |
| except Exception as e: | |
| return _error(f"BlueBubbles send failed: {e}") | |
| async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None): | |
| """Send via Feishu/Lark using the adapter's send pipeline.""" | |
| try: | |
| from gateway.platforms.feishu import FeishuAdapter, FEISHU_AVAILABLE | |
| if not FEISHU_AVAILABLE: | |
| return {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"} | |
| from gateway.platforms.feishu import FEISHU_DOMAIN, LARK_DOMAIN | |
| except ImportError: | |
| return {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"} | |
| media_files = media_files or [] | |
| try: | |
| adapter = FeishuAdapter(pconfig) | |
| domain_name = getattr(adapter, "_domain_name", "feishu") | |
| domain = FEISHU_DOMAIN if domain_name != "lark" else LARK_DOMAIN | |
| adapter._client = adapter._build_lark_client(domain) | |
| metadata = {"thread_id": thread_id} if thread_id else None | |
| last_result = None | |
| if message.strip(): | |
| last_result = await adapter.send(chat_id, message, metadata=metadata) | |
| if not last_result.success: | |
| return _error(f"Feishu send failed: {last_result.error}") | |
| for media_path, is_voice in media_files: | |
| if not os.path.exists(media_path): | |
| return _error(f"Media file not found: {media_path}") | |
| ext = os.path.splitext(media_path)[1].lower() | |
| if ext in _IMAGE_EXTS: | |
| last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata) | |
| elif ext in _VIDEO_EXTS: | |
| last_result = await adapter.send_video(chat_id, media_path, metadata=metadata) | |
| elif ext in _VOICE_EXTS and is_voice: | |
| last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) | |
| elif ext in _AUDIO_EXTS: | |
| last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) | |
| else: | |
| last_result = await adapter.send_document(chat_id, media_path, metadata=metadata) | |
| if not last_result.success: | |
| return _error(f"Feishu media send failed: {last_result.error}") | |
| if last_result is None: | |
| return {"error": "No deliverable text or media remained after processing MEDIA tags"} | |
| return { | |
| "success": True, | |
| "platform": "feishu", | |
| "chat_id": chat_id, | |
| "message_id": last_result.message_id, | |
| } | |
| except Exception as e: | |
| return _error(f"Feishu send failed: {e}") | |
| def _check_send_message(): | |
| """Gate send_message on gateway running (always available on messaging platforms).""" | |
| from gateway.session_context import get_session_env | |
| platform = get_session_env("HERMES_SESSION_PLATFORM", "") | |
| if platform and platform != "local": | |
| return True | |
| try: | |
| from gateway.status import is_gateway_running | |
| return is_gateway_running() | |
| except Exception: | |
| return False | |
| async def _send_qqbot(pconfig, chat_id, message): | |
| """Send via QQBot using the REST API directly (no WebSocket needed). | |
| Uses the QQ Bot Open Platform REST endpoints to get an access token | |
| and post a message. Works for guild channels without requiring | |
| a running gateway adapter. | |
| """ | |
| try: | |
| import httpx | |
| except ImportError: | |
| return _error("QQBot direct send requires httpx. Run: pip install httpx") | |
| extra = pconfig.extra or {} | |
| appid = extra.get("app_id") or os.getenv("QQ_APP_ID", "") | |
| secret = (pconfig.token or extra.get("client_secret") | |
| or os.getenv("QQ_CLIENT_SECRET", "")) | |
| if not appid or not secret: | |
| return _error("QQBot: QQ_APP_ID / QQ_CLIENT_SECRET not configured.") | |
| try: | |
| async with httpx.AsyncClient(timeout=15) as client: | |
| # Step 1: Get access token | |
| token_resp = await client.post( | |
| "https://bots.qq.com/app/getAppAccessToken", | |
| json={"appId": str(appid), "clientSecret": str(secret)}, | |
| ) | |
| if token_resp.status_code != 200: | |
| return _error(f"QQBot token request failed: {token_resp.status_code}") | |
| token_data = token_resp.json() | |
| access_token = token_data.get("access_token") | |
| if not access_token: | |
| return _error(f"QQBot: no access_token in response") | |
| # Step 2: Send message via REST | |
| headers = { | |
| "Authorization": f"QQBot {access_token}", | |
| "Content-Type": "application/json", | |
| } | |
| url = f"https://api.sgroup.qq.com/channels/{chat_id}/messages" | |
| payload = {"content": message[:4000], "msg_type": 0} | |
| resp = await client.post(url, json=payload, headers=headers) | |
| if resp.status_code in (200, 201): | |
| data = resp.json() | |
| return {"success": True, "platform": "qqbot", "chat_id": chat_id, | |
| "message_id": data.get("id")} | |
| else: | |
| return _error(f"QQBot send failed: {resp.status_code} {resp.text}") | |
| except Exception as e: | |
| return _error(f"QQBot send failed: {e}") | |
| async def _send_yuanbao(chat_id, message, media_files=None): | |
| """Send via Yuanbao using the running gateway adapter's WebSocket connection. | |
| Yuanbao uses a persistent WebSocket — unlike HTTP-based platforms, we | |
| cannot create a throwaway client. We obtain the running singleton from | |
| the adapter module itself (``get_active_adapter``). | |
| chat_id format: | |
| - Group: "group:<group_code>" | |
| - DM: "direct:<account_id>" or just "<account_id>" | |
| """ | |
| try: | |
| from gateway.platforms.yuanbao import get_active_adapter, send_yuanbao_direct | |
| except ImportError: | |
| return _error("Yuanbao adapter module not available.") | |
| adapter = get_active_adapter() | |
| if adapter is None: | |
| return _error( | |
| "Yuanbao adapter is not running. " | |
| "Start the gateway with yuanbao platform enabled first." | |
| ) | |
| try: | |
| return await send_yuanbao_direct(adapter, chat_id, message, media_files=media_files) | |
| except Exception as e: | |
| return _error(f"Yuanbao send failed: {e}") | |
| # --- Registry --- | |
| from tools.registry import registry, tool_error | |
| registry.register( | |
| name="send_message", | |
| toolset="messaging", | |
| schema=SEND_MESSAGE_SCHEMA, | |
| handler=send_message_tool, | |
| check_fn=_check_send_message, | |
| emoji="📨", | |
| ) | |