| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Iterable, Mapping, Optional |
|
|
| import pandas as pd |
| import re |
| import yaml |
|
|
| from .constants import CANDIDATE_CATEGORIES |
|
|
|
|
def normalize_bloc(bloc: str | None) -> str:
    """Return the canonical bloc category for an arbitrary bloc label.

    The label is lower-cased, stripped, and its separators unified to
    underscores, then resolved through an alias table.  Anything that still
    does not match a known category falls back to ``"centre"``.
    """
    if bloc is None:
        return "centre"
    key = str(bloc).strip().lower().replace(" ", "_").replace("-", "_")
    # Alias table: feminine spellings, generic left/right labels, the
    # "divers_*" nuances, and one known misspelling ("doite_dure"), all
    # mapped onto the canonical category names.
    aliases = {
        "droite_moderee": "droite_modere",
        "gauche_moderee": "gauche_modere",
        "doite_dure": "droite_dure",
        "gauche": "gauche_modere",
        "droite": "droite_modere",
        "divers": "centre",
        "divers_droite": "droite_modere",
        "divers_gauche": "gauche_modere",
        "divers_centre": "centre",
        "extreme_gauche": "extreme_gauche",
        "extreme_droite": "extreme_droite",
    }
    resolved = aliases.get(key, key)
    return resolved if resolved in CANDIDATE_CATEGORIES else "centre"
|
|
|
|
# Default location of the commune whitelist: <package root>/config/communes.yaml,
# resolved relative to this file so lookups work regardless of the CWD.
DEFAULT_COMMUNES_PATH = (Path(__file__).resolve().parents[1] / "config" / "communes.yaml")
|
|
|
|
| def _normalize_insee_code(value: str | int | None) -> str: |
| if value is None: |
| return "" |
| cleaned = ( |
| str(value) |
| .strip() |
| .replace(".0", "") |
| ) |
| cleaned = re.sub(r"\D", "", cleaned) |
| if not cleaned: |
| return "" |
| if len(cleaned) >= 5: |
| return cleaned[:5] |
| return cleaned.zfill(5) |
|
|
|
|
def load_target_communes(path: Path = DEFAULT_COMMUNES_PATH) -> dict[str, str]:
    """Load the commune whitelist from YAML as ``{insee_code: name}``.

    The file may contain, under an optional top-level ``communes`` key,
    either a ``{code: name}`` mapping or a list whose items are plain code
    strings or dicts with ``code_insee``/``code``/``insee`` and
    ``nom``/``name`` fields.  Codes are normalised to 5-digit strings;
    unparseable codes are skipped.
    """
    if not path.exists():
        raise FileNotFoundError(f"Fichier communes introuvable: {path}")
    raw = yaml.safe_load(path.read_text()) or {}
    entries = raw.get("communes", raw) if isinstance(raw, dict) else raw
    result: dict[str, str] = {}

    # Mapping form: {code: name}.
    if isinstance(entries, dict):
        for raw_code, raw_name in entries.items():
            code = _normalize_insee_code(raw_code)
            if code:
                result[code] = "" if raw_name is None else str(raw_name)
        return result

    if not isinstance(entries, list):
        raise ValueError("Format YAML invalide: attendu une liste ou un mapping sous 'communes'.")

    # List form: plain code strings or per-commune dicts; other item types
    # are silently ignored.
    for item in entries:
        if isinstance(item, str):
            code = _normalize_insee_code(item)
            if code:
                result[code] = ""
        elif isinstance(item, dict):
            raw_code = item.get("code_insee") or item.get("code") or item.get("insee")
            raw_name = item.get("nom") or item.get("name") or ""
            code = _normalize_insee_code(raw_code)
            if code:
                result[code] = "" if raw_name is None else str(raw_name)
    return result
|
|
|
|
def load_elections_long(path: Path) -> pd.DataFrame:
    """Load the harmonised long-format dataset (output of notebook 01_pretraitement).

    Parquet files are read natively; anything else is treated as a
    ';'-separated CSV.  ``date_scrutin`` is parsed to datetime, the count
    columns present in the file are coerced to numbers, and missing ``voix``
    values are filled with 0.
    """
    if path.suffix == ".parquet":
        df = pd.read_parquet(path)
    else:
        df = pd.read_csv(path, sep=";")
    df["date_scrutin"] = pd.to_datetime(df["date_scrutin"])
    numeric_cols = ["exprimes", "inscrits", "votants", "voix", "blancs", "nuls"]
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    # Bug fix: the previous unconditional fillna raised KeyError when the
    # "voix" column was absent, even though the loop above tolerates it.
    if "voix" in df.columns:
        df["voix"] = df["voix"].fillna(0)
    return df
