| """Data loading, cleaning, and filtering helpers for the BI dashboard.""" |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from io import BytesIO |
| from pathlib import Path |
| from typing import Dict, Iterable, List, Mapping, Optional, Tuple |
|
|
| import pandas as pd |
|
|
| from utils import ( |
| ColumnTypes, |
| PREVIEW_ROWS, |
| coerce_datetime_columns, |
| ensure_unique_columns, |
| infer_column_types, |
| is_supported_file, |
| ) |
|
|
# Directory holding the bundled sample datasets, resolved relative to this module.
SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "data"
# Human-readable blurbs for known sample files; files present in the data
# directory but not listed here fall back to a generic description
# (see sample_dataset_options).
SAMPLE_DESCRIPTIONS = {
    "train.csv": "Weekly Walmart sales with markdowns and holidays (training set).",
    "test.csv": "Companion test set without weekly sales labels.",
    "features.csv": "Store-level features such as markdowns, CPI, unemployment.",
    "stores.csv": "Store metadata including type and size.",
}
|
|
|
|
@dataclass(frozen=True)
class DatasetBundle:
    """Container storing the dataset and metadata required by the UI."""


    # The loaded (and cleaned) tabular data.
    dataframe: pd.DataFrame
    # Inferred numeric/categorical/datetime column groupings (utils.ColumnTypes).
    column_types: ColumnTypes
    # Display name of the originating file (basename only, no directory).
    source_name: str
|
|
|
|
def load_dataset(file_obj) -> DatasetBundle:
    """Load the provided uploaded file into a pandas DataFrame.

    Parameters
    ----------
    file_obj:
        File-like object produced by the Gradio upload widget.

    Returns
    -------
    DatasetBundle
        Loaded dataset alongside inferred column metadata.

    Raises
    ------
    ValueError
        If the file cannot be read or uses an unsupported format.
    """
    if file_obj is None:
        raise ValueError("Please upload a CSV or Excel file.")

    name_on_disk = getattr(file_obj, "name", None)
    display_name = getattr(file_obj, "orig_name", name_on_disk)

    if not display_name or not is_supported_file(display_name):
        raise ValueError("Unsupported file type. Please upload a CSV or Excel file.")

    disk_path = Path(str(name_on_disk)) if name_on_disk else None
    loaded: Optional[pd.DataFrame] = None

    try:
        # Prefer reading straight from disk when the widget gave us a temp
        # path; otherwise fall back to the in-memory buffer interface.
        if disk_path is not None and disk_path.exists():
            loaded = _read_from_path(disk_path, display_name)
        else:
            loaded = _read_from_buffer(file_obj, display_name)
    except Exception as exc:  # surface any parser failure as a user-facing error
        raise ValueError(f"Unable to load dataset: {exc}") from exc

    if loaded is None:
        raise ValueError("Failed to load dataset. The file may be empty or corrupted.")

    loaded = ensure_unique_columns(loaded)
    loaded, parsed_datetime_cols = coerce_datetime_columns(loaded)
    inferred = infer_column_types(loaded)

    # Merge the datetime columns discovered during coercion with those the
    # type-inference step reported, deduplicated and sorted.
    merged_types = ColumnTypes(
        numeric=inferred.numeric,
        categorical=inferred.categorical,
        datetime=tuple(sorted(set(inferred.datetime + tuple(parsed_datetime_cols)))),
    )

    return DatasetBundle(
        dataframe=loaded,
        column_types=merged_types,
        source_name=Path(display_name).name,
    )
|
|
|
|
| def _read_from_path(path: Path, original_name: str) -> pd.DataFrame: |
| """Read a dataset from disk.""" |
| suffix = path.suffix.lower() |
| if suffix == ".csv": |
| return pd.read_csv(path) |
| if suffix in {".xlsx", ".xls"}: |
| return pd.read_excel(path) |
| raise ValueError(f"Unsupported file extension in {original_name}.") |
|
|
|
|
| def _read_from_buffer(file_obj, original_name: str) -> pd.DataFrame: |
| """Read a dataset from an in-memory buffer.""" |
| bytes_data = getattr(file_obj, "read", lambda: b"")() |
| if not bytes_data: |
| raise ValueError(f"The uploaded file '{original_name}' is empty.") |
|
|
| buffer = BytesIO(bytes_data) |
| lowered = original_name.lower() |
| if lowered.endswith(".csv"): |
| return pd.read_csv(buffer) |
| if lowered.endswith((".xlsx", ".xls")): |
| return pd.read_excel(buffer) |
|
|
| raise ValueError("Only CSV and Excel files are supported.") |
|
|
|
|
def dataset_overview(df: pd.DataFrame) -> Dict[str, object]:
    """Return basic information about the dataset."""
    n_rows, n_cols = df.shape
    deep_bytes = df.memory_usage(deep=True).sum()
    overview = {
        "Rows": int(n_rows),
        "Columns": int(n_cols),
        "Memory Usage (MB)": round(deep_bytes / (1024**2), 2),
    }
    dtype_table = pd.DataFrame({"Column": df.columns, "Type": df.dtypes.astype(str)})
    return {"info": overview, "dtypes": dtype_table}
