# nav_metrics_engine.py — MF / src
# (header reconstructed; original lines were Hugging Face upload-page residue,
#  commit b0e15c1, which would have been a SyntaxError at module top level)
from __future__ import annotations
import json
import sqlite3
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
import requests
import yfinance as yf
# ── Metric computation constants ──────────────────────────────────────────────
TRADING_DAYS = 252     # annualisation factor (Indian market trading days/year)
RF_RATE = 0.06         # risk-free rate used by Sharpe/Sortino/Alpha (6% p.a.)
TRAILING_YEARS = 3     # all metrics use a trailing 3-year window
NAV_STALE_DAYS = 30    # latest NAV older than this → scheme treated as stale
# ── Disk cache config ─────────────────────────────────────────────────────────
# NAV history is refreshed if older than 7 days; benchmark index once a day.
_CACHE_DB_PATH = Path.home() / ".mf_nav_cache.db"  # SQLite fallback location
_NAV_TTL_SECS = 7 * 86_400  # 7 days
_BENCH_TTL_SECS = 1 * 86_400  # 1 day
_DB_LOCK = threading.Lock()  # one writer at a time across threads (SQLite path)
# Metric column headers exactly as they appear in the output sheet.
# The "\n" inside the capture-ratio names is a deliberate cell line-break,
# not a typo — downstream lookups key on these exact strings.
OUTPUT_METRICS: tuple[str, ...] = (
    "Alpha",
    "Beta",
    "Standard Deviation",
    "Volatility",
    "Mean",
    "Sharpe Ratio",
    "Sortino Ratio",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "Maximum Drawdown",
    "R-Squared",
    "Information Ratio",
)
# Metrics computable from the fund's NAV series alone (no benchmark needed).
NAV_ONLY_METRICS: set[str] = {
    "Standard Deviation",
    "Volatility",
    "Mean",
    "Sharpe Ratio",
    "Sortino Ratio",
    "Maximum Drawdown",
}
# Metrics that additionally require a benchmark return series.
BENCHMARK_DEPENDENT_METRICS: set[str] = {
    "Alpha",
    "Beta",
    "Up Market Capture\nRatio",
    "Down Market Capture\nRatio",
    "R-Squared",
    "Information Ratio",
}
# Common Indian benchmark labels -> Yahoo Finance ticker
# Last verified: March 2026
# ^NIFTYJR was delisted — correct ticker for Nifty Next 50 is now ^NSMIDCP
# Keys are pre-normalised (lower-case, '-'/'_' collapsed to single spaces) to
# match the output of _normalize_benchmark_name.
BENCHMARK_MAP: dict[str, str] = {
    # ── Nifty broad indices ────────────────────────────────────────────────
    "nifty 50": "^NSEI",
    "nifty50": "^NSEI",
    "nifty 50 tri": "^NSEI",
    "nifty next 50": "^NSMIDCP",
    "nifty next 50 tri": "^NSMIDCP",
    "nifty junior": "^NSMIDCP",
    "nifty 100": "^CNX100",
    "nifty 100 tri": "^CNX100",
    "nifty 100 (tri)": "^CNX100",
    "nifty 200": "^CNX200",
    "nifty 500": "^CRSLDX",
    "nifty 500 tri": "^CRSLDX",
    "nifty 500 (tri)": "^CRSLDX",
    "nifty500": "^CRSLDX",
    "nifty500 multicap 50:25:25 tri": "NIFTY500_MULTICAP_50_25_25.NS",
    "nifty500 multicap 50:25:25 (tri)": "NIFTY500_MULTICAP_50_25_25.NS",
    "nifty 500 multicap 50:25:25 (tri)": "NIFTY500_MULTICAP_50_25_25.NS",
    # NOTE(review): momentum-quality index mapped to the plain multicap ticker —
    # looks like a deliberate nearest-proxy choice; confirm intent.
    "nifty500 multicap momentum quality 50 tri": "NIFTY500_MULTICAP_50_25_25.NS",
    # ── Nifty midcap / smallcap ────────────────────────────────────────────
    # Midcap 150 proxied by Midcap 100 (closest available Yahoo ticker).
    "nifty midcap 150": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 150 tri": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 150 index (tri)": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 100": "NIFTY_MIDCAP_100.NS",
    "nifty midcap 50": "^NSEMDCP50",
    "nifty midcap": "NIFTY_MIDCAP_100.NS",
    "nifty large midcap 250 tri": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty large midcap 250": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty large - midcap 250 index": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty large - midcap 250": "NIFTY_LARGEMIDCAP_250.NS",
    "nifty smallcap 250": "NIFTYSMLCAP250.NS",
    "nifty smallcap 250 tri": "NIFTYSMLCAP250.NS",
    "nifty small cap 250 (tri)": "NIFTYSMLCAP250.NS",
    "nifty smallcap 100": "^CNXSC",
    "nifty smallcap": "NIFTYSMLCAP250.NS",
    # ── BSE ───────────────────────────────────────────────────────────────
    "sensex": "^BSESN",
    "bse sensex": "^BSESN",
    "bse 100": "^BSE100",
    # NOTE(review): "bse 200" intentionally(?) proxied by ^BSE100 — verify.
    "bse 200": "^BSE100",
    "bse 500": "^BSE500",
    "s&p bse liquid rate index": "^NSEI",  # no direct Yahoo ticker; use Nifty as proxy
    # ── Sector / thematic ─────────────────────────────────────────────────
    "nifty bank": "^NSEBANK",
    "nifty bank tri": "^NSEBANK",
    "nifty bank (tri)": "^NSEBANK",
    "nifty private bank": "NIFTY_PVT_BANK.NS",
    "nifty private bank tri": "NIFTY_PVT_BANK.NS",
    "nifty it": "^CNXIT",
    "nifty it tri": "^CNXIT",
    "nifty financial services": "NIFTY_FIN_SERVICE.NS",
    "nifty financial services tri": "NIFTY_FIN_SERVICE.NS",
    "nifty financial services index (tri)": "NIFTY_FIN_SERVICE.NS",
    "nifty financial services ex-bank tri": "NIFTY_FIN_SERVICE.NS",
    "nifty pharma": "^CNXPHARMA",
    "nifty pharma tri": "^CNXPHARMA",
    "nifty healthcare": "NIFTY_HEALTHCARE.NS",
    "nifty healthcare tri": "NIFTY_HEALTHCARE.NS",
    "nifty healthcare tri.": "NIFTY_HEALTHCARE.NS",  # trailing dot variant
    "nifty fmcg": "^CNXFMCG",
    "nifty fmcg tri": "^CNXFMCG",
    "nifty infrastructure": "^CNXINFRA",
    "nifty infrastructure tri": "^CNXINFRA",
    "nifty india consumption": "NIFTY_INDIA_CONSUMPTION.NS",
    "nifty india consumption tri": "NIFTY_INDIA_CONSUMPTION.NS",
    "nifty india consumption index (tri)": "NIFTY_INDIA_CONSUMPTION.NS",
    "nifty india manufacturing tri": "NIFTY_INDIA_MANUFACTURING.NS",
    "nifty india defence tri": "NIFTY_INDIA_DEFENCE.NS",
    "nifty housing tri": "NIFTY_HOUSING.NS",
    "nifty cpse tri": "NIFTY_CPSE.NS",
    "nifty mnc tri": "NIFTY_MNC.NS",
    "nifty commodities tri": "^CNXCMDT",
    "nifty 100 esg tri": "NIFTY100_ESG.NS",
    "nifty 100 low volatility 30 tri": "NIFTY100_LOWVOL30.NS",
    "nifty ipo tri": "NIFTY_IPO.NS",
    # ── Factor / strategy ─────────────────────────────────────────────────
    "nifty 200 momentum 30 tri": "NIFTY200_MOMENTUM_30.NS",
    # ── Debt / liquid / overnight — use Nifty 1D rate / GSec proxies ──────
    "nifty 1d rate index": "^NSEI",  # overnight / liquid funds; no direct Yahoo
    "nifty 1d rate": "^NSEI",
    "crisil liquid overnight index": "^NSEI",
    "nifty 3 year sdl": "^NSEI",
    "nifty 4-8 yr g-sec index": "^NSEI",
    "nifty composite g-sec index": "^NSEI",
    # ── Hybrid / balanced ─────────────────────────────────────────────────
    # AK = AdvisorKhoj composite benchmarks — no direct Yahoo ticker
    # Mapped to closest equity index proxy based on fund category
    "ak hybrid balanced tri": "^NSEI",  # Dynamic Asset Allocation → Nifty 50
    "ak hybrid aggressive tri": "^NSEI",  # Aggressive Hybrid → Nifty 50
    "ak hybrid conservative tri": "^NSEI",  # Conservative Hybrid → Nifty 50
    "ak multi asset allocation tri": "^CRSLDX",  # Multi Asset → Nifty 500
    "ak equity savings tri": "^NSEI",  # Equity Savings → Nifty 50
    # ── Global ────────────────────────────────────────────────────────────
    "msci acwi tri": "URTH",  # iShares MSCI ACWI ETF as proxy
    "s&p global 1200 tri": "URTH",
    "nifty 50 arbitrage index": "^NSEI",  # arbitrage funds; Nifty proxy
}
# ── Cache backend: SQLite (local) or Neon/Postgres (production) ──────────────
#
# SECURITY: a live Neon DSN (which embeds the database password) was previously
# pasted into this comment block. Never keep credentials in source — supply the
# DSN only via the DATABASE_URL environment variable / HF Space Secret, and
# rotate any credential that was committed here.
#
# How to switch backends (zero code change needed):
#
#   LOCAL TESTING (SQLite, default — no setup):
#     → Do NOT set DATABASE_URL in your local .env. Uses ~/.mf_nav_cache.db.
#
#   NEON / HUGGINGFACE SPACES:
#     → Add to your .env OR HuggingFace Space Secret:
#       DATABASE_URL=postgresql://<user>:<password>@<endpoint>-pooler.<region>.aws.neon.tech/<db>?sslmode=require&channel_binding=require
#     → Add to requirements.txt:
#       psycopg2-binary
#     → Done. Code detects DATABASE_URL and uses Neon automatically.
#
# WHY POOLER ENDPOINT (not direct):
#   HuggingFace Spaces can spin up many workers concurrently.
#   Pooler endpoint (ep-...-pooler...) handles connection bursts safely.
#   Direct endpoint (ep-... without -pooler) has a hard cap of ~100 connections.
#
# WHY channel_binding=require:
#   The Neon project enforces channel binding. psycopg2 supports it via libpq >= 14.
#   The param is passed through the DSN string — no extra code needed.
#
# Table schema (identical for SQLite and Postgres):
#   nav_cache(key TEXT PRIMARY KEY, data TEXT NOT NULL, ts DOUBLE PRECISION NOT NULL)
import os as _os
# Presence of DATABASE_URL selects the Neon/Postgres backend; absence → SQLite.
_DATABASE_URL = _os.environ.get("DATABASE_URL", "")
_USE_POSTGRES = bool(_DATABASE_URL)
# ── Thread-local Postgres connection pool ─────────────────────────────────────
# Opening a new psycopg2 connection per cache query costs ~100-200ms on Neon
# (TLS handshake + auth). With 12 parallel workers × 2 queries/fund × 478 funds
# that is ~1000 round-trips. Fix: one persistent connection per thread, reused
# across all queries that thread handles.
import threading as _threading
_tls = _threading.local()  # per-thread slot: _tls.pg_conn holds the connection
def _get_pg_conn():
    """
    Return a thread-local persistent Neon connection, creating one if needed.

    Falls back to a fresh connection if the cached one has gone away.

    Fix vs. previous version: the ``SELECT 1`` liveness probe left its cursor
    open and — because psycopg2 implicitly opens a transaction on the first
    statement — left the connection "idle in transaction" between cache calls.
    The probe cursor is now closed and the probe transaction rolled back, so
    later reads on this connection see a fresh snapshot.
    """
    import psycopg2  # type: ignore

    conn = getattr(_tls, "pg_conn", None)
    if conn is not None:
        try:
            # Lightweight liveness check — closed flag or dead socket.
            if not conn.closed:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
                conn.rollback()  # end the implicit probe transaction
                return conn
        except Exception:
            pass  # connection is dead — fall through to re-create
    conn = psycopg2.connect(
        _DATABASE_URL,
        connect_timeout=10,
        keepalives=1,
        keepalives_idle=30,
        keepalives_interval=10,
        keepalives_count=3,
    )
    _tls.pg_conn = conn
    return conn
