Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import sqlite3 | |
| import threading | |
| import time | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| import yfinance as yf | |
TRADING_DAYS = 252  # annualisation factor: trading days per year (√252 convention)
RF_RATE = 0.06  # annual risk-free rate used in Sharpe/Sortino/Alpha (6% — TODO confirm source)
TRAILING_YEARS = 3  # metrics are computed over a trailing window of this many years
NAV_STALE_DAYS = 30  # latest NAV older than this → scheme treated as stale
# ── Disk cache config ─────────────────────────────────────────────────────────
# NAV history is refreshed if older than 7 days; benchmark index once a day.
_CACHE_DB_PATH = Path.home() / ".mf_nav_cache.db"  # SQLite backend location (local runs)
_NAV_TTL_SECS = 7 * 86_400  # 7 days
_BENCH_TTL_SECS = 1 * 86_400  # 1 day
_DB_LOCK = threading.Lock()  # one writer at a time across threads
# Every metric this engine can emit, in report column order. The embedded "\n"
# in the capture-ratio labels is intentional — the same strings are used both
# as dict keys throughout this module and as wrapped report column headers.
OUTPUT_METRICS: tuple[str, ...] = (
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Volatility",
    "Mean",
    "Sharpe Ratio",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
)
# Metrics computable from the fund's own NAV series alone (no benchmark needed).
NAV_ONLY_METRICS: set[str] = {
    "Standard Deviation",
    "Volatility",
    "Mean",
    "Sharpe Ratio",
    "Sortino Ratio",
    "Maximum Drawdown",
}
# Metrics that additionally require an aligned benchmark return series.
BENCHMARK_DEPENDENT_METRICS: set[str] = {
    "Alpha",
    "Beta",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "R-Squared",
    "Information Ratio",
}
# Common Indian benchmark labels -> Yahoo Finance ticker.
# Last verified: March 2026.
# ^NIFTYJR was delisted — the correct ticker for Nifty Next 50 is now ^NSMIDCP.
# Keys are normalised via _normalize_benchmark_name (lower-case, '-'/'_' → space,
# collapsed whitespace) before lookup; several entries are proxies, not exact
# index tickers — flagged inline.
BENCHMARK_MAP: dict[str, str] = {
    # ── Nifty broad indices ────────────────────────────────────────────────
    "nifty 50": "^NSEI",
    "nifty50": "^NSEI",
    "nifty 50 tri": "^NSEI",
    "nifty next 50": "^NSMIDCP",
    "nifty next 50 tri": "^NSMIDCP",
    "nifty junior": "^NSMIDCP",
    "nifty 100": "^CNX100",
    "nifty 100 tri": "^CNX100",
    "nifty 100 (tri)": "^CNX100",
    "nifty 200": "^CNX200",
    "nifty 500": "^CRSLDX",
    "nifty 500 tri": "^CRSLDX",
    "nifty 500 (tri)": "^CRSLDX",
    "nifty500": "^CRSLDX",
    "nifty500 multicap 50:25:25 tri": "NIFTY500_MULTICAP_50_25_25.NS",
    "nifty500 multicap 50:25:25 (tri)": "NIFTY500_MULTICAP_50_25_25.NS",
    "nifty 500 multicap 50:25:25 (tri)": "NIFTY500_MULTICAP_50_25_25.NS",
    # NOTE(review): momentum-quality variant mapped to plain multicap ticker — proxy.
    "nifty500 multicap momentum quality 50 tri": "NIFTY500_MULTICAP_50_25_25.NS",
    # ── Nifty midcap / smallcap ────────────────────────────────────────────
    # NOTE(review): Midcap 150 variants map to the Midcap 100 ticker — nearest
    # available proxy, not the exact index.
    "nifty midcap 150": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 150 tri": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 150 index (tri)": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 100": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 50": "^NSEMDCP50",
    "nifty midcap": "NIFTY_MIDCAP_100.NS",
    "nifty large midcap 250 tri": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty large midcap 250": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty large - midcap 250 index": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty large - midcap 250": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty smallcap 250": "NIFTYSMLCAP250.NS",
    "nifty smallcap 250 tri": "NIFTYSMLCAP250.NS",
    "nifty small cap 250 (tri)": "NIFTYSMLCAP250.NS",
    "nifty smallcap 100": "^CNXSC",
    "nifty smallcap": "NIFTYSMLCAP250.NS",
    # ── BSE ────────────────────────────────────────────────────────────────
    "sensex": "^BSESN",
    "bse sensex": "^BSESN",
    "bse 100": "^BSE100",
    # NOTE(review): BSE 200 maps to the BSE 100 ticker — verify this proxy is intentional.
    "bse 200": "^BSE100",
    "bse 500": "^BSE500",
    "s&p bse liquid rate index": "^NSEI",  # no direct Yahoo ticker; use Nifty as proxy
    # ── Sector / thematic ──────────────────────────────────────────────────
    "nifty bank": "^NSEBANK",
    "nifty bank tri": "^NSEBANK",
    "nifty bank (tri)": "^NSEBANK",
    "nifty private bank": "NIFTY_PVT_BANK.NS",
    "nifty private bank tri": "NIFTY_PVT_BANK.NS",
    "nifty it": "^CNXIT",
    "nifty it tri": "^CNXIT",
    "nifty financial services": "NIFTY_FIN_SERVICE.NS",
    "nifty financial services tri": "NIFTY_FIN_SERVICE.NS",
    "nifty financial services index (tri)": "NIFTY_FIN_SERVICE.NS",
    "nifty financial services ex-bank tri": "NIFTY_FIN_SERVICE.NS",
    "nifty pharma": "^CNXPHARMA",
    "nifty pharma tri": "^CNXPHARMA",
    "nifty healthcare": "NIFTY_HEALTHCARE.NS",
    "nifty healthcare tri": "NIFTY_HEALTHCARE.NS",
    "nifty healthcare tri.": "NIFTY_HEALTHCARE.NS",  # trailing dot variant
    "nifty fmcg": "^CNXFMCG",
    "nifty fmcg tri": "^CNXFMCG",
    "nifty infrastructure": "^CNXINFRA",
    "nifty infrastructure tri": "^CNXINFRA",
    "nifty india consumption": "NIFTY_INDIA_CONSUMPTION.NS",
    "nifty india consumption tri": "NIFTY_INDIA_CONSUMPTION.NS",
    "nifty india consumption index (tri)": "NIFTY_INDIA_CONSUMPTION.NS",
    "nifty india manufacturing tri": "NIFTY_INDIA_MANUFACTURING.NS",
    "nifty india defence tri": "NIFTY_INDIA_DEFENCE.NS",
    "nifty housing tri": "NIFTY_HOUSING.NS",
    "nifty cpse tri": "NIFTY_CPSE.NS",
    "nifty mnc tri": "NIFTY_MNC.NS",
    "nifty commodities tri": "^CNXCMDT",
    "nifty 100 esg tri": "NIFTY100_ESG.NS",
    "nifty 100 low volatility 30 tri": "NIFTY100_LOWVOL30.NS",
    "nifty ipo tri": "NIFTY_IPO.NS",
    # ── Factor / strategy ──────────────────────────────────────────────────
    "nifty 200 momentum 30 tri": "NIFTY200_MOMENTUM_30.NS",
    # ── Debt / liquid / overnight — use Nifty 1D rate / GSec proxies ───────
    # NOTE(review): all debt benchmarks proxy to Nifty 50 — equity beta for a
    # debt fund will be meaningless; confirm this is an accepted limitation.
    "nifty 1d rate index": "^NSEI",  # overnight / liquid funds; no direct Yahoo
    "nifty 1d rate": "^NSEI",
    "crisil liquid overnight index": "^NSEI",
    "nifty 3 year sdl": "^NSEI",
    "nifty 4-8 yr g-sec index": "^NSEI",
    "nifty composite g-sec index": "^NSEI",
    # ── Hybrid / balanced ──────────────────────────────────────────────────
    # AK = AdvisorKhoj composite benchmarks — no direct Yahoo ticker.
    # Mapped to closest equity index proxy based on fund category.
    "ak hybrid balanced tri": "^NSEI",  # Dynamic Asset Allocation — Nifty 50
    "ak hybrid aggressive tri": "^NSEI",  # Aggressive Hybrid — Nifty 50
    "ak hybrid conservative tri": "^NSEI",  # Conservative Hybrid — Nifty 50
    "ak multi asset allocation tri": "^CRSLDX",  # Multi Asset — Nifty 500
    "ak equity savings tri": "^NSEI",  # Equity Savings — Nifty 50
    # ── Global ─────────────────────────────────────────────────────────────
    "msci acwi tri": "URTH",  # iShares MSCI ACWI ETF as proxy
    "s&p global 1200 tri": "URTH",
    "nifty 50 arbitrage index": "^NSEI",  # arbitrage funds; Nifty proxy
}
# ── Cache backend: SQLite (local) or Neon/Postgres (production) ───────────────
#
# SECURITY NOTE: a live Neon DSN (user + password) was previously pasted into
# this comment. Credentials must never live in source control — rotate that
# password in the Neon console and keep the DSN only in the DATABASE_URL
# environment variable / HuggingFace Space Secret.
#
# How to switch backends (zero code change needed):
#
#   LOCAL TESTING (SQLite, default — no setup):
#     → Do NOT set DATABASE_URL in your local .env. Uses ~/.mf_nav_cache.db.
#
#   NEON / HUGGINGFACE SPACES:
#     → Add to your .env OR HuggingFace Space Secret:
#         DATABASE_URL=postgresql://<user>:<password>@<endpoint>-pooler.<region>.aws.neon.tech/<db>?sslmode=require&channel_binding=require
#     → Add to requirements.txt:
#         psycopg2-binary
#     → Done. Code detects DATABASE_URL and uses Neon automatically.
#
# WHY POOLER ENDPOINT (not direct):
#   HuggingFace Spaces can spin up many workers concurrently.
#   Pooler endpoint (ep-...-pooler....) handles connection bursts safely.
#   Direct endpoint (ep-... without -pooler) has a hard cap of ~100 connections.
#
# WHY channel_binding=require:
#   The Neon project enforces channel binding. psycopg2 supports it via libpq >= 14.
#   The param is passed through the DSN string — no extra code needed.
#
# Table schema (identical for SQLite and Postgres):
#   nav_cache(key TEXT PRIMARY KEY, data TEXT NOT NULL, ts DOUBLE PRECISION NOT NULL)
import os as _os
# Backend selection: DATABASE_URL set → Neon/Postgres; unset → local SQLite.
_DATABASE_URL = _os.environ.get("DATABASE_URL", "")
_USE_POSTGRES = bool(_DATABASE_URL)
# ── Thread-local Postgres connection pool ─────────────────────────────────────
# Opening a new psycopg2 connection per cache query costs ~100-200ms on Neon
# (TLS handshake + auth). With 12 parallel workers × 2 queries/fund × 478 funds
# that is ~1000 round-trips. Fix: one persistent connection per thread, reused
# across all queries that thread handles.
import threading as _threading
_tls = _threading.local()  # each thread caches its own psycopg2 connection here
def _get_pg_conn():
    """
    Return a thread-local persistent Neon connection, creating one if needed.
    Falls back to a fresh connection if the cached one has gone away.

    FIX: the original liveness probe opened a cursor it never closed and left
    the implicit transaction started by SELECT 1 open, so the persistent
    connection sat idle-in-transaction between cache queries (bad on a pooled
    Neon endpoint). The probe now closes its cursor and rolls back.
    """
    import psycopg2  # type: ignore

    conn = getattr(_tls, "pg_conn", None)
    if conn is not None:
        try:
            # Lightweight liveness check — closed flag or dead socket.
            if not conn.closed:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
                conn.rollback()  # end the probe's implicit transaction
                return conn
        except Exception:
            pass  # Connection is dead — fall through to re-create
    conn = psycopg2.connect(
        _DATABASE_URL,
        connect_timeout=10,
        keepalives=1,
        keepalives_idle=30,
        keepalives_interval=10,
        keepalives_count=3,
    )
    _tls.pg_conn = conn
    return conn
