| |
| """ |
| banco.py |
| Compatível com: |
| - Roteamento por ambiente (db_router.py): produção/teste/treinamento |
| - Fallback: um único DATABASE_URL vindo de env/Secrets |
| - Postgres / MySQL / SQLite (c/ alias de case para load.db no Linux) |
| - Garantia de criação do diretório pai do SQLite (evita 'unable to open database file') |
| |
| Principais melhorias: |
| - 'engine' agora é um PROXY dinâmico (quando houver db_router), refletindo a escolha atual. |
| - Mantém API compatível: engine, Base, SessionLocal, db_info(), get_engine(), etc. |
| """ |
|
|
| import os |
| import shutil |
| import importlib |
| from typing import Optional |
|
|
| from sqlalchemy import create_engine |
| from sqlalchemy.orm import sessionmaker, declarative_base |
| from dotenv import load_dotenv |
|
|
| |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
|
|
| |
| load_dotenv() |
|
|
| |
| |
| |
| def _ensure_sqlite_case_alias() -> str: |
| """ |
| Garante que exista 'load.db' no diretório do app. |
| Se encontrar 'Load.db' (ou outra variação de caixa), copia para 'load.db'. |
| Retorna o caminho absoluto de 'load.db'. |
| """ |
| lower = os.path.join(BASE_DIR, "load.db") |
| if os.path.exists(lower): |
| return lower |
|
|
| |
| for cand in ("Load.db", "LOAD.DB", "Load.DB"): |
| up = os.path.join(BASE_DIR, cand) |
| if os.path.exists(up): |
| try: |
| shutil.copy(up, lower) |
| except Exception: |
| |
| pass |
| break |
|
|
| return lower |
|
|
|
|
| |
| |
| |
| def _ensure_parent_dir_sqlite(sqlite_url: str) -> str: |
| """ |
| Garante que o diretório pai do arquivo SQLite exista. |
| Caso não consiga criar (permissão), cai para ~/.ioirun/<arquivo>.db |
| Retorna a URL (que pode ser ajustada para fallback). |
| """ |
| if not sqlite_url or not sqlite_url.startswith("sqlite"): |
| return sqlite_url |
|
|
| |
| path = sqlite_url.replace("sqlite:///", "", 1) |
| if path.startswith("//"): |
| path = path[1:] |
| file_path = os.path.abspath(path) |
| parent = os.path.dirname(file_path) |
|
|
| try: |
| os.makedirs(parent, exist_ok=True) |
| return sqlite_url |
| except Exception: |
| |
| home_dir = os.path.join(os.path.expanduser("~"), ".ioirun") |
| os.makedirs(home_dir, exist_ok=True) |
| alt = os.path.join(home_dir, os.path.basename(file_path)) |
| return f"sqlite:///{alt}" |
|
|
|
|
| |
| |
| |
| try: |
| from db_router import ( |
| get_engine as _router_get_engine, |
| get_session_factory as _router_get_session_factory, |
| SessionLocal as _router_SessionLocal, |
| current_db_choice as _router_current_choice, |
| bank_label as _router_bank_label, |
| ) |
| _HAS_ROUTER = True |
| except Exception: |
| _HAS_ROUTER = False |
| _router_get_engine = None |
| _router_get_session_factory = None |
| _router_SessionLocal = None |
| _router_current_choice = None |
| _router_bank_label = None |
|
|
|
|
| |
| |
| |
| def _build_fallback_uri() -> str: |
| """ |
| Monta a URI do banco quando não existe db_router. |
| Ordem de preferência: |
| 1. DATABASE_URL (completo) |
| 2. Variáveis separadas: DB_DRIVER, DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME |
| 3. SQLite local em 'load.db' |
| """ |
| |
| url = os.getenv("DATABASE_URL") |
| if url: |
| |
| return _ensure_parent_dir_sqlite(url) |
|
|
| |
| driver = (os.getenv("DB_DRIVER") or "").strip().lower() |
| host = os.getenv("DB_HOST") |
| port = os.getenv("DB_PORT") |
| user = os.getenv("DB_USER") |
| pwd = os.getenv("DB_PASS") |
| name = os.getenv("DB_NAME") |
|
|
| if driver and host and user and pwd and name: |
| |
| if not port: |
| port = "5432" if driver.startswith("post") else "3306" |
|
|
| if driver.startswith("post"): |
| return f"postgresql+psycopg2://{user}:{pwd}@{host}:{port}/{name}" |
| elif driver.startswith("mysql"): |
| return f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{name}" |
|
|
| |
| sqlite_path = _ensure_sqlite_case_alias() |
| return _ensure_parent_dir_sqlite(f"sqlite:///{sqlite_path}") |
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
| if _HAS_ROUTER: |
| |
| def get_engine(): |
| """Retorna a engine do banco ATUAL (prod/test/treinamento).""" |
| return _router_get_engine() |
|
|
| def get_session_factory(): |
| """Retorna um sessionmaker para o banco ATUAL (do roteador).""" |
| return _router_get_session_factory() |
|
|
| |
| SessionLocal = _router_SessionLocal |
|
|
| else: |
| |
| DATABASE_URL = _build_fallback_uri() |
|
|
| engine_args = { |
| "echo": False, |
| "pool_pre_ping": True, |
| } |
|
|
| if DATABASE_URL.startswith("sqlite"): |
| |
| engine_args["connect_args"] = {"check_same_thread": False} |
|
|
| _engine = create_engine(DATABASE_URL, **engine_args) |
|
|
| def get_engine(): |
| """ |
| Engine fixa do fallback. Para trocar de banco em runtime sem roteador, |
| é necessário recriar a engine manualmente. |
| """ |
| return _engine |
|
|
| def get_session_factory(): |
| """Fábrica de sessão fixa (fallback).""" |
| return sessionmaker(autocommit=False, autoflush=False, bind=_engine) |
|
|
| |
| SessionLocal = get_session_factory() |
|
|
|
|
| |
| |
| |
| Base = declarative_base() |
|
|
| class _EngineProxy: |
| """ |
| Proxy leve que encaminha atributos/operções para a engine ATUAL. |
| Isso permite manter 'engine' importável sem cristalizar a escolha de banco. |
| """ |
| def __getattr__(self, name): |
| return getattr(get_engine(), name) |
|
|
| def __repr__(self): |
| eng = get_engine() |
| try: |
| url = str(eng.url) |
| except Exception: |
| url = "(url indisponível)" |
| if _HAS_ROUTER and _router_current_choice: |
| ch = _router_current_choice() |
| lbl = _router_bank_label(ch) if _router_bank_label else ch |
| return f"<EngineProxy choice={ch} label={lbl} url={url}>" |
| return f"<EngineProxy url={url}>" |
|
|
| |
| engine = _EngineProxy() |
|
|
|
|
| |
| |
| |
| def init_schema(): |
| """ |
| Cria/atualiza as tabelas no banco ATUAL. |
| • Com roteador: aplica no banco escolhido (Produção/Teste/Treinamento). |
| • Sem roteador: aplica no DATABASE_URL padrão. |
| |
| Use em DEV/TESTE; em produção, prefira migrações (ex.: Alembic). |
| """ |
| |
| try: |
| importlib.import_module("models") |
| except ModuleNotFoundError: |
| |
| |
| raise |
|
|
| Base.metadata.create_all(bind=get_engine()) |
|
|
|
|
| def db_info() -> dict: |
| """ |
| Retorna informações básicas do banco ativo (para debug/UX). |
| """ |
| eng = get_engine() |
| try: |
| url = str(eng.url) |
| except Exception: |
| try: |
| url = DATABASE_URL |
| except Exception: |
| url = "(não disponível)" |
|
|
| info = {"url": url, "using_router": _HAS_ROUTER} |
|
|
| if _HAS_ROUTER and _router_current_choice: |
| ch = _router_current_choice() |
| info["choice"] = ch |
| try: |
| info["label"] = _router_bank_label(ch) |
| except Exception: |
| info["label"] = ch |
|
|
| return info |