Harshit Ghosh
feat: implement robust security middleware, authentication system, and frontend UI components
e4fd6e0 | """ | |
| Authentication utilities and decorators for user management and security | |
| """ | |
| import os | |
| import logging | |
| from functools import wraps | |
| from flask import session, redirect, url_for, request, g, abort | |
| from flask_login import LoginManager, current_user | |
| from models import db, User, AuditLog | |
| from datetime import datetime | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared Flask-Login manager; it is bound to the application in init_auth().
login_manager = LoginManager()
def init_auth(app):
    """Initialize the authentication system for *app*.

    Binds the module-level ``login_manager`` to the application, sets the
    login view / flash message used for unauthenticated requests, and
    registers ``load_user`` as the Flask-Login user loader.

    Args:
        app: The application object passed to ``login_manager.init_app``.
    """
    login_manager.init_app(app)
    login_manager.login_view = 'auth.login'
    login_manager.login_message = 'Please log in to access this page.'
    login_manager.login_message_category = 'info'
    # Flask-Login needs a user_loader callback to rebuild current_user from
    # the session. Registering here is idempotent: registering the same
    # callback more than once simply re-assigns it.
    login_manager.user_loader(load_user)
def load_user(user_id):
    """Load a user from the database by ID.

    Args:
        user_id: The identifier to look up; it is converted with ``int()``.

    Returns:
        The result of ``User.query.get`` for the integer ID, or ``None``
        when *user_id* cannot be converted to an integer.
    """
    # A malformed or missing ID must yield None rather than raise, so a bad
    # session value results in an anonymous user instead of a server error.
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
def get_client_ip():
    """Return the client IP address for the current request.

    Returns:
        The first comma-separated entry of the ``X-Forwarded-For`` header
        (whitespace-stripped) when that header is non-empty; otherwise
        ``request.remote_addr``, or ``'unknown'`` when that is falsy.
    """
    # NOTE(review): X-Forwarded-For is supplied by the client unless a
    # trusted proxy overwrites it - confirm the deployment before relying on
    # this value for security decisions.
    forwarded_for = request.headers.get('X-Forwarded-For')
    if not forwarded_for:
        return request.remote_addr or 'unknown'
    first_hop = forwarded_for.split(',')[0]
    return first_hop.strip()
def log_audit(action, user_id=None, resource_type=None, resource_id=None,
              details=None, status='success'):
    """Write an entry to the audit trail (best effort).

    Builds an ``AuditLog`` row from the arguments plus the client IP and the
    current UTC time, adds it to ``db.session`` and commits. Any failure is
    logged and swallowed so that auditing never breaks the calling request.

    Args:
        action: Name of the action being recorded.
        user_id: ID of the acting user, if any.
        resource_type: Kind of resource the action touched, if any.
        resource_id: Identifier of that resource, if any.
        details: Free-form description of the event.
        status: Outcome label; defaults to ``'success'``.

    Returns:
        None.
    """
    # NOTE(review): commit() is issued on the shared db.session, so any
    # other pending changes in that session are committed too - confirm
    # callers expect this.
    try:
        audit_entry = AuditLog(
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            ip_address=get_client_ip(),
            timestamp=datetime.utcnow(),
            status=status,
        )
        db.session.add(audit_entry)
        db.session.commit()
    except Exception as e:
        logger.exception("Failed to log audit entry: %s", e)
        # A failed flush/commit leaves the session unusable until it is
        # rolled back; without this, later queries in the same request fail.
        try:
            db.session.rollback()
        except Exception as rollback_error:
            logger.error(
                "Failed to roll back session after audit failure: %s",
                rollback_error,
            )
        # Don't raise - audit failures shouldn't break the app
def login_required_with_audit(f):
    """Decorator that requires login and audits denied access.

    When ``current_user`` is not authenticated, an ``'access_denied'`` audit
    entry is recorded for the requested path and the client is redirected to
    the ``auth.login`` endpoint. Otherwise the wrapped view is called
    unchanged.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    # wraps() keeps the view's __name__; Flask derives endpoint names from
    # it, so without this every decorated view would be registered as
    # "decorated_function" and collide.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            log_audit(
                'access_denied',
                status='failure',
                details=f'Unauthorized access to {request.path}',
            )
            return redirect(url_for('auth.login'))
        return f(*args, **kwargs)
    return decorated_function
def require_json_content_type(f):
    """Decorator to ensure body-carrying requests have a JSON content type.

    For POST, PUT and PATCH requests where ``request.is_json`` is false, the
    wrapped view is not called and an error payload with status 400 is
    returned. All other requests are passed through to the view.

    Args:
        f: The view function to wrap.

    Returns:
        The wrapped view function.
    """
    # wraps() keeps the view's __name__ so that several decorated views do
    # not all end up registered under the name "decorated_function".
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.method in ['POST', 'PUT', 'PATCH']:
            if not request.is_json:
                return {'error': 'Content-Type must be application/json'}, 400
        return f(*args, **kwargs)
    return decorated_function
def validate_username(username):
    """Check that a username has an acceptable length and character set.

    Args:
        username: The candidate username.

    Returns:
        A ``(is_valid, message)`` tuple; ``message`` is empty when valid.
    """
    length_message = "Username must be between 3 and 80 characters"
    if not username:
        return False, length_message
    if not 3 <= len(username) <= 80:
        return False, length_message

    # Every character must be alphanumeric, an underscore or a hyphen.
    for ch in username:
        if ch.isalnum() or ch in '_-':
            continue
        return False, "Username can only contain letters, numbers, underscores, and hyphens"
    return True, ""
def validate_password(password):
    """Validate password strength.

    A password is accepted when it is 8 to 128 characters long (inclusive)
    and contains at least one uppercase letter, one lowercase letter and one
    digit.

    Args:
        password: The candidate password.

    Returns:
        A ``(is_valid, message)`` tuple; ``message`` is empty when valid.
    """
    if not password or len(password) < 8:
        return False, "Password must be at least 8 characters long"
    if len(password) > 128:
        # The limit is inclusive (128 characters is accepted), so the
        # message says "at most" rather than "less than".
        return False, "Password must be at most 128 characters"

    # Check for at least one uppercase, one lowercase, one digit
    has_upper = any(c.isupper() for c in password)
    has_lower = any(c.islower() for c in password)
    has_digit = any(c.isdigit() for c in password)
    if not (has_upper and has_lower and has_digit):
        return False, "Password must contain uppercase, lowercase, and digits"
    return True, ""
def validate_email(email):
    """Basic email validation.

    Checks the overall ``local@domain.tld`` shape only; it does not verify
    that the address exists.

    Args:
        email: The candidate email address.

    Returns:
        A ``(is_valid, message)`` tuple; ``message`` is empty when valid.
        ``None`` and non-string input are reported as invalid.
    """
    import re

    # Reject None / non-string input instead of letting the regex call raise.
    if not isinstance(email, str):
        return False, "Invalid email format"

    # fullmatch() requires the whole string to match; match() with a "$"
    # anchor would also accept an address followed by a trailing newline.
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    if not re.fullmatch(pattern, email):
        return False, "Invalid email format"
    return True, ""