| import os |
| import sys |
| import json |
| import pandas as pd |
| import pymongo |
| from src.exception.exception import CustomException |
| from dotenv import load_dotenv |
| import certifi |
|
|
# Populate os.environ from a local .env file (if one exists) before any
# setting is read from the environment.
load_dotenv()

# MongoDB connection string; None when MONGO_URI is not set.
MONGO_DB_URL = os.environ.get("MONGO_URI")

# Filesystem path of certifi's CA bundle, passed to MongoClient as tlsCAFile.
ca = certifi.where()
|
|
class DataExtraction:
    """Read rows from an Excel workbook and insert them into MongoDB.

    The instance owns one ``pymongo.MongoClient`` built from the module-level
    ``MONGO_DB_URL`` and the certifi CA bundle path ``ca``.
    """

    def __init__(self):
        """Create the MongoDB client.

        Raises:
            CustomException: If pymongo rejects the client configuration.
        """
        try:
            # NOTE(review): MongoClient is documented to connect lazily, so
            # an unreachable server may only surface on the first real
            # operation (e.g. insert_many) -- confirm whether an explicit
            # ping is wanted here.
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL, tlsCAFile=ca)
            print("MongoDB connection established.")
        except pymongo.errors.PyMongoError as e:
            # pymongo.errors has no ``ConnectionError``; PyMongoError is the
            # base class of every pymongo exception, so configuration and
            # connection failures are both wrapped.
            raise CustomException(f"Failed to connect to MongoDB: {str(e)}", sys) from e

    def excel_to_json_chunks(self, file_path, chunk_size=5000):
        """Yield the workbook's rows in batches of JSON-compatible dicts.

        The first sheet is parsed once, with its first row as the header, and
        then sliced, so every batch carries the real column names and no data
        row is consumed as a header.

        Args:
            file_path: Path of the ``.xlsx`` workbook to read.
            chunk_size: Maximum number of rows per yielded batch; must be
                at least 1.

        Yields:
            A list of ``{column: value}`` dicts, one dict per row, holding at
            most ``chunk_size`` rows. An empty sheet yields nothing.

        Raises:
            CustomException: If the file is missing, ``chunk_size`` is not
                positive, or the workbook cannot be read or serialised.
        """
        try:
            if chunk_size < 1:
                raise ValueError(
                    f"chunk_size must be a positive integer, got {chunk_size!r}"
                )

            # Parse the workbook a single time; re-reading it per chunk with
            # skiprows also skipped the header row on every chunk but the
            # first.
            frame = pd.read_excel(file_path, engine='openpyxl')

            for start_row in range(0, len(frame), chunk_size):
                chunk = frame.iloc[start_row:start_row + chunk_size]
                # Round-trip through pandas' JSON encoder so NaN becomes None
                # and numpy/pandas scalars become plain Python values.
                yield json.loads(chunk.to_json(orient="records"))
        except FileNotFoundError as e:
            raise CustomException(f"File not found at path: {file_path}", sys) from e
        except Exception as e:
            raise CustomException(f"Failed to read Excel file in chunks: {str(e)}", sys) from e

    def insert_data_mongodb(self, records, database, collection):
        """Insert a batch of documents and return how many were written.

        Args:
            records: Documents (dicts) to insert.
            database: Name of the target database.
            collection: Name of the target collection.

        Returns:
            The number of inserted documents; 0 when ``records`` is empty.

        Raises:
            CustomException: If the insert fails for any reason.
        """
        try:
            # insert_many rejects an empty batch, so report "nothing
            # inserted" instead of turning an empty chunk into an error.
            if not records:
                return 0

            target = self.mongo_client[database][collection]
            result = target.insert_many(records)
            return len(result.inserted_ids)
        except Exception as e:
            raise CustomException(f"Failed to insert data into MongoDB: {str(e)}", sys) from e
|
|
|
|
if __name__ == '__main__':
    # Script entry point: load the Online Retail workbook into MongoDB in
    # batches, reporting progress after each batch.
    FILE_PATH = "notebook/data/Online Retail.xlsx"
    DATABASE = "MLData"
    COLLECTION = "DynamicPricing"

    extobj = None
    try:
        # Construct inside the try so a CustomException raised while building
        # the client is reported by the same handler as a failed load.
        extobj = DataExtraction()

        total_inserted = 0
        for records in extobj.excel_to_json_chunks(FILE_PATH):
            inserted_count = extobj.insert_data_mongodb(records, DATABASE, COLLECTION)
            total_inserted += inserted_count
            print(f"Inserted {inserted_count} records. Total inserted: {total_inserted}")

        print(f"Successfully inserted {total_inserted} records into MongoDB.")

    except CustomException as e:
        print(f"Error: {e}")
        # Signal the failure to the calling shell/scheduler instead of
        # exiting with status 0.
        sys.exit(1)
    finally:
        # Release the client's sockets whether or not the load succeeded.
        if extobj is not None:
            extobj.mongo_client.close()
|
|