| import streamlit as st
|
| import pandas as pd
|
| import sqlite3
|
| import io
|
| import base64
|
| import os
|
| from datetime import datetime
|
|
|
|
|
| st.set_page_config(
|
| page_title="Database Operations Tool",
|
| page_icon="ποΈ",
|
| layout="wide",
|
| initial_sidebar_state="expanded"
|
| )
|
|
|
|
|
| st.markdown("""
|
| <style>
|
| .main {
|
| padding: 1rem;
|
| }
|
| .stButton>button {
|
| width: 100%;
|
| background-color: #4CAF50;
|
| color: white;
|
| padding: 0.5rem;
|
| border-radius: 5px;
|
| border: none;
|
| margin: 0.5rem 0;
|
| }
|
| .stButton>button:hover {
|
| background-color: #45a049;
|
| }
|
| .reportview-container {
|
| background: #fafafa;
|
| }
|
| .css-1d391kg {
|
| padding: 1rem;
|
| }
|
| .stSelectbox {
|
| margin: 1rem 0;
|
| }
|
| </style>
|
| """, unsafe_allow_html=True)
|
|
|
| def init_db():
|
| """Initialize SQLite database"""
|
| if not os.path.exists('databases'):
|
| os.makedirs('databases')
|
| conn = sqlite3.connect('databases/main.db')
|
| c = conn.cursor()
|
| c.execute('''CREATE TABLE IF NOT EXISTS database_list
|
| (name TEXT PRIMARY KEY, created_date TEXT)''')
|
| conn.commit()
|
| return conn
|
|
|
| def get_database_names():
|
| """Get list of databases"""
|
| conn = init_db()
|
| c = conn.cursor()
|
| c.execute("SELECT name FROM database_list")
|
| databases = [row[0] for row in c.fetchall()]
|
| conn.close()
|
| return databases
|
|
|
| def create_table(db_name, columns, column_types, primary_key=None, unique_columns=None):
|
| """Create a new table with specified column types, optional primary key, and unique columns"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| c = conn.cursor()
|
|
|
|
|
| column_defs = []
|
| for col, type_ in zip(columns, column_types):
|
| col_def = f"{col} {type_}"
|
|
|
|
|
| if primary_key and col == primary_key:
|
| col_def += " PRIMARY KEY"
|
|
|
|
|
| if unique_columns and col in unique_columns:
|
| col_def += " UNIQUE"
|
|
|
| column_defs.append(col_def)
|
|
|
| query = f'''CREATE TABLE IF NOT EXISTS data
|
| ({', '.join(column_defs)})'''
|
|
|
| c.execute(query)
|
| conn.commit()
|
| conn.close()
|
|
|
| def get_table_data(db_name):
|
| """Get table data"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| return pd.read_sql_query("SELECT * FROM data", conn)
|
|
|
| def get_columns(db_name):
|
| """Get column names and types"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| c = conn.cursor()
|
| c.execute("PRAGMA table_info(data)")
|
| columns = [(row[1], row[2], row[5] == 1) for row in c.fetchall()]
|
|
|
|
|
| c.execute("PRAGMA index_list(data)")
|
| unique_columns = [row[1].replace('data_', '').replace('_unique', '')
|
| for row in c.fetchall() if 'unique' in row[1]]
|
|
|
| conn.close()
|
| return columns, unique_columns
|
|
|
| def rename_column(db_name, old_column_name, new_column_name):
|
| """Rename a column in the database"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| c = conn.cursor()
|
|
|
| try:
|
|
|
| c.execute("PRAGMA table_info(data)")
|
| columns = [row[1] for row in c.fetchall()]
|
|
|
|
|
| if old_column_name not in columns:
|
| return False, "Original column does not exist"
|
| if new_column_name in columns:
|
| return False, "New column name already exists"
|
|
|
|
|
| c.execute(f"ALTER TABLE data RENAME COLUMN {old_column_name} TO {new_column_name}")
|
| conn.commit()
|
| return True, "Column renamed successfully"
|
|
|
| except sqlite3.OperationalError as e:
|
| return False, str(e)
|
| finally:
|
| conn.close()
|
|
|
|
|
| def delete_rows(db_name, condition_col, condition_val):
|
| """Delete rows based on condition"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| c = conn.cursor()
|
| c.execute(f"DELETE FROM data WHERE {condition_col} = ?", (condition_val,))
|
| deleted_count = c.rowcount
|
| conn.commit()
|
| conn.close()
|
| return deleted_count
|
|
|
| def update_row(db_name, condition_col, condition_val, update_col, update_val):
|
| """Update row based on condition with more flexible value setting"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| c = conn.cursor()
|
|
|
| try:
|
|
|
| c.execute(f"UPDATE data SET {update_col} = ? WHERE {condition_col} = ?",
|
| (update_val, condition_val))
|
| updated_count = c.rowcount
|
| conn.commit()
|
| return updated_count
|
| except sqlite3.IntegrityError as e:
|
| st.error(f"Update failed: {str(e)}")
|
| return 0
|
| except sqlite3.OperationalError as e:
|
| st.error(f"SQL Error: {str(e)}")
|
| return 0
|
| finally:
|
| conn.close()
|
|
|
| def bulk_import_data(db_name, df):
|
| """Bulk import data from DataFrame"""
|
| conn = sqlite3.connect(f'databases/{db_name}.db')
|
| try:
|
| df.to_sql('data', conn, if_exists='append', index=False)
|
| return True
|
| except sqlite3.IntegrityError as e:
|
| st.error(f"Import failed: {str(e)}")
|
| return False
|
| finally:
|
| conn.close()
|
|
|
| def main():
|
| st.title("ποΈ Database Operations Tool")
|
|
|
|
|
| with st.sidebar:
|
| st.header("Database Operations")
|
|
|
|
|
| with st.expander("Create New Database", expanded=True):
|
| new_db_name = st.text_input("Database Name")
|
| columns_input = st.text_input("Column Names (comma-separated)")
|
|
|
|
|
| if columns_input:
|
| columns = [col.strip() for col in columns_input.split(",")]
|
| column_types = []
|
|
|
|
|
| st.write("Select type and configure each column:")
|
| for col in columns:
|
| col_type = st.selectbox(
|
| f"Type for {col}",
|
| options=["TEXT", "INTEGER", "REAL", "BLOB", "DATE"],
|
| key=f"type_{col}"
|
| )
|
| column_types.append(col_type)
|
|
|
|
|
| primary_key = st.selectbox(
|
| "Select Primary Key Column (optional)",
|
| ["None"] + columns,
|
| index=0
|
| )
|
| primary_key = None if primary_key == "None" else primary_key
|
|
|
|
|
| unique_columns = st.multiselect(
|
| "Select Unique Columns (optional)",
|
| columns
|
| )
|
|
|
| if st.button("Create Database"):
|
| if new_db_name and columns_input and len(columns) > 0:
|
| try:
|
| conn = init_db()
|
| c = conn.cursor()
|
| c.execute("INSERT INTO database_list VALUES (?, ?)",
|
| (new_db_name, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
|
| conn.commit()
|
| conn.close()
|
|
|
|
|
| create_table(
|
| new_db_name,
|
| columns,
|
| column_types,
|
| primary_key,
|
| unique_columns
|
| )
|
| st.success("β
Database created successfully!")
|
| except sqlite3.IntegrityError:
|
| st.error("β Database name already exists!")
|
| else:
|
| st.warning("β οΈ Please fill all fields")
|
|
|
|
|
| databases = get_database_names()
|
| if databases:
|
| selected_db = st.selectbox("π Select Database", databases)
|
|
|
| if selected_db:
|
| df = get_table_data(selected_db)
|
| columns, unique_columns = get_columns(selected_db)
|
|
|
|
|
| st.sidebar.subheader("Table Constraints")
|
| columns_info = [col for col, _, _ in columns]
|
|
|
|
|
| primary_key = next((col for col, _, is_primary in columns if is_primary), None)
|
| if primary_key:
|
| st.sidebar.info(f"π Primary Key: {primary_key}")
|
|
|
| if unique_columns:
|
| st.sidebar.warning(f"π« Unique Columns: {', '.join(unique_columns)}")
|
|
|
| tabs = st.tabs(["π Manage Data", "π Update Data", "ποΈ Delete Data", "π₯ Import/Export", "βοΈ Column Operations"])
|
|
|
|
|
| with tabs[0]:
|
| st.subheader("Data Preview")
|
| st.dataframe(df, use_container_width=True)
|
|
|
| st.subheader("Add New Row")
|
| new_row_data = {}
|
| for col, type_, is_primary in columns:
|
| if type_ == 'DATE':
|
| new_row_data[col] = st.date_input(f"Enter {col}")
|
| elif type_ == 'INTEGER':
|
| new_row_data[col] = st.number_input(f"Enter {col}", step=1)
|
| elif type_ == 'REAL':
|
| new_row_data[col] = st.number_input(f"Enter {col}", step=0.1)
|
| else:
|
| new_row_data[col] = st.text_input(f"Enter {col}")
|
|
|
| if st.button("Add Row"):
|
| if all(str(val) != "" for val in new_row_data.values()):
|
| conn = sqlite3.connect(f'databases/{selected_db}.db')
|
| c = conn.cursor()
|
| try:
|
| placeholders = ','.join(['?' for _ in columns])
|
| query = f"INSERT INTO data VALUES ({placeholders})"
|
| c.execute(query, list(new_row_data.values()))
|
| conn.commit()
|
| st.success("β
Row added!")
|
| st.rerun()
|
| except sqlite3.IntegrityError as e:
|
| st.error(f"β Insert failed: {str(e)}")
|
| finally:
|
| conn.close()
|
|
|
|
|
| with tabs[1]:
|
| st.subheader("Update Records")
|
| col1, col2 = st.columns(2)
|
|
|
| with col1:
|
| condition_col = st.selectbox("Select Column for Condition",
|
| [col for col, _, _ in columns])
|
| condition_val = st.text_input("Enter Value to Match")
|
|
|
| with col2:
|
| update_col = st.selectbox("Select Column to Update",
|
| [col for col, _, _ in columns])
|
|
|
| column_types_dict = {col: type_ for col, type_, _ in columns}
|
| update_val_input = None
|
|
|
| if column_types_dict[update_col] == 'INTEGER':
|
| update_val_input = st.number_input(f"Enter New Value for {update_col}", step=1)
|
| elif column_types_dict[update_col] == 'REAL':
|
| update_val_input = st.number_input(f"Enter New Value for {update_col}", step=0.1)
|
| elif column_types_dict[update_col] == 'DATE':
|
| update_val_input = st.date_input(f"Enter New Value for {update_col}")
|
| else:
|
| update_val_input = st.text_input(f"Enter New Value for {update_col}")
|
|
|
| if st.button("Update Records"):
|
| if condition_val is not None and update_val_input is not None:
|
|
|
| update_val = str(update_val_input)
|
|
|
| updated = update_row(selected_db, condition_col,
|
| condition_val, update_col, update_val)
|
| if updated > 0:
|
| st.success(f"β
Updated {updated} records!")
|
| st.rerun()
|
| else:
|
| st.warning("No records were updated. Check your conditions.")
|
|
|
|
|
|
|
|
|
|
|
| with tabs[2]:
|
| st.subheader("Delete Records")
|
| del_col = st.selectbox("Select Column for Deletion Condition",
|
| [col for col, _, _ in columns])
|
| del_val = st.text_input("Enter Value to Delete")
|
|
|
| if st.button("Delete Records"):
|
| if del_val:
|
| deleted = delete_rows(selected_db, del_col, del_val)
|
| st.success(f"β
Deleted {deleted} records!")
|
| st.rerun()
|
|
|
|
|
| with tabs[3]:
|
| st.subheader("Import Data")
|
| uploaded_file = st.file_uploader("Choose a CSV file", type="csv")
|
| if uploaded_file is not None:
|
| import_df = pd.read_csv(uploaded_file)
|
| if st.button("Import Data"):
|
| success = bulk_import_data(selected_db, import_df)
|
| if success:
|
| st.success("β
Data imported successfully!")
|
| st.rerun()
|
|
|
| st.subheader("Export Data")
|
| export_format = st.selectbox("Select Format",
|
| ["CSV", "Excel", "JSON"])
|
|
|
| if export_format == "CSV":
|
| csv = df.to_csv(index=False)
|
| st.download_button(
|
| label="π₯ Download CSV",
|
| data=csv,
|
| file_name=f"{selected_db}.csv",
|
| mime="text/csv"
|
| )
|
| elif export_format == "Excel":
|
| buffer = io.BytesIO()
|
| df.to_excel(buffer, index=False)
|
| st.download_button(
|
| label="π₯ Download Excel",
|
| data=buffer.getvalue(),
|
| file_name=f"{selected_db}.xlsx",
|
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
| )
|
| else:
|
| json_str = df.to_json(orient='records')
|
| st.download_button(
|
| label="π₯ Download JSON",
|
| data=json_str,
|
| file_name=f"{selected_db}.json",
|
| mime="application/json"
|
| )
|
|
|
|
|
| with tabs[4]:
|
| st.subheader("Column Operations")
|
|
|
|
|
| st.subheader("Rename Column")
|
| col1, col2 = st.columns(2)
|
|
|
| with col1:
|
| old_column = st.selectbox("Select Column to Rename",
|
| [col for col, _, _ in columns])
|
|
|
| with col2:
|
| new_column_name = st.text_input("Enter New Column Name")
|
|
|
| if st.button("Rename Column"):
|
| if new_column_name:
|
|
|
| if not new_column_name.replace('_', '').isalnum():
|
| st.error("Column name must be alphanumeric (can include underscores)")
|
| else:
|
| success, message = rename_column(selected_db, old_column, new_column_name)
|
| if success:
|
| st.success(message)
|
| st.rerun()
|
| else:
|
| st.error(message)
|
| else:
|
| st.warning("Please enter a new column name")
|
| else:
|
| st.info("π Welcome! Start by creating a new database using the sidebar.")
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|