|
|
|
|
def _mapping_from_yaml(path: Path) -> pd.DataFrame:
    """Build the candidature→bloc mapping DataFrame from a YAML description.

    The YAML file may provide, as top-level keys:
      - ``mapping``: an inline list of mapping rows (takes precedence), or
      - ``base_mapping``: path to a ';'-CSV base table (relative paths are
        resolved against the YAML file's directory), plus
      - ``overrides``: a list of per-candidature corrections applied on top.

    Returns a DataFrame with (at least) ``code_candidature``,
    ``nom_candidature`` and ``bloc_1``..``bloc_3`` columns.
    """
    # Local import so the module can be imported without PyYAML installed;
    # the dependency is only exercised when a YAML mapping is actually used.
    try:
        import yaml
    except Exception as exc:
        raise RuntimeError("PyYAML est requis pour charger un mapping YAML.") from exc
    raw = yaml.safe_load(path.read_text()) or {}
    if not isinstance(raw, dict):
        raise ValueError("Mapping YAML invalide: attendu un dictionnaire.")


    base_mapping = raw.get("base_mapping")
    mapping_entries = raw.get("mapping")
    overrides = raw.get("overrides", [])


    # Source priority: inline entries > external base CSV > empty schema.
    mapping = pd.DataFrame()
    if mapping_entries:
        mapping = pd.DataFrame(mapping_entries)
    elif base_mapping:
        base_path = Path(base_mapping)
        if not base_path.is_absolute():
            base_path = path.parent / base_path  # resolve relative to the YAML file
        mapping = pd.read_csv(base_path, sep=";")
    else:
        mapping = pd.DataFrame(columns=["code_candidature", "nom_candidature", "bloc_1", "bloc_2", "bloc_3"])


    if overrides:
        override_df = pd.DataFrame(overrides)
        if not override_df.empty:
            # A compact "blocs: [a, b, c]" list is expanded into bloc_1..bloc_3.
            if "blocs" in override_df.columns:
                blocs = override_df["blocs"].apply(lambda v: v if isinstance(v, list) else [])
                override_df["bloc_1"] = blocs.apply(lambda v: v[0] if len(v) > 0 else None)
                override_df["bloc_2"] = blocs.apply(lambda v: v[1] if len(v) > 1 else None)
                override_df["bloc_3"] = blocs.apply(lambda v: v[2] if len(v) > 2 else None)
                override_df = override_df.drop(columns=["blocs"])
            # Accept the short aliases "code"/"nom" for the canonical column names.
            if "code_candidature" not in override_df.columns and "code" in override_df.columns:
                override_df = override_df.rename(columns={"code": "code_candidature"})
            if "nom_candidature" not in override_df.columns and "nom" in override_df.columns:
                override_df = override_df.rename(columns={"nom": "nom_candidature"})


            # Compare codes as strings so YAML integers match CSV strings.
            mapping = mapping.copy()
            if "code_candidature" in mapping.columns:
                mapping["code_candidature"] = mapping["code_candidature"].astype(str)
            if "code_candidature" in override_df.columns:
                override_df["code_candidature"] = override_df["code_candidature"].astype(str)


            # Apply each override: update matching rows in place; append rows
            # whose code is not already present.
            for _, row in override_df.iterrows():
                code = row.get("code_candidature")
                if code is None:
                    continue
                if "code_candidature" in mapping.columns:
                    mask = mapping["code_candidature"] == code
                else:
                    mask = pd.Series([False] * len(mapping))
                if mask.any():
                    # Only overwrite cells the override actually provides (non-NA).
                    for col in ["nom_candidature", "bloc_1", "bloc_2", "bloc_3"]:
                        if col in row and pd.notna(row[col]):
                            mapping.loc[mask, col] = row[col]
                else:
                    mapping = pd.concat([mapping, pd.DataFrame([row])], ignore_index=True)
    return mapping
