| """
|
| Auth dependencies for the Space backend.
|
|
|
| Verifies JWT tokens by calling the Admin Worker's /auth/me endpoint.
|
| Results are cached in-memory (TTL-based) to avoid hitting the Worker on every request.
|
| No JWT_SECRET needed on the Space side.
|
| """
|
|
|
import logging
import time
from dataclasses import dataclass

import httpx
from fastapi import Request, WebSocket, HTTPException

from config import ADMIN_API_URL
|
|
|
|
|
@dataclass
class AuthUser:
    """Identity of a caller whose token was accepted by the Admin Worker.

    Instances are built from the ``user`` object of the Worker's /auth/me
    response.
    """

    # Unique user identifier (the ``id`` field of the Worker response).
    sub: str
    # Login name of the user.
    username: str
    # Role name; "user" is used when the Worker response omits it.
    role: str
|
|
|
|
|
|
|
|
|
# How long a successful verification stays cached, in seconds (five minutes).
_CACHE_TTL = 5 * 60

# In-process cache of verified tokens: raw token -> (user, expiry), where
# expiry is an absolute time.time() timestamp.
_token_cache: dict[str, tuple[AuthUser, float]] = {}
|
|
|
|
|
def _cleanup_cache():
    """Drop every cache entry whose expiry time has been reached.

    An entry counts as expired once ``expiry <= now``. This is the same
    boundary ``verify_token`` uses when reading the cache: it serves an entry
    only while ``expiry > now``.
    """
    now = time.time()
    # Walk a snapshot and evict with pop(): other callers may add or remove
    # entries while the sweep runs, which would otherwise raise
    # "dictionary changed size during iteration" or KeyError.
    for key, (_, expiry) in list(_token_cache.items()):
        if expiry <= now:
            _token_cache.pop(key, None)
|
|
|
|
|
def verify_token(token: str) -> AuthUser | None:
    """Verify a token against the Admin Worker's /auth/me endpoint, with caching.

    A successful verification is cached in-process for ``_CACHE_TTL`` seconds,
    keyed by the raw token, so repeated requests do not hit the Worker.
    Failed verifications are never cached. This function does not raise;
    every failure is reported as ``None``.

    Args:
        token: Bearer token as received from the client.

    Returns:
        The authenticated user, or ``None`` when the token is empty,
        ``ADMIN_API_URL`` is not configured, the Worker request fails or
        answers with a non-200 status, or the response has no usable ``user``
        object (non-empty ``id`` and ``username``).
    """
    if not token or not ADMIN_API_URL:
        return None

    now = time.time()
    cached = _token_cache.get(token)
    if cached is not None:
        user, expiry = cached
        if expiry > now:
            return user
        # pop() rather than del: a concurrent caller may already have evicted
        # this entry, and a bare del would then raise KeyError.
        _token_cache.pop(token, None)

    # Only the remote call and body decoding sit inside the try, so a failure
    # in the bookkeeping below cannot be mistaken for a rejected token.
    try:
        resp = httpx.get(
            f"{ADMIN_API_URL}/auth/me",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        if resp.status_code != 200:
            return None
        data = resp.json()
    except Exception as exc:
        # Boundary with an external service: any failure here means the caller
        # is treated as unauthenticated. Only the exception type is logged so
        # the token can never end up in the logs.
        logging.getLogger(__name__).warning(
            "Token verification against the Admin Worker failed: %s",
            type(exc).__name__,
        )
        return None

    # Reject payloads that are not shaped like {"user": {...}}.
    user_data = data.get("user") if isinstance(data, dict) else None
    if not isinstance(user_data, dict) or not user_data:
        return None

    user = AuthUser(
        sub=user_data.get("id", ""),
        username=user_data.get("username", ""),
        role=user_data.get("role", "user"),
    )
    if not user.sub or not user.username:
        return None

    _token_cache[token] = (user, now + _CACHE_TTL)

    # Sweep expired entries once the cache holds more than 100 tokens.
    if len(_token_cache) > 100:
        try:
            _cleanup_cache()
        except (KeyError, RuntimeError):
            # The sweep is opportunistic housekeeping; a concurrent mutation
            # of the cache must not fail an already-verified request.
            pass

    return user
|
|
|
|
|
def get_current_user(request: Request) -> AuthUser:
    """FastAPI dependency: authenticate a request from its Authorization header.

    Expects ``Authorization: Bearer <token>``. The scheme name is matched
    case-insensitively, because HTTP authentication schemes are
    case-insensitive.

    Args:
        request: Incoming HTTP request.

    Returns:
        The user that ``verify_token`` resolved for the bearer token.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            credential, and 401 when the token fails verification. Both
            responses carry a ``WWW-Authenticate: Bearer`` challenge.
    """
    # HTTP requires a 401 response to include a WWW-Authenticate challenge.
    challenge = {"WWW-Authenticate": "Bearer"}

    auth_header = request.headers.get("Authorization", "")
    scheme, separator, credentials = auth_header.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(401, "Chưa đăng nhập", headers=challenge)

    user = verify_token(credentials.strip())
    if not user:
        raise HTTPException(
            401, "Token không hợp lệ hoặc đã hết hạn", headers=challenge
        )
    return user
|
|
|
|
|
def get_ws_user(websocket: WebSocket) -> AuthUser | None:
    """Authenticate a WebSocket connection from its ``?token=...`` query parameter.

    Args:
        websocket: The WebSocket connection being authenticated.

    Returns:
        The user resolved by ``verify_token``, or ``None`` when no token was
        supplied or the token fails verification.
    """
    raw_token = websocket.query_params.get("token", "")
    # A missing or empty token is rejected without calling verify_token.
    return verify_token(raw_token) if raw_token else None
|
|
|