"""
Portfolio financial calculations module.

Handles:
- Fetching historical price data from yfinance
- Calculating portfolio weights
- Calculating log returns
- Computing covariance matrix
- Calculating portfolio variance and volatility
- Generating variance breakdown for detailed formulas
"""

from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import streamlit as st
import yfinance as yf
from ratelimit import limits, sleep_and_retry
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants
TRADING_DAYS_PER_YEAR = 252
MIN_DATA_POINTS = 30
MAX_TICKERS = 100


# Rate limiter: at most 5 calls per 10 seconds. The limiter state lives in the
# process that imported this module, so each ProcessPoolExecutor worker has its
# own independent budget (it is NOT a global limit across workers).
@sleep_and_retry
@limits(calls=5, period=10)
@retry(
    stop=stop_after_attempt(3),  # 3 attempts in total (the first call + 2 retries)
    wait=wait_exponential(multiplier=1, min=2, max=60),  # Exponential backoff clamped to 2s..60s
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),  # Only retry on network errors
    reraise=True,
)
def _fetch_with_retry(ticker_obj, period: str):
    """
    Fetch price history for one ticker, with retry and rate limiting.

    IMPORTANT: Only uses the 'period' parameter (not start/end dates) to avoid
    the yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker_obj: yfinance Ticker object
        period: Time period (e.g., '1y', '6mo', '3mo', '1mo')

    Returns:
        Whatever ``ticker_obj.history(period=period)`` returns (a DataFrame,
        which may be empty when no data is available).

    Raises:
        The last ConnectionError/TimeoutError once all attempts are used up;
        any other exception is propagated immediately without retrying.
    """
    return ticker_obj.history(period=period)


def fetch_single_ticker(ticker: str, period: str = "1y") -> Tuple[str, Optional[pd.Series], Optional[str]]:
    """
    Fetch the closing-price history for a single ticker.

    Intended to run in a worker process (see fetch_historical_data). The
    requested period is tried first, then progressively shorter fallback
    periods, until one of them returns a non-empty history.

    IMPORTANT: Only uses the 'period' parameter (not start/end dates) to avoid
    the yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker: Ticker symbol
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (ticker, price_series, error_message). Exactly one of
        price_series / error_message is None. This function never raises:
        every failure is reported through error_message.
    """
    try:
        # Create fresh Ticker object in this process
        ticker_obj = yf.Ticker(ticker)

        # Requested period first, then shorter fallbacks. 'max' is avoided as
        # it may trigger the timezone request. dict.fromkeys de-duplicates
        # while keeping order, so a requested period that is also a fallback
        # (e.g. '6mo') is not fetched twice.
        periods_to_try = list(dict.fromkeys([period, '6mo', '3mo', '1mo']))
        last_attempt = len(periods_to_try)

        for idx, try_period in enumerate(periods_to_try, 1):
            try:
                hist = _fetch_with_retry(ticker_obj, try_period)
                if not hist.empty:
                    logger.info("✅ %s: Fetched %d days (period: %s)", ticker, len(hist), try_period)
                    return ticker, hist['Close'], None
            except Exception as e:
                logger.warning("⚠️ {}: Period '{}' failed - {}".format(ticker, try_period, str(e)))
                if idx == last_attempt:
                    # Last attempt failed
                    logger.error("❌ %s: All periods exhausted", ticker)
                    return ticker, None, f"All periods failed. Last error: {str(e)}"
                # Otherwise continue to the next period

        # Every period returned an empty history (or only the last one did)
        logger.error("❌ %s: No data available after trying all periods", ticker)
        return ticker, None, "No data available after all retry attempts"

    except Exception as e:
        logger.error("❌ %s: Fatal error - %s", ticker, str(e))
        return ticker, None, str(e)


