"""
Outlook mailbox service implementation.

Supports the IMAP protocol with XOAUTH2 and password authentication.
"""

import imaplib
import email
import html
import re
import time
import threading
import json
import urllib.parse
import urllib.request
import base64
import hashlib
import secrets
import logging
from typing import Optional, Dict, Any, List
from email.header import decode_header
from email.utils import parsedate_to_datetime
from urllib.error import HTTPError

from .base import BaseEmailService, EmailServiceError, EmailServiceType
from ..config.constants import (
    OTP_CODE_PATTERN,
    OTP_CODE_SIMPLE_PATTERN,
    OTP_CODE_SEMANTIC_PATTERN,
    OPENAI_EMAIL_SENDERS,
    OPENAI_VERIFICATION_KEYWORDS,
)
from ..config.settings import get_settings


logger = logging.getLogger(__name__)


def get_email_code_settings() -> dict:
    """
    Return the verification-code waiting configuration.

    Returns:
        dict: ``timeout`` and ``poll_interval`` read from the settings
        object's ``email_code_timeout`` / ``email_code_poll_interval``.
    """
    settings = get_settings()
    return {
        "timeout": settings.email_code_timeout,
        "poll_interval": settings.email_code_poll_interval,
    }


class OutlookAccount:
    """Credentials for a single Outlook account."""

    def __init__(
        self,
        email: str,
        password: str,
        client_id: str = "",
        refresh_token: str = ""
    ):
        self.email = email
        self.password = password
        self.client_id = client_id
        self.refresh_token = refresh_token

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "OutlookAccount":
        """Build an account from a config dict; missing keys default to ""."""
        return cls(
            email=config.get("email", ""),
            password=config.get("password", ""),
            client_id=config.get("client_id", ""),
            refresh_token=config.get("refresh_token", "")
        )

    def has_oauth(self) -> bool:
        """Return True when both client_id and refresh_token are set."""
        return bool(self.client_id and self.refresh_token)

    def validate(self) -> bool:
        """
        Return True when the account can be used to log in.

        An email address is always required (it is the IMAP user name for
        both auth modes), plus either a password or OAuth2 credentials.
        """
        if not self.email:
            return False
        return bool(self.password) or self.has_oauth()