|
|
|
|
def load_bloc_mapping(path: Path) -> pd.DataFrame:
    """Load the candidature→bloc mapping from YAML or ';'-separated CSV.

    After loading, each of the ``bloc_1``..``bloc_3`` columns that is
    present is canonicalised through ``normalize_bloc``.
    """
    is_yaml = path.suffix in {".yml", ".yaml"}
    mapping = _mapping_from_yaml(path) if is_yaml else pd.read_csv(path, sep=";")

    for column in ("bloc_1", "bloc_2", "bloc_3"):
        if column in mapping.columns:
            mapping[column] = mapping[column].apply(normalize_bloc)
    return mapping
|
|
|
|
def expand_voix_by_bloc(elections_long: pd.DataFrame, mapping: pd.DataFrame) -> pd.DataFrame:
    """Distribute the voix of each candidature evenly across its mapped blocs.

    Each input row is merged with its bloc mapping and emitted once per
    bloc, with ``voix_bloc`` = voix / number-of-blocs.  Rows with no valid
    bloc default to "centre".  Derived share and turnout rates are added.
    """
    merged = elections_long.merge(mapping, on="code_candidature", how="left")
    id_cols = ("code_bv", "nom_bv", "date_scrutin", "annee", "type_scrutin", "tour")
    stat_cols = ("exprimes", "inscrits", "votants", "blancs", "nuls")
    rows: list[dict] = []
    for _, row in merged.iterrows():
        candidates = (row.get("bloc_1"), row.get("bloc_2"), row.get("bloc_3"))
        # NaN placeholders from the left merge are not str, so they drop out.
        blocs = [normalize_bloc(b) for b in candidates if isinstance(b, str) and b]
        if not blocs:
            blocs = ["centre"]
        voix = row.get("voix", 0) or 0
        share = voix / len(blocs)
        front = {col: row.get(col) for col in id_cols}
        back = {col: row.get(col) for col in stat_cols}
        for bloc in blocs:
            rows.append({**front, "bloc": bloc, "voix_bloc": share, **back})
    out = pd.DataFrame.from_records(rows)
    out["date_scrutin"] = pd.to_datetime(out["date_scrutin"])
    for col in ("voix_bloc",) + stat_cols:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    out["part_bloc"] = out["voix_bloc"] / out["exprimes"]
    # Guard turnout-style rates against a zero denominator.
    inscrits_safe = out["inscrits"].replace(0, pd.NA)
    out["taux_participation_bv"] = out["votants"] / inscrits_safe
    out["taux_blancs_bv"] = out["blancs"] / inscrits_safe
    out["taux_nuls_bv"] = out["nuls"] / inscrits_safe
    return out
|
|
|
|
def compute_national_reference(elections_blocs: pd.DataFrame) -> pd.DataFrame:
    """Derive national per-date/bloc share and turnout from the full dataset.

    Used as a fallback when no external national reference file is given.
    """
    sum_cols = ["voix_bloc", "exprimes", "votants", "inscrits"]
    nat = elections_blocs.groupby(["date_scrutin", "bloc"], as_index=False)[sum_cols].sum()
    nat = nat.rename(columns={col: f"{col}_nat" for col in sum_cols})
    # Zero denominators become NA instead of raising / producing inf.
    nat["part_bloc_national"] = nat["voix_bloc_nat"] / nat["exprimes_nat"].replace(0, pd.NA)
    nat["taux_participation_national"] = nat["votants_nat"] / nat["inscrits_nat"].replace(0, pd.NA)
    return nat[["date_scrutin", "bloc", "part_bloc_national", "taux_participation_national"]]