def _init_cache_db() -> None:
    """Create cache table if it doesn't exist (idempotent, works for both backends).

    Postgres errors are downgraded to a warning so an unreachable Neon does not
    prevent the module from importing; SQLite creation is serialised by _DB_LOCK.
    """
    if _USE_POSTGRES:
        try:
            conn = _get_pg_conn()
            with conn:  # transaction scope: commits DDL on success, rolls back on error
                with conn.cursor() as cur:
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS nav_cache (
                            key TEXT PRIMARY KEY,
                            data TEXT NOT NULL,
                            ts DOUBLE PRECISION NOT NULL
                        )
                    """)
            # BUG FIX: the original called conn.close() here, which closed the
            # persistent thread-local connection _get_pg_conn() had just cached,
            # forcing a full TLS reconnect on the first real cache query. The
            # connection must stay open for reuse.
        except Exception as e:
            print(f"[cache] Postgres init warning: {e}")
    else:
        with _DB_LOCK, sqlite3.connect(_CACHE_DB_PATH) as db:
            db.execute("""
                CREATE TABLE IF NOT EXISTS nav_cache (
                    key TEXT PRIMARY KEY,
                    data TEXT NOT NULL,
                    ts REAL NOT NULL
                )
            """)
            db.commit()
def _cache_get(key: str, ttl: float) -> pd.DataFrame | None:
    """Return cached DataFrame for *key* if younger than *ttl* seconds, else None.

    Lookup order: _PRELOAD_CACHE (in-process, zero network) → Neon/SQLite.
    Reads are best-effort: any backend error returns None and the caller falls
    back to a network fetch.
    """
    # Check bulk preload first — zero network cost.
    if key in _PRELOAD_CACHE:
        return _PRELOAD_CACHE[key]
    try:
        if _USE_POSTGRES:
            conn = _get_pg_conn()
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT data, ts FROM nav_cache WHERE key = %s", (key,)
                    )
                    row = cur.fetchone()
                # End the implicit transaction the SELECT opened so the
                # persistent connection does not sit idle-in-transaction.
                conn.rollback()
            except Exception:
                # A failed statement leaves the connection in an aborted
                # transaction; roll back so the next query on this thread works.
                try:
                    conn.rollback()
                except Exception:
                    pass
                raise
            # Do NOT close — thread-local connection is reused.
        else:
            with sqlite3.connect(_CACHE_DB_PATH) as db:
                row = db.execute(
                    "SELECT data, ts FROM nav_cache WHERE key = ?", (key,)
                ).fetchone()
        if row and (time.time() - row[1]) < ttl:
            import io as _sio
            return pd.read_json(_sio.StringIO(row[0]), orient="split")
    except Exception:
        pass  # cache read failure is non-fatal
    return None
def _cache_set(key: str, df: pd.DataFrame) -> None:
    """Persist *df* under *key*. Works for SQLite and Neon. Write failures are non-fatal.

    FIX: on a failed Postgres statement the original left the thread-local
    connection in an aborted transaction, so every later query on that thread
    failed until the liveness probe reconnected. We now roll back on error.
    """
    try:
        serialised = df.to_json(orient="split", date_format="iso")
        if _USE_POSTGRES:
            conn = _get_pg_conn()
            try:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO nav_cache (key, data, ts)
                        VALUES (%s, %s, %s)
                        ON CONFLICT (key) DO UPDATE
                            SET data = EXCLUDED.data,
                                ts = EXCLUDED.ts
                    """, (key, serialised, time.time()))
                conn.commit()
            except Exception:
                try:
                    conn.rollback()  # keep the persistent connection usable
                except Exception:
                    pass
                raise
            # Do NOT close — thread-local connection is reused.
        else:
            with _DB_LOCK, sqlite3.connect(_CACHE_DB_PATH) as db:
                db.execute(
                    "INSERT OR REPLACE INTO nav_cache (key, data, ts) VALUES (?, ?, ?)",
                    (key, serialised, time.time()),
                )
                db.commit()
    except Exception:
        pass  # cache write failure is non-fatal
