import os import hmac import hashlib import base64 import json import time from datetime import datetime, timedelta from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials SECRET_KEY = os.getenv("JWT_SECRET", "change-me-in-prod") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 120 security = HTTPBearer() fake_users_db = {} def get_password_hash(password: str) -> str: return hashlib.sha256(password.encode("utf-8")).hexdigest() def verify_password(plain_password: str, hashed_password: str) -> bool: return hmac.compare_digest(get_password_hash(plain_password), hashed_password) def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str: to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) to_encode.update({"exp": int(expire.timestamp())}) payload = json.dumps(to_encode, separators=(',', ':'), sort_keys=True).encode() signature = hmac.new(SECRET_KEY.encode(), payload, hashlib.sha256).digest() token = base64.urlsafe_b64encode(payload).decode().rstrip("=") + "." + base64.urlsafe_b64encode(signature).decode().rstrip("=") return token def decode_access_token(token: str) -> dict: try: payload_b64, sig_b64 = token.split(".") padding = '=' * (-len(payload_b64) % 4) payload = base64.urlsafe_b64decode(payload_b64 + padding) padding = '=' * (-len(sig_b64) % 4) signature = base64.urlsafe_b64decode(sig_b64 + padding) expected_sig = hmac.new(SECRET_KEY.encode(), payload, hashlib.sha256).digest() if not hmac.compare_digest(expected_sig, signature): raise ValueError("Invalid signature") content = json.loads(payload.decode()) if "exp" in content and time.time() > content["exp"]: raise ValueError("Token expired") return content except Exception as e: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token") def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: token = credentials.credentials data = decode_access_token(token) username = data.get("sub") if not username or username not in fake_users_db: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid user") return {"username": username} def register_user(username: str, password: str) -> dict: if username in fake_users_db: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User already exists") hashed = get_password_hash(password) fake_users_db[username] = {"username": username, "hashed_password": hashed} return {"username": username} def authenticate_user(username: str, password: str) -> bool: user = fake_users_db.get(username) if not user: return False return verify_password(password, user.get("hashed_password", ""))