|
|
|
|
def attach_national_results(
    elections_blocs: pd.DataFrame,
    resultats_nationaux: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
    """Join national reference scores and compute bureau-vs-national gaps.

    When ``resultats_nationaux`` is None, the reference is computed from
    ``elections_blocs`` itself via ``compute_national_reference``.
    """
    if resultats_nationaux is not None:
        df_nat = resultats_nationaux.copy()
        df_nat["date_scrutin"] = pd.to_datetime(df_nat["date_scrutin"])
    else:
        df_nat = compute_national_reference(elections_blocs)

    merged = elections_blocs.merge(df_nat, on=["date_scrutin", "bloc"], how="left")
    merged["ecart_bloc_vs_national"] = merged["part_bloc"] - merged["part_bloc_national"]
    merged["ecart_participation_vs_nat"] = (
        merged["taux_participation_bv"] - merged["taux_participation_national"]
    )
    return merged
|
|
|
|
def compute_population_growth(elections_blocs: pd.DataFrame, base_year: int = 2014) -> pd.DataFrame:
    """Attach per-bureau relative growth of registered voters since ``base_year``.

    Registered-voter counts are averaged per (bureau, year); the growth is
    NA for bureaux with no observation in ``base_year``.
    """
    per_year = elections_blocs.groupby(["code_bv", "annee"], as_index=False)["inscrits"].mean()
    baseline = (
        per_year.loc[per_year["annee"] == base_year, ["code_bv", "inscrits"]]
        .rename(columns={"inscrits": "inscrits_base"})
    )
    per_year = per_year.merge(baseline, on="code_bv", how="left")
    per_year["croissance_inscrits_depuis_base"] = (
        per_year["inscrits"] - per_year["inscrits_base"]
    ) / per_year["inscrits_base"]

    return elections_blocs.merge(
        per_year[["code_bv", "annee", "croissance_inscrits_depuis_base"]],
        on=["code_bv", "annee"],
        how="left",
    )
|
|
|
|
def add_lag_features(elections_blocs: pd.DataFrame) -> pd.DataFrame:
    """Add lag-1 features per (bureau, bloc) plus a median-centred year.

    Rows are sorted chronologically within each (bureau, bloc) series so
    the shift picks the previous scrutin's value.
    """
    df = elections_blocs.sort_values(["code_bv", "bloc", "date_scrutin"])
    keys = ["code_bv", "bloc"]
    for col in ("part_bloc", "ecart_bloc_vs_national", "taux_participation_bv"):
        df[col + "_lag1"] = df.groupby(keys)[col].shift(1)
    df["annee_centre"] = df["annee"] - df["annee"].median()
    return df
|
|
|
|
def filter_target_communes(elections_blocs: pd.DataFrame, target_communes: Mapping[str, str]) -> pd.DataFrame:
    """Keep only bureaux belonging to the whitelisted communes.

    Adds normalised ``code_commune`` and ``nom_commune`` columns before
    filtering on membership in ``target_communes``.
    """
    df = elections_blocs.copy()
    if "code_commune" in df.columns:
        codes = df["code_commune"].astype(str)
    else:
        # Bureau ids look like "<insee>-<bureau>": keep the commune part.
        codes = df["code_bv"].astype(str).str.split("-").str[0]
    codes = codes.str.replace(r"\D", "", regex=True).str.zfill(5).str.slice(0, 5)
    df["code_commune"] = codes
    df["nom_commune"] = codes.map(target_communes)
    return df[codes.isin(target_communes.keys())]
|
|
|
|
def compute_commune_event_stats(
    elections_long: pd.DataFrame,
    target_communes: Mapping[str, str],
) -> pd.DataFrame:
    """Aggregate turnout/blank/null statistics per commune and electoral event.

    Bureau-level counts are deduplicated with ``max`` (the long table
    repeats them once per candidature) and then summed per commune.
    """
    df = elections_long.copy()
    if "code_commune" in df.columns:
        codes = df["code_commune"].astype(str)
    else:
        codes = df["code_bv"].astype(str).str.split("-").str[0]
    codes = codes.str.replace(r"\D", "", regex=True).str.zfill(5).str.slice(0, 5)
    df["code_commune"] = codes
    df = df[df["code_commune"].isin(target_communes.keys())]
    df["nom_commune"] = df["code_commune"].map(target_communes)
    if "date_scrutin" in df.columns:
        df["date_scrutin"] = pd.to_datetime(df["date_scrutin"], errors="coerce")
    stat_cols = ["exprimes", "inscrits", "votants", "blancs", "nuls"]
    for col in stat_cols:
        # Missing stat columns are created as NA so the aggregations below
        # always see the full schema.
        df[col] = pd.to_numeric(df[col], errors="coerce") if col in df.columns else pd.NA

    # Step 1: one row per bureau/event — repeated per-candidature counts
    # collapse via max.
    bv_keys = [c for c in ("code_commune", "code_bv", "type_scrutin", "annee", "tour", "date_scrutin") if c in df.columns]
    per_bv = df.groupby(bv_keys, as_index=False).agg({c: "max" for c in stat_cols})

    # Step 2: sum bureaux up to the commune/event level.
    commune_keys = [c for c in ("code_commune", "type_scrutin", "annee", "tour", "date_scrutin") if c in per_bv.columns]
    commune = per_bv.groupby(commune_keys, as_index=False).agg({c: "sum" for c in stat_cols})

    denom = commune["inscrits"].replace(0, pd.NA)
    commune["turnout_pct"] = commune["votants"] / denom
    commune["blancs_pct"] = commune["blancs"] / denom
    commune["nuls_pct"] = commune["nuls"] / denom
    commune["nom_commune"] = commune["code_commune"].map(target_communes)
    return commune
|
|
|
|
def build_elections_blocs(
    elections_long_path: Path,
    mapping_path: Path,
    *,
    national_results_path: Optional[Path] = None,
    base_year: int = 2014,
    target_communes_path: Path = DEFAULT_COMMUNES_PATH,
) -> pd.DataFrame:
    """Build the bureau×bloc modelling table end to end.

    Loads the long dataset and bloc mapping, expands voix per bloc, joins
    national references (from file if available, else computed), restricts
    to the target communes, then adds growth and lag features.
    """
    blocs = expand_voix_by_bloc(
        load_elections_long(elections_long_path),
        load_bloc_mapping(mapping_path),
    )

    national_df = None
    if national_results_path and national_results_path.exists():
        national_df = (
            pd.read_parquet(national_results_path)
            if national_results_path.suffix == ".parquet"
            else pd.read_csv(national_results_path, sep=";")
        )
    blocs = attach_national_results(blocs, national_df)

    blocs = filter_target_communes(blocs, load_target_communes(target_communes_path))
    blocs = compute_population_growth(blocs, base_year=base_year)
    return add_lag_features(blocs)
|
|
|
|
def save_processed(df: pd.DataFrame, output_dir: Path) -> None:
    """Write the blocs table to ``output_dir`` as Parquet and ';'-CSV."""
    output_dir.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_dir / "elections_blocs.parquet", index=False)
    df.to_csv(output_dir / "elections_blocs.csv", sep=";", index=False)
