| import sys |
| import os |
| import numpy as np |
| import pandas as pd |
| from sklearn.compose import ColumnTransformer |
| from sklearn.preprocessing import OrdinalEncoder, StandardScaler |
| from sklearn.model_selection import train_test_split |
| from src.exception.exception import CustomException |
| from src.logging.logger import logging |
| from src.utils.main_utils.utils import save_numpy_array_data,save_object |
| from src.entity.artifact_entity import ( |
| DataTransformationArtifact, |
| DataValidationArtifact |
| ) |
| from src.entity.config_entity import DataTransformationConfig |
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
class DataTransformation:
    """Builds model-ready train/test arrays from validated retail data.

    Orchestrates feature engineering, preprocessing (ordinal encoding +
    standard scaling) and persistence of the transformed arrays and the
    fitted preprocessor object.
    """

    def __init__(self, data_validation_artifact: DataValidationArtifact,
                 data_transformation_config: DataTransformationConfig):
        """Store the validation artifact and the transformation config.

        Raises:
            CustomException: wraps any error raised while storing inputs.
        """
        try:
            self.data_validation_artifact = data_validation_artifact
            self.data_transformation_config = data_transformation_config
        except Exception as e:
            raise CustomException(e, sys)
| |
| @staticmethod |
| def read_data(file_path) -> pd.DataFrame: |
| try: |
| return pd.read_csv(file_path) |
| except Exception as e: |
| raise CustomException(e, sys) |
|
|
| def get_data_transformer_object(self, categorical_features, numeric_cols, ordinal_categories): |
| try: |
| logging.info("Creating data transformer object...") |
| |
| |
| ordinal_encoder = OrdinalEncoder( |
| categories=[ordinal_categories[col] for col in categorical_features], |
| dtype=int, |
| handle_unknown='use_encoded_value', |
| unknown_value=-1, |
| |
| ) |
| |
| scaler = StandardScaler() |
| |
| |
| cat_transformers = [("ordinal", ordinal_encoder, categorical_features)] |
| num_transformers = [("scaler", scaler, numeric_cols)] |
| |
| |
| preprocessor = ColumnTransformer( |
| transformers=cat_transformers + num_transformers, |
| remainder='passthrough' |
| ) |
| |
| logging.info("Data transformer object created successfully.") |
| return preprocessor |
| except Exception as e: |
| raise CustomException(e, sys) |
|
|
| def perform_feature_engineering(self, df): |
| |
| df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce') |
| df= df[(df['Quantity'] >= 0) & (df['UnitPrice'] >= 0)] |
| df = df.copy() |
|
|
| df.loc[:, 'hour'] = df['InvoiceDate'].dt.hour |
| df.loc[:, 'weekday'] = df['InvoiceDate'].dt.weekday |
| df.loc[:, 'month'] = df['InvoiceDate'].dt.month |
| df.loc[:, 'week'] = df['InvoiceDate'].dt.isocalendar().week |
| df.loc[:, 'total_sales'] = df['UnitPrice'] * df['Quantity'] |
|
|
| def calculate_peak_periods(data, group_by_column, top_n_percent=20): |
| period_sales = data.groupby(group_by_column)['total_sales'].sum().reset_index() |
| period_sales_sorted = period_sales.sort_values(by='total_sales', ascending=False) |
| num_peak_periods = int((top_n_percent / 100) * len(period_sales_sorted)) |
| peak_periods = period_sales_sorted.head(num_peak_periods)[group_by_column].tolist() |
| return peak_periods |
|
|
| peak_months = calculate_peak_periods(df, 'month') |
| peak_weeks = calculate_peak_periods(df, 'week') |
| peak_weekdays = calculate_peak_periods(df, 'weekday') |
| peak_hours = calculate_peak_periods(df, 'hour') |
| print("Peak Months:", peak_months) |
| print("Peak Weeks:", peak_weeks) |
| print("Peak Weekdays:", peak_weekdays) |
| print("Peak Hours:", peak_hours) |
|
|
| def assign_peak_level_combined(row): |
| peak_count = sum([ |
| row['month'] in peak_months, |
| row['week'] in peak_weeks, |
| row['weekday'] in peak_weekdays, |
| row['hour'] in peak_hours |
| ]) |
| return ['Very Low', 'Low', 'Medium', 'High', 'Very High'][peak_count] |
|
|
| df.loc[:, 'peak_period_level'] = df.apply(assign_peak_level_combined, axis=1) |
| product_demand = df.groupby('StockCode')['Quantity'].sum().reset_index() |
| product_demand.rename(columns={'Quantity': 'total_quantity'}, inplace=True) |
| high_demand_threshold = np.percentile(product_demand['total_quantity'], 80) |
| low_demand_threshold = np.percentile(product_demand['total_quantity'], 20) |
|
|
| def assign_demand_level(total_quantity): |
| if total_quantity > high_demand_threshold: |
| return 'High' |
| elif total_quantity < low_demand_threshold: |
| return 'Low' |
| else: |
| return 'Medium' |
|
|
| product_demand['overall_demand_level'] = product_demand['total_quantity'].apply(assign_demand_level) |
| df = df.merge(product_demand[['StockCode', 'overall_demand_level']], on='StockCode') |
| reference_date = df['InvoiceDate'].max() |
| |
| print(reference_date) |
| rfm = df.groupby('CustomerID').agg({ |
| 'InvoiceDate': lambda x: (reference_date - x.max()).days, |
| 'InvoiceNo': 'count', |
| 'UnitPrice': 'sum' |
| }).reset_index() |
| rfm.columns = ['CustomerID', 'Recency', 'Frequency', 'Monetary'] |
| |
| |
| |
| num_bins = 4 |
| recency_bins = pd.qcut(rfm['Recency'], num_bins, duplicates='drop') |
| frequency_bins = pd.qcut(rfm['Frequency'], num_bins, duplicates='drop') |
| monetary_bins = pd.qcut(rfm['Monetary'], num_bins, duplicates='drop') |
| rfm['RecencySegment'] = pd.qcut(rfm['Recency'], num_bins, labels=[f"Q{i+1}" for i in range(len(recency_bins.cat.categories))], duplicates='drop') |
| rfm['FrequencySegment'] = pd.qcut(rfm['Frequency'], num_bins, labels=[f"Q{i+1}" for i in range(len(frequency_bins.cat.categories))], duplicates='drop') |
| rfm['MonetarySegment'] = pd.qcut(rfm['Monetary'], num_bins, labels=[f"Q{i+1}" for i in range(len(monetary_bins.cat.categories))], duplicates='drop') |
| df = df.merge(rfm[['CustomerID', 'RecencySegment','FrequencySegment','MonetarySegment']], on='CustomerID', how='left') |
| all_countries_stats = df.groupby('Country').agg( |
| total_sales=('total_sales', 'sum'), |
| avg_unit_price=('UnitPrice', 'mean') |
| ).reset_index() |
| countries_excluding_uk_stats = all_countries_stats[all_countries_stats['Country'] != 'United Kingdom'] |
| high_demand_threshold_excluding_uk = np.percentile(countries_excluding_uk_stats['total_sales'], 80) |
| low_demand_threshold_excluding_uk = np.percentile(countries_excluding_uk_stats['total_sales'], 20) |
| high_demand_threshold_including_uk = np.percentile(all_countries_stats['total_sales'], 80) |
| low_demand_threshold_including_uk = np.percentile(all_countries_stats['total_sales'], 20) |
|
|
| def demand_level(country, total_sales): |
| high_threshold, low_threshold = ( |
| (high_demand_threshold_including_uk, low_demand_threshold_including_uk) |
| if country == 'United Kingdom' |
| else (high_demand_threshold_excluding_uk, low_demand_threshold_excluding_uk) |
| ) |
| return 'High' if total_sales > high_threshold else 'Low' if total_sales < low_threshold else 'Medium' |
| all_countries_stats['country_purchasing_power'] = all_countries_stats.apply( |
| lambda x: demand_level(x['Country'], x['total_sales']), axis=1 |
| ) |
| df = df.merge(all_countries_stats[['Country', 'country_purchasing_power']], on='Country', how='left') |
| grouped_sales = df.groupby(['StockCode', 'Country'])['total_sales'].sum().reset_index() |
|
|
| def calculate_thresholds(group): |
| thresholds = { |
| 'very_low_threshold': group['total_sales'].quantile(0.10), |
| 'low_threshold': group['total_sales'].quantile(0.25), |
| 'medium_threshold': group['total_sales'].quantile(0.50), |
| 'high_threshold': group['total_sales'].quantile(0.75), |
| 'very_high_threshold': group['total_sales'].quantile(0.90), |
| } |
| return pd.Series(thresholds) |
|
|
| thresholds = grouped_sales.groupby('StockCode').apply(calculate_thresholds).reset_index() |
| grouped_sales = grouped_sales.merge(thresholds, on='StockCode') |
|
|
| def assign_level(row): |
| if row['total_sales'] <= row['very_low_threshold']: |
| return 'Very Low' |
| elif row['total_sales'] <= row['low_threshold']: |
| return 'Low' |
| elif row['total_sales'] <= row['medium_threshold']: |
| return 'Medium' |
| elif row['total_sales'] <= row['high_threshold']: |
| return 'High' |
| else: |
| return 'Very High' |
|
|
| grouped_sales['sales_level_by_country'] = grouped_sales.apply(assign_level, axis=1) |
| df = df.merge(grouped_sales[['StockCode', 'Country', 'sales_level_by_country']],on=['StockCode','Country'],how='left') |
|
|
| def adjust_unit_price(row): |
| price_adjustment = 0 |
|
|
| if row['peak_period_level'] == 'High': |
| price_adjustment += 0.10 |
| elif row['peak_period_level'] == 'Very High': |
| price_adjustment += 0.20 |
| elif row['peak_period_level'] == 'Low': |
| price_adjustment -= 0.05 |
| elif row['peak_period_level'] == 'Very Low': |
| price_adjustment -= 0.10 |
|
|
| if row['overall_demand_level'] == 'High': |
| price_adjustment += 0.15 |
| elif row['overall_demand_level'] == 'Medium': |
| price_adjustment += 0.05 |
| elif row['overall_demand_level'] == 'Low': |
| price_adjustment -= 0.10 |
|
|
| if row['RecencySegment'] == 'High': |
| price_adjustment += 0.05 |
| elif row['RecencySegment'] == 'Medium': |
| price_adjustment += 0.03 |
| elif row['RecencySegment'] == 'Low' or row['RecencySegment'] == 'Very Low': |
| price_adjustment -= 0.05 |
|
|
| if row['FrequencySegment'] == 'High': |
| price_adjustment += 0.10 |
| elif row['FrequencySegment'] == 'Medium': |
| price_adjustment += 0.05 |
| elif row['FrequencySegment'] == 'Low' or row['FrequencySegment'] == 'Very Low': |
| price_adjustment -= 0.05 |
|
|
| if row['MonetarySegment'] == 'High': |
| price_adjustment += 0.10 |
| elif row['MonetarySegment'] == 'Medium': |
| price_adjustment += 0.05 |
| elif row['MonetarySegment'] == 'Low' or row['MonetarySegment'] == 'Very Low': |
| price_adjustment -= 0.05 |
|
|
| if row['country_purchasing_power'] == 'High': |
| price_adjustment += 0.08 |
| elif row['country_purchasing_power'] == 'Medium': |
| price_adjustment += 0.04 |
| elif row['country_purchasing_power'] == 'Low': |
| price_adjustment -= 0.08 |
|
|
| if row['sales_level_by_country'] == 'Very High': |
| price_adjustment += 0.15 |
| elif row['sales_level_by_country'] == 'High': |
| price_adjustment += 0.12 |
| elif row['sales_level_by_country'] == 'Medium': |
| price_adjustment += 0.05 |
| elif row['sales_level_by_country'] == 'Low': |
| price_adjustment -= 0.10 |
| elif row['sales_level_by_country'] == 'Very Low': |
| price_adjustment -= 0.15 |
|
|
| new_unit_price = row['UnitPrice'] * (1 + price_adjustment) |
| return new_unit_price |
|
|
| df['adjusted_unit_price'] = df.apply(adjust_unit_price, axis=1) |
|
|
| return df |
| |
| def split_train_test(self, X: pd.DataFrame, y: pd.Series, test_size: float = 0.2, random_state: int = 42): |
| logging.info("Splitting dataset into train and test sets...") |
| |
| train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=test_size, random_state=random_state) |
| |
| train_df = pd.concat([train_X, train_y], axis=1) |
| test_df = pd.concat([test_X, test_y], axis=1) |
| |
| logging.info(f"Train shape: {train_df.shape}, Test shape: {test_df.shape}") |
| |
| return train_df, test_df |
|
|
| def initiate_data_transformation(self) -> DataTransformationArtifact: |
| logging.info("Entered initiate_data_transformation method of DataTransformation class") |
| try: |
| logging.info("Starting data transformation") |
| df = self.read_data(self.data_validation_artifact.valid_data_file_path) |
| df = self.perform_feature_engineering(df) |
|
|
| categorical_features = ['peak_period_level', 'overall_demand_level', 'RecencySegment', |
| 'FrequencySegment', 'MonetarySegment', 'country_purchasing_power', 'sales_level_by_country'] |
|
|
| numeric_cols = ['Quantity', 'UnitPrice', 'total_sales'] |
| target_column = 'adjusted_unit_price' |
| X = df.drop(columns=[target_column]) |
| y = df[target_column] |
| train_df, test_df = self.split_train_test(X, y) |
| print("Train columns:", train_df.columns) |
| print("Test columns:", test_df.columns) |
| print("Expected categorical features:", categorical_features) |
| print("Expected numeric features:", numeric_cols) |
|
|
| ordinal_categories = { |
| "peak_period_level": ["Very Low", "Low", "Medium", "High", "Very High"], |
| "overall_demand_level": ["Low", "Medium", "High"], |
| "RecencySegment": ["Very Low", "Low", "Medium", "High"], |
| "FrequencySegment": ["Very Low", "Low", "Medium", "High"], |
| "MonetarySegment": ["Very Low", "Low", "Medium", "High"], |
| "country_purchasing_power": ["Low", "Medium", "High"], |
| "sales_level_by_country": ["Very Low", "Low", "Medium", "High", "Very High"] |
| } |
|
|
| transformer = self.get_data_transformer_object( |
| categorical_features=list(ordinal_categories.keys()), |
| numeric_cols=['Quantity', 'UnitPrice', 'total_sales'], |
| ordinal_categories=ordinal_categories |
| ) |
|
|
| input_feature_train_df = train_df[categorical_features + ['UnitPrice', 'Quantity','total_sales']] |
| target_feature_train_df = train_df[target_column] |
|
|
| input_feature_test_df = test_df[categorical_features + ['UnitPrice', 'Quantity','total_sales']] |
| target_feature_test_df = test_df[target_column] |
|
|
| logging.info("Applying preprocessing object to datasets...") |
| input_feature_train_arr = transformer.fit_transform(input_feature_train_df) |
| input_feature_test_arr = transformer.transform(input_feature_test_df) |
|
|
| |
|
|
| train_arr = np.c_[input_feature_train_arr, np.array(target_feature_train_df)] |
| test_arr = np.c_[input_feature_test_arr, np.array(target_feature_test_df)] |
|
|
| save_numpy_array_data(self.data_transformation_config.transformed_train_file_path, array=train_arr) |
| save_numpy_array_data(self.data_transformation_config.transformed_test_file_path, array=test_arr) |
| save_object(self.data_transformation_config.transformed_object_file_path, transformer) |
| save_object( "final_model/preprocessor.pkl", transformer) |
|
|
| data_transformation_artifact = DataTransformationArtifact( |
| transformed_object_file_path=self.data_transformation_config.transformed_object_file_path, |
| transformed_train_file_path=self.data_transformation_config.transformed_train_file_path, |
| transformed_test_file_path=self.data_transformation_config.transformed_test_file_path, |
| ) |
| return data_transformation_artifact |
|
|
| except Exception as e: |
| raise CustomException(e, sys) |