"""HTTP Basic authentication helpers and role-based FastAPI dependencies."""

import logging
import os
import secrets

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.orm import Session

from database import get_db
from models import User

logger = logging.getLogger(__name__)

security = HTTPBasic()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(password: str, password_hash: str) -> bool:
    """Return True if *password* matches the stored *password_hash*."""
    return pwd_context.verify(password, password_hash)


def ensure_default_admin(db: Session) -> None:
    """Create the default admin account if no user with that username exists.

    The username is read from ``ADMIN_USERNAME`` and the password from
    ``ADMIN_PASSWORD``; both fall back to ``"admin"`` when unset. Nothing is
    written when a user with the chosen username is already present.
    """
    username = os.getenv("ADMIN_USERNAME", "admin")
    existing = db.scalar(select(User).where(User.username == username))
    if existing:
        return

    password = os.getenv("ADMIN_PASSWORD")
    if password is None:
        # Keep the historical fallback so existing deployments still boot,
        # but make the insecure default visible to operators.
        logger.warning(
            "ADMIN_PASSWORD is not set; creating default admin %r with the "
            "insecure built-in password. Set ADMIN_PASSWORD or change it "
            "immediately.",
            username,
        )
        password = "admin"

    db.add(User(username=username, password_hash=hash_password(password), role="admin"))
    db.commit()


def _invalid_credentials() -> HTTPException:
    """Build the 401 response used for every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Basic"},
    )


def get_current_user(
    credentials: HTTPBasicCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the HTTP Basic credentials of the request to a ``User``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username is unknown, the password does not verify, or the stored
            username is not byte-for-byte equal to the supplied one.
    """
    user = db.scalar(select(User).where(User.username == credentials.username))
    if user is None:
        # Spend roughly the time a real verification would take so response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        raise _invalid_credentials()

    if not verify_password(credentials.password, user.password_hash):
        raise _invalid_credentials()

    # compare_digest only accepts ASCII-only str; comparing UTF-8 bytes avoids
    # a TypeError (HTTP 500) for non-ASCII usernames.
    # NOTE(review): presumably this exact-match check guards against database
    # collations that match usernames loosely (e.g. case-insensitively) - confirm.
    stored_name = user.username.encode("utf-8")
    supplied_name = credentials.username.encode("utf-8")
    if not secrets.compare_digest(stored_name, supplied_name):
        raise _invalid_credentials()

    return user


def require_manager(user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user if their role is ``admin`` or ``manager``.

    Raises:
        HTTPException: 403 when the user has any other role.
    """
    if user.role not in {"admin", "manager"}:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Manager access required",
        )
    return user


def require_admin(user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user if their role is ``admin``.

    Raises:
        HTTPException: 403 when the user has any other role.
    """
    if user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return user