| """ |
| Portfolio financial calculations module. |
| |
| Handles: |
| - Fetching historical price data from yfinance |
| - Calculating portfolio weights |
| - Calculating log returns |
| - Computing covariance matrix |
| - Calculating portfolio variance and volatility |
| - Generating variance breakdown for detailed formulas |
| """ |
|
|
| from typing import Dict, List, Tuple, Optional |
| import numpy as np |
| import pandas as pd |
| import yfinance as yf |
| import streamlit as st |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| import logging |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type |
| from ratelimit import limits, sleep_and_retry |
|
|
| |
# Module-level logger. basicConfig is a no-op when the host application
# (e.g. Streamlit) has already configured the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)




# Factor used to annualize daily return statistics (typical US trading calendar).
TRADING_DAYS_PER_YEAR = 252
# Minimum number of overlapping daily observations required before we trust
# a covariance estimate (enforced in fetch_historical_data).
MIN_DATA_POINTS = 30
# Upper bound on portfolio size. NOTE(review): not referenced in this module's
# visible code — presumably enforced by callers/validators; confirm.
MAX_TICKERS = 100
|
|
|
|
| |
@sleep_and_retry
@limits(calls=5, period=10)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    reraise=True
)
def _fetch_with_retry(ticker_obj, period: str):
    """
    Fetch historical data for a ticker, throttled and retried.

    Decorator stack (applied bottom-up, so the limiter wraps the retrying call):
    - tenacity.retry: up to 3 attempts with exponential backoff (2s-60s),
      only for ConnectionError/TimeoutError; the final exception is re-raised.
    - ratelimit.limits: at most 5 calls per rolling 10-second window.
      NOTE(review): the limiter's state is per-process, so workers spawned by
      ProcessPoolExecutor each get their own 5/10s budget — confirm this is
      acceptable for the upstream API.
    - ratelimit.sleep_and_retry: sleeps until the window resets instead of
      raising RateLimitException.

    IMPORTANT: Only uses 'period' parameter (not start/end dates) to avoid
    yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker_obj: yfinance Ticker object
        period: Time period (e.g., '1y', '6mo', '3mo', '1mo')

    Returns:
        Historical OHLCV data as returned by Ticker.history; may be empty
        when the ticker has no data for the period (callers check .empty).
    """
    return ticker_obj.history(period=period)
|
|
|
|
def fetch_single_ticker(ticker: str, period: str = "1y") -> Tuple[str, Optional[pd.Series], Optional[str]]:
    """
    Fetch historical close prices for a single ticker with rate limiting
    and exponential backoff.

    This function runs in a separate process for parallel execution.
    Uses tenacity for exponential backoff and ratelimit for request
    throttling (both via _fetch_with_retry). If the requested period fails
    or yields no data, progressively shorter periods are tried.

    IMPORTANT: Only uses 'period' parameter (not start/end dates) to avoid
    yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker: Ticker symbol
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (ticker, price_series, error_message); on success the
        error is None, on failure the series is None.
    """
    try:
        ticker_obj = yf.Ticker(ticker)

        # Fallback chain, de-duplicated while preserving order so the
        # requested period is never retried twice (e.g. period='6mo').
        periods_to_try = list(dict.fromkeys([period, '6mo', '3mo', '1mo']))

        last_error = None
        for try_period in periods_to_try:
            try:
                hist = _fetch_with_retry(ticker_obj, try_period)
            except Exception as e:
                # Lazy %-style args keep formatting cost off the log path.
                logger.warning("⚠️ %s: Period '%s' failed - %s", ticker, try_period, e)
                last_error = str(e)
                continue

            if not hist.empty:
                logger.info("✅ %s: Fetched %d days (period: %s)", ticker, len(hist), try_period)
                return ticker, hist['Close'], None

        # Preserve the most recent exception even if a later period merely
        # returned an empty frame (the original lost it in that case).
        if last_error is not None:
            logger.error("❌ %s: All periods exhausted", ticker)
            return ticker, None, f"All periods failed. Last error: {last_error}"

        logger.error("❌ %s: No data available after trying all periods", ticker)
        return ticker, None, "No data available after all retry attempts"

    except Exception as e:
        logger.error("❌ %s: Fatal error - %s", ticker, e)
        return ticker, None, str(e)