# Initialise the cache table at import time (fast, idempotent). Any failure is
# swallowed: the engine then degrades to direct network fetches without caching.
try:
    _init_cache_db()
except Exception:
    pass
# ── In-process cache (lives for the duration of one run) ──────────────────────
@dataclass
class NavEngineCache:
    """
    Two-level cache:
      L1 — in-process dict (zero latency within a run, thread-safe via dict GIL)
      L2 — SQLite/Neon on disk (persists across runs; TTL-based)

    BUG FIX: the class used dataclasses.field(...) defaults but was never
    decorated with @dataclass, so nav_history/benchmark_history were raw Field
    objects at class level instead of per-instance dicts, and _lock was not a
    Lock. The decorator restores the clearly intended per-instance attributes.
    """
    # scheme_code -> NAV frame; None is the in-flight/failed-fetch sentinel
    nav_history: dict[str, pd.DataFrame | None] = field(default_factory=dict)
    # yfinance ticker -> benchmark price frame
    benchmark_history: dict[str, pd.DataFrame | None] = field(default_factory=dict)
    # serialises L1 writes across ThreadPoolExecutor workers
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
| def _normalize_benchmark_name(name: str) -> str: | |
| return " ".join((name or "").lower().replace("-", " ").replace("_", " ").split()) | |
# Second-level fallback: some NSE index tickers resolve from BENCHMARK_MAP
# but are not available on yfinance (delisted/unavailable symbols).
# Mapped to the nearest available proxy so _prewarm_benchmarks doesn't fail.
# Hoisted to module level so the dict is not rebuilt on every call.
_YF_UNAVAILABLE: dict[str, str] = {
    "NIFTY_CPSE.NS": "^NSEI",  # PSU index — broad market
    "NIFTYSMLCAP250.NS": "^CNXSC",  # Smallcap 250 — Smallcap 100
    "NIFTY_IPO.NS": "^NSEI",  # IPO index — no yf equivalent
    "NIFTY200_MOMENTUM_30.NS": "^NSEI",  # momentum factor — broad market
    "NIFTY_HOUSING.NS": "^NSEI",
    "NIFTY_LARGEMIDCAP_250.NS": "^NSEI",
    "NIFTY_INDIA_CONSUMPTION.NS": "^NSEI",
    "NIFTY_HEALTHCARE.NS": "^NSEI",
    "NIFTY100_ESG.NS": "^NSEI",
    "NIFTY100_LOWVOL30.NS": "^NSEI",
    "NIFTY_MNC.NS": "^NSEI",
    "NIFTY_INDIA_MANUFACTURING.NS": "^NSEI",
    "NIFTY500_MULTICAP_50_25_25.NS": "^NSEI",
}

