| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
| import warnings |
|
|
| from .constants import CANDIDATE_CATEGORIES |
| from .pipeline import normalize_bloc |
|
|
# numpy moved RankWarning to numpy.exceptions in 1.25 and removed the
# top-level alias in 2.0. Resolve the real class wherever it lives so the
# warnings filters in _project_share/_project_rate actually match; fall
# back to a dummy Warning subclass only when numpy provides neither.
try:
    from numpy.exceptions import RankWarning as NP_RANK_WARNING
except ImportError:  # numpy < 1.25
    try:
        from numpy import RankWarning as NP_RANK_WARNING
    except Exception:  # pragma: no cover - extremely old/odd numpy builds
        class NP_RANK_WARNING(UserWarning):
            pass
|
|
|
|
@dataclass
class PredictionResult:
    """Projected outcome for a single political bloc."""

    # Bloc identifier (one of DISPLAY_BLOC_ORDER).
    category: str
    # Share of valid ("exprimes") votes, normalized into [0, 1].
    predicted_share: float
    # Absolute vote count allocated from the projected "exprimes" total.
    predicted_count: int
|
|
|
|
@dataclass
class PredictionSummary:
    """Full projection: per-bloc results plus turnout aggregates.

    All aggregate fields are ``None`` when the projection could not be
    computed (empty history or unknown registered-voter count).
    """

    # One entry per bloc, in display order.
    bloc_predictions: list[PredictionResult]
    # Registered voters.
    inscrits: Optional[int]
    # Ballots cast.
    votants: Optional[int]
    # Blank ballots.
    blancs: Optional[int]
    # Spoiled ballots.
    nuls: Optional[int]
    # inscrits - votants (clamped to >= 0).
    abstention: Optional[int]
    # Valid votes: votants - blancs - nuls (clamped to >= 0).
    exprimes: Optional[int]
|
|
|
|
# Political blocs in left-to-right display order; compute_predictions keeps
# only those also present in CANDIDATE_CATEGORIES.
DISPLAY_BLOC_ORDER = [
    "extreme_gauche",
    "gauche_dure",
    "gauche_modere",
    "centre",
    "droite_modere",
    "droite_dure",
    "extreme_droite",
]
# Non-bloc outcome categories. NOTE(review): not referenced in this module —
# presumably consumed by callers; verify before removing.
EXTRA_CATEGORIES = ["blancs", "nuls", "abstention"]
|
|
|
|
| def _clip01(value: float) -> float: |
| return float(min(1.0, max(0.0, value))) |
|
|
|
|
| def _last_share(df: pd.DataFrame, bloc: str, *, election: Optional[str] = None, year: Optional[int] = None) -> Optional[float]: |
| subset = df[df["bloc"] == bloc] |
| if election: |
| subset = subset[subset["type_scrutin"] == election] |
| if year is not None: |
| subset = subset[subset["annee"] == year] |
| if subset.empty: |
| return None |
| valid = subset.sort_values("date_scrutin")["part_bloc"].dropna() |
| if valid.empty: |
| return None |
| return valid.iloc[-1] |
|
|
|
|
| def _last_value(series: pd.Series) -> Optional[float]: |
| series = pd.to_numeric(series, errors="coerce").dropna() |
| if series.empty: |
| return None |
| return float(series.iloc[-1]) |
|
|
|
|
def _project_share(series: pd.Series, years: pd.Series, target_year: int) -> Optional[float]:
    """Linearly extrapolate a vote share to *target_year*.

    Falls back to the most recent observation when fewer than two distinct
    years are available or the fit fails; the result is clipped to [0, 1].
    Returns ``None`` when no usable (value, year) pair exists.
    """
    frame = pd.DataFrame(
        {
            "value": pd.to_numeric(series, errors="coerce"),
            "year": pd.to_numeric(years, errors="coerce"),
        }
    ).dropna()
    if frame.empty:
        return None
    # Default: reuse the latest observed share.
    projected = frame["value"].iloc[-1]
    if frame["year"].nunique() >= 2 and len(frame) >= 2:
        with warnings.catch_warnings():
            # Degenerate fits emit RankWarning; the fallback below covers them.
            warnings.simplefilter("ignore", category=NP_RANK_WARNING)
            try:
                slope, intercept = np.polyfit(frame["year"], frame["value"], 1)
            except Exception:
                pass  # keep the last observed value
            else:
                projected = slope * target_year + intercept
    return _clip01(float(projected))