def _extract_close_prices(data: pd.DataFrame, ticker_list: List[str]) -> pd.DataFrame:
    """
    Pull one closing-price column per ticker out of a yf.download() result.

    Prefers 'Adj Close'; falls back to 'Close' when 'Adj Close' is absent
    (yfinance omits it when prices are auto-adjusted, in which case 'Close'
    already holds the adjusted price).

    Args:
        data: DataFrame returned by yf.download()
        ticker_list: Tickers that were requested, in request order

    Returns:
        DataFrame with one column per ticker.
    """
    field = 'Adj Close' if 'Adj Close' in data.columns else 'Close'

    if len(ticker_list) == 1:
        # Double brackets keep a DataFrame whether the columns are flat or a
        # MultiIndex; the single resulting column is then named by the ticker.
        prices = data[[field]].copy()
        prices.columns = ticker_list
    else:
        prices = data[field].copy()

    return prices


@st.cache_data(ttl=3600)  # Cache for 1 hour
def fetch_historical_data(
    tickers: Tuple[str, ...],  # Tuple for hashability (caching requirement)
    period: str = "1y"
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch historical price data using yfinance.

    Uses a fallback strategy:
    1. Try downloading all tickers together
    2. If that fails (or yields too few rows), download the tickers one by one
       in worker processes and combine the results

    Progress and failures of strategy 2 are reported through Streamlit
    messages (st.info / st.success / st.error / st.warning).

    Args:
        tickers: Tuple of ticker symbols (e.g., ('AAPL', 'GOOGL', 'MSFT'))
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (prices_dataframe, error_message)
        - If successful: (DataFrame, None). After strategy 2 the DataFrame may
          contain only a subset of the requested tickers.
        - If failed: (None, error_message)
    """
    ticker_list = list(tickers)

    # Strategy 1: Try downloading all tickers together
    try:
        data = yf.download(
            ticker_list,
            period=period,
            progress=False,
            threads=False,  # Disable threading for better reliability
            ignore_tz=True
        )

        if not data.empty:
            prices = _extract_close_prices(data, ticker_list)

            # Drop rows with NaN values
            prices = prices.dropna()

            if len(prices) >= MIN_DATA_POINTS:
                return prices, None

    except Exception as e:
        # Log the error but continue to fallback strategy
        logger.warning(f"Batch download failed: {str(e)}, trying individual downloads...")

    # Strategy 2: Parallel download using ProcessPoolExecutor
    st.info(f"📥 Fetching data for {len(ticker_list)} tickers in parallel...")

    individual_prices = {}
    failed_tickers = []

    # Determine number of workers (max 4 to avoid overwhelming the API)
    max_workers = min(len(ticker_list), 4)

    try:
        # Use ProcessPoolExecutor for true parallel execution
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit all ticker fetch jobs
            future_to_ticker = {
                executor.submit(fetch_single_ticker, ticker, period): ticker
                for ticker in ticker_list
            }

            # Process results as they complete
            completed = 0
            for future in as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                completed += 1

                try:
                    ticker_symbol, price_series, error = future.result(timeout=30)

                    if price_series is not None and not price_series.empty:
                        individual_prices[ticker_symbol] = price_series
                        st.success(f"✅ {ticker_symbol}: {len(price_series)} days ({completed}/{len(ticker_list)})")
                    else:
                        failed_tickers.append(ticker_symbol)
                        st.error(f"❌ {ticker_symbol}: {error or 'No data'} ({completed}/{len(ticker_list)})")

                except Exception as e:
                    # The worker itself failed (e.g. crashed or could not
                    # return its result); count the ticker as failed.
                    failed_tickers.append(ticker)
                    st.error(f"❌ {ticker}: {str(e)} ({completed}/{len(ticker_list)})")

    except Exception as e:
        # Pool-level failure: report it and continue with whatever was
        # collected so far (possibly nothing, handled just below).
        st.error(f"Parallel processing error: {str(e)}")

    # Check if we got any data
    if not individual_prices:
        return None, f"Could not fetch data for any tickers. Failed: {', '.join(failed_tickers)}\n\nTip: Try using the JSON editor to enter a smaller portfolio first, or try again in a few minutes."

    # Combine all individual price series
    prices_df = pd.DataFrame(individual_prices)

    # Drop rows with NaN values (keeps only dates present for every ticker)
    prices_df = prices_df.dropna()

    # Check we have enough data points
    if len(prices_df) < MIN_DATA_POINTS:
        return None, f"Insufficient data: only {len(prices_df)} days available (minimum {MIN_DATA_POINTS} required)"

    # Warn about failed tickers
    if failed_tickers:
        st.warning(f"⚠️ Could not fetch data for: {', '.join(failed_tickers)}")

    return prices_df, None


def calculate_log_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate log returns from price data.

    Formula: r_t = ln(P_t / P_{t-1})

    Args:
        prices: DataFrame of historical prices (columns = tickers, index = dates)

    Returns:
        DataFrame of log returns (rows containing NaN, including the first
        row, are dropped)
    """
    # Calculate log returns: ln(price_t / price_{t-1})
    returns = np.log(prices / prices.shift(1))

    # Drop the first row (NaN)
    returns = returns.dropna()

    return returns


def calculate_portfolio_weights(amounts: Dict[str, float]) -> Dict[str, float]:
    """
    Calculate portfolio weights from position amounts.

    Formula: w_i = amount_i / sum(amounts)

    Args:
        amounts: Dictionary mapping tickers to dollar amounts

    Returns:
        Dictionary mapping tickers to weights (percentages as decimals)

    Raises:
        ValueError: If the amounts sum to zero or less (this includes an
            empty dictionary).
    """
    total = sum(amounts.values())

    if total <= 0:
        raise ValueError("Total portfolio amount must be positive")

    weights = {ticker: amount / total for ticker, amount in amounts.items()}

    # Validate weights sum to 1.0 (accounting for floating point errors)
    weight_sum = sum(weights.values())
    if not np.isclose(weight_sum, 1.0, atol=1e-6):
        # Normalize to ensure exact sum = 1.0
        weights = {ticker: w / weight_sum for ticker, w in weights.items()}

    return weights


def calculate_covariance_matrix(returns: pd.DataFrame, annualized: bool = False) -> pd.DataFrame:
    """
    Calculate covariance matrix of returns.

    Args:
        returns: DataFrame of log returns
        annualized: If True, multiply by TRADING_DAYS_PER_YEAR (default: False)

    Returns:
        DataFrame of covariance matrix (tickers × tickers)
    """
    cov_matrix = returns.cov()

    if annualized:
        cov_matrix = cov_matrix * TRADING_DAYS_PER_YEAR

    return cov_matrix


def calculate_portfolio_variance(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> float:
    """
    Calculate portfolio variance.

    Formula: σ²_p = w^T × Σ × w

    Where:
    - w = vector of weights
    - Σ = covariance matrix (annualized)

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily, will be annualized if annualized=True)
        annualized: If True, annualize the covariance matrix (default: True)

    Returns:
        Portfolio variance (annualized if annualized=True)

    Raises:
        KeyError: If a ticker in weights is missing from cov_matrix.
    """
    # Fix one ticker order and use it for both the vector and the matrix
    tickers = list(weights.keys())

    # Create weight vector (as numpy array)
    w = np.array([weights[ticker] for ticker in tickers])

    # Get covariance matrix for these tickers
    cov = cov_matrix.loc[tickers, tickers].values

    # Annualize if requested
    if annualized:
        cov = cov * TRADING_DAYS_PER_YEAR

    # Calculate variance: w^T × Σ × w
    variance = w @ cov @ w

    return float(variance)


def calculate_portfolio_volatility(variance: float) -> float:
    """
    Calculate portfolio volatility (standard deviation).

    Formula: σ_p = √(σ²_p)

    Args:
        variance: Portfolio variance

    Returns:
        Portfolio volatility (standard deviation)
    """
    return float(np.sqrt(variance))


def get_variance_breakdown(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> List[Tuple[str, str, float, float, float, float]]:
    """
    Generate detailed breakdown of variance calculation.

    Returns a list of all variance components for the detailed formula
    expansion: one entry per ordered ticker pair (i, j), including i == j.

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily)
        annualized: If True, use annualized covariance (default: True)

    Returns:
        List of tuples: (ticker_i, ticker_j, w_i, w_j, cov_ij, contribution)
        where contribution = w_i × w_j × cov_ij
    """
    tickers = list(weights.keys())
    breakdown = []

    for ticker_i in tickers:
        w_i = weights[ticker_i]

        for ticker_j in tickers:
            w_j = weights[ticker_j]

            # Get covariance value
            cov_ij = cov_matrix.loc[ticker_i, ticker_j]

            # Annualize if requested
            if annualized:
                cov_ij = cov_ij * TRADING_DAYS_PER_YEAR

            # Calculate contribution to total variance
            contribution = w_i * w_j * cov_ij

            breakdown.append((ticker_i, ticker_j, w_i, w_j, cov_ij, contribution))

    return breakdown


def get_portfolio_metrics(
    amounts: Dict[str, float],
    period: str = "1y"
) -> Tuple[Optional[Dict], Optional[str]]:
    """
    Calculate all portfolio metrics in one go.

    This is a convenience function that orchestrates all calculations.

    If price data could only be fetched for some of the requested tickers,
    the tickers without data are dropped and the weights are recomputed over
    the remaining positions, so the returned 'weights' may cover fewer
    tickers than 'amounts'.

    Args:
        amounts: Dictionary of {ticker: amount}
        period: Historical data period (default: '1y')

    Returns:
        Tuple of (metrics_dict, error_message)

        metrics_dict contains:
        - weights: Dict[str, float]
        - prices: pd.DataFrame
        - returns: pd.DataFrame
        - cov_matrix: pd.DataFrame
        - variance: float
        - volatility: float
        - variance_breakdown: List[Tuple]
    """
    try:
        tickers = list(amounts.keys())

        # 1. Calculate weights (also validates the amounts before any download)
        weights = calculate_portfolio_weights(amounts)

        # 2. Fetch historical data (convert to tuple for caching)
        prices, error = fetch_historical_data(tuple(tickers), period)

        if error:
            return None, error

        # 2b. The fetch may succeed for only a subset of tickers. Weights that
        # reference a ticker without prices would make the covariance lookup
        # fail, so restrict the portfolio to the tickers that have data.
        priced = set(prices.columns)
        available = [ticker for ticker in tickers if ticker in priced]
        missing = [ticker for ticker in tickers if ticker not in priced]

        if not available:
            return None, "No price data returned for any requested ticker"

        if missing:
            logger.warning(
                "No price data for %s; recomputing weights over remaining tickers",
                ", ".join(missing),
            )
            weights = calculate_portfolio_weights(
                {ticker: amounts[ticker] for ticker in available}
            )

        # 3. Calculate returns
        returns = calculate_log_returns(prices)

        # 4. Calculate covariance matrix (daily; annualized in steps 5 and 7)
        cov_matrix = calculate_covariance_matrix(returns, annualized=False)

        # 5. Calculate variance
        variance = calculate_portfolio_variance(weights, cov_matrix, annualized=True)

        # 6. Calculate volatility
        volatility = calculate_portfolio_volatility(variance)

        # 7. Get variance breakdown
        variance_breakdown = get_variance_breakdown(weights, cov_matrix, annualized=True)

        metrics = {
            'weights': weights,
            'prices': prices,
            'returns': returns,
            'cov_matrix': cov_matrix,
            'variance': variance,
            'volatility': volatility,
            'variance_breakdown': variance_breakdown,
        }

        return metrics, None

    except Exception as e:
        return None, f"Error calculating portfolio metrics: {str(e)}"