def _init_cache_db() -> None:
    """Create cache table if it doesn't exist (idempotent, works for both backends).

    Fix vs. previous version: the Postgres branch called ``conn.close()`` on the
    connection returned by ``_get_pg_conn`` — but that connection is the
    thread-local persistent one, so closing it forced the very next cache query
    to pay a full TLS + auth handshake (~100-200ms on Neon), defeating the
    per-thread pool. The connection is now left open; ``with conn`` already
    commits the DDL on success and rolls back on error.
    """
    if _USE_POSTGRES:
        try:
            conn = _get_pg_conn()
            with conn:  # transaction scope: commit on success, rollback on error
                with conn.cursor() as cur:
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS nav_cache (
                        key TEXT PRIMARY KEY,
                        data TEXT NOT NULL,
                        ts DOUBLE PRECISION NOT NULL
                        )
                    """)
            # Do NOT close — thread-local connection is reused by later queries.
        except Exception as e:
            print(f"[cache] Postgres init warning: {e}")
    else:
        with _DB_LOCK, sqlite3.connect(_CACHE_DB_PATH) as db:
            db.execute("""
                CREATE TABLE IF NOT EXISTS nav_cache (
                key TEXT PRIMARY KEY,
                data TEXT NOT NULL,
                ts REAL NOT NULL
                )
            """)
            db.commit()
def _cache_get(key: str, ttl: float) -> pd.DataFrame | None:
    """Return cached DataFrame if fresh, else None. Works for SQLite and Neon.

    Lookup order:
      1. _PRELOAD_CACHE — bulk-preloaded dict, zero network cost;
      2. backing store — Neon when DATABASE_URL is set, else local SQLite.
    Rows older than *ttl* seconds count as misses. Any backend error is
    swallowed and reported as a miss so the caller simply re-fetches.

    Fix vs. previous version: the Postgres read never ended its implicit
    transaction, leaving the persistent thread-local connection idle-in-
    transaction and pinning an old snapshot for all later reads. A rollback
    after the SELECT releases both.
    """
    # Check bulk preload first — zero network cost
    if key in _PRELOAD_CACHE:
        return _PRELOAD_CACHE[key]
    try:
        if _USE_POSTGRES:
            conn = _get_pg_conn()
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT data, ts FROM nav_cache WHERE key = %s", (key,)
                )
                row = cur.fetchone()
            conn.rollback()  # end the implicit read transaction
            # Do NOT close — thread-local connection is reused
        else:
            with sqlite3.connect(_CACHE_DB_PATH) as db:
                row = db.execute(
                    "SELECT data, ts FROM nav_cache WHERE key = ?", (key,)
                ).fetchone()
        if row and (time.time() - row[1]) < ttl:
            import io as _sio
            return pd.read_json(_sio.StringIO(row[0]), orient="split")
    except Exception:
        pass
    return None
def _cache_set(key: str, df: pd.DataFrame) -> None:
    """Persist DataFrame. Works for SQLite and Neon. Write failures are non-fatal."""
    try:
        payload = df.to_json(orient="split", date_format="iso")
        stamp = time.time()
        if _USE_POSTGRES:
            conn = _get_pg_conn()
            with conn.cursor() as cur:
                cur.execute("""
                INSERT INTO nav_cache (key, data, ts)
                VALUES (%s, %s, %s)
                ON CONFLICT (key) DO UPDATE
                SET data = EXCLUDED.data,
                ts = EXCLUDED.ts
                """, (key, payload, stamp))
            conn.commit()
            # Thread-local connection is deliberately kept open for reuse.
        else:
            with _DB_LOCK, sqlite3.connect(_CACHE_DB_PATH) as db:
                db.execute(
                    "INSERT OR REPLACE INTO nav_cache (key, data, ts) VALUES (?, ?, ?)",
                    (key, payload, stamp),
                )
                db.commit()
    except Exception:
        pass  # cache write failure is non-fatal
# Initialise at import time (fast, idempotent). Failure is deliberately
# swallowed: the cache is an optimisation, never a hard dependency.
try:
    _init_cache_db()
except Exception:
    pass
# ── In-process cache (lives for the duration of one run) ─────────────────────
@dataclass
class NavEngineCache:
    """
    Two-level cache:
      L1 — in-process dict (zero latency within a run, thread-safe via dict GIL)
      L2 — SQLite on disk, or Neon/Postgres when DATABASE_URL is set (TTL-based)
    """
    # scheme code -> NAV frame; None means "not fetched yet / fetch failed"
    nav_history: dict[str, pd.DataFrame | None] = field(default_factory=dict)
    # yfinance ticker -> benchmark frame; same None convention as above
    benchmark_history: dict[str, pd.DataFrame | None] = field(default_factory=dict)
    # serialises L1 dict writes across ThreadPoolExecutor workers
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
def _normalize_benchmark_name(name: str) -> str:
return " ".join((name or "").lower().replace("-", " ").replace("_", " ").split())
# Second-level fallback: some NSE index tickers resolve from BENCHMARK_MAP
# but are not available on yfinance (delisted/unavailable symbols).
# Map them to the nearest available proxy so _prewarm_benchmarks doesn't fail.
# Hoisted to module level so the dict is built once, not on every call
# (resolve_benchmark_ticker runs once per fund on hot paths).
_YF_UNAVAILABLE: dict[str, str] = {
    "NIFTY_CPSE.NS": "^NSEI",  # PSU index → broad market
    "NIFTYSMLCAP250.NS": "^CNXSC",  # Smallcap 250 → Smallcap 100
    "NIFTY_IPO.NS": "^NSEI",  # IPO index → no yf equivalent
    "NIFTY200_MOMENTUM_30.NS": "^NSEI",  # momentum factor → broad market
    "NIFTY_HOUSING.NS": "^NSEI",
    "NIFTY_LARGEMIDCAP_250.NS": "^NSEI",
    "NIFTY_INDIA_CONSUMPTION.NS": "^NSEI",
    "NIFTY_HEALTHCARE.NS": "^NSEI",
    "NIFTY100_ESG.NS": "^NSEI",
    "NIFTY100_LOWVOL30.NS": "^NSEI",
    "NIFTY_MNC.NS": "^NSEI",
    "NIFTY_INDIA_MANUFACTURING.NS": "^NSEI",
    "NIFTY500_MULTICAP_50_25_25.NS": "^NSEI",
}
def resolve_benchmark_ticker(benchmark_type: str) -> str:
    """Map a free-text benchmark label to a Yahoo Finance ticker.

    Resolution order:
      1. corrupt scraper artifacts (Java ``toString`` strings) → "^NSEI";
      2. exact match on the normalised label in BENCHMARK_MAP;
      3. first BENCHMARK_MAP key contained as a substring of the label;
      4. default "^NSEI" (Nifty 50);
    then tickers known to be unavailable on yfinance are remapped via
    _YF_UNAVAILABLE to the nearest downloadable proxy.
    """
    # Guard against corrupt scraper artifacts (Java object toString strings)
    raw = (benchmark_type or "").strip()
    if raw.startswith("com.") or "@" in raw:
        return "^NSEI"  # fallback for corrupt benchmark strings
    normalized = _normalize_benchmark_name(raw)
    if not normalized:
        return "^NSEI"
    if normalized in BENCHMARK_MAP:
        ticker = BENCHMARK_MAP[normalized]
    else:
        ticker = "^NSEI"
        # Substring fallback: first key (dict insertion order) that appears
        # inside the normalised label wins.
        for key, t in BENCHMARK_MAP.items():
            if key in normalized:
                ticker = t
                break
    return _YF_UNAVAILABLE.get(ticker, ticker)
def _safe_float(value: Any) -> float | None:
if value is None:
return None
text = str(value).strip().replace(",", "")
if text in {"", "-", "β€”", "N/A", "N/A*", "na", "nan", "None"}:
return None
try:
return float(text)
except ValueError:
return None
def _request_json_with_retries(
    url: str, max_retries: int = 3, timeout: int = 20
) -> dict[str, Any] | None:
    """GET *url* and return its parsed JSON body, or None after *max_retries*.

    Retries cover connection errors, HTTP error statuses (raise_for_status)
    and invalid JSON. Fix vs. previous version: failed attempts retried
    instantly, hammering an already-struggling API; a short exponential
    backoff (0.5s, 1s, 2s, ...) now separates attempts.
    """
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, timeout=timeout)
            resp.raise_for_status()
            return resp.json()
        except Exception:
            if attempt == max_retries:
                return None
            time.sleep(0.5 * (2 ** (attempt - 1)))  # 0.5s, 1s, 2s, ...
    return None
# ── Bulk preload cache ────────────────────────────────────────────────────────
# Populated once before parallel workers start. _cache_get checks here first,
# avoiding per-fund Neon round-trips on warm cache runs.
# Maps cache key ("nav:<scheme_code>" / "bench:<ticker>") -> parsed DataFrame.
_PRELOAD_CACHE: dict[str, "pd.DataFrame"] = {}
def _bulk_preload_cache(scheme_codes: list[str], benchmark_tickers: list[str]) -> None:
    """
    Load ALL nav + benchmark entries from Neon in 2 SQL queries.
    Call once before ThreadPoolExecutor starts — cuts Neon queries from ~766 to 2.
    SQLite is local/fast so skipped.

    Entries whose stored timestamp exceeds the per-kind TTL are skipped, as are
    rows whose JSON fails to parse. Any backend error degrades gracefully to
    the per-query path in _cache_get.
    """
    import io as _sio
    # NOTE(review): `global` is unnecessary here — the dict is mutated, never
    # rebound — but it is kept verbatim; harmless.
    global _PRELOAD_CACHE
    if not _USE_POSTGRES:
        return
    nav_keys = [f"nav:{c}" for c in scheme_codes if c]
    bench_keys = [f"bench:{t}" for t in benchmark_tickers if t]
    all_keys = nav_keys + bench_keys
    if not all_keys:
        return
    try:
        conn = _get_pg_conn()
        now = time.time()
        # One IN (...) query with a placeholder per key.
        placeholders = ",".join(["%s"] * len(all_keys))
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT key, data, ts FROM nav_cache WHERE key IN ({placeholders})",
                all_keys,
            )
            rows_fetched = cur.fetchall()
        loaded_nav = loaded_bench = 0
        for key, data, ts in rows_fetched:
            # TTL depends on entry kind: NAV 7 days, benchmark 1 day.
            ttl = _NAV_TTL_SECS if key.startswith("nav:") else _BENCH_TTL_SECS
            if (now - ts) >= ttl:
                continue
            try:
                df = pd.read_json(_sio.StringIO(data), orient="split")
                # Normalise dates — JSON round-trip strips tz info
                if "date" in df.columns:
                    df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None).dt.normalize()
            except Exception:
                continue
            _PRELOAD_CACHE[key] = df
            if key.startswith("nav:"):
                loaded_nav += 1
            else:
                loaded_bench += 1
        print(f"[cache] Bulk preload: {loaded_nav} NAV + {loaded_bench} benchmark entries from Neon")
    except Exception as e:
        print(f"[cache] Bulk preload failed (falling back to per-query): {e}")
def _prewarm_benchmarks(benchmark_tickers: list[str]) -> None:
    """
    Download every unique benchmark ticker in parallel BEFORE workers start.

    Tickers already in _PRELOAD_CACHE cost nothing; each cold ticker is fetched
    once via _fetch_benchmark_history (which also persists it), so worker
    threads later get O(1) cache hits for all benchmark lookups.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    # dict.fromkeys dedups while preserving first-seen order
    unique = list(dict.fromkeys(t for t in benchmark_tickers if t))
    if not unique:
        return
    # Split into cache hits (warm) and tickers that still need a download (cold)
    cold = [t for t in unique if f"bench:{t}" not in _PRELOAD_CACHE]
    warm = len(unique) - len(cold)
    if warm:
        print(f"[bench-prewarm] {warm}/{len(unique)} already in cache")
    if not cold:
        return
    print(f"[bench-prewarm] Downloading {len(cold)} cold benchmark tickers in parallel…")
    ok = failed = 0
    with ThreadPoolExecutor(max_workers=min(len(cold), 20)) as pool:
        pending = {pool.submit(_fetch_benchmark_history, t): t for t in cold}
        for done in as_completed(pending):
            name = pending[done]
            if done.result() is not None:
                ok += 1
            else:
                failed += 1
                print(f"  [bench-prewarm] WARN: could not fetch {name}")
    print(f"[bench-prewarm] Done: {ok} fetched, {failed} failed, {warm} from cache")