def resolve_benchmark_ticker(benchmark_type: str) -> str:
    """Map a scraped benchmark label to a usable Yahoo Finance ticker.

    Resolution order:
      1. Corrupt scraper artifacts (Java object toString residue) → ^NSEI.
      2. Exact match in BENCHMARK_MAP after normalisation.
      3. First BENCHMARK_MAP key contained as a substring of the label.
      4. ^NSEI as the final fallback.
    The result is then routed through _YF_UNAVAILABLE so tickers yfinance
    cannot serve are swapped for their nearest available proxy.
    """
    # Guard against corrupt scraper artifacts (Java object toString strings)
    raw = (benchmark_type or "").strip()
    if raw.startswith("com.") or "@" in raw:
        return "^NSEI"  # fallback for corrupt benchmark strings
    normalized = _normalize_benchmark_name(raw)
    if not normalized:
        return "^NSEI"
    ticker = BENCHMARK_MAP.get(normalized)
    if ticker is None:
        # Substring fallback: first matching key wins (dict insertion order).
        ticker = "^NSEI"
        for key, t in BENCHMARK_MAP.items():
            if key in normalized:
                ticker = t
                break
    return _YF_UNAVAILABLE.get(ticker, ticker)
| def _safe_float(value: Any) -> float | None: | |
| if value is None: | |
| return None | |
| text = str(value).strip().replace(",", "") | |
| if text in {"", "-", "β", "N/A", "N/A*", "na", "nan", "None"}: | |
| return None | |
| try: | |
| return float(text) | |
| except ValueError: | |
| return None | |
def _request_json_with_retries(
    url: str, max_retries: int = 3, timeout: int = 20
) -> dict[str, Any] | None:
    """GET *url* and return its parsed JSON body, or None after *max_retries* failures.

    Retries on any network/HTTP/decode error. FIX: the original retried
    immediately in a tight loop; we now back off 0.5s, 1.0s, ... between
    attempts so a flaky or rate-limiting endpoint is not hammered.
    """
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, timeout=timeout)
            resp.raise_for_status()
            return resp.json()
        except Exception:
            if attempt == max_retries:
                return None
            time.sleep(0.5 * attempt)  # linear backoff: 0.5s, 1.0s, ...
    return None  # unreachable; kept as a defensive default
# ── Bulk preload cache ────────────────────────────────────────────────────────
# Populated once before parallel workers start. _cache_get checks here first,
# avoiding per-fund Neon round-trips on warm cache runs.
# Maps cache key ("nav:<code>" / "bench:<ticker>") -> parsed DataFrame.
_PRELOAD_CACHE: dict[str, "pd.DataFrame"] = {}
def _bulk_preload_cache(scheme_codes: list[str], benchmark_tickers: list[str]) -> None:
    """
    Load ALL nav + benchmark entries from Neon in one IN-list SQL query.
    Call once before ThreadPoolExecutor starts — cuts Neon queries from ~766 to 2.
    SQLite is local/fast so it is skipped (this is a no-op without DATABASE_URL).

    Stale rows (past their TTL) are skipped so the per-fund path refreshes them;
    unparseable rows are skipped silently. Errors fall back to per-query mode.
    """
    import io as _sio
    global _PRELOAD_CACHE  # NOTE(review): declared but only mutated, never rebound — harmless
    if not _USE_POSTGRES:
        return
    nav_keys = [f"nav:{c}" for c in scheme_codes if c]
    bench_keys = [f"bench:{t}" for t in benchmark_tickers if t]
    all_keys = nav_keys + bench_keys
    if not all_keys:
        return
    try:
        conn = _get_pg_conn()
        now = time.time()
        # One placeholder per key: WHERE key IN (%s, %s, ...)
        placeholders = ",".join(["%s"] * len(all_keys))
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT key, data, ts FROM nav_cache WHERE key IN ({placeholders})",
                all_keys,
            )
            rows_fetched = cur.fetchall()
        loaded_nav = loaded_bench = 0
        for key, data, ts in rows_fetched:
            # TTL depends on entry type: NAV (7d) vs benchmark (1d).
            ttl = _NAV_TTL_SECS if key.startswith("nav:") else _BENCH_TTL_SECS
            if (now - ts) >= ttl:
                continue  # stale — leave for the per-fund path to refresh
            try:
                df = pd.read_json(_sio.StringIO(data), orient="split")
                # Normalise dates — JSON round-trip strips tz info
                if "date" in df.columns:
                    df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None).dt.normalize()
            except Exception:
                continue
            _PRELOAD_CACHE[key] = df
            if key.startswith("nav:"):
                loaded_nav += 1
            else:
                loaded_bench += 1
        print(f"[cache] Bulk preload: {loaded_nav} NAV + {loaded_bench} benchmark entries from Neon")
    except Exception as e:
        print(f"[cache] Bulk preload failed (falling back to per-query): {e}")
