| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel |
| import pandas as pd |
| import numpy as np |
| import tensorflow as tf |
| from yahoo_fin.stock_info import get_data |
| from sklearn.preprocessing import MinMaxScaler |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| from pytorch_forecasting import TemporalFusionTransformer |
| from bs4 import BeautifulSoup |
| import requests |
| from dotenv import load_dotenv |
| import os |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
# Mask CUDA devices before torch is imported below; presumably this forces
# CPU-only inference for every model loaded in this module.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import torch  # noqa: E402  (kept after the CUDA mask above)

# Falls back to the CPU whenever torch reports no usable CUDA device.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Keras model used by `predict_prices` (via `model.predict`).
MODEL_PATH = "lib/20_lstm_model.h5"
model = tf.keras.models.load_model(MODEL_PATH)

# Hugging Face tokenizer/classifier pair used by `news_sentiment`.
model_name_news = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name_news)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(model_name_news)
sentiment_model = sentiment_model.to(device)

# Temporal Fusion Transformer checkpoint used by `get_tft_predictions`.
best_model_path = "lib/tft_pred.ckpt"
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path)
best_tft = best_tft.to(device)

app = FastAPI()

# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The FastAPI CORS documentation asks for explicit origins when credentials
# are allowed - confirm whether credentials are really needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)
|
|
|
|
class TickerRequest(BaseModel):
    """Pydantic schema describing a ticker price-history query.

    The field names match the parameters of `fetch_and_process_ticker_data`.
    No route in the visible part of this file takes this model as a body.
    """

    # Ticker symbol.
    ticker: str
    # Range boundaries, kept as plain strings (no format is enforced here).
    start_date: str
    end_date: str
    # Sampling interval; defaults to "1d".
    interval: str = "1d"
|
|
def fetch_and_process_ticker_data(ticker, start_date, end_date, interval="1d"):
    """Download price history for ``ticker`` and add two derived columns.

    The data comes from ``yahoo_fin.stock_info.get_data`` (requested with
    ``index_as_date=True``). The ``close`` column is dropped and two columns
    are added:

    * ``revenue``      = ``adjclose * volume``
    * ``daily_profit`` = ``adjclose - open``

    Args:
        ticker: Ticker symbol passed straight to ``get_data``.
        start_date: Start of the range, forwarded to ``get_data``.
        end_date: End of the range, forwarded to ``get_data``.
        interval: Sampling interval forwarded to ``get_data``.

    Returns:
        The processed ``pandas.DataFrame``.

    Raises:
        HTTPException: status 500 wrapping any error raised while
            downloading or processing; the original error is chained.
    """
    try:
        history = get_data(
            ticker,
            start_date=start_date,
            end_date=end_date,
            index_as_date=True,
            interval=interval,
        )
        # `drop` returns a new frame, so the additions below never touch the
        # object handed back by `get_data`.
        history = history.drop(columns="close")
        history["revenue"] = history["adjclose"] * history["volume"]
        history["daily_profit"] = history["adjclose"] - history["open"]
    except Exception as error:
        raise HTTPException(
            status_code=500, detail=f"Error processing ticker {ticker}: {error}"
        ) from error
    # Returned directly: concatenating onto an empty DataFrame added nothing
    # and depends on pandas' deprecated handling of empty concat entries.
    return history
|
|
|
|
def ticker_encoded(df):
    """Add an integer ``ticker_encoded`` column derived from ``df['ticker']``.

    Only the five tickers in ``label_map`` are supported.

    Args:
        df: DataFrame with a ``ticker`` column; modified in place.

    Returns:
        The same ``df`` object, now carrying ``ticker_encoded``.

    Raises:
        KeyError: if the ``ticker`` column is missing, or if a row holds a
            ticker that is not in ``label_map`` (the message names it).
    """
    label_map = {'ATOM': 0, 'HBIO': 1, 'IBEX': 2, 'MYFW': 3, 'NATH': 4}

    # Looked up outside the try block so a missing column is not reported
    # as an unsupported ticker.
    tickers = df['ticker']
    try:
        codes = [label_map[name] for name in tickers]
    except KeyError as error:
        raise KeyError(
            f"Unsupported ticker {error.args[0]!r}; "
            f"expected one of {sorted(label_map)}"
        ) from error

    df['ticker_encoded'] = codes
    return df
