File size: 5,918 Bytes
4e5a541
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""

JWT Token 解析工具

用于解析和验证 ChatGPT Access Token (AT)

"""
import jwt
from typing import Optional, Dict, Any
from datetime import datetime
import logging
from app.utils.time_utils import get_now

logger = logging.getLogger(__name__)


class JWTParser:
    """JWT Token 解析器"""

    def __init__(self, verify_signature: bool = False):
        """

        初始化 JWT 解析器



        Args:

            verify_signature: 是否验证签名 (开发环境可设为 False)

        """
        self.verify_signature = verify_signature

    def decode_token(self, token: str) -> Optional[Dict[str, Any]]:
        """

        解析 JWT Token



        Args:

            token: JWT Token 字符串



        Returns:

            解析后的 payload 字典,失败返回 None

        """
        try:
            # 解析 JWT (不验证签名)
            payload = jwt.decode(
                token,
                options={
                    "verify_signature": self.verify_signature,
                    "verify_exp": False  # 手动检查过期时间
                }
            )
            return payload

        except jwt.InvalidTokenError as e:
            logger.error(f"JWT Token 解析失败: {e}")
            return None
        except Exception as e:
            logger.error(f"解析 Token 时发生错误: {e}")
            return None

    def extract_email(self, token: str) -> Optional[str]:
        """

        从 Token 中提取邮箱地址



        Args:

            token: JWT Token 字符串



        Returns:

            邮箱地址,失败返回 None

        """
        payload = self.decode_token(token)
        if not payload:
            return None

        try:
            # ChatGPT Token 的邮箱字段路径
            profile = payload.get("https://api.openai.com/profile", {})
            email = profile.get("email")
            return email
        except Exception as e:
            logger.error(f"提取邮箱失败: {e}")
            return None

    def extract_client_id(self, token: str) -> Optional[str]:
        """从 Token 中提取 OAuth client_id。"""
        payload = self.decode_token(token)
        if not payload:
            return None

        try:
            cid = payload.get("client_id")
            return str(cid) if cid else None
        except Exception as e:
            logger.error(f"提取 client_id 失败: {e}")
            return None

    def extract_user_id(self, token: str) -> Optional[str]:
        """

        从 Token 中提取用户 ID



        Args:

            token: JWT Token 字符串



        Returns:

            用户 ID,失败返回 None

        """
        payload = self.decode_token(token)
        if not payload:
            return None

        try:
            # ChatGPT Token 的 user_id 字段路径
            auth = payload.get("https://api.openai.com/auth", {})
            user_id = auth.get("user_id")
            return user_id
        except Exception as e:
            logger.error(f"提取 user_id 失败: {e}")
            return None

    def get_expiration_time(self, token: str) -> Optional[datetime]:
        """

        获取 Token 过期时间



        Args:

            token: JWT Token 字符串



        Returns:

            过期时间 (datetime 对象),失败返回 None

        """
        payload = self.decode_token(token)
        if not payload:
            return None

        try:
            exp_timestamp = payload.get("exp")
            if exp_timestamp:
                return datetime.fromtimestamp(exp_timestamp)
            return None
        except Exception as e:
            logger.error(f"获取过期时间失败: {e}")
            return None

    def is_token_expired(self, token: str) -> bool:
        """

        检查 Token 是否已过期



        Args:

            token: JWT Token 字符串



        Returns:

            True 表示已过期,False 表示未过期

        """
        exp_time = self.get_expiration_time(token)
        if not exp_time:
            return True  # 无法获取过期时间,视为已过期

        return get_now() > exp_time

    def validate_token(self, token: str) -> Dict[str, Any]:
        """

        验证 Token 并返回详细信息



        Args:

            token: JWT Token 字符串



        Returns:

            验证结果字典,包含 valid, email, user_id, exp_time, is_expired 等字段

        """
        result = {
            "valid": False,
            "email": None,
            "user_id": None,
            "exp_time": None,
            "is_expired": True,
            "error": None
        }

        # 解析 Token
        payload = self.decode_token(token)
        if not payload:
            result["error"] = "Token 解析失败"
            return result

        # 提取信息
        result["email"] = self.extract_email(token)
        result["user_id"] = self.extract_user_id(token)
        result["exp_time"] = self.get_expiration_time(token)
        result["is_expired"] = self.is_token_expired(token)

        # 验证必要字段
        if not result["email"]:
            result["error"] = "无法提取邮箱地址"
            return result

        if result["is_expired"]:
            result["error"] = "Token 已过期"
            return result

        # 验证通过
        result["valid"] = True
        return result


# 创建全局实例 (从配置读取是否验证签名)
def create_jwt_parser(verify_signature: bool = False) -> JWTParser:
    """

    创建 JWT 解析器实例



    Args:

        verify_signature: 是否验证签名



    Returns:

        JWTParser 实例

    """
    return JWTParser(verify_signature=verify_signature)