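"""
ETL clients that pull mutual fund data from Morningstar and MoneyControl
and store it on ``MutualFund`` records.
"""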
| import logging |
|
|
| from django.conf import settings |
|
|
| import requests |
| from bs4 import BeautifulSoup |
| from core.models import MutualFund |
| from core.constants import MONEYCONTROL_TOPFUNDS_URL |
| from data_pipeline.interfaces.api_client import DataClient |
|
|
|
|
# Module-level logger used by every client defined in this file.
logger = logging.getLogger(__name__)

# RapidAPI authentication headers, built from the configured key and host and
# attached to the Django settings object at import time. The Morningstar
# clients in this module read them back via settings.MORNINGSTAR_API_HEADERS.
settings.MORNINGSTAR_API_HEADERS = {
    "X-RapidAPI-Key": settings.MORNINGSTAR_KEY,
    "X-RapidAPI-Host": settings.MORNINGSTAR_HOST,
}
|
|
|
|
class MFList(DataClient):
    """Sync the Morningstar fund screener listing into ``MutualFund`` records.

    ``extract`` downloads the listing, ``transform`` reshapes every row into
    model keyword arguments and ``load`` updates or creates one record per
    fund, matched on ISIN.
    """

    model = MutualFund

    # Morningstar screener endpoint; one page of up to 15000 funds, INR prices.
    api_url = "https://lt.morningstar.com/api/rest.svc/g9vi2nsqjb/security/screener?page=1&pageSize=15000&sortOrder=name%20asc&outputType=json&version=1&languageId=en&currencyId=INR&universeIds=FOIND%24%24ALL%7CFCIND%24%24ALL&securityDataPoints=secId%2ClegalName%2CclosePrice%2CclosePriceDate%2Cyield_M12%2CongoingCharge%2CcategoryName%2CMedalist_RatingNumber%2CstarRatingM255%2CreturnD1%2CreturnW1%2CreturnM1%2CreturnM3%2CreturnM6%2CreturnM0%2CreturnM12%2CreturnM36%2CreturnM60%2CreturnM120%2CmaxFrontEndLoad%2CmanagerTenure%2CmaxDeferredLoad%2CexpenseRatio%2Cisin%2CinitialPurchase%2CfundTnav%2CequityStyleBox%2CbondStyleBox%2CaverageMarketCapital%2CaverageCreditQualityCode%2CeffectiveDuration%2CmorningstarRiskM255%2CalphaM36%2CbetaM36%2Cr2M36%2CstandardDeviationM36%2CsharpeM36%2CtrackRecordExtension&filters=&term="

    def __init__(self) -> None:
        self.api_response = None
        self.transformed_data = None

    def extract(self):
        """Download the screener listing and keep the decoded JSON payload.

        On a 200 response the JSON body is stored in ``api_response``. Any
        other status is logged together with the raw body and
        ``api_response`` is left untouched; no exception is raised here.
        """
        super().extract()

        logger.info("Calling Morningstar API")
        response = requests.get(self.api_url)

        if response.status_code != 200:
            logger.info("Received status code: %s", response.status_code)
            # Error bodies are not guaranteed to be JSON, so log the raw text
            # instead of decoding it; the logging call must not raise.
            logger.info("Response body: %s", response.text)
            return

        self.api_response = response.json()
        logger.info(
            "Morningstar API response received %s funds",
            len(self.api_response["rows"]),
        )

    def transform(self):
        """
        Transform the data to the format required by the model.

        Reads ``api_response["rows"]`` (so ``extract`` must have succeeded)
        and stores one dict per fund in ``transformed_data``. The complete
        screener row is kept under ``data["list_info"]``.
        """
        super().transform()

        self.transformed_data = [
            {
                "fund_name": fund["legalName"],
                # Not every row is guaranteed to carry an ISIN.
                "isin_number": fund.get("isin"),
                "security_id": fund["secId"],
                "data": {"list_info": fund},
            }
            for fund in self.api_response["rows"]
        ]

    def load(self):
        """
        Load the data into the database.

        A fund whose ISIN already exists gets its ``data`` dict merged with
        the new ``list_info``; otherwise a new record is created from the
        transformed dict. The number of created and updated records is logged.
        """
        create_count = 0
        update_count = 0
        for data_dict in self.transformed_data:
            # Only the lookup sits inside ``try`` so that a DoesNotExist
            # raised elsewhere is never mistaken for "fund not stored yet".
            try:
                mf = self.model.objects.get(isin_number=data_dict["isin_number"])
            except self.model.DoesNotExist:
                self.model(**data_dict).save()
                create_count += 1
            else:
                mf.data.update(data_dict["data"])
                mf.save()
                update_count += 1

        logger.info(
            "Created %s records; Updated %s records", create_count, update_count
        )
|
|
|
|
class MFQuote(DataClient):
    """Fetch the Morningstar quote of a single fund and store it on the fund.

    The fund is looked up by ISIN when the client is constructed.
    """

    api_url = f"https://{settings.MORNINGSTAR_HOST}/etf/get-quote"
    model = MutualFund

    def __init__(self, isin) -> None:
        self.isin = isin
        self.api_response = None
        self.transformed_data = None
        # Raises the model's DoesNotExist when no fund carries this ISIN.
        self.mf = self.model.objects.get(isin_number=self.isin)

    def extract(self):
        """Request the quote for the fund's Morningstar security id.

        A 200 response is decoded into ``api_response``. Any other status is
        logged and handed to ``raise_for_status``.
        """
        logger.info(f"Calling Morningstar Quote API for quotes with isin {self.isin}")

        reply = requests.get(
            self.api_url,
            params={"securityId": self.mf.security_id},
            headers=settings.MORNINGSTAR_API_HEADERS,
        )

        if reply.status_code != 200:
            logger.info("API response: %s", reply.status_code)
            reply.raise_for_status()
            return

        self.api_response = reply.json()

    def load(self):
        """Store ``transformed_data`` under the ``quotes`` key and save the fund."""
        fund = self.mf
        fund.data.update(quotes=self.transformed_data)
        fund.save()
        logger.info(f"Successfully stored data of quotes for {fund.fund_name}")
|
|
|
|
class MFHoldings(DataClient):
    """Fetch the Morningstar portfolio holdings of a single fund and store them.

    The fund is looked up by ISIN when the client is constructed.
    """

    api_url = f"https://{settings.MORNINGSTAR_HOST}/etf/portfolio/get-holdings"
    model = MutualFund

    def __init__(self, isin) -> None:
        self.isin = isin
        self.api_response = None
        self.transformed_data = None
        # Raises the model's DoesNotExist when no fund carries this ISIN.
        self.mf = self.model.objects.get(isin_number=self.isin)

    def extract(self):
        """Request the holdings for the fund's Morningstar security id.

        A 200 response is decoded into ``api_response``. Any other status is
        logged (body at debug level) and handed to ``raise_for_status``.
        """
        result = requests.get(
            self.api_url,
            params={"securityId": self.mf.security_id},
            headers=settings.MORNINGSTAR_API_HEADERS,
        )

        succeeded = result.status_code == 200
        if not succeeded:
            logger.info(f"received status code {result.status_code} for {self.isin}")
            logger.debug(result.content)
            result.raise_for_status()
        else:
            self.api_response = result.json()

    def load(self):
        """Store ``transformed_data`` under the ``holdings`` key and save the fund."""
        fund = self.mf
        fund.data.update(holdings=self.transformed_data)
        fund.save()
        logger.info(f"Successfully stored data of holdings for {fund.fund_name}")
|
|
|
|
class MFRiskMeasures(DataClient):
    """Fetch Morningstar risk and volatility measures of a single fund.

    The fund is looked up by ISIN when the client is constructed.
    """

    model = MutualFund
    api_url = (
        f"https://{settings.MORNINGSTAR_HOST}/etf/risk/get-risk-volatility-measures"
    )

    def __init__(self, isin) -> None:
        self.api_response = None
        # Initialised like the sibling clients so ``load`` never reads an
        # attribute that does not exist.
        self.transformed_data = None
        self.isin = isin
        # Raises the model's DoesNotExist when no fund carries this ISIN.
        self.mf = self.model.objects.get(isin_number=self.isin)

    def extract(self):
        """Request the risk measures for the fund's Morningstar security id.

        A 200 response is decoded into ``api_response``. Any other status is
        logged and handed to ``raise_for_status``, which raises
        ``requests.HTTPError`` for 4xx/5xx responses.
        """
        querystring = {"securityId": self.mf.security_id}

        response = requests.get(
            self.api_url, headers=settings.MORNINGSTAR_API_HEADERS, params=querystring
        )

        if response.status_code == 200:
            self.api_response = response.json()
        else:
            # Log the raw body: decoding a non-JSON error page would raise
            # here and hide the HTTP error reported by raise_for_status().
            logger.info(
                "received status code %s for %s: %s",
                response.status_code,
                self.isin,
                response.text,
            )
            response.raise_for_status()

    def load(self):
        """Store ``transformed_data`` under the ``risk_measures`` key and save the fund."""
        self.mf.data.update({"risk_measures": self.transformed_data})
        self.mf.save()
        logger.info(
            f"Successfully stored data of risk measures for {self.mf.fund_name}"
        )
|
|
|
|
class MFRanking(DataClient):
    """Scrape MoneyControl's top-funds table and store rank, CRISIL rank and AUM."""

    api_url = MONEYCONTROL_TOPFUNDS_URL
    model = MutualFund

    def __init__(self) -> None:
        self.api_response = None
        self.transformed_data = None

    def extract(self) -> None:
        """
        Fetch the top mutual funds page from MoneyControl and parse its table.

        Every ``<tr>`` carrying a CSS class that contains "INF" is treated as
        a fund row, and that class is used as the fund's ISIN. The result is
        stored in ``api_response`` as a list of dicts, in page order, with
        the keys ``fund_name``, ``fund_type``, ``crisil_rank``,
        ``isin_number`` and ``aum`` (all raw strings). Nothing is returned.

        Raises:
            requests.HTTPError: if the page request returns a 4xx/5xx status.
        """
        super().extract()

        logger.info("Fetching top mutual funds from MoneyControl website")
        response = requests.get(self.api_url)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, "html.parser")

        fund_rows = soup.find_all("tr", class_=lambda x: x and "INF" in x)
        logger.info("Found %s rows", len(fund_rows))

        fund_details = []
        for row in fund_rows:
            columns = row.find_all("td")
            # Cells 0, 2, 3 and 4 are read below; skip rows that are too short
            # instead of aborting the whole scrape with an IndexError.
            if len(columns) < 5:
                logger.warning(
                    "Skipping fund row with %s cells (expected at least 5)",
                    len(columns),
                )
                continue

            # The row filter matches when ANY class contains "INF", so pick
            # that class rather than assuming it is listed first.
            row_classes = row["class"]
            isin_number = next(
                (name for name in row_classes if "INF" in name), row_classes[0]
            )

            fund_details.append(
                {
                    "fund_name": columns[0].text.strip(),
                    "fund_type": columns[2].text.strip(),
                    "crisil_rank": columns[3].text.strip(),
                    "isin_number": isin_number,
                    "aum": columns[4].text.strip(),
                }
            )

        self.api_response = fund_details

    def load(self) -> None:
        """
        Load the data into the database.

        Clears the ``rank`` of every ranked fund, then walks
        ``transformed_data`` in order and assigns rank 1, 2, 3, ... together
        with the scraped CRISIL rank ("-" is stored as 0) and AUM (thousands
        separators removed). ``transformed_data`` is not set by this class;
        presumably ``DataClient.transform`` fills it from ``api_response`` --
        confirm against the base class.

        The reset and all updates run in one transaction, so a failure
        part-way through (for example a fund missing from the database)
        rolls everything back instead of leaving all ranks cleared.
        """
        # Imported locally so this method is self-contained.
        from django.db import transaction

        with transaction.atomic():
            self.model.objects.exclude(rank=None).update(rank=None)

            for rank, fund_details in enumerate(self.transformed_data, 1):
                mf = self.model.objects.get(isin_number=fund_details["isin_number"])
                mf.crisil_rank = (
                    fund_details["crisil_rank"]
                    if fund_details["crisil_rank"] != "-"
                    else 0
                )
                mf.rank = rank
                mf.aum = float(fund_details["aum"].replace(",", ""))
                mf.save()
                logger.info(
                    f"Updated {rank=} {mf.fund_name} | {fund_details=} {fund_details['crisil_rank']=} {fund_details['aum']=}"
                )
|
|