|
|
|
|
def dataset_preview(df: pd.DataFrame, rows: int = PREVIEW_ROWS) -> Dict[str, pd.DataFrame]:
    """Return the first and last ``rows`` rows of the dataset."""
    return {"head": df.head(rows), "tail": df.tail(rows)}
|
|
|
|
def numeric_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Compute descriptive statistics for numeric columns."""
    numbers = df.select_dtypes(include=["number"])
    if numbers.empty:
        # No numeric columns at all: signal with an empty frame.
        return pd.DataFrame()

    stats = {
        "count": numbers.count(),
        "mean": numbers.mean(),
        "median": numbers.median(),
        "std": numbers.std(),
        "min": numbers.min(),
        "25%": numbers.quantile(0.25),
        "75%": numbers.quantile(0.75),
        "max": numbers.max(),
    }
    table = pd.DataFrame(stats)
    table.index.name = "column"
    return table.round(3)
|
|
|
|
def categorical_summary(df: pd.DataFrame, top_values: int = 5) -> pd.DataFrame:
    """Compute summary statistics for categorical columns."""
    non_numeric = df.select_dtypes(exclude=["number", "datetime64[ns]", "datetime64[ns, UTC]"])
    if non_numeric.empty:
        return pd.DataFrame()

    top_col = f"top_{top_values}"
    records: List[Dict[str, object]] = []
    for name in non_numeric:
        col = non_numeric[name]
        modes = col.mode(dropna=True)
        top_counts = col.value_counts(dropna=True).head(top_values)
        records.append(
            {
                "column": name,
                "unique_values": int(col.nunique(dropna=True)),
                "mode": None if modes.empty else modes.iloc[0],
                # The leading value_counts entry is the most frequent value,
                # so its count matches the mode's frequency.
                "mode_count": 0 if top_counts.empty else int(top_counts.iloc[0]),
                top_col: ", ".join(f"{value} ({n})" for value, n in top_counts.items()),
            }
        )

    return pd.DataFrame(records)
|
|
|
|
def missing_value_report(df: pd.DataFrame) -> pd.DataFrame:
    """Return the count and percentage of missing values per column."""
    counts = df.isna().sum()
    if counts.sum() == 0:
        # Nothing missing: return an empty frame with the expected schema.
        return pd.DataFrame(columns=["column", "missing_count", "missing_pct"])

    pct = (counts / len(df)) * 100
    report = pd.DataFrame(
        {
            "column": counts.index,
            "missing_count": counts.values,
            "missing_pct": pct.values,
        }
    )
    report = report.sort_values(by="missing_pct", ascending=False)
    return report.reset_index(drop=True).round({"missing_pct": 2})
|
|
|
|
def correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Compute the correlation matrix for numeric columns."""
    numbers = df.select_dtypes(include=["number"])
    # A correlation needs at least two numeric columns to be meaningful.
    if numbers.empty or numbers.shape[1] < 2:
        return pd.DataFrame()
    return numbers.corr().round(3)