|
|
|
|
def _project_rate(
    series: pd.Series,
    years: pd.Series,
    target_year: int,
    *,
    min_points_trend: int = 3,
    clamp_to_observed: bool = True,
) -> Optional[float]:
    """Project a participation-style rate to *target_year*.

    A linear trend is fitted only when at least *min_points_trend* distinct
    years (and observations) are available; otherwise the latest observation
    is reused. With *clamp_to_observed*, the projection is confined to the
    historical min/max before the final clip to [0, 1]. Returns ``None``
    when no usable data remains after coercion.
    """
    frame = pd.DataFrame(
        {
            "value": pd.to_numeric(series, errors="coerce"),
            "year": pd.to_numeric(years, errors="coerce"),
        }
    ).dropna()
    if frame.empty:
        return None
    observed = frame["value"].to_numpy()
    year_arr = frame["year"].to_numpy()
    # Default: carry the latest observation forward.
    projected = observed[-1]
    if len(set(year_arr)) >= min_points_trend and len(frame) >= min_points_trend:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NP_RANK_WARNING)
            try:
                slope, intercept = np.polyfit(year_arr, observed, 1)
            except Exception:
                pass  # keep the last observed rate
            else:
                projected = slope * target_year + intercept
    if clamp_to_observed and len(observed):
        # Keep the extrapolation inside the historically observed range.
        projected = min(max(projected, float(np.nanmin(observed))), float(np.nanmax(observed)))
    return _clip01(float(projected))
