bharathmunakala commited on
Commit
134808d
·
verified ·
1 Parent(s): 5b41dc3

Update database.py

Browse files
Files changed (1) hide show
  1. database.py +163 -161
database.py CHANGED
@@ -1,161 +1,163 @@
1
- import os
2
- from datetime import datetime
3
- from typing import Optional, Dict, Any, List
4
- from pymongo import MongoClient, ReturnDocument
5
- from pymongo.errors import DuplicateKeyError, ConnectionFailure
6
- import bcrypt
7
- from dotenv import load_dotenv
8
- import logging
9
-
10
- # Configure logging
11
- logging.basicConfig(level=logging.INFO)
12
- logger = logging.getLogger(__name__)
13
-
14
- load_dotenv()
15
-
16
- class Database:
17
- def __init__(self):
18
- """Initialize database connection and ensure indexes"""
19
- try:
20
- mongo_uri = os.getenv("MONGO_URI")
21
- if not mongo_uri:
22
- raise ValueError("MONGO_URI environment variable is not set")
23
-
24
- self.client = MongoClient(
25
- mongo_uri,
26
- serverSelectionTimeoutMS=5000, # 5 second timeout
27
- connectTimeoutMS=30000, # 30 second connection timeout
28
- socketTimeoutMS=45000, # 45 second socket timeout
29
- connect=False # Lazy connection
30
- )
31
-
32
- # Test the connection
33
- self.client.admin.command('ping')
34
-
35
- self.db = self.client[os.getenv("DB_NAME", "rag_system")]
36
- self.users = self.db["users"]
37
- self._create_indexes()
38
- logger.info("Successfully connected to MongoDB")
39
-
40
- except Exception as e:
41
- logger.error(f"Failed to connect to MongoDB: {str(e)}")
42
- raise
43
-
44
- def _create_indexes(self):
45
- """Create necessary database indexes"""
46
- try:
47
- # Create unique index on username
48
- self.users.create_index("username", unique=True)
49
- logger.info("Created database indexes")
50
- except Exception as e:
51
- logger.error(f"Error creating indexes: {str(e)}")
52
- raise
53
-
54
- def add_user(self, username: str, password: str, role: str) -> bool:
55
- """Add a new user to the database"""
56
- if not username or not password or not role:
57
- logger.warning("Missing required fields for user creation")
58
- return False
59
-
60
- try:
61
- hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
62
- user_data = {
63
- "username": username,
64
- "password": hashed.decode('utf-8'),
65
- "role": role.lower(),
66
- "created_at": datetime.utcnow(),
67
- "last_login": None
68
- }
69
-
70
- result = self.users.insert_one(user_data)
71
- if result.inserted_id:
72
- logger.info(f"Created new user: {username}")
73
- return True
74
- return False
75
-
76
- except DuplicateKeyError:
77
- logger.warning(f"Username already exists: {username}")
78
- return False
79
- except Exception as e:
80
- logger.error(f"Error adding user {username}: {str(e)}")
81
- return False
82
-
83
- def verify_user(self, username: str, password: str) -> Optional[Dict[str, Any]]:
84
- """Verify user credentials"""
85
- try:
86
- user = self.users.find_one({"username": username})
87
- if not user:
88
- logger.warning(f"Login attempt for non-existent user: {username}")
89
- return None
90
-
91
- if bcrypt.checkpw(password.encode('utf-8'), user["password"].encode('utf-8')):
92
- # Update last login time
93
- self.users.update_one(
94
- {"_id": user["_id"]},
95
- {"$set": {"last_login": datetime.utcnow()}}
96
- )
97
- logger.info(f"Successful login for user: {username}")
98
- return {
99
- "username": user["username"],
100
- "role": user["role"],
101
- "last_login": user.get("last_login")
102
- }
103
-
104
- logger.warning(f"Failed login attempt for user: {username}")
105
- return None
106
-
107
- except Exception as e:
108
- logger.error(f"Error verifying user {username}: {str(e)}")
109
- return None
110
-
111
- def get_user(self, username: str) -> Optional[Dict[str, Any]]:
112
- """Get user by username (without sensitive data)"""
113
- try:
114
- user = self.users.find_one(
115
- {"username": username},
116
- {"password": 0} # Exclude password from results
117
- )
118
- return user
119
- except Exception as e:
120
- logger.error(f"Error fetching user {username}: {str(e)}")
121
- return None
122
-
123
- # Initialize database connection
124
- db = Database()
125
-
126
- def initialize_users():
127
- """
128
- Initialize default users if they don't exist.
129
- Returns tuple of (success_count, total_users, errors)
130
- """
131
- from datetime import datetime
132
-
133
- default_users = [
134
- {"username": "Tony", "password": "password123", "role": "engineering"},
135
- {"username": "Bruce", "password": "securepass", "role": "marketing"},
136
- {"username": "Sam", "password": "financepass", "role": "finance"},
137
- {"username": "Peter", "password": "pete123", "role": "engineering"},
138
- {"username": "Sid", "password": "sidpass123", "role": "marketing"},
139
- {"username": "Natasha", "password": "hrpass123", "role": "hr"}
140
- ]
141
-
142
- success_count = 0
143
- errors = []
144
-
145
- for user in default_users:
146
- try:
147
- if db.add_user(user["username"], user["password"], user["role"]):
148
- success_count += 1
149
- logger.info(f"Initialized user: {user['username']}")
150
- else:
151
- errors.append(f"Failed to add user: {user['username']}")
152
- except Exception as e:
153
- error_msg = f"Error initializing user {user['username']}: {str(e)}"
154
- logger.error(error_msg)
155
- errors.append(error_msg)
156
-
157
- logger.info(f"User initialization complete. Success: {success_count}/{len(default_users)}")
158
- if errors:
159
- logger.warning(f"Encountered {len(errors)} errors during user initialization")
160
-
161
- return success_count, len(default_users), errors
 
 
 
