"""Auth dependencies for the Space backend.

Verifies JWT tokens by calling the Admin Worker's /auth/me endpoint.
Results are cached in-memory (TTL-based) to avoid hitting the Worker
on every request. No JWT_SECRET needed on the Space side.
"""

import logging
import time
from dataclasses import dataclass

import httpx
from fastapi import Request, WebSocket, HTTPException

from config import ADMIN_API_URL

_logger = logging.getLogger(__name__)


@dataclass
class AuthUser:
    """Authenticated user as reported by the Admin Worker's /auth/me."""

    sub: str  # user ID
    username: str
    role: str  # "admin" or "user"


# ── Token verification cache ──
# Maps token -> (AuthUser, expiry_timestamp)
_token_cache: dict[str, tuple[AuthUser, float]] = {}
_CACHE_TTL = 300  # 5 minutes
# Expired entries are swept once the cache holds more than this many tokens.
_CACHE_CLEANUP_THRESHOLD = 100
# Seconds to wait for the Worker before treating the token as unverified.
_VERIFY_TIMEOUT = 10


def _cleanup_cache() -> None:
    """Remove expired entries from the token cache."""
    now = time.time()
    # Walk a snapshot and use pop(): the cache may be touched by several
    # threads (FastAPI runs plain ``def`` dependencies in a threadpool), so
    # neither "dict changed size during iteration" nor a KeyError on an
    # already-removed key should be possible here.
    for key, (_, expiry) in list(_token_cache.items()):
        if expiry < now:
            _token_cache.pop(key, None)


def verify_token(token: str) -> AuthUser | None:
    """Verify a token by calling the Worker's /auth/me, with caching.

    Args:
        token: The raw bearer token (without the ``Bearer `` prefix).

    Returns:
        The ``AuthUser`` described by the Worker's response, or ``None`` when
        the token is empty, ``ADMIN_API_URL`` is not configured, the Worker
        does not answer 200, the request/JSON decoding fails, or the response
        lacks a usable ``user`` object (non-empty ``id`` and ``username``).

    This function never raises for verification failures; transport and
    decoding errors are logged and reported as ``None``.
    """
    if not token or not ADMIN_API_URL:
        return None

    # Check cache first.
    now = time.time()
    cached = _token_cache.get(token)
    if cached:
        user, expiry = cached
        if expiry > now:
            return user
        # pop() rather than del: another thread may have evicted it already.
        _token_cache.pop(token, None)

    # Call the Worker to verify. Only the network call and JSON decoding are
    # guarded, so a failure in the cache bookkeeping below can no longer turn
    # a successfully verified token into a rejection.
    try:
        resp = httpx.get(
            f"{ADMIN_API_URL}/auth/me",
            headers={"Authorization": f"Bearer {token}"},
            timeout=_VERIFY_TIMEOUT,
        )
        if resp.status_code != 200:
            return None
        data = resp.json()
    except Exception:
        # Deliberately broad: any failure here must mean "not authenticated".
        # Log it so an unreachable Worker is distinguishable from bad tokens.
        # The token itself is never included in the message.
        _logger.warning(
            "Token verification against the Admin Worker failed",
            exc_info=True,
        )
        return None

    # Validate the response shape explicitly instead of relying on an
    # AttributeError being swallowed.
    user_data = data.get("user") if isinstance(data, dict) else None
    if not user_data or not isinstance(user_data, dict):
        return None

    user = AuthUser(
        sub=user_data.get("id", ""),
        username=user_data.get("username", ""),
        role=user_data.get("role", "user"),
    )
    if not user.sub or not user.username:
        return None

    # Cache the result. The TTL is measured from ``now``, i.e. from before
    # the Worker request was sent.
    _token_cache[token] = (user, now + _CACHE_TTL)

    # Periodic cleanup.
    if len(_token_cache) > _CACHE_CLEANUP_THRESHOLD:
        _cleanup_cache()

    return user


def get_current_user(request: Request) -> AuthUser:
    """FastAPI dependency: extract and verify the JWT from the Authorization header.

    Args:
        request: The incoming HTTP request.

    Returns:
        The authenticated ``AuthUser``.

    Raises:
        HTTPException: 401 when the header is missing or is not a ``Bearer``
            credential, or when ``verify_token`` rejects the token.
    """
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(401, "Chưa đăng nhập")

    user = verify_token(auth_header.removeprefix("Bearer "))
    if not user:
        raise HTTPException(401, "Token không hợp lệ hoặc đã hết hạn")
    return user


def get_ws_user(websocket: WebSocket) -> AuthUser | None:
    """Extract and verify the JWT from the WebSocket query parameter ``?token=...``.

    Args:
        websocket: The WebSocket connection being authenticated.

    Returns:
        The authenticated ``AuthUser``, or ``None`` when the parameter is
        missing/empty or the token is rejected. Unlike ``get_current_user``
        this does not raise; the caller decides how to close the socket.
    """
    # NOTE(review): verify_token() makes a blocking HTTP call (up to
    # _VERIFY_TIMEOUT seconds). If this is invoked directly from an async
    # WebSocket handler it will stall the event loop on a cache miss —
    # confirm the call site, and consider offloading to a threadpool.
    token = websocket.query_params.get("token", "")
    if not token:
        return None
    return verify_token(token)