Spaces:
Running
Running
| import os | |
| import secrets | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| from passlib.context import CryptContext | |
| from sqlalchemy import select | |
| from sqlalchemy.orm import Session | |
| from database import get_db | |
| from models import User | |
| security = HTTPBasic() | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| def hash_password(password: str) -> str: | |
| return pwd_context.hash(password) | |
| def verify_password(password: str, password_hash: str) -> bool: | |
| return pwd_context.verify(password, password_hash) | |
| def ensure_default_admin(db: Session) -> None: | |
| username = os.getenv("ADMIN_USERNAME", "admin") | |
| password = os.getenv("ADMIN_PASSWORD", "admin") | |
| existing = db.scalar(select(User).where(User.username == username)) | |
| if existing: | |
| return | |
| db.add(User(username=username, password_hash=hash_password(password), role="admin")) | |
| db.commit() | |
| def get_current_user( | |
| credentials: HTTPBasicCredentials = Depends(security), | |
| db: Session = Depends(get_db), | |
| ) -> User: | |
| user = db.scalar(select(User).where(User.username == credentials.username)) | |
| if not user or not verify_password(credentials.password, user.password_hash): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid credentials", | |
| headers={"WWW-Authenticate": "Basic"}, | |
| ) | |
| if not secrets.compare_digest(user.username, credentials.username): | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") | |
| return user | |
| def require_manager(user: User = Depends(get_current_user)) -> User: | |
| if user.role not in {"admin", "manager"}: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Manager access required") | |
| return user | |
| def require_admin(user: User = Depends(get_current_user)) -> User: | |
| if user.role != "admin": | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required") | |
| return user | |