| def _prewarm_benchmarks(benchmark_tickers: list[str]) -> None: | |
| """ | |
| Download all unique benchmark tickers in parallel BEFORE workers start. | |
| Complexity: O(B) time where B = unique benchmarks (68 in production). | |
| Each already-cached ticker hits _PRELOAD_CACHE in O(1) β zero network. | |
| Each cold ticker downloads once via yfinance and is stored in Neon + _PRELOAD_CACHE. | |
| Workers then get O(1) cache hits for all benchmark lookups. | |
| """ | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| unique = list(dict.fromkeys(t for t in benchmark_tickers if t)) # preserve order, dedup | |
| if not unique: | |
| return | |
| # Filter: only fetch tickers not already in preload cache | |
| cold = [t for t in unique if f"bench:{t}" not in _PRELOAD_CACHE] | |
| warm = len(unique) - len(cold) | |
| if warm: | |
| print(f"[bench-prewarm] {warm}/{len(unique)} already in cache") | |
| if not cold: | |
| return | |
| print(f"[bench-prewarm] Downloading {len(cold)} cold benchmark tickers in parallelβ¦") | |
| def _fetch_one(ticker: str) -> tuple[str, bool]: | |
| df = _fetch_benchmark_history(ticker) # handles cache_set + _PRELOAD_CACHE population | |
| return ticker, df is not None | |
| ok = failed = 0 | |
| with ThreadPoolExecutor(max_workers=min(len(cold), 20)) as ex: | |
| futures = {ex.submit(_fetch_one, t): t for t in cold} | |
| for fut in as_completed(futures): | |
| ticker, success = fut.result() | |
| if success: | |
| ok += 1 | |
| else: | |
| failed += 1 | |
| print(f" [bench-prewarm] WARN: could not fetch {ticker}") | |
| print(f"[bench-prewarm] Done: {ok} fetched, {failed} failed, {warm} from cache") | |
def _fetch_nav_history(scheme_code: str) -> pd.DataFrame | None:
    """Return the scheme's NAV history as a (date, nav) frame: cache first, then mfapi."""
    cache_key = f"nav:{scheme_code}"
    cached = _cache_get(cache_key, _NAV_TTL_SECS)
    if cached is not None:
        return cached
    url = f"https://api.mfapi.in/mf/{scheme_code}"
    payload = _request_json_with_retries(url)
    if not payload or "data" not in payload:
        return None
    try:
        frame = pd.DataFrame(payload["data"])
        if frame.empty or "date" not in frame or "nav" not in frame:
            return None
        frame["date"] = (
            pd.to_datetime(frame["date"], dayfirst=True, errors="coerce")
            .dt.tz_localize(None)
            .dt.normalize()
        )
        frame["nav"] = pd.to_numeric(frame["nav"], errors="coerce")
        frame = frame.dropna(subset=["date", "nav"]).sort_values("date")
        if frame.empty:
            return None
        result = frame[["date", "nav"]]
        _cache_set(cache_key, result)
        return result
    except Exception:
        return None
def _fetch_benchmark_history(ticker: str) -> pd.DataFrame | None:
    """Fetch benchmark history: preload/disk cache (L2) first, then yfinance.

    BUG FIX: the prewarm path's comments state each ticker "is stored in
    Neon + _PRELOAD_CACHE", but this function only ever wrote to the DB cache,
    so workers still paid a Neon round-trip per benchmark lookup. We now also
    populate _PRELOAD_CACHE on both the cache-hit and fresh-download paths.
    """
    cache_key = f"bench:{ticker}"
    cached = _cache_get(cache_key, _BENCH_TTL_SECS)
    if cached is not None:
        _PRELOAD_CACHE.setdefault(cache_key, cached)
        return cached
    df = _download_benchmark(ticker)
    if df is not None:
        _cache_set(cache_key, df)
        _PRELOAD_CACHE[cache_key] = df
    return df
