| import logging
|
| from datetime import datetime
|
| import hashlib
|
|
|
| class AuthManager:
|
| """Authentication and user management"""
|
|
|
| def __init__(self, db_manager):
|
| """Initialize auth manager with database manager"""
|
| self.db = db_manager
|
|
|
| def hash_password(self, password):
|
| """Simple password hashing (in production, use bcrypt or similar)"""
|
| return hashlib.sha256(password.encode()).hexdigest()
|
|
|
| def authenticate_user(self, username, password):
|
| """Authenticate user and return user data"""
|
| try:
|
| user = self.db.execute_query_one(
|
| "SELECT id, username, email, name, role, org FROM users WHERE username = %s",
|
| (username,)
|
| )
|
|
|
| if not user:
|
| logging.warning(f"User not found: {username}")
|
| return None
|
|
|
| user_password = self.db.execute_query_one(
|
| "SELECT password FROM users WHERE username = %s",
|
| (username,)
|
| )
|
|
|
| if not user_password:
|
| logging.warning(f"Password not found for user: {username}")
|
| return None
|
|
|
| stored_password = user_password['password']
|
|
|
|
|
| is_hashed = len(stored_password) == 64 and all(c in '0123456789abcdef' for c in stored_password)
|
|
|
| if is_hashed:
|
|
|
| password_match = stored_password == self.hash_password(password)
|
| else:
|
|
|
| password_match = stored_password == password
|
|
|
| logging.info(f"Debug - Username: {username}, Password type: {'hashed' if is_hashed else 'plain'}")
|
|
|
| if not password_match:
|
| logging.warning(f"Invalid password for user: {username}")
|
| return None
|
|
|
|
|
| try:
|
| self.db.execute_query(
|
| "UPDATE users SET updated_at = %s WHERE id = %s",
|
| (datetime.now(), user['id'])
|
| )
|
| except Exception as e:
|
| logging.warning(f"Could not update last login: {e}")
|
|
|
| logging.info(f"User authenticated successfully: {username}")
|
| return user
|
|
|
| except Exception as e:
|
| logging.error(f"Authentication error: {e}")
|
| return None
|
|
|
| def get_user_data(self, username):
|
| """Get user data by username - for compatibility with UI"""
|
| try:
|
| user = self.db.execute_query_one(
|
| "SELECT id, username, email, name, role, org FROM users WHERE username = %s",
|
| (username,)
|
| )
|
|
|
| if user:
|
| logging.info(f"Retrieved user data for: {username}")
|
| return user
|
| else:
|
| logging.warning(f"User not found: {username}")
|
| return None
|
|
|
| except Exception as e:
|
| logging.error(f"Error getting user data: {e}")
|
| return None
|
|
|
| def create_user(self, username, email, password, name, role, org_name="",
|
| phone="", country_code="", department="", location="", organization_id=None):
|
| """Create a new user account"""
|
| try:
|
|
|
| existing_user = self.db.execute_query_one(
|
| "SELECT id FROM users WHERE username = %s OR email = %s",
|
| (username, email)
|
| )
|
|
|
| if existing_user:
|
| logging.warning(f"User already exists: {username} or {email}")
|
| return False
|
|
|
|
|
| if role == 'organization':
|
| org_result = self.db.execute_query(
|
| """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
|
| VALUES (%s, %s, %s, %s, %s, %s, %s)""",
|
| (org_name or name, email, phone, country_code, department, location, datetime.now())
|
| )
|
|
|
| if not org_result:
|
| logging.error("Failed to create organization")
|
| return False
|
|
|
|
|
| org_id = self.db.execute_query_one(
|
| "SELECT id FROM organizations WHERE email = %s ORDER BY created_at DESC LIMIT 1",
|
| (email,)
|
| )
|
| organization_id = org_id['id'] if org_id else None
|
|
|
|
|
| hashed_password = self.hash_password(password)
|
| user_result = self.db.execute_query(
|
| """INSERT INTO users (username, email, password, name, role, org, created_at)
|
| VALUES (%s, %s, %s, %s, %s, %s, %s)""",
|
| (username, email, hashed_password, name, role, organization_id, datetime.now())
|
| )
|
|
|
| if user_result:
|
| logging.info(f"User created successfully: {username}")
|
| return True
|
| else:
|
| logging.error(f"Failed to create user: {username}")
|
| return False
|
|
|
| except Exception as e:
|
| logging.error(f"User creation error: {e}")
|
| return False
|
|
|
| def get_user_by_id(self, user_id):
|
| """Get user by ID"""
|
| try:
|
| user = self.db.execute_query_one(
|
| "SELECT id, username, email, name, role, org FROM users WHERE id = %s",
|
| (user_id,)
|
| )
|
| return user
|
| except Exception as e:
|
| logging.error(f"Error fetching user by ID: {e}")
|
| return None
|
|
|
| def update_user_last_login(self, user_id):
|
| """Update user's last login timestamp"""
|
| try:
|
| result = self.db.execute_query(
|
| "UPDATE users SET updated_at = %s WHERE id = %s",
|
| (datetime.now(), user_id)
|
| )
|
| return bool(result)
|
| except Exception as e:
|
| logging.error(f"Error updating last login: {e}")
|
| return False
|
|
|