|
|
|
|
| """
|
| db_export_import.py — Backup & Restore (Export/Import) do banco ATIVO (Produção/Teste)
|
|
|
| Recursos:
|
| • Exibe banco ativo (prod/test) e URL do engine
|
| • Exporta todas as tabelas para:
|
| - ZIP (CSV por tabela + manifest.json)
|
| - Excel (.xlsx) (1 aba por tabela + manifest sheet)
|
| • Importa (upload) de:
|
| - ZIP (CSV por tabela)
|
| - Excel (.xlsx)
|
| • Modos de import: APPEND ou REPLACE (cuidado com FK)
|
| • Snapshot físico para SQLite: cópia do arquivo (.db) — backup/restore rápido
|
|
|
| Dependências:
|
| - pandas, openpyxl, sqlalchemy, zipfile, io, json, datetime
|
| """
|
|
|
| import os
|
| import io
|
| import json
|
| import zipfile
|
| from datetime import datetime
|
|
|
| import streamlit as st
|
| import pandas as pd
|
| from sqlalchemy import inspect, text
|
|
|
| from banco import get_engine, db_info, SessionLocal
|
| from utils_auditoria import registrar_log
|
|
|
|
|
| try:
|
| from db_router import current_db_choice
|
| _HAS_ROUTER = True
|
| except Exception:
|
| _HAS_ROUTER = False
|
|
|
| def current_db_choice() -> str:
|
| return "prod"
|
|
|
|
|
|
|
|
|
|
|
| def list_tables(engine) -> list[str]:
|
| """Retorna nomes de todas as tabelas via SQLAlchemy inspection."""
|
| inspector = inspect(engine)
|
| return inspector.get_table_names()
|
|
|
|
|
| def _read_table_df(engine, table_name: str) -> pd.DataFrame:
|
| """Lê toda a tabela como DataFrame."""
|
| try:
|
|
|
| return pd.read_sql_table(table_name, con=engine)
|
| except Exception:
|
|
|
| return pd.read_sql(f'SELECT * FROM "{table_name}"', con=engine)
|
|
|
|
|
| def _write_table_df(engine, table_name: str, df: pd.DataFrame, mode: str = "append"):
|
| """
|
| Escreve DataFrame em tabela.
|
| mode: "append" (adiciona) ou "replace" (sobrescreve todos os dados).
|
| Observação: nos fluxos de import, quando 'replace' foi selecionado,
|
| todas as tabelas são truncadas previamente, e aqui usamos 'append'.
|
| """
|
| if mode not in ("append", "replace"):
|
| mode = "append"
|
| df.to_sql(table_name, con=engine, if_exists=mode, index=False)
|
|
|
|
|
|
|
|
|
|
|
| def export_zip(engine, ambiente: str) -> bytes:
|
| """
|
| Exporta todas as tabelas para um ZIP:
|
| - 1 CSV por tabela (UTF-8-BOM)
|
| - manifest.json com metadados (ambiente, timestamp, url, tabelas)
|
| """
|
| tables = list_tables(engine)
|
| buf = io.BytesIO()
|
| with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
|
| for t in tables:
|
| df = _read_table_df(engine, t)
|
| csv_bytes = df.to_csv(index=False, encoding="utf-8-sig").encode("utf-8-sig")
|
| z.writestr(f"{t}.csv", csv_bytes)
|
|
|
| manifest = {
|
| "ambiente": ambiente,
|
| "timestamp": datetime.now().isoformat(),
|
| "engine_url": str(engine.url),
|
| "tables": tables,
|
| "format": "zip/csv",
|
| "version": "1.0",
|
| }
|
| z.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
|
| buf.seek(0)
|
| return buf.getvalue()
|
|
|
|
|
| def export_excel(engine, ambiente: str) -> bytes:
|
| """
|
| Exporta todas as tabelas para um Excel (.xlsx):
|
| - 1 aba por tabela (limitada a 31 caracteres)
|
| - "manifest" com metadados
|
| """
|
| tables = list_tables(engine)
|
| buf = io.BytesIO()
|
| with pd.ExcelWriter(buf, engine="openpyxl") as writer:
|
|
|
| manifest = pd.DataFrame([{
|
| "ambiente": ambiente,
|
| "timestamp": datetime.now().isoformat(),
|
| "engine_url": str(engine.url),
|
| "tables": ", ".join(tables),
|
| "format": "xlsx",
|
| "version": "1.0",
|
| }])
|
| manifest.to_excel(writer, sheet_name="manifest", index=False)
|
|
|
|
|
| for t in tables:
|
| df = _read_table_df(engine, t)
|
| sheet = t[:31] if len(t) > 31 else t
|
| df.to_excel(writer, sheet_name=sheet, index=False)
|
| buf.seek(0)
|
| return buf.getvalue()
|
|
|
|
|
|
|
|
|
|
|
| def import_zip(engine, file_bytes: bytes, mode: str = "append") -> dict:
|
| """
|
| Importa dados de um ZIP (CSV por tabela).
|
| mode: "append" ou "replace".
|
| Retorna um relatório {table: {"rows": int, "mode": str}}.
|
| """
|
| report = {}
|
| zbuf = io.BytesIO(file_bytes)
|
| with zipfile.ZipFile(zbuf, "r") as z:
|
|
|
| if mode == "replace":
|
| _truncate_all(engine)
|
|
|
| for name in z.namelist():
|
| if not name.lower().endswith(".csv"):
|
| continue
|
| table = os.path.splitext(os.path.basename(name))[0]
|
| csv_bytes = z.read(name)
|
| df = pd.read_csv(io.BytesIO(csv_bytes), dtype=str)
|
|
|
| for col in df.columns:
|
| if "data" in col.lower() or "date" in col.lower():
|
| try:
|
| df[col] = pd.to_datetime(df[col], errors="ignore")
|
| except Exception:
|
| pass
|
|
|
| _write_table_df(engine, table, df, mode="append")
|
| report[table] = {"rows": int(len(df)), "mode": mode}
|
| return report
|
|
|
|
|
| def import_excel(engine, file_bytes: bytes, mode: str = "append") -> dict:
|
| """
|
| Importa dados de um Excel (.xlsx) com múltiplas abas (1 por tabela).
|
| mode: "append" ou "replace".
|
| """
|
| report = {}
|
| xbuf = io.BytesIO(file_bytes)
|
| xls = pd.ExcelFile(xbuf, engine="openpyxl")
|
| sheets = [s for s in xls.sheet_names if s.lower() != "manifest"]
|
|
|
| if mode == "replace":
|
| _truncate_all(engine)
|
|
|
| for sheet in sheets:
|
| df = xls.parse(sheet_name=sheet, dtype=str)
|
|
|
| for col in df.columns:
|
| if "data" in col.lower() or "date" in col.lower():
|
| try:
|
| df[col] = pd.to_datetime(df[col], errors="ignore")
|
| except Exception:
|
| pass
|
| table = sheet
|
| _write_table_df(engine, table, df, mode="append")
|
| report[table] = {"rows": int(len(df)), "mode": mode}
|
| return report
|
|
|
|
|
|
|
|
|
|
|
| def _truncate_all(engine):
|
| """
|
| Limpa todas as tabelas do banco ativo (cuidado!).
|
| • Para SQLite: desabilita FK temporariamente, apaga, e reabilita.
|
| • Para outros bancos: executa DELETE tabela; considere ordem por FK se necessário.
|
| """
|
| insp = inspect(engine)
|
| tables = insp.get_table_names()
|
|
|
| with engine.begin() as conn:
|
| url = str(engine.url)
|
| is_sqlite = url.startswith("sqlite")
|
| if is_sqlite:
|
| conn.execute(text("PRAGMA foreign_keys=OFF"))
|
|
|
|
|
| for t in tables:
|
| conn.execute(text(f'DELETE FROM "{t}"'))
|
|
|
| if is_sqlite:
|
| conn.execute(text("PRAGMA foreign_keys=ON"))
|
|
|
|
|
|
|
|
|
|
|
| def snapshot_sqlite(engine, ambiente: str) -> bytes:
|
| """
|
| Cria um snapshot (cópia física) do arquivo SQLite do banco ativo.
|
| Retorna o conteúdo do arquivo para download.
|
| """
|
| url = str(engine.url)
|
| if not url.startswith("sqlite:///"):
|
| raise RuntimeError("Snapshot físico disponível apenas para SQLite.")
|
| db_path = url.replace("sqlite:///", "")
|
| if not os.path.isfile(db_path):
|
| raise FileNotFoundError(f"Arquivo SQLite não encontrado: {db_path}")
|
|
|
| with open(db_path, "rb") as f:
|
| data = f.read()
|
|
|
|
|
| try:
|
| registrar_log(usuario=st.session_state.get("usuario"),
|
| acao=f"Snapshot SQLite ({ambiente})",
|
| tabela="backup",
|
| registro_id=None)
|
| except Exception:
|
| pass
|
|
|
| return data
|
|
|
|
|
|
|
|
|
|
|
| def main():
|
| st.title("🗄️ Backup & Restore | Export/Import de Banco")
|
|
|
|
|
| ambiente = current_db_choice()
|
| info = db_info()
|
| st.caption(f"🧭 Ambiente: {'Produção' if ambiente == 'prod' else 'Teste'}")
|
| st.caption(f"🔗 Engine URL: {info.get('url')}")
|
|
|
| engine = get_engine()
|
|
|
| st.divider()
|
| st.subheader("⬇️ Exportar dados")
|
|
|
| colA, colB, colC = st.columns(3)
|
| with colA:
|
| if st.button("Exportar ZIP (CSV por tabela)", type="primary"):
|
| try:
|
| zip_bytes = export_zip(engine, ambiente)
|
| fname = f"backup_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
|
| st.download_button("📥 Baixar ZIP", data=zip_bytes, file_name=fname, mime="application/zip")
|
| registrar_log(usuario=st.session_state.get("usuario"),
|
| acao=f"Export ZIP (ambiente={ambiente})",
|
| tabela="backup", registro_id=None)
|
| except Exception as e:
|
| st.error(f"Falha ao exportar ZIP: {e}")
|
|
|
| with colB:
|
| if st.button("Exportar Excel (.xlsx)", type="primary"):
|
| try:
|
| xlsx_bytes = export_excel(engine, ambiente)
|
| fname = f"backup_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
|
| st.download_button("📥 Baixar Excel", data=xlsx_bytes, file_name=fname,
|
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
|
| registrar_log(usuario=st.session_state.get("usuario"),
|
| acao=f"Export XLSX (ambiente={ambiente})",
|
| tabela="backup", registro_id=None)
|
| except Exception as e:
|
| st.error(f"Falha ao exportar Excel: {e}")
|
|
|
| with colC:
|
|
|
| url = str(engine.url)
|
| if url.startswith("sqlite:///"):
|
| if st.button("Snapshot físico (SQLite)", type="secondary"):
|
| try:
|
| snap_bytes = snapshot_sqlite(engine, ambiente)
|
| fname = f"snapshot_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
|
| st.download_button("📥 Baixar Snapshot (.db)", data=snap_bytes, file_name=fname, mime="application/octet-stream")
|
| except Exception as e:
|
| st.error(f"Falha ao criar snapshot: {e}")
|
| else:
|
| st.caption("ℹ️ Snapshot físico disponível apenas para SQLite.")
|
|
|
| st.divider()
|
| st.subheader("⬆️ Importar dados")
|
|
|
| mode = st.radio("Modo de importação:", ["APPEND (adicionar)", "REPLACE (substituir tudo)"], horizontal=True)
|
| mode_val = "append" if "APPEND" in mode else "replace"
|
|
|
| up_col1, up_col2 = st.columns(2)
|
| with up_col1:
|
| zip_file = st.file_uploader("Upload ZIP (CSV por tabela)", type=["zip"])
|
| if zip_file is not None and st.button("Importar do ZIP", type="primary"):
|
| try:
|
| report = import_zip(engine, zip_file.read(), mode=mode_val)
|
| st.success(f"Import ZIP concluído ({mode_val}).")
|
| st.json(report)
|
| registrar_log(usuario=st.session_state.get("usuario"),
|
| acao=f"Import ZIP ({mode_val}, ambiente={ambiente})",
|
| tabela="restore", registro_id=None)
|
| except Exception as e:
|
| st.error(f"Falha ao importar ZIP: {e}")
|
|
|
| with up_col2:
|
| xls_file = st.file_uploader("Upload Excel (.xlsx)", type=["xlsx"])
|
| if xls_file is not None and st.button("Importar do Excel", type="primary"):
|
| try:
|
| report = import_excel(engine, xls_file.read(), mode=mode_val)
|
| st.success(f"Import Excel concluído ({mode_val}).")
|
| st.json(report)
|
| registrar_log(usuario=st.session_state.get("usuario"),
|
| acao=f"Import XLSX ({mode_val}, ambiente={ambiente})",
|
| tabela="restore", registro_id=None)
|
| except Exception as e:
|
| st.error(f"Falha ao importar Excel: {e}")
|
|
|
| st.divider()
|
| st.info("⚠️ Recomendações:\n"
|
| "• Para restore completo com integridade referencial, prefira snapshot físico no SQLite, ou migrações controladas em bancos como Postgres/SQL Server.\n"
|
| "• O modo REPLACE desabilita FK temporariamente no SQLite para permitir limpeza; use com cautela.\n"
|
| "• Em produção, considere gerar backups com versionamento e retenção (ex.: timestamp no nome do arquivo).")
|
|
|
|
|
| def render():
|
|
|
| main()
|
|
|
|
|
| if __name__ == "__main__":
|
| st.set_page_config(page_title="Backup & Restore | ARM", layout="wide")
|
| main()
|
|
|
|
|