| from __future__ import annotations |
|
|
| import argparse |
| import logging |
| from pathlib import Path |
| from typing import Dict, Iterable, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| import sqlalchemy as sa |
| from sqlalchemy.dialects.postgresql import insert |
|
|
| from src.constants import CANDIDATE_CATEGORIES |
| from src.data import preprocess as preprocess_module |
| from src.db.schema import ( |
| bureaux, |
| categories, |
| communes, |
| create_schema, |
| elections, |
| get_engine, |
| results_local, |
| results_national, |
| ) |
| from src.features import build_features |
|
|
# Module-level logger; configured in main() via logging.basicConfig.
LOGGER = logging.getLogger(__name__)
# Wide-format target column names, one "target_share_<category>" per candidate category.
TARGET_COLS = [f"target_share_{c}" for c in CANDIDATE_CATEGORIES]
# Columns identifying a single bureau-level observation in the panel.
ID_COLS = ["commune_code", "code_bv", "election_type", "election_year", "round", "date_scrutin"]
|
|
|
|
def load_panel(input_path: Path) -> pd.DataFrame:
    """Load the harmonized panel from disk.

    Parquet files are read natively; any other extension is treated as a
    semicolon-separated CSV.

    Raises:
        FileNotFoundError: if *input_path* does not exist.
    """
    if not input_path.exists():
        raise FileNotFoundError(f"Dataset panel introuvable : {input_path}")
    if input_path.suffix != ".parquet":
        return pd.read_csv(input_path, sep=";")
    return pd.read_parquet(input_path)
|
|
|
|
def ensure_panel_exists(panel_path: Path, elections_long_path: Path, mapping_path: Path) -> pd.DataFrame:
    """Return the panel, rebuilding it from intermediate artifacts when absent.

    If *panel_path* is missing, the preprocessing step is run first (when the
    long-format elections file is also missing), then the panel is rebuilt
    via ``build_features.build_panel`` before being loaded.
    """
    if not panel_path.exists():
        LOGGER.info("Panel manquant, tentative de reconstruction via preprocess + build_features.")
        # Regenerate the long-format intermediate file only when it is missing too.
        if not elections_long_path.exists():
            preprocess_module.preprocess_all(
                Path("data/raw"),
                elections_long_path.parent,
                preprocess_module.DEFAULT_META_CONFIG,
            )
        build_features.build_panel(elections_long_path, mapping_path, panel_path, csv_output=None)
    return load_panel(panel_path)
|
|
|
|
def check_mass(panel: pd.DataFrame, tolerance: float = 0.05) -> None:
    """Warn when per-row target shares do not sum to roughly 1.

    Rows whose share total falls outside ``[1 - tolerance, 1 + tolerance]``
    are counted and reported via a single warning; no exception is raised.
    """
    totals = panel[TARGET_COLS].sum(axis=1)
    outside = ~totals.between(1 - tolerance, 1 + tolerance)
    n_bad = int(outside.sum())
    if n_bad:
        LOGGER.warning("Somme des parts hors intervalle attendu pour %s lignes (tol=%s).", n_bad, tolerance)
|
|
|
|
def melt_panel(panel: pd.DataFrame) -> pd.DataFrame:
    """Reshape the wide per-category share columns into long format.

    Produces one row per (observation, category) with a ``category`` column
    (the ``target_share_`` prefix stripped) and a ``share`` value column.
    """
    melted = panel.melt(
        id_vars=ID_COLS + ["turnout_pct"],
        value_vars=TARGET_COLS,
        var_name="category",
        value_name="share",
    )
    melted["category"] = melted["category"].str.replace("target_share_", "", regex=False)
    return melted
|
|
|
|
def _upsert_simple(conn, table, rows: Iterable[dict], index_elements: Iterable[str]) -> None:
    """Bulk ``INSERT ... ON CONFLICT DO NOTHING`` for reference tables.

    Args:
        conn: active SQLAlchemy connection.
        table: target table object.
        rows: mappings of column -> value to insert (any iterable).
        index_elements: columns of the unique constraint used for conflict detection.

    Bug fix: the statement used to be built from ``list(rows)`` *before* the
    emptiness check, so a generator argument was exhausted and then
    truth-tested (a generator object is always truthy), and an empty input
    still built an INSERT with no values. Materialize once, guard first.
    """
    materialized = list(rows)
    if not materialized:
        return
    stmt = insert(table).values(materialized)
    stmt = stmt.on_conflict_do_nothing(index_elements=list(index_elements))
    conn.execute(stmt)
