| |
| """ |
| login.py — Sistema de autenticação seguro para Streamlit |
| Compatível com HuggingFace Spaces (Linux) e Windows. |
| |
| Recursos: |
| - Login normal (usuários no banco SQLite/Postgres/MySQL) |
| - Hash seguro com bcrypt |
| - Autologin para testes (DISABLE_AUTH=1) |
| - Login emergencial (ALLOW_EMERGENCY_LOGIN=1 + EMERG_USER + EMERG_PASS_BCRYPT) |
| - Auditoria opcional via registrar_log |
| - Compatível com múltiplos nomes de coluna de senha (senha_hash, password_hash, senha, etc.) |
| """ |
|
|
| import os |
| import bcrypt |
| import streamlit as st |
|
|
| |
| try: |
| from utils_auditoria import registrar_log |
| _HAS_AUDIT = True |
| except Exception: |
| _HAS_AUDIT = False |
|
|
| |
| from banco import SessionLocal |
| from models import Usuario |
|
|
| |
# Feature flags: each one is on only when its environment variable is exactly "1".
_DISABLE_AUTH = os.environ.get("DISABLE_AUTH", "0") == "1"
_ALLOW_EMERG = os.environ.get("ALLOW_EMERGENCY_LOGIN", "0") == "1"

# Identity written to the session by the autologin path.
_DEMO_USER = os.environ.get("DEMO_USER", "demo")
_DEMO_PERFIL = os.environ.get("DEMO_PERFIL", "admin")
_DEMO_EMAIL = os.environ.get("DEMO_EMAIL", "demo@example.com")

# Optional comma-separated list of attribute names that may hold the password
# hash (PASSWORD_FIELD). Blank entries are dropped; names keep their order.
_PASSWORD_FIELDS_ENV = os.environ.get("PASSWORD_FIELD", "").strip()
_PASSWORD_FIELDS = [
    name for name in map(str.strip, _PASSWORD_FIELDS_ENV.split(",")) if name
]

# Attribute names tried after the configured ones.
_DEFAULT_PASS_FIELDS = [
    "senha_hash",
    "password_hash",
    "senha",
    "hash",
    "pass_hash",
    "pwd_hash",
]
|
|
| def _get_password_hash_from_row(row) -> str | None: |
| """Retorna o hash de senha a partir do objeto ORM 'row', testando vários nomes possíveis.""" |
| |
| for fname in (_PASSWORD_FIELDS or []): |
| if hasattr(row, fname): |
| v = getattr(row, fname) |
| if v: |
| return str(v) |
|
|
| |
| for fname in _DEFAULT_PASS_FIELDS: |
| if hasattr(row, fname): |
| v = getattr(row, fname) |
| if v: |
| return str(v) |
|
|
| |
| try: |
| for k, v in getattr(row, "__dict__", {}).items(): |
| if k.startswith("_"): |
| continue |
| if k.lower() in set(_DEFAULT_PASS_FIELDS): |
| if v: |
| return str(v) |
| except Exception: |
| pass |
|
|
| return None |
|
|
def _get_user_identity_fields(row):
    """Extract ``(usuario, perfil, email)`` from a user row.

    Each value is read from the first attribute, among a list of alternative
    names, that holds a truthy value:

    * usuario: ``usuario``, ``username``, ``login``, ``nome_usuario``
    * perfil:  ``perfil``, ``role``, ``papel`` (falls back to ``"usuario"``)
    * email:   ``email``, ``e_mail``, ``mail``

    When no candidate is truthy, usuario and email take the value of the last
    candidate attribute (None if that attribute is missing).
    """

    def first_truthy(*names):
        value = None
        for name in names:
            value = getattr(row, name, None)
            if value:
                break
        return value

    usuario = first_truthy("usuario", "username", "login", "nome_usuario")
    perfil = first_truthy("perfil", "role", "papel") or "usuario"
    email = first_truthy("email", "e_mail", "mail")
    return usuario, perfil, email