|
|
|
|
def filter_dataframe(
    df: pd.DataFrame,
    numeric_filters: Mapping[str, Tuple[Optional[float], Optional[float]]],
    categorical_filters: Mapping[str, Iterable[str]],
    date_filters: Mapping[str, Tuple[Optional[str], Optional[str]]],
) -> pd.DataFrame:
    """Filter the dataset according to the provided filter definitions.

    Parameters
    ----------
    df:
        Dataset to filter; it is copied, not modified in place.
    numeric_filters:
        Maps column name -> (lower, upper) inclusive bounds; either bound
        may be None to leave that side unbounded.
    categorical_filters:
        Maps column name -> iterable of allowed values; an empty iterable
        leaves the column unrestricted.
    date_filters:
        Maps column name -> (start, end) inclusive bounds in any form
        ``pd.to_datetime`` accepts; either side may be falsy/None.

    Returns
    -------
    pd.DataFrame
        Filtered copy of ``df``. Column names absent from ``df`` are ignored.
    """
    filtered = df.copy()

    for column, bounds in numeric_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        lower, upper = bounds
        series = filtered[column]
        # Combine both bounds into one mask built against the *current*
        # frame. The previous implementation applied the lower-bound filter
        # and then reused the stale pre-filter series as a boolean indexer,
        # which relied on pandas reindexing the mask and raised on frames
        # with duplicate index labels.
        mask = pd.Series(True, index=filtered.index)
        if lower is not None:
            mask &= series >= lower
        if upper is not None:
            mask &= series <= upper
        filtered = filtered[mask]

    for column, values in categorical_filters.items():
        if column not in filtered.columns:
            continue
        allowed = list(values)
        if not allowed:
            continue
        filtered = filtered[filtered[column].isin(allowed)]

    for column, bounds in date_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        start, end = bounds
        # Coerce to datetimes; unparseable values become NaT and therefore
        # compare False against both bounds (same as the original behavior).
        series = pd.to_datetime(filtered[column], errors="coerce")
        mask = pd.Series(True, index=filtered.index)
        if start:
            mask &= series >= pd.to_datetime(start)
        if end:
            mask &= series <= pd.to_datetime(end)
        filtered = filtered[mask]

    return filtered
|
|
|
|
def filter_metadata(df: pd.DataFrame, column_types: ColumnTypes, categorical_limit: int = 200) -> Dict[str, object]:
    """Pre-compute useful metadata for rendering filter controls."""
    metadata: Dict[str, object] = {"numeric": {}, "categorical": {}, "datetime": {}}

    for name in column_types.numeric:
        values = df[name].dropna()
        if values.empty:
            continue
        metadata["numeric"][name] = {
            "min": float(values.min()),
            "max": float(values.max()),
        }

    for name in column_types.categorical:
        choices = df[name].dropna().astype(str).unique().tolist()
        # Cap the option list so very-high-cardinality columns don't
        # overwhelm the filter UI.
        metadata["categorical"][name] = choices[:categorical_limit]

    for name in column_types.datetime:
        stamps = pd.to_datetime(df[name], errors="coerce").dropna()
        if stamps.empty:
            continue
        metadata["datetime"][name] = {
            "min": stamps.min().date(),
            "max": stamps.max().date(),
        }

    return metadata
|
|
|
|
def sample_dataset_options() -> Dict[str, str]:
    """Return available bundled datasets and their descriptions."""
    if not SAMPLE_DATA_DIR.exists():
        return {}

    # Offer every CSV/Excel file in the data directory, in sorted order,
    # with a curated blurb when one exists and a generic one otherwise.
    return {
        entry.name: SAMPLE_DESCRIPTIONS.get(entry.name, f"Sample dataset sourced from '{entry.name}'.")
        for entry in sorted(SAMPLE_DATA_DIR.iterdir())
        if entry.is_file() and entry.suffix.lower() in {".csv", ".xlsx", ".xls"}
    }
|
|
|
|
def load_sample_dataset(selection: str) -> DatasetBundle:
    """Load a dataset bundled inside the local data directory."""
    if not selection:
        raise ValueError("Please select a sample dataset from the dropdown.")

    sample_path = SAMPLE_DATA_DIR / selection
    if not sample_path.exists():
        raise ValueError(
            f"Sample dataset '{selection}' was not found in the 'data/' directory. "
            "Ensure the file exists and try again."
        )

    frame = _read_from_path(sample_path, selection)
    frame = ensure_unique_columns(frame)
    frame, parsed_datetime_cols = coerce_datetime_columns(frame)
    inferred = infer_column_types(frame)
    # Fold the datetime columns found during coercion into the inferred set,
    # deduplicated and sorted.
    combined = ColumnTypes(
        numeric=inferred.numeric,
        categorical=inferred.categorical,
        datetime=tuple(sorted(set(inferred.datetime + tuple(parsed_datetime_cols)))),
    )

    return DatasetBundle(
        dataframe=frame,
        column_types=combined,
        source_name=selection,
    )
|
|