Spaces:
Configuration error
Configuration error
| """ | |
| 回测引擎 | |
| Backtesting Engine (FIN 555, FIN 557, FIN 598) | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple, Union, Callable | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from datetime import datetime, timedelta | |
| import logging | |
| from scipy import stats | |
| from scipy.optimize import minimize_scalar | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| try: | |
| from nolitsa import dimension, lyapunov | |
| NOLITSA_AVAILABLE = True | |
| except ImportError: | |
| NOLITSA_AVAILABLE = False | |
| class BacktestEngine: | |
| """回测引擎主类""" | |
| def __init__(self, initial_capital: float = 1000000, | |
| transaction_cost: float = 0.001, | |
| slippage: float = 0.0005, | |
| max_leverage: float = 1.0): | |
| self.initial_capital = initial_capital | |
| self.transaction_cost = transaction_cost | |
| self.slippage = slippage | |
| self.max_leverage = max_leverage | |
| # 回测历史 | |
| self.portfolio_history = [] | |
| self.trade_history = [] | |
| self.rebalance_history = [] | |
| # 当前状态 | |
| self.current_weights = None | |
| self.current_capital = initial_capital | |
| self.current_positions = None | |
| # 性能统计 | |
| self.performance_stats = {} | |
| self.logger = logging.getLogger(__name__) | |
| def run_backtest(self, prices: pd.DataFrame, | |
| signals: pd.DataFrame, | |
| strategy_func: Callable, | |
| rebalance_frequency: str = 'daily', | |
| risk_management: Dict = None) -> Dict[str, any]: | |
| """ | |
| 运行完整回测 | |
| Args: | |
| prices: 资产价格数据 (index: 时间, columns: 资产) | |
| signals: 交易信号数据 | |
| strategy_func: 策略函数,输入信号输出权重 | |
| rebalance_frequency: 再平衡频率 ('daily', 'weekly', 'monthly') | |
| risk_management: 风险管理参数 | |
| Returns: | |
| Dict: 回测结果 | |
| """ | |
| # 数据对齐 | |
| common_index = prices.index.intersection(signals.index) | |
| prices = prices.loc[common_index] | |
| signals = signals.loc[common_index] | |
| if len(common_index) == 0: | |
| raise ValueError("No common dates between prices and signals") | |
| # 初始化 | |
| self._initialize_backtest(prices.columns) | |
| # 确定再平衡日期 | |
| rebalance_dates = self._get_rebalance_dates(common_index, rebalance_frequency) | |
| # 主回测循环 | |
| portfolio_values = [self.initial_capital] | |
| returns_series = [] | |
| for i, date in enumerate(common_index): | |
| current_prices = prices.loc[date] | |
| current_signals = signals.loc[date] | |
| # 更新投资组合价值 | |
| if i > 0: | |
| previous_prices = prices.loc[common_index[i-1]] | |
| price_returns = (current_prices / previous_prices - 1).fillna(0) | |
| # 计算投资组合收益 | |
| if self.current_weights is not None: | |
| portfolio_return = np.dot(self.current_weights, price_returns) | |
| self.current_capital *= (1 + portfolio_return) | |
| returns_series.append(portfolio_return) | |
| else: | |
| returns_series.append(0.0) | |
| # 检查是否需要再平衡 | |
| if date in rebalance_dates or i == 0: | |
| # 计算目标权重 | |
| target_weights = strategy_func(current_signals, current_prices, date) | |
| # 应用风险管理 | |
| if risk_management: | |
| target_weights = self._apply_risk_management( | |
| target_weights, current_prices, risk_management | |
| ) | |
| # 执行再平衡 | |
| self._execute_rebalance(target_weights, current_prices, date) | |
| # 记录投资组合状态 | |
| portfolio_values.append(self.current_capital) | |
| self._record_portfolio_state(date, current_prices) | |
| # 计算性能指标 | |
| returns_df = pd.Series(returns_series, index=common_index[1:]) | |
| portfolio_values_df = pd.Series(portfolio_values[1:], index=common_index) | |
| performance_stats = self.calculate_performance_metrics( | |
| returns_df, portfolio_values_df | |
| ) | |
| # 汇总结果 | |
| backtest_results = { | |
| 'portfolio_values': portfolio_values_df, | |
| 'returns': returns_df, | |
| 'performance_stats': performance_stats, | |
| 'trade_history': pd.DataFrame(self.trade_history), | |
| 'rebalance_history': pd.DataFrame(self.rebalance_history), | |
| 'final_capital': self.current_capital, | |
| 'total_return': (self.current_capital - self.initial_capital) / self.initial_capital | |
| } | |
| return backtest_results | |
| def _initialize_backtest(self, asset_names: List[str]): | |
| """初始化回测""" | |
| n_assets = len(asset_names) | |
| self.current_weights = np.zeros(n_assets) | |
| self.current_positions = pd.Series(0.0, index=asset_names) | |
| self.asset_names = asset_names | |
| # 清空历史记录 | |
| self.portfolio_history.clear() | |
| self.trade_history.clear() | |
| self.rebalance_history.clear() | |
| def _get_rebalance_dates(self, date_index: pd.DatetimeIndex, frequency: str) -> List[pd.Timestamp]: | |
| """获取再平衡日期""" | |
| if frequency == 'daily': | |
| return date_index.tolist() | |
| elif frequency == 'weekly': | |
| return date_index[date_index.weekday == 0].tolist() # 每周一 | |
| elif frequency == 'monthly': | |
| return date_index.groupby([date_index.year, date_index.month]).first().tolist() | |
| else: | |
| raise ValueError(f"Unsupported rebalance frequency: {frequency}") | |
| def _execute_rebalance(self, target_weights: np.ndarray, | |
| current_prices: pd.Series, date: pd.Timestamp): | |
| """执行再平衡""" | |
| if self.current_weights is None: | |
| self.current_weights = np.zeros(len(target_weights)) | |
| # 计算权重变化 | |
| weight_changes = target_weights - self.current_weights | |
| # 计算交易量 | |
| trade_values = np.abs(weight_changes) * self.current_capital | |
| # 计算交易成本 | |
| transaction_costs = self._calculate_transaction_costs( | |
| weight_changes, current_prices, self.current_capital | |
| ) | |
| # 扣除交易成本 | |
| self.current_capital -= transaction_costs['total'] | |
| # 更新权重 | |
| self.current_weights = target_weights.copy() | |
| # 更新持仓 | |
| self.current_positions = (self.current_weights * self.current_capital / current_prices).fillna(0) | |
| # 记录交易 | |
| for i, asset in enumerate(self.asset_names): | |
| if abs(weight_changes[i]) > 1e-6: | |
| trade_record = { | |
| 'date': date, | |
| 'asset': asset, | |
| 'action': 'buy' if weight_changes[i] > 0 else 'sell', | |
| 'weight_change': weight_changes[i], | |
| 'value': abs(weight_changes[i]) * self.current_capital, | |
| 'price': current_prices[asset], | |
| 'shares': abs(weight_changes[i]) * self.current_capital / current_prices[asset] | |
| } | |
| self.trade_history.append(trade_record) | |
| # 记录再平衡 | |
| rebalance_record = { | |
| 'date': date, | |
| 'previous_weights': self.current_weights - weight_changes, | |
| 'target_weights': target_weights.copy(), | |
| 'actual_weights': self.current_weights.copy(), | |
| 'transaction_costs': transaction_costs, | |
| 'portfolio_value': self.current_capital | |
| } | |
| self.rebalance_history.append(rebalance_record) | |
| def _calculate_transaction_costs(self, weight_changes: np.ndarray, | |
| prices: pd.Series, portfolio_value: float) -> Dict[str, float]: | |
| """计算交易成本""" | |
| trade_values = np.abs(weight_changes) * portfolio_value | |
| total_trade_value = np.sum(trade_values) | |
| costs = {} | |
| # 固定佣金成本 | |
| costs['commission'] = total_trade_value * self.transaction_cost | |
| # 滑点成本 | |
| costs['slippage'] = total_trade_value * self.slippage | |
| # 买卖价差成本(简化) | |
| costs['bid_ask_spread'] = total_trade_value * 0.001 | |
| # 市场冲击成本(与交易量的平方根成正比) | |
| market_impact_rate = 0.0001 * np.sqrt(total_trade_value / portfolio_value) | |
| costs['market_impact'] = total_trade_value * market_impact_rate | |
| costs['total'] = sum(costs.values()) | |
| return costs | |
| def _apply_risk_management(self, target_weights: np.ndarray, | |
| prices: pd.Series, risk_params: Dict) -> np.ndarray: | |
| """应用风险管理规则""" | |
| adjusted_weights = target_weights.copy() | |
| # 单一资产权重限制 | |
| if 'max_weight' in risk_params: | |
| max_weight = risk_params['max_weight'] | |
| adjusted_weights = np.minimum(adjusted_weights, max_weight) | |
| # 最小权重限制 | |
| if 'min_weight' in risk_params: | |
| min_weight = risk_params['min_weight'] | |
| adjusted_weights = np.maximum(adjusted_weights, min_weight) | |
| # 杠杆限制 | |
| if 'max_leverage' in risk_params: | |
| max_leverage = risk_params['max_leverage'] | |
| total_exposure = np.sum(np.abs(adjusted_weights)) | |
| if total_exposure > max_leverage: | |
| adjusted_weights *= max_leverage / total_exposure | |
| # 换手率限制 | |
| if 'max_turnover' in risk_params and self.current_weights is not None: | |
| max_turnover = risk_params['max_turnover'] | |
| weight_changes = adjusted_weights - self.current_weights | |
| current_turnover = np.sum(np.abs(weight_changes)) | |
| if current_turnover > max_turnover: | |
| # 按比例缩减权重变化 | |
| scale_factor = max_turnover / current_turnover | |
| adjusted_weights = self.current_weights + weight_changes * scale_factor | |
| # 再次归一化权重(如果需要) | |
| if np.sum(adjusted_weights) > 0: | |
| adjusted_weights = adjusted_weights / np.sum(adjusted_weights) | |
| return adjusted_weights | |
| def _record_portfolio_state(self, date: pd.Timestamp, prices: pd.Series): | |
| """记录投资组合状态""" | |
| state = { | |
| 'date': date, | |
| 'portfolio_value': self.current_capital, | |
| 'weights': self.current_weights.copy() if self.current_weights is not None else None, | |
| 'positions': self.current_positions.copy(), | |
| 'prices': prices.copy() | |
| } | |
| self.portfolio_history.append(state) | |
| def calculate_performance_metrics(self, returns: pd.Series, | |
| portfolio_values: pd.Series, | |
| risk_free_rate: float = 0.02) -> Dict[str, float]: | |
| """计算性能指标""" | |
| metrics = {} | |
| # 基本收益统计 | |
| total_return = (portfolio_values.iloc[-1] - portfolio_values.iloc[0]) / portfolio_values.iloc[0] | |
| annualized_return = (1 + total_return) ** (252 / len(returns)) - 1 | |
| # 风险统计 | |
| volatility = returns.std() * np.sqrt(252) | |
| downside_vol = returns[returns < 0].std() * np.sqrt(252) | |
| # 风险调整指标 | |
| sharpe_ratio = (annualized_return - risk_free_rate) / volatility if volatility > 0 else 0 | |
| sortino_ratio = (annualized_return - risk_free_rate) / downside_vol if downside_vol > 0 else 0 | |
| # 回撤分析 | |
| drawdown_stats = self.calculate_drawdown_stats(portfolio_values) | |
| # VaR和ES | |
| var_95 = np.percentile(returns, 5) | |
| var_99 = np.percentile(returns, 1) | |
| es_95 = returns[returns <= var_95].mean() | |
| es_99 = returns[returns <= var_99].mean() | |
| # 其他指标 | |
| win_rate = (returns > 0).mean() | |
| profit_factor = returns[returns > 0].sum() / abs(returns[returns < 0].sum()) if (returns < 0).any() else np.inf | |
| # 偏度和峰度 | |
| skewness = stats.skew(returns) | |
| kurtosis = stats.kurtosis(returns) | |
| # Calmar比率 | |
| calmar_ratio = annualized_return / abs(drawdown_stats['max_drawdown']) if drawdown_stats['max_drawdown'] < 0 else np.inf | |
| # 信息比率(这里简化处理,假设基准收益为0) | |
| information_ratio = annualized_return / volatility if volatility > 0 else 0 | |
| metrics.update({ | |
| 'total_return': total_return, | |
| 'annualized_return': annualized_return, | |
| 'volatility': volatility, | |
| 'downside_volatility': downside_vol, | |
| 'sharpe_ratio': sharpe_ratio, | |
| 'sortino_ratio': sortino_ratio, | |
| 'calmar_ratio': calmar_ratio, | |
| 'information_ratio': information_ratio, | |
| 'max_drawdown': drawdown_stats['max_drawdown'], | |
| 'max_drawdown_duration': drawdown_stats['max_drawdown_duration'], | |
| 'var_95': var_95, | |
| 'var_99': var_99, | |
| 'expected_shortfall_95': es_95, | |
| 'expected_shortfall_99': es_99, | |
| 'win_rate': win_rate, | |
| 'profit_factor': profit_factor, | |
| 'skewness': skewness, | |
| 'kurtosis': kurtosis, | |
| 'num_trades': len(self.trade_history), | |
| 'avg_trade_size': np.mean([t['value'] for t in self.trade_history]) if self.trade_history else 0 | |
| }) | |
| return metrics | |
| def calculate_drawdown_stats(self, portfolio_values: pd.Series) -> Dict[str, float]: | |
| """计算回撤统计""" | |
| # 计算累积最高点 | |
| cumulative_max = portfolio_values.expanding().max() | |
| # 计算回撤 | |
| drawdowns = (portfolio_values - cumulative_max) / cumulative_max | |
| # 最大回撤 | |
| max_drawdown = drawdowns.min() | |
| # 最大回撤持续时间 | |
| drawdown_duration = 0 | |
| max_duration = 0 | |
| for dd in drawdowns: | |
| if dd < 0: | |
| drawdown_duration += 1 | |
| max_duration = max(max_duration, drawdown_duration) | |
| else: | |
| drawdown_duration = 0 | |
| # 平均回撤 | |
| avg_drawdown = drawdowns[drawdowns < 0].mean() if (drawdowns < 0).any() else 0 | |
| # 回撤频率 | |
| drawdown_periods = (drawdowns < -0.05).sum() # 大于5%回撤的期数 | |
| return { | |
| 'max_drawdown': max_drawdown, | |
| 'max_drawdown_duration': max_duration, | |
| 'avg_drawdown': avg_drawdown, | |
| 'drawdown_frequency': drawdown_periods / len(drawdowns) | |
| } | |
| def calculate_stability_metrics(self, returns: pd.Series) -> Dict[str, float]: | |
| """计算稳定性指标""" | |
| stability_metrics = {} | |
| if NOLITSA_AVAILABLE and len(returns) > 100: | |
| try: | |
| # Lyapunov指数分析 | |
| returns_array = returns.dropna().values | |
| # 嵌入维数和延迟参数估计 | |
| embedding_dim = min(10, len(returns_array) // 20) | |
| delay = 1 | |
| # 重构相空间 | |
| embedded = dimension.embed(returns_array, dim=embedding_dim, tau=delay) | |
| # 计算最大Lyapunov指数 | |
| lle = lyapunov.mle(embedded, maxt=min(50, len(embedded)//4)) | |
| stability_metrics['lyapunov_exponent'] = float(lle) if not np.isnan(lle) else 0.0 | |
| stability_metrics['is_chaotic'] = lle > 0.01 | |
| stability_metrics['system_stability'] = 'unstable' if lle > 0.05 else 'stable' | |
| except Exception as e: | |
| self.logger.warning(f"Lyapunov analysis failed: {str(e)}") | |
| stability_metrics['lyapunov_exponent'] = 0.0 | |
| stability_metrics['is_chaotic'] = False | |
| stability_metrics['system_stability'] = 'unknown' | |
| else: | |
| # 简化稳定性分析 | |
| volatility_stability = returns.rolling(window=60).std().std() if len(returns) > 60 else np.nan | |
| return_stability = returns.rolling(window=60).mean().std() if len(returns) > 60 else np.nan | |
| stability_metrics.update({ | |
| 'volatility_stability': volatility_stability, | |
| 'return_stability': return_stability, | |
| 'lyapunov_exponent': 0.0, | |
| 'is_chaotic': False, | |
| 'system_stability': 'stable' if volatility_stability < 0.01 else 'unstable' | |
| }) | |
| # 计算Hurst指数 | |
| hurst_exponent = self._calculate_hurst_exponent(returns.values) | |
| stability_metrics['hurst_exponent'] = hurst_exponent | |
| stability_metrics['persistence_type'] = self._interpret_hurst(hurst_exponent) | |
| return stability_metrics | |
| def _calculate_hurst_exponent(self, series: np.ndarray) -> float: | |
| """计算Hurst指数""" | |
| try: | |
| n = len(series) | |
| if n < 50: | |
| return 0.5 | |
| # R/S分析 | |
| max_k = n // 4 | |
| rs_values = [] | |
| time_scales = [] | |
| for k in range(10, max_k, max(1, max_k // 20)): | |
| n_subseries = n // k | |
| rs_subseries = [] | |
| for i in range(n_subseries): | |
| subseries = series[i*k:(i+1)*k] | |
| if len(subseries) < 2: | |
| continue | |
| mean_sub = np.mean(subseries) | |
| cumulative_deviations = np.cumsum(subseries - mean_sub) | |
| R = np.max(cumulative_deviations) - np.min(cumulative_deviations) | |
| S = np.std(subseries) | |
| if S > 1e-10: | |
| rs_subseries.append(R / S) | |
| if rs_subseries: | |
| rs_values.append(np.mean(rs_subseries)) | |
| time_scales.append(k) | |
| if len(rs_values) >= 5: | |
| log_rs = np.log(rs_values) | |
| log_scales = np.log(time_scales) | |
| # 线性回归 | |
| valid_indices = np.isfinite(log_rs) & np.isfinite(log_scales) | |
| if np.sum(valid_indices) >= 5: | |
| hurst, _ = np.polyfit(log_scales[valid_indices], log_rs[valid_indices], 1) | |
| return max(0, min(1, hurst)) | |
| return 0.5 | |
| except Exception: | |
| return 0.5 | |
| def _interpret_hurst(self, hurst: float) -> str: | |
| """解释Hurst指数""" | |
| if hurst < 0.4: | |
| return "Mean-reverting" | |
| elif hurst > 0.6: | |
| return "Trending" | |
| else: | |
| return "Random walk" | |
| def generate_backtest_report(self, backtest_results: Dict, | |
| save_path: Optional[str] = None) -> str: | |
| """生成回测报告""" | |
| report_lines = [] | |
| # 标题 | |
| report_lines.append("=" * 60) | |
| report_lines.append("PORTFOLIO BACKTEST REPORT") | |
| report_lines.append("=" * 60) | |
| report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| report_lines.append("") | |
| # 基本信息 | |
| report_lines.append("BASIC INFORMATION") | |
| report_lines.append("-" * 20) | |
| report_lines.append(f"Initial Capital: ${self.initial_capital:,.2f}") | |
| report_lines.append(f"Final Capital: ${backtest_results['final_capital']:,.2f}") | |
| report_lines.append(f"Total Return: {backtest_results['total_return']:.2%}") | |
| report_lines.append(f"Number of Trades: {len(backtest_results['trade_history'])}") | |
| report_lines.append("") | |
| # 性能指标 | |
| perf_stats = backtest_results['performance_stats'] | |
| report_lines.append("PERFORMANCE METRICS") | |
| report_lines.append("-" * 20) | |
| report_lines.append(f"Annualized Return: {perf_stats['annualized_return']:.2%}") | |
| report_lines.append(f"Volatility: {perf_stats['volatility']:.2%}") | |
| report_lines.append(f"Sharpe Ratio: {perf_stats['sharpe_ratio']:.3f}") | |
| report_lines.append(f"Sortino Ratio: {perf_stats['sortino_ratio']:.3f}") | |
| report_lines.append(f"Calmar Ratio: {perf_stats['calmar_ratio']:.3f}") | |
| report_lines.append(f"Max Drawdown: {perf_stats['max_drawdown']:.2%}") | |
| report_lines.append(f"Win Rate: {perf_stats['win_rate']:.2%}") | |
| report_lines.append("") | |
| # 风险指标 | |
| report_lines.append("RISK METRICS") | |
| report_lines.append("-" * 20) | |
| report_lines.append(f"VaR (95%): {perf_stats['var_95']:.2%}") | |
| report_lines.append(f"VaR (99%): {perf_stats['var_99']:.2%}") | |
| report_lines.append(f"Expected Shortfall (95%): {perf_stats['expected_shortfall_95']:.2%}") | |
| report_lines.append(f"Skewness: {perf_stats['skewness']:.3f}") | |
| report_lines.append(f"Kurtosis: {perf_stats['kurtosis']:.3f}") | |
| report_lines.append("") | |
| # 稳定性分析 | |
| stability_stats = self.calculate_stability_metrics(backtest_results['returns']) | |
| report_lines.append("STABILITY ANALYSIS") | |
| report_lines.append("-" * 20) | |
| report_lines.append(f"Hurst Exponent: {stability_stats['hurst_exponent']:.3f}") | |
| report_lines.append(f"Persistence Type: {stability_stats['persistence_type']}") | |
| report_lines.append(f"Lyapunov Exponent: {stability_stats['lyapunov_exponent']:.6f}") | |
| report_lines.append(f"System Stability: {stability_stats['system_stability']}") | |
| report_lines.append("") | |
| # 交易统计 | |
| if len(backtest_results['trade_history']) > 0: | |
| trades_df = backtest_results['trade_history'] | |
| report_lines.append("TRADING STATISTICS") | |
| report_lines.append("-" * 20) | |
| report_lines.append(f"Average Trade Size: ${perf_stats['avg_trade_size']:,.2f}") | |
| report_lines.append(f"Buy Trades: {(trades_df['action'] == 'buy').sum()}") | |
| report_lines.append(f"Sell Trades: {(trades_df['action'] == 'sell').sum()}") | |
| if 'value' in trades_df.columns: | |
| report_lines.append(f"Total Trading Volume: ${trades_df['value'].sum():,.2f}") | |
| report_lines.append("") | |
| report_lines.append("=" * 60) | |
| report_text = "\n".join(report_lines) | |
| # 保存报告 | |
| if save_path: | |
| with open(save_path, 'w', encoding='utf-8') as f: | |
| f.write(report_text) | |
| self.logger.info(f"Backtest report saved to {save_path}") | |
| return report_text | |
| def plot_results(self, backtest_results: Dict, save_path: Optional[str] = None): | |
| """绘制回测结果图表""" | |
| fig, axes = plt.subplots(2, 2, figsize=(15, 12)) | |
| fig.suptitle('Portfolio Backtest Results', fontsize=16) | |
| # 1. 组合价值曲线 | |
| portfolio_values = backtest_results['portfolio_values'] | |
| axes[0, 0].plot(portfolio_values.index, portfolio_values.values, linewidth=2, color='blue') | |
| axes[0, 0].set_title('Portfolio Value Over Time') | |
| axes[0, 0].set_ylabel('Portfolio Value ($)') | |
| axes[0, 0].grid(True, alpha=0.3) | |
| axes[0, 0].tick_params(axis='x', rotation=45) | |
| # 2. 回撤曲线 | |
| cumulative_max = portfolio_values.expanding().max() | |
| drawdowns = (portfolio_values - cumulative_max) / cumulative_max | |
| axes[0, 1].fill_between(drawdowns.index, drawdowns.values, 0, color='red', alpha=0.3) | |
| axes[0, 1].set_title('Drawdown Over Time') | |
| axes[0, 1].set_ylabel('Drawdown (%)') | |
| axes[0, 1].grid(True, alpha=0.3) | |
| axes[0, 1].tick_params(axis='x', rotation=45) | |
| # 3. 收益分布 | |
| returns = backtest_results['returns'] | |
| axes[1, 0].hist(returns.values, bins=50, density=True, alpha=0.7, color='green') | |
| axes[1, 0].axvline(returns.mean(), color='red', linestyle='--', label=f'Mean: {returns.mean():.4f}') | |
| axes[1, 0].set_title('Returns Distribution') | |
| axes[1, 0].set_xlabel('Daily Returns') | |
| axes[1, 0].set_ylabel('Density') | |
| axes[1, 0].legend() | |
| axes[1, 0].grid(True, alpha=0.3) | |
| # 4. 滚动Sharpe比率 | |
| rolling_sharpe = (returns.rolling(window=60).mean() / returns.rolling(window=60).std()) * np.sqrt(252) | |
| axes[1, 1].plot(rolling_sharpe.index, rolling_sharpe.values, linewidth=1.5, color='purple') | |
| axes[1, 1].axhline(y=0, color='black', linestyle='-', alpha=0.3) | |
| axes[1, 1].set_title('Rolling Sharpe Ratio (60-day)') | |
| axes[1, 1].set_ylabel('Sharpe Ratio') | |
| axes[1, 1].grid(True, alpha=0.3) | |
| axes[1, 1].tick_params(axis='x', rotation=45) | |
| plt.tight_layout() | |
| if save_path: | |
| plt.savefig(save_path, dpi=300, bbox_inches='tight') | |
| self.logger.info(f"Backtest plots saved to {save_path}") | |
| return fig | |
| # 辅助函数和策略示例 | |
| def simple_momentum_strategy(signals: pd.Series, prices: pd.Series, date: pd.Timestamp) -> np.ndarray: | |
| """简单动量策略示例""" | |
| # 基于信号的简单策略 | |
| if isinstance(signals, pd.Series): | |
| # 标准化信号 | |
| signal_values = signals.fillna(0).values | |
| # 转换为权重(简单线性变换) | |
| weights = np.abs(signal_values) | |
| # 归一化 | |
| if np.sum(weights) > 0: | |
| weights = weights / np.sum(weights) | |
| else: | |
| weights = np.ones(len(signal_values)) / len(signal_values) | |
| return weights | |
| else: | |
| # 如果信号是DataFrame,取第一列 | |
| return simple_momentum_strategy(signals.iloc[:, 0], prices, date) | |
| def mean_reversion_strategy(signals: pd.Series, prices: pd.Series, date: pd.Timestamp) -> np.ndarray: | |
| """均值回归策略示例""" | |
| # 计算价格相对于移动平均的偏离 | |
| if len(prices) > 20: | |
| ma_20 = prices.rolling(20).mean().iloc[-1] | |
| current_price = prices.iloc[-1] | |
| # 偏离度 | |
| deviation = (ma_20 - current_price) / ma_20 | |
| # 基于偏离度分配权重 | |
| if abs(deviation) > 0.05: # 5%阈值 | |
| weights = np.array([1.0 if deviation > 0 else 0.0]) # 简化为单资产 | |
| else: | |
| weights = np.array([0.0]) | |
| return weights | |
| else: | |
| return np.array([1.0]) | |
| # 使用示例和测试 | |
| if __name__ == "__main__": | |
| print("Testing Backtesting Engine...") | |
| # 创建示例数据 | |
| np.random.seed(42) | |
| dates = pd.date_range('2022-01-01', '2023-12-31', freq='D') | |
| n_assets = 3 | |
| # 生成模拟价格数据 | |
| returns = np.random.multivariate_normal( | |
| mean=[0.0005, 0.0008, 0.0003], # 不同资产的预期收益 | |
| cov=[[0.0004, 0.0001, 0.00005], | |
| [0.0001, 0.0009, 0.0002], | |
| [0.00005, 0.0002, 0.0016]], | |
| size=len(dates) | |
| ) | |
| # 转换为价格 | |
| prices = pd.DataFrame( | |
| 100 * np.exp(np.cumsum(returns, axis=0)), | |
| index=dates, | |
| columns=[f'Asset_{i}' for i in range(n_assets)] | |
| ) | |
| # 生成交易信号(简单示例) | |
| signals = pd.DataFrame( | |
| np.random.randn(len(dates), n_assets) * 0.1, | |
| index=dates, | |
| columns=[f'Signal_{i}' for i in range(n_assets)] | |
| ) | |
| # 初始化回测引擎 | |
| backtest_engine = BacktestEngine( | |
| initial_capital=1000000, | |
| transaction_cost=0.001, | |
| slippage=0.0005 | |
| ) | |
| # 风险管理参数 | |
| risk_management = { | |
| 'max_weight': 0.6, | |
| 'min_weight': 0.0, | |
| 'max_leverage': 1.0, | |
| 'max_turnover': 0.5 | |
| } | |
| print("Running backtest...") | |
| # 运行回测 | |
| results = backtest_engine.run_backtest( | |
| prices=prices, | |
| signals=signals, | |
| strategy_func=simple_momentum_strategy, | |
| rebalance_frequency='weekly', | |
| risk_management=risk_management | |
| ) | |
| print("Backtest completed!") | |
| print(f"Final portfolio value: ${results['final_capital']:,.2f}") | |
| print(f"Total return: {results['total_return']:.2%}") | |
| # 显示性能指标 | |
| perf_stats = results['performance_stats'] | |
| print(f"\nPerformance Metrics:") | |
| print(f"Annualized Return: {perf_stats['annualized_return']:.2%}") | |
| print(f"Volatility: {perf_stats['volatility']:.2%}") | |
| print(f"Sharpe Ratio: {perf_stats['sharpe_ratio']:.3f}") | |
| print(f"Max Drawdown: {perf_stats['max_drawdown']:.2%}") | |
| print(f"Win Rate: {perf_stats['win_rate']:.2%}") | |
| # 稳定性分析 | |
| stability_stats = backtest_engine.calculate_stability_metrics(results['returns']) | |
| print(f"\nStability Analysis:") | |
| print(f"Hurst Exponent: {stability_stats['hurst_exponent']:.3f}") | |
| print(f"System Stability: {stability_stats['system_stability']}") | |
| # 生成报告 | |
| report = backtest_engine.generate_backtest_report(results) | |
| print("\n" + "="*60) | |
| print("SAMPLE REPORT:") | |
| print("="*60) | |
| print(report[:1000] + "..." if len(report) > 1000 else report) | |
| print("\nBacktest engine tests completed!") | |