1
+ # database.py
2
+ import sqlite3
3
+ import hashlib
4
+ import logging
5
+ from pathlib import Path
6
+ from typing import Optional, Dict, Any
7
+
8
+ # Set up logging
9
+ logging.basicConfig(level=logging.INFO)
10
+ logger = logging.getLogger(__name__)
11
+
12
+ class Database:
13
+ def __init__(self, db_path: str = "users.db"):
14
+ self.db_path = db_path
15
+ self.init_database()
16
+
17
+ def init_database(self):
18
+ """Initialize the database and create tables"""
19
+ try:
20
+ # Ensure directory exists
21
+ Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
22
+
23
+ conn = sqlite3.connect(self.db_path)
24
+ cursor = conn.cursor()
25
+
26
+ # Create users table with proper constraints
27
+ cursor.execute('''
28
+ CREATE TABLE IF NOT EXISTS users (
29
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
30
+ username TEXT UNIQUE NOT NULL,
31
+ password_hash TEXT NOT NULL,
32
+ role TEXT NOT NULL,
33
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
34
+ )
35
+ ''')
36
+
37
+ conn.commit()
38
+ conn.close()
39
+ logger.info("Database initialized successfully")
40
+
41
+ except Exception as e:
42
+ logger.error(f"Error initializing database: {e}")
43
+ raise
44
+
45
+ def hash_password(self, password: str) -> str:
46
+ """Hash a password using SHA-256"""
47
+ return hashlib.sha256(password.encode()).hexdigest()
48
+
49
+ def create_user(self, username: str, password: str, role: str) -> bool:
50
+ """Create a new user"""
51
+ try:
52
+ conn = sqlite3.connect(self.db_path)
53
+ cursor = conn.cursor()
54
+
55
+ password_hash = self.hash_password(password)
56
+
57
+ cursor.execute('''
58
+ INSERT INTO users (username, password_hash, role)
59
+ VALUES (?, ?, ?)
60
+ ''', (username, password_hash, role))
61
+
62
+ conn.commit()
63
+ conn.close()
64
+ logger.info(f"User '{username}' created successfully with role '{role}'")
65
+ return True
66
+
67
+ except sqlite3.IntegrityError as e:
68
+ if "UNIQUE constraint failed" in str(e):
69
+ logger.info(f"User '{username}' already exists, skipping...")
70
+ return False # User already exists, not really an error
71
+ else:
72
+ logger.error(f"Integrity error creating user '{username}': {e}")
73
+ return False
74
+ except Exception as e:
75
+ logger.error(f"Error creating user '{username}': {e}")
76
+ return False
77
+
78
+ def verify_user(self, username: str, password: str) -> Optional[Dict[str, Any]]:
79
+ """Verify user credentials and return user info"""
80
+ try:
81
+ conn = sqlite3.connect(self.db_path)
82
+ cursor = conn.cursor()
83
+
84
+ password_hash = self.hash_password(password)
85
+
86
+ cursor.execute('''
87
+ SELECT username, role FROM users
88
+ WHERE username = ? AND password_hash = ?
89
+ ''', (username, password_hash))
90
+
91
+ result = cursor.fetchone()
92
+ conn.close()
93
+
94
+ if result:
95
+ return {
96
+ "username": result[0],
97
+ "role": result[1]
98
+ }
99
+ return None
100
+
101
+ except Exception as e:
102
+ logger.error(f"Error verifying user '{username}': {e}")
103
+ return None
104
+
105
+ def list_users(self) -> list:
106
+ """List all users (for debugging)"""
107
+ try:
108
+ conn = sqlite3.connect(self.db_path)
109
+ cursor = conn.cursor()
110
+
111
+ cursor.execute('SELECT username, role FROM users')
112
+ users = cursor.fetchall()
113
+ conn.close()
114
+
115
+ return [{"username": user[0], "role": user[1]} for user in users]
116
+
117
+ except Exception as e:
118
+ logger.error(f"Error listing users: {e}")
119
+ return []
120
+
121
+ # Create global database instance
122
+ db = Database()
123
+
124
+ def initialize_users():
125
+ """Initialize default users"""
126
+ default_users = [
127
+ ("Tony", "password123", "engineering"),
128
+ ("Bruce", "securepass", "marketing"),
129
+ ("Sam", "financepass", "finance"),
130
+ ("Natasha", "hrpass123", "hr"),
131
+ ]
132
+
133
+ success_count = 0
134
+ error_count = 0
135
+
136
+ logger.info("Initializing default users...")
137
+
138
+ for username, password, role in default_users:
139
+ try:
140
+ if db.create_user(username, password, role):
141
+ success_count += 1
142
+ else:
143
+ # User already exists, count as success but log as info
144
+ success_count += 1
145
+ # The error_count was being incremented for existing users
146
+ # which is why you were seeing "6 errors" - let's not count this as an error
147
+ except Exception as e:
148
+ error_count += 1
149
+ logger.error(f"Failed to create user {username}: {e}")
150
+
151
+ if error_count == 0:
152
+ logger.info(f"User initialization successful: {success_count} users ready")
153
+ else:
154
+ logger.info(f"User initialization completed: {success_count} successful, {error_count} errors")
155
+
156
+ # List all users for verification
157
+ users = db.list_users()
158
+ if users:
159
+ logger.info(f"Available users: {', '.join([f\"{u['username']}({u['role']})\" for u in users])}")
160
+ else:
161
+ logger.warning("No users found in database!")
162
+
163
+ return success_count, error_count