import logging
import itertools

from fastapi import FastAPI
from datasets import load_dataset
import pandas as pd
import numpy as np

# -------------------------------------------------
# Logging configuration
# -------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)

app = FastAPI()

# -------------------------------------------------
# Lazy dataset cache
# -------------------------------------------------
DF = None  # module-level cache; populated lazily on first request

MAX_ROWS = 5000  # safety limit so Spaces doesn't run out of memory


def load_data_once():
    """
    Load the dataset only once using streaming mode.

    Hugging Face Spaces cannot download large datasets at startup,
    so streaming=True avoids timeouts and memory issues.

    Returns:
        pd.DataFrame: cached DataFrame with at most MAX_ROWS rows.
    """
    global DF
    if DF is None:
        logger.info("Loading dataset (streaming mode): kurry/sp500_earnings_transcripts ...")
        ds = load_dataset(
            "kurry/sp500_earnings_transcripts",
            split="train",
            streaming=True
        )
        # Take exactly MAX_ROWS rows from the stream. (The previous
        # `append` followed by `if i > 5000: break` let indices 0..5001
        # through — 5002 rows, two past the intended limit.)
        rows = list(itertools.islice(ds, MAX_ROWS))
        DF = pd.DataFrame(rows)
        logger.info("Loaded %d rows into DataFrame", len(DF))
    return DF


# -------------------------------------------------
# Utility: convert NumPy → Python
# -------------------------------------------------
def to_python(obj):
    """Convert NumPy/pandas scalars (and arrays) to JSON-serializable Python types.

    Unrecognized types are returned unchanged.
    """
    # np.integer / np.floating are the abstract bases, so they already
    # cover np.int64 / np.float64 — no need to list the concrete types.
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        # Arrays would not JSON-serialize; recursively convert to lists.
        return obj.tolist()
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    return obj


def clean_dict(d):
    """Return a copy of *d* with every top-level value run through to_python()."""
    return {k: to_python(v) for k, v in d.items()}


# -------------------------------------------------
# Routes
# -------------------------------------------------
@app.on_event("startup")
def startup_event():
    logger.info("🚀 Earnings Transcript API starting up")


@app.get("/")
def root():
    """Health-check / landing endpoint."""
    logger.info("Root endpoint called")
    return {"message": "Earnings Transcript API is running"}
@app.get("/tickers")
def list_tickers():
    """Return the sorted list of distinct ticker symbols in the dataset."""
    logger.info("Listing all tickers")
    df = load_data_once()
    if "symbol" not in df.columns:
        return {"error": "Dataset does not contain 'symbol' column"}
    tickers = sorted(df["symbol"].dropna().unique().tolist())
    logger.info("Returned %d tickers", len(tickers))
    return {"tickers": tickers}


@app.get("/transcript/{symbol}")
def get_transcript(symbol: str):
    """Return the first transcript row for *symbol* with cleaned segments.

    The symbol is upper-cased before matching. Errors are returned as
    ``{"error": ...}`` dicts rather than raised, matching /tickers.
    """
    logger.info("Transcript request received for symbol: %s", symbol)
    df = load_data_once()
    symbol = symbol.upper()
    if "symbol" not in df.columns:
        return {"error": "Dataset missing 'symbol' column"}
    rows = df[df["symbol"] == symbol]
    if rows.empty:
        logger.warning("No transcripts found for symbol: %s", symbol)
        return {"error": f"No transcripts found for symbol {symbol}"}
    row = rows.iloc[0]
    base_info = clean_dict(row.to_dict())
    # Extract structured content (correct column name). Pop the raw value
    # out of the response: clean_dict() only converts top-level scalars,
    # so leaving the nested list in place shipped un-cleaned NumPy types
    # that can break JSON serialization of the response.
    segments = base_info.pop("structured_content", None)
    if isinstance(segments, list):
        logger.info("Cleaning %d segments for %s", len(segments), symbol)
        base_info["segments"] = [
            clean_dict(seg) for seg in segments if isinstance(seg, dict)
        ]
    else:
        logger.info("No structured_content found for %s", symbol)
        base_info["segments"] = []
    logger.info("Returning transcript for %s", symbol)
    return base_info