|
|
|
|
| def _allocate_counts(shares: list[float], total: int) -> list[int]: |
| if total <= 0 or not shares: |
| return [0 for _ in shares] |
| arr = np.clip(np.asarray(shares, dtype=float), 0, None) |
| if arr.sum() == 0: |
| return [0 for _ in shares] |
| arr = arr / arr.sum() |
| raw = arr * total |
| floors = np.floor(raw) |
| remainder = int(total - floors.sum()) |
| if remainder > 0: |
| order = np.argsort(-(raw - floors)) |
| for idx in order[:remainder]: |
| floors[idx] += 1 |
| return floors.astype(int).tolist() |
|
|
|
|
def compute_predictions(
    history: pd.DataFrame,
    *,
    target_election: str = "municipales",
    target_year: int = 2026,
    inscrits_override: Optional[float] = None,
) -> PredictionSummary:
    """Project per-bloc vote counts and turnout figures for a future election.

    Per-bloc shares are linearly extrapolated to *target_year* (preferring
    rows whose ``type_scrutin`` matches *target_election*), normalized to
    sum to 1, then converted to counts against the projected number of
    valid votes. Turnout/blank/spoiled rates are projected per voting event
    and rescaled so the derived counts remain mutually consistent.

    Parameters
    ----------
    history:
        Long-format election history. Uses ``bloc``, ``voix_bloc``,
        ``date_scrutin``, ``code_bv`` and, when present, ``exprimes``,
        ``inscrits``, ``votants``, ``blancs``, ``nuls``, ``part_bloc``,
        ``type_scrutin``, ``tour`` and ``annee``.
    target_election:
        ``type_scrutin`` value to prioritize when matching history exists.
    target_year:
        Year the linear trends are extrapolated to.
    inscrits_override:
        Optional positive registered-voter count to use instead of the last
        observed ``inscrits``.

    Returns
    -------
    PredictionSummary
        Empty (all ``None``) when *history* is empty or no registered-voter
        count can be established.
    """
    if history.empty:
        return PredictionSummary([], None, None, None, None, None, None)


    # --- Normalize the raw history -------------------------------------
    df = history.copy()
    target_election = str(target_election).strip().lower()
    # Map raw labels to canonical bloc names (project helper normalize_bloc).
    df["bloc"] = df["bloc"].apply(normalize_bloc)
    if "type_scrutin" in df.columns:
        df["type_scrutin"] = df["type_scrutin"].astype(str).str.strip().str.lower()

    for col in ["voix_bloc", "exprimes", "inscrits", "votants", "blancs", "nuls"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    for col in ["inscrits", "votants", "blancs", "nuls"]:
        if col not in df.columns:
            df[col] = np.nan
    if "exprimes" in df.columns:
        # Backfill missing or zero "exprimes" with the per-event sum of bloc votes.
        sum_voix = df.groupby(["code_bv", "date_scrutin"])["voix_bloc"].transform("sum")
        df["exprimes"] = df["exprimes"].fillna(sum_voix)
        df.loc[df["exprimes"] == 0, "exprimes"] = sum_voix
    if "part_bloc" not in df.columns or df["part_bloc"].isna().all():
        df["part_bloc"] = df["voix_bloc"] / df["exprimes"]
    df["part_bloc"] = pd.to_numeric(df["part_bloc"], errors="coerce").clip(upper=1)
    df = df.dropna(subset=["bloc"])


    # --- Project one share per bloc -------------------------------------
    bloc_order = [b for b in DISPLAY_BLOC_ORDER if b in CANDIDATE_CATEGORIES]
    raw_shares: dict[str, float] = {}
    for bloc in bloc_order:
        bloc_hist = df[df["bloc"] == bloc].sort_values("date_scrutin")
        last_overall = _last_share(bloc_hist, bloc)
        base_series = bloc_hist["part_bloc"]
        base_years = bloc_hist["annee"]
        # Prefer the targeted election kind when any such rows exist.
        if not bloc_hist.empty and target_election in bloc_hist["type_scrutin"].values:
            base_series = bloc_hist[bloc_hist["type_scrutin"] == target_election]["part_bloc"]
            base_years = bloc_hist[bloc_hist["type_scrutin"] == target_election]["annee"]


        projected = _project_share(base_series, base_years, target_year)
        if projected is None and last_overall is not None:
            projected = last_overall
        predicted = _clip01(projected or 0.0)
        raw_shares[bloc] = predicted


    # Normalize shares to sum to 1 across blocs (all-zero stays all-zero).
    share_values = np.array([raw_shares.get(b, 0.0) for b in bloc_order], dtype=float)
    share_sum = share_values.sum()
    if share_sum > 0:
        share_values = share_values / share_sum
    else:
        share_values = np.zeros_like(share_values)


    # --- Event-level turnout aggregates ---------------------------------
    # One row per voting event; "max" collapses the duplicated per-bloc rows.
    event_cols = [col for col in ["code_bv", "date_scrutin", "type_scrutin", "tour", "annee"] if col in df.columns]
    event_df = df.groupby(event_cols, as_index=False).agg(
        inscrits=("inscrits", "max"),
        votants=("votants", "max"),
        blancs=("blancs", "max"),
        nuls=("nuls", "max"),
    )
    if "date_scrutin" in event_df.columns:
        event_df = event_df.sort_values("date_scrutin")
    if "type_scrutin" not in event_df.columns:
        event_df["type_scrutin"] = ""
    if "annee" not in event_df.columns:
        if "date_scrutin" in event_df.columns:
            event_df["annee"] = pd.to_datetime(event_df["date_scrutin"], errors="coerce").dt.year
        else:
            event_df["annee"] = np.nan
    # Rates are relative to registered voters; zero inscrits -> NA rates.
    base_inscrits = event_df["inscrits"].replace(0, pd.NA)
    event_df["taux_participation"] = event_df["votants"] / base_inscrits
    event_df["taux_blancs"] = event_df["blancs"] / base_inscrits
    event_df["taux_nuls"] = event_df["nuls"] / base_inscrits


    def _select_series(col: str) -> tuple[pd.Series, pd.Series]:
        # Restrict to first-round events when a "tour" column exists, then to
        # the targeted election kind when it is represented in that scope.
        scoped = event_df
        if "tour" in event_df.columns:
            round1 = event_df[event_df["tour"] == 1]
            if not round1.empty:
                scoped = round1
        if not scoped.empty and target_election in scoped["type_scrutin"].values:
            mask = scoped["type_scrutin"] == target_election
            return scoped.loc[mask, col], scoped.loc[mask, "annee"]
        return scoped[col], scoped["annee"]


    turnout_series, turnout_years = _select_series("taux_participation")
    blancs_series, blancs_years = _select_series("taux_blancs")
    nuls_series, nuls_years = _select_series("taux_nuls")


    taux_participation = _project_rate(turnout_series, turnout_years, target_year)
    taux_blancs = _project_rate(blancs_series, blancs_years, target_year)
    taux_nuls = _project_rate(nuls_series, nuls_years, target_year)


    # --- Registered voters: explicit override wins over the history -----
    inscrits_used = None
    if inscrits_override is not None:
        try:
            value = float(inscrits_override)
            if value > 0:
                inscrits_used = value
        except (TypeError, ValueError):
            inscrits_used = None
    if inscrits_used is None:
        inscrits_used = _last_value(event_df["inscrits"])
    if inscrits_used is None:
        # Without a voter base no count can be derived.
        return PredictionSummary([], None, None, None, None, None, None)


    # Unknown rates default to 0 (no projected component).
    if taux_participation is None:
        taux_participation = 0.0
    if taux_blancs is None:
        taux_blancs = 0.0
    if taux_nuls is None:
        taux_nuls = 0.0


    # Blank + spoiled rates may not exceed the participation rate.
    if taux_blancs + taux_nuls > taux_participation and (taux_blancs + taux_nuls) > 0:
        scale = taux_participation / (taux_blancs + taux_nuls)
        taux_blancs *= scale
        taux_nuls *= scale


    # --- Convert rates to integer counts, re-checking consistency -------
    inscrits_total = int(round(inscrits_used))
    votants_total = int(round(inscrits_total * taux_participation))
    blancs_total = int(round(inscrits_total * taux_blancs))
    nuls_total = int(round(inscrits_total * taux_nuls))
    # Rounding may reintroduce blancs + nuls > votants; rescale once more.
    if blancs_total + nuls_total > votants_total and (blancs_total + nuls_total) > 0:
        scale = votants_total / (blancs_total + nuls_total)
        blancs_total = int(round(blancs_total * scale))
        nuls_total = int(round(nuls_total * scale))
    exprimes_total = max(0, votants_total - blancs_total - nuls_total)
    abstention_total = max(0, inscrits_total - votants_total)


    # --- Split the valid votes across blocs -----------------------------
    bloc_counts = _allocate_counts(share_values.tolist(), exprimes_total)
    bloc_predictions: list[PredictionResult] = []
    for bloc, share, count in zip(bloc_order, share_values.tolist(), bloc_counts):
        bloc_predictions.append(
            PredictionResult(
                category=bloc,
                predicted_share=float(share),
                predicted_count=int(count),
            )
        )


    return PredictionSummary(
        bloc_predictions=bloc_predictions,
        inscrits=inscrits_total,
        votants=votants_total,
        blancs=blancs_total,
        nuls=nuls_total,
        abstention=abstention_total,
        exprimes=exprimes_total,
    )
|
|
|
|
def predictions_as_dataframe(summary: PredictionSummary) -> pd.DataFrame:
    """Flatten *summary* into a two-column frame (``categorie`` / ``nombre``).

    Bloc rows come first (in ``DISPLAY_BLOC_ORDER``), followed by blancs,
    nuls and abstention whenever those totals are known. An empty frame
    with the same columns is returned when *summary* is ``None`` or has
    no bloc predictions.
    """
    if summary is None or not summary.bloc_predictions:
        return pd.DataFrame(columns=["categorie", "nombre"])
    by_category = {pred.category: pred for pred in summary.bloc_predictions}
    rows = [
        {"categorie": bloc, "nombre": int(by_category[bloc].predicted_count)}
        for bloc in DISPLAY_BLOC_ORDER
        if bloc in by_category
    ]
    for label, value in (
        ("blancs", summary.blancs),
        ("nuls", summary.nuls),
        ("abstention", summary.abstention),
    ):
        if value is not None:
            rows.append({"categorie": label, "nombre": int(value)})
    return pd.DataFrame(rows)
|
|
|
|
# Explicit public API of this module.
__all__ = ["compute_predictions", "predictions_as_dataframe", "PredictionResult", "PredictionSummary"]
|
|