| from typing import Optional |
| from sqlmodel import Session, select |
| from fastapi import HTTPException, status |
| from passlib.context import CryptContext |
| from datetime import datetime, timedelta |
| from jose import JWTError, jwt |
| from src.models.user import User, UserCreate, UserPublic |
| from src.core.config import settings |
|
|
| |
# Module-wide passlib context used by the password helpers in this file.
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a previously stored hash.

    Delegates to the module-level ``pwd_context``.

    Args:
        plain_password: The candidate password as supplied by the caller.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
def get_password_hash(password: str) -> str:
    """Produce a hash of a plaintext password for storage.

    Delegates to the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
def authenticate_user(session: Session, email: str, password: str) -> Optional[User]:
    """Authenticate a user by email and password.

    Looks up the user whose ``email`` matches exactly and checks ``password``
    against that user's ``hashed_password``.

    Args:
        session: Open database session used for the lookup.
        email: Email address to look up.
        password: Plaintext password supplied by the caller.

    Returns:
        The matching ``User`` when the credentials are valid, otherwise
        ``None`` (unknown email and wrong password are not distinguished).
    """
    statement = select(User).where(User.email == email)
    user = session.exec(statement).first()

    if not user:
        # Spend about as long as a real verification would take, so that
        # response timing does not reveal whether the email is registered.
        pwd_context.dummy_verify()
        return None

    if not verify_password(password, user.hashed_password):
        return None

    return user
|
|
def create_user(session: Session, user_create: UserCreate) -> User:
    """Create a new user with a hashed password.

    Args:
        session: Open database session used for the lookup and the insert.
        user_create: Incoming payload; its ``email``, ``name`` and
            ``password`` fields are read.

    Returns:
        The newly persisted ``User``, refreshed from the database after
        the commit.

    Raises:
        HTTPException: 409 Conflict when a user with the same email
            already exists at the time of the pre-check.
        Exception: Any error raised by the commit is re-raised unchanged
            after the session has been rolled back.
    """
    # Reject duplicates up front so the caller gets a clear 409.
    statement = select(User).where(User.email == user_create.email)
    if session.exec(statement).first():
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with this email already exists"
        )

    # Only the hash is stored; the plaintext password never reaches the model.
    db_user = User(
        email=user_create.email,
        name=user_create.name,
        hashed_password=get_password_hash(user_create.password),
    )

    session.add(db_user)
    try:
        session.commit()
    except Exception:
        # The pre-check above is not atomic with the insert, so the commit can
        # still fail (e.g. a concurrent signup with the same email, if the
        # database enforces uniqueness). Roll back so the session stays usable
        # for the caller, then let the original error propagate.
        session.rollback()
        raise
    session.refresh(db_user)

    return db_user