def _fetch_nav_history(scheme_code: str) -> pd.DataFrame | None:
    """Return a [date, nav] frame for *scheme_code*: cache first, then api.mfapi.in."""
    cache_key = f"nav:{scheme_code}"
    hit = _cache_get(cache_key, _NAV_TTL_SECS)
    if hit is not None:
        return hit
    payload = _request_json_with_retries(f"https://api.mfapi.in/mf/{scheme_code}")
    if not payload or "data" not in payload:
        return None
    try:
        frame = pd.DataFrame(payload["data"])
        if frame.empty or "date" not in frame or "nav" not in frame:
            return None
        # mfapi dates are dd-mm-yyyy; coerce bad rows to NaT/NaN and drop them
        frame["date"] = pd.to_datetime(frame["date"], dayfirst=True, errors="coerce").dt.tz_localize(None).dt.normalize()
        frame["nav"] = pd.to_numeric(frame["nav"], errors="coerce")
        frame = frame.dropna(subset=["date", "nav"]).sort_values("date")
        if frame.empty:
            return None
        result = frame[["date", "nav"]]
        _cache_set(cache_key, result)
        return result
    except Exception:
        return None
def _fetch_benchmark_history(ticker: str) -> pd.DataFrame | None:
    """Return benchmark price history for *ticker*: cache (L2) first, then yfinance."""
    cache_key = f"bench:{ticker}"
    hit = _cache_get(cache_key, _BENCH_TTL_SECS)
    if hit is not None:
        return hit
    downloaded = _download_benchmark(ticker)
    if downloaded is not None:
        _cache_set(cache_key, downloaded)
    return downloaded
