| """Utility helpers for the Business Intelligence dashboard.""" |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Dict, Iterable, List, Tuple |
|
|
| import pandas as pd |
|
|
|
|
# Upload handling accepts only these (lower-cased) extensions.
SUPPORTED_FILE_TYPES: Tuple[str, ...] = (
    ".csv",
    ".xlsx",
    ".xls",
)
"""File extensions accepted for dataset uploads."""

# How many rows a dataset preview shows unless the caller overrides it.
PREVIEW_ROWS: int = 5
"""Default row count for dataset preview tables."""
|
|
|
|
@dataclass(frozen=True)
class ColumnTypes:
    """Immutable grouping of a DataFrame's column names by inferred kind.

    Each attribute is a tuple of column names:

    * ``numeric`` -- columns with a numeric dtype.
    * ``categorical`` -- columns that are neither numeric nor datetime.
    * ``datetime`` -- columns with a datetime dtype.
    """

    numeric: Tuple[str, ...]
    categorical: Tuple[str, ...]
    datetime: Tuple[str, ...]
|
|
|
|
def is_supported_file(filename: str | None) -> bool:
    """Return True when the provided filename uses a supported extension.

    Parameters
    ----------
    filename:
        Name of the uploaded file; ``None`` and the empty string are
        rejected outright.

    Returns
    -------
    bool
        Whether the case-insensitive extension is one of
        ``SUPPORTED_FILE_TYPES``.
    """
    if not filename:
        return False
    # str.endswith accepts a tuple of suffixes: one C-level call instead of
    # a Python-level any() loop over the extensions.
    return filename.lower().endswith(SUPPORTED_FILE_TYPES)
|
|
|
|
def coerce_datetime_columns(df: pd.DataFrame, threshold: float = 0.6) -> Tuple[pd.DataFrame, Tuple[str, ...]]:
    """Attempt to parse object columns as datetimes when enough values can be converted.

    Parameters
    ----------
    df:
        Input DataFrame to mutate in-place.
    threshold:
        Minimum fraction of non-null values that must successfully convert
        for the column to be promoted to datetime.  Also the minimum
        non-null fraction a column needs to be considered at all.

    Returns
    -------
    tuple
        Mutated DataFrame and the sorted tuple of datetime column names
        (pre-existing datetime columns included).
    """
    datetime_cols: List[str] = list(
        df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns
    )

    object_cols = df.select_dtypes(include=["object"]).columns
    for col in object_cols:
        series = df[col]
        non_null = series.notna().sum()
        # Skip empty or too-sparse columns: not enough present values to
        # judge (the non_null == 0 guard also avoids dividing by zero on
        # an empty frame).
        if non_null == 0 or (non_null / len(series)) < threshold:
            continue
        converted = pd.to_datetime(series, errors="coerce", utc=False)
        # BUG FIX: measure the parse success rate among the values that
        # were actually present, not among all rows.  The old
        # converted.notna().mean() divided by the full row count, so a
        # column with scattered nulls could fail the threshold even when
        # every present value parsed cleanly -- contradicting the
        # documented contract.
        success_ratio = converted.notna().sum() / non_null
        if success_ratio >= threshold:
            df[col] = converted
            datetime_cols.append(col)

    return df, tuple(sorted(set(datetime_cols)))
|
|
|
|
def infer_column_types(df: pd.DataFrame) -> ColumnTypes:
    """Infer high-level data types for the provided DataFrame's columns."""
    numeric = tuple(df.select_dtypes(include=["number"]).columns)
    datetimes = tuple(df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)

    # Everything not already claimed as numeric or datetime is treated as
    # categorical; iterate df.columns so the original order is preserved.
    claimed = set(numeric) | set(datetimes)
    categorical = tuple(col for col in df.columns if col not in claimed)

    return ColumnTypes(numeric=numeric, categorical=categorical, datetime=datetimes)
|
|
|
|
def clamp_numeric(value: float, minimum: float, maximum: float) -> float:
    """Clamp *value* into the closed range [minimum, maximum]."""
    # Cap from above first, then raise to the floor; equivalent to
    # max(minimum, min(maximum, value)).
    upper_bounded = value if value < maximum else maximum
    return upper_bounded if upper_bounded > minimum else minimum
|
|
|
|
def ensure_unique_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename duplicate columns to maintain uniqueness.

    Duplicates receive a ``_<n>`` suffix (``a``, ``a_1``, ``a_2``, ...).
    If a generated name collides with a column that already exists (e.g.
    the frame already contains both ``a`` and ``a_1``), the counter keeps
    advancing until a free name is found, so the result is always unique.

    Returns
    -------
    pd.DataFrame
        The original frame untouched when its columns are already unique;
        otherwise a renamed copy (the input is not mutated).
    """
    if df.columns.is_unique:
        return df

    new_columns: List[str] = []
    used: set = set()
    seen: Dict[str, int] = {}
    for col in df.columns:
        count = seen.get(col, 0)
        candidate = col if count == 0 else f"{col}_{count}"
        # BUG FIX: the previous version blindly appended the counter, so a
        # generated name (e.g. "a_1") could duplicate a pre-existing column
        # of that name.  Advance the counter until genuinely unused.
        while candidate in used:
            count += 1
            candidate = f"{col}_{count}"
        seen[col] = count + 1
        used.add(candidate)
        new_columns.append(candidate)

    df = df.copy()
    df.columns = new_columns
    return df
|
|
|
|
def shorten_text(value: str, max_length: int = 80) -> str:
    """Truncate long text values for cleaner display.

    Parameters
    ----------
    value:
        Text to shorten.
    max_length:
        Maximum length of the returned string, ellipsis included.

    Returns
    -------
    str
        ``value`` unchanged when it fits, otherwise a prefix ending in
        ``...`` whose total length is exactly ``max_length``.
    """
    if len(value) <= max_length:
        return value
    # BUG FIX: when max_length < 3 the old slice value[:max_length - 3]
    # wrapped around to a negative index and could return output LONGER
    # than the limit (e.g. ("abcdef", 2) -> "abcde...").  There is no room
    # for an ellipsis, so hard-cut instead (clamped at 0 for non-positive
    # limits).
    if max_length < 3:
        return value[: max(max_length, 0)]
    return f"{value[: max_length - 3]}..."
|
|
|
|
def safe_column_subset(columns: Iterable[str], allowed: Iterable[str]) -> List[str]:
    """Return a list of *columns* that exist inside *allowed*."""
    # Materialise the whitelist once for O(1) membership tests, then let
    # filter() walk the candidates in order.
    permitted = frozenset(allowed)
    return list(filter(permitted.__contains__, columns))
|
|