m-ahmad-official's picture
Initial commit: Full-stack todo backend for Hugging Face Spaces
6bed18e
from datetime import timedelta
from typing import Optional
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from src.core.config import settings
from src.auth.security import verify_token, create_access_token
# HTTP Bearer scheme for token authentication
security = HTTPBearer()
def get_current_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Get the current user ID from the JWT token in the Authorization header
"""
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("user_id")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user_id
def get_current_user_payload(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Get the full user payload from the JWT token in the Authorization header
"""
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return payload
def require_authenticated_user(current_user_id: str = Depends(get_current_user_id)):
"""
Require an authenticated user for endpoints that need authentication
but don't necessarily need the user ID
"""
return current_user_id
def verify_admin_access(current_user_payload: dict = Depends(get_current_user_payload)):
"""
Verify that the current user has admin access
"""
role = current_user_payload.get("role", "user")
if role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
return current_user_payload
def refresh_access_token(current_user_payload: dict) -> str:
"""
Generate a new access token based on the current user's payload
This function can be used to refresh an expired token
"""
# Remove the expiration time from the payload to create a new token
user_data = {key: value for key, value in current_user_payload.items() if key != "exp"}
# Create a new token with fresh expiration
new_token = create_access_token(data=user_data)
return new_token
def is_token_expired(payload: dict) -> bool:
"""
Check if the token in the payload is expired
"""
exp_time = payload.get("exp")
if exp_time is None:
return True
import time
current_time = time.time()
return current_time >= exp_time