class OutlookIMAPClient:
    """
    Outlook IMAP client.

    Supports XOAUTH2 and password authentication. Usable as a context
    manager: entering connects, leaving closes the connection.
    """

    # Microsoft OAuth2 access-token cache shared by all instances:
    # lower-cased email -> (access_token, expiry_unix_time)
    _token_cache: Dict[str, tuple] = {}
    _cache_lock = threading.Lock()

    def __init__(
        self,
        account: OutlookAccount,
        host: str = "outlook.office365.com",
        port: int = 993,
        timeout: int = 20
    ):
        self.account = account
        self.host = host
        self.port = port
        self.timeout = timeout
        self._conn: Optional[imaplib.IMAP4_SSL] = None

    @staticmethod
    def refresh_ms_token(account: OutlookAccount, timeout: int = 15) -> str:
        """
        Obtain a Microsoft access token, using the class-level cache.

        Args:
            account: Account providing client_id and refresh_token.
            timeout: HTTP timeout in seconds for the token request.

        Returns:
            The access token string.

        Raises:
            RuntimeError: If OAuth credentials are missing, the token
                endpoint answers with an HTTP error, or the response has no
                ``access_token``.
        """
        if not account.client_id or not account.refresh_token:
            raise RuntimeError("缺少 client_id 或 refresh_token")

        key = account.email.lower()
        with OutlookIMAPClient._cache_lock:
            cached = OutlookIMAPClient._token_cache.get(key)
            if cached and time.time() < cached[1]:
                return cached[0]

        body = urllib.parse.urlencode({
            "client_id": account.client_id,
            "refresh_token": account.refresh_token,
            "grant_type": "refresh_token",
            "redirect_uri": "https://login.live.com/oauth20_desktop.srf",
        }).encode()

        req = urllib.request.Request(
            "https://login.live.com/oauth20_token.srf",
            data=body,
            headers={"Content-Type": "application/x-www-form-urlencoded"}
        )

        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read())
        except HTTPError as e:
            raise RuntimeError(f"MS OAuth 刷新失败: {e.code}") from e

        token = data.get("access_token")
        if not token:
            raise RuntimeError("MS OAuth 响应无 access_token")

        ttl = int(data.get("expires_in", 3600))
        # Expire the cache entry 120 s early so a token is never handed out
        # right before it lapses.
        with OutlookIMAPClient._cache_lock:
            OutlookIMAPClient._token_cache[key] = (token, time.time() + ttl - 120)

        return token

    @staticmethod
    def _build_xoauth2(email_addr: str, token: str) -> bytes:
        """Build the XOAUTH2 SASL initial-response bytes."""
        return f"user={email_addr}\x01auth=Bearer {token}\x01\x01".encode()

    def connect(self):
        """
        Connect to the IMAP server and authenticate.

        XOAUTH2 is tried first when the account has OAuth credentials; on
        failure the client falls back to password login if a password is
        configured. If authentication ultimately fails, the socket is closed
        before the exception propagates, so a failed ``with`` entry does not
        leak the connection.
        """
        self._conn = imaplib.IMAP4_SSL(self.host, self.port, timeout=self.timeout)

        try:
            # Prefer XOAUTH2 authentication.
            if self.account.has_oauth():
                try:
                    token = self.refresh_ms_token(self.account)
                    self._conn.authenticate(
                        "XOAUTH2",
                        lambda _: self._build_xoauth2(self.account.email, token)
                    )
                    logger.debug(f"使用 XOAUTH2 认证连接: {self.account.email}")
                    return
                except Exception as e:
                    if not self.account.password:
                        # No password to fall back to: surface the real
                        # OAuth failure instead of a misleading LOGIN error.
                        raise
                    logger.warning(f"XOAUTH2 认证失败,回退密码认证: {e}")

            # Fall back to password authentication.
            self._conn.login(self.account.email, self.account.password)
            logger.debug(f"使用密码认证连接: {self.account.email}")
        except Exception:
            self.close()
            raise

    def _ensure_connection(self):
        """Reuse the current connection if NOOP succeeds, else reconnect."""
        if self._conn:
            try:
                self._conn.noop()
                return
            except Exception:
                self.close()

        self.connect()

    def get_recent_emails(
        self,
        count: int = 20,
        only_unseen: bool = True,
        timeout: int = 30
    ) -> List[Dict[str, Any]]:
        """
        Fetch the most recent emails from INBOX (opened read-only).

        Args:
            count: Maximum number of messages to fetch.
            only_unseen: Search only UNSEEN messages when True, else ALL.
            timeout: Currently unused; kept for interface compatibility.

        Returns:
            Parsed emails, newest first. Messages that fail to fetch or
            parse are logged and skipped.
        """
        if count <= 0:
            # ``ids[-0:]`` would select every message, not none.
            return []

        self._ensure_connection()

        flag = "UNSEEN" if only_unseen else "ALL"
        self._conn.select("INBOX", readonly=True)

        _, data = self._conn.search(None, flag)
        if not data or not data[0]:
            return []

        # Keep only the newest ``count`` message ids.
        ids = data[0].split()[-count:]
        result = []

        for mid in reversed(ids):
            try:
                _, payload = self._conn.fetch(mid, "(RFC822)")
                if not payload:
                    continue

                # The message bytes are the second item of the first tuple
                # entry in the fetch response.
                raw = next(
                    (
                        part[1]
                        for part in payload
                        if isinstance(part, tuple) and len(part) > 1
                    ),
                    b"",
                )

                if raw:
                    result.append(self._parse_email(raw))
            except Exception as e:
                logger.warning(f"解析邮件失败 (ID: {mid}): {e}")

        return result

    @staticmethod
    def _parse_email(raw: bytes) -> Dict[str, Any]:
        """
        Parse raw RFC 822 bytes into a flat dict.

        Returns:
            Dict with keys ``subject``, ``from``, ``date``,
            ``date_timestamp`` (0 when the Date header is missing or
            unparsable), ``to``, ``delivered_to``, ``x_original_to``,
            ``body`` and ``raw``.
        """
        # Strip a possible UTF-8 BOM.
        if raw.startswith(b"\xef\xbb\xbf"):
            raw = raw[3:]

        msg = email.message_from_bytes(raw)
        decode = OutlookIMAPClient._decode_header

        # Headers.
        subject = decode(msg.get("Subject", ""))
        sender = decode(msg.get("From", ""))
        date_str = decode(msg.get("Date", ""))
        to = decode(msg.get("To", ""))
        delivered_to = decode(msg.get("Delivered-To", ""))
        x_original_to = decode(msg.get("X-Original-To", ""))

        # Body text.
        body = OutlookIMAPClient._extract_body(msg)

        # Date header -> unix timestamp.
        date_timestamp = 0
        if date_str:
            try:
                date_timestamp = int(parsedate_to_datetime(date_str).timestamp())
            except Exception:
                # Malformed Date header: leave 0 so callers treat the
                # timestamp as unknown.
                date_timestamp = 0

        return {
            "subject": subject,
            "from": sender,
            "date": date_str,
            "date_timestamp": date_timestamp,
            "to": to,
            "delivered_to": delivered_to,
            "x_original_to": x_original_to,
            "body": body,
            # First 100 hex characters of the raw message, for debugging
            # (a prefix, not a hash).
            "raw": raw.hex()[:100]
        }

    @staticmethod
    def _decode_header(header: str) -> str:
        """Decode an RFC 2047 encoded header into a stripped string."""
        if not header:
            return ""

        parts = []
        for chunk, encoding in decode_header(header):
            if isinstance(chunk, bytes):
                try:
                    parts.append(chunk.decode(encoding or "utf-8", errors="replace"))
                except Exception:
                    # Unknown or invalid charset name: fall back to UTF-8.
                    parts.append(chunk.decode("utf-8", errors="replace"))
            else:
                parts.append(chunk)

        return "".join(parts).strip()

    @staticmethod
    def _extract_body(msg) -> str:
        """
        Extract the text body from a message.

        Collects every ``text/plain`` and ``text/html`` part, strips HTML
        tags, unescapes HTML entities and collapses whitespace.
        """
        texts = []
        parts = msg.walk() if msg.is_multipart() else [msg]

        for part in parts:
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                continue

            payload = part.get_payload(decode=True)
            if not payload:
                continue

            charset = part.get_content_charset() or "utf-8"
            try:
                text = payload.decode(charset, errors="replace")
            except LookupError:
                text = payload.decode("utf-8", errors="replace")

            # Remove tags from HTML content.
            # NOTE(review): this statement was garbled in the source
            # (`if "]+>", " ", text)`); reconstructed as a tag-stripping
            # regex applied to HTML parts — confirm against the original.
            if content_type == "text/html" or "<html" in text.lower():
                text = re.sub(r"<[^>]+>", " ", text)

            texts.append(text)

        # Merge and normalise the text.
        combined = " ".join(texts)
        combined = html.unescape(combined)
        combined = re.sub(r"\s+", " ", combined).strip()

        return combined

    def close(self):
        """Close the mailbox and log out, ignoring errors (best effort)."""
        if self._conn:
            try:
                # Raises when no mailbox is selected; that is harmless here.
                self._conn.close()
            except Exception:
                pass
            try:
                self._conn.logout()
            except Exception:
                pass
            self._conn = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class OutlookService(BaseEmailService):
    """
    Outlook mailbox service.

    Rotates over multiple Outlook accounts and retrieves verification codes
    from them over IMAP.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None, name: Optional[str] = None):
        """
        Initialise the Outlook service.

        Args:
            config: Configuration dict. Supported keys:
                - accounts: list of Outlook accounts, each with:
                  - email: mailbox address
                  - password: password
                  - client_id: OAuth2 client_id (optional)
                  - refresh_token: OAuth2 refresh_token (optional)
                - imap_host: IMAP server (default: outlook.office365.com)
                - imap_port: IMAP port (default: 993)
                - timeout: timeout (default: 30)
                - max_retries: maximum retries (default: 3)
                A single account may instead be given with top-level
                ``email`` and ``password`` keys.
            name: Service name.
        """
        super().__init__(EmailServiceType.OUTLOOK, name)

        # Defaults, overridden by the caller-supplied config.
        default_config = {
            "accounts": [],
            "imap_host": "outlook.office365.com",
            "imap_port": 993,
            "timeout": 30,
            "max_retries": 3,
            "proxy_url": None,
        }

        self.config = {**default_config, **(config or {})}

        self.accounts: List[OutlookAccount] = []
        self._current_account_index = 0
        self._account_locks: Dict[str, threading.Lock] = {}
        # Guards account rotation and account-list mutation. It must be one
        # shared lock; a lock created per call excludes nobody.
        self._rotation_lock = threading.Lock()

        # Two config layouts are supported:
        # 1. single account: {"email": "xxx", "password": "xxx"}
        # 2. multi account:  {"accounts": [{"email": "xxx", "password": "xxx"}]}
        if "email" in self.config and "password" in self.config:
            account_configs = [self.config]
        else:
            account_configs = self.config.get("accounts", [])

        for account_config in account_configs:
            if self._register_account(account_config) is None:
                # Log only the address: the config holds the password and
                # refresh token, which must not end up in log files.
                logger.warning(
                    f"无效的 Outlook 账户配置: {account_config.get('email', '')}"
                )

        if not self.accounts:
            logger.warning("未配置有效的 Outlook 账户")

        # Cap on concurrent IMAP connections (to avoid rate limiting).
        self._imap_semaphore = threading.Semaphore(5)

        # Verification-code de-duplication: lower-cased email -> used codes.
        self._used_codes: Dict[str, set] = {}

    def _register_account(self, account_config: Dict[str, Any]) -> Optional[OutlookAccount]:
        """Validate and store one account; return it, or None if invalid."""
        account = OutlookAccount.from_config(account_config)
        if not account.validate():
            return None
        with self._rotation_lock:
            self.accounts.append(account)
            self._account_locks[account.email] = threading.Lock()
        return account

    def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Select an available Outlook account (round-robin).

        Args:
            config: Configuration parameters (currently unused).

        Returns:
            Dict with:
            - email: mailbox address
            - service_id: the account email (same as ``email``)
            - account: ``{"email", "has_oauth"}``

        Raises:
            EmailServiceError: If no account is configured.
        """
        account = None
        with self._rotation_lock:
            if self.accounts:
                # The modulo keeps the index valid even if accounts were
                # removed since the last call.
                index = self._current_account_index % len(self.accounts)
                account = self.accounts[index]
                self._current_account_index = (index + 1) % len(self.accounts)

        if account is None:
            self.update_status(False, EmailServiceError("没有可用的 Outlook 账户"))
            raise EmailServiceError("没有可用的 Outlook 账户")

        email_info = {
            "email": account.email,
            "service_id": account.email,  # For Outlook the service_id is the address.
            "account": {
                "email": account.email,
                "has_oauth": account.has_oauth()
            }
        }

        logger.info(f"选择 Outlook 账户: {account.email}")
        self.update_status(True)
        return email_info

    def get_verification_code(
        self,
        email: str,
        email_id: str = None,
        timeout: int = None,
        pattern: str = OTP_CODE_PATTERN,
        otp_sent_at: Optional[float] = None,
    ) -> Optional[str]:
        """
        Poll an Outlook mailbox for a verification code.

        Args:
            email: Mailbox address (matched case-insensitively).
            email_id: Unused (for Outlook the email is the identifier).
            timeout: Timeout in seconds; defaults to the configured value.
            pattern: Verification-code regex, forwarded to
                ``_extract_code_from_mail``.
            otp_sent_at: Unix time the OTP was sent; mail dated more than
                60 s before it is ignored.

        Returns:
            The code, or None on timeout or when the account is unknown.
        """
        # Find the matching account.
        wanted = email.lower()
        account = next(
            (acc for acc in self.accounts if acc.email.lower() == wanted),
            None,
        )

        if not account:
            self.update_status(False, EmailServiceError(f"未找到邮箱对应的账户: {email}"))
            return None

        # Waiting configuration from settings.
        code_settings = get_email_code_settings()
        actual_timeout = timeout or code_settings["timeout"]
        poll_interval = code_settings["poll_interval"]

        logger.info(f"[{email}] 开始获取验证码,超时 {actual_timeout}s,OTP发送时间: {otp_sent_at}")

        # Keyed case-insensitively, consistent with the account lookup, so
        # different spellings of one address share a de-dup set.
        used_codes = self._used_codes.setdefault(wanted, set())

        # Minimum acceptable mail timestamp (60 s allowance for clock skew).
        min_timestamp = (otp_sent_at - 60) if otp_sent_at else 0

        start_time = time.time()
        poll_count = 0

        while time.time() - start_time < actual_timeout:
            poll_count += 1
            loop_start = time.time()

            # Progressive check: unread only for the first 3 polls, then all.
            only_unseen = poll_count <= 3

            try:
                code = self._poll_mailbox_once(
                    account, email, only_unseen, min_timestamp, pattern, used_codes
                )
                if code:
                    elapsed = int(time.time() - start_time)
                    logger.info(f"[{email}] 找到验证码: {code},总耗时 {elapsed}s,轮询 {poll_count} 次")
                    self.update_status(True)
                    return code
            except Exception as e:
                loop_elapsed = time.time() - loop_start
                logger.warning(f"[{email}] 检查出错: {e},循环耗时 {loop_elapsed:.2f}s")

            # Wait for the next poll, but never sleep past the deadline.
            remaining = actual_timeout - (time.time() - start_time)
            if remaining <= 0:
                break
            time.sleep(min(poll_interval, remaining))

        logger.warning(f"[{email}] 验证码超时 ({actual_timeout}s),共轮询 {poll_count} 次")
        return None

    def _poll_mailbox_once(
        self,
        account: OutlookAccount,
        email: str,
        only_unseen: bool,
        min_timestamp: float,
        pattern: str,
        used_codes: set,
    ) -> Optional[str]:
        """Open one IMAP session and return the first fresh code, or None."""
        connect_start = time.time()
        with self._imap_semaphore:
            with OutlookIMAPClient(
                account,
                host=self.config["imap_host"],
                port=self.config["imap_port"],
                timeout=10
            ) as client:
                # Includes time spent waiting for the semaphore.
                connect_elapsed = time.time() - connect_start
                logger.debug(f"[{email}] IMAP 连接耗时 {connect_elapsed:.2f}s")

                # Search the mailbox.
                search_start = time.time()
                emails = client.get_recent_emails(count=15, only_unseen=only_unseen)
                search_elapsed = time.time() - search_start
                logger.debug(f"[{email}] 搜索到 {len(emails)} 封邮件(未读={only_unseen}),耗时 {search_elapsed:.2f}s")

                for mail in emails:
                    # Timestamp filter; mail with an unknown timestamp (0)
                    # is not filtered out.
                    mail_ts = mail.get("date_timestamp", 0)
                    if min_timestamp > 0 and 0 < mail_ts < min_timestamp:
                        logger.debug(f"[{email}] 跳过旧邮件: {mail.get('subject', '')[:50]}")
                        continue

                    # Only consider OpenAI verification mail.
                    if not self._is_openai_verification_mail(mail, email):
                        continue

                    code = self._extract_code_from_mail(mail, pattern)
                    if not code:
                        continue

                    # De-duplication.
                    if code in used_codes:
                        logger.debug(f"[{email}] 跳过已使用的验证码: {code}")
                        continue

                    used_codes.add(code)
                    return code

        return None

    def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
        """
        List all configured Outlook accounts.

        Returns:
            One dict per account with ``email``, ``id``, ``has_oauth`` and
            ``type``.
        """
        return [
            {
                "email": account.email,
                "id": account.email,
                "has_oauth": account.has_oauth(),
                "type": "outlook"
            }
            for account in self.accounts
        ]

    def delete_email(self, email_id: str) -> bool:
        """
        Delete a mailbox (not supported for Outlook accounts).

        Args:
            email_id: Mailbox address.

        Returns:
            Always False.
        """
        logger.warning(f"Outlook 服务不支持删除账户: {email_id}")
        return False

    def check_health(self) -> bool:
        """Check availability by selecting INBOX on the first account."""
        if not self.accounts:
            self.update_status(False, EmailServiceError("没有配置的账户"))
            return False

        # Test the connection of the first account only.
        test_account = self.accounts[0]
        try:
            with self._imap_semaphore:
                with OutlookIMAPClient(
                    test_account,
                    host=self.config["imap_host"],
                    port=self.config["imap_port"],
                    timeout=10
                ) as client:
                    # Selecting the mailbox is a quick end-to-end test.
                    client._conn.select("INBOX", readonly=True)
            self.update_status(True)
            return True
        except Exception as e:
            logger.warning(f"Outlook 健康检查失败 ({test_account.email}): {e}")
            self.update_status(False, e)
            return False

    def _is_oai_mail(self, mail: Dict[str, Any]) -> bool:
        """Loose OpenAI-mail check (legacy method, kept for compatibility)."""
        combined = f"{mail.get('from', '')} {mail.get('subject', '')} {mail.get('body', '')}".lower()
        keywords = ["openai", "chatgpt", "verification", "验证码", "code"]
        return any(keyword in combined for keyword in keywords)

    def _is_openai_verification_mail(
        self,
        mail: Dict[str, Any],
        target_email: str = None
    ) -> bool:
        """
        Strictly decide whether a mail is an OpenAI verification mail.

        Args:
            mail: Parsed mail dict.
            target_email: Target address; when given it must appear in the
                To / Delivered-To / X-Original-To headers.

        Returns:
            True only if the sender, keyword and recipient checks all pass.
        """
        sender = mail.get("from", "").lower()

        # 1. The sender must be OpenAI.
        if not any(s in sender for s in OPENAI_EMAIL_SENDERS):
            logger.debug(f"邮件发件人非 OpenAI: {sender}")
            return False

        # 2. Subject or body must contain a verification keyword.
        subject = mail.get("subject", "").lower()
        body = mail.get("body", "").lower()
        combined = f"{subject} {body}"
        if not any(kw in combined for kw in OPENAI_VERIFICATION_KEYWORDS):
            logger.debug(f"邮件未包含验证关键词: {subject[:50]}")
            return False

        # 3. Recipient check (optional).
        if target_email:
            recipients = f"{mail.get('to', '')} {mail.get('delivered_to', '')} {mail.get('x_original_to', '')}".lower()
            if target_email.lower() not in recipients:
                logger.debug(f"邮件收件人不匹配: {recipients[:50]}")
                return False

        logger.debug(f"识别为 OpenAI 验证邮件: {subject[:50]}")
        return True

    def _extract_code_from_mail(
        self,
        mail: Dict[str, Any],
        fallback_pattern: str = OTP_CODE_PATTERN
    ) -> Optional[str]:
        """
        Extract a verification code from a mail.

        Priority:
        1. Subject, using the simple pattern.
        2. Body, using the semantic pattern (e.g. "code is 123456").
        3. Fallback: body, using the simple pattern.

        Args:
            mail: Parsed mail dict.
            fallback_pattern: Accepted for interface compatibility; the
                fallback step uses ``OTP_CODE_SIMPLE_PATTERN``, so this
                argument is not consulted.

        Returns:
            The code (group 1 of the matching pattern), or None.
        """
        re_simple = re.compile(OTP_CODE_SIMPLE_PATTERN)
        re_semantic = re.compile(OTP_CODE_SEMANTIC_PATTERN, re.IGNORECASE)

        # 1. Subject first.
        subject = mail.get("subject", "")
        match = re_simple.search(subject)
        if match:
            code = match.group(1)
            logger.debug(f"从主题提取验证码: {code}")
            return code

        # 2. Semantic match in the body.
        body = mail.get("body", "")
        match = re_semantic.search(body)
        if match:
            code = match.group(1)
            logger.debug(f"从正文语义提取验证码: {code}")
            return code

        # 3. Fallback: simple pattern anywhere in the body.
        match = re_simple.search(body)
        if match:
            code = match.group(1)
            logger.debug(f"从正文兜底提取验证码: {code}")
            return code

        return None

    def get_account_stats(self) -> Dict[str, Any]:
        """Return account totals split by OAuth vs password-only."""
        total = len(self.accounts)
        oauth_count = sum(1 for acc in self.accounts if acc.has_oauth())

        return {
            "total_accounts": total,
            "oauth_accounts": oauth_count,
            "password_accounts": total - oauth_count,
            "accounts": [
                {
                    "email": acc.email,
                    "has_oauth": acc.has_oauth()
                }
                for acc in self.accounts
            ]
        }

    def add_account(self, account_config: Dict[str, Any]) -> bool:
        """Add an Outlook account; return False if invalid or on error."""
        try:
            account = self._register_account(account_config)
            if account is None:
                return False

            logger.info(f"添加 Outlook 账户: {account.email}")
            return True
        except Exception as e:
            logger.error(f"添加 Outlook 账户失败: {e}")
            return False

    def remove_account(self, email: str) -> bool:
        """Remove an account by address (case-insensitive); True if removed."""
        target = email.lower()
        with self._rotation_lock:
            for i, acc in enumerate(self.accounts):
                if acc.email.lower() != target:
                    continue

                self.accounts.pop(i)
                # Locks are keyed by the stored address, which may differ in
                # case from the caller's spelling.
                self._account_locks.pop(acc.email, None)

                # Keep the round-robin cursor on the same next account and
                # inside the shortened list.
                if i < self._current_account_index:
                    self._current_account_index -= 1
                if self._current_account_index >= len(self.accounts):
                    self._current_account_index = 0

                logger.info(f"移除 Outlook 账户: {email}")
                return True

        return False