| from typing import Hashable, Optional, Literal |
|
|
| import pandas as pd |
| from sqlalchemy import text |
|
|
| from src.config import sql_client |
|
|
|
|
| def sql_drop(table: str): |
| try: |
| with sql_client.begin() as conn: |
| conn.execute(text(f"drop table if exists {table};")) |
| except Exception as e: |
| print("Ошибка:", e) |
|
|
|
|
| def sql_dump_df( |
| df: pd.DataFrame, |
| table: str, |
| if_exists: Literal["replace", "append"] = "append", |
| ) -> Optional[int]: |
| return df.to_sql(table, sql_client, if_exists=if_exists, index=False) |
|
|
|
|
| def sql_get_table(table: str) -> pd.DataFrame: |
| with sql_client.connect() as conn: |
| df = pd.read_sql(f"""select * from {table}""", conn) |
|
|
| return df |
|
|
|
|
| def sql_get_by_id(id_: Hashable) -> dict: |
| with sql_client.connect() as conn: |
| row = ( |
| conn.execute( |
| text("SELECT * FROM posts WHERE ctid = :id"), |
| {"id": id_}, |
| ) |
| .mappings() |
| .first() |
| ) |
|
|
| return row |
|
|
|
|
| def sql_get_by_ids(ids_: Hashable) -> list[dict]: |
| with sql_client.connect() as conn: |
| rows = ( |
| conn.execute( |
| text("SELECT * FROM posts WHERE ctid = ANY(:ids)"), |
| {"ids": ids_}, |
| ) |
| .mappings() |
| .all() |
| ) |
|
|
| return rows |
|
|
|
|
| def sql_fetch_batch(batch_size: int = 16, offset: int = 0): |
| query = text(""" |
| SELECT ctid, content |
| FROM posts |
| ORDER BY ctid |
| LIMIT :limit |
| OFFSET :offset |
| """) |
|
|
| with sql_client.connect() as conn: |
| rows = conn.execute(query, {"limit": batch_size, "offset": offset}).mappings().all() |
|
|
| return rows |
|
|
|
|
| def sql_get_by_date(message_date: str): |
| with sql_client.connect() as conn: |
| rows = ( |
| conn.execute( |
| text( |
| """ |
| SELECT * |
| FROM posts |
| WHERE message_dt = :message_date |
| """ |
| ), |
| {"message_date": message_date}, |
| ) |
| .mappings() |
| .all() |
| ) |
|
|
| return rows |
|
|