File size: 3,060 Bytes
6bed18e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from datetime import timedelta
from typing import Optional
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from src.core.config import settings
from src.auth.security import verify_token, create_access_token


# HTTP Bearer scheme for token authentication. The dependency functions below
# receive their `credentials` argument through `Depends(security)`.
security = HTTPBearer()


def get_current_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Resolve the caller's user ID from the bearer token.

    The raw token is read from ``credentials.credentials`` and handed to
    ``verify_token``. The ``"user_id"`` entry of the resulting payload is
    returned.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` yields ``None`` or when the payload carries no
            ``"user_id"`` entry.
    """
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    claims = verify_token(credentials.credentials)
    if claims is None:
        # The token could not be verified at all.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers=bearer_challenge,
        )

    subject_id = claims.get("user_id")
    if subject_id is not None:
        return subject_id

    # The token verified, but it does not identify a user.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers=bearer_challenge,
    )


def get_current_user_payload(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Return the full payload obtained from the bearer token.

    The raw token is read from ``credentials.credentials`` and handed to
    ``verify_token``; whatever non-``None`` payload comes back is returned
    unchanged.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` yields ``None``.
    """
    claims = verify_token(credentials.credentials)
    if claims is not None:
        return claims

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


def require_authenticated_user(current_user_id: str = Depends(get_current_user_id)):
    """
    Require an authenticated user for endpoints that need authentication
    but don't necessarily need the user ID.

    All checking is delegated to the ``get_current_user_id`` dependency, so
    any HTTPException it raises applies here as well. The resolved user ID
    is passed through unchanged.
    """
    return current_user_id


def verify_admin_access(current_user_payload: dict = Depends(get_current_user_payload)):
    """
    Allow the request through only when the payload's role is "admin".

    A payload without a ``"role"`` entry is treated as role ``"user"``.
    On success the payload is returned unchanged.

    Raises:
        HTTPException: 403 when the role is anything other than "admin".
    """
    if current_user_payload.get("role", "user") == "admin":
        return current_user_payload

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required"
    )


def refresh_access_token(current_user_payload: dict) -> str:
    """
    Issue a new access token carrying the same data as the given payload.

    Every entry of ``current_user_payload`` except ``"exp"`` is forwarded to
    ``create_access_token``; the input dict itself is not modified.
    """
    # Work on a copy so the caller's payload keeps its "exp" entry.
    claims = dict(current_user_payload)
    claims.pop("exp", None)
    return create_access_token(data=claims)


def is_token_expired(payload: dict) -> bool:
    """
    Report whether the payload's ``"exp"`` value lies at or before now.

    A payload with no ``"exp"`` entry (or an ``"exp"`` of ``None``) is
    reported as expired. Otherwise ``"exp"`` is compared against
    ``time.time()``.
    """
    import time

    expires_at = payload.get("exp")
    return expires_at is None or time.time() >= expires_at