|
|
|
|
def save_commune_event_stats(df: pd.DataFrame, output_dir: Path) -> None:
    """Write the commune event stats to ``output_dir`` as Parquet and ';'-CSV."""
    output_dir.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_dir / "commune_event_stats.parquet", index=False)
    df.to_csv(output_dir / "commune_event_stats.csv", sep=";", index=False)
|
|
|
|
def run_full_pipeline(
    elections_long_path: Path = Path("data/interim/elections_long.parquet"),
    mapping_path: Path = Path("config/nuances.yaml"),
    output_dir: Path = Path("data/processed"),
    national_results_path: Optional[Path] = None,
    target_communes_path: Path = DEFAULT_COMMUNES_PATH,
) -> pd.DataFrame:
    """Run the whole processing chain and persist both output tables.

    Returns the bureau-level blocs table (also written to ``output_dir``).
    """
    blocs = build_elections_blocs(
        elections_long_path=elections_long_path,
        mapping_path=mapping_path,
        national_results_path=national_results_path,
        target_communes_path=target_communes_path,
    )
    save_processed(blocs, output_dir)

    # Commune-level event statistics are derived from the raw long table.
    stats = compute_commune_event_stats(
        load_elections_long(elections_long_path),
        load_target_communes(target_communes_path),
    )
    save_commune_event_stats(stats, output_dir)
    return blocs
|
|
|
|
# Public API of the module; underscore-prefixed helpers stay private.
__all__ = [
    "build_elections_blocs",
    "run_full_pipeline",
    "save_processed",
    "normalize_bloc",
    "load_target_communes",
    "compute_commune_event_stats",
    "save_commune_event_stats",
    # Previously-missing public names, added so star-imports and doc tools
    # expose the module's full public surface.
    "load_elections_long",
    "load_bloc_mapping",
    "expand_voix_by_bloc",
    "compute_national_reference",
    "attach_national_results",
    "compute_population_growth",
    "add_lag_features",
    "filter_target_communes",
    "DEFAULT_COMMUNES_PATH",
]
|
|