Spaces:
Sleeping
Sleeping
| # app/core/dependencies.py | |
| # FastAPI dependency injection — auth guards, role checks, shared helpers | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from app.core.security import decode_access_token | |
| from app.db.models.user import UserDocument, UserRole | |
| _bearer = HTTPBearer(auto_error=False) | |
| async def get_current_user( | |
| credentials: HTTPAuthorizationCredentials | None = Depends(_bearer), | |
| ) -> UserDocument: | |
| """Extract and validate JWT; return the authenticated UserDocument.""" | |
| if not credentials: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Not authenticated", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| payload = decode_access_token(credentials.credentials) | |
| if not payload: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid or expired token", | |
| ) | |
| user = await UserDocument.get(payload["sub"]) | |
| if not user or not user.is_active: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="User not found or deactivated", | |
| ) | |
| return user | |
| def require_role(*roles: UserRole): | |
| """Factory that returns a dependency requiring one of the given roles.""" | |
| async def _check(user: UserDocument = Depends(get_current_user)) -> UserDocument: | |
| if user.role not in roles: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail=f"Role '{user.role}' is not permitted for this action", | |
| ) | |
| return user | |
| return _check | |