# Dynamic-Pricing / src/components/data_transformation.py
# Uploaded by Ayush456 via huggingface_hub (commit aba2f7b, verified)
import sys
import os
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from src.exception.exception import CustomException # Assuming you have this
from src.logging.logger import logging # Assuming you have this
from src.utils.main_utils.utils import save_numpy_array_data,save_object
from src.entity.artifact_entity import (
DataTransformationArtifact,
DataValidationArtifact
)
from src.entity.config_entity import DataTransformationConfig
# NOTE: the dataclasses below are superseded by the entity classes imported
# from src.entity.config_entity / src.entity.artifact_entity; kept for reference.
# @dataclass
# class DataTransformationConfig:
#     transformed_object_file_path: str = os.path.join("artifacts", "preprocessor.pkl")
#     transformed_train_file_path: str = os.path.join("artifacts", "transformed_train.npy")
#     transformed_test_file_path: str = os.path.join("artifacts", "transformed_test.npy")
#     label_encoders_file_path: str = os.path.join("artifacts", "label_encoders.pkl")
# @dataclass
# class DataTransformationArtifact:
#     transformed_object_file_path: str
#     transformed_train_file_path: str
#     transformed_test_file_path: str
#     label_encoders_file_path: str
class DataTransformation:
def __init__(self,data_validation_artifact:DataValidationArtifact,
data_transformation_config:DataTransformationConfig):
try:
self.data_validation_artifact:DataValidationArtifact=data_validation_artifact
self.data_transformation_config:DataTransformationConfig=data_transformation_config
except Exception as e:
raise CustomException(e,sys)
@staticmethod
def read_data(file_path) -> pd.DataFrame:
try:
return pd.read_csv(file_path)
except Exception as e:
raise CustomException(e, sys)
def get_data_transformer_object(self, categorical_features, numeric_cols, ordinal_categories):
try:
logging.info("Creating data transformer object...")
# Define the ordinal encoder with the given category order
ordinal_encoder = OrdinalEncoder(
categories=[ordinal_categories[col] for col in categorical_features],
dtype=int,
handle_unknown='use_encoded_value',
unknown_value=-1, # Fix: Use a float value instead of np.nan
# dtype=np.float64 # Fix: Ensure dtype is float to match NaNs
)
# Standard Scaler for numerical features
scaler = StandardScaler()
# Define transformations
cat_transformers = [("ordinal", ordinal_encoder, categorical_features)]
num_transformers = [("scaler", scaler, numeric_cols)]
# Combine transformers using ColumnTransformer
preprocessor = ColumnTransformer(
transformers=cat_transformers + num_transformers,
remainder='passthrough'
)
logging.info("Data transformer object created successfully.")
return preprocessor
except Exception as e:
raise CustomException(e, sys)
def perform_feature_engineering(self, df):
# All of your complex feature engineering logic goes here
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
df= df[(df['Quantity'] >= 0) & (df['UnitPrice'] >= 0)]
df = df.copy() # Ensure df is not a slice
df.loc[:, 'hour'] = df['InvoiceDate'].dt.hour
df.loc[:, 'weekday'] = df['InvoiceDate'].dt.weekday
df.loc[:, 'month'] = df['InvoiceDate'].dt.month
df.loc[:, 'week'] = df['InvoiceDate'].dt.isocalendar().week
df.loc[:, 'total_sales'] = df['UnitPrice'] * df['Quantity']
def calculate_peak_periods(data, group_by_column, top_n_percent=20):
period_sales = data.groupby(group_by_column)['total_sales'].sum().reset_index()
period_sales_sorted = period_sales.sort_values(by='total_sales', ascending=False)
num_peak_periods = int((top_n_percent / 100) * len(period_sales_sorted))
peak_periods = period_sales_sorted.head(num_peak_periods)[group_by_column].tolist()
return peak_periods
peak_months = calculate_peak_periods(df, 'month')
peak_weeks = calculate_peak_periods(df, 'week')
peak_weekdays = calculate_peak_periods(df, 'weekday')
peak_hours = calculate_peak_periods(df, 'hour')
print("Peak Months:", peak_months)
print("Peak Weeks:", peak_weeks)
print("Peak Weekdays:", peak_weekdays)
print("Peak Hours:", peak_hours)
def assign_peak_level_combined(row):
peak_count = sum([
row['month'] in peak_months,
row['week'] in peak_weeks,
row['weekday'] in peak_weekdays,
row['hour'] in peak_hours
])
return ['Very Low', 'Low', 'Medium', 'High', 'Very High'][peak_count]
df.loc[:, 'peak_period_level'] = df.apply(assign_peak_level_combined, axis=1)
product_demand = df.groupby('StockCode')['Quantity'].sum().reset_index()
product_demand.rename(columns={'Quantity': 'total_quantity'}, inplace=True)
high_demand_threshold = np.percentile(product_demand['total_quantity'], 80)
low_demand_threshold = np.percentile(product_demand['total_quantity'], 20)
def assign_demand_level(total_quantity):
if total_quantity > high_demand_threshold:
return 'High'
elif total_quantity < low_demand_threshold:
return 'Low'
else:
return 'Medium'
product_demand['overall_demand_level'] = product_demand['total_quantity'].apply(assign_demand_level)
df = df.merge(product_demand[['StockCode', 'overall_demand_level']], on='StockCode')
reference_date = df['InvoiceDate'].max()
print(reference_date)
rfm = df.groupby('CustomerID').agg({
'InvoiceDate': lambda x: (reference_date - x.max()).days,
'InvoiceNo': 'count',
'UnitPrice': 'sum'
}).reset_index()
rfm.columns = ['CustomerID', 'Recency', 'Frequency', 'Monetary']
# rfm['RecencySegment'] = pd.qcut(rfm['Recency'], 4, labels=['High', 'Medium', 'Low', 'Very Low'], duplicates='drop')
# rfm['FrequencySegment'] = pd.qcut(rfm['Frequency'], 4, labels=['Very Low', 'Low', 'Medium', 'High'], duplicates='drop')
# rfm['MonetarySegment'] = pd.qcut(rfm['Monetary'], 4, labels=['Very Low', 'Low', 'Medium', 'High'], duplicates='drop')
num_bins = 4
recency_bins = pd.qcut(rfm['Recency'], num_bins, duplicates='drop')
frequency_bins = pd.qcut(rfm['Frequency'], num_bins, duplicates='drop')
monetary_bins = pd.qcut(rfm['Monetary'], num_bins, duplicates='drop')
rfm['RecencySegment'] = pd.qcut(rfm['Recency'], num_bins, labels=[f"Q{i+1}" for i in range(len(recency_bins.cat.categories))], duplicates='drop')
rfm['FrequencySegment'] = pd.qcut(rfm['Frequency'], num_bins, labels=[f"Q{i+1}" for i in range(len(frequency_bins.cat.categories))], duplicates='drop')
rfm['MonetarySegment'] = pd.qcut(rfm['Monetary'], num_bins, labels=[f"Q{i+1}" for i in range(len(monetary_bins.cat.categories))], duplicates='drop')
df = df.merge(rfm[['CustomerID', 'RecencySegment','FrequencySegment','MonetarySegment']], on='CustomerID', how='left')
all_countries_stats = df.groupby('Country').agg(
total_sales=('total_sales', 'sum'),
avg_unit_price=('UnitPrice', 'mean')
).reset_index()
countries_excluding_uk_stats = all_countries_stats[all_countries_stats['Country'] != 'United Kingdom']
high_demand_threshold_excluding_uk = np.percentile(countries_excluding_uk_stats['total_sales'], 80)
low_demand_threshold_excluding_uk = np.percentile(countries_excluding_uk_stats['total_sales'], 20)
high_demand_threshold_including_uk = np.percentile(all_countries_stats['total_sales'], 80)
low_demand_threshold_including_uk = np.percentile(all_countries_stats['total_sales'], 20)
def demand_level(country, total_sales):
high_threshold, low_threshold = (
(high_demand_threshold_including_uk, low_demand_threshold_including_uk)
if country == 'United Kingdom'
else (high_demand_threshold_excluding_uk, low_demand_threshold_excluding_uk)
)
return 'High' if total_sales > high_threshold else 'Low' if total_sales < low_threshold else 'Medium'
all_countries_stats['country_purchasing_power'] = all_countries_stats.apply(
lambda x: demand_level(x['Country'], x['total_sales']), axis=1
)
df = df.merge(all_countries_stats[['Country', 'country_purchasing_power']], on='Country', how='left')
grouped_sales = df.groupby(['StockCode', 'Country'])['total_sales'].sum().reset_index()
def calculate_thresholds(group):
thresholds = {
'very_low_threshold': group['total_sales'].quantile(0.10),
'low_threshold': group['total_sales'].quantile(0.25),
'medium_threshold': group['total_sales'].quantile(0.50),
'high_threshold': group['total_sales'].quantile(0.75),
'very_high_threshold': group['total_sales'].quantile(0.90),
}
return pd.Series(thresholds)
thresholds = grouped_sales.groupby('StockCode').apply(calculate_thresholds).reset_index()
grouped_sales = grouped_sales.merge(thresholds, on='StockCode')
def assign_level(row):
if row['total_sales'] <= row['very_low_threshold']:
return 'Very Low'
elif row['total_sales'] <= row['low_threshold']:
return 'Low'
elif row['total_sales'] <= row['medium_threshold']:
return 'Medium'
elif row['total_sales'] <= row['high_threshold']:
return 'High'
else:
return 'Very High'
grouped_sales['sales_level_by_country'] = grouped_sales.apply(assign_level, axis=1)
df = df.merge(grouped_sales[['StockCode', 'Country', 'sales_level_by_country']],on=['StockCode','Country'],how='left')
def adjust_unit_price(row):
price_adjustment = 0
if row['peak_period_level'] == 'High':
price_adjustment += 0.10
elif row['peak_period_level'] == 'Very High':
price_adjustment += 0.20
elif row['peak_period_level'] == 'Low':
price_adjustment -= 0.05
elif row['peak_period_level'] == 'Very Low':
price_adjustment -= 0.10
if row['overall_demand_level'] == 'High':
price_adjustment += 0.15
elif row['overall_demand_level'] == 'Medium':
price_adjustment += 0.05
elif row['overall_demand_level'] == 'Low':
price_adjustment -= 0.10
if row['RecencySegment'] == 'High':
price_adjustment += 0.05
elif row['RecencySegment'] == 'Medium':
price_adjustment += 0.03
elif row['RecencySegment'] == 'Low' or row['RecencySegment'] == 'Very Low':
price_adjustment -= 0.05
if row['FrequencySegment'] == 'High':
price_adjustment += 0.10
elif row['FrequencySegment'] == 'Medium':
price_adjustment += 0.05
elif row['FrequencySegment'] == 'Low' or row['FrequencySegment'] == 'Very Low':
price_adjustment -= 0.05
if row['MonetarySegment'] == 'High':
price_adjustment += 0.10
elif row['MonetarySegment'] == 'Medium':
price_adjustment += 0.05
elif row['MonetarySegment'] == 'Low' or row['MonetarySegment'] == 'Very Low':
price_adjustment -= 0.05
if row['country_purchasing_power'] == 'High':
price_adjustment += 0.08
elif row['country_purchasing_power'] == 'Medium':
price_adjustment += 0.04
elif row['country_purchasing_power'] == 'Low':
price_adjustment -= 0.08
if row['sales_level_by_country'] == 'Very High':
price_adjustment += 0.15
elif row['sales_level_by_country'] == 'High':
price_adjustment += 0.12
elif row['sales_level_by_country'] == 'Medium':
price_adjustment += 0.05
elif row['sales_level_by_country'] == 'Low':
price_adjustment -= 0.10
elif row['sales_level_by_country'] == 'Very Low':
price_adjustment -= 0.15
new_unit_price = row['UnitPrice'] * (1 + price_adjustment)
return new_unit_price
df['adjusted_unit_price'] = df.apply(adjust_unit_price, axis=1)
return df
def split_train_test(self, X: pd.DataFrame, y: pd.Series, test_size: float = 0.2, random_state: int = 42):
logging.info("Splitting dataset into train and test sets...")
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=test_size, random_state=random_state)
train_df = pd.concat([train_X, train_y], axis=1)
test_df = pd.concat([test_X, test_y], axis=1)
logging.info(f"Train shape: {train_df.shape}, Test shape: {test_df.shape}")
return train_df, test_df
def initiate_data_transformation(self) -> DataTransformationArtifact:
logging.info("Entered initiate_data_transformation method of DataTransformation class")
try:
logging.info("Starting data transformation")
df = self.read_data(self.data_validation_artifact.valid_data_file_path)
df = self.perform_feature_engineering(df)
categorical_features = ['peak_period_level', 'overall_demand_level', 'RecencySegment',
'FrequencySegment', 'MonetarySegment', 'country_purchasing_power', 'sales_level_by_country']
numeric_cols = ['Quantity', 'UnitPrice', 'total_sales']
target_column = 'adjusted_unit_price'
X = df.drop(columns=[target_column]) # Features (all columns except target)
y = df[target_column]
train_df, test_df = self.split_train_test(X, y)
print("Train columns:", train_df.columns)
print("Test columns:", test_df.columns)
print("Expected categorical features:", categorical_features)
print("Expected numeric features:", numeric_cols)
ordinal_categories = {
"peak_period_level": ["Very Low", "Low", "Medium", "High", "Very High"],
"overall_demand_level": ["Low", "Medium", "High"],
"RecencySegment": ["Very Low", "Low", "Medium", "High"],
"FrequencySegment": ["Very Low", "Low", "Medium", "High"],
"MonetarySegment": ["Very Low", "Low", "Medium", "High"],
"country_purchasing_power": ["Low", "Medium", "High"],
"sales_level_by_country": ["Very Low", "Low", "Medium", "High", "Very High"]
}
transformer = self.get_data_transformer_object(
categorical_features=list(ordinal_categories.keys()),
numeric_cols=['Quantity', 'UnitPrice', 'total_sales'],
ordinal_categories=ordinal_categories
)
input_feature_train_df = train_df[categorical_features + ['UnitPrice', 'Quantity','total_sales']]
target_feature_train_df = train_df[target_column]
input_feature_test_df = test_df[categorical_features + ['UnitPrice', 'Quantity','total_sales']]
target_feature_test_df = test_df[target_column]
logging.info("Applying preprocessing object to datasets...")
input_feature_train_arr = transformer.fit_transform(input_feature_train_df)
input_feature_test_arr = transformer.transform(input_feature_test_df)
train_arr = np.c_[input_feature_train_arr, np.array(target_feature_train_df)]
test_arr = np.c_[input_feature_test_arr, np.array(target_feature_test_df)]
save_numpy_array_data(self.data_transformation_config.transformed_train_file_path, array=train_arr)
save_numpy_array_data(self.data_transformation_config.transformed_test_file_path, array=test_arr)
save_object(self.data_transformation_config.transformed_object_file_path, transformer)
save_object( "final_model/preprocessor.pkl", transformer)
data_transformation_artifact = DataTransformationArtifact(
transformed_object_file_path=self.data_transformation_config.transformed_object_file_path,
transformed_train_file_path=self.data_transformation_config.transformed_train_file_path,
transformed_test_file_path=self.data_transformation_config.transformed_test_file_path,
)
return data_transformation_artifact
except Exception as e:
raise CustomException(e, sys)