"""
Auth dependencies for the Space backend.
Verifies JWT tokens by calling the Admin Worker's /auth/me endpoint.
Results are cached in-memory (TTL-based) to avoid hitting the Worker on every request.
No JWT_SECRET needed on the Space side.
"""
import logging
import time
from dataclasses import dataclass

import httpx
from fastapi import Request, WebSocket, HTTPException

from config import ADMIN_API_URL
@dataclass
class AuthUser:
    """Authenticated user, as produced by token verification."""

    # User ID.
    sub: str
    username: str
    # Either "admin" or "user".
    role: str
# ── Token verification cache ──
# Lifetime of a cached verification result, in seconds (5 minutes).
_CACHE_TTL = 5 * 60
# token -> (verified AuthUser, timestamp at which the entry expires)
_token_cache: dict[str, tuple[AuthUser, float]] = {}
def _cleanup_cache() -> None:
    """Drop every cache entry whose expiry time has been reached.

    An entry counts as expired once ``expiry <= now``. This matches the
    freshness test in ``verify_token``, which only serves entries whose
    ``expiry > now``, so nothing that can no longer be served is kept.
    """
    now = time.time()
    # Walk a snapshot rather than the live view: the cache is a module-level
    # dict that other requests may insert into while this loop runs, and
    # iterating a dict that changes size raises RuntimeError.
    for token, (_, expiry) in list(_token_cache.items()):
        if expiry <= now:
            # pop() with a default tolerates the entry having already been
            # evicted elsewhere (a plain ``del`` would raise KeyError).
            _token_cache.pop(token, None)
def verify_token(token: str) -> AuthUser | None:
    """Verify a token via the Worker's /auth/me endpoint, with caching.

    A cache entry that has not yet expired is returned without any network
    call. Otherwise the token is sent as a Bearer credential to
    ``{ADMIN_API_URL}/auth/me``; on success the resulting user is cached for
    ``_CACHE_TTL`` seconds.

    Verification fails closed: every failure (network error, non-200 status,
    malformed payload) yields ``None``. Transport and server-side failures
    are logged so an unavailable Worker is visible in the logs.

    Args:
        token: The raw token, without the ``Bearer `` prefix.

    Returns:
        The verified ``AuthUser``, or ``None`` when the token is empty,
        ``ADMIN_API_URL`` is not configured, or verification fails.
    """
    if not token or not ADMIN_API_URL:
        return None

    # Check cache first
    now = time.time()
    cached = _token_cache.get(token)
    if cached is not None:
        user, expiry = cached
        if expiry > now:
            return user
        # Stale entry. pop() with a default (rather than ``del``) so that an
        # entry already evicted by another request cannot raise KeyError.
        _token_cache.pop(token, None)

    log = logging.getLogger(__name__)
    # Tolerate a trailing slash in the configured base URL; otherwise the
    # request path would become "//auth/me".
    url = f"{str(ADMIN_API_URL).rstrip('/')}/auth/me"

    # Call Worker to verify
    try:
        resp = httpx.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        if resp.status_code != 200:
            if resp.status_code >= 500:
                log.warning(
                    "Auth Worker returned HTTP %s for /auth/me",
                    resp.status_code,
                )
            return None
        data = resp.json()
    except Exception as exc:
        # Deliberately broad: this is the authentication boundary, so any
        # failure here must deny access rather than propagate. It is logged
        # instead of being swallowed silently.
        log.warning(
            "Token verification request failed: %s: %s",
            type(exc).__name__,
            exc,
        )
        return None

    # Validate the payload shape explicitly instead of relying on an
    # AttributeError from .get() on a non-dict value.
    user_data = data.get("user") if isinstance(data, dict) else None
    if not isinstance(user_data, dict) or not user_data:
        return None

    user = AuthUser(
        sub=user_data.get("id", ""),
        username=user_data.get("username", ""),
        role=user_data.get("role", "user"),
    )
    if not user.sub or not user.username:
        return None

    # Cache the result
    _token_cache[token] = (user, now + _CACHE_TTL)

    # Periodic cleanup. This is housekeeping only: a failure caused by the
    # cache being modified concurrently must not reject a verified user.
    if len(_token_cache) > 100:
        try:
            _cleanup_cache()
        except (KeyError, RuntimeError) as exc:
            log.debug("Token cache cleanup skipped: %s", exc)
    return user
def get_current_user(request: Request) -> AuthUser:
    """FastAPI dependency: extract and verify the token from the Authorization header.

    Args:
        request: The incoming request; only its ``Authorization`` header is read.

    Returns:
        The verified ``AuthUser``.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            credential, or when ``verify_token`` rejects the token.
    """
    # A 401 response should tell the client which scheme to use (RFC 9110).
    challenge = {"WWW-Authenticate": "Bearer"}

    auth_header = request.headers.get("Authorization", "")
    scheme, sep, credentials = auth_header.partition(" ")
    # Auth scheme names are case-insensitive, so "bearer" and "BEARER" are
    # accepted as well as "Bearer". A header without the separating space
    # carries no credentials at all.
    if not sep or scheme.lower() != "bearer":
        raise HTTPException(401, "Chưa đăng nhập", headers=challenge)

    # Ignore padding around the token (e.g. "Bearer  <token>").
    token = credentials.strip()
    user = verify_token(token)
    if not user:
        raise HTTPException(
            401, "Token không hợp lệ hoặc đã hết hạn", headers=challenge
        )
    return user
def get_ws_user(websocket: WebSocket) -> AuthUser | None:
    """Authenticate a WebSocket connection from its ``?token=...`` query parameter.

    Returns the user produced by ``verify_token``, or ``None`` when the
    parameter is absent or empty, or when verification fails.
    """
    query_token = websocket.query_params.get("token", "")
    return verify_token(query_token) if query_token else None
|