File size: 4,295 Bytes
e4fd6e0
 
 
 
 
31bb73c
e4fd6e0
5105d0e
e3566c9
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3566c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e4fd6e0
 
 
 
c4436fb
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5105d0e
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Authentication utilities and decorators for user management and security
"""
import logging
from functools import wraps
from flask import session, redirect, url_for, request, has_request_context
from flask_login import LoginManager, current_user
from models import db, User, AuditLog, now_ist
from sqlalchemy.exc import SQLAlchemyError

logger = logging.getLogger(__name__)

login_manager = LoginManager()


def init_auth(app):
    """Initialize authentication system"""
    login_manager.init_app(app)
    login_manager.login_view = 'auth.login'
    login_manager.login_message = 'Please log in to access this page.'
    login_manager.login_message_category = 'info'


@login_manager.user_loader
def load_user(user_id):
    """Load user from database by ID"""
    try:
        return User.query.get(int(user_id))
    except SQLAlchemyError as e:
        logger.warning(f"User loader failed, clearing session context: {e}")
        try:
            db.session.rollback()
        except Exception:
            pass
        return None
    except Exception as e:
        logger.warning(f"Unexpected user loader failure: {e}")
        try:
            db.session.rollback()
        except Exception:
            pass
        return None


def get_client_ip():
    """Extract client IP address from request"""
    if not has_request_context():
        return 'system'
    if request.headers.get('X-Forwarded-For'):
        return request.headers.get('X-Forwarded-For').split(',')[0].strip()
    return request.remote_addr or 'unknown'


def log_audit(action, user_id=None, resource_type=None, resource_id=None, 
              details=None, status='success'):
    """Log action to audit trail"""
    try:
        audit_entry = AuditLog(
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            ip_address=get_client_ip(),
            timestamp=now_ist(),
            status=status
        )
        db.session.add(audit_entry)
        db.session.commit()
    except Exception as e:
        logger.error(f"Failed to log audit entry: {e}")
        # Don't raise - audit failures shouldn't break the app


def login_required_with_audit(f):
    """Decorator that requires login and logs the access"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            log_audit('access_denied', status='failure', details=f'Unauthorized access to {request.path}')
            return redirect(url_for('auth.login'))
        return f(*args, **kwargs)
    return decorated_function


def require_json_content_type(f):
    """Decorator to ensure request has JSON content type"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.method in ['POST', 'PUT', 'PATCH']:
            if not request.is_json:
                return {'error': 'Content-Type must be application/json'}, 400
        return f(*args, **kwargs)
    return decorated_function


def validate_username(username):
    """Validate username format"""
    if not username or len(username) < 3 or len(username) > 80:
        return False, "Username must be between 3 and 80 characters"
    if not all(c.isalnum() or c in '_-' for c in username):
        return False, "Username can only contain letters, numbers, underscores, and hyphens"
    return True, ""


def validate_password(password):
    """Validate password strength"""
    if not password or len(password) < 8:
        return False, "Password must be at least 8 characters long"
    if len(password) > 128:
        return False, "Password must be less than 128 characters"
    # Check for at least one uppercase, one lowercase, one digit
    has_upper = any(c.isupper() for c in password)
    has_lower = any(c.islower() for c in password)
    has_digit = any(c.isdigit() for c in password)
    if not (has_upper and has_lower and has_digit):
        return False, "Password must contain uppercase, lowercase, and digits"
    return True, ""


def validate_email(email):
    """Basic email validation"""
    import re
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not re.match(pattern, email):
        return False, "Invalid email format"
    return True, ""