m-ahmad-official's picture
Initial commit: Full-stack todo backend for Hugging Face Spaces
6bed18e
from fastapi import APIRouter, Depends, HTTPException, status, Form
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlmodel import Session
from src.core.database import get_session
from src.auth.security import verify_token, create_access_token
from src.auth.deps import is_token_expired
from src.core.config import settings
from src.auth.user_service import authenticate_user, create_user
from src.models.user import UserCreate
from fastapi.responses import JSONResponse
router = APIRouter()
security = HTTPBearer()
@router.post("/token/refresh", summary="Refresh expired JWT token")
async def refresh_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Refresh an expired JWT token by generating a new one based on the user's identity.
This endpoint allows clients to renew their access tokens without re-authenticating.
"""
token = credentials.credentials
# Verify the token (this will succeed for expired tokens if we just want to extract user data)
# Note: In a real implementation, you'd have a separate refresh token mechanism
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
# Check if the token is expired
if not is_token_expired(payload):
# If the token is not expired, we might want to reject the refresh request
# Or we could allow refreshing slightly before expiry
pass # For now, allow refresh regardless of current expiry status
# Create a new token with the same user data
user_data = {key: value for key, value in payload.items() if key != "exp"}
new_token = create_access_token(data=user_data)
return {
"access_token": new_token,
"token_type": "bearer",
"expires_in": int(settings.JWT_EXPIRATION_DELTA),
}
@router.get("/token/validate", summary="Validate JWT token")
async def validate_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Validate a JWT token without using it for any specific operation.
Returns user information if token is valid.
"""
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
# Check if token is expired
exp_time = payload.get("exp")
if exp_time:
import time
current_time = time.time()
if current_time >= exp_time:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
headers={"WWW-Authenticate": "Bearer"},
)
return {
"valid": True,
"user_id": payload.get("user_id"),
"role": payload.get("role", "user"),
"exp": payload.get("exp"),
}
@router.post("/token/revoke", summary="Revoke JWT token (placeholder)")
async def revoke_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Revoke a JWT token (this is a placeholder implementation).
In a real system, this would add the token to a blacklist/jti registry.
"""
# In a real implementation, you would add the token to a blacklist
# For now, we just return a success message
return {
"revoked": True,
"message": "Token revoked successfully (in a real implementation, this would be added to a blacklist)",
}
@router.post("/login", summary="Authenticate user and return JWT token")
async def login(
email: str = Form(...),
password: str = Form(...),
session: Session = Depends(get_session),
):
"""
Authenticate a user with email and password.
Returns a JWT token upon successful authentication.
"""
user = authenticate_user(session, email, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token - ensure user_id is a string for JWT
access_token = create_access_token(
data={"user_id": str(user.id), "role": getattr(user, "role", "user")}
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": {"id": user.id, "email": user.email, "name": user.name},
}
@router.post("/register", summary="Register a new user")
async def register(
email: str = Form(...),
password: str = Form(...),
name: str = Form(...),
session: Session = Depends(get_session),
):
try:
user_create = UserCreate(email=email, password=password, name=name)
user = create_user(session, user_create)
access_token = create_access_token(
data={"user_id": str(user.id), "role": getattr(user, "role", "user")}
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": {"id": user.id, "email": user.email, "name": user.name},
}
except HTTPException:
# Re-raise HTTP exceptions (like 409 for duplicate email)
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Registration failed: {str(e)}",
)