|
|
|
|
def normalize(df):
    """Min-max scale the price, volume/revenue and profit columns in place.

    Three independent ``MinMaxScaler`` instances (default settings) are
    fitted on the data passed in:

    * ``open``, ``high``, ``low``, ``adjclose`` (the price scaler)
    * ``volume``, ``revenue``
    * ``daily_profit``

    Args:
        df: DataFrame containing the columns listed above; it is mutated.

    Returns:
        ``(df, price_scaler)`` - the same frame and the scaler fitted on the
        four price columns, in the order listed above.
    """
    price_columns = ["open", "high", "low", "adjclose"]
    price_scaler = MinMaxScaler()
    df[price_columns] = price_scaler.fit_transform(df[price_columns])

    # The remaining scalers are never needed again, so they stay anonymous.
    for column_group in (["volume", "revenue"], ["daily_profit"]):
        df[column_group] = MinMaxScaler().fit_transform(df[column_group])

    return df, price_scaler
|
|
|
|
def create_sequence(dataset, sequence_length=20):
    """Build sliding-window model inputs from ``dataset``.

    For each row at position ``sequence_length`` or later, the preceding
    ``sequence_length`` rows form one input window and the row itself is the
    target. A window is kept only when every row in it, and the target row,
    share the same ``ticker_encoded`` value.

    Args:
        dataset: DataFrame with ``date`` (values supporting ``strftime``),
            ``ticker_encoded``, ``open`` and ``adjclose`` columns. Every
            column except ``date`` becomes a window feature.
        sequence_length: Number of rows per window (default 20, the value
            that was previously hard-coded).

    Returns:
        ``(sequences, labels, dates, stock)`` where ``sequences`` is an array
        of feature windows, ``labels`` holds the target rows'
        ``open``/``adjclose`` values, ``dates`` the target dates formatted
        as ``YYYY-MM-DD`` and ``stock`` the target tickers as strings.

    Raises:
        ValueError: if ``sequence_length`` is smaller than 1.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    features = dataset.drop(columns=["date"])
    # Pulled out once so the loop does not re-slice the frame for each check.
    tickers = dataset["ticker_encoded"].to_numpy()
    date_column = dataset["date"]

    sequences = []
    labels = []
    dates = []
    stock = []

    for stop_idx in range(sequence_length, len(dataset)):
        start_idx = stop_idx - sequence_length
        target_ticker = tickers[stop_idx]

        # Skip windows that straddle two tickers or whose target differs.
        if not (tickers[start_idx:stop_idx] == target_ticker).all():
            continue

        sequences.append(features.iloc[start_idx:stop_idx].values)
        labels.append(features.iloc[stop_idx][["open", "adjclose"]])
        dates.append(date_column.iloc[stop_idx].strftime('%Y-%m-%d'))
        stock.append(str(target_ticker))

    return np.array(sequences), np.array(labels), dates, stock
|
|
|
|
def scaling_predictions(price_scaler, combined_dataset_prediction,
                        feature_indices=(0, 3)):
    """Map scaled predictions back to price units without touching the scaler.

    ``price_scaler`` is expected to expose ``min_`` and ``scale_`` as fitted
    by ``MinMaxScaler``; the inverse of its transform is
    ``(x_scaled - min_) / scale_``. Only the entries named by
    ``feature_indices`` are used. The default ``(0, 3)`` selects the first
    and fourth fitted columns, which are ``open`` and ``adjclose`` when the
    scaler comes from ``normalize``.

    The scaler is left unmodified, so the function can safely be called more
    than once with the same scaler (the previous implementation overwrote
    ``min_``/``scale_`` and failed on a second call).

    Args:
        price_scaler: Fitted scaler providing ``min_`` and ``scale_``.
        combined_dataset_prediction: Array-like of scaled predictions whose
            last axis has one entry per index in ``feature_indices``.
        feature_indices: Positions inside the fitted scaler that correspond
            to the predicted columns.

    Returns:
        ``numpy.ndarray`` (float64) of predictions in the original units.
    """
    selected = list(feature_indices)
    offsets = np.asarray(price_scaler.min_)[selected]
    scales = np.asarray(price_scaler.scale_)[selected]

    scaled_predictions = np.asarray(combined_dataset_prediction, dtype=float)
    return (scaled_predictions - offsets) / scales
|
|
|
|
def storing_predictions(df, dates, stock, combined_dataset_prediction_inverse):
    """Attach predicted open/close prices to the rows they belong to.

    Prediction ``i`` belongs to the row whose ``date`` (formatted
    ``YYYY-MM-DD``) equals ``dates[i]`` and whose ``str(ticker_encoded)``
    equals ``stock[i]``. When several predictions share a key, the first one
    wins. Rows without a prediction are dropped from the result.

    Args:
        df: DataFrame with ``date`` and ``ticker_encoded`` columns. The
            ``pred_open`` and ``pred_closing`` columns are added to it in
            place.
        dates: Target dates as ``YYYY-MM-DD`` strings, one per prediction.
        stock: Target tickers as strings, one per prediction.
        combined_dataset_prediction_inverse: Sequence of
            ``(open, close)`` pairs, one per prediction.

    Returns:
        A new DataFrame holding only the rows that received a prediction,
        re-indexed from 0.
    """
    # One dictionary lookup per row instead of scanning every prediction.
    # `setdefault` keeps the first prediction seen for a duplicated key.
    prediction_by_key = {}
    for key, prediction in zip(zip(dates, stock),
                               combined_dataset_prediction_inverse):
        prediction_by_key.setdefault(key, prediction)

    no_prediction = (np.nan, np.nan)
    pred_open = []
    pred_closing = []
    for row_date, row_ticker in zip(df['date'], df['ticker_encoded']):
        key = (row_date.strftime('%Y-%m-%d'), str(row_ticker))
        prediction = prediction_by_key.get(key, no_prediction)
        pred_open.append(prediction[0])
        pred_closing.append(prediction[1])

    # Explicit float arrays keep the columns float64 even for an empty frame.
    df['pred_open'] = np.array(pred_open, dtype=float)
    df['pred_closing'] = np.array(pred_closing, dtype=float)

    return df.dropna(subset=['pred_open', 'pred_closing']).reset_index(drop=True)
|
|
|
|
def scrape_news(ticker_name, pages=2, timeout=10):
    """Scrape recent headlines for ``ticker_name`` from Business Insider.

    Result pages ``1..pages`` of
    ``https://markets.businessinsider.com/news/<ticker>-stock`` are fetched
    and every ``div.latest-news__story`` element is parsed. Stories missing
    the time, link or source element are skipped instead of aborting the
    whole scrape.

    Args:
        ticker_name: Ticker used to build the URL; it is percent-encoded
            because it originates from a request path parameter.
        pages: Number of result pages to fetch (default 2, as before).
        timeout: Seconds to wait for each HTTP request, so a stalled
            connection cannot block the caller indefinitely.

    Returns:
        DataFrame with columns ``datatime``, ``title``, ``source``, ``link``,
        ``top_sentiment`` (always ``''``) and ``sentiment_score`` (always
        ``0``). Rows are in reverse scrape order - the last story parsed
        comes first - matching the previous prepend-based behaviour. The
        index is a plain 0..n-1 range.

    Raises:
        requests.RequestException: on connection errors or timeouts.
    """
    from urllib.parse import quote

    columns = ['datatime', 'title', 'source',
               'link', 'top_sentiment', 'sentiment_score']
    slug = quote(str(ticker_name), safe='')

    records = []
    for page in range(1, pages + 1):
        url = f'https://markets.businessinsider.com/news/{slug}-stock?p={page}'
        response = requests.get(url, timeout=timeout)
        soup = BeautifulSoup(response.text, 'lxml')

        for article in soup.find_all('div', class_='latest-news__story'):
            time_tag = article.find('time', class_='latest-news__date')
            link_tag = article.find('a', class_='news-link')
            source_tag = article.find('span', class_='latest-news__source')
            if time_tag is None or link_tag is None or source_tag is None:
                # Unexpected markup for this story; skip it.
                continue

            records.append([
                time_tag.get('datetime'),
                link_tag.text,
                source_tag.text,
                link_tag.get('href'),
                '',
                0,
            ])

    # Each story used to be prepended to the frame; reversing once at the
    # end gives the same row order without a concat per story.
    records.reverse()
    return pd.DataFrame(records, columns=columns)
|
|
|
|
def add_recent_news(main_df, news_df, lookback_days=10):
    """Add a ``news`` column of recent headlines to ``main_df``.

    For every row of ``main_df`` the titles of all articles published
    strictly before the row's date, and at most ``lookback_days`` whole days
    earlier, are joined with single spaces. If none qualify, the last article
    in ``news_df`` order that precedes the row's date is used. If that also
    yields nothing, the text chosen for the previous row is carried forward
    (an empty string for the first rows).

    Args:
        main_df: DataFrame with a ``date`` column. It is modified in place:
            ``date`` is converted with ``pd.to_datetime`` and ``news`` is
            added.
        news_df: DataFrame with ``datatime`` and ``title`` columns. It is
            only read, never modified, so the same frame can be reused.
        lookback_days: Size of the look-back window in days.

    Returns:
        The same ``main_df`` object with the ``news`` column filled in.
    """
    main_df['date'] = pd.to_datetime(main_df['date'])

    # NOTE(review): article timestamps and price dates are compared directly,
    # so both must be either tz-naive or tz-aware - confirm the scraped format.
    published_at = pd.to_datetime(news_df['datatime'])
    # Materialised once so the articles are not re-iterated as Series rows
    # for every price row.
    articles = list(zip(published_at, news_df['title']))

    news_list = []
    last_available_news = ''

    for current_date in main_df['date']:
        recent_titles = [
            title
            for published, title in articles
            if (current_date - published).days <= lookback_days
            and published < current_date
        ]
        news_articles = " ".join(recent_titles)

        if not news_articles.strip():
            # Fallback: walk the articles backwards and take the first one
            # published before this row's date.
            for published, title in reversed(articles):
                if published < current_date:
                    news_articles = title
                    break

        last_available_news = news_articles.strip() or last_available_news
        news_list.append(last_available_news)

    main_df['news'] = news_list
    return main_df
|
|
|
|
def news_sentiment(df):
    """Classify each row's ``news`` text and attach a numeric sentiment score.

    Only rows whose ``news`` value is not null are sent to the module-level
    ``tokenizer`` / ``sentiment_model`` pair, so the predicted labels always
    line up with the rows they are written to. Rows without news receive
    ``NaN`` in both new columns. When no row has news, the model is not
    called at all.

    Args:
        df: DataFrame with a ``news`` column. ``predicted_sentiment`` and
            ``sentiment_score`` are added to it in place.

    Returns:
        A new DataFrame equal to ``df`` with the ``news`` column dropped.
        ``sentiment_score`` is 1 / 0 / -1 for positive / neutral / negative.
    """
    news_column_name = 'news'
    # NOTE(review): assumes the model's class ids 0/1/2 mean
    # negative/neutral/positive - confirm against the model's id2label.
    labels = ["negative", "neutral", "positive"]
    sentiment_map = {
        'positive': 1,
        'neutral': 0,
        'negative': -1
    }

    has_news = df[news_column_name].notna()
    texts = df.loc[has_news, news_column_name].tolist()

    predicted_labels = []
    if texts:
        inputs = tokenizer(texts, padding=True,
                           truncation=True, return_tensors="pt")
        inputs = {key: val.to(device) for key, val in inputs.items()}

        with torch.no_grad():
            logits = sentiment_model(**inputs).logits

        # argmax over the logits picks the same class as argmax over softmax.
        class_ids = torch.argmax(logits, dim=-1).tolist()
        predicted_labels = [labels[class_id] for class_id in class_ids]

    df['predicted_sentiment'] = pd.Series(
        predicted_labels, index=df.index[has_news.to_numpy()])
    df['sentiment_score'] = df['predicted_sentiment'].map(sentiment_map)

    return df.drop(columns=['news'])
|
|
|
|
def get_tft_predictions(df):
    """Add 20 lagged open/adjclose features per ticker and run the TFT model.

    Columns ``open_lag_1..20`` and ``adjclose_lag_1..20`` are added to ``df``
    in place, shifted within each ``ticker`` group. Rows that lack any lag
    value are then removed from ``df`` in place.

    The DataFrame itself is handed to ``best_tft.predict``. The previous
    ``df.to(device)`` call always failed because a pandas DataFrame has no
    ``.to`` method; the model was already moved to ``device`` when it was
    loaded.

    Args:
        df: DataFrame with ``ticker``, ``open`` and ``adjclose`` columns,
            plus whatever other columns the loaded checkpoint requires.

    Returns:
        Whatever ``best_tft.predict(df, mode="quantiles")`` returns.
    """
    lag_steps = range(1, 21)

    # Group once; the grouped series are reused for every lag step.
    open_by_ticker = df.groupby('ticker')['open']
    adjclose_by_ticker = df.groupby('ticker')['adjclose']

    for step in lag_steps:
        df[f'open_lag_{step}'] = open_by_ticker.shift(step)
        df[f'adjclose_lag_{step}'] = adjclose_by_ticker.shift(step)

    lag_columns = [f'open_lag_{step}' for step in lag_steps] + \
        [f'adjclose_lag_{step}' for step in lag_steps]

    df.dropna(subset=lag_columns, inplace=True)

    return best_tft.predict(df, mode="quantiles")
|
|
|
|
@app.get("/fetch-ticker-data/{ticker_name}/{start_date}/{end_date}/{interval}")
async def fetch_ticker_data(ticker_name: str, start_date: str, end_date: str, interval: str):
    """Return processed price history for a ticker as a list of records.

    Path parameters are forwarded unchanged to
    ``fetch_and_process_ticker_data``.

    Returns:
        ``DataFrame.to_dict(orient="records")`` of the processed frame.
        NOTE(review): ``orient="records"`` omits the frame's index, so the
        date index requested by the helper is not part of the response -
        confirm this is intended.

    Raises:
        HTTPException: the helper's own ``HTTPException`` is passed through
            unchanged; any other error becomes a 500 with the error text.
    """
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )
        return result_df.to_dict(orient="records")
    except HTTPException:
        # Already an HTTP error with a proper detail; do not wrap it again.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
@app.get("/predict-prices/{ticker_name}/{start_date}/{end_date}/{interval}")
async def predict_prices(ticker_name: str, start_date: str, end_date: str, interval: str):
    """Predict open and close prices for a ticker.

    Pipeline:
      1. download and process the price history;
      2. keep the last 60 rows, encode the ticker and min-max normalise;
      3. build the input windows and run ``model.predict``;
      4. map predictions back to price units and attach them to their rows;
      5. scrape headlines, attach them and score their sentiment;
      6. add a 1-based ``time_idx`` and run the TFT model.

    Returns:
        ``{"open": <float>, "close": <float>}``.

    Raises:
        HTTPException: 400 when the history is too short to build a single
            input window; ``HTTPException`` raised by helpers is passed
            through unchanged; any other error becomes a 500 with the error
            text.
    """
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )

        # The date index becomes a regular column named "index" -> "date".
        raw_data = result_df.tail(60).reset_index()
        raw_data.rename(columns={"index": "date"}, inplace=True)
        raw_data = ticker_encoded(raw_data)

        # Un-normalised copy: predictions are attached to it later, because
        # `normalize` rescales `raw_data` in place.
        temp_df = raw_data.copy()

        normalized_data, scaler = normalize(raw_data)
        normalized_data = normalized_data.drop(columns=['ticker'])

        sequences, _, dates, stock = create_sequence(normalized_data)
        if len(sequences) == 0:
            # `create_sequence` needs 20 history rows plus one target row.
            raise HTTPException(
                status_code=400,
                detail="Not enough price history: at least 21 rows of price "
                       "data are required to build a prediction window.")

        combined_dataset_prediction = model.predict(sequences)
        combined_dataset_prediction_inverse = scaling_predictions(
            scaler, combined_dataset_prediction)

        lstm_pred_df = storing_predictions(
            temp_df, dates, stock, combined_dataset_prediction_inverse)
        news_df = scrape_news(ticker_name=ticker_name)

        combined_with_news_df = add_recent_news(lstm_pred_df, news_df)
        sentiment_df = news_sentiment(combined_with_news_df)

        sentiment_df['time_idx'] = range(1, len(sentiment_df) + 1)

        predicted_values = get_tft_predictions(sentiment_df)

        # NOTE(review): assumes the first two entries of the TFT output are
        # single-element tensors holding open and close - confirm against the
        # checkpoint's output shape.
        final_pred_open_price = predicted_values[0].item()
        final_pred_closing_price = predicted_values[1].item()

        return {"open": final_pred_open_price, 'close': final_pred_closing_price}

    except HTTPException:
        # Keep status code and detail of errors that are already HTTP errors.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
|
|