def _normalise_yf_frame(raw) -> pd.DataFrame | None:
    """Reduce a raw yfinance frame to tidy [date, benchmark] columns.

    Flattens MultiIndex columns, prefers 'Adj Close' over 'Close', strips tz,
    and drops unparseable rows. Returns None when the frame is unusable.
    """
    if raw is None or raw.empty:
        return None
    if isinstance(raw.columns, pd.MultiIndex):
        raw.columns = [str(col[0]) for col in raw.columns]
    raw = raw.reset_index()
    price_col = "Adj Close" if "Adj Close" in raw.columns else "Close"
    if price_col not in raw.columns:
        return None
    tidy = raw[["Date", price_col]].rename(
        columns={"Date": "date", price_col: "benchmark"}
    )
    tidy["date"] = pd.to_datetime(tidy["date"], errors="coerce").dt.tz_localize(None).dt.normalize()
    tidy["benchmark"] = pd.to_numeric(tidy["benchmark"], errors="coerce")
    return tidy.dropna(subset=["date", "benchmark"]).sort_values("date")
def _download_benchmark(ticker: str) -> pd.DataFrame | None:
    """
    Raw yfinance download (no caching logic here).
    Parallel workers hitting yfinance simultaneously can get 401 Invalid Crumb
    errors because yfinance refreshes its session cookie lazily. Fix:
      - Retry up to 4 times with exponential backoff (0.5s, 1s, 2s)
      - Each retry creates a fresh Ticker session, which re-fetches the crumb
      - Suppress noisy 'possibly delisted' stderr from yfinance
    The duplicated frame-normalisation code of the two retry paths now lives
    in _normalise_yf_frame. Frames shorter than 60 rows are rejected.
    """
    import contextlib, io as _io
    def _suppress_yf_stderr(fn, *args, **kwargs):
        """Run fn suppressing yfinance's noisy stderr warnings."""
        with contextlib.redirect_stderr(_io.StringIO()):
            return fn(*args, **kwargs)
    # Primary path: bulk download endpoint.
    for attempt in range(4):
        if attempt > 0:
            time.sleep(0.5 * (2 ** (attempt - 1)))  # 0.5s, 1s, 2s
        try:
            raw = _suppress_yf_stderr(
                yf.download,
                ticker,
                start="2000-01-01",
                progress=False,
                auto_adjust=False,
                threads=False,
            )
            frame = _normalise_yf_frame(raw)
            if frame is not None and len(frame) >= 60:
                return frame
        except Exception:
            continue
    # Secondary fallback: Ticker().history() uses a separate session/crumb path
    for attempt in range(3):
        if attempt > 0:
            time.sleep(0.5 * attempt)
        try:
            raw = _suppress_yf_stderr(
                yf.Ticker(ticker).history,
                period="10y",
                auto_adjust=False,
            )
            frame = _normalise_yf_frame(raw)
            if frame is not None and len(frame) >= 60:
                return frame
        except Exception:
            continue
    return None
