""" Temp-Mail 邮箱服务实现 基于自部署 Cloudflare Worker 临时邮箱服务 接口文档参见 plan/temp-mail.md """ import re import time import json import logging from email import message_from_string from email.header import decode_header, make_header from email.message import Message from email.policy import default as email_policy from html import unescape from typing import Optional, Dict, Any, List from .base import BaseEmailService, EmailServiceError, EmailServiceType from ..core.http_client import HTTPClient, RequestConfig from ..config.constants import OTP_CODE_PATTERN logger = logging.getLogger(__name__) class TempMailService(BaseEmailService): """ Temp-Mail 邮箱服务 基于自部署 Cloudflare Worker 的临时邮箱,admin 模式管理邮箱 不走代理,不使用 requests 库 """ def __init__(self, config: Dict[str, Any] = None, name: str = None): """ 初始化 TempMail 服务 Args: config: 配置字典,支持以下键: - base_url: Worker 域名地址,如 https://mail.example.com (必需) - admin_password: Admin 密码,对应 x-admin-auth header (必需) - domain: 邮箱域名,如 example.com (必需) - enable_prefix: 是否启用前缀,默认 True - timeout: 请求超时时间,默认 30 - max_retries: 最大重试次数,默认 3 name: 服务名称 """ super().__init__(EmailServiceType.TEMP_MAIL, name) required_keys = ["base_url", "admin_password", "domain"] missing_keys = [key for key in required_keys if not (config or {}).get(key)] if missing_keys: raise ValueError(f"缺少必需配置: {missing_keys}") default_config = { "enable_prefix": True, "timeout": 30, "max_retries": 3, } self.config = {**default_config, **(config or {})} # 不走代理,proxy_url=None http_config = RequestConfig( timeout=self.config["timeout"], max_retries=self.config["max_retries"], ) self.http_client = HTTPClient(proxy_url=None, config=http_config) # 邮箱缓存:email -> {jwt, address} self._email_cache: Dict[str, Dict[str, Any]] = {} def _decode_mime_header(self, value: str) -> str: """解码 MIME 头,兼容 RFC 2047 编码主题。""" if not value: return "" try: return str(make_header(decode_header(value))) except Exception: return value def _extract_body_from_message(self, message: Message) -> str: """从 MIME 邮件对象中提取可读正文。""" parts: List[str] = [] if message.is_multipart(): for part in message.walk(): if part.get_content_maintype() == "multipart": continue content_type = (part.get_content_type() or "").lower() if content_type not in ("text/plain", "text/html"): continue try: payload = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" text = payload.decode(charset, errors="replace") if payload else "" except Exception: try: text = part.get_content() except Exception: text = "" if content_type == "text/html": text = re.sub(r"<[^>]+>", " ", text) parts.append(text) else: try: payload = message.get_payload(decode=True) charset = message.get_content_charset() or "utf-8" body = payload.decode(charset, errors="replace") if payload else "" except Exception: try: body = message.get_content() except Exception: body = str(message.get_payload() or "") if "html" in (message.get_content_type() or "").lower(): body = re.sub(r"<[^>]+>", " ", body) parts.append(body) return unescape("\n".join(part for part in parts if part).strip()) def _extract_mail_fields(self, mail: Dict[str, Any]) -> Dict[str, str]: """统一提取邮件字段,兼容 raw MIME 和不同 Worker 返回格式。""" sender = str( mail.get("source") or mail.get("from") or mail.get("from_address") or mail.get("fromAddress") or "" ).strip() subject = str(mail.get("subject") or mail.get("title") or "").strip() body_text = str( mail.get("text") or mail.get("body") or mail.get("content") or mail.get("html") or "" ).strip() raw = str(mail.get("raw") or "").strip() if raw: try: message = message_from_string(raw, policy=email_policy) sender = sender or self._decode_mime_header(message.get("From", "")) subject = subject or self._decode_mime_header(message.get("Subject", "")) parsed_body = self._extract_body_from_message(message) if parsed_body: body_text = f"{body_text}\n{parsed_body}".strip() if body_text else parsed_body except Exception as e: logger.debug(f"解析 TempMail raw 邮件失败: {e}") body_text = f"{body_text}\n{raw}".strip() if body_text else raw body_text = unescape(re.sub(r"<[^>]+>", " ", body_text)) return { "sender": sender, "subject": subject, "body": body_text, "raw": raw, } def _admin_headers(self) -> Dict[str, str]: """构造 admin 请求头""" return { "x-admin-auth": self.config["admin_password"], "Content-Type": "application/json", "Accept": "application/json", } def _make_request(self, method: str, path: str, **kwargs) -> Any: """ 发送请求并返回 JSON 数据 Args: method: HTTP 方法 path: 请求路径(以 / 开头) **kwargs: 传递给 http_client.request 的额外参数 Returns: 响应 JSON 数据 Raises: EmailServiceError: 请求失败 """ base_url = self.config["base_url"].rstrip("/") url = f"{base_url}{path}" # 合并默认 admin headers kwargs.setdefault("headers", {}) for k, v in self._admin_headers().items(): kwargs["headers"].setdefault(k, v) try: response = self.http_client.request(method, url, **kwargs) if response.status_code >= 400: error_msg = f"请求失败: {response.status_code}" try: error_data = response.json() error_msg = f"{error_msg} - {error_data}" except Exception: error_msg = f"{error_msg} - {response.text[:200]}" self.update_status(False, EmailServiceError(error_msg)) raise EmailServiceError(error_msg) try: return response.json() except json.JSONDecodeError: return {"raw_response": response.text} except Exception as e: self.update_status(False, e) if isinstance(e, EmailServiceError): raise raise EmailServiceError(f"请求失败: {method} {path} - {e}") def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]: """ 通过 admin API 创建临时邮箱 Returns: 包含邮箱信息的字典: - email: 邮箱地址 - jwt: 用户级 JWT token - service_id: 同 email(用作标识) """ import random import string # 生成随机邮箱名 letters = ''.join(random.choices(string.ascii_lowercase, k=5)) digits = ''.join(random.choices(string.digits, k=random.randint(1, 3))) suffix = ''.join(random.choices(string.ascii_lowercase, k=random.randint(1, 3))) name = letters + digits + suffix domain = self.config["domain"] enable_prefix = self.config.get("enable_prefix", True) body = { "enablePrefix": enable_prefix, "name": name, "domain": domain, } try: response = self._make_request("POST", "/admin/new_address", json=body) address = response.get("address", "").strip() jwt = response.get("jwt", "").strip() if not address: raise EmailServiceError(f"API 返回数据不完整: {response}") email_info = { "email": address, "jwt": jwt, "service_id": address, "id": address, "created_at": time.time(), } # 缓存 jwt,供获取验证码时使用 self._email_cache[address] = email_info logger.info(f"成功创建 TempMail 邮箱: {address}") self.update_status(True) return email_info except Exception as e: self.update_status(False, e) if isinstance(e, EmailServiceError): raise raise EmailServiceError(f"创建邮箱失败: {e}") def get_verification_code( self, email: str, email_id: str = None, timeout: int = 120, pattern: str = OTP_CODE_PATTERN, otp_sent_at: Optional[float] = None, ) -> Optional[str]: """ 从 TempMail 邮箱获取验证码 Args: email: 邮箱地址 email_id: 未使用,保留接口兼容 timeout: 超时时间(秒) pattern: 验证码正则 otp_sent_at: OTP 发送时间戳(暂未使用) Returns: 验证码字符串,超时返回 None """ logger.info(f"正在从 TempMail 邮箱 {email} 获取验证码...") start_time = time.time() seen_mail_ids: set = set() # 优先使用用户级 JWT,回退到 admin API cached = self._email_cache.get(email, {}) jwt = cached.get("jwt") while time.time() - start_time < timeout: try: if jwt: response = self._make_request( "GET", "/user_api/mails", params={"limit": 20, "offset": 0}, headers={"x-user-token": jwt, "Content-Type": "application/json", "Accept": "application/json"}, ) else: response = self._make_request( "GET", "/admin/mails", params={"limit": 20, "offset": 0, "address": email}, ) # /user_api/mails 和 /admin/mails 返回格式相同: {"results": [...], "total": N} mails = response.get("results", []) if not isinstance(mails, list): time.sleep(3) continue for mail in mails: mail_id = mail.get("id") if not mail_id or mail_id in seen_mail_ids: continue seen_mail_ids.add(mail_id) parsed = self._extract_mail_fields(mail) sender = parsed["sender"].lower() subject = parsed["subject"] body_text = parsed["body"] raw_text = parsed["raw"] content = f"{sender}\n{subject}\n{body_text}\n{raw_text}".strip() # 只处理 OpenAI 邮件 if "openai" not in sender and "openai" not in content.lower(): continue match = re.search(pattern, content) if match: code = match.group(1) logger.info(f"从 TempMail 邮箱 {email} 找到验证码: {code}") self.update_status(True) return code except Exception as e: logger.debug(f"检查 TempMail 邮件时出错: {e}") time.sleep(3) logger.warning(f"等待 TempMail 验证码超时: {email}") return None def list_emails(self, limit: int = 100, offset: int = 0, **kwargs) -> List[Dict[str, Any]]: """ 列出邮箱 Args: limit: 返回数量上限 offset: 分页偏移 **kwargs: 额外查询参数,透传给 admin API Returns: 邮箱列表 """ params = { "limit": limit, "offset": offset, } params.update({k: v for k, v in kwargs.items() if v is not None}) try: response = self._make_request("GET", "/admin/mails", params=params) mails = response.get("results", []) if not isinstance(mails, list): raise EmailServiceError(f"API 返回数据格式错误: {response}") emails: List[Dict[str, Any]] = [] for mail in mails: address = (mail.get("address") or "").strip() mail_id = mail.get("id") or address email_info = { "id": mail_id, "service_id": mail_id, "email": address, "subject": mail.get("subject"), "from": mail.get("source"), "created_at": mail.get("createdAt") or mail.get("created_at"), "raw_data": mail, } emails.append(email_info) if address: cached = self._email_cache.get(address, {}) self._email_cache[address] = {**cached, **email_info} self.update_status(True) return emails except Exception as e: logger.warning(f"列出 TempMail 邮箱失败: {e}") self.update_status(False, e) return list(self._email_cache.values()) def delete_email(self, email_id: str) -> bool: """ 删除邮箱 Note: 当前 TempMail admin API 文档未见删除地址接口,这里先从本地缓存移除, 以满足统一接口并避免服务实例化失败。 """ removed = False emails_to_delete = [] for address, info in self._email_cache.items(): candidate_ids = { address, info.get("id"), info.get("service_id"), } if email_id in candidate_ids: emails_to_delete.append(address) for address in emails_to_delete: self._email_cache.pop(address, None) removed = True if removed: logger.info(f"已从 TempMail 缓存移除邮箱: {email_id}") self.update_status(True) else: logger.info(f"TempMail 缓存中未找到邮箱: {email_id}") return removed def check_health(self) -> bool: """检查服务健康状态""" try: self._make_request( "GET", "/admin/mails", params={"limit": 1, "offset": 0}, ) self.update_status(True) return True except Exception as e: logger.warning(f"TempMail 健康检查失败: {e}") self.update_status(False, e) return False