ICH-Detection-Pipeline / auth_utils.py
Harshit Ghosh
phase 3 ig
5105d0e
raw
history blame
4.32 kB
"""
Authentication utilities and decorators for user management and security
"""
import os
import logging
from functools import wraps
from flask import session, redirect, url_for, request, g, abort, has_request_context
from flask_login import LoginManager, current_user
from models import db, User, AuditLog, now_ist
from sqlalchemy.exc import SQLAlchemyError
logger = logging.getLogger(__name__)
login_manager = LoginManager()
def init_auth(app):
"""Initialize authentication system"""
login_manager.init_app(app)
login_manager.login_view = 'auth.login'
login_manager.login_message = 'Please log in to access this page.'
login_manager.login_message_category = 'info'
@login_manager.user_loader
def load_user(user_id):
"""Load user from database by ID"""
try:
return User.query.get(int(user_id))
except SQLAlchemyError as e:
logger.warning(f"User loader failed, clearing session context: {e}")
try:
db.session.rollback()
except Exception:
pass
return None
except Exception as e:
logger.warning(f"Unexpected user loader failure: {e}")
try:
db.session.rollback()
except Exception:
pass
return None
def get_client_ip():
"""Extract client IP address from request"""
if not has_request_context():
return 'system'
if request.headers.get('X-Forwarded-For'):
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
return request.remote_addr or 'unknown'
def log_audit(action, user_id=None, resource_type=None, resource_id=None,
details=None, status='success'):
"""Log action to audit trail"""
try:
audit_entry = AuditLog(
user_id=user_id,
action=action,
resource_type=resource_type,
resource_id=resource_id,
details=details,
ip_address=get_client_ip(),
timestamp=now_ist(),
status=status
)
db.session.add(audit_entry)
db.session.commit()
except Exception as e:
logger.error(f"Failed to log audit entry: {e}")
# Don't raise - audit failures shouldn't break the app
def login_required_with_audit(f):
"""Decorator that requires login and logs the access"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
log_audit('access_denied', status='failure', details=f'Unauthorized access to {request.path}')
return redirect(url_for('auth.login'))
return f(*args, **kwargs)
return decorated_function
def require_json_content_type(f):
"""Decorator to ensure request has JSON content type"""
@wraps(f)
def decorated_function(*args, **kwargs):
if request.method in ['POST', 'PUT', 'PATCH']:
if not request.is_json:
return {'error': 'Content-Type must be application/json'}, 400
return f(*args, **kwargs)
return decorated_function
def validate_username(username):
"""Validate username format"""
if not username or len(username) < 3 or len(username) > 80:
return False, "Username must be between 3 and 80 characters"
if not all(c.isalnum() or c in '_-' for c in username):
return False, "Username can only contain letters, numbers, underscores, and hyphens"
return True, ""
def validate_password(password):
"""Validate password strength"""
if not password or len(password) < 8:
return False, "Password must be at least 8 characters long"
if len(password) > 128:
return False, "Password must be less than 128 characters"
# Check for at least one uppercase, one lowercase, one digit
has_upper = any(c.isupper() for c in password)
has_lower = any(c.islower() for c in password)
has_digit = any(c.isdigit() for c in password)
if not (has_upper and has_lower and has_digit):
return False, "Password must contain uppercase, lowercase, and digits"
return True, ""
def validate_email(email):
"""Basic email validation"""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, email):
return False, "Invalid email format"
return True, ""