| |
| """ |
| importar_excel.py |
| Importa planilhas Excel para a tabela Equipamento, com: |
| - Geração de modelo Excel |
| - Upload e pré-visualização |
| - Verificação/edição de duplicidades |
| - Conversões seguras de tipos (datas/números/strings) |
| - Gravação transacional + auditoria resiliente |
| Compatível com Linux (Spaces) e local/Windows. |
| """ |
|
|
| import streamlit as st |
| import pandas as pd |
| from io import BytesIO |
| from datetime import datetime, date |
|
|
| from banco import SessionLocal |
| from models import Equipamento |
| from utils_auditoria import registrar_log |
|
|
| |
| |
| |
| COLUNAS_ESPERADAS = [ |
| "fpso1", "fpso", "data_coleta", "especialista", "conferente", "osm", "modal", |
| "quant_equip", "mrob", "linhas_osm", "linhas_mrob", "linhas_erros", |
| "erro_storekeeper", "erro_operacao", "erro_especialista", "erro_outros", |
| "inclusao_exclusao", "po", "part_number", "material", "solicitante", "motivo", |
| "requisitante", "nota_fiscal", "impacto", "dimensao", "observacoes", "dia_inclusao", |
| ] |
|
|
| |
| COLS_INTEIRAS = { |
| "quant_equip", "linhas_osm", "linhas_mrob", "linhas_erros" |
| } |
|
|
| |
| |
| |
| def _normalize_cols_case_insensitive(df: pd.DataFrame) -> pd.DataFrame: |
| """ |
| Mapeia colunas do arquivo (case/espaços) para os nomes esperados em COLUNAS_ESPERADAS. |
| Ex.: " FPSo " -> "fpso"; "DATA_COLETA" -> "data_coleta". |
| Colunas não reconhecidas permanecem como estão. |
| """ |
| if df is None or df.empty: |
| return df |
|
|
| |
| existing = list(df.columns) |
| lower_map = {c.strip().lower(): c for c in existing} |
| new_names = {} |
| for wanted in COLUNAS_ESPERADAS: |
| key = wanted.lower() |
| if key in lower_map: |
| new_names[lower_map[key]] = wanted |
|
|
| |
| try: |
| df = df.rename(columns=new_names) |
| except Exception: |
| pass |
| return df |
|
|
|
|
| def to_date(value): |
| """ |
| Converte para date: |
| - pandas.Timestamp/datetime/date -> date |
| - string em formatos 'YYYY-MM-DD' ou 'DD/MM/YYYY' |
| - retorna None se não puder converter |
| Necessário para compatibilidade com SQLite e outros. |
| """ |
| if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
| return None |
|
|
| if isinstance(value, pd.Timestamp): |
| return value.date() |
| if isinstance(value, datetime): |
| return value.date() |
| if isinstance(value, date): |
| return value |
|
|
| if isinstance(value, str): |
| s = value.strip() |
| if not s: |
| return None |
| |
| try: |
| return datetime.strptime(s, "%Y-%m-%d").date() |
| except Exception: |
| pass |
| |
| try: |
| return datetime.strptime(s, "%d/%m/%Y").date() |
| except Exception: |
| pass |
| return None |
|
|
|
|
| def safe_value(value): |
| """ |
| Retorna 0 se o valor for vazio/NaN; senão o próprio valor. |
| Usado para campos "livres" que não podem ir nulos. |
| """ |
| if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
| return 0 |
| return value |
|
|
|
|
| def safe_int(value) -> int: |
| """ |
| Converte para int com fallback (None/NaN/erro -> 0). |
| """ |
| try: |
| if value is None or pd.isna(value): |
| return 0 |
| |
| if isinstance(value, str) and value.strip() == "": |
| return 0 |
| return int(float(value)) |
| except Exception: |
| return 0 |
|
|
|
|
| def safe_str(value) -> str: |
| """ |
| Converte para string segura (None/NaN -> ""). |
| """ |
| if value is None or (isinstance(value, float) and pd.isna(value)) or pd.isna(value): |
| return "" |
| return str(value) |
|
|
|
|
| def _df_template() -> pd.DataFrame: |
| """ |
| Retorna um DataFrame vazio com as colunas esperadas (para gerar o modelo). |
| """ |
| return pd.DataFrame(columns=COLUNAS_ESPERADAS) |
|
|
|
|
| def _download_modelo_excel() -> BytesIO: |
| """ |
| Gera um arquivo Excel de modelo (aba MODELO) e retorna buffer. |
| """ |
| df = _df_template() |
| buf = BytesIO() |
| with pd.ExcelWriter(buf, engine="openpyxl") as writer: |
| df.to_excel(writer, index=False, sheet_name="MODELO") |
| buf.seek(0) |
| return buf |
|
|
|
|
| def _validate_required_cols(df: pd.DataFrame) -> list: |
| """ |
| Retorna a lista de colunas faltantes em relação a COLUNAS_ESPERADAS. |
| Apenas informa; a importação pode prosseguir mesmo faltando algumas |
| (desde que sua tabela/ETL aceite nulos). |
| """ |
| missing = [c for c in COLUNAS_ESPERADAS if c not in df.columns] |
| return missing |
|
|
|
|
| |
| |
| |
| def main(): |
| st.title("📥 Importação de Dados via Excel") |
|
|
| st.markdown( |
| """ |
| Este módulo permite: |
| - 📄 Baixar um **modelo Excel padrão** |
| - ✍️ Preencher os dados offline |
| - 🔍 Validar antes da gravação |
| - 💾 Importar os registros para o banco |
| """ |
| ) |
|
|
| |
| |
| |
| st.subheader("📄 Baixar modelo Excel") |
|
|
| buffer = _download_modelo_excel() |
| st.download_button( |
| label="⬇️ Baixar modelo Excel", |
| data=buffer, |
| file_name="modelo_importacao_load.xlsx", |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
| ) |
|
|
| st.divider() |
|
|
| |
| |
| |
| st.subheader("📤 Importar arquivo preenchido") |
|
|
| arquivo = st.file_uploader( |
| "Selecione o arquivo Excel (.xlsx)", |
| type=["xlsx"] |
| ) |
|
|
| if not arquivo: |
| st.info("📌 Faça o upload de um arquivo para continuar.") |
| return |
|
|
| |
| try: |
| df = pd.read_excel(arquivo, engine="openpyxl") |
| except Exception as e: |
| st.error(f"❌ Erro ao ler o arquivo: {e}") |
| return |
|
|
| if df is None or df.empty: |
| st.error("❌ O arquivo não possui dados (planilha vazia).") |
| return |
|
|
| |
| df = _normalize_cols_case_insensitive(df) |
|
|
| |
| if "_row_id" not in df.columns: |
| df["_row_id"] = range(len(df)) |
|
|
| |
| st.session_state["df_raw"] = df.copy() |
|
|
| |
| faltantes = _validate_required_cols(df) |
| if faltantes: |
| st.warning( |
| "⚠️ Algumas colunas esperadas **não** foram encontradas no arquivo (" |
| + ", ".join(faltantes) + "). " |
| "Se esses campos forem obrigatórios em seu banco, a importação pode falhar." |
| ) |
|
|
| st.success("✅ Arquivo carregado com sucesso!") |
|
|
| |
| |
| |
| st.subheader("🔍 Prévia dos dados (arquivo lido)") |
| st.dataframe(df, use_container_width=True) |
|
|
| |
| |
| |
| st.subheader("🧪 Verificação de duplicidade") |
|
|
| st.caption( |
| "Escolha as colunas que definem a duplicidade. " |
| "Em seguida, marque o checkbox **_excluir** nas linhas que **não** deseja importar." |
| ) |
|
|
| |
| sugestao_chaves = [c for c in ["fpso", "osm", "po", "part_number", "nota_fiscal", "data_coleta"] if c in df.columns] |
|
|
| chaves = st.multiselect( |
| "📌 Colunas para verificação de duplicidade:", |
| options=list(df.columns), |
| default=sugestao_chaves if len(sugestao_chaves) > 0 else [] |
| ) |
|
|
| |
| work_df = df.copy() |
| work_df["_duplicado"] = False |
| work_df["_excluir"] = False |
|
|
| if len(chaves) == 0: |
| st.info("Selecione ao menos **uma** coluna para verificar duplicidade.") |
| |
| with st.expander("🔧 (Opcional) Excluir linhas manualmente mesmo sem duplicidade"): |
| manual_view = work_df.set_index("_row_id")[ |
| [c for c in work_df.columns if c not in ["_duplicado"]] + ["_excluir"] |
| ] |
| edited_manual = st.data_editor( |
| manual_view, |
| use_container_width=True, |
| num_rows="fixed", |
| column_config={ |
| "_excluir": st.column_config.CheckboxColumn("Excluir da importação") |
| } |
| ) |
| |
| work_df = work_df.set_index("_row_id") |
| work_df["_excluir"] = edited_manual["_excluir"].reindex(work_df.index).fillna(False).astype(bool) |
| work_df = work_df.reset_index() |
|
|
| else: |
| |
| mask_dup_any = work_df.duplicated(subset=chaves, keep=False) |
| work_df["_duplicado"] = mask_dup_any |
|
|
| |
| mask_dup_not_first = work_df.duplicated(subset=chaves, keep="first") |
| work_df.loc[mask_dup_not_first, "_excluir"] = True |
|
|
| if mask_dup_any.any(): |
| st.warning("⚠️ Foram encontradas linhas duplicadas com base nas chaves selecionadas:") |
| |
| cols_para_mostrar = chaves + [c for c in ["_duplicado", "_excluir"] if c not in chaves] |
| |
| seen = set() |
| cols_para_mostrar = [c for c in cols_para_mostrar if not (c in seen or seen.add(c))] |
|
|
| dup_view = work_df.loc[mask_dup_any].set_index("_row_id")[cols_para_mostrar] |
| edited_dup = st.data_editor( |
| dup_view, |
| use_container_width=True, |
| num_rows="fixed", |
| column_config={ |
| "_excluir": st.column_config.CheckboxColumn("Excluir da importação"), |
| "_duplicado": st.column_config.CheckboxColumn("Duplicado", disabled=True) |
| } |
| ) |
|
|
| |
| work_df = work_df.set_index("_row_id") |
| if "_excluir" in edited_dup.columns: |
| work_df.loc[edited_dup.index, "_excluir"] = ( |
| edited_dup["_excluir"].reindex(work_df.index).fillna(work_df["_excluir"]).astype(bool) |
| ) |
| work_df = work_df.reset_index() |
|
|
| st.info( |
| f"📊 Totais — Linhas: {len(work_df)} | Duplicadas: {mask_dup_any.sum()} | " |
| f"Marcadas para excluir: {int(work_df['_excluir'].sum())}" |
| ) |
| else: |
| st.success("✅ Nenhuma duplicidade encontrada com as chaves selecionadas.") |
|
|
| st.divider() |
|
|
| |
| |
| |
| df_para_importar = work_df[~work_df["_excluir"]].drop(columns=["_duplicado", "_excluir"], errors="ignore") |
| st.session_state["df_para_importar"] = df_para_importar.copy() |
|
|
| st.subheader("🧾 Prévia do que será importado") |
| st.caption("A prévia abaixo desconsidera as linhas marcadas como **_excluir**.") |
| st.dataframe(df_para_importar.drop(columns=["_row_id"], errors="ignore"), use_container_width=True) |
|
|
| |
| buf_prev = BytesIO() |
| with pd.ExcelWriter(buf_prev, engine="openpyxl") as writer: |
| df_para_importar.drop(columns=["_row_id"], errors="ignore").to_excel(writer, index=False, sheet_name="A_IMPORTAR") |
| buf_prev.seek(0) |
| st.download_button( |
| "⬇️ Baixar prévia (Excel)", |
| data=buf_prev, |
| file_name="previa_a_importar.xlsx", |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
| help="Baixe a prévia do conjunto que será gravado." |
| ) |
|
|
| |
| |
| |
| st.subheader("💾 Gravar dados no banco") |
|
|
| col1, col2 = st.columns(2) |
|
|
| if col1.button("💾 Salvar registros importados"): |
| df_import = st.session_state.get("df_para_importar", df) |
|
|
| if df_import.empty: |
| st.error("Não há registros para importar. Revise as exclusões.") |
| return |
|
|
| with SessionLocal() as db: |
| try: |
| for _, row in df_import.iterrows(): |
| registro = Equipamento( |
| fpso1=safe_str(row.get("fpso1")), |
| fpso=safe_str(row.get("fpso")), |
| data_coleta=to_date(row.get("data_coleta")), |
| especialista=safe_str(row.get("especialista")), |
| conferente=safe_str(row.get("conferente")), |
| osm=safe_str(row.get("osm")), |
| modal=safe_str(row.get("modal")), |
| quant_equip=safe_int(row.get("quant_equip")), |
| mrob=safe_str(row.get("mrob")), |
| linhas_osm=safe_int(row.get("linhas_osm")), |
| linhas_mrob=safe_int(row.get("linhas_mrob")), |
| linhas_erros=safe_int(row.get("linhas_erros")), |
| erro_storekeeper=safe_str(row.get("erro_storekeeper")), |
| erro_operacao=safe_str(row.get("erro_operacao")), |
| erro_especialista=safe_str(row.get("erro_especialista")), |
| erro_outros=safe_str(row.get("erro_outros")), |
| inclusao_exclusao=safe_str(row.get("inclusao_exclusao")), |
| po=safe_str(row.get("po")), |
| part_number=safe_str(row.get("part_number")), |
| material=safe_str(row.get("material")), |
| solicitante=safe_str(row.get("solicitante")), |
| motivo=safe_str(row.get("motivo")), |
| requisitante=safe_str(row.get("requisitante")), |
| nota_fiscal=safe_str(row.get("nota_fiscal")), |
| impacto=safe_str(row.get("impacto")), |
| dimensao=safe_str(row.get("dimensao")), |
| observacoes=safe_str(row.get("observacoes")), |
| dia_inclusao=safe_str(row.get("dia_inclusao")), |
| ) |
| db.add(registro) |
|
|
| db.commit() |
|
|
| try: |
| registrar_log( |
| usuario=st.session_state.get("usuario"), |
| acao=f"IMPORTAÇÃO EXCEL ({len(df_import)} registros) - com filtro de duplicidade", |
| tabela="equipamentos", |
| registro_id=None |
| ) |
| except Exception: |
| |
| pass |
|
|
| st.success(f"🎉 Importação concluída com sucesso! {len(df_import)} registros gravados.") |
|
|
| except Exception as e: |
| db.rollback() |
| st.error(f"❌ Erro ao gravar no banco: {e}") |
|
|
| if col2.button("❌ Cancelar importação"): |
| st.warning("Importação cancelada pelo usuário.") |
|
|
|
|
| if __name__ == "__main__": |
| main() |