File size: 7,343 Bytes
4e5a541
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""

Token 正则匹配工具

用于从文本中提取 AT Token、邮箱、Account ID 等信息

"""
import re
from typing import List, Optional, Dict
import logging

logger = logging.getLogger(__name__)


class TokenParser:
    """Token 正则匹配解析器"""

    # JWT Token 正则 (以 eyJ 开头的 Base64 字符串)
    # 简化匹配逻辑,三段式 Base64,Header 以 eyJ 开头
    JWT_PATTERN = r'eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+'

    # 邮箱正则 (更通用的邮箱格式)
    EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

    # Account ID 正则 (UUID 格式)
    ACCOUNT_ID_PATTERN = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'

    # Refresh Token 正则 (支持 rt- 或 rt_ 前缀,且包含点号)
    REFRESH_TOKEN_PATTERN = r'rt[_-][A-Za-z0-9._-]+'
    
    # Session Token 正则 (通常比较长,包含两个点)
    SESSION_TOKEN_PATTERN = r'eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]*\.[A-Za-z0-9_-]+(\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+)?'

    # Client ID 正则 (严格匹配 app_ 开头)
    CLIENT_ID_PATTERN = r'app_[A-Za-z0-9]+'

    def extract_jwt_tokens(self, text: str) -> List[str]:
        """

        从文本中提取所有 JWT Token



        Args:

            text: 输入文本



        Returns:

            JWT Token 列表

        """
        tokens = re.findall(self.JWT_PATTERN, text)
        logger.info(f"从文本中提取到 {len(tokens)} 个 JWT Token")
        return tokens

    def extract_emails(self, text: str) -> List[str]:
        """

        从文本中提取所有邮箱地址



        Args:

            text: 输入文本



        Returns:

            邮箱地址列表

        """
        emails = re.findall(self.EMAIL_PATTERN, text)
        # 过滤掉无效邮箱
        emails = [email for email in emails if len(email) < 100]
        # 去重
        emails = list(set(emails))
        logger.info(f"从文本中提取到 {len(emails)} 个邮箱地址")
        return emails

    def extract_account_ids(self, text: str) -> List[str]:
        """

        从文本中提取所有 Account ID



        Args:

            text: 输入文本



        Returns:

            Account ID 列表

        """
        account_ids = re.findall(self.ACCOUNT_ID_PATTERN, text)
        # 去重
        account_ids = list(set(account_ids))
        logger.info(f"从文本中提取到 {len(account_ids)} 个 Account ID")
        return account_ids

    def parse_team_import_text(self, text: str) -> List[Dict[str, Optional[str]]]:
        """

        解析 Team 导入文本,提取 AT、邮箱、Account ID

        优先解析 [email]----[jwt]----[uuid] 等结构化格式



        Args:

            text: 导入的文本内容



        Returns:

            解析结果列表,每个元素包含 token, email, account_id

        """
        results = []

        # 按行分割文本
        lines = text.strip().split('\n')

        for line in lines:
            line = line.strip()
            if not line:
                continue

            token = None
            email = None
            account_id = None
            refresh_token = None
            session_token = None
            client_id = None

            # 1. 尝试使用分隔符解析 (支持 ----, | , \t, 以及多个空格)
            parts = [p.strip() for p in re.split(r'----|\||\t|\s{2,}', line) if p.strip()]
            
            if len(parts) >= 2:
                # 根据格式特征自动识别各部分
                for part in parts:
                    if not token and re.fullmatch(self.JWT_PATTERN, part):
                        token = part
                    elif not email and re.fullmatch(self.EMAIL_PATTERN, part):
                        email = part
                    elif not account_id and re.fullmatch(self.ACCOUNT_ID_PATTERN, part, re.IGNORECASE):
                        account_id = part
                    elif not refresh_token and re.match(self.REFRESH_TOKEN_PATTERN, part):
                        refresh_token = part
                    elif not session_token and re.match(self.SESSION_TOKEN_PATTERN, part):
                        # 如果已经有了 token (JWT),则第二个匹配 JWT 模式的可能是 session_token
                        if token:
                            session_token = part
                        else:
                            token = part
                    elif not client_id and re.match(self.CLIENT_ID_PATTERN, part):
                        client_id = part

            # 2. 如果结构化解析未找到 Token,尝试全局正则提取结果 (兜底逻辑)
            if not token:
                tokens = re.findall(self.JWT_PATTERN, line)
                if tokens:
                    token = tokens[0]
                    if len(tokens) > 1:
                        session_token = tokens[1]
                
                # 只有在非结构化情况下才全局提取其他信息
                if not email:
                    emails = re.findall(self.EMAIL_PATTERN, line)
                    email = emails[0] if emails else None
                if not account_id:
                    account_ids = re.findall(self.ACCOUNT_ID_PATTERN, line, re.IGNORECASE)
                    account_id = account_ids[0] if account_ids else None
                if not refresh_token:
                    rts = re.findall(self.REFRESH_TOKEN_PATTERN, line)
                    refresh_token = rts[0] if rts else None
                if not client_id:
                    cids = re.findall(self.CLIENT_ID_PATTERN, line)
                    client_id = cids[0] if cids else None

            if token or session_token or refresh_token:
                results.append({
                    "token": token,
                    "email": email,
                    "account_id": account_id,
                    "refresh_token": refresh_token,
                    "session_token": session_token,
                    "client_id": client_id
                })

        logger.info(f"解析完成,共提取 {len(results)} 条 Team 信息")
        return results

    def validate_jwt_format(self, token: str) -> bool:
        """

        验证 JWT Token 格式是否正确



        Args:

            token: JWT Token 字符串



        Returns:

            True 表示格式正确,False 表示格式错误

        """
        return bool(re.fullmatch(self.JWT_PATTERN, token))

    def validate_email_format(self, email: str) -> bool:
        """

        验证邮箱格式是否正确



        Args:

            email: 邮箱地址



        Returns:

            True 表示格式正确,False 表示格式错误

        """
        return bool(re.fullmatch(self.EMAIL_PATTERN, email))

    def validate_account_id_format(self, account_id: str) -> bool:
        """

        验证 Account ID 格式是否正确



        Args:

            account_id: Account ID



        Returns:

            True 表示格式正确,False 表示格式错误

        """
        return bool(re.fullmatch(self.ACCOUNT_ID_PATTERN, account_id))


# 创建全局实例
token_parser = TokenParser()