Spaces:
Sleeping
Sleeping
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| from jose import JWTError, jwt | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from config import get_settings | |
| settings = get_settings() | |
| security = HTTPBearer() | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| to_encode = data.copy() | |
| expire = datetime.now(timezone.utc) + (expires_delta or timedelta(seconds=settings.JWT_EXPIRY)) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) | |
| def verify_token(token: str) -> dict: | |
| try: | |
| payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) | |
| return payload | |
| except JWTError: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid or expired token", | |
| ) | |
| async def get_current_user( | |
| credentials: HTTPAuthorizationCredentials = Depends(security), | |
| ) -> dict: | |
| payload = verify_token(credentials.credentials) | |
| user_id = payload.get("sub") | |
| if not user_id: | |
| raise HTTPException(status_code=401, detail="Invalid token payload") | |
| return { | |
| "user_id": user_id, | |
| "email": payload.get("email", ""), | |
| "role": payload.get("role", "student"), | |
| "name": payload.get("name", ""), | |
| } | |
| def require_role(role: str): | |
| async def role_checker(current_user: dict = Depends(get_current_user)): | |
| if current_user.get("role") != role: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail=f"Access denied. Requires '{role}' role.", | |
| ) | |
| return current_user | |
| return role_checker | |