def _trailing_3y_window(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* restricted to TRAILING_YEARS before its latest 'date'."""
    if df.empty:
        return df
    latest = df["date"].max()
    if pd.isna(latest):
        return pd.DataFrame(columns=df.columns)
    start = latest - pd.DateOffset(years=TRAILING_YEARS)
    return df.loc[df["date"] >= start].copy()
def _nav_history_is_stale(nav_df: pd.DataFrame) -> bool:
    """True when the most recent NAV is more than NAV_STALE_DAYS days old
    (or the frame is missing/empty/unparseable)."""
    if nav_df is None or nav_df.empty or "date" not in nav_df.columns:
        return True
    newest = pd.to_datetime(nav_df["date"], errors="coerce").max()
    if pd.isna(newest):
        return True
    newest = pd.Timestamp(newest).tz_localize(None).normalize()
    today = pd.Timestamp.now().tz_localize(None).normalize()
    return newest < today - pd.Timedelta(days=NAV_STALE_DAYS)
def _compute_nav_only_metrics(
    nav_df: pd.DataFrame,
    needed_metrics: list[str],
    benchmark_reason: str,
) -> tuple[dict[str, float | None], dict[str, str]]:
    """Fallback path: compute the NAV-only subset of metrics (no benchmark).

    Every benchmark-dependent metric in *needed_metrics* is immediately marked
    skipped with *benchmark_reason*. NAV-only metrics are computed over the
    trailing 3-year window of daily NAV returns.

    Returns (out, skip): out maps each requested metric to a float or None;
    skip explains every None.
    """
    needed = [m for m in needed_metrics if m in OUTPUT_METRICS]
    out = {m: None for m in needed}
    skip: dict[str, str] = {}
    if not needed:
        return out, skip
    # Benchmark-dependent metrics can never be produced here — record why.
    for m in needed:
        if m in BENCHMARK_DEPENDENT_METRICS:
            skip[m] = benchmark_reason
    window = _trailing_3y_window(nav_df[["date", "nav"]].copy())
    if window.empty:
        for m in needed:
            if m in NAV_ONLY_METRICS:
                skip[m] = "less than 3 years of NAV history"
        return out, skip
    returns = window["nav"].pct_change().dropna()
    if len(returns) < 30:
        for m in needed:
            if m in NAV_ONLY_METRICS:
                skip[m] = f"fewer than 30 NAV return points ({len(returns)})"
        return out, skip
    # Annualise daily mean and volatility with the usual √252 convention.
    mean_daily = returns.mean()
    mean_annual = mean_daily * TRADING_DAYS
    vol = returns.std(ddof=1) * np.sqrt(TRADING_DAYS)
    if pd.notna(vol):
        if "Standard Deviation" in out:
            out["Standard Deviation"] = float(vol * 100)
        if "Volatility" in out:
            out["Volatility"] = float(vol * 100)
    if "Mean" in out and pd.notna(mean_annual):
        out["Mean"] = float(mean_annual * 100)
    if "Sharpe Ratio" in out:
        if pd.notna(vol) and vol > 0:
            sharpe = (mean_annual - RF_RATE) / vol
            out["Sharpe Ratio"] = float(sharpe) if pd.notna(sharpe) else None
        if out["Sharpe Ratio"] is None:
            skip["Sharpe Ratio"] = "volatility is zero or NaN (NAV-only fallback)"
    if "Sortino Ratio" in out:
        downside = returns[returns < 0]
        if not downside.empty:
            downside_std = downside.std(ddof=1) * np.sqrt(TRADING_DAYS)
            if pd.notna(downside_std) and downside_std > 0:
                sortino = (mean_annual - RF_RATE) / downside_std
                out["Sortino Ratio"] = float(sortino) if pd.notna(sortino) else None
            elif out.get("Sharpe Ratio") is not None:
                # Degenerate downside std — fall back to Sharpe as a stand-in.
                out["Sortino Ratio"] = float(out["Sharpe Ratio"])
        elif out.get("Sharpe Ratio") is not None:
            # No negative-return days at all — same Sharpe fallback.
            out["Sortino Ratio"] = float(out["Sharpe Ratio"])
        if out["Sortino Ratio"] is None:
            skip["Sortino Ratio"] = "no valid downside deviation (NAV-only fallback)"
    if "Maximum Drawdown" in out:
        # Max drawdown from the cumulative-return path vs. its running peak.
        cumulative = (1 + returns).cumprod()
        peak = cumulative.cummax()
        drawdown = (cumulative - peak) / peak
        if not drawdown.empty:
            max_drawdown = drawdown.min()
            out["Maximum Drawdown"] = (
                float(max_drawdown * 100) if pd.notna(max_drawdown) else None
            )
        if out["Maximum Drawdown"] is None:
            skip["Maximum Drawdown"] = "unable to compute NAV-only drawdown"
    return out, skip
def _compute_metrics(
    returns_df: pd.DataFrame,
) -> tuple[dict[str, float | None], dict[str, str]]:
    """Compute every OUTPUT_METRICS value from joined daily returns.

    *returns_df* must have columns 'fund_return' and 'benchmark_return'
    (produced by the caller after the NAV/benchmark merge).

    Returns (result, skip): result maps every metric name to a float or None;
    skip explains each None. Percent-style metrics (std dev, mean, alpha,
    drawdown, capture ratios) are reported in percentage points.
    """
    skip: dict[str, str] = {}
    if returns_df.empty:
        for k in OUTPUT_METRICS:
            skip[k] = "empty returns dataframe after merge/window"
        return {k: None for k in OUTPUT_METRICS}, skip
    fund = returns_df["fund_return"]
    bench = returns_df["benchmark_return"]
    result: dict[str, float | None] = {k: None for k in OUTPUT_METRICS}
    if len(fund) < 30:
        for k in OUTPUT_METRICS:
            skip[k] = f"fewer than 30 data points ({len(fund)}) after join"
        return result, skip
    # Annualised means and volatility (√252 convention).
    mean_daily = fund.mean()
    bench_mean_daily = bench.mean()
    mean_annual = mean_daily * TRADING_DAYS
    bench_annual = bench_mean_daily * TRADING_DAYS
    vol = fund.std(ddof=1) * np.sqrt(TRADING_DAYS)
    if pd.notna(vol):
        result["Standard Deviation"] = float(vol * 100)
        result["Volatility"] = float(vol * 100)
    result["Mean"] = float(mean_annual * 100) if pd.notna(mean_annual) else None
    # Beta = cov(fund, bench) / var(bench), daily basis.
    bench_var = bench.var(ddof=1)
    beta = None
    if pd.notna(bench_var) and bench_var and bench_var > 0:
        cov = np.cov(fund, bench)[0, 1]
        beta = cov / bench_var
    result["Beta"] = float(beta) if beta is not None and pd.notna(beta) else None
    if result["Beta"] is None:
        skip["Beta"] = (
            "benchmark variance is zero or NaN"
            if not (pd.notna(bench_var) and bench_var and bench_var > 0)
            else "beta computation returned NaN"
        )
    # Jensen's alpha on annualised returns, reported in percentage points.
    if beta is not None and pd.notna(mean_annual):
        alpha = mean_annual - (RF_RATE + beta * (bench_annual - RF_RATE))
        result["Alpha"] = float(alpha * 100) if pd.notna(alpha) else None
    if result["Alpha"] is None:
        skip["Alpha"] = (
            "Beta is None — Alpha requires Beta"
            if result["Beta"] is None
            else "Alpha computation returned NaN"
        )
    # Sharpe (NaN vol falls through: NaN > 0 is False, so the branch is skipped).
    if vol and vol > 0:
        sharpe = (mean_annual - RF_RATE) / vol
        result["Sharpe Ratio"] = float(sharpe) if pd.notna(sharpe) else None
    if result["Sharpe Ratio"] is None:
        skip["Sharpe Ratio"] = "volatility is zero or NaN"
    # Sortino: downside deviation from negative daily returns only; when that
    # is degenerate, Sharpe is used as a stand-in (documented fallback).
    downside = fund[fund < 0]
    if not downside.empty:
        downside_std = downside.std(ddof=1) * np.sqrt(TRADING_DAYS)
        if pd.notna(downside_std) and downside_std > 0:
            sortino = (mean_annual - RF_RATE) / downside_std
            result["Sortino Ratio"] = float(sortino) if pd.notna(sortino) else None
        elif result["Sharpe Ratio"] is not None:
            result["Sortino Ratio"] = float(result["Sharpe Ratio"])
        else:
            skip["Sortino Ratio"] = "downside std dev is zero and Sharpe fallback unavailable"
    elif result["Sharpe Ratio"] is not None:
        result["Sortino Ratio"] = float(result["Sharpe Ratio"])
    else:
        skip["Sortino Ratio"] = (
            "no negative daily returns in 3Y window and Sharpe fallback unavailable"
        )
    # Maximum drawdown of the fund's cumulative-return path vs. running peak.
    cumulative = (1 + fund).cumprod()
    peak = cumulative.cummax()
    drawdown = (cumulative - peak) / peak
    if not drawdown.empty:
        max_drawdown = drawdown.min()
        result["Maximum Drawdown"] = (
            float(max_drawdown * 100) if pd.notna(max_drawdown) else None
        )
    # R² = squared Pearson correlation of daily returns.
    corr = fund.corr(bench)
    if pd.notna(corr):
        result["R-Squared"] = float(corr ** 2)
    else:
        skip["R-Squared"] = "fund/benchmark correlation is NaN"
    # Information ratio = annualised active return / annualised tracking error.
    active = fund - bench
    tracking_error = active.std(ddof=1) * np.sqrt(TRADING_DAYS)
    if pd.notna(tracking_error) and tracking_error > 0:
        info_ratio = (mean_annual - bench_annual) / tracking_error
        result["Information Ratio"] = (
            float(info_ratio) if pd.notna(info_ratio) else None
        )
    else:
        skip["Information Ratio"] = (
            "tracking error is zero — fund mirrors benchmark"
            if (pd.notna(tracking_error) and tracking_error == 0)
            else "tracking error is NaN"
        )
    # Capture ratios: fund's mean return on benchmark up-days (down-days)
    # relative to the benchmark's own mean on those days, as a percentage.
    up = returns_df[returns_df["benchmark_return"] > 0]
    if not up.empty:
        up_bench = up["benchmark_return"].mean()
        if pd.notna(up_bench) and up_bench != 0:
            up_capture = (up["fund_return"].mean() / up_bench) * 100
            result["Up Market Capture\nRatio"] = (
                float(up_capture) if pd.notna(up_capture) else None
            )
        else:
            skip["Up Market Capture\nRatio"] = "benchmark mean on up-days is zero or NaN"
    else:
        skip["Up Market Capture\nRatio"] = "no benchmark up-days in 3Y window"
    down = returns_df[returns_df["benchmark_return"] < 0]
    if not down.empty:
        down_bench = down["benchmark_return"].mean()
        if pd.notna(down_bench) and down_bench != 0:
            down_capture = (down["fund_return"].mean() / down_bench) * 100
            result["Down Market Capture\nRatio"] = (
                float(down_capture) if pd.notna(down_capture) else None
            )
        else:
            skip["Down Market Capture\nRatio"] = "benchmark mean on down-days is zero or NaN"
    else:
        skip["Down Market Capture\nRatio"] = "no benchmark down-days in 3Y window"
    return result, skip
def compute_nav_metrics_for_scheme(
    *,
    scheme_code: str,
    benchmark_type: str,
    needed_metrics: list[str],
    cache: NavEngineCache,
) -> tuple[dict[str, float | None], dict[str, str]]:
    """
    Compute trailing-3Y risk metrics for a scheme.

    Keyword-only parameters:
        scheme_code:    scheme identifier used against api.mfapi.in.
        benchmark_type: free-text benchmark label, resolved via BENCHMARK_MAP.
        needed_metrics: subset of OUTPUT_METRICS the caller wants.
        cache:          shared NavEngineCache (L1) for this run.

    Returns (metrics, skip_reasons): metrics maps each requested metric to a
    float or None; skip_reasons explains every None.

    Thread-safe: uses NavEngineCache._lock to serialise L1 dict writes so
    concurrent ThreadPoolExecutor workers don't race on the same key.
    """
    needed = [m for m in needed_metrics if m in OUTPUT_METRICS]
    if not needed:
        return {}, {}
    code = str(scheme_code or "").strip()
    if not code:
        reason = "no scheme code — category header or unresolved scheme"
        return {m: None for m in needed}, {m: reason for m in needed}
    # ── NAV history (L1 check then L2 fetch) ──────────────────────────────
    with cache._lock:
        if code not in cache.nav_history:
            # NOTE(review): comment below overstates the guarantee — a second
            # worker reading None will still fetch; at worst the same code is
            # fetched twice (harmless, L2 cache absorbs it).
            cache.nav_history[code] = None  # sentinel prevents duplicate fetches
    nav_df = cache.nav_history.get(code)
    if nav_df is None and cache.nav_history.get(code) is None:
        fetched = _fetch_nav_history(code)
        with cache._lock:
            cache.nav_history[code] = fetched
        nav_df = fetched
    elif nav_df is None:
        # NOTE(review): only reachable if another thread replaced the entry
        # between the two reads above; duplicates the branch above — candidate
        # for simplification.
        nav_df = _fetch_nav_history(code)
        with cache._lock:
            cache.nav_history[code] = nav_df
    if nav_df is None or nav_df.empty:
        reason = f"MFAPI returned no NAV history for scheme code {code}"
        return {m: None for m in needed}, {m: reason for m in needed}
    if _nav_history_is_stale(nav_df):
        latest = pd.to_datetime(nav_df["date"], errors="coerce").max()
        latest_str = (
            pd.Timestamp(latest).tz_localize(None).normalize().strftime("%Y-%m-%d")
            if pd.notna(latest) else "unknown"
        )
        reason = f"NAV history is stale for scheme code {code} (latest NAV {latest_str})"
        return {m: None for m in needed}, {m: reason for m in needed}
    # ── Benchmark history (L1 check then L2 fetch) ────────────────────────
    ticker = resolve_benchmark_ticker(benchmark_type)
    def _ensure_benchmark(t: str) -> pd.DataFrame | None:
        # L1-then-L2 lookup mirroring the NAV path above.
        with cache._lock:
            if t not in cache.benchmark_history:
                cache.benchmark_history[t] = None
        bench = cache.benchmark_history.get(t)
        if bench is None:
            fetched_b = _fetch_benchmark_history(t)
            with cache._lock:
                cache.benchmark_history[t] = fetched_b
            return fetched_b
        return bench
    bench_df = _ensure_benchmark(ticker)
    # Fewer than 60 rows is treated as unusable — retry with the Nifty 50 proxy.
    if (bench_df is None or bench_df.empty or len(bench_df) < 60) and ticker != "^NSEI":
        bench_df = _ensure_benchmark("^NSEI")
    if bench_df is None or bench_df.empty:
        reason = f"benchmark history unavailable for ticker={ticker} and NIFTY 50 fallback also failed"
        return _compute_nav_only_metrics(nav_df, needed, reason)
    # ── Merge + compute ───────────────────────────────────────────────────
    # Strip tz from both sides — yfinance returns UTC-aware, JSON cache is naive
    nav_df = nav_df.copy()
    bench_df = bench_df.copy()
    nav_df["date"] = pd.to_datetime(nav_df["date"]).dt.tz_localize(None).dt.normalize()
    bench_df["date"] = pd.to_datetime(bench_df["date"]).dt.tz_localize(None).dt.normalize()
    # Debt funds (Liquid, Overnight, Ultra Short etc.) publish NAV every calendar
    # day including weekends/holidays, while equity benchmarks only publish on
    # trading days. A naive inner-join on date yields almost no matching rows
    # (<30) causing all metrics to return None.
    # Fix: forward-fill NAV to the benchmark's trading-day calendar so the merge
    # always produces a full 3Y of matched rows regardless of fund type.
    bench_dates = bench_df[["date"]].drop_duplicates().sort_values("date")
    nav_reindexed = (
        nav_df.set_index("date")
        .reindex(bench_dates["date"])
        .ffill()  # carry last known NAV forward
        .dropna()
        .reset_index()
        .rename(columns={"index": "date"})
    )
    merged = pd.merge(nav_reindexed, bench_df, on="date", how="inner")
    if merged.empty:
        reason = f"no overlapping dates between NAV (scheme={code}) and benchmark (ticker={ticker})"
        return _compute_nav_only_metrics(nav_df, needed, reason)
    merged = _trailing_3y_window(merged)
    if merged.empty:
        reason = f"less than 3 years of overlapping data for scheme={code}"
        return {m: None for m in needed}, {m: reason for m in needed}
    merged["fund_return"] = merged["nav"].pct_change()
    merged["benchmark_return"] = merged["benchmark"].pct_change()
    merged = merged.dropna(subset=["fund_return", "benchmark_return"]).copy()
    if merged.empty:
        reason = "all rows dropped after computing benchmark-joined returns"
        return _compute_nav_only_metrics(nav_df, needed, reason)
    all_metrics, all_skip = _compute_metrics(merged)
    metrics = {m: all_metrics.get(m) for m in needed}
    skip_reasons = {
        m: all_skip[m]
        for m in needed
        if m in all_skip and metrics.get(m) is None
    }
    # Defensive top-up for NAV-only metrics: if any requested NAV-only metric
    # came back None from the benchmark path, retry it on NAV data alone.
    if any(m in NAV_ONLY_METRICS and metrics.get(m) is None for m in needed):
        nav_only, nav_only_skip = _compute_nav_only_metrics(
            nav_df, needed, "benchmark-dependent metric unavailable"
        )
        for m in needed:
            if (
                m in NAV_ONLY_METRICS
                and metrics.get(m) is None
                and nav_only.get(m) is not None
            ):
                metrics[m] = nav_only[m]
                skip_reasons.pop(m, None)
            elif (
                metrics.get(m) is None
                and m not in skip_reasons
                and m in nav_only_skip
            ):
                skip_reasons[m] = nav_only_skip[m]
    return metrics, skip_reasons