|
|
def _fetch_user_by_login(db, user_login: str):
    """Find a user by trying several login columns in turn.

    The columns ``usuario``, ``username``, ``login`` and ``email`` of the
    ``Usuario`` model are queried in that order with the stripped
    ``user_login``. Any exception raised for a candidate (for example, the
    model has no such column) just moves on to the next one.

    Args:
        db: Session object exposing ``query(...)``.
        user_login: Identifier typed by the user.

    Returns:
        The first truthy row found, or None when no candidate matches.
    """
    for column_name in ("usuario", "username", "login", "email"):
        try:
            column = getattr(Usuario, column_name)
            query = db.query(Usuario).filter(column == user_login.strip())
            found = query.first()
            if found:
                return found
        except Exception:
            continue
    return None
|
|
|
|
| |
| |
| |
def verificar_hash(senha_digitada: str, senha_hash: str) -> bool:
    """Return True when the typed password matches the stored bcrypt hash.

    Both arguments are normally ``str`` and are encoded as UTF-8. Bytes-like
    values are also accepted, so a hash read from a binary column verifies
    instead of silently failing. Whitespace around the hash (padded CHAR
    columns, trailing newline in an environment variable) is ignored; a valid
    bcrypt hash never contains whitespace.

    Args:
        senha_digitada: Password typed by the user.
        senha_hash: bcrypt hash to check against.

    Returns:
        True only when bcrypt confirms the match. Missing values, malformed
        hashes and any other error yield False.
    """
    if senha_digitada is None or not senha_hash:
        return False

    try:
        if isinstance(senha_digitada, (bytes, bytearray)):
            password_bytes = bytes(senha_digitada)
        else:
            password_bytes = senha_digitada.encode("utf-8")

        if isinstance(senha_hash, (bytes, bytearray, memoryview)):
            hash_bytes = bytes(senha_hash)
        else:
            hash_bytes = senha_hash.encode("utf-8")

        return bool(bcrypt.checkpw(password_bytes, hash_bytes.strip()))
    except Exception:
        # Fail closed: any error during verification means "not authenticated".
        return False
|
|
|
|
| |
| |
| |
def _autologin_if_allowed() -> bool:
    """Sign the demo user in automatically when authentication is disabled.

    Returns:
        True when ``_DISABLE_AUTH`` is set: the session receives the demo
        identity, an audit entry is attempted and a success banner is shown.
        False otherwise, in which case nothing is touched.
    """
    if _DISABLE_AUTH:
        session = st.session_state
        session.logado = True
        session.usuario = _DEMO_USER
        session.perfil = _DEMO_PERFIL
        session.email = _DEMO_EMAIL

        # Auditing is optional and best-effort: a failure must not block the login.
        if _HAS_AUDIT:
            try:
                registrar_log(
                    usuario=_DEMO_USER,
                    acao="Autologin (DISABLE_AUTH=1)",
                    tabela="login",
                    registro_id=None,
                )
            except Exception:
                pass

        st.success(f"🔓 Autologin ativo: {_DEMO_USER} ({_DEMO_PERFIL})")
        return True

    return False
|
|
|
|
| |
| |
| |
def _try_emergency_login(usuario: str, senha: str) -> bool:
    """Authenticate against the emergency credentials held in the environment.

    This path is active only when ``_ALLOW_EMERG`` is set and both
    ``EMERG_USER`` and ``EMERG_PASS_BCRYPT`` are defined. The user name is
    compared ignoring case and surrounding whitespace; the password is checked
    against the bcrypt hash with ``verificar_hash``.

    Returns:
        True when the credentials match. The session is then populated with
        the emergency user under the ``admin`` profile, a success banner is
        shown and an audit entry is attempted. False otherwise.
    """
    if not _ALLOW_EMERG:
        return False

    emergency_user = os.getenv("EMERG_USER")
    emergency_hash = os.getenv("EMERG_PASS_BCRYPT")
    if not (emergency_user and emergency_hash):
        return False

    # The user name is checked first, so the hash is only verified for a matching name.
    same_user = usuario.strip().lower() == emergency_user.strip().lower()
    if not same_user or not verificar_hash(senha, emergency_hash):
        return False

    session = st.session_state
    session.logado = True
    session.usuario = emergency_user
    session.perfil = "admin"
    session.email = f"{emergency_user}@local"

    st.success("🔑 Login emergencial bem-sucedido.")

    # Auditing is optional and best-effort: a failure must not undo the login.
    if _HAS_AUDIT:
        try:
            registrar_log(
                usuario=emergency_user,
                acao="Login emergencial",
                tabela="login",
                registro_id=None,
            )
        except Exception:
            pass

    return True
