| import mysql.connector |
| from mysql.connector import Error |
| import logging |
| from datetime import datetime |
| import json |
| import uuid |
| import os |
| from PIL import Image |
|
|
| class DashboardDatabaseManager: |
| """Fixed database operations manager for SmartHeal application with dashboard integration""" |
| |
| def __init__(self, mysql_config): |
| """Initialize database manager with MySQL configuration""" |
| self.mysql_config = mysql_config |
| self.test_connection() |
| |
| def test_connection(self): |
| """Test database connection""" |
| try: |
| connection = self.get_connection() |
| if connection: |
| connection.close() |
| logging.info("✅ Database connection successful") |
| else: |
| logging.error("❌ Database connection failed") |
| except Exception as e: |
| logging.error(f"Database connection test failed: {e}") |
| |
| def get_connection(self): |
| """Get a database connection""" |
| try: |
| connection = mysql.connector.connect(**self.mysql_config) |
| return connection |
| except Error as e: |
| logging.error(f"Error connecting to MySQL: {e}") |
| return None |
| |
| def execute_query(self, query, params=None, fetch=False): |
| """Execute a query and return results if fetch=True""" |
| connection = self.get_connection() |
| if not connection: |
| return None |
| |
| cursor = None |
| try: |
| cursor = connection.cursor(dictionary=True) |
| cursor.execute(query, params or ()) |
| |
| if fetch: |
| result = cursor.fetchall() |
| else: |
| connection.commit() |
| result = cursor.rowcount |
| |
| return result |
| except Error as e: |
| logging.error(f"Error executing query: {e}") |
| if connection: |
| connection.rollback() |
| return None |
| finally: |
| if cursor: |
| cursor.close() |
| if connection and connection.is_connected(): |
| connection.close() |
| |
| def execute_query_one(self, query, params=None): |
| """Execute a query and return one result""" |
| connection = self.get_connection() |
| if not connection: |
| return None |
| |
| cursor = None |
| try: |
| cursor = connection.cursor(dictionary=True) |
| cursor.execute(query, params or ()) |
| result = cursor.fetchone() |
| return result |
| except Error as e: |
| logging.error(f"Error executing query: {e}") |
| return None |
| finally: |
| if cursor: |
| cursor.close() |
| if connection and connection.is_connected(): |
| connection.close() |
| |
| def save_questionnaire_response(self, questionnaire_data, user_id): |
| """Save questionnaire response to database""" |
| connection = None |
| cursor = None |
| try: |
| connection = self.get_connection() |
| if not connection: |
| return None |
| cursor = connection.cursor() |
| |
| |
| patient_id = self._create_or_get_patient(cursor, questionnaire_data) |
| if not patient_id: |
| raise Exception("Failed to get or create patient") |
| |
| |
| questionnaire_id = self._get_or_create_default_questionnaire(cursor) |
| if not questionnaire_id: |
| raise Exception("Failed to get or create questionnaire") |
| |
| |
| response_data = { |
| 'patient_info': { |
| 'name': questionnaire_data.get('patient_name', ''), |
| 'age': questionnaire_data.get('patient_age', 0), |
| 'gender': questionnaire_data.get('patient_gender', '') |
| }, |
| 'wound_details': { |
| 'location': questionnaire_data.get('wound_location', ''), |
| 'duration': questionnaire_data.get('wound_duration', ''), |
| 'pain_level': questionnaire_data.get('pain_level', 0), |
| 'moisture_level': questionnaire_data.get('moisture_level', ''), |
| 'infection_signs': questionnaire_data.get('infection_signs', ''), |
| 'diabetic_status': questionnaire_data.get('diabetic_status', '') |
| }, |
| 'medical_history': { |
| 'previous_treatment': questionnaire_data.get('previous_treatment', ''), |
| 'medical_history': questionnaire_data.get('medical_history', ''), |
| 'medications': questionnaire_data.get('medications', ''), |
| 'allergies': questionnaire_data.get('allergies', ''), |
| 'additional_notes': questionnaire_data.get('additional_notes', '') |
| } |
| } |
| |
| |
| insert_resp = """ |
| INSERT INTO questionnaire_responses |
| (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at) |
| VALUES (%s, %s, %s, %s, %s) |
| """ |
| cursor.execute(insert_resp, ( |
| questionnaire_id, |
| patient_id, |
| user_id, |
| json.dumps(response_data), |
| datetime.now() |
| )) |
| response_id = cursor.lastrowid |
| |
| connection.commit() |
| logging.info(f"✅ Saved questionnaire response ID {response_id}") |
| return response_id |
| |
| except Exception as e: |
| logging.error(f"❌ Error saving questionnaire: {e}") |
| if connection: |
| connection.rollback() |
| return None |
| finally: |
| if cursor: |
| cursor.close() |
| if connection: |
| connection.close() |
| |
| def _create_or_get_patient(self, cursor, questionnaire_data): |
| """Create or get existing patient record""" |
| try: |
| |
| select_query = """ |
| SELECT id FROM patients |
| WHERE name = %s AND age = %s AND gender = %s |
| """ |
| cursor.execute(select_query, ( |
| questionnaire_data.get('patient_name', ''), |
| questionnaire_data.get('patient_age', 0), |
| questionnaire_data.get('patient_gender', '') |
| )) |
| |
| existing_patient = cursor.fetchone() |
| if existing_patient: |
| return existing_patient[0] |
| |
| |
| patient_uuid = str(uuid.uuid4()) |
| insert_query = """ |
| INSERT INTO patients ( |
| uuid, name, age, gender, illness, allergy, notes, created_at |
| ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) |
| """ |
| |
| cursor.execute(insert_query, ( |
| patient_uuid, |
| questionnaire_data.get('patient_name', ''), |
| questionnaire_data.get('patient_age', 0), |
| questionnaire_data.get('patient_gender', ''), |
| questionnaire_data.get('medical_history', ''), |
| questionnaire_data.get('allergies', ''), |
| questionnaire_data.get('additional_notes', ''), |
| datetime.now() |
| )) |
| |
| return cursor.lastrowid |
| |
| except Exception as e: |
| logging.error(f"Error creating/getting patient: {e}") |
| return None |
| |
| def _get_or_create_default_questionnaire(self, cursor): |
| """Get or create default questionnaire""" |
| try: |
| |
| cursor.execute("SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1") |
| questionnaire_row = cursor.fetchone() |
| |
| if questionnaire_row: |
| return questionnaire_row[0] |
| |
| |
| cursor.execute(""" |
| INSERT INTO questionnaires (name, description, created_at) |
| VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', NOW()) |
| """) |
| return cursor.lastrowid |
| |
| except Exception as e: |
| logging.error(f"Error getting/creating questionnaire: {e}") |
| return None |
| |
| def save_wound_image(self, response_id, image, filename): |
| """Save wound image to dataset and database with proper URL""" |
| try: |
| |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| unique_id = str(uuid.uuid4())[:8] |
| file_extension = os.path.splitext(filename)[1] or '.jpg' |
| unique_filename = f"wound_{timestamp}_{unique_id}{file_extension}" |
| |
| |
| dataset_dir = os.path.join("dataset", "wound_images") |
| os.makedirs(dataset_dir, exist_ok=True) |
| |
| |
| file_path = os.path.join(dataset_dir, unique_filename) |
| |
| if hasattr(image, 'save'): |
| image.save(file_path, format='JPEG', quality=95) |
| width, height = image.size |
| file_size = os.path.getsize(file_path) |
| else: |
| |
| from PIL import Image as PILImage |
| if hasattr(image, 'shape'): |
| pil_image = PILImage.fromarray(image) |
| pil_image.save(file_path, format='JPEG', quality=95) |
| width, height = pil_image.size |
| file_size = os.path.getsize(file_path) |
| else: |
| raise ValueError("Unsupported image format") |
| |
| |
| image_url = f"/dataset/wound_images/{unique_filename}" |
| |
| |
| query = """ |
| INSERT INTO wound_images ( |
| questionnaire_response_id, image_url, original_filename, |
| file_size, image_width, image_height, created_at |
| ) VALUES (%s, %s, %s, %s, %s, %s, %s) |
| """ |
| |
| params = ( |
| response_id, |
| image_url, |
| filename, |
| file_size, |
| width, |
| height, |
| datetime.now() |
| ) |
| |
| connection = self.get_connection() |
| if not connection: |
| return None |
| |
| try: |
| cursor = connection.cursor() |
| cursor.execute(query, params) |
| connection.commit() |
| image_id = cursor.lastrowid |
| |
| logging.info(f"✅ Image saved to dataset: {file_path}") |
| logging.info(f"✅ Image URL: {image_url}") |
| logging.info(f"✅ Database image ID: {image_id}") |
| |
| return image_id |
| |
| finally: |
| cursor.close() |
| connection.close() |
| |
| except Exception as e: |
| logging.error(f"❌ Error saving wound image: {e}") |
| return None |
| |
| def save_ai_analysis(self, analysis_data): |
| """Save AI analysis results to database""" |
| try: |
| |
| response_id = analysis_data.get('questionnaire_id') |
| image_id = analysis_data.get('image_id') |
| visual_results = analysis_data.get('visual_results', {}) |
| |
| |
| risk_score = analysis_data.get('risk_score', 0) |
| if risk_score >= 70: |
| risk_level = 'High' |
| elif risk_score >= 40: |
| risk_level = 'Moderate' |
| else: |
| risk_level = 'Low' |
| |
| |
| length = visual_results.get('length_cm', 0) |
| breadth = visual_results.get('breadth_cm', 0) |
| area = visual_results.get('surface_area_cm2', 0) |
| wound_dimensions = f"{length} × {breadth} cm (Area: {area} cm²)" |
| |
| query = """ |
| INSERT INTO ai_analyses ( |
| questionnaire_response_id, image_id, analysis_data, summary, |
| recommendations, risk_score, risk_level, wound_type, |
| wound_dimensions, processing_time, model_version, created_at |
| ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) |
| """ |
| |
| params = ( |
| response_id, |
| image_id, |
| json.dumps(analysis_data.get('analysis_data', {})), |
| analysis_data.get('summary', '')[:1000], |
| analysis_data.get('recommendations', ''), |
| risk_score, |
| risk_level, |
| visual_results.get('wound_type', 'Unknown'), |
| wound_dimensions, |
| analysis_data.get('processing_time', 0), |
| analysis_data.get('model_version', 'v1.0'), |
| datetime.now() |
| ) |
| |
| result = self.execute_query(query, params) |
| if result is not None: |
| |
| connection = self.get_connection() |
| if connection: |
| cursor = connection.cursor() |
| cursor.execute("SELECT LAST_INSERT_ID()") |
| analysis_id = cursor.fetchone()[0] |
| cursor.close() |
| connection.close() |
| |
| logging.info(f"✅ AI analysis saved with ID: {analysis_id}") |
| return analysis_id |
| |
| return None |
| |
| except Exception as e: |
| logging.error(f"❌ Error saving AI analysis: {e}") |
| return None |
| |
| def save_analysis_session(self, session_data): |
| """Save analysis session data""" |
| try: |
| query = """ |
| INSERT INTO analysis_sessions ( |
| user_id, questionnaire_response_id, image_id, analysis_id, |
| session_duration, created_at |
| ) VALUES (%s, %s, %s, %s, %s, %s) |
| """ |
| |
| params = ( |
| session_data.get('user_id'), |
| session_data.get('questionnaire_id'), |
| session_data.get('image_id'), |
| session_data.get('analysis_id'), |
| session_data.get('session_duration', 0), |
| datetime.now() |
| ) |
| |
| result = self.execute_query(query, params) |
| if result is not None: |
| connection = self.get_connection() |
| if connection: |
| cursor = connection.cursor() |
| cursor.execute("SELECT LAST_INSERT_ID()") |
| session_id = cursor.fetchone()[0] |
| cursor.close() |
| connection.close() |
| |
| logging.info(f"✅ Analysis session saved with ID: {session_id}") |
| return session_id |
| |
| return None |
| |
| except Exception as e: |
| logging.error(f"❌ Error saving analysis session: {e}") |
| return None |
| |
| def save_bot_interaction(self, interaction_data): |
| """Save bot interaction data""" |
| try: |
| query = """ |
| INSERT INTO bot_interactions ( |
| patient_id, practitioner_id, input_text, output_text, |
| wound_image_url, interaction_type, interacted_at |
| ) VALUES (%s, %s, %s, %s, %s, %s, %s) |
| """ |
| |
| params = ( |
| interaction_data.get('patient_id'), |
| interaction_data.get('practitioner_id'), |
| interaction_data.get('input_text', ''), |
| interaction_data.get('output_text', ''), |
| interaction_data.get('wound_image_url', ''), |
| interaction_data.get('interaction_type', 'wound_analysis'), |
| datetime.now() |
| ) |
| |
| result = self.execute_query(query, params) |
| if result is not None: |
| connection = self.get_connection() |
| if connection: |
| cursor = connection.cursor() |
| cursor.execute("SELECT LAST_INSERT_ID()") |
| interaction_id = cursor.fetchone()[0] |
| cursor.close() |
| connection.close() |
| |
| logging.info(f"✅ Bot interaction saved with ID: {interaction_id}") |
| return interaction_id |
| |
| return None |
| |
| except Exception as e: |
| logging.error(f"❌ Error saving bot interaction: {e}") |
| return None |
| |
| def get_analytics_data(self): |
| """Get comprehensive analytics data for dashboard""" |
| try: |
| analytics = {} |
| |
| |
| result = self.execute_query_one("SELECT COUNT(*) as count FROM ai_analyses") |
| analytics['total_analyses'] = result['count'] if result else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT AVG(processing_time) as avg_time FROM ai_analyses |
| WHERE processing_time IS NOT NULL |
| """) |
| analytics['avg_processing_time'] = round(result['avg_time'], 2) if result and result['avg_time'] else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT COUNT(*) as count FROM ai_analyses |
| WHERE risk_level = 'High' |
| """) |
| analytics['high_risk_count'] = result['count'] if result else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT AVG(risk_score) as avg_risk FROM ai_analyses |
| WHERE risk_score IS NOT NULL |
| """) |
| analytics['avg_risk_score'] = round(result['avg_risk'], 1) if result and result['avg_risk'] else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT COUNT(*) as count FROM ai_analyses |
| WHERE DATE(created_at) = CURDATE() |
| """) |
| analytics['analyses_today'] = result['count'] if result else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT COUNT(*) as count FROM ai_analyses |
| WHERE YEARWEEK(created_at) = YEARWEEK(NOW()) |
| """) |
| analytics['analyses_this_week'] = result['count'] if result else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT COUNT(DISTINCT questionnaire_response_id) as count FROM ai_analyses |
| """) |
| analytics['unique_questionnaires'] = result['count'] if result else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT COUNT(*) as count FROM ai_analyses |
| WHERE image_id IS NOT NULL |
| """) |
| analytics['analyses_with_images'] = result['count'] if result else 0 |
| |
| return analytics |
| |
| except Exception as e: |
| logging.error(f"Error getting analytics data: {e}") |
| return {} |
| |
| def get_interaction_history(self, limit=50): |
| """Get bot interaction history""" |
| try: |
| query = """ |
| SELECT |
| bi.id, |
| bi.input_text, |
| bi.output_text, |
| bi.wound_image_url, |
| bi.interaction_type, |
| bi.interacted_at, |
| p.name as patient_name, |
| u.name as practitioner_name |
| FROM bot_interactions bi |
| LEFT JOIN patients p ON bi.patient_id = p.id |
| LEFT JOIN users u ON bi.practitioner_id = u.id |
| ORDER BY bi.interacted_at DESC |
| LIMIT %s |
| """ |
| |
| results = self.execute_query(query, (limit,), fetch=True) |
| return results or [] |
| |
| except Exception as e: |
| logging.error(f"Error getting interaction history: {e}") |
| return [] |
| |
| def get_session_analytics(self): |
| """Get session analytics data""" |
| try: |
| analytics = {} |
| |
| |
| result = self.execute_query_one("SELECT COUNT(*) as count FROM analysis_sessions") |
| analytics['total_sessions'] = result['count'] if result else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT AVG(session_duration) as avg_duration FROM analysis_sessions |
| WHERE session_duration IS NOT NULL |
| """) |
| analytics['avg_session_duration'] = round(result['avg_duration'], 2) if result and result['avg_duration'] else 0 |
| |
| |
| result = self.execute_query_one(""" |
| SELECT COUNT(*) as count FROM analysis_sessions |
| WHERE DATE(created_at) = CURDATE() |
| """) |
| analytics['sessions_today'] = result['count'] if result else 0 |
| |
| return analytics |
| |
| except Exception as e: |
| logging.error(f"Error getting session analytics: {e}") |
| return {} |