def _download_benchmark(ticker: str) -> pd.DataFrame | None:
    """
    Raw yfinance download (no caching logic here).
    Parallel workers hitting yfinance simultaneously can get 401 Invalid Crumb
    errors because yfinance refreshes its session cookie lazily. Fix:
      - Retry up to 4 times with exponential backoff (0.5s, 1s, 2s)
      - Each retry creates a fresh Ticker session, which re-fetches the crumb
      - Suppress noisy 'possibly delisted' stderr from yfinance

    Returns a frame with columns ['date', 'benchmark'] sorted ascending by date,
    or None when neither download path yields at least 60 price rows.
    """
    import contextlib, io as _io

    def _suppress_yf_stderr(fn, *args, **kwargs):
        """Run fn suppressing yfinance's noisy stderr warnings."""
        with contextlib.redirect_stderr(_io.StringIO()):
            return fn(*args, **kwargs)

    # Primary path: bulk yf.download since 2000.
    for attempt in range(4):
        if attempt > 0:
            time.sleep(0.5 * (2 ** (attempt - 1)))  # 0.5s, 1s, 2s
        try:
            bench = _suppress_yf_stderr(
                yf.download,
                ticker,
                start="2000-01-01",
                progress=False,
                auto_adjust=False,
                threads=False,
            )
            if bench is None or bench.empty:
                continue
            # yf.download can return MultiIndex columns (ticker level) — flatten.
            if isinstance(bench.columns, pd.MultiIndex):
                bench.columns = [str(col[0]) for col in bench.columns]
            bench = bench.reset_index()
            # Prefer dividend/split-adjusted prices when available.
            price_col = "Adj Close" if "Adj Close" in bench.columns else "Close"
            if price_col not in bench.columns:
                continue
            bench = bench[["Date", price_col]].rename(
                columns={"Date": "date", price_col: "benchmark"}
            )
            bench["date"] = pd.to_datetime(bench["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
            bench["benchmark"] = pd.to_numeric(bench["benchmark"], errors="coerce")
            bench = bench.dropna(subset=["date", "benchmark"]).sort_values("date")
            if len(bench) >= 60:  # require a minimally useful history
                return bench
        except Exception:
            continue
    # Secondary fallback: Ticker().history() uses a separate session/crumb path
    for attempt in range(3):
        if attempt > 0:
            time.sleep(0.5 * attempt)
        try:
            hist = _suppress_yf_stderr(
                yf.Ticker(ticker).history,
                period="10y",
                auto_adjust=False,
            )
            if hist is None or hist.empty:
                continue
            hist = hist.reset_index()
            price_col = "Adj Close" if "Adj Close" in hist.columns else "Close"
            if price_col not in hist.columns:
                continue
            hist = hist[["Date", price_col]].rename(
                columns={"Date": "date", price_col: "benchmark"}
            )
            hist["date"] = pd.to_datetime(hist["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
            hist["benchmark"] = pd.to_numeric(hist["benchmark"], errors="coerce")
            hist = hist.dropna(subset=["date", "benchmark"]).sort_values("date")
            if len(hist) >= 60:
                return hist
        except Exception:
            continue
    return None
| def _trailing_3y_window(df: pd.DataFrame) -> pd.DataFrame: | |
| if df.empty: | |
| return df | |
| max_date = df["date"].max() | |
| if pd.isna(max_date): | |
| return pd.DataFrame(columns=df.columns) | |
| cutoff = max_date - pd.DateOffset(years=TRAILING_YEARS) | |
| return df[df["date"] >= cutoff].copy() | |
| def _nav_history_is_stale(nav_df: pd.DataFrame) -> bool: | |
| if nav_df is None or nav_df.empty or "date" not in nav_df.columns: | |
| return True | |
| latest = pd.to_datetime(nav_df["date"], errors="coerce").max() | |
| if pd.isna(latest): | |
| return True | |
| latest = pd.Timestamp(latest).tz_localize(None).normalize() | |
| cutoff = pd.Timestamp.now().tz_localize(None).normalize() - pd.Timedelta(days=NAV_STALE_DAYS) | |
| return latest < cutoff | |
def _compute_nav_only_metrics(
    nav_df: pd.DataFrame,
    needed_metrics: list[str],
    benchmark_reason: str,
) -> tuple[dict[str, float | None], dict[str, str]]:
    """Compute the NAV-only metrics over the trailing 3-year window.

    Fallback path used when no benchmark series is available: every requested
    benchmark-dependent metric is recorded in the skip dict with
    *benchmark_reason* and left as None.

    Returns (values, skips): values maps each requested metric to a float
    (Std Dev / Volatility / Mean / Max Drawdown in percent; Sharpe / Sortino
    raw) or None; skips explains each None.
    """
    needed = [m for m in needed_metrics if m in OUTPUT_METRICS]
    out: dict[str, float | None] = {m: None for m in needed}
    skip: dict[str, str] = {}
    if not needed:
        return out, skip
    # Benchmark-dependent metrics cannot be computed here — record why.
    for m in needed:
        if m in BENCHMARK_DEPENDENT_METRICS:
            skip[m] = benchmark_reason
    window = _trailing_3y_window(nav_df[["date", "nav"]].copy())
    if window.empty:
        for m in needed:
            if m in NAV_ONLY_METRICS:
                skip[m] = "less than 3 years of NAV history"
        return out, skip
    returns = window["nav"].pct_change().dropna()
    # Below ~30 observations the annualised statistics are meaningless.
    if len(returns) < 30:
        for m in needed:
            if m in NAV_ONLY_METRICS:
                skip[m] = f"fewer than 30 NAV return points ({len(returns)})"
        return out, skip
    # Annualise with the √252 trading-day convention.
    mean_daily = returns.mean()
    mean_annual = mean_daily * TRADING_DAYS
    vol = returns.std(ddof=1) * np.sqrt(TRADING_DAYS)
    if pd.notna(vol):
        if "Standard Deviation" in out:
            out["Standard Deviation"] = float(vol * 100)
        if "Volatility" in out:
            out["Volatility"] = float(vol * 100)
    if "Mean" in out and pd.notna(mean_annual):
        out["Mean"] = float(mean_annual * 100)
    if "Sharpe Ratio" in out:
        if pd.notna(vol) and vol > 0:
            sharpe = (mean_annual - RF_RATE) / vol
            out["Sharpe Ratio"] = float(sharpe) if pd.notna(sharpe) else None
        if out["Sharpe Ratio"] is None:
            skip["Sharpe Ratio"] = "volatility is zero or NaN (NAV-only fallback)"
    if "Sortino Ratio" in out:
        # Sortino uses downside deviation; falls back to Sharpe when there are
        # no usable negative-return days.
        downside = returns[returns < 0]
        if not downside.empty:
            downside_std = downside.std(ddof=1) * np.sqrt(TRADING_DAYS)
            if pd.notna(downside_std) and downside_std > 0:
                sortino = (mean_annual - RF_RATE) / downside_std
                out["Sortino Ratio"] = float(sortino) if pd.notna(sortino) else None
            elif out.get("Sharpe Ratio") is not None:
                out["Sortino Ratio"] = float(out["Sharpe Ratio"])
        elif out.get("Sharpe Ratio") is not None:
            out["Sortino Ratio"] = float(out["Sharpe Ratio"])
        if out["Sortino Ratio"] is None:
            skip["Sortino Ratio"] = "no valid downside deviation (NAV-only fallback)"
    if "Maximum Drawdown" in out:
        # Max drawdown from the compounded wealth curve (percent, negative).
        cumulative = (1 + returns).cumprod()
        peak = cumulative.cummax()
        drawdown = (cumulative - peak) / peak
        if not drawdown.empty:
            max_drawdown = drawdown.min()
            out["Maximum Drawdown"] = (
                float(max_drawdown * 100) if pd.notna(max_drawdown) else None
            )
        if out["Maximum Drawdown"] is None:
            skip["Maximum Drawdown"] = "unable to compute NAV-only drawdown"
    return out, skip
def _compute_metrics(
    returns_df: pd.DataFrame,
) -> tuple[dict[str, float | None], dict[str, str]]:
    """Compute every metric in OUTPUT_METRICS from aligned fund/benchmark returns.

    Parameters
    ----------
    returns_df : DataFrame with 'fund_return' and 'benchmark_return' columns
        holding aligned daily fractional returns.

    Returns
    -------
    (values, skips) : values maps each metric name to a float or None; skips
        maps each None-valued metric to a human-readable reason. Std Dev /
        Volatility / Mean / Alpha / Max Drawdown / capture ratios are percent;
        Beta / Sharpe / Sortino / R-Squared / Information Ratio are raw.
    """
    skip: dict[str, str] = {}
    if returns_df.empty:
        for k in OUTPUT_METRICS:
            skip[k] = "empty returns dataframe after merge/window"
        return {k: None for k in OUTPUT_METRICS}, skip
    fund = returns_df["fund_return"]
    bench = returns_df["benchmark_return"]
    result: dict[str, float | None] = {k: None for k in OUTPUT_METRICS}
    # Too few joined observations → every metric is unreliable; skip all.
    if len(fund) < 30:
        for k in OUTPUT_METRICS:
            skip[k] = f"fewer than 30 data points ({len(fund)}) after join"
        return result, skip
    # Annualise daily means/volatility with the √252 trading-day convention.
    mean_daily = fund.mean()
    bench_mean_daily = bench.mean()
    mean_annual = mean_daily * TRADING_DAYS
    bench_annual = bench_mean_daily * TRADING_DAYS
    vol = fund.std(ddof=1) * np.sqrt(TRADING_DAYS)
    if pd.notna(vol):
        result["Standard Deviation"] = float(vol * 100)
        result["Volatility"] = float(vol * 100)
    result["Mean"] = float(mean_annual * 100) if pd.notna(mean_annual) else None
    # Beta = Cov(fund, bench) / Var(bench); undefined for a flat benchmark.
    bench_var = bench.var(ddof=1)
    beta = None
    if pd.notna(bench_var) and bench_var and bench_var > 0:
        cov = np.cov(fund, bench)[0, 1]
        beta = cov / bench_var
    result["Beta"] = float(beta) if beta is not None and pd.notna(beta) else None
    if result["Beta"] is None:
        skip["Beta"] = (
            "benchmark variance is zero or NaN"
            if not (pd.notna(bench_var) and bench_var and bench_var > 0)
            else "beta computation returned NaN"
        )
    # CAPM alpha (annualised, percent); requires a valid Beta.
    if beta is not None and pd.notna(mean_annual):
        alpha = mean_annual - (RF_RATE + beta * (bench_annual - RF_RATE))
        result["Alpha"] = float(alpha * 100) if pd.notna(alpha) else None
    if result["Alpha"] is None:
        skip["Alpha"] = (
            "Beta is None — Alpha requires Beta"
            if result["Beta"] is None
            else "Alpha computation returned NaN"
        )
    if vol and vol > 0:
        sharpe = (mean_annual - RF_RATE) / vol
        result["Sharpe Ratio"] = float(sharpe) if pd.notna(sharpe) else None
    if result["Sharpe Ratio"] is None:
        skip["Sharpe Ratio"] = "volatility is zero or NaN"
    # Sortino uses downside deviation; falls back to Sharpe when there are no
    # usable negative-return days.
    downside = fund[fund < 0]
    if not downside.empty:
        downside_std = downside.std(ddof=1) * np.sqrt(TRADING_DAYS)
        if pd.notna(downside_std) and downside_std > 0:
            sortino = (mean_annual - RF_RATE) / downside_std
            result["Sortino Ratio"] = float(sortino) if pd.notna(sortino) else None
        elif result["Sharpe Ratio"] is not None:
            result["Sortino Ratio"] = float(result["Sharpe Ratio"])
        else:
            skip["Sortino Ratio"] = "downside std dev is zero and Sharpe fallback unavailable"
    elif result["Sharpe Ratio"] is not None:
        result["Sortino Ratio"] = float(result["Sharpe Ratio"])
    else:
        skip["Sortino Ratio"] = (
            "no negative daily returns in 3Y window and Sharpe fallback unavailable"
        )
    # Max drawdown from the compounded wealth curve (percent, negative).
    cumulative = (1 + fund).cumprod()
    peak = cumulative.cummax()
    drawdown = (cumulative - peak) / peak
    if not drawdown.empty:
        max_drawdown = drawdown.min()
        result["Maximum Drawdown"] = (
            float(max_drawdown * 100) if pd.notna(max_drawdown) else None
        )
    # R-Squared = squared Pearson correlation of fund vs benchmark returns.
    corr = fund.corr(bench)
    if pd.notna(corr):
        result["R-Squared"] = float(corr ** 2)
    else:
        skip["R-Squared"] = "fund/benchmark correlation is NaN"
    # Information ratio = annualised active return / tracking error.
    active = fund - bench
    tracking_error = active.std(ddof=1) * np.sqrt(TRADING_DAYS)
    if pd.notna(tracking_error) and tracking_error > 0:
        info_ratio = (mean_annual - bench_annual) / tracking_error
        result["Information Ratio"] = (
            float(info_ratio) if pd.notna(info_ratio) else None
        )
    else:
        skip["Information Ratio"] = (
            "tracking error is zero — fund mirrors benchmark"
            if (pd.notna(tracking_error) and tracking_error == 0)
            else "tracking error is NaN"
        )
    # Capture ratios: fund's mean return on benchmark up-days (down-days) as a
    # percentage of the benchmark's own mean on those days.
    up = returns_df[returns_df["benchmark_return"] > 0]
    if not up.empty:
        up_bench = up["benchmark_return"].mean()
        if pd.notna(up_bench) and up_bench != 0:
            up_capture = (up["fund_return"].mean() / up_bench) * 100
            result["Up Market Capture\nRatio"] = (
                float(up_capture) if pd.notna(up_capture) else None
            )
        else:
            skip["Up Market Capture\nRatio"] = "benchmark mean on up-days is zero or NaN"
    else:
        skip["Up Market Capture\nRatio"] = "no benchmark up-days in 3Y window"
    down = returns_df[returns_df["benchmark_return"] < 0]
    if not down.empty:
        down_bench = down["benchmark_return"].mean()
        if pd.notna(down_bench) and down_bench != 0:
            down_capture = (down["fund_return"].mean() / down_bench) * 100
            result["Down Market Capture\nRatio"] = (
                float(down_capture) if pd.notna(down_capture) else None
            )
        else:
            skip["Down Market Capture\nRatio"] = "benchmark mean on down-days is zero or NaN"
    else:
        skip["Down Market Capture\nRatio"] = "no benchmark down-days in 3Y window"
    return result, skip
def compute_nav_metrics_for_scheme(
    *,
    scheme_code: str,
    benchmark_type: str,
    needed_metrics: list[str],
    cache: NavEngineCache,
) -> tuple[dict[str, float | None], dict[str, str]]:
    """
    Compute trailing-3Y risk metrics for a scheme.

    Returns ``(metrics, skip_reasons)``: ``metrics`` maps each requested
    metric name to a float (or ``None``), and ``skip_reasons`` explains
    every ``None``.

    Thread-safety: every read and write of the L1 dicts on *cache* happens
    under ``cache._lock``, so concurrent ThreadPoolExecutor workers never
    race on the dict itself. On a cold cache two workers may still fetch the
    same key concurrently (last write wins); the fetches are idempotent, so
    the worst case is one duplicate network request, never a wrong result.

    Parameters
    ----------
    scheme_code:
        MFAPI scheme code. Blank/None marks a category header or an
        unresolved scheme; every requested metric is then skipped.
    benchmark_type:
        Free-text benchmark label, resolved to a Yahoo ticker via
        ``resolve_benchmark_ticker``.
    needed_metrics:
        Metric names to compute; silently filtered to ``OUTPUT_METRICS``.
    cache:
        Shared ``NavEngineCache`` holding NAV and benchmark history.
    """
    needed = [m for m in needed_metrics if m in OUTPUT_METRICS]
    if not needed:
        return {}, {}
    code = str(scheme_code or "").strip()
    if not code:
        reason = "no scheme code β category header or unresolved scheme"
        return {m: None for m in needed}, {m: reason for m in needed}
    # ββ NAV history: L1 read under lock, slow fetch outside it ββββββββββββ
    # BUGFIX: the previous sentinel scheme re-read the dict OUTSIDE the lock
    # in its branch condition and could re-fetch a key another worker had
    # just populated; the sentinel never actually deduplicated fetches.
    with cache._lock:
        nav_df = cache.nav_history.get(code)
    if nav_df is None:
        # Network call happens with the lock released so one slow scheme
        # doesn't serialise every worker.
        nav_df = _fetch_nav_history(code)
        with cache._lock:
            cache.nav_history[code] = nav_df
    if nav_df is None or nav_df.empty:
        reason = f"MFAPI returned no NAV history for scheme code {code}"
        return {m: None for m in needed}, {m: reason for m in needed}
    if _nav_history_is_stale(nav_df):
        latest = pd.to_datetime(nav_df["date"], errors="coerce").max()
        latest_str = (
            pd.Timestamp(latest).tz_localize(None).normalize().strftime("%Y-%m-%d")
            if pd.notna(latest) else "unknown"
        )
        reason = f"NAV history is stale for scheme code {code} (latest NAV {latest_str})"
        return {m: None for m in needed}, {m: reason for m in needed}
    # ββ Benchmark history (same L1/L2 pattern, keyed by ticker) βββββββββββ
    ticker = resolve_benchmark_ticker(benchmark_type)
    def _ensure_benchmark(t: str) -> pd.DataFrame | None:
        """Return benchmark history for ticker *t*, fetching/caching on miss."""
        with cache._lock:
            bench = cache.benchmark_history.get(t)
        if bench is None:
            bench = _fetch_benchmark_history(t)
            with cache._lock:
                cache.benchmark_history[t] = bench
        return bench
    bench_df = _ensure_benchmark(ticker)
    # Fall back to NIFTY 50 when the mapped ticker is missing or too thin
    # (< 60 rows) to support meaningful 3Y statistics.
    if (bench_df is None or bench_df.empty or len(bench_df) < 60) and ticker != "^NSEI":
        bench_df = _ensure_benchmark("^NSEI")
    if bench_df is None or bench_df.empty:
        reason = f"benchmark history unavailable for ticker={ticker} and NIFTY 50 fallback also failed"
        return _compute_nav_only_metrics(nav_df, needed, reason)
    # ββ Merge + compute βββββββββββββββββββββββββββββββββββββββββββββββββββ
    # Strip tz from both sides β yfinance returns UTC-aware, JSON cache is naive
    nav_df = nav_df.copy()
    bench_df = bench_df.copy()
    nav_df["date"] = pd.to_datetime(nav_df["date"]).dt.tz_localize(None).dt.normalize()
    bench_df["date"] = pd.to_datetime(bench_df["date"]).dt.tz_localize(None).dt.normalize()
    # Debt funds (Liquid, Overnight, Ultra Short etc.) publish NAV every calendar
    # day including weekends/holidays, while equity benchmarks only publish on
    # trading days. A naive inner-join on date yields almost no matching rows
    # (<30) causing all metrics to return None.
    # Fix: forward-fill NAV to the benchmark's trading-day calendar so the merge
    # always produces a full 3Y of matched rows regardless of fund type.
    bench_dates = bench_df[["date"]].drop_duplicates().sort_values("date")
    # BUGFIX: duplicate NAV dates (the feed occasionally repeats a row) made
    # `.reindex` raise "cannot reindex on an axis with duplicate labels";
    # keep the last quote per date before reindexing.
    nav_reindexed = (
        nav_df.drop_duplicates(subset="date", keep="last")
        .set_index("date")
        .reindex(bench_dates["date"])
        .ffill()  # carry last known NAV forward
        .dropna()
        .reset_index()
        .rename(columns={"index": "date"})
    )
    merged = pd.merge(nav_reindexed, bench_df, on="date", how="inner")
    if merged.empty:
        reason = f"no overlapping dates between NAV (scheme={code}) and benchmark (ticker={ticker})"
        return _compute_nav_only_metrics(nav_df, needed, reason)
    merged = _trailing_3y_window(merged)
    if merged.empty:
        reason = f"less than 3 years of overlapping data for scheme={code}"
        return {m: None for m in needed}, {m: reason for m in needed}
    merged["fund_return"] = merged["nav"].pct_change()
    merged["benchmark_return"] = merged["benchmark"].pct_change()
    merged = merged.dropna(subset=["fund_return", "benchmark_return"]).copy()
    if merged.empty:
        reason = "all rows dropped after computing benchmark-joined returns"
        return _compute_nav_only_metrics(nav_df, needed, reason)
    all_metrics, all_skip = _compute_metrics(merged)
    metrics = {m: all_metrics.get(m) for m in needed}
    skip_reasons = {
        m: all_skip[m]
        for m in needed
        if m in all_skip and metrics.get(m) is None
    }
    # Defensive top-up: NAV-only metrics (stdev, Sharpe, drawdown, ...) don't
    # require a benchmark, so retry any that came back None against the raw
    # NAV series before reporting a skip.
    if any(m in NAV_ONLY_METRICS and metrics.get(m) is None for m in needed):
        nav_only, nav_only_skip = _compute_nav_only_metrics(
            nav_df, needed, "benchmark-dependent metric unavailable"
        )
        for m in needed:
            if (
                m in NAV_ONLY_METRICS
                and metrics.get(m) is None
                and nav_only.get(m) is not None
            ):
                metrics[m] = nav_only[m]
                skip_reasons.pop(m, None)
            elif (
                metrics.get(m) is None
                and m not in skip_reasons
                and m in nav_only_skip
            ):
                skip_reasons[m] = nav_only_skip[m]
    return metrics, skip_reasons