"""One-shot data migration from the local SQLite database to PostgreSQL.

Copies the ``users``, ``websites``, and ``unanswered_questions`` tables,
skipping rows whose primary key already exists in the target
(``ON CONFLICT (id) DO NOTHING``). The whole migration runs in a single
PostgreSQL transaction: it is committed only if every table copies cleanly,
and rolled back on any error.
"""

import json
import os
import secrets
import sqlite3
import string
import uuid  # NOTE(review): currently unused; kept in case other tooling relies on it

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import execute_values

load_dotenv()


def generate_widget_id():
    """Return a random 8-character lowercase alphanumeric widget id.

    Uses :mod:`secrets` (not :mod:`random`) because widget ids are
    externally visible identifiers.
    """
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(8))


def _coerce_bools(columns, row_values, bool_columns):
    """Convert SQLite 0/1 integers to Python bools for the named columns.

    SQLite stores booleans as integers; PostgreSQL boolean columns need real
    ``bool`` values. SQL ``NULL`` (``None``) is preserved as-is — the original
    code turned NULLs into ``False``, which silently corrupted data.

    Args:
        columns: column names, positionally aligned with ``row_values``.
        row_values: one row's values (any sequence; a new list is returned).
        bool_columns: names of columns that should become ``bool``.

    Returns:
        A new list with the requested columns coerced.
    """
    out = list(row_values)
    for col in bool_columns:
        if col in columns:
            idx = columns.index(col)
            if out[idx] is not None:
                out[idx] = bool(out[idx])
    return out


def _migrate_users(s_cur, p_cur):
    """Copy every row of ``users``; existing ids in PostgreSQL are skipped."""
    print("👤 Migrating Users...")
    s_cur.execute("SELECT * FROM users")
    users = [dict(row) for row in s_cur.fetchall()]
    if not users:
        return
    columns = list(users[0].keys())
    query = f"INSERT INTO users ({','.join(columns)}) VALUES %s ON CONFLICT (id) DO NOTHING"
    values = [
        _coerce_bools(columns, u.values(), ("is_active", "is_email_verified"))
        for u in users
    ]
    execute_values(p_cur, query, values)


def _migrate_websites(s_cur, p_cur):
    """Copy ``websites``, inserting only columns both schemas share.

    The target schema may have a ``widget_id`` column that the SQLite schema
    lacks; in that case a fresh id is generated per row.
    """
    print("🌐 Migrating Websites...")
    s_cur.execute("SELECT * FROM websites")
    websites = [dict(row) for row in s_cur.fetchall()]
    if not websites:
        return

    # Discover the target schema so we only insert columns PostgreSQL knows.
    p_cur.execute(
        "SELECT column_name FROM information_schema.columns WHERE table_name = 'websites'"
    )
    pg_columns = [row[0] for row in p_cur.fetchall()]
    s_columns = list(websites[0].keys())
    common_columns = [c for c in s_columns if c in pg_columns]

    # widget_id exists only in the new schema; synthesize one per row below.
    extra_columns = []
    if 'widget_id' in pg_columns and 'widget_id' not in s_columns:
        extra_columns.append('widget_id')

    all_target_columns = common_columns + extra_columns
    query = (
        f"INSERT INTO websites ({','.join(all_target_columns)}) "
        "VALUES %s ON CONFLICT (id) DO NOTHING"
    )

    values = []
    for w in websites:
        processed = _coerce_bools(
            common_columns, [w[c] for c in common_columns], ("is_verified",)
        )
        # widget_config is carried over verbatim as its SQLite JSON string;
        # PostgreSQL parses it on insert when the target column is json/jsonb.
        # (The original code json.loads-ed it inside a bare `except: pass` and
        # discarded the result — pure dead code, removed here.)
        if 'widget_id' in extra_columns:
            processed.append(generate_widget_id())
        values.append(processed)
    execute_values(p_cur, query, values)


def _migrate_tickets(s_cur, p_cur):
    """Copy every row of ``unanswered_questions`` (support tickets)."""
    print("🎟️ Migrating tickets...")
    s_cur.execute("SELECT * FROM unanswered_questions")
    tickets = [dict(row) for row in s_cur.fetchall()]
    if not tickets:
        return
    columns = list(tickets[0].keys())
    query = (
        f"INSERT INTO unanswered_questions ({','.join(columns)}) "
        "VALUES %s ON CONFLICT (id) DO NOTHING"
    )
    values = [
        _coerce_bools(columns, t.values(), ("is_resolved", "sla_breached"))
        for t in tickets
    ]
    execute_values(p_cur, query, values)


def migrate():
    """Run the full SQLite → PostgreSQL migration.

    Reads the target DSN from the ``DATABASE_URL`` environment variable and
    the source from ``customeragent.db`` in the working directory. Commits
    once at the end; any failure rolls the PostgreSQL transaction back, so
    the target is never left partially migrated.
    """
    sqlite_db = "customeragent.db"
    pg_url = os.getenv("DATABASE_URL")

    # psycopg2 does not understand SQLAlchemy-style URLs; strip the driver tag.
    if pg_url and "postgresql+psycopg2://" in pg_url:
        pg_url = pg_url.replace("postgresql+psycopg2://", "postgresql://")

    # Fail fast with a clear message instead of a confusing connect() error.
    if not pg_url:
        print("❌ DATABASE_URL is not set.")
        return

    if not os.path.exists(sqlite_db):
        print(f"❌ SQLite database {sqlite_db} not found.")
        return

    print(f"🚀 Starting migration from {sqlite_db} to PostgreSQL...")

    s_conn = None
    p_conn = None
    try:
        s_conn = sqlite3.connect(sqlite_db)
        s_conn.row_factory = sqlite3.Row  # rows behave like dicts
        s_cur = s_conn.cursor()

        p_conn = psycopg2.connect(pg_url)
        p_cur = p_conn.cursor()

        _migrate_users(s_cur, p_cur)
        _migrate_websites(s_cur, p_cur)
        _migrate_tickets(s_cur, p_cur)

        p_conn.commit()
        print("✅ Migration successful!")
    except Exception as e:
        # Top-level boundary of a CLI script: report and roll back so the
        # target database is left untouched.
        print(f"❌ Migration failed: {e}")
        if p_conn is not None:
            p_conn.rollback()
    finally:
        if s_conn is not None:
            s_conn.close()
        if p_conn is not None:
            p_conn.close()


if __name__ == "__main__":
    migrate()