"""Insight generation utilities for the BI dashboard."""

from __future__ import annotations

from typing import Dict, Iterable, Optional, Tuple

import numpy as np
import pandas as pd

from utils import ColumnTypes


def top_bottom_performers(df: pd.DataFrame, column: str, n: int = 5) -> Dict[str, pd.DataFrame]:
    """Return the top and bottom performers for a numeric column.

    The column is coerced with ``pd.to_numeric(..., errors="coerce")`` and
    values that cannot be parsed (or are missing) are dropped before ranking.

    Args:
        df: Source dataset.
        column: Name of the column to rank.
        n: Number of rows to return on each side.

    Returns:
        A dict with the keys ``"top"`` and ``"bottom"``. Each value is the
        result of ``Series.nlargest(n)`` / ``Series.nsmallest(n)`` passed
        through ``reset_index()``, i.e. a frame holding the original index
        labels next to the coerced numeric values.

    Raises:
        ValueError: If ``column`` is not in ``df`` or holds no numeric data.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in dataset.")

    numeric_series = pd.to_numeric(df[column], errors="coerce").dropna()
    if numeric_series.empty:
        raise ValueError(f"Column '{column}' does not contain numeric data.")

    return {
        "top": numeric_series.nlargest(n).reset_index(),
        "bottom": numeric_series.nsmallest(n).reset_index(),
    }
def detect_trend(df: pd.DataFrame, date_column: str, value_column: str) -> str:
    """Describe the change between the earliest and latest data points.

    Dates are parsed with ``pd.to_datetime`` and values with
    ``pd.to_numeric`` (both with ``errors="coerce"``); rows where either
    fails to parse are ignored. The input frame is not modified.

    Args:
        df: Source dataset.
        date_column: Name of the column holding the dates.
        value_column: Name of the column holding the measured values.

    Returns:
        A one-sentence summary of the absolute and (when the starting value
        is non-zero) percentage change, or a fixed "not enough data" message
        when fewer than two distinct dates remain after cleaning.

    Raises:
        ValueError: If either column is missing from ``df``.
    """
    if date_column not in df.columns or value_column not in df.columns:
        raise ValueError("Selected columns are not present in the dataset.")

    # Explicit copy: the columns are reassigned below and the caller's frame
    # must stay untouched.
    working = df[[date_column, value_column]].copy()
    working[date_column] = pd.to_datetime(working[date_column], errors="coerce")
    # Coerce values as the sibling helpers do, so numeric strings work and
    # unparseable entries are dropped instead of breaking the subtraction.
    working[value_column] = pd.to_numeric(working[value_column], errors="coerce")
    working = working.dropna()

    if working.empty or working[date_column].nunique() < 2:
        return "Not enough data to evaluate a trend."

    # Stable sort keeps the original row order among identical dates, so the
    # chosen first/last rows are deterministic.
    working = working.sort_values(by=date_column, kind="mergesort")
    first_date = working[date_column].iloc[0]
    last_date = working[date_column].iloc[-1]
    first_value = working[value_column].iloc[0]
    last_value = working[value_column].iloc[-1]

    change = last_value - first_value
    # Divide by the magnitude of the baseline so the percentage carries the
    # same sign as the change; dividing by a negative baseline would flip the
    # reported direction.
    pct_change = (change / abs(first_value) * 100) if first_value != 0 else np.nan

    if np.isnan(pct_change):
        # Percentage is undefined (zero baseline): report a neutral verb.
        direction = "changed"
    elif pct_change > 0:
        direction = "increased"
    elif pct_change < 0:
        direction = "decreased"
    else:
        direction = "remained stable"

    pct_text = f" ({pct_change:.2f}%)" if not np.isnan(pct_change) else ""
    return (
        f"Between {first_date.date()} and {last_date.date()}, "
        f"{value_column} {direction} by {change:.2f}{pct_text}."
    )
def detect_anomalies(df: pd.DataFrame, column: str, z_threshold: float = 3.0, limit: int = 5) -> pd.DataFrame:
    """Identify potential outliers using a simple z-score approach.

    The column is coerced with ``pd.to_numeric(..., errors="coerce")``; each
    value's z-score is its absolute deviation from the column mean divided by
    the column standard deviation (``Series.std``).

    Args:
        df: Source dataset.
        column: Name of the column to inspect.
        z_threshold: Rows whose absolute z-score is strictly greater than
            this value are reported.
        limit: Maximum number of rows returned (passed to ``head``).

    Returns:
        A frame with the original ``column`` values and a ``"z_score"``
        column, sorted by z-score in descending order and keeping the
        original index labels. When the z-score is NaN for a row (for
        example because the value could not be parsed, or the standard
        deviation is zero or undefined) the comparison is false and the row
        is not reported, so the result may be empty.

    Raises:
        ValueError: If ``column`` is not in ``df``.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in dataset.")

    series = pd.to_numeric(df[column], errors="coerce")
    z_scores = ((series - series.mean()) / series.std()).abs()

    # Build the boolean mask once and reuse it for both the row selection and
    # the score lookup so the two always stay aligned.
    is_outlier = z_scores > z_threshold
    anomalies = df.loc[is_outlier, [column]].copy()
    anomalies["z_score"] = z_scores[is_outlier]
    return anomalies.sort_values(by="z_score", ascending=False).head(limit)
def get_default_insight_columns(column_types: ColumnTypes) -> Dict[str, Optional[str]]:
    """Determine default columns to use when auto-generating insights.

    Args:
        column_types: Object exposing ``numeric`` and ``datetime`` sequences
            of column names.

    Returns:
        A dict with the keys ``"numeric"`` and ``"datetime"``. Each value is
        the first entry of the corresponding sequence, or ``None`` when that
        sequence is empty (falsy).
    """
    numeric_col = column_types.numeric[0] if column_types.numeric else None
    date_col = column_types.datetime[0] if column_types.datetime else None
    return {"numeric": numeric_col, "datetime": date_col}