| """ |
| This module contains the client classes for retrieving stock data from various sources. |
| """ |
|
|
| import time |
| import logging |
|
|
| import requests |
| from bs4 import BeautifulSoup |
|
|
| from django.conf import settings |
| from core.models import Stock |
| from data_pipeline.interfaces.api_client import DataClient |
| from core.constants import MONEYCONTROL_TOPSTOCKS_URL |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class StockRankings(DataClient): |
| """ """ |
|
|
| model = Stock |
| api_url = MONEYCONTROL_TOPSTOCKS_URL |
|
|
| def __init__(self) -> None: |
| self.api_response = None |
| self.transformed_data = None |
|
|
| def extract(self) -> None: |
|
|
| logger.info("Fetching data from %s", self.api_url) |
| with requests.Session() as session: |
| try: |
| response = session.get(self.api_url) |
| response.raise_for_status() |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Error fetching data from {self.api_url}: {e}") |
| return |
|
|
| soup = BeautifulSoup(response.text, "html.parser") |
|
|
| |
| table = soup.find("table", {"id": "indicesTable"}) |
| if not table: |
| logger.warning( |
| "Table with id 'indicesTable' not found for stock during scraping ranks." |
| ) |
| raise Exception( |
| "Table with id 'indicesTable' not found for stock during scraping ranks." |
| ) |
|
|
| data = [] |
| rows = table.find_all("tr") |
| logger.info(f"Found {len(rows)} rows in the table") |
| isin_fails = 0 |
| |
| for idx, row in enumerate(rows, start=1): |
| columns = row.find_all("td") |
| isin_number = None |
| if columns: |
| link = columns[0].find("a").get("href") if columns[0] else None |
| if link is not None: |
| try: |
| logger.info(f"Fetching stock details from link {link}") |
| response = session.get(link) |
| time.sleep(2) |
| response.raise_for_status() |
| soup = BeautifulSoup(response.text, "html.parser") |
| isin_element = soup.select_one( |
| 'li.clearfix span:contains("ISIN") + p' |
| ) |
| if isin_element: |
| isin_number = isin_element.get_text(strip=True) |
| else: |
| isin_fails += 1 |
| logger.warning(f"ISIN not found for link {link}") |
| except requests.exceptions.RequestException as e: |
| logger.exception( |
| f"Error fetching ISIN from link {link}: {e}" |
| ) |
| data.append( |
| { |
| "name": columns[0].get_text(strip=True), |
| "ltp": columns[1].get_text(strip=True), |
| "link": link, |
| "volume": columns[4].get_text(strip=True), |
| "percentage_change": columns[2].get_text(strip=True), |
| "price_change": columns[3].get_text(strip=True), |
| "rank": idx, |
| "isin_number": isin_number, |
| } |
| ) |
| logger.info(f"ISIN not found for {isin_fails} stocks out of {len(rows)}") |
|
|
| self.api_response = data |
|
|
| def load(self) -> None: |
| """ |
| Load the data into the database |
| """ |
| logger.info("Loading ranking data into the database...") |
| |
| Stock.objects.exclude(rank=None).update(rank=None) |
|
|
| for rank, stock_details in enumerate(self.transformed_data, 1): |
| try: |
| stock = Stock.objects.get(isin_number=stock_details["isin_number"]) |
| except Stock.DoesNotExist: |
| logger.warning( |
| f"No matching stock found for ISIN: {stock_details['isin_number']} creating new object..." |
| ) |
| stock = Stock.objects.create(data={"stock_rank": stock_details}) |
|
|
| else: |
| stock.data.update({"stock_rank": stock_details}) |
|
|
| stock.name = stock_details["name"] |
| stock.ltp = stock_details["ltp"] |
| stock.percentage_change = stock_details["percentage_change"] |
| stock.price_change = stock_details["price_change"] |
| stock.link = stock_details["link"] |
| stock.volume = stock_details["volume"] |
| stock.isin_number = stock_details["isin_number"] |
| stock.rank = stock_details["rank"] |
| stock.save() |
|
|
| logger.info( |
| f"Saved {rank=} {stock.name} | {stock_details=} {stock_details['isin_number']=}" |
| ) |
|
|
|
|
| class StockDetails(DataClient): |
| """ |
| Retrieves and updates stock details from the Morningstar API. |
| """ |
|
|
| model = Stock |
| api_url = f"https://{settings.MORNINGSTAR_HOST}/stock/get-detail" |
|
|
| def __init__(self, perf_id: str, isin_number: str) -> None: |
| """ |
| Initializes the StockDetails object. |
| |
| Args: |
| perf_id (str): Performance ID of the stock. |
| isin_number (str): ISIN number of the stock. |
| """ |
| if not perf_id: |
| raise ValueError("Performance ID cannot be empty.") |
| if not isin_number: |
| raise ValueError("ISIN number cannot be empty.") |
|
|
| self.api_response = {"details": None} |
| self.perf_id = perf_id |
| self.isin_number = isin_number |
|
|
| def _request(self) -> requests.Response: |
|
|
| querystring = {"PerformanceId": self.perf_id} |
| return requests.get( |
| self.api_url, |
| headers=settings.MORNINGSTAR_API_HEADERS, |
| params=querystring, |
| ) |
|
|
| def extract(self) -> None: |
| """ |
| Extracts stock details from the Morningstar API. |
| """ |
|
|
| response = self._request() |
|
|
| requests_count = 1 |
| while response.status_code != 200: |
| if response.status_code == 429: |
|
|
| logger.info( |
| f"API response: %s. Waiting for %s secs", |
| response.status_code, |
| 30 * requests_count, |
| ) |
| time.sleep(30 * requests_count) |
| response = self._request() |
| if requests_count > 3: |
| logger.warning( |
| f"API response: %s. Max retries reached", response.status_code |
| ) |
| break |
| requests_count += 1 |
|
|
| else: |
| self.api_response["details"] = response.json() |
| logger.info(f"API response: %s", response.status_code) |
|
|
| def load(self) -> None: |
| """ |
| Loads the retrieved stock details into the database. |
| """ |
|
|
| stock = Stock.objects.filter(isin_number=self.isin_number).first() |
| if stock is None: |
| logger.warning(f"No matching stock found for ISIN: {self.isin_number}") |
| return |
| stock.data = self.transformed_data |
| stock.save() |
| logger.info(f"Successfully stored data for {stock.isin_number}.") |
|
|