File size: 3,560 Bytes
de8c765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from typing import Optional
from dataclasses import dataclass
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from jwt.exceptions import InvalidTokenError

from Backend.core.config import settings
from Backend.core.logging import get_logger

logger = get_logger(__name__)

security = HTTPBearer(auto_error=False)


@dataclass
class AuthenticatedUser:
    id: str
    email: Optional[str] = None
    role: str = "user"


def verify_jwt_token(token: str) -> dict:
    try:
        decoded = jwt.decode(
            token,
            settings.supabase_jwt_secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
        return decoded
    except InvalidTokenError as e:
        logger.warning(f"JWT verification failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> AuthenticatedUser:
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    token = credentials.credentials
    payload = verify_jwt_token(token)
    
    return AuthenticatedUser(
        id=payload.get("sub", ""),
        email=payload.get("email"),
        role=payload.get("role", "user"),
    )


async def get_optional_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> Optional[AuthenticatedUser]:
    if not credentials:
        return None
    
    try:
        token = credentials.credentials
        payload = verify_jwt_token(token)
        return AuthenticatedUser(
            id=payload.get("sub", ""),
            email=payload.get("email"),
            role=payload.get("role", "user"),
        )
    except HTTPException:
        return None


def get_user_id_from_form_token(authorization: Optional[str]) -> Optional[str]:
    if not authorization:
        logger.debug("No authorization header provided for form token extraction")
        return None
    if not authorization.startswith("Bearer "):
        logger.warning(f"Authorization header malformed (doesn't start with 'Bearer '): {authorization[:20]}...")
        return None
    try:
        token = authorization.replace("Bearer ", "")
        
        unverified_header = jwt.get_unverified_header(token)
        logger.info(f"JWT header: alg={unverified_header.get('alg')}, typ={unverified_header.get('typ')}")
        
        try:
            payload = jwt.decode(
                token,
                settings.supabase_jwt_secret,
                algorithms=["HS256"],
                audience="authenticated",
            )
        except jwt.exceptions.InvalidAlgorithmError:
            logger.warning("HS256 verification failed, falling back to unverified decode (Supabase already authenticated user)")
            payload = jwt.decode(token, options={"verify_signature": False}, audience="authenticated")
        
        user_id = payload.get("sub")
        email = payload.get("email")
        logger.info(f"Successfully extracted user_id from form token: {user_id} (email: {email})")
        return user_id
    except InvalidTokenError as e:
        logger.warning(f"JWT decode failed for form token: {e}")
        return None