|
|
|
|
def ingest(panel: pd.DataFrame, engine) -> None:
    """Upsert the harmonized election panel into the PostgreSQL schema.

    Steps, all inside a single transaction:
      1. sanity-check that per-row shares sum to ~1 (warning only);
      2. normalise ``round`` (missing -> 1) and ``date_scrutin`` (-> date);
      3. upsert reference tables: categories, communes, bureaux, elections;
      4. upsert bureau-level results into ``results_local``;
      5. aggregate to national means and upsert ``results_national``.

    Args:
        panel: wide-format panel with ID_COLS, ``turnout_pct`` and TARGET_COLS.
        engine: SQLAlchemy engine pointing at the target database.
    """
    check_mass(panel)
    panel = panel.copy()
    # Missing rounds are coerced to 1 (assumed first round); dates stored as DATE.
    panel["round"] = panel["round"].fillna(1).astype(int)
    panel["date_scrutin"] = pd.to_datetime(panel["date_scrutin"]).dt.date


    # Long format: one row per (observation, category); shares become
    # percentages rounded to 6 decimals.
    long_df = melt_panel(panel)
    long_df = long_df[long_df["category"].isin(CANDIDATE_CATEGORIES)]
    long_df["share_pct"] = (long_df["share"].astype(float) * 100).round(6)


    with engine.begin() as conn:
        create_schema(conn)
        LOGGER.info("Schéma vérifié.")


        # Reference table: candidate categories, then a name -> id lookup.
        _upsert_simple(conn, categories, [{"name": cat} for cat in CANDIDATE_CATEGORIES], ["name"])
        cat_map = dict(conn.execute(sa.select(categories.c.name, categories.c.id)))


        # Communes keyed by INSEE code; the panel carries no commune label,
        # so the code doubles as the normalized name.
        commune_rows = [
            {"name_normalized": code, "insee_code": code}
            for code in sorted(long_df["commune_code"].dropna().unique())
        ]
        _upsert_simple(conn, communes, commune_rows, ["insee_code"])
        commune_map = dict(conn.execute(sa.select(communes.c.insee_code, communes.c.id)))


        def bureau_code_only(code_bv: str) -> str:
            """Return the bureau part of a 'commune-bureau' composite code."""
            if "-" in str(code_bv):
                parts = str(code_bv).split("-", 1)
                return parts[1]
            return str(code_bv)


        # Bureaux: one row per (commune, bureau) pair seen in the panel.
        bureau_rows = []
        for _, row in long_df.drop_duplicates(subset=["commune_code", "code_bv"]).iterrows():
            commune_id = commune_map.get(row["commune_code"])
            if commune_id is None:
                continue  # commune was not persisted; skip its bureaux
            bureau_rows.append(
                {
                    "commune_id": commune_id,
                    "bureau_code": bureau_code_only(row["code_bv"]),
                    "bureau_label": None,
                }
            )
        _upsert_simple(conn, bureaux, bureau_rows, ["commune_id", "bureau_code"])
        # (commune_id, bureau_code) -> bureau id, for FK resolution below.
        bureau_map = {
            (commune_id, bureau_code): bureau_id
            for bureau_id, commune_id, bureau_code in conn.execute(
                sa.select(bureaux.c.id, bureaux.c.commune_id, bureaux.c.bureau_code)
            )
        }


        # Elections: unique (type, year, round) triples from the panel.
        election_rows = []
        for _, row in panel.drop_duplicates(subset=["election_type", "election_year", "round"]).iterrows():
            election_rows.append(
                {
                    "election_type": row["election_type"],
                    "election_year": int(row["election_year"]),
                    "round": int(row["round"]) if not pd.isna(row["round"]) else None,
                    "date": row["date_scrutin"],
                }
            )
        _upsert_simple(conn, elections, election_rows, ["election_type", "election_year", "round"])
        # NULL rounds in the DB map to key 1, mirroring the fillna(1) above.
        election_map: Dict[Tuple[str, int, int], int] = {
            (etype, year, int(round_) if round_ is not None else 1): eid
            for eid, etype, year, round_ in conn.execute(
                sa.select(elections.c.id, elections.c.election_type, elections.c.election_year, elections.c.round)
            )
        }


        # Bureau-level results: resolve all FKs, dropping rows whose
        # commune/bureau/election/category cannot be resolved.
        local_rows = []
        for row in long_df.itertuples(index=False):
            commune_id = commune_map.get(row.commune_code)
            if commune_id is None:
                continue
            bureau_id = bureau_map.get((commune_id, bureau_code_only(row.code_bv)))
            election_id = election_map.get((row.election_type, int(row.election_year), int(row.round)))
            category_id = cat_map.get(row.category)
            if None in (bureau_id, election_id, category_id):
                continue  # orphan row: missing FK target, skipped silently
            # Turnout is stored as a percentage (the panel holds a fraction).
            turnout_pct = None if pd.isna(row.turnout_pct) else float(row.turnout_pct) * 100
            local_rows.append(
                {
                    "bureau_id": bureau_id,
                    "election_id": election_id,
                    "category_id": category_id,
                    "share_pct": None if pd.isna(row.share_pct) else float(row.share_pct),
                    "votes": None,
                    "expressed": None,
                    "turnout_pct": turnout_pct,
                }
            )
        if local_rows:
            # Upsert: re-running the ingestion refreshes existing rows in place.
            stmt = insert(results_local).values(local_rows)
            stmt = stmt.on_conflict_do_update(
                index_elements=["bureau_id", "election_id", "category_id"],
                set_={
                    "share_pct": stmt.excluded.share_pct,
                    "votes": stmt.excluded.votes,
                    "expressed": stmt.excluded.expressed,
                    "turnout_pct": stmt.excluded.turnout_pct,
                },
            )
            conn.execute(stmt)
        LOGGER.info("Résultats locaux insérés/mis à jour : %s lignes", len(local_rows))


        # National aggregates per (election, category).
        # NOTE(review): the national share is an unweighted mean over bureaux
        # (not weighted by expressed votes) — confirm this is intended.
        nat_rows = []
        nat = (
            long_df.groupby(["election_type", "election_year", "round", "category"], as_index=False)
            .agg(share=("share_pct", "mean"))
            .rename(columns={"share": "share_pct"})
        )

        # National turnout: mean of the per-observation turnout fractions.
        turnout_nat = panel.groupby(["election_type", "election_year", "round"], as_index=False)["turnout_pct"].mean()
        nat = nat.merge(turnout_nat, on=["election_type", "election_year", "round"], how="left")


        for row in nat.itertuples(index=False):
            election_id = election_map.get((row.election_type, int(row.election_year), int(row.round)))
            category_id = cat_map.get(row.category)
            if None in (election_id, category_id):
                continue
            nat_rows.append(
                {
                    "election_id": election_id,
                    "category_id": category_id,
                    "share_pct": None if pd.isna(row.share_pct) else float(row.share_pct),
                    "votes": None,
                    "expressed": None,
                    # share_pct is already a percentage; turnout still needs * 100.
                    "turnout_pct": None if pd.isna(row.turnout_pct) else float(row.turnout_pct * 100),
                }
            )
        if nat_rows:
            stmt = insert(results_national).values(nat_rows)
            stmt = stmt.on_conflict_do_update(
                index_elements=["election_id", "category_id"],
                set_={
                    "share_pct": stmt.excluded.share_pct,
                    "votes": stmt.excluded.votes,
                    "expressed": stmt.excluded.expressed,
                    "turnout_pct": stmt.excluded.turnout_pct,
                },
            )
            conn.execute(stmt)
        LOGGER.info("Référentiels nationaux insérés/mis à jour : %s lignes", len(nat_rows))
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Declare and evaluate the command-line interface."""
    parser = argparse.ArgumentParser(description="Ingestion du panel harmonisé dans PostgreSQL.")
    parser.add_argument(
        "--input",
        type=Path,
        default=Path("data/processed/panel.parquet"),
        help="Chemin vers le panel parquet.",
    )
    parser.add_argument(
        "--elections-long",
        type=Path,
        default=Path("data/interim/elections_long.parquet"),
        help="Format long (fallback pour reconstruire le panel).",
    )
    parser.add_argument(
        "--mapping",
        type=Path,
        default=Path("data/mapping_candidats_blocs.csv"),
        help="Mapping nuance -> catégorie (fallback).",
    )
    return parser.parse_args()
|
|
|
|
def main() -> None:
    """CLI entry point: configure logging, load the panel, run the ingestion."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    arguments = parse_args()
    dataset = ensure_panel_exists(arguments.input, arguments.elections_long, arguments.mapping)
    ingest(dataset, get_engine())
|
|
|
|
# Script entry point guard: run the ingestion only when executed directly.
if __name__ == "__main__":
    main()
|
|