"""Telegram MarkdownV2 utilities. Renders common Markdown into Telegram MarkdownV2 format. Used by the message handler and Telegram platform adapter. """ import re from markdown_it import MarkdownIt MDV2_SPECIAL_CHARS = set("\\_*[]()~`>#+-=|{}.!") MDV2_LINK_ESCAPE = set("\\)") _MD = MarkdownIt("commonmark", {"html": False, "breaks": False}) _MD.enable("strikethrough") _MD.enable("table") _TABLE_SEP_RE = re.compile(r"^\s*\|?\s*:?-{3,}:?\s*(\|\s*:?-{3,}:?\s*)+\|?\s*$") _FENCE_RE = re.compile(r"^\s*```") def _is_gfm_table_header_line(line: str) -> bool: """Check if line is a GFM table header (pipe-delimited, not separator).""" if "|" not in line: return False if _TABLE_SEP_RE.match(line): return False stripped = line.strip() parts = [p.strip() for p in stripped.strip("|").split("|")] parts = [p for p in parts if p != ""] return len(parts) >= 2 def _normalize_gfm_tables(text: str) -> str: """ Many LLMs emit tables immediately after a paragraph line (no blank line). Markdown-it will treat that as a softbreak within the paragraph, so the table extension won't trigger. Insert a blank line before detected tables. We only do this outside fenced code blocks. """ lines = text.splitlines() if len(lines) < 2: return text out_lines: list[str] = [] in_fence = False for idx, line in enumerate(lines): if _FENCE_RE.match(line): in_fence = not in_fence out_lines.append(line) continue if ( not in_fence and idx + 1 < len(lines) and _is_gfm_table_header_line(line) and _TABLE_SEP_RE.match(lines[idx + 1]) and out_lines and out_lines[-1].strip() != "" ): m = re.match(r"^(\s*)", line) indent = m.group(1) if m else "" out_lines.append(indent) out_lines.append(line) return "\n".join(out_lines) def escape_md_v2(text: str) -> str: """Escape text for Telegram MarkdownV2.""" return "".join(f"\\{ch}" if ch in MDV2_SPECIAL_CHARS else ch for ch in text) def escape_md_v2_code(text: str) -> str: """Escape text for Telegram MarkdownV2 code spans/blocks.""" return text.replace("\\", "\\\\").replace("`", "\\`") def escape_md_v2_link_url(text: str) -> str: """Escape URL for Telegram MarkdownV2 link destination.""" return "".join(f"\\{ch}" if ch in MDV2_LINK_ESCAPE else ch for ch in text) def mdv2_bold(text: str) -> str: """Format text as bold in MarkdownV2.""" return f"*{escape_md_v2(text)}*" def mdv2_code_inline(text: str) -> str: """Format text as inline code in MarkdownV2.""" return f"`{escape_md_v2_code(text)}`" def format_status(emoji: str, label: str, suffix: str | None = None) -> str: """Format a status message with emoji and optional suffix.""" base = f"{emoji} {mdv2_bold(label)}" if suffix: return f"{base} {escape_md_v2(suffix)}" return base def render_markdown_to_mdv2(text: str) -> str: """Render common Markdown into Telegram MarkdownV2.""" if not text: return "" text = _normalize_gfm_tables(text) tokens = _MD.parse(text) def render_inline_table_plain(children) -> str: out: list[str] = [] for tok in children: if tok.type == "text" or tok.type == "code_inline": out.append(tok.content) elif tok.type in {"softbreak", "hardbreak"}: out.append(" ") elif tok.type == "image" and tok.content: out.append(tok.content) return "".join(out) def render_inline_plain(children) -> str: out: list[str] = [] for tok in children: if tok.type == "text" or tok.type == "code_inline": out.append(escape_md_v2(tok.content)) elif tok.type in {"softbreak", "hardbreak"}: out.append("\n") return "".join(out) def render_inline(children) -> str: out: list[str] = [] i = 0 while i < len(children): tok = children[i] t = tok.type if t == "text": out.append(escape_md_v2(tok.content)) elif t in {"softbreak", "hardbreak"}: out.append("\n") elif t == "em_open" or t == "em_close": out.append("_") elif t == "strong_open" or t == "strong_close": out.append("*") elif t == "s_open" or t == "s_close": out.append("~") elif t == "code_inline": out.append(f"`{escape_md_v2_code(tok.content)}`") elif t == "link_open": href = "" if tok.attrs: if isinstance(tok.attrs, dict): href = tok.attrs.get("href", "") else: for key, val in tok.attrs: if key == "href": href = val break inner_tokens = [] i += 1 while i < len(children) and children[i].type != "link_close": inner_tokens.append(children[i]) i += 1 link_text = "" for child in inner_tokens: if child.type == "text" or child.type == "code_inline": link_text += child.content out.append( f"[{escape_md_v2(link_text)}]({escape_md_v2_link_url(href)})" ) elif t == "image": href = "" alt = tok.content or "" if tok.attrs: if isinstance(tok.attrs, dict): href = tok.attrs.get("src", "") else: for key, val in tok.attrs: if key == "src": href = val break if alt: out.append(f"{escape_md_v2(alt)} ({escape_md_v2_link_url(href)})") else: out.append(escape_md_v2_link_url(href)) else: out.append(escape_md_v2(tok.content or "")) i += 1 return "".join(out) out: list[str] = [] list_stack: list[dict] = [] pending_prefix: str | None = None blockquote_level = 0 in_heading = False def apply_blockquote(val: str) -> str: if blockquote_level <= 0: return val prefix = "> " * blockquote_level return prefix + val.replace("\n", "\n" + prefix) i = 0 while i < len(tokens): tok = tokens[i] t = tok.type if t == "paragraph_open": pass elif t == "paragraph_close": out.append("\n") elif t == "heading_open": in_heading = True elif t == "heading_close": in_heading = False out.append("\n") elif t == "bullet_list_open": list_stack.append({"type": "bullet", "index": 1}) elif t == "bullet_list_close": if list_stack: list_stack.pop() out.append("\n") elif t == "ordered_list_open": start = 1 if tok.attrs: if isinstance(tok.attrs, dict): val = tok.attrs.get("start") if val is not None: try: start = int(val) except (TypeError, ValueError): start = 1 else: for key, val in tok.attrs: if key == "start": try: start = int(val) except (TypeError, ValueError): start = 1 break list_stack.append({"type": "ordered", "index": start}) elif t == "ordered_list_close": if list_stack: list_stack.pop() out.append("\n") elif t == "list_item_open": if list_stack: top = list_stack[-1] if top["type"] == "bullet": pending_prefix = "\\- " else: pending_prefix = f"{top['index']}\\." top["index"] += 1 pending_prefix += " " elif t == "list_item_close": out.append("\n") elif t == "blockquote_open": blockquote_level += 1 elif t == "blockquote_close": blockquote_level = max(0, blockquote_level - 1) out.append("\n") elif t == "table_open": if pending_prefix: out.append(apply_blockquote(pending_prefix.rstrip())) out.append("\n") pending_prefix = None rows: list[list[str]] = [] row_is_header: list[bool] = [] j = i + 1 in_thead = False in_row = False current_row: list[str] = [] current_row_header = False in_cell = False cell_parts: list[str] = [] while j < len(tokens): tt = tokens[j].type if tt == "thead_open": in_thead = True elif tt == "thead_close": in_thead = False elif tt == "tr_open": in_row = True current_row = [] current_row_header = in_thead elif tt in {"th_open", "td_open"}: in_cell = True cell_parts = [] elif tt == "inline" and in_cell: cell_parts.append( render_inline_table_plain(tokens[j].children or []) ) elif tt in {"th_close", "td_close"} and in_cell: cell = " ".join(cell_parts).strip() current_row.append(cell) in_cell = False cell_parts = [] elif tt == "tr_close" and in_row: rows.append(current_row) row_is_header.append(bool(current_row_header)) in_row = False elif tt == "table_close": break j += 1 if rows: col_count = max((len(r) for r in rows), default=0) norm_rows: list[list[str]] = [] for r in rows: if len(r) < col_count: r = r + [""] * (col_count - len(r)) norm_rows.append(r) widths: list[int] = [] for c in range(col_count): w = max((len(r[c]) for r in norm_rows), default=0) widths.append(max(w, 3)) def fmt_row( r: list[str], _w: list[int] = widths, _c: int = col_count ) -> str: cells = [r[c].ljust(_w[c]) for c in range(_c)] return "| " + " | ".join(cells) + " |" def fmt_sep(_w: list[int] = widths, _c: int = col_count) -> str: cells = ["-" * _w[c] for c in range(_c)] return "| " + " | ".join(cells) + " |" last_header_idx = -1 for idx, is_h in enumerate(row_is_header): if is_h: last_header_idx = idx lines: list[str] = [] for idx, r in enumerate(norm_rows): lines.append(fmt_row(r)) if idx == last_header_idx: lines.append(fmt_sep()) table_text = "\n".join(lines).rstrip() out.append(f"```\n{escape_md_v2_code(table_text)}\n```") out.append("\n") i = j + 1 continue elif t in {"code_block", "fence"}: code = escape_md_v2_code(tok.content.rstrip("\n")) out.append(f"```\n{code}\n```") out.append("\n") elif t == "inline": rendered = render_inline(tok.children or []) if in_heading: rendered = f"*{render_inline_plain(tok.children or [])}*" if pending_prefix: rendered = pending_prefix + rendered pending_prefix = None rendered = apply_blockquote(rendered) out.append(rendered) else: if tok.content: out.append(escape_md_v2(tok.content)) i += 1 return "".join(out).rstrip() __all__ = [ "escape_md_v2", "escape_md_v2_code", "escape_md_v2_link_url", "format_status", "mdv2_bold", "mdv2_code_inline", "render_markdown_to_mdv2", ]