|
|
| from sklearn.impute import SimpleImputer |
| import pandas as pd |
| import numpy as np |
| import json |
|
|
def data_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* with exact duplicate rows removed.

    The previous version used ``drop_duplicates(inplace=True)``, which
    mutated the caller's DataFrame as a side effect; this version leaves
    the input untouched and returns the de-duplicated frame, which is
    what every call site that uses the return value already expects.
    """
    return df.drop_duplicates()
|
|
def standardize_data_types(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce object columns to richer dtypes (numeric, datetime, JSON list, bool).

    Works column by column, mutating *df* in place, then replaces any
    remaining NaN with "". Columns that already hold booleans or a
    non-object dtype are left untouched.

    Fixes over the previous version:
    * ``pd.to_datetime`` was applied to EVERY column, silently turning
      already-numeric columns into epoch timestamps.
    * When datetime parsing failed for all values, ``astype(str)`` ran
      AFTER the coerce, overwriting the whole column with the literal
      string "NaT" and destroying the original data.
    * ``pd.to_numeric(errors='ignore')`` is deprecated; conversions are
      now probed on a temporary series and only assigned when they
      actually succeed.
    """
    for col in df.columns:
        # Boolean columns are already in their final form.
        if df[col].isin([True, False]).all():
            continue
        # Only raw object columns need inference; typed columns stay as-is.
        if df[col].dtype != 'object':
            continue

        # 1) Numeric-looking strings -> numbers, only if every non-null
        #    value parses (probe on a copy; never clobber on failure).
        present = df[col].notna()
        numeric = pd.to_numeric(df[col], errors='coerce')
        if present.any() and numeric[present].notna().all():
            df[col] = numeric
            continue

        # 2) Date-like strings -> datetimes; keep the conversion only when
        #    at least one value actually parsed.
        try:
            parsed = pd.to_datetime(df[col], errors='coerce')
            if parsed.notna().any():
                df[col] = parsed
                continue
        except Exception:
            pass  # best-effort: unparseable/mixed inputs keep their original values

        # 3) JSON-encoded lists ("[...]" strings) -> Python lists.
        try:
            if df[col].apply(lambda x: isinstance(x, str) and x.startswith("[") and x.endswith("]")).all():
                df[col] = df[col].apply(json.loads)
                continue
        except Exception:
            pass  # malformed JSON: leave the column as strings

        # 4) "TRUE"/"FALSE" strings -> booleans.
        if df[col].dropna().isin(["TRUE", "FALSE"]).all():
            df[col] = df[col].map({"TRUE": True, "FALSE": False})
            continue

        # 5) Anything else becomes a plain string column.
        df[col] = df[col].astype(str)
    # Final pass: blank out remaining missing values, as before.
    df.fillna("", inplace=True)
    return df
|
|
def handle_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Impute missing values in place: column median for numeric columns,
    most-frequent value for object columns. Returns the same DataFrame.

    Uses pandas directly instead of sklearn's ``SimpleImputer``:
    ``fit_transform`` silently DROPS columns that are entirely NaN, so
    assigning its result back raised a shape-mismatch error whenever one
    column had no observed values; re-assigning whole column blocks at
    once also coerced dtypes. Per-column ``fillna`` has neither problem,
    and like ``most_frequent`` the tie-break is the smallest mode value.
    """
    for col in df.select_dtypes(include=['number']).columns:
        # median() skips NaN; an all-NaN column yields a NaN median and
        # fillna(NaN) is a no-op, so such a column is simply left alone.
        df[col] = df[col].fillna(df[col].median())
    for col in df.select_dtypes(include=['object']).columns:
        modes = df[col].mode(dropna=True)  # sorted ascending -> [0] is smallest
        if not modes.empty:  # an all-NaN column has no mode; leave it untouched
            df[col] = df[col].fillna(modes.iloc[0])
    return df
|
|
def handle_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Winsorize numeric columns in place using Tukey's 1.5*IQR fences.

    Values below Q1 - 1.5*IQR are raised to that bound and values above
    Q3 + 1.5*IQR are lowered to it; NaNs pass through untouched.

    ``Series.clip`` replaces the previous per-element Python
    ``apply(lambda ...)`` — same result (including NaN handling), but
    vectorized. ``include=['number']`` already covers int64/float64, so
    the redundant dtype list is gone.
    """
    for col in df.select_dtypes(include=['number']).columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        df[col] = df[col].clip(lower=lower, upper=upper)
    return df
|
|
def generate_final_report(df: pd.DataFrame, file_path: str):
    """Write a plain-text summary of *df* (per-column missing-value counts
    and the duplicate-row total) to *file_path*, overwriting any existing
    file."""
    report = [
        "FINAL DATA PREPROCESSING REPORT\n",
        "=" * 50 + "\n\n",
    ]
    missing_counts = df.isnull().sum()
    report.extend(f"{col}: {count} missing values\n" for col, count in missing_counts.items())
    report.append(f"Total Duplicate Rows: {df.duplicated().sum()}\n")
    report.append("Preprocessing Completed Successfully!\n")
    with open(file_path, "w") as file:
        file.writelines(report)
|
|
def save_cleaned_data(df: pd.DataFrame, file_path: str):
    """Persist the cleaned DataFrame as CSV (no index column) and hand the
    destination path back to the caller for chaining."""
    df.to_csv(path_or_buf=file_path, index=False)
    return file_path
|
|