""" Index Fund Ingest — capture index funds the same way as raw CSV (mftool/AMFI). Two sources: - mftool (default): Same as raw CSV under PS — AMFI category 38 (Index Funds/ETFs). Returns only the schemes AMFI lists under that category (curated, ~same count as your fund-stats CSV Index Fund section). Output format matches PS: "Index Fund", hyphenated fund names. - mfapi: Search mfapi.in and filter by index; use when you need more schemes. Usage: python -m src.index_fund_ingest [--output index_funds.csv] # default: mftool python -m src.index_fund_ingest --source mfapi [--limit 100] # mfapi search Then: enrich the output CSV, merge into main fund CSV, run data_engine as usual. """ from __future__ import annotations import argparse import csv import re import time from datetime import datetime, timedelta from pathlib import Path import requests # Same AMFI gateway as mftool (get_open_ended_other_scheme_performance) AMFI_FUND_PERFORMANCE_URL = "https://www.amfiindia.com/gateway/pollingsebi/api/amfi/fundperformance" AMFI_CATEGORY_OTHER = 5 AMFI_SUBCATEGORY_INDEX_FUNDS = 38 # "Index Funds/ETFs" MFAPI_LIST = "https://api.mfapi.in/mf" MFAPI_SEARCH = "https://api.mfapi.in/mf/search" MFAPI_NAV = "https://api.mfapi.in/mf/{scheme_code}" SLEEP = 0.3 # polite delay between API calls # CSV headers matching project fund-stats CSV (must match data_engine / csv_enrichment) FUND_CSV_HEADERS = [ "Fund", "Category", "Scheme Code", "Launch Date", "Total Assets (in Cr)", "TER", "Turn over (%)", "CAGR Since Inception", "1 Year CAGR", "1 Year Category CAGR", "1 Year Benchmark CAGR", "3 Years CAGR", "3 Years Category CAGR", "3 Years Benchmark CAGR", "5 Years CAGR", "5 Years Category CAGR", "5 Years Benchmark CAGR", "10 Years CAGR", "10 Years Category CAGR", "10 Years Benchmark CAGR", "Benchmark Type", "NAV", "Alpha", "Beta", "Standard Deviation", "Sharpe Ratio", "Volatility", "Mean", "Sortino Ratio", "Up Market Capture\nRatio", "Down Market Capture\nRatio", "Maximum Drawdown", "R-Squared", "Information Ratio", "P/E Ratio", "P/B Ratio", ] # Raw CSV under PS uses "Index Fund" (no "Equity:" prefix) for this category INDEX_FUND_CATEGORY_PS = "Index Fund" # mfapi scheme_category (from NAV meta) -> our Category label CATEGORY_MAP = { "index fund": "Equity: Index Fund", "index funds": "Equity: Index Fund", "equity scheme - index fund": "Equity: Index Fund", "equity scheme - index funds": "Equity: Index Fund", } def _to_hyphenated(name: str) -> str: """Convert scheme name to hyphenated form like raw CSV under PS (e.g. DSP-Nifty-50-Index-Fund-Regular-Plan-Growth).""" if not name: return "" # Replace spaces and multiple hyphens with single hyphen, strip s = re.sub(r"[\s_]+", "-", name.strip()) return re.sub(r"-+", "-", s).strip("-") def _get_amfi_report_date() -> str: """DD-MMM-YYYY for AMFI API. Use last weekday (API returns empty for weekend dates).""" today = datetime.now().date() d = today for _ in range(7): if d.weekday() < 5: # Mon=0 .. Fri=4 break d -= timedelta(days=1) return d.strftime("%d-%b-%Y") # Scheme name fragments -> Benchmark Type (for nav_metrics_engine) # Order matters: more specific (e.g. Nifty 500) before generic (Nifty 50) BENCHMARK_INFER = [ (r"nifty\s*500|nifty500", "Nifty 500"), (r"nifty\s*200|nifty200", "Nifty 200"), (r"nifty\s*100|nifty100", "Nifty 100"), (r"nifty\s*next\s*50|nifty\s*junior|niftyjr", "Nifty Next 50"), (r"nifty\s*50|nifty50", "Nifty 50"), (r"nifty\s*midcap\s*150|midcap\s*150", "Nifty Midcap 150"), (r"nifty\s*smallcap\s*250|smallcap\s*250", "Nifty Smallcap 250"), (r"sensex|bse\s*sensex", "BSE Sensex"), (r"bse\s*100", "BSE 100"), (r"bse\s*500", "BSE 500"), ] def _normalize_category(meta_category: str | None) -> str: if not meta_category: return "Equity: Index Fund" key = meta_category.strip().lower() for k, v in CATEGORY_MAP.items(): if k in key: return v if "index" in key: return "Equity: Index Fund" return meta_category.strip() def _infer_benchmark(scheme_name: str) -> str: name = (scheme_name or "").lower() for pattern, bench in BENCHMARK_INFER: if re.search(pattern, name): return bench return "Nifty 50" # safe default for index funds def _search_mfapi(query: str, limit: int = 200) -> list[dict]: """Return list of {schemeCode, schemeName} from mfapi search.""" try: resp = requests.get(MFAPI_SEARCH, params={"q": query}, timeout=15) resp.raise_for_status() data = resp.json() if isinstance(data, list): return data[:limit] return [] except Exception as e: print(f" [search] error for '{query}': {e}") return [] def _fetch_nav_meta(scheme_code: str) -> dict | None: """Fetch NAV endpoint and return meta only (scheme_name, scheme_category).""" url = MFAPI_NAV.format(scheme_code=scheme_code) try: resp = requests.get(url, params={"limit": 1}, timeout=15) resp.raise_for_status() data = resp.json() meta = data.get("meta") or {} return { "scheme_name": meta.get("scheme_name") or "", "scheme_category": meta.get("scheme_category") or "", "fund_house": meta.get("fund_house") or "", } except Exception as e: print(f" [nav meta] {scheme_code}: {e}") return None def get_index_funds_via_mftool(verbose: bool = True) -> list[dict]: """ Fetch index funds from the same AMFI API used by mftool (category 5, subCategory 38). Returns the same curated list as would appear in the raw CSV under PS — not 10k schemes. Each item: scheme_name, benchmark_type. Scheme code is left blank; enrichment will resolve. """ out: list[dict] = [] base_date = datetime.now().date() for day_back in range(8): # try up to 8 days back to get a date with data d = base_date - timedelta(days=day_back) if d.weekday() >= 5: # skip weekend continue report_date = d.strftime("%d-%b-%Y") payload = { "maturityType": 1, "category": AMFI_CATEGORY_OTHER, "subCategory": AMFI_SUBCATEGORY_INDEX_FUNDS, "mfid": 0, "reportDate": report_date, } try: resp = requests.post( AMFI_FUND_PERFORMANCE_URL, headers={"User-Agent": "Mozilla/5.0"}, json=payload, timeout=25, ) resp.raise_for_status() data = resp.json() raw_list = data.get("data") or [] for item in raw_list: name = (item.get("schemeName") or "").strip() if not name: continue # Exclude ETFs so we match raw CSV (Index Fund section has open-ended funds only) if " ETF" in name or name.endswith(" ETF"): continue benchmark = (item.get("benchmark") or "").strip() or "Nifty 50" out.append({ "scheme_name": name, "benchmark_type": benchmark, "scheme_code": "", # AMFI API doesn't return code; enrichment resolves "category": INDEX_FUND_CATEGORY_PS, }) if out: if verbose: print(f"[mftool] AMFI category 38 (Index Funds/ETFs): {len(out)} schemes (report date {report_date})") break except Exception as e: if verbose and day_back == 0: print(f"[mftool] AMFI request failed for {report_date}: {e}") continue if not out and verbose: print("[mftool] No schemes returned (tried several weekdays). Check AMFI API.") return out def _is_index_scheme(meta_category: str, scheme_name: str) -> bool: """True if this scheme should be treated as index fund.""" cat = (meta_category or "").lower() name = (scheme_name or "").lower() if "index" in cat: return True if "index" in name and ("fund" in name or "etf" not in name): return True # Explicit index benchmarks in name if re.search(r"nifty\s*50|nifty\s*next\s*50|sensex|nifty\s*100|nifty\s*500", name): return True return False def discover_index_schemes( search_queries: list[str] | None = None, limit_per_query: int = 150, require_index_category: bool = True, verbose: bool = True, ) -> list[dict]: """ Discover index fund schemes via mfapi search and NAV meta. Returns list of dicts: scheme_code, scheme_name, category, benchmark_type. """ if search_queries is None: search_queries = ["Index", "Index Fund", "Nifty 50", "Nifty Next 50", "Sensex"] seen_codes: set[int] = set() out: list[dict] = [] for q in search_queries: if verbose: print(f"[discover] search q={q!r} …") candidates = _search_mfapi(q, limit=limit_per_query) for item in candidates: code = item.get("schemeCode") if code is None or code in seen_codes: continue name = item.get("schemeName") or "" time.sleep(SLEEP) meta = _fetch_nav_meta(str(code)) if not meta: continue cat = meta.get("scheme_category") or "" if require_index_category and not _is_index_scheme(cat, name): continue seen_codes.add(code) category = _normalize_category(cat) benchmark = _infer_benchmark(meta.get("scheme_name") or name) out.append({ "scheme_code": str(code), "scheme_name": meta.get("scheme_name") or name, "category": category, "benchmark_type": benchmark, }) if verbose: print(f" + {meta.get('scheme_name', name)[:55]} | {category} | {benchmark}") return out def write_fund_csv(rows: list[dict], path: str | Path) -> None: """Write CSV with FUND_CSV_HEADERS; each row is a dict with those keys (blank = '').""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8-sig", newline="") as f: w = csv.DictWriter(f, fieldnames=FUND_CSV_HEADERS, restval="", extrasaction="ignore") w.writeheader() w.writerows(rows) def build_csv_rows(schemes: list[dict], use_ps_format: bool = False) -> list[dict]: """Convert discover output to CSV row dicts (metrics blank). use_ps_format: when True, Fund = hyphenated name, Category = 'Index Fund' (matches raw CSV under PS). """ rows = [] for s in schemes: row = {h: "" for h in FUND_CSV_HEADERS} name = s.get("scheme_name") or "" row["Fund"] = _to_hyphenated(name) if use_ps_format else name.replace(",", " ") row["Category"] = s.get("category") or ("Index Fund" if use_ps_format else "Equity: Index Fund") row["Scheme Code"] = s.get("scheme_code") or "" row["Benchmark Type"] = s.get("benchmark_type") or "Nifty 50" rows.append(row) return rows def run_ingest( output_path: str | Path = "index_funds.csv", source: str = "mftool", search_queries: list[str] | None = None, limit_per_query: int = 150, verbose: bool = True, ) -> tuple[list[dict], Path]: """ Discover index schemes, build CSV rows, write CSV. source: "mftool" = same as raw CSV (AMFI category 38, curated list). "mfapi" = search mfapi. Returns (list of scheme dicts, output path). """ if source.lower() == "mftool": schemes = get_index_funds_via_mftool(verbose=verbose) use_ps_format = True else: schemes = discover_index_schemes( search_queries=search_queries, limit_per_query=limit_per_query, require_index_category=True, verbose=verbose, ) use_ps_format = False rows = build_csv_rows(schemes, use_ps_format=use_ps_format) out = Path(output_path) write_fund_csv(rows, out) if verbose: print(f"\n[ingest] Wrote {len(rows)} rows to {out.absolute()} (source={source})") print(" Next: run CSV enrichment on this file, then merge into main fund CSV.") return schemes, out def main() -> None: ap = argparse.ArgumentParser( description="Index fund ingest — same list as raw CSV (mftool/AMFI) or mfapi search" ) ap.add_argument("--output", "-o", default="index_funds.csv", help="Output CSV path") ap.add_argument( "--source", choices=("mftool", "mfapi"), default="mftool", help="mftool = AMFI category 38 (same as raw CSV under PS). mfapi = search (more schemes).", ) ap.add_argument("--search", "-s", action="append", default=None, help="[mfapi only] Search query (repeatable). Default: Index, Index Fund, ...") ap.add_argument("--limit", "-n", type=int, default=150, help="[mfapi only] Max schemes per search query") ap.add_argument("--quiet", "-q", action="store_true", help="Less output") args = ap.parse_args() run_ingest( output_path=args.output, source=args.source, search_queries=args.search, limit_per_query=args.limit, verbose=not args.quiet, ) if __name__ == "__main__": main()