# sajith-0701's picture
# v1.1
# 1cff1e5
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import get_settings
# Application settings, obtained once when this module is imported.
settings = get_settings()
# HTTP Bearer scheme instance; used below as a FastAPI dependency to obtain
# the request's authorization credentials.
security = HTTPBearer()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied, so
            the caller's dict is not modified. An existing ``exp`` key is
            overwritten.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.JWT_EXPIRY`` seconds.

    Returns:
        The encoded token produced by ``jwt.encode`` using
        ``settings.JWT_SECRET`` and ``settings.JWT_ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so an ``or``
    # fallback would silently replace an explicit zero lifetime with the
    # default one.
    if expires_delta is None:
        expires_delta = timedelta(seconds=settings.JWT_EXPIRY)

    claims = data.copy()
    # Timezone-aware UTC timestamp for the expiry claim.
    claims["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(claims, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def verify_token(token: str) -> dict:
    """Decode and validate a JWT, returning its claims.

    Args:
        token: The encoded JWT string.

    Returns:
        The payload returned by ``jwt.decode`` when the token is accepted.

    Raises:
        HTTPException: 401 with detail ``"Invalid or expired token"`` when
            ``jwt.decode`` raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except JWTError as exc:
        # Bearer-token 401 responses should carry a WWW-Authenticate challenge
        # (RFC 6750, section 3). Chain the original error so the decode
        # failure stays visible in tracebacks and logs.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """FastAPI dependency that resolves the caller from a bearer token.

    Args:
        credentials: Authorization credentials supplied by the ``security``
            bearer scheme; ``credentials.credentials`` is passed to
            ``verify_token``.

    Returns:
        A dict with the keys ``user_id`` (the ``sub`` claim), ``email``,
        ``role`` and ``name``. Missing ``email``/``name`` claims default to
        ``""`` and a missing ``role`` claim defaults to ``"student"``.

    Raises:
        HTTPException: 401 with detail ``"Invalid token payload"`` when the
            ``sub`` claim is missing or empty. Any exception raised by
            ``verify_token`` propagates unchanged.
    """
    payload = verify_token(credentials.credentials)
    user_id = payload.get("sub")
    if not user_id:
        # Use the named status constant like the other handlers in this
        # module, and include the Bearer challenge expected on 401 responses
        # (RFC 6750, section 3).
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {
        "user_id": user_id,
        "email": payload.get("email", ""),
        "role": payload.get("role", "student"),
        "name": payload.get("name", ""),
    }
def require_role(role: str):
    """Build a FastAPI dependency that only admits users holding ``role``.

    Args:
        role: The exact role string the current user's ``role`` entry must
            equal.

    Returns:
        An async dependency callable. It depends on ``get_current_user`` and
        returns that user dict when the role matches; otherwise it raises an
        ``HTTPException`` with status 403.
    """

    async def role_checker(current_user: dict = Depends(get_current_user)):
        # Happy path first: a matching role passes the user dict through.
        if current_user.get("role") == role:
            return current_user

        message = f"Access denied. Requires '{role}' role."
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)

    return role_checker