| """ | |
| Parquet module. | |
| TODO: handle migrations | |
| TODO: make it work with chunked exports. | |
| TODO: make it work with chunked imports. | |
| Mostly auto-generated by Cursor + GPT-5. | |
| """ | |
import os

import pandas as pd
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine


def export_to_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Export each table in the database to a separate Parquet file.
    Loads entire tables into memory and sorts deterministically.

    TODO: make it work with chunked exports.
    TODO: handle migrations.
    """
    os.makedirs(backup_dir, exist_ok=True)
    inspector = inspect(engine)
    table_names = inspector.get_table_names()
    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        # Load the entire table into memory (table names come from the
        # inspector, not from user input, so the f-string is safe here)
        query = text(f"SELECT * FROM {table_name}")
        with engine.connect() as conn:
            df = pd.read_sql_query(query, conn)
        # Sort deterministically by all columns so repeated exports of the
        # same data produce a stable row order
        sort_cols = list(df.columns)
        df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True)
        # Write to Parquet
        df_sorted.to_parquet(file_path, index=False)
        print(f"Exported {table_name} to {file_path}")


def import_from_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Import each Parquet file into the database.
    Checks the schema strictly (column names; type checking is a TODO).
    Loads entire files into memory.

    TODO: check column types, not just names.
    TODO: make it work with chunked imports.
    TODO: handle migrations.
    """
    inspector = inspect(engine)
    table_names = inspector.get_table_names()
    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        if not os.path.exists(file_path):
            print(f"No backup found for table {table_name}, skipping.")
            continue

        # Load the entire file into memory
        df = pd.read_parquet(file_path)

        # Strict schema check: the file's columns must match the live table
        table_columns = {col["name"] for col in inspector.get_columns(table_name)}
        if set(df.columns) != table_columns:
            raise ValueError(
                f"Schema mismatch for {table_name}: table has "
                f"{sorted(table_columns)}, file has {sorted(df.columns)}"
            )

        # Clear and repopulate the table in a single transaction, so a
        # failed import rolls back instead of leaving the table empty
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table_name}"))
            if not df.empty:
                columns = df.columns.tolist()
                insert_stmt = text(
                    f"INSERT INTO {table_name} ({', '.join(columns)}) "
                    f"VALUES ({', '.join(f':{col}' for col in columns)})"
                )
                chunk_size = 10000
                for start in range(0, len(df), chunk_size):
                    chunk = df.iloc[start : start + chunk_size]
                    # Cast to native Python scalars (NaN -> None) so DBAPI
                    # drivers can bind them, then insert via executemany;
                    # one parameter set per row keeps each statement under
                    # per-statement bound-parameter limits (e.g. SQLite's)
                    records = chunk.astype(object).where(chunk.notna(), None)
                    conn.execute(insert_stmt, records.to_dict(orient="records"))
        print(f"Imported {table_name} from {file_path}")