Spaces:
Paused
Paused
| """ | |
| JWT Token 解析工具 | |
| 用于解析和验证 ChatGPT Access Token (AT) | |
| """ | |
| import jwt | |
| from typing import Optional, Dict, Any | |
| from datetime import datetime | |
| import logging | |
| from app.utils.time_utils import get_now | |
| logger = logging.getLogger(__name__) | |
| class JWTParser: | |
| """JWT Token 解析器""" | |
| def __init__(self, verify_signature: bool = False): | |
| """ | |
| 初始化 JWT 解析器 | |
| Args: | |
| verify_signature: 是否验证签名 (开发环境可设为 False) | |
| """ | |
| self.verify_signature = verify_signature | |
| def decode_token(self, token: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| 解析 JWT Token | |
| Args: | |
| token: JWT Token 字符串 | |
| Returns: | |
| 解析后的 payload 字典,失败返回 None | |
| """ | |
| try: | |
| # 解析 JWT (不验证签名) | |
| payload = jwt.decode( | |
| token, | |
| options={ | |
| "verify_signature": self.verify_signature, | |
| "verify_exp": False # 手动检查过期时间 | |
| } | |
| ) | |
| return payload | |
| except jwt.InvalidTokenError as e: | |
| logger.error(f"JWT Token 解析失败: {e}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"解析 Token 时发生错误: {e}") | |
| return None | |
| def extract_email(self, token: str) -> Optional[str]: | |
| """ | |
| 从 Token 中提取邮箱地址 | |
| Args: | |
| token: JWT Token 字符串 | |
| Returns: | |
| 邮箱地址,失败返回 None | |
| """ | |
| payload = self.decode_token(token) | |
| if not payload: | |
| return None | |
| try: | |
| # ChatGPT Token 的邮箱字段路径 | |
| profile = payload.get("https://api.openai.com/profile", {}) | |
| email = profile.get("email") | |
| return email | |
| except Exception as e: | |
| logger.error(f"提取邮箱失败: {e}") | |
| return None | |
| def extract_client_id(self, token: str) -> Optional[str]: | |
| """从 Token 中提取 OAuth client_id。""" | |
| payload = self.decode_token(token) | |
| if not payload: | |
| return None | |
| try: | |
| cid = payload.get("client_id") | |
| return str(cid) if cid else None | |
| except Exception as e: | |
| logger.error(f"提取 client_id 失败: {e}") | |
| return None | |
| def extract_user_id(self, token: str) -> Optional[str]: | |
| """ | |
| 从 Token 中提取用户 ID | |
| Args: | |
| token: JWT Token 字符串 | |
| Returns: | |
| 用户 ID,失败返回 None | |
| """ | |
| payload = self.decode_token(token) | |
| if not payload: | |
| return None | |
| try: | |
| # ChatGPT Token 的 user_id 字段路径 | |
| auth = payload.get("https://api.openai.com/auth", {}) | |
| user_id = auth.get("user_id") | |
| return user_id | |
| except Exception as e: | |
| logger.error(f"提取 user_id 失败: {e}") | |
| return None | |
| def get_expiration_time(self, token: str) -> Optional[datetime]: | |
| """ | |
| 获取 Token 过期时间 | |
| Args: | |
| token: JWT Token 字符串 | |
| Returns: | |
| 过期时间 (datetime 对象),失败返回 None | |
| """ | |
| payload = self.decode_token(token) | |
| if not payload: | |
| return None | |
| try: | |
| exp_timestamp = payload.get("exp") | |
| if exp_timestamp: | |
| return datetime.fromtimestamp(exp_timestamp) | |
| return None | |
| except Exception as e: | |
| logger.error(f"获取过期时间失败: {e}") | |
| return None | |
| def is_token_expired(self, token: str) -> bool: | |
| """ | |
| 检查 Token 是否已过期 | |
| Args: | |
| token: JWT Token 字符串 | |
| Returns: | |
| True 表示已过期,False 表示未过期 | |
| """ | |
| exp_time = self.get_expiration_time(token) | |
| if not exp_time: | |
| return True # 无法获取过期时间,视为已过期 | |
| return get_now() > exp_time | |
| def validate_token(self, token: str) -> Dict[str, Any]: | |
| """ | |
| 验证 Token 并返回详细信息 | |
| Args: | |
| token: JWT Token 字符串 | |
| Returns: | |
| 验证结果字典,包含 valid, email, user_id, exp_time, is_expired 等字段 | |
| """ | |
| result = { | |
| "valid": False, | |
| "email": None, | |
| "user_id": None, | |
| "exp_time": None, | |
| "is_expired": True, | |
| "error": None | |
| } | |
| # 解析 Token | |
| payload = self.decode_token(token) | |
| if not payload: | |
| result["error"] = "Token 解析失败" | |
| return result | |
| # 提取信息 | |
| result["email"] = self.extract_email(token) | |
| result["user_id"] = self.extract_user_id(token) | |
| result["exp_time"] = self.get_expiration_time(token) | |
| result["is_expired"] = self.is_token_expired(token) | |
| # 验证必要字段 | |
| if not result["email"]: | |
| result["error"] = "无法提取邮箱地址" | |
| return result | |
| if result["is_expired"]: | |
| result["error"] = "Token 已过期" | |
| return result | |
| # 验证通过 | |
| result["valid"] = True | |
| return result | |
| # 创建全局实例 (从配置读取是否验证签名) | |
| def create_jwt_parser(verify_signature: bool = False) -> JWTParser: | |
| """ | |
| 创建 JWT 解析器实例 | |
| Args: | |
| verify_signature: 是否验证签名 | |
| Returns: | |
| JWTParser 实例 | |
| """ | |
| return JWTParser(verify_signature=verify_signature) | |