|
|
|
|
@st.cache_data(ttl=3600)
def fetch_historical_data(
    tickers: Tuple[str, ...],
    period: str = "1y"
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch historical price data using yfinance.

    Uses fallback strategy:
    1. Try downloading all tickers together (fast, one request)
    2. If that fails, download one by one in parallel and combine

    Args:
        tickers: Tuple of ticker symbols (e.g., ('AAPL', 'GOOGL', 'MSFT'))
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (prices_dataframe, error_message)
        - If successful: (DataFrame, None)
        - If failed: (None, error_message)
    """
    ticker_list = list(tickers)

    # --- Strategy 1: single batch request for all tickers ---
    try:
        data = yf.download(
            ticker_list,
            period=period,
            progress=False,
            threads=False,
            ignore_tz=True
        )

        if not data.empty:
            # Newer yfinance versions default to auto_adjust=True, which
            # replaces 'Adj Close' with an adjusted 'Close'. Fall back to
            # 'Close' so the batch path works on both old and new versions
            # instead of always dropping to the slow per-ticker path.
            if isinstance(data.columns, pd.MultiIndex):
                available_fields = data.columns.get_level_values(0)
            else:
                available_fields = data.columns
            price_field = 'Adj Close' if 'Adj Close' in available_fields else 'Close'

            if len(ticker_list) == 1:
                prices = data[[price_field]].copy()
                prices.columns = ticker_list
            else:
                prices = data[price_field].copy()

            # Keep only dates where every ticker has a price so the
            # return series stay aligned.
            prices = prices.dropna()

            if len(prices) >= MIN_DATA_POINTS:
                return prices, None

    except Exception as e:
        logger.warning(f"Batch download failed: {str(e)}, trying individual downloads...")

    # --- Strategy 2: per-ticker downloads in worker processes ---
    st.info(f"📥 Fetching data for {len(ticker_list)} tickers in parallel...")

    individual_prices = {}
    failed_tickers = []

    # Cap worker count: yfinance endpoints throttle aggressively.
    max_workers = min(len(ticker_list), 4)

    try:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_ticker = {
                executor.submit(fetch_single_ticker, ticker, period): ticker
                for ticker in ticker_list
            }

            completed = 0
            for future in as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                completed += 1

                try:
                    ticker_symbol, price_series, error = future.result(timeout=30)

                    if price_series is not None and not price_series.empty:
                        individual_prices[ticker_symbol] = price_series
                        st.success(f"✅ {ticker_symbol}: {len(price_series)} days ({completed}/{len(ticker_list)})")
                    else:
                        failed_tickers.append(ticker_symbol)
                        st.error(f"❌ {ticker_symbol}: {error or 'No data'} ({completed}/{len(ticker_list)})")

                except Exception as e:
                    failed_tickers.append(ticker)
                    st.error(f"❌ {ticker}: {str(e)} ({completed}/{len(ticker_list)})")

    except Exception as e:
        # Best-effort: keep whatever tickers completed before the failure.
        st.error(f"Parallel processing error: {str(e)}")

    if not individual_prices:
        return None, f"Could not fetch data for any tickers. Failed: {', '.join(failed_tickers)}\n\nTip: Try using the JSON editor to enter a smaller portfolio first, or try again in a few minutes."

    prices_df = pd.DataFrame(individual_prices)

    # Align all series on common dates.
    prices_df = prices_df.dropna()

    if len(prices_df) < MIN_DATA_POINTS:
        return None, f"Insufficient data: only {len(prices_df)} days available (minimum {MIN_DATA_POINTS} required)"

    if failed_tickers:
        st.warning(f"⚠️ Could not fetch data for: {', '.join(failed_tickers)}")

    return prices_df, None
|
|
|
|
def calculate_log_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """
    Compute daily log returns r_t = ln(P_t / P_{t-1}) for every column.

    Args:
        prices: DataFrame of historical prices (columns = tickers, index = dates)

    Returns:
        DataFrame of log returns; the first row (all-NaN after the shift)
        is dropped.
    """
    price_ratios = prices.div(prices.shift(1))
    log_returns = np.log(price_ratios)
    return log_returns.dropna()
|
|
|
|
def calculate_portfolio_weights(amounts: Dict[str, float]) -> Dict[str, float]:
    """
    Derive normalized portfolio weights from dollar amounts.

    Each weight is w_i = amount_i / total, followed by a defensive
    renormalization pass in case floating-point drift leaves the weights
    summing away from 1.

    Args:
        amounts: Dictionary mapping tickers to dollar amounts

    Returns:
        Dictionary mapping tickers to weights (decimals summing to 1)

    Raises:
        ValueError: if the amounts sum to zero or a negative number
    """
    total = sum(amounts.values())
    if total <= 0:
        raise ValueError("Total portfolio amount must be positive")

    tickers = list(amounts)
    raw = np.array([amounts[t] for t in tickers], dtype=float) / total

    # Renormalize only when rounding drift is actually detectable.
    drift_sum = raw.sum()
    if not np.isclose(drift_sum, 1.0, atol=1e-6):
        raw = raw / drift_sum

    return {t: float(v) for t, v in zip(tickers, raw)}
|
|
|
|
def calculate_covariance_matrix(returns: pd.DataFrame, annualized: bool = False) -> pd.DataFrame:
    """
    Compute the sample covariance matrix of the return series.

    Args:
        returns: DataFrame of log returns
        annualized: If True, scale by TRADING_DAYS_PER_YEAR (default: False)

    Returns:
        DataFrame of covariances (tickers × tickers)
    """
    daily_cov = returns.cov()
    # Daily covariances scale linearly with horizon under i.i.d. returns.
    return daily_cov * TRADING_DAYS_PER_YEAR if annualized else daily_cov
|
|
|
|
def calculate_portfolio_variance(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> float:
    """
    Compute portfolio variance via the quadratic form σ²_p = wᵀ Σ w.

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily; annualized on the fly if
            annualized=True)
        annualized: If True, annualize the covariance matrix (default: True)

    Returns:
        Portfolio variance as a plain float (annualized if annualized=True)
    """
    order = list(weights)
    weight_vec = np.fromiter((weights[t] for t in order), dtype=float)

    # Align the matrix rows/columns to the weight ordering before the
    # quadratic form so dict ordering can never scramble the product.
    sigma = cov_matrix.loc[order, order].to_numpy()
    if annualized:
        sigma = sigma * TRADING_DAYS_PER_YEAR

    return float(weight_vec.dot(sigma).dot(weight_vec))
|
|
|
|
def calculate_portfolio_volatility(variance: float) -> float:
    """
    Convert portfolio variance into volatility: σ_p = √(σ²_p).

    Args:
        variance: Portfolio variance

    Returns:
        Portfolio volatility (standard deviation) as a plain float
    """
    sigma = np.sqrt(variance)
    return float(sigma)
|
|
|
|
def get_variance_breakdown(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> List[Tuple[str, str, float, float, float, float]]:
    """
    Generate a detailed breakdown of the variance calculation.

    Enumerates every ordered (i, j) pair so the full quadratic-form
    expansion σ²_p = Σ_i Σ_j w_i w_j Cov(i, j) can be displayed term by
    term; the contributions sum to calculate_portfolio_variance for the
    same inputs.

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily)
        annualized: If True, use annualized covariance (default: True)

    Returns:
        List of tuples: (ticker_i, ticker_j, w_i, w_j, cov_ij, contribution)
        where contribution = w_i × w_j × cov_ij
    """
    tickers = list(weights.keys())

    # Hoist the annualization factor out of the double loop; a factor of
    # 1.0 leaves the daily values unchanged.
    scale = TRADING_DAYS_PER_YEAR if annualized else 1.0

    breakdown = []
    for ticker_i in tickers:
        for ticker_j in tickers:
            w_i = weights[ticker_i]
            w_j = weights[ticker_j]
            cov_ij = cov_matrix.loc[ticker_i, ticker_j] * scale
            breakdown.append((ticker_i, ticker_j, w_i, w_j, cov_ij, w_i * w_j * cov_ij))

    return breakdown
|
|
|
|
def get_portfolio_metrics(
    amounts: Dict[str, float],
    period: str = "1y"
) -> Tuple[Optional[Dict], Optional[str]]:
    """
    Run the full portfolio-analysis pipeline in one call.

    Orchestrates: weights → price fetch → log returns → covariance →
    variance → volatility → variance breakdown.

    Args:
        amounts: Dictionary of {ticker: amount}
        period: Historical data period (default: '1y')

    Returns:
        Tuple of (metrics_dict, error_message); exactly one is None.

        metrics_dict contains:
        - weights: Dict[str, float]
        - prices: pd.DataFrame
        - returns: pd.DataFrame
        - cov_matrix: pd.DataFrame
        - variance: float
        - volatility: float
        - variance_breakdown: List[Tuple]
    """
    try:
        weights = calculate_portfolio_weights(amounts)

        prices, fetch_error = fetch_historical_data(tuple(amounts.keys()), period)
        if fetch_error:
            return None, fetch_error

        returns = calculate_log_returns(prices)

        # Keep the stored covariance matrix daily; the variance and
        # breakdown calls annualize on the fly, so both daily and annual
        # views stay derivable from the single matrix in the result.
        cov_matrix = calculate_covariance_matrix(returns, annualized=False)
        variance = calculate_portfolio_variance(weights, cov_matrix, annualized=True)

        return {
            'weights': weights,
            'prices': prices,
            'returns': returns,
            'cov_matrix': cov_matrix,
            'variance': variance,
            'volatility': calculate_portfolio_volatility(variance),
            'variance_breakdown': get_variance_breakdown(weights, cov_matrix, annualized=True),
        }, None

    except Exception as e:
        return None, f"Error calculating portfolio metrics: {str(e)}"
|
|