""" 因果与风险动力学模块 Causal & Risk Dynamics Layer (FIN 552, FIN 553, ECON 610) """ import pandas as pd import numpy as np from statsmodels.tsa.api import VAR from statsmodels.tsa.stattools import grangercausalitytests, adfuller from scipy.linalg import cholesky, eig from scipy.stats import norm, t, chi2, jarque_bera from scipy.optimize import minimize from sklearn.covariance import LedoitWolf, EmpiricalCovariance from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mutual_info_score import networkx as nx from typing import Dict, List, Optional, Tuple, Union, Callable import warnings warnings.filterwarnings('ignore') try: from nolitsa import dimension, lyapunov, delay NOLITSA_AVAILABLE = True except ImportError: NOLITSA_AVAILABLE = False print("Warning: nolitsa not available. Some chaos analysis features will be disabled.") class CausalDiscovery: """因果关系发现器""" def __init__(self): self.causal_graph = None self.var_results = None self.granger_results = {} def discover_causal_structure(self, data: pd.DataFrame, maxlags: int = 5, method: str = 'var') -> Dict[str, any]: """ 发现变量间的因果结构 Args: data: 输入数据 maxlags: 最大滞后阶数 method: 方法选择 ('var', 'granger', 'mutual_info', 'random_forest') Returns: Dict: 因果结构分析结果 """ results = {} if method == 'var': results = self._var_causal_analysis(data, maxlags) elif method == 'granger': results = self._granger_causality_analysis(data, maxlags) elif method == 'mutual_info': results = self._mutual_info_causality(data, maxlags) elif method == 'random_forest': results = self._random_forest_causality(data, maxlags) else: raise ValueError(f"Unknown method: {method}") # 构建因果图 self.causal_graph = self._build_causal_graph(results) return results def _var_causal_analysis(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: """VAR模型因果分析""" # 数据预处理 data_clean = data.dropna() # 检查平稳性 stationarity_results = {} for col in data_clean.columns: adf_result = adfuller(data_clean[col]) stationarity_results[col] = { 'adf_statistic': adf_result[0], 'p_value': adf_result[1], 'is_stationary': adf_result[1] < 0.05 } # 拟合VAR模型 try: model = VAR(data_clean) var_results = model.fit(maxlags=maxlags, ic='aic') self.var_results = var_results # 提取因果关系 causal_matrix = self._extract_var_causality(var_results) # 脉冲响应分析 irf = var_results.irf(periods=10) # 方差分解 fevd = var_results.fevd(periods=10) return { 'type': 'var', 'model_results': var_results, 'causal_matrix': causal_matrix, 'stationarity': stationarity_results, 'irf': irf, 'fevd': fevd, 'aic': var_results.aic, 'bic': var_results.bic, 'optimal_lags': var_results.k_ar } except Exception as e: print(f"VAR analysis failed: {str(e)}") return {'type': 'var', 'error': str(e)} def _granger_causality_analysis(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: """格兰杰因果性分析""" results = {'type': 'granger', 'pairwise_results': {}} variables = data.columns.tolist() causal_matrix = pd.DataFrame(0, index=variables, columns=variables) for i, cause in enumerate(variables): for j, effect in enumerate(variables): if i != j: try: # 构造双变量数据 test_data = data[[effect, cause]].dropna() if len(test_data) > maxlags * 3: # 确保有足够数据 gc_test = grangercausalitytests(test_data, maxlags, verbose=False) # 提取最优滞后的p值 p_values = [gc_test[lag][0]['ssr_ftest'][1] for lag in range(1, maxlags + 1)] min_p_value = min(p_values) optimal_lag = p_values.index(min_p_value) + 1 results['pairwise_results'][f"{cause}->{effect}"] = { 'p_value': min_p_value, 'optimal_lag': optimal_lag, 'is_causal': min_p_value < 0.05 } causal_matrix.loc[cause, effect] = 1 if min_p_value < 0.05 else 0 except Exception as e: print(f"Granger test failed for {cause}->{effect}: {str(e)}") results['causal_matrix'] = causal_matrix return results def _mutual_info_causality(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: """基于互信息的因果分析""" variables = data.columns.tolist() causal_matrix = pd.DataFrame(0.0, index=variables, columns=variables) for cause in variables: for effect in variables: if cause != effect: # 计算不同滞后的互信息 mi_scores = [] for lag in range(1, maxlags + 1): cause_lagged = data[cause].shift(lag).dropna() effect_current = data[effect].iloc[lag:].dropna() if len(cause_lagged) == len(effect_current) and len(cause_lagged) > 0: # 离散化连续变量 cause_discrete = pd.cut(cause_lagged, bins=10, labels=False) effect_discrete = pd.cut(effect_current, bins=10, labels=False) # 计算互信息 mi = mutual_info_score(cause_discrete, effect_discrete) mi_scores.append(mi) if mi_scores: causal_matrix.loc[cause, effect] = max(mi_scores) return { 'type': 'mutual_info', 'causal_matrix': causal_matrix, 'threshold': causal_matrix.values.mean() + causal_matrix.values.std() } def _random_forest_causality(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: """基于随机森林的因果分析""" variables = data.columns.tolist() causal_matrix = pd.DataFrame(0.0, index=variables, columns=variables) for target in variables: # 构造特征矩阵 feature_data = [] feature_names = [] for var in variables: if var != target: for lag in range(1, maxlags + 1): feature_data.append(data[var].shift(lag)) feature_names.append(f"{var}_lag_{lag}") if feature_data: X = pd.concat(feature_data, axis=1).dropna() y = data[target].iloc[maxlags:].dropna() if len(X) == len(y) and len(X) > 50: # 确保有足够数据 rf = RandomForestRegressor(n_estimators=100, random_state=42) rf.fit(X, y) # 提取特征重要性 importances = rf.feature_importances_ # 聚合每个变量的重要性 for var in variables: if var != target: var_importances = [importances[i] for i, name in enumerate(feature_names) if name.startswith(f"{var}_lag_")] if var_importances: causal_matrix.loc[var, target] = sum(var_importances) return { 'type': 'random_forest', 'causal_matrix': causal_matrix, 'threshold': causal_matrix.values.mean() + causal_matrix.values.std() } def _extract_var_causality(self, var_results) -> pd.DataFrame: """从VAR结果中提取因果关系""" variables = var_results.names n_vars = len(variables) causal_matrix = pd.DataFrame(0, index=variables, columns=variables) # 使用系数的t统计量判断因果关系 for i, effect in enumerate(variables): for j, cause in enumerate(variables): if i != j: # 检查所有滞后期的系数 causality_strength = 0 for lag in range(var_results.k_ar): coef = var_results.coefs[lag][i, j] # 从cause到effect的系数 stderr = var_results.stderr[lag][i, j] t_stat = abs(coef / (stderr + 1e-8)) if t_stat > 1.96: # 95%置信度 causality_strength += t_stat causal_matrix.loc[cause, effect] = causality_strength return causal_matrix def _build_causal_graph(self, causal_results: Dict) -> nx.DiGraph: """构建因果关系图""" G = nx.DiGraph() if 'causal_matrix' in causal_results: causal_matrix = causal_results['causal_matrix'] threshold = causal_results.get('threshold', causal_matrix.values.mean()) # 添加节点 G.add_nodes_from(causal_matrix.index) # 添加边 for cause in causal_matrix.index: for effect in causal_matrix.columns: if cause != effect and causal_matrix.loc[cause, effect] > threshold: G.add_edge(cause, effect, weight=causal_matrix.loc[cause, effect]) return G class GradientDynamicsSimulator: """梯度动力学模拟器""" def __init__(self): self.trajectory_history = [] def simulate_gradient_dynamics(self, b0: np.ndarray, potential_func: Callable, eta: float = 0.1, sigma: float = 0.05, dt: float = 0.01, steps: int = 1000, method: str = 'euler') -> Dict[str, any]: """ 模拟梯度动力学: db_t = -η ∇U(b_t) dt + σ dW_t Args: b0: 初始状态 potential_func: 势函数 eta: 学习率/衰减参数 sigma: 噪声强度 dt: 时间步长 steps: 模拟步数 method: 数值方法 ('euler', 'heun', 'rk4') Returns: Dict: 模拟结果 """ trajectory = [b0.copy()] current_b = b0.copy() potential_values = [potential_func(b0)] for step in range(steps): if method == 'euler': new_b = self._euler_step(current_b, potential_func, eta, sigma, dt) elif method == 'heun': new_b = self._heun_step(current_b, potential_func, eta, sigma, dt) elif method == 'rk4': new_b = self._rk4_step(current_b, potential_func, eta, sigma, dt) else: raise ValueError(f"Unknown method: {method}") trajectory.append(new_b.copy()) potential_values.append(potential_func(new_b)) current_b = new_b # 检查收敛 if step > 100 and np.linalg.norm(trajectory[-1] - trajectory[-2]) < 1e-6: print(f"Converged at step {step}") break trajectory = np.array(trajectory) self.trajectory_history.append(trajectory) return { 'trajectory': trajectory, 'potential_values': np.array(potential_values), 'final_state': trajectory[-1], 'convergence_time': len(trajectory), 'path_length': self._calculate_path_length(trajectory) } def _euler_step(self, b: np.ndarray, potential_func: Callable, eta: float, sigma: float, dt: float) -> np.ndarray: """欧拉方法步进""" grad_U = self._numerical_gradient(potential_func, b) drift = -eta * grad_U diffusion = sigma * np.random.normal(0, 1, size=b.shape) return b + drift * dt + diffusion * np.sqrt(dt) def _heun_step(self, b: np.ndarray, potential_func: Callable, eta: float, sigma: float, dt: float) -> np.ndarray: """Heun方法步进""" # 预测步 grad_U1 = self._numerical_gradient(potential_func, b) drift1 = -eta * grad_U1 noise = np.random.normal(0, 1, size=b.shape) b_pred = b + drift1 * dt + sigma * noise * np.sqrt(dt) # 修正步 grad_U2 = self._numerical_gradient(potential_func, b_pred) drift2 = -eta * grad_U2 drift_avg = (drift1 + drift2) / 2 return b + drift_avg * dt + sigma * noise * np.sqrt(dt) def _rk4_step(self, b: np.ndarray, potential_func: Callable, eta: float, sigma: float, dt: float) -> np.ndarray: """四阶Runge-Kutta方法""" noise = np.random.normal(0, 1, size=b.shape) k1 = -eta * self._numerical_gradient(potential_func, b) k2 = -eta * self._numerical_gradient(potential_func, b + 0.5 * dt * k1) k3 = -eta * self._numerical_gradient(potential_func, b + 0.5 * dt * k2) k4 = -eta * self._numerical_gradient(potential_func, b + dt * k3) drift = (k1 + 2*k2 + 2*k3 + k4) / 6 diffusion = sigma * noise * np.sqrt(dt) return b + drift * dt + diffusion def _numerical_gradient(self, func: Callable, x: np.ndarray, eps: float = 1e-6) -> np.ndarray: """数值计算梯度""" grad = np.zeros_like(x) for i in range(len(x)): x_plus = x.copy() x_plus[i] += eps x_minus = x.copy() x_minus[i] -= eps grad[i] = (func(x_plus) - func(x_minus)) / (2 * eps) return grad def _calculate_path_length(self, trajectory: np.ndarray) -> float: """计算轨迹长度""" diffs = np.diff(trajectory, axis=0) distances = np.linalg.norm(diffs, axis=1) return np.sum(distances) def create_vrp_potential(self, vix_target: float = 0.2, rv_target: float = 0.15) -> Callable: """创建VRP势函数""" def vrp_potential(b): """U(b) = (VIX² - RV)² / 2 + regularization""" if len(b) < 2: return 0.5 * (b[0] - vix_target)**2 vix_sq = b[0] rv = b[1] vrp = vix_sq - rv # 主要项:VRP平方 main_term = 0.5 * vrp**2 # 正则化项:防止极值 reg_term = 0.01 * (vix_sq**2 + rv**2) # 约束项:确保非负 constraint_term = 0.1 * (max(0, -vix_sq)**2 + max(0, -rv)**2) return main_term + reg_term + constraint_term return vrp_potential def analyze_attractor_dynamics(self, trajectory: np.ndarray) -> Dict[str, any]: """分析吸引子动力学""" results = {} # 计算平衡点 final_points = trajectory[-100:] # 最后100个点 equilibrium = np.mean(final_points, axis=0) equilibrium_stability = np.std(final_points, axis=0) results['equilibrium'] = equilibrium results['equilibrium_stability'] = equilibrium_stability # 计算回归时间 distances_to_eq = np.linalg.norm(trajectory - equilibrium, axis=1) results['return_times'] = self._calculate_return_times(distances_to_eq) # 相空间分析 if trajectory.shape[1] >= 2: results['phase_space_area'] = self._calculate_phase_space_area(trajectory) results['limit_cycle_detection'] = self._detect_limit_cycles(trajectory) return results def _calculate_return_times(self, distances: np.ndarray, threshold: float = None) -> List[int]: """计算回归时间""" if threshold is None: threshold = np.std(distances) return_times = [] last_return = 0 for i, dist in enumerate(distances): if dist < threshold and i - last_return > 10: # 避免重复计数 return_times.append(i - last_return) last_return = i return return_times def _calculate_phase_space_area(self, trajectory: np.ndarray) -> float: """计算相空间面积""" if trajectory.shape[1] < 2: return 0.0 x, y = trajectory[:, 0], trajectory[:, 1] # 使用鞋带公式计算多边形面积 return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1))) def _detect_limit_cycles(self, trajectory: np.ndarray, tolerance: float = 0.1) -> Dict[str, any]: """检测极限环""" if trajectory.shape[0] < 100: return {'has_limit_cycle': False} # 简单的极限环检测:检查是否存在周期性 mid_point = len(trajectory) // 2 recent_traj = trajectory[mid_point:] # 计算自相关来检测周期性 if len(recent_traj) > 50: x_centered = recent_traj[:, 0] - np.mean(recent_traj[:, 0]) autocorr = np.correlate(x_centered, x_centered, mode='full') autocorr = autocorr[autocorr.size // 2:] autocorr = autocorr / autocorr[0] # 标准化 # 寻找主要周期 peaks = [] for i in range(10, min(len(autocorr), 200)): if (autocorr[i] > autocorr[i-1] and autocorr[i] > autocorr[i+1] and autocorr[i] > 0.3): peaks.append((i, autocorr[i])) has_cycle = len(peaks) > 0 main_period = peaks[0][0] if peaks else 0 return { 'has_limit_cycle': has_cycle, 'main_period': main_period, 'cycle_strength': peaks[0][1] if peaks else 0 } return {'has_limit_cycle': False} class RiskMetrics: """风险计量工具""" def __init__(self): self.risk_models = {} def calculate_var_es(self, returns: pd.Series, confidence_levels: List[float] = [0.95, 0.99], methods: List[str] = ['historical', 'parametric', 'cornish_fisher']) -> Dict[str, Dict]: """ 计算VaR和ES Args: returns: 收益率序列 confidence_levels: 置信水平 methods: 计算方法 Returns: Dict: VaR和ES结果 """ results = {} for method in methods: method_results = {} for alpha in confidence_levels: var, es = self._calculate_single_var_es(returns, alpha, method) method_results[f'VaR_{int(alpha*100)}'] = var method_results[f'ES_{int(alpha*100)}'] = es results[method] = method_results return results def _calculate_single_var_es(self, returns: pd.Series, alpha: float, method: str) -> Tuple[float, float]: """计算单个VaR和ES""" returns_clean = returns.dropna() if method == 'historical': var = np.percentile(returns_clean, (1 - alpha) * 100) es = returns_clean[returns_clean <= var].mean() elif method == 'parametric': mu = returns_clean.mean() sigma = returns_clean.std() var = norm.ppf(1 - alpha, mu, sigma) es = mu - sigma * norm.pdf(norm.ppf(1 - alpha)) / (1 - alpha) elif method == 't_distribution': # 拟合t分布 df, mu, sigma = t.fit(returns_clean) var = t.ppf(1 - alpha, df, mu, sigma) es = self._calculate_t_es(returns_clean, alpha, df, mu, sigma) elif method == 'cornish_fisher': # Cornish-Fisher展开 mu = returns_clean.mean() sigma = returns_clean.std() skew = returns_clean.skew() kurt = returns_clean.kurtosis() z_alpha = norm.ppf(1 - alpha) cf_adjustment = (z_alpha**2 - 1) * skew / 6 + \ (z_alpha**3 - 3*z_alpha) * kurt / 24 - \ (2*z_alpha**3 - 5*z_alpha) * skew**2 / 36 z_cf = z_alpha + cf_adjustment var = mu + sigma * z_cf es = self._estimate_es_cornish_fisher(returns_clean, var, alpha) else: raise ValueError(f"Unknown method: {method}") return var, es def _calculate_t_es(self, returns: pd.Series, alpha: float, df: float, mu: float, sigma: float) -> float: """计算t分布的期望短缺""" t_val = t.ppf(1 - alpha, df) es = mu - sigma * (t.pdf(t_val, df) / (1 - alpha)) * ((df + t_val**2) / (df - 1)) return es def _estimate_es_cornish_fisher(self, returns: pd.Series, var: float, alpha: float) -> float: """估计Cornish-Fisher展开的ES""" tail_returns = returns[returns <= var] if len(tail_returns) > 0: return tail_returns.mean() else: return var * 1.2 # 简单估计 def calculate_risk_decomposition(self, portfolio_returns: pd.Series, component_returns: pd.DataFrame) -> Dict[str, any]: """计算风险分解""" results = {} # 总体组合风险 portfolio_var = portfolio_returns.var() results['total_variance'] = portfolio_var # 成分风险贡献 component_contributions = {} component_weights = {} for component in component_returns.columns: # 计算相关系数 corr = portfolio_returns.corr(component_returns[component]) component_var = component_returns[component].var() # 风险贡献 = 权重 × 相关系数 × 成分标准差 × 组合标准差 contribution = corr * np.sqrt(component_var * portfolio_var) component_contributions[component] = contribution # 计算隐含权重 weight = contribution / portfolio_var if portfolio_var > 0 else 0 component_weights[component] = weight results['component_contributions'] = component_contributions results['component_weights'] = component_weights results['diversification_ratio'] = sum(component_contributions.values()) / np.sqrt(portfolio_var) return results def perform_stress_testing(self, returns: pd.Series, scenarios: Dict[str, Dict]) -> Dict[str, Dict]: """执行压力测试""" stress_results = {} for scenario_name, scenario_params in scenarios.items(): scenario_results = {} if scenario_params['type'] == 'historical': # 历史情景 start_date = scenario_params['start_date'] end_date = scenario_params['end_date'] historical_returns = returns.loc[start_date:end_date] if isinstance(returns.index, pd.DatetimeIndex) else returns scenario_results['total_return'] = (1 + historical_returns).prod() - 1 scenario_results['max_drawdown'] = self._calculate_max_drawdown(historical_returns) scenario_results['var_95'] = np.percentile(historical_returns, 5) elif scenario_params['type'] == 'monte_carlo': # 蒙特卡罗情景 n_simulations = scenario_params.get('n_simulations', 10000) shock_size = scenario_params.get('shock_size', -0.1) # 生成随机冲击 random_shocks = np.random.normal(shock_size, np.abs(shock_size) * 0.5, n_simulations) scenario_results['simulated_returns'] = random_shocks scenario_results['var_95'] = np.percentile(random_shocks, 5) scenario_results['worst_case'] = np.min(random_shocks) elif scenario_params['type'] == 'parametric': # 参数化冲击 shock_magnitude = scenario_params.get('shock_magnitude', -3) # 3个标准差 mu = returns.mean() sigma = returns.std() shocked_return = mu + shock_magnitude * sigma scenario_results['shocked_return'] = shocked_return scenario_results['probability'] = norm.cdf(shock_magnitude) stress_results[scenario_name] = scenario_results return stress_results def _calculate_max_drawdown(self, returns: pd.Series) -> float: """计算最大回撤""" cumulative = (1 + returns).cumprod() running_max = cumulative.expanding().max() drawdown = (cumulative - running_max) / running_max return drawdown.min() class ChaosAnalysis: """混沌分析工具""" def __init__(self): self.embedding_params = {} def analyze_chaos_indicators(self, series: pd.Series, max_dim: int = 10) -> Dict[str, any]: """分析混沌指标""" if not NOLITSA_AVAILABLE: return {'error': 'nolitsa library not available'} results = {} series_array = series.dropna().values if len(series_array) < 100: return {'error': 'Insufficient data for chaos analysis'} try: # 估计嵌入维数 embedding_dimension = self._estimate_embedding_dimension(series_array, max_dim) results['embedding_dimension'] = embedding_dimension # 估计延迟时间 delay_time = self._estimate_delay_time(series_array) results['delay_time'] = delay_time # 重构相空间 embedded = dimension.embed(series_array, dim=embedding_dimension, tau=delay_time) results['phase_space_dimension'] = embedded.shape # 计算最大Lyapunov指数 lle = self._calculate_lyapunov_exponent(embedded) results['lyapunov_exponent'] = lle results['is_chaotic'] = lle > 0.01 # 计算关联维数 correlation_dimension = self._calculate_correlation_dimension(embedded) results['correlation_dimension'] = correlation_dimension # 计算Hurst指数 hurst_exponent = self._calculate_hurst_exponent(series_array) results['hurst_exponent'] = hurst_exponent results['persistence_type'] = self._interpret_hurst(hurst_exponent) except Exception as e: results['error'] = f"Chaos analysis failed: {str(e)}" return results def _estimate_embedding_dimension(self, series: np.ndarray, max_dim: int) -> int: """估计嵌入维数""" try: # 使用false nearest neighbors方法 dimensions = range(1, max_dim + 1) fnn_percentages = [] for dim in dimensions: embedded = dimension.embed(series, dim=dim, tau=1) if len(embedded) > 50: # 简化的false nearest neighbors计算 fnn_pct = self._calculate_fnn_percentage(embedded, dim) fnn_percentages.append(fnn_pct) else: fnn_percentages.append(100) # 数据不足 # 找到FNN百分比显著下降的点 optimal_dim = 1 for i in range(1, len(fnn_percentages)): if fnn_percentages[i] < 10 and fnn_percentages[i-1] - fnn_percentages[i] > 20: optimal_dim = i + 1 break return min(optimal_dim, max_dim) except: return 3 # 默认值 def _estimate_delay_time(self, series: np.ndarray) -> int: """估计延迟时间""" try: # 使用自相关函数的第一个零点 autocorr = np.correlate(series, series, mode='full') autocorr = autocorr[autocorr.size // 2:] autocorr = autocorr / autocorr[0] # 找到第一个接近零的点 for i in range(1, min(len(autocorr), 50)): if autocorr[i] <= 0.1: return i return 1 # 默认值 except: return 1 def _calculate_fnn_percentage(self, embedded: np.ndarray, dim: int) -> float: """计算虚假最近邻百分比""" if len(embedded) < 100: return 100 # 简化实现 n_points = min(len(embedded), 500) sample_indices = np.random.choice(len(embedded), n_points, replace=False) false_neighbors = 0 for i in sample_indices[:100]: # 限制计算量 point = embedded[i] distances = np.linalg.norm(embedded - point, axis=1) nearest_idx = np.argsort(distances)[1] # 第二小的距离(排除自己) # 简单的虚假邻居判断 if distances[nearest_idx] > np.std(embedded.flatten()): false_neighbors += 1 return (false_neighbors / min(100, len(sample_indices))) * 100 def _calculate_lyapunov_exponent(self, embedded: np.ndarray) -> float: """计算最大Lyapunov指数""" try: # 使用nolitsa计算 lle = lyapunov.mle(embedded, maxt=50, window=10) return float(lle) if not np.isnan(lle) else 0.0 except: # 简化实现 return self._simple_lyapunov_estimate(embedded) def _simple_lyapunov_estimate(self, embedded: np.ndarray) -> float: """简化的Lyapunov指数估计""" if len(embedded) < 50: return 0.0 # 计算相邻点的平均发散速度 divergences = [] for i in range(len(embedded) - 10): initial_separation = np.linalg.norm(embedded[i+1] - embedded[i]) if initial_separation > 1e-10: final_separation = np.linalg.norm(embedded[i+10] - embedded[i+9]) if final_separation > 1e-10: divergence = np.log(final_separation / initial_separation) / 10 if not np.isnan(divergence) and np.isfinite(divergence): divergences.append(divergence) return np.mean(divergences) if divergences else 0.0 def _calculate_correlation_dimension(self, embedded: np.ndarray) -> float: """计算关联维数""" try: n_points = min(len(embedded), 1000) sample_data = embedded[np.random.choice(len(embedded), n_points, replace=False)] # 计算不同半径下的关联积分 radii = np.logspace(-3, 0, 20) correlation_integrals = [] for r in radii: distances = np.linalg.norm(sample_data[:, np.newaxis] - sample_data, axis=2) correlation_sum = np.sum(distances < r) - n_points # 排除对角线 correlation_integral = correlation_sum / (n_points * (n_points - 1)) correlation_integrals.append(correlation_integral + 1e-10) # 使用线性回归估计维数 log_r = np.log(radii) log_c = np.log(correlation_integrals) # 选择线性区域 valid_indices = np.isfinite(log_c) & np.isfinite(log_r) if np.sum(valid_indices) > 5: slope, _ = np.polyfit(log_r[valid_indices], log_c[valid_indices], 1) return max(0, min(slope, 10)) # 限制在合理范围内 return 1.0 except: return 1.0 def _calculate_hurst_exponent(self, series: np.ndarray) -> float: """计算Hurst指数""" try: n = len(series) if n < 50: return 0.5 # R/S分析 max_k = int(n / 4) rs_values = [] time_scales = [] for k in range(10, max_k, max(1, max_k // 20)): # 将序列分割成长度为k的子序列 n_subseries = n // k rs_subseries = [] for i in range(n_subseries): subseries = series[i*k:(i+1)*k] mean_sub = np.mean(subseries) # 计算累积离差 cumulative_deviations = np.cumsum(subseries - mean_sub) # 计算R和S R = np.max(cumulative_deviations) - np.min(cumulative_deviations) S = np.std(subseries) if S > 1e-10: rs_subseries.append(R / S) if rs_subseries: rs_values.append(np.mean(rs_subseries)) time_scales.append(k) if len(rs_values) >= 5: # 对数线性回归 log_rs = np.log(rs_values) log_scales = np.log(time_scales) valid_indices = np.isfinite(log_rs) & np.isfinite(log_scales) if np.sum(valid_indices) >= 5: hurst, _ = np.polyfit(log_scales[valid_indices], log_rs[valid_indices], 1) return max(0, min(1, hurst)) # 限制在[0,1]范围内 return 0.5 except: return 0.5 def _interpret_hurst(self, hurst: float) -> str: """解释Hurst指数""" if hurst < 0.4: return "Mean-reverting (Anti-persistent)" elif hurst > 0.6: return "Trending (Persistent)" else: return "Random walk" # 示例和测试 if __name__ == "__main__": # 创建示例数据 np.random.seed(42) dates = pd.date_range('2023-01-01', periods=1000, freq='D') # 生成模拟的金融时间序列 returns = np.random.normal(0.001, 0.02, 1000) prices = 100 * np.exp(np.cumsum(returns)) # 创建多变量数据 data = pd.DataFrame({ 'Price': prices, 'Volume': np.random.lognormal(10, 0.5, 1000), 'Returns': returns, 'Volatility': pd.Series(returns).rolling(20).std().fillna(0.02) }, index=dates) print("Testing Causal & Risk Dynamics...") # 1. 因果发现测试 causal_discovery = CausalDiscovery() causal_results = causal_discovery.discover_causal_structure(data, method='var') print(f"Causal analysis type: {causal_results.get('type', 'failed')}") if 'causal_matrix' in causal_results: print(f"Causal matrix shape: {causal_results['causal_matrix'].shape}") # 2. 梯度动力学模拟测试 simulator = GradientDynamicsSimulator() vrp_potential = simulator.create_vrp_potential() initial_state = np.array([0.25, 0.15]) # VIX^2, RV dynamics_results = simulator.simulate_gradient_dynamics( initial_state, vrp_potential, steps=500 ) print(f"Dynamics simulation final state: {dynamics_results['final_state']}") print(f"Convergence time: {dynamics_results['convergence_time']}") # 3. 风险计量测试 risk_metrics = RiskMetrics() returns_series = pd.Series(returns, index=dates) var_es_results = risk_metrics.calculate_var_es(returns_series) print("VaR/ES Results:") for method, results in var_es_results.items(): print(f" {method}: VaR_95 = {results['VaR_95']:.4f}, ES_95 = {results['ES_95']:.4f}") # 4. 混沌分析测试 chaos_analyzer = ChaosAnalysis() chaos_results = chaos_analyzer.analyze_chaos_indicators(returns_series) if 'error' not in chaos_results: print("Chaos Analysis Results:") print(f" Lyapunov exponent: {chaos_results.get('lyapunov_exponent', 'N/A'):.4f}") print(f" Hurst exponent: {chaos_results.get('hurst_exponent', 'N/A'):.4f}") print(f" Is chaotic: {chaos_results.get('is_chaotic', False)}") print(f" Persistence type: {chaos_results.get('persistence_type', 'N/A')}") else: print(f"Chaos analysis error: {chaos_results['error']}")