| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from io import StringIO |
| from typing import Any |
|
|
| import backtrader as bt |
| import pandas as pd |
| import yfinance as yf |
| from backtesting import Backtest, Strategy |
|
|
|
|
| @dataclass |
| class BacktestResult: |
| engine: str |
| metrics: dict[str, Any] |
| equity_curve: pd.DataFrame |
| trades: pd.DataFrame |
| input_rows: int |
|
|
|
|
| def _normalize_ohlcv_columns(df: pd.DataFrame) -> pd.DataFrame: |
| if isinstance(df.columns, pd.MultiIndex): |
| df.columns = df.columns.get_level_values(0) |
|
|
| rename_map = {} |
| for col in df.columns: |
| lower = str(col).lower() |
| if lower == "open": |
| rename_map[col] = "Open" |
| elif lower == "high": |
| rename_map[col] = "High" |
| elif lower == "low": |
| rename_map[col] = "Low" |
| elif lower == "close": |
| rename_map[col] = "Close" |
| elif lower == "volume": |
| rename_map[col] = "Volume" |
| normalized = df.rename(columns=rename_map).copy() |
| for required in ["Open", "High", "Low", "Close"]: |
| if required not in normalized.columns: |
| raise ValueError(f"Missing required OHLC column: {required}") |
| if "Volume" not in normalized.columns: |
| normalized["Volume"] = 0 |
| if not isinstance(normalized.index, pd.DatetimeIndex): |
| normalized.index = pd.to_datetime(normalized.index, utc=True, errors="coerce") |
| if normalized.index.tz is not None: |
| normalized.index = normalized.index.tz_convert("UTC").tz_localize(None) |
| normalized = normalized.dropna(subset=["Open", "High", "Low", "Close"]) |
| normalized = normalized.sort_index() |
| return normalized |
|
|
|
|
| def load_price_data_from_yfinance( |
| symbol: str, |
| start: str, |
| end: str, |
| interval: str = "1d", |
| ) -> pd.DataFrame: |
| df = yf.download(symbol, start=start, end=end, interval=interval, auto_adjust=False) |
| if df is None or df.empty: |
| raise ValueError(f"No market data returned for symbol: {symbol}") |
| return _normalize_ohlcv_columns(df) |
|
|
|
|
| def load_price_data_from_csv_text(csv_text: str) -> pd.DataFrame: |
| df = pd.read_csv(StringIO(csv_text)) |
| lowered = {str(c).lower(): c for c in df.columns} |
| if "date" in lowered: |
| date_col = lowered["date"] |
| df[date_col] = pd.to_datetime(df[date_col], utc=True, errors="coerce") |
| df = df.set_index(date_col) |
| elif "datetime" in lowered: |
| dt_col = lowered["datetime"] |
| df[dt_col] = pd.to_datetime(df[dt_col], utc=True, errors="coerce") |
| df = df.set_index(dt_col) |
| return _normalize_ohlcv_columns(df) |
|
|
|
|
| class SmaCrossBacktestingPy(Strategy): |
| fast_period = 10 |
| slow_period = 30 |
|
|
| def init(self) -> None: |
| close = pd.Series(self.data.Close) |
| self.fast = self.I(lambda x: pd.Series(x).rolling(self.fast_period).mean(), close) |
| self.slow = self.I(lambda x: pd.Series(x).rolling(self.slow_period).mean(), close) |
|
|
| def next(self) -> None: |
| if self.fast[-1] > self.slow[-1] and not self.position: |
| self.buy() |
| elif self.fast[-1] < self.slow[-1] and self.position: |
| self.position.close() |
|
|
|
|
| class SmaCrossBacktrader(bt.Strategy): |
| params = (("fast_period", 10), ("slow_period", 30)) |
|
|
| def __init__(self) -> None: |
| self.fast = bt.indicators.SimpleMovingAverage( |
| self.data.close, period=self.params.fast_period |
| ) |
| self.slow = bt.indicators.SimpleMovingAverage( |
| self.data.close, period=self.params.slow_period |
| ) |
| self.crossover = bt.indicators.CrossOver(self.fast, self.slow) |
| self.equity_points: list[tuple[pd.Timestamp, float]] = [] |
|
|
| def next(self) -> None: |
| if self.crossover > 0 and not self.position: |
| self.buy() |
| elif self.crossover < 0 and self.position: |
| self.close() |
| dt = self.data.datetime.datetime(0) |
| self.equity_points.append((pd.Timestamp(dt, tz="UTC"), self.broker.getvalue())) |
|
|
|
|
| def run_backtesting_py( |
| data: pd.DataFrame, |
| fast_period: int, |
| slow_period: int, |
| initial_cash: float, |
| commission: float, |
| ) -> BacktestResult: |
| strategy_cls = type( |
| "ConfiguredSmaCrossBacktestingPy", |
| (SmaCrossBacktestingPy,), |
| {"fast_period": fast_period, "slow_period": slow_period}, |
| ) |
| bt_obj = Backtest(data, strategy_cls, cash=initial_cash, commission=commission) |
| stats = bt_obj.run() |
| equity_curve = stats.get("_equity_curve", pd.DataFrame()) |
| trades = stats.get("_trades", pd.DataFrame()) |
| metrics = { |
| "Return [%]": float(stats.get("Return [%]", 0.0)), |
| "Buy & Hold Return [%]": float(stats.get("Buy & Hold Return [%]", 0.0)), |
| "Sharpe Ratio": float(stats.get("Sharpe Ratio", 0.0) or 0.0), |
| "Max Drawdown [%]": float(stats.get("Max. Drawdown [%]", 0.0)), |
| "# Trades": int(stats.get("# Trades", 0)), |
| "Win Rate [%]": float(stats.get("Win Rate [%]", 0.0)), |
| } |
| return BacktestResult( |
| engine="backtesting.py", |
| metrics=metrics, |
| equity_curve=equity_curve, |
| trades=trades, |
| input_rows=len(data), |
| ) |
|
|
|
|
| def run_backtrader( |
| data: pd.DataFrame, |
| fast_period: int, |
| slow_period: int, |
| initial_cash: float, |
| commission: float, |
| ) -> BacktestResult: |
| cerebro = bt.Cerebro() |
| configured = type( |
| "ConfiguredSmaCrossBacktrader", |
| (SmaCrossBacktrader,), |
| {"params": (("fast_period", fast_period), ("slow_period", slow_period))}, |
| ) |
| cerebro.addstrategy(configured) |
| feed = bt.feeds.PandasData(dataname=data) |
| cerebro.adddata(feed) |
| cerebro.broker.setcash(initial_cash) |
| cerebro.broker.setcommission(commission=commission) |
| cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown") |
| cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe", timeframe=bt.TimeFrame.Days) |
| cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades") |
|
|
| starting_value = cerebro.broker.getvalue() |
| run_result = cerebro.run() |
| ending_value = cerebro.broker.getvalue() |
| strategy = run_result[0] |
|
|
| dd = strategy.analyzers.drawdown.get_analysis() |
| sharpe = strategy.analyzers.sharpe.get_analysis() |
| ta = strategy.analyzers.trades.get_analysis() |
|
|
| total_closed = int(getattr(ta.total, "closed", 0) if hasattr(ta, "total") else 0) |
| won_total = int(getattr(ta.won, "total", 0) if hasattr(ta, "won") else 0) |
| win_rate = (won_total / total_closed * 100) if total_closed else 0.0 |
| ret_pct = ((ending_value - starting_value) / starting_value * 100) if starting_value else 0.0 |
|
|
| equity_curve = pd.DataFrame(strategy.equity_points, columns=["Time", "Equity"]) |
| if not equity_curve.empty: |
| equity_curve = equity_curve.set_index("Time") |
|
|
| metrics = { |
| "Return [%]": float(ret_pct), |
| "Sharpe Ratio": float(sharpe.get("sharperatio", 0.0) or 0.0), |
| "Max Drawdown [%]": float(getattr(dd.max, "drawdown", 0.0) if hasattr(dd, "max") else 0.0), |
| "# Trades": total_closed, |
| "Win Rate [%]": float(win_rate), |
| "Final Equity": float(ending_value), |
| } |
| return BacktestResult( |
| engine="backtrader", |
| metrics=metrics, |
| equity_curve=equity_curve, |
| trades=pd.DataFrame(), |
| input_rows=len(data), |
| ) |
|
|
|
|
| def run_backtest( |
| engine: str, |
| data: pd.DataFrame, |
| fast_period: int, |
| slow_period: int, |
| initial_cash: float, |
| commission: float, |
| ) -> BacktestResult: |
| if fast_period >= slow_period: |
| raise ValueError("fast_period must be smaller than slow_period.") |
| if engine == "backtesting.py": |
| return run_backtesting_py(data, fast_period, slow_period, initial_cash, commission) |
| if engine == "backtrader": |
| return run_backtrader(data, fast_period, slow_period, initial_cash, commission) |
| raise ValueError(f"Unsupported engine: {engine}") |
|
|