"""User authentication helpers: password hashing, credential checks, user creation."""

from datetime import datetime, timedelta
from typing import Optional

from fastapi import HTTPException, status
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlmodel import Session, select

from src.core.config import settings
from src.models.user import User, UserCreate, UserPublic

# Password hashing context (bcrypt; older schemes auto-flagged as deprecated).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a plaintext password against a hashed password.

    Args:
        plain_password: The candidate password as entered by the user.
        hashed_password: The stored hash to check against.

    Returns:
        True if the password matches the hash, False otherwise.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash a plaintext password using the module's ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The encoded hash string produced by ``pwd_context.hash``.
    """
    return pwd_context.hash(password)


def authenticate_user(session: Session, email: str, password: str) -> Optional[User]:
    """Authenticate a user by email and password.

    Args:
        session: Database session used to look up the user.
        email: Email address to match against ``User.email``.
        password: Plaintext password to verify.

    Returns:
        The matching ``User`` when the credentials are valid, otherwise None.
    """
    statement = select(User).where(User.email == email)
    user = session.exec(statement).first()

    if user is None:
        # Spend roughly the same time as a real verification so response
        # timing does not reveal whether the email is registered.
        pwd_context.dummy_verify()
        return None

    if not verify_password(password, user.hashed_password):
        return None

    return user


def create_user(session: Session, user_create: UserCreate) -> User:
    """Create a new user with a hashed password.

    Args:
        session: Database session used for the lookup and the insert.
        user_create: Payload providing ``email``, ``name`` and ``password``.

    Returns:
        The newly persisted ``User``, refreshed from the database.

    Raises:
        HTTPException: 409 Conflict if a user with this email already exists.
    """
    # Check if user already exists.
    # NOTE(review): this check-then-insert is not atomic; a concurrent request
    # could insert the same email between the lookup and the commit. Confirm
    # that User.email carries a unique constraint and how commit errors are
    # handled by callers.
    statement = select(User).where(User.email == user_create.email)
    existing_user = session.exec(statement).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with this email already exists"
        )

    # Hash the password so the plaintext is never stored.
    hashed_password = get_password_hash(user_create.password)

    # Create the user
    db_user = User(
        email=user_create.email,
        name=user_create.name,
        hashed_password=hashed_password
    )

    session.add(db_user)
    session.commit()
    # Reload so database-generated fields are populated on the returned object.
    session.refresh(db_user)

    return db_user