| """ |
| Auth dependencies for the Space backend. |
| |
| Verifies JWT tokens by calling the Admin Worker's /auth/me endpoint. |
| Results are cached in-memory (TTL-based) to avoid hitting the Worker on every request. |
| No JWT_SECRET needed on the Space side. |
| """ |
|
|
| import time |
| from dataclasses import dataclass |
|
|
| import httpx |
| from fastapi import Request, WebSocket, HTTPException |
|
|
| from config import ADMIN_API_URL |
|
|
|
|
| @dataclass |
| class AuthUser: |
| """Authenticated user extracted from JWT.""" |
| sub: str |
| username: str |
| role: str |
|
|
|
|
| |
| |
| _token_cache: dict[str, tuple[AuthUser, float]] = {} |
| _CACHE_TTL = 300 |
|
|
|
|
| def _cleanup_cache(): |
| """Remove expired entries from cache.""" |
| now = time.time() |
| expired = [k for k, (_, exp) in _token_cache.items() if exp < now] |
| for k in expired: |
| del _token_cache[k] |
|
|
|
|
| def verify_token(token: str) -> AuthUser | None: |
| """Verify a token by calling Worker /auth/me, with caching.""" |
| if not token or not ADMIN_API_URL: |
| return None |
|
|
| |
| now = time.time() |
| cached = _token_cache.get(token) |
| if cached: |
| user, expiry = cached |
| if expiry > now: |
| return user |
| else: |
| del _token_cache[token] |
|
|
| |
| try: |
| resp = httpx.get( |
| f"{ADMIN_API_URL}/auth/me", |
| headers={"Authorization": f"Bearer {token}"}, |
| timeout=10, |
| ) |
| if resp.status_code != 200: |
| return None |
|
|
| data = resp.json() |
| user_data = data.get("user") |
| if not user_data: |
| return None |
|
|
| user = AuthUser( |
| sub=user_data.get("id", ""), |
| username=user_data.get("username", ""), |
| role=user_data.get("role", "user"), |
| ) |
|
|
| if not user.sub or not user.username: |
| return None |
|
|
| |
| _token_cache[token] = (user, now + _CACHE_TTL) |
|
|
| |
| if len(_token_cache) > 100: |
| _cleanup_cache() |
|
|
| return user |
|
|
| except Exception: |
| return None |
|
|
|
|
| def get_current_user(request: Request) -> AuthUser: |
| """FastAPI dependency: extract and verify JWT from Authorization header, query token, or cookie.""" |
| token = "" |
|
|
| auth_header = request.headers.get("Authorization", "") |
| if auth_header.startswith("Bearer "): |
| token = auth_header[7:] |
|
|
| if not token: |
| token = request.query_params.get("token", "") |
|
|
| if not token: |
| token = request.cookies.get("token", "") |
|
|
| if not token: |
| raise HTTPException(401, "Chưa đăng nhập") |
|
|
| user = verify_token(token) |
| if not user: |
| raise HTTPException(401, "Token không hợp lệ hoặc đã hết hạn") |
| return user |
|
|
|
|
| def get_ws_user(websocket: WebSocket) -> AuthUser | None: |
| """Extract and verify JWT from WebSocket query parameter ?token=...""" |
| token = websocket.query_params.get("token", "") |
| if not token: |
| return None |
| return verify_token(token) |
|
|