# File size: 2,332 Bytes | aba2f7b
import os
import sys
import json
import pandas as pd
import pymongo
from src.exception.exception import CustomException
from dotenv import load_dotenv
import certifi
load_dotenv()
MONGO_DB_URL = os.getenv("MONGO_URI")
ca = certifi.where()
class DataExtraction:
    """Read rows from an Excel workbook and insert them into MongoDB.

    The client is created from the module-level ``MONGO_DB_URL`` and the
    certifi CA bundle path ``ca``. Every failure is re-raised as
    ``CustomException`` so callers only need to handle one exception type.
    """

    def __init__(self):
        """Create the MongoDB client.

        Raises:
            CustomException: If pymongo raises while building the client.
        """
        try:
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL, tlsCAFile=ca)
            # NOTE(review): pymongo documents MongoClient as connecting lazily,
            # so this message presumably only means the client object was
            # built, not that the server is reachable -- confirm.
            print("MongoDB connection established.")
        except pymongo.errors.PyMongoError as e:
            # PyMongoError is the base class of pymongo's exceptions; the
            # previously referenced ``pymongo.errors.ConnectionError`` does
            # not exist and would itself fail with AttributeError.
            raise CustomException(f"Failed to connect to MongoDB: {str(e)}", sys) from e

    def excel_to_json_chunks(self, file_path, chunk_size=5000):
        """Yield the rows of an Excel sheet as lists of JSON-compatible dicts.

        The workbook is parsed once and then sliced, so every chunk shares the
        real header row as its keys.

        Args:
            file_path: Path of the ``.xlsx`` file to read (openpyxl engine).
            chunk_size: Maximum number of rows per yielded list.

        Yields:
            A list with one dict per row, keyed by column name.

        Raises:
            CustomException: If the file is missing or cannot be read or
                converted. Because this is a generator, the error surfaces
                while iterating, not when the method is called.
        """
        try:
            frame = pd.read_excel(file_path, engine='openpyxl')
            total_rows = len(frame)
            for start_row in range(0, total_rows, chunk_size):
                chunk = frame.iloc[start_row:start_row + chunk_size]
                # orient="records" gives one dict per row directly; going
                # through a transpose would mix all columns into one dtype.
                records = json.loads(chunk.to_json(orient="records"))
                yield records
        except FileNotFoundError as e:
            raise CustomException(f"File not found at path: {file_path}", sys) from e
        except Exception as e:
            raise CustomException(f"Failed to read Excel file in chunks: {str(e)}", sys) from e

    def insert_data_mongodb(self, records, database, collection):
        """Insert a batch of documents and return how many were inserted.

        Args:
            records: Documents to insert (a list of dicts).
            database: Name of the target database.
            collection: Name of the target collection.

        Returns:
            The number of inserted ids reported by ``insert_many``; ``0`` for
            an empty list or tuple, without contacting the server.

        Raises:
            CustomException: If the lookup or the insert fails.
        """
        # insert_many rejects an empty document list, so an empty batch is
        # treated as a no-op instead of an error.
        if isinstance(records, (list, tuple)) and not records:
            return 0
        try:
            coll = self.mongo_client[database][collection]
            result = coll.insert_many(records)
            return len(result.inserted_ids)
        except Exception as e:
            raise CustomException(f"Failed to insert data into MongoDB: {str(e)}", sys) from e
if __name__ == '__main__':
    # Script entry point: load the Online Retail workbook into MongoDB chunk
    # by chunk, printing a running total after every batch.
    FILE_PATH = "notebook/data/Online Retail.xlsx"
    DATABASE = "MLData"
    COLLECTION = "DynamicPricing"

    try:
        # Built inside the try because DataExtraction.__init__ can raise
        # CustomException as well; outside it that error would bypass the
        # handler below and end the script with a traceback.
        extobj = DataExtraction()
        total_inserted = 0
        for records in extobj.excel_to_json_chunks(FILE_PATH):
            inserted_count = extobj.insert_data_mongodb(records, DATABASE, COLLECTION)
            total_inserted += inserted_count
            print(f"Inserted {inserted_count} records. Total inserted: {total_inserted}")
        print(f"Successfully inserted {total_inserted} records into MongoDB.")
    except CustomException as e:
        print(f"Error: {e}")
|