| """ |
| Authentication and authorization |
| JWT-based authentication with role-based access control |
| """ |
|
|
| from datetime import datetime, timedelta, timezone |
| from typing import Optional |
| import bcrypt |
| from jose import JWTError, jwt |
| from fastapi import Depends, HTTPException, status, Request |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from pydantic import BaseModel |
|
|
| from ..config import settings |
|
|
| |
| security = HTTPBearer() |
|
|
|
|
| class Token(BaseModel): |
| access_token: str |
| token_type: str |
|
|
|
|
| class TokenData(BaseModel): |
| username: Optional[str] = None |
| scopes: list = [] |
|
|
|
|
| class User(BaseModel): |
| username: str |
| email: Optional[str] = None |
| full_name: Optional[str] = None |
| disabled: bool = False |
| scopes: list = [] |
| tenant_id: Optional[str] = None |
|
|
|
|
| def verify_password(plain_password: str, hashed_password: str) -> bool: |
| """Verify password hash""" |
| try: |
| |
| password_bytes = plain_password.encode('utf-8')[:72] |
| return bcrypt.checkpw(password_bytes, hashed_password.encode('utf-8')) |
| except ValueError: |
| return False |
|
|
|
|
| def get_password_hash(password: str) -> str: |
| """Hash password""" |
| password_bytes = password.encode('utf-8')[:72] |
| return bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode('utf-8') |
|
|
|
|
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: |
| """ |
| Create JWT access token |
| |
| Args: |
| data: Token payload data |
| expires_delta: Optional expiration time |
| |
| Returns: |
| JWT token string |
| """ |
| to_encode = data.copy() |
| |
| if expires_delta: |
| expire = datetime.now(timezone.utc).replace(tzinfo=None) + expires_delta |
| else: |
| expire = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=settings.access_token_expire_minutes) |
| |
| to_encode.update({"exp": expire}) |
| encoded_jwt = jwt.encode( |
| to_encode, |
| settings.secret_key, |
| algorithm=settings.algorithm |
| ) |
| |
| return encoded_jwt |
|
|
|
|
| def decode_token(token: str) -> TokenData: |
| """ |
| Decode and validate JWT token |
| |
| Args: |
| token: JWT token string |
| |
| Returns: |
| TokenData with username and scopes |
| |
| Raises: |
| HTTPException if token is invalid |
| """ |
| credentials_exception = HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Could not validate credentials", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| |
| try: |
| payload = jwt.decode( |
| token, |
| settings.secret_key, |
| algorithms=[settings.algorithm] |
| ) |
| username: str = payload.get("sub") |
| if username is None: |
| raise credentials_exception |
| |
| token_data = TokenData( |
| username=username, |
| scopes=payload.get("scopes", []) |
| ) |
| return token_data |
| except JWTError: |
| raise credentials_exception |
|
|
|
|
| async def get_current_user( |
| request: Request, |
| credentials: HTTPAuthorizationCredentials = Depends(security) |
| ) -> User: |
| """ |
| Get current user from JWT token |
| |
| Args: |
| request: FastAPI Request to access app state |
| credentials: Authorization credentials from header |
| |
| Returns: |
| Current user |
| |
| Raises: |
| HTTPException if authentication fails |
| """ |
| token = credentials.credentials |
| token_data = decode_token(token) |
| |
| store = getattr(request.app.state, "graph_store", None) |
| if not store: |
| raise HTTPException(status_code=503, detail="Graph store unavailable") |
| |
| user_data = await store.get_user(token_data.username) |
| if not user_data: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="User not found", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| |
| user = User( |
| username=user_data["username"], |
| email=user_data.get("email"), |
| full_name=user_data.get("full_name"), |
| scopes=user_data.get("scopes", []), |
| disabled=user_data.get("disabled", False), |
| tenant_id=user_data.get("tenant_id", settings.default_tenant_id) |
| ) |
| |
| if user.disabled: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail="Inactive user" |
| ) |
| |
| return user |
|
|
|
|
| def check_scope(required_scope: str): |
| """ |
| Dependency to check if user has required scope |
| |
| Args: |
| required_scope: Required scope/permission |
| |
| Returns: |
| Dependency function |
| """ |
| |
| async def scope_checker(current_user: User = Depends(get_current_user)): |
| if required_scope not in current_user.scopes and "admin" not in current_user.scopes: |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, |
| detail="Not enough permissions" |
| ) |
| return current_user |
| |
| return scope_checker |
|
|