|
|
|
|
| |
| |
| |
def _login_normal(usuario: str, senha: str) -> bool:
    """Authenticate a user stored in the database.

    The user row is looked up with ``_fetch_user_by_login``. Everything needed
    from it (password hash, identity fields, id) is copied while the session
    is still open, because reading ORM attributes from an instance whose
    session has been closed can fail if a column is unloaded or expired. The
    bcrypt check runs after the session is closed, so no connection is held
    while hashing.

    Args:
        usuario: Login identifier typed by the user.
        senha: Password typed by the user.

    Returns:
        True when the password matches. The session then receives ``logado``,
        ``usuario``, ``perfil`` (lower-cased) and ``email``, and an audit
        entry is attempted. False when the user is not found, has no stored
        hash, the password does not match, or the lookup fails.
    """
    pwd_hash = None
    nome = perfil = email = registro_id = None

    db = SessionLocal()
    try:
        row = _fetch_user_by_login(db, usuario)
        if row:
            pwd_hash = _get_password_hash_from_row(row)
            nome, perfil, email = _get_user_identity_fields(row)
            registro_id = getattr(row, "id", None)
    except Exception:
        # Fail closed: a lookup or attribute-loading error means "no login".
        row = None
    finally:
        try:
            db.close()
        except Exception:
            pass

    if not row or not pwd_hash:
        return False

    if not verificar_hash(senha, pwd_hash):
        return False

    st.session_state.logado = True
    st.session_state.usuario = nome or usuario
    st.session_state.perfil = (perfil or "usuario").strip().lower()
    st.session_state.email = email

    # Auditing is optional and best-effort: a failure must not undo the login.
    if _HAS_AUDIT:
        try:
            registrar_log(
                usuario=st.session_state.usuario,
                acao="Login normal",
                tabela="login",
                registro_id=registro_id,
            )
        except Exception:
            pass

    return True
|
|
|
|
| |
| |
| |
def login():
    """Render the login form and authenticate the submitted credentials.

    Flow:
        1. If autologin applies, return immediately.
        2. Otherwise show the form; on submit, try the database login and
           then the emergency login. The first success triggers a rerun.
        3. If both fail, show an error message.

    A hint is displayed below the form while emergency login is enabled.
    """
    if _autologin_if_allowed():
        return

    st.markdown("### 🔐 Login")
    with st.form("form_login"):
        typed_user = st.text_input("Usuário")
        typed_password = st.text_input("Senha", type="password")
        submitted = st.form_submit_button("Entrar")

    if submitted:
        typed_user = typed_user.strip()
        # NOTE(review): stripping the password means leading/trailing
        # whitespace can never be part of a password. Confirm this is intended.
        typed_password = typed_password.strip()

        # The database login is tried first; the emergency path only runs if it fails.
        authenticated = _login_normal(
            typed_user, typed_password
        ) or _try_emergency_login(typed_user, typed_password)
        if authenticated:
            st.rerun()
            return

        st.error("Usuário ou senha incorretos.")

    if _ALLOW_EMERG:
        st.info("🔑 Login emergencial disponível.")
|
|
|
|
| |
| |
| |
def logout():
    """Clear the authentication data from the session and rerun the app.

    ``logado`` is set to False; ``usuario``, ``perfil`` and ``email`` are set
    to None.
    """
    cleared_state = {
        "logado": False,
        "usuario": None,
        "perfil": None,
        "email": None,
    }
    for key, value in cleared_state.items():
        setattr(st.session_state, key, value)
    st.rerun()