from typing import Optional
from dataclasses import dataclass
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from jwt.exceptions import InvalidTokenError
from Backend.core.config import settings
from Backend.core.logging import get_logger
# Logger for this module, obtained from the project's logging helper.
logger = get_logger(__name__)
# Bearer-credentials dependency shared by the FastAPI dependencies below.
# auto_error=False is passed so that those dependencies receive a falsy value
# when no credentials are supplied and decide for themselves whether that is
# an error (see their `if not credentials` checks).
security = HTTPBearer(auto_error=False)
@dataclass
class AuthenticatedUser:
    """Identity of a caller as extracted from a verified JWT.

    Instances are built by the auth dependencies in this module from the
    token's ``sub``, ``email`` and ``role`` claims.
    """

    # Value of the token's "sub" claim.
    id: str
    # Value of the token's "email" claim; None when the claim is absent.
    email: Optional[str] = None
    # Value of the token's "role" claim; "user" when the claim is absent.
    role: str = "user"
def verify_jwt_token(token: str) -> dict:
    """Verify *token* and return its decoded claims.

    The token is decoded with ``settings.supabase_jwt_secret``, accepting
    only the HS256 algorithm and the ``"authenticated"`` audience.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims as returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding raises ``InvalidTokenError``. The underlying reason is
            logged at warning level and not exposed in the response detail.
    """
    try:
        return jwt.decode(
            token,
            settings.supabase_jwt_secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except InvalidTokenError as e:
        logger.warning(f"JWT verification failed: {e}")
        # Generic detail on purpose: the specific failure stays in the log.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> AuthenticatedUser:
    """FastAPI dependency that requires a valid bearer token.

    Args:
        credentials: Bearer credentials resolved by the ``security``
            dependency; falsy when the request carried none.

    Returns:
        An ``AuthenticatedUser`` populated from the token's ``sub``,
        ``email`` and ``role`` claims (``sub`` defaults to ``""`` and
        ``role`` to ``"user"`` when missing).

    Raises:
        HTTPException: 401 when no credentials were supplied, or when
            ``verify_jwt_token`` rejects the token.
    """
    if credentials:
        claims = verify_jwt_token(credentials.credentials)
        return AuthenticatedUser(
            id=claims.get("sub", ""),
            email=claims.get("email"),
            role=claims.get("role", "user"),
        )

    # No credentials at all: this dependency treats that as an error.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_optional_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> Optional[AuthenticatedUser]:
    """FastAPI dependency that authenticates the caller when possible.

    Unlike ``get_current_user`` this never raises for authentication
    problems: missing credentials and tokens rejected by
    ``verify_jwt_token`` both yield ``None``.

    Args:
        credentials: Bearer credentials resolved by the ``security``
            dependency; falsy when the request carried none.

    Returns:
        An ``AuthenticatedUser`` built from the token claims, or ``None``
        when the request is anonymous or the token is invalid.
    """
    if not credentials:
        return None

    try:
        claims = verify_jwt_token(credentials.credentials)
    except HTTPException:
        # verify_jwt_token reports a bad token as a 401; for an optional
        # user that simply means "not logged in".
        return None

    return AuthenticatedUser(
        id=claims.get("sub", ""),
        email=claims.get("email"),
        role=claims.get("role", "user"),
    )
def get_user_id_from_form_token(authorization: Optional[str]) -> Optional[str]:
    """Extract the user id (``sub`` claim) from a raw ``Authorization`` value.

    The value must have the form ``"Bearer <jwt>"``. The token is verified
    with ``settings.supabase_jwt_secret`` (HS256 only, audience
    ``"authenticated"``). A token that fails verification for any reason,
    including one signed with a different algorithm, is rejected.

    Args:
        authorization: The raw Authorization header value, or ``None``.

    Returns:
        The token's ``sub`` claim, or ``None`` when the header is missing,
        malformed, the token fails verification, or the claim is absent.
    """
    if not authorization:
        logger.debug("No authorization header provided for form token extraction")
        return None

    scheme = "Bearer "
    if not authorization.startswith(scheme):
        # Do not echo any part of the header: it may hold credential material.
        logger.warning("Authorization header malformed (doesn't start with 'Bearer ')")
        return None

    # Strip only the leading scheme; str.replace would also remove any later
    # occurrence of the text inside the value.
    token = authorization[len(scheme):].strip()

    try:
        unverified_header = jwt.get_unverified_header(token)
        logger.info(f"JWT header: alg={unverified_header.get('alg')}, typ={unverified_header.get('typ')}")
        # SECURITY: there is deliberately no "decode without verifying the
        # signature" fallback. The header's `alg` is attacker-controlled, so
        # accepting unverified claims when the algorithm is not HS256 would
        # let anyone forge an arbitrary `sub`. InvalidAlgorithmError is a
        # subclass of InvalidTokenError and is handled below like any other
        # verification failure.
        payload = jwt.decode(
            token,
            settings.supabase_jwt_secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except InvalidTokenError as e:
        logger.warning(f"JWT decode failed for form token: {e}")
        return None

    user_id = payload.get("sub")
    # Only the user id is logged; the email claim is kept out of the logs.
    logger.info(f"Successfully extracted user_id from form token: {user_id}")
    return user_id
|