Spaces:
Configuration error
Configuration error
| """ | |
| 因果与风险动力学模块 | |
| Causal & Risk Dynamics Layer (FIN 552, FIN 553, ECON 610) | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from statsmodels.tsa.api import VAR | |
| from statsmodels.tsa.stattools import grangercausalitytests, adfuller | |
| from scipy.linalg import cholesky, eig | |
| from scipy.stats import norm, t, chi2, jarque_bera | |
| from scipy.optimize import minimize | |
| from sklearn.covariance import LedoitWolf, EmpiricalCovariance | |
| from sklearn.ensemble import RandomForestRegressor | |
| from sklearn.metrics import mutual_info_score | |
| import networkx as nx | |
| from typing import Dict, List, Optional, Tuple, Union, Callable | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| try: | |
| from nolitsa import dimension, lyapunov, delay | |
| NOLITSA_AVAILABLE = True | |
| except ImportError: | |
| NOLITSA_AVAILABLE = False | |
| print("Warning: nolitsa not available. Some chaos analysis features will be disabled.") | |
| class CausalDiscovery: | |
| """因果关系发现器""" | |
| def __init__(self): | |
| self.causal_graph = None | |
| self.var_results = None | |
| self.granger_results = {} | |
| def discover_causal_structure(self, data: pd.DataFrame, maxlags: int = 5, | |
| method: str = 'var') -> Dict[str, any]: | |
| """ | |
| 发现变量间的因果结构 | |
| Args: | |
| data: 输入数据 | |
| maxlags: 最大滞后阶数 | |
| method: 方法选择 ('var', 'granger', 'mutual_info', 'random_forest') | |
| Returns: | |
| Dict: 因果结构分析结果 | |
| """ | |
| results = {} | |
| if method == 'var': | |
| results = self._var_causal_analysis(data, maxlags) | |
| elif method == 'granger': | |
| results = self._granger_causality_analysis(data, maxlags) | |
| elif method == 'mutual_info': | |
| results = self._mutual_info_causality(data, maxlags) | |
| elif method == 'random_forest': | |
| results = self._random_forest_causality(data, maxlags) | |
| else: | |
| raise ValueError(f"Unknown method: {method}") | |
| # 构建因果图 | |
| self.causal_graph = self._build_causal_graph(results) | |
| return results | |
| def _var_causal_analysis(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: | |
| """VAR模型因果分析""" | |
| # 数据预处理 | |
| data_clean = data.dropna() | |
| # 检查平稳性 | |
| stationarity_results = {} | |
| for col in data_clean.columns: | |
| adf_result = adfuller(data_clean[col]) | |
| stationarity_results[col] = { | |
| 'adf_statistic': adf_result[0], | |
| 'p_value': adf_result[1], | |
| 'is_stationary': adf_result[1] < 0.05 | |
| } | |
| # 拟合VAR模型 | |
| try: | |
| model = VAR(data_clean) | |
| var_results = model.fit(maxlags=maxlags, ic='aic') | |
| self.var_results = var_results | |
| # 提取因果关系 | |
| causal_matrix = self._extract_var_causality(var_results) | |
| # 脉冲响应分析 | |
| irf = var_results.irf(periods=10) | |
| # 方差分解 | |
| fevd = var_results.fevd(periods=10) | |
| return { | |
| 'type': 'var', | |
| 'model_results': var_results, | |
| 'causal_matrix': causal_matrix, | |
| 'stationarity': stationarity_results, | |
| 'irf': irf, | |
| 'fevd': fevd, | |
| 'aic': var_results.aic, | |
| 'bic': var_results.bic, | |
| 'optimal_lags': var_results.k_ar | |
| } | |
| except Exception as e: | |
| print(f"VAR analysis failed: {str(e)}") | |
| return {'type': 'var', 'error': str(e)} | |
| def _granger_causality_analysis(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: | |
| """格兰杰因果性分析""" | |
| results = {'type': 'granger', 'pairwise_results': {}} | |
| variables = data.columns.tolist() | |
| causal_matrix = pd.DataFrame(0, index=variables, columns=variables) | |
| for i, cause in enumerate(variables): | |
| for j, effect in enumerate(variables): | |
| if i != j: | |
| try: | |
| # 构造双变量数据 | |
| test_data = data[[effect, cause]].dropna() | |
| if len(test_data) > maxlags * 3: # 确保有足够数据 | |
| gc_test = grangercausalitytests(test_data, maxlags, verbose=False) | |
| # 提取最优滞后的p值 | |
| p_values = [gc_test[lag][0]['ssr_ftest'][1] for lag in range(1, maxlags + 1)] | |
| min_p_value = min(p_values) | |
| optimal_lag = p_values.index(min_p_value) + 1 | |
| results['pairwise_results'][f"{cause}->{effect}"] = { | |
| 'p_value': min_p_value, | |
| 'optimal_lag': optimal_lag, | |
| 'is_causal': min_p_value < 0.05 | |
| } | |
| causal_matrix.loc[cause, effect] = 1 if min_p_value < 0.05 else 0 | |
| except Exception as e: | |
| print(f"Granger test failed for {cause}->{effect}: {str(e)}") | |
| results['causal_matrix'] = causal_matrix | |
| return results | |
| def _mutual_info_causality(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: | |
| """基于互信息的因果分析""" | |
| variables = data.columns.tolist() | |
| causal_matrix = pd.DataFrame(0.0, index=variables, columns=variables) | |
| for cause in variables: | |
| for effect in variables: | |
| if cause != effect: | |
| # 计算不同滞后的互信息 | |
| mi_scores = [] | |
| for lag in range(1, maxlags + 1): | |
| cause_lagged = data[cause].shift(lag).dropna() | |
| effect_current = data[effect].iloc[lag:].dropna() | |
| if len(cause_lagged) == len(effect_current) and len(cause_lagged) > 0: | |
| # 离散化连续变量 | |
| cause_discrete = pd.cut(cause_lagged, bins=10, labels=False) | |
| effect_discrete = pd.cut(effect_current, bins=10, labels=False) | |
| # 计算互信息 | |
| mi = mutual_info_score(cause_discrete, effect_discrete) | |
| mi_scores.append(mi) | |
| if mi_scores: | |
| causal_matrix.loc[cause, effect] = max(mi_scores) | |
| return { | |
| 'type': 'mutual_info', | |
| 'causal_matrix': causal_matrix, | |
| 'threshold': causal_matrix.values.mean() + causal_matrix.values.std() | |
| } | |
| def _random_forest_causality(self, data: pd.DataFrame, maxlags: int) -> Dict[str, any]: | |
| """基于随机森林的因果分析""" | |
| variables = data.columns.tolist() | |
| causal_matrix = pd.DataFrame(0.0, index=variables, columns=variables) | |
| for target in variables: | |
| # 构造特征矩阵 | |
| feature_data = [] | |
| feature_names = [] | |
| for var in variables: | |
| if var != target: | |
| for lag in range(1, maxlags + 1): | |
| feature_data.append(data[var].shift(lag)) | |
| feature_names.append(f"{var}_lag_{lag}") | |
| if feature_data: | |
| X = pd.concat(feature_data, axis=1).dropna() | |
| y = data[target].iloc[maxlags:].dropna() | |
| if len(X) == len(y) and len(X) > 50: # 确保有足够数据 | |
| rf = RandomForestRegressor(n_estimators=100, random_state=42) | |
| rf.fit(X, y) | |
| # 提取特征重要性 | |
| importances = rf.feature_importances_ | |
| # 聚合每个变量的重要性 | |
| for var in variables: | |
| if var != target: | |
| var_importances = [importances[i] for i, name in enumerate(feature_names) | |
| if name.startswith(f"{var}_lag_")] | |
| if var_importances: | |
| causal_matrix.loc[var, target] = sum(var_importances) | |
| return { | |
| 'type': 'random_forest', | |
| 'causal_matrix': causal_matrix, | |
| 'threshold': causal_matrix.values.mean() + causal_matrix.values.std() | |
| } | |
| def _extract_var_causality(self, var_results) -> pd.DataFrame: | |
| """从VAR结果中提取因果关系""" | |
| variables = var_results.names | |
| n_vars = len(variables) | |
| causal_matrix = pd.DataFrame(0, index=variables, columns=variables) | |
| # 使用系数的t统计量判断因果关系 | |
| for i, effect in enumerate(variables): | |
| for j, cause in enumerate(variables): | |
| if i != j: | |
| # 检查所有滞后期的系数 | |
| causality_strength = 0 | |
| for lag in range(var_results.k_ar): | |
| coef = var_results.coefs[lag][i, j] # 从cause到effect的系数 | |
| stderr = var_results.stderr[lag][i, j] | |
| t_stat = abs(coef / (stderr + 1e-8)) | |
| if t_stat > 1.96: # 95%置信度 | |
| causality_strength += t_stat | |
| causal_matrix.loc[cause, effect] = causality_strength | |
| return causal_matrix | |
| def _build_causal_graph(self, causal_results: Dict) -> nx.DiGraph: | |
| """构建因果关系图""" | |
| G = nx.DiGraph() | |
| if 'causal_matrix' in causal_results: | |
| causal_matrix = causal_results['causal_matrix'] | |
| threshold = causal_results.get('threshold', causal_matrix.values.mean()) | |
| # 添加节点 | |
| G.add_nodes_from(causal_matrix.index) | |
| # 添加边 | |
| for cause in causal_matrix.index: | |
| for effect in causal_matrix.columns: | |
| if cause != effect and causal_matrix.loc[cause, effect] > threshold: | |
| G.add_edge(cause, effect, weight=causal_matrix.loc[cause, effect]) | |
| return G | |
| class GradientDynamicsSimulator: | |
| """梯度动力学模拟器""" | |
| def __init__(self): | |
| self.trajectory_history = [] | |
| def simulate_gradient_dynamics(self, b0: np.ndarray, potential_func: Callable, | |
| eta: float = 0.1, sigma: float = 0.05, dt: float = 0.01, | |
| steps: int = 1000, method: str = 'euler') -> Dict[str, any]: | |
| """ | |
| 模拟梯度动力学: db_t = -η ∇U(b_t) dt + σ dW_t | |
| Args: | |
| b0: 初始状态 | |
| potential_func: 势函数 | |
| eta: 学习率/衰减参数 | |
| sigma: 噪声强度 | |
| dt: 时间步长 | |
| steps: 模拟步数 | |
| method: 数值方法 ('euler', 'heun', 'rk4') | |
| Returns: | |
| Dict: 模拟结果 | |
| """ | |
| trajectory = [b0.copy()] | |
| current_b = b0.copy() | |
| potential_values = [potential_func(b0)] | |
| for step in range(steps): | |
| if method == 'euler': | |
| new_b = self._euler_step(current_b, potential_func, eta, sigma, dt) | |
| elif method == 'heun': | |
| new_b = self._heun_step(current_b, potential_func, eta, sigma, dt) | |
| elif method == 'rk4': | |
| new_b = self._rk4_step(current_b, potential_func, eta, sigma, dt) | |
| else: | |
| raise ValueError(f"Unknown method: {method}") | |
| trajectory.append(new_b.copy()) | |
| potential_values.append(potential_func(new_b)) | |
| current_b = new_b | |
| # 检查收敛 | |
| if step > 100 and np.linalg.norm(trajectory[-1] - trajectory[-2]) < 1e-6: | |
| print(f"Converged at step {step}") | |
| break | |
| trajectory = np.array(trajectory) | |
| self.trajectory_history.append(trajectory) | |
| return { | |
| 'trajectory': trajectory, | |
| 'potential_values': np.array(potential_values), | |
| 'final_state': trajectory[-1], | |
| 'convergence_time': len(trajectory), | |
| 'path_length': self._calculate_path_length(trajectory) | |
| } | |
| def _euler_step(self, b: np.ndarray, potential_func: Callable, eta: float, sigma: float, dt: float) -> np.ndarray: | |
| """欧拉方法步进""" | |
| grad_U = self._numerical_gradient(potential_func, b) | |
| drift = -eta * grad_U | |
| diffusion = sigma * np.random.normal(0, 1, size=b.shape) | |
| return b + drift * dt + diffusion * np.sqrt(dt) | |
| def _heun_step(self, b: np.ndarray, potential_func: Callable, eta: float, sigma: float, dt: float) -> np.ndarray: | |
| """Heun方法步进""" | |
| # 预测步 | |
| grad_U1 = self._numerical_gradient(potential_func, b) | |
| drift1 = -eta * grad_U1 | |
| noise = np.random.normal(0, 1, size=b.shape) | |
| b_pred = b + drift1 * dt + sigma * noise * np.sqrt(dt) | |
| # 修正步 | |
| grad_U2 = self._numerical_gradient(potential_func, b_pred) | |
| drift2 = -eta * grad_U2 | |
| drift_avg = (drift1 + drift2) / 2 | |
| return b + drift_avg * dt + sigma * noise * np.sqrt(dt) | |
| def _rk4_step(self, b: np.ndarray, potential_func: Callable, eta: float, sigma: float, dt: float) -> np.ndarray: | |
| """四阶Runge-Kutta方法""" | |
| noise = np.random.normal(0, 1, size=b.shape) | |
| k1 = -eta * self._numerical_gradient(potential_func, b) | |
| k2 = -eta * self._numerical_gradient(potential_func, b + 0.5 * dt * k1) | |
| k3 = -eta * self._numerical_gradient(potential_func, b + 0.5 * dt * k2) | |
| k4 = -eta * self._numerical_gradient(potential_func, b + dt * k3) | |
| drift = (k1 + 2*k2 + 2*k3 + k4) / 6 | |
| diffusion = sigma * noise * np.sqrt(dt) | |
| return b + drift * dt + diffusion | |
| def _numerical_gradient(self, func: Callable, x: np.ndarray, eps: float = 1e-6) -> np.ndarray: | |
| """数值计算梯度""" | |
| grad = np.zeros_like(x) | |
| for i in range(len(x)): | |
| x_plus = x.copy() | |
| x_plus[i] += eps | |
| x_minus = x.copy() | |
| x_minus[i] -= eps | |
| grad[i] = (func(x_plus) - func(x_minus)) / (2 * eps) | |
| return grad | |
| def _calculate_path_length(self, trajectory: np.ndarray) -> float: | |
| """计算轨迹长度""" | |
| diffs = np.diff(trajectory, axis=0) | |
| distances = np.linalg.norm(diffs, axis=1) | |
| return np.sum(distances) | |
| def create_vrp_potential(self, vix_target: float = 0.2, rv_target: float = 0.15) -> Callable: | |
| """创建VRP势函数""" | |
| def vrp_potential(b): | |
| """U(b) = (VIX² - RV)² / 2 + regularization""" | |
| if len(b) < 2: | |
| return 0.5 * (b[0] - vix_target)**2 | |
| vix_sq = b[0] | |
| rv = b[1] | |
| vrp = vix_sq - rv | |
| # 主要项:VRP平方 | |
| main_term = 0.5 * vrp**2 | |
| # 正则化项:防止极值 | |
| reg_term = 0.01 * (vix_sq**2 + rv**2) | |
| # 约束项:确保非负 | |
| constraint_term = 0.1 * (max(0, -vix_sq)**2 + max(0, -rv)**2) | |
| return main_term + reg_term + constraint_term | |
| return vrp_potential | |
| def analyze_attractor_dynamics(self, trajectory: np.ndarray) -> Dict[str, any]: | |
| """分析吸引子动力学""" | |
| results = {} | |
| # 计算平衡点 | |
| final_points = trajectory[-100:] # 最后100个点 | |
| equilibrium = np.mean(final_points, axis=0) | |
| equilibrium_stability = np.std(final_points, axis=0) | |
| results['equilibrium'] = equilibrium | |
| results['equilibrium_stability'] = equilibrium_stability | |
| # 计算回归时间 | |
| distances_to_eq = np.linalg.norm(trajectory - equilibrium, axis=1) | |
| results['return_times'] = self._calculate_return_times(distances_to_eq) | |
| # 相空间分析 | |
| if trajectory.shape[1] >= 2: | |
| results['phase_space_area'] = self._calculate_phase_space_area(trajectory) | |
| results['limit_cycle_detection'] = self._detect_limit_cycles(trajectory) | |
| return results | |
| def _calculate_return_times(self, distances: np.ndarray, threshold: float = None) -> List[int]: | |
| """计算回归时间""" | |
| if threshold is None: | |
| threshold = np.std(distances) | |
| return_times = [] | |
| last_return = 0 | |
| for i, dist in enumerate(distances): | |
| if dist < threshold and i - last_return > 10: # 避免重复计数 | |
| return_times.append(i - last_return) | |
| last_return = i | |
| return return_times | |
| def _calculate_phase_space_area(self, trajectory: np.ndarray) -> float: | |
| """计算相空间面积""" | |
| if trajectory.shape[1] < 2: | |
| return 0.0 | |
| x, y = trajectory[:, 0], trajectory[:, 1] | |
| # 使用鞋带公式计算多边形面积 | |
| return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1))) | |
| def _detect_limit_cycles(self, trajectory: np.ndarray, tolerance: float = 0.1) -> Dict[str, any]: | |
| """检测极限环""" | |
| if trajectory.shape[0] < 100: | |
| return {'has_limit_cycle': False} | |
| # 简单的极限环检测:检查是否存在周期性 | |
| mid_point = len(trajectory) // 2 | |
| recent_traj = trajectory[mid_point:] | |
| # 计算自相关来检测周期性 | |
| if len(recent_traj) > 50: | |
| x_centered = recent_traj[:, 0] - np.mean(recent_traj[:, 0]) | |
| autocorr = np.correlate(x_centered, x_centered, mode='full') | |
| autocorr = autocorr[autocorr.size // 2:] | |
| autocorr = autocorr / autocorr[0] # 标准化 | |
| # 寻找主要周期 | |
| peaks = [] | |
| for i in range(10, min(len(autocorr), 200)): | |
| if (autocorr[i] > autocorr[i-1] and autocorr[i] > autocorr[i+1] and | |
| autocorr[i] > 0.3): | |
| peaks.append((i, autocorr[i])) | |
| has_cycle = len(peaks) > 0 | |
| main_period = peaks[0][0] if peaks else 0 | |
| return { | |
| 'has_limit_cycle': has_cycle, | |
| 'main_period': main_period, | |
| 'cycle_strength': peaks[0][1] if peaks else 0 | |
| } | |
| return {'has_limit_cycle': False} | |
| class RiskMetrics: | |
| """风险计量工具""" | |
| def __init__(self): | |
| self.risk_models = {} | |
| def calculate_var_es(self, returns: pd.Series, confidence_levels: List[float] = [0.95, 0.99], | |
| methods: List[str] = ['historical', 'parametric', 'cornish_fisher']) -> Dict[str, Dict]: | |
| """ | |
| 计算VaR和ES | |
| Args: | |
| returns: 收益率序列 | |
| confidence_levels: 置信水平 | |
| methods: 计算方法 | |
| Returns: | |
| Dict: VaR和ES结果 | |
| """ | |
| results = {} | |
| for method in methods: | |
| method_results = {} | |
| for alpha in confidence_levels: | |
| var, es = self._calculate_single_var_es(returns, alpha, method) | |
| method_results[f'VaR_{int(alpha*100)}'] = var | |
| method_results[f'ES_{int(alpha*100)}'] = es | |
| results[method] = method_results | |
| return results | |
| def _calculate_single_var_es(self, returns: pd.Series, alpha: float, method: str) -> Tuple[float, float]: | |
| """计算单个VaR和ES""" | |
| returns_clean = returns.dropna() | |
| if method == 'historical': | |
| var = np.percentile(returns_clean, (1 - alpha) * 100) | |
| es = returns_clean[returns_clean <= var].mean() | |
| elif method == 'parametric': | |
| mu = returns_clean.mean() | |
| sigma = returns_clean.std() | |
| var = norm.ppf(1 - alpha, mu, sigma) | |
| es = mu - sigma * norm.pdf(norm.ppf(1 - alpha)) / (1 - alpha) | |
| elif method == 't_distribution': | |
| # 拟合t分布 | |
| df, mu, sigma = t.fit(returns_clean) | |
| var = t.ppf(1 - alpha, df, mu, sigma) | |
| es = self._calculate_t_es(returns_clean, alpha, df, mu, sigma) | |
| elif method == 'cornish_fisher': | |
| # Cornish-Fisher展开 | |
| mu = returns_clean.mean() | |
| sigma = returns_clean.std() | |
| skew = returns_clean.skew() | |
| kurt = returns_clean.kurtosis() | |
| z_alpha = norm.ppf(1 - alpha) | |
| cf_adjustment = (z_alpha**2 - 1) * skew / 6 + \ | |
| (z_alpha**3 - 3*z_alpha) * kurt / 24 - \ | |
| (2*z_alpha**3 - 5*z_alpha) * skew**2 / 36 | |
| z_cf = z_alpha + cf_adjustment | |
| var = mu + sigma * z_cf | |
| es = self._estimate_es_cornish_fisher(returns_clean, var, alpha) | |
| else: | |
| raise ValueError(f"Unknown method: {method}") | |
| return var, es | |
| def _calculate_t_es(self, returns: pd.Series, alpha: float, df: float, mu: float, sigma: float) -> float: | |
| """计算t分布的期望短缺""" | |
| t_val = t.ppf(1 - alpha, df) | |
| es = mu - sigma * (t.pdf(t_val, df) / (1 - alpha)) * ((df + t_val**2) / (df - 1)) | |
| return es | |
| def _estimate_es_cornish_fisher(self, returns: pd.Series, var: float, alpha: float) -> float: | |
| """估计Cornish-Fisher展开的ES""" | |
| tail_returns = returns[returns <= var] | |
| if len(tail_returns) > 0: | |
| return tail_returns.mean() | |
| else: | |
| return var * 1.2 # 简单估计 | |
| def calculate_risk_decomposition(self, portfolio_returns: pd.Series, | |
| component_returns: pd.DataFrame) -> Dict[str, any]: | |
| """计算风险分解""" | |
| results = {} | |
| # 总体组合风险 | |
| portfolio_var = portfolio_returns.var() | |
| results['total_variance'] = portfolio_var | |
| # 成分风险贡献 | |
| component_contributions = {} | |
| component_weights = {} | |
| for component in component_returns.columns: | |
| # 计算相关系数 | |
| corr = portfolio_returns.corr(component_returns[component]) | |
| component_var = component_returns[component].var() | |
| # 风险贡献 = 权重 × 相关系数 × 成分标准差 × 组合标准差 | |
| contribution = corr * np.sqrt(component_var * portfolio_var) | |
| component_contributions[component] = contribution | |
| # 计算隐含权重 | |
| weight = contribution / portfolio_var if portfolio_var > 0 else 0 | |
| component_weights[component] = weight | |
| results['component_contributions'] = component_contributions | |
| results['component_weights'] = component_weights | |
| results['diversification_ratio'] = sum(component_contributions.values()) / np.sqrt(portfolio_var) | |
| return results | |
| def perform_stress_testing(self, returns: pd.Series, scenarios: Dict[str, Dict]) -> Dict[str, Dict]: | |
| """执行压力测试""" | |
| stress_results = {} | |
| for scenario_name, scenario_params in scenarios.items(): | |
| scenario_results = {} | |
| if scenario_params['type'] == 'historical': | |
| # 历史情景 | |
| start_date = scenario_params['start_date'] | |
| end_date = scenario_params['end_date'] | |
| historical_returns = returns.loc[start_date:end_date] if isinstance(returns.index, pd.DatetimeIndex) else returns | |
| scenario_results['total_return'] = (1 + historical_returns).prod() - 1 | |
| scenario_results['max_drawdown'] = self._calculate_max_drawdown(historical_returns) | |
| scenario_results['var_95'] = np.percentile(historical_returns, 5) | |
| elif scenario_params['type'] == 'monte_carlo': | |
| # 蒙特卡罗情景 | |
| n_simulations = scenario_params.get('n_simulations', 10000) | |
| shock_size = scenario_params.get('shock_size', -0.1) | |
| # 生成随机冲击 | |
| random_shocks = np.random.normal(shock_size, np.abs(shock_size) * 0.5, n_simulations) | |
| scenario_results['simulated_returns'] = random_shocks | |
| scenario_results['var_95'] = np.percentile(random_shocks, 5) | |
| scenario_results['worst_case'] = np.min(random_shocks) | |
| elif scenario_params['type'] == 'parametric': | |
| # 参数化冲击 | |
| shock_magnitude = scenario_params.get('shock_magnitude', -3) # 3个标准差 | |
| mu = returns.mean() | |
| sigma = returns.std() | |
| shocked_return = mu + shock_magnitude * sigma | |
| scenario_results['shocked_return'] = shocked_return | |
| scenario_results['probability'] = norm.cdf(shock_magnitude) | |
| stress_results[scenario_name] = scenario_results | |
| return stress_results | |
| def _calculate_max_drawdown(self, returns: pd.Series) -> float: | |
| """计算最大回撤""" | |
| cumulative = (1 + returns).cumprod() | |
| running_max = cumulative.expanding().max() | |
| drawdown = (cumulative - running_max) / running_max | |
| return drawdown.min() | |
| class ChaosAnalysis: | |
| """混沌分析工具""" | |
| def __init__(self): | |
| self.embedding_params = {} | |
| def analyze_chaos_indicators(self, series: pd.Series, max_dim: int = 10) -> Dict[str, any]: | |
| """分析混沌指标""" | |
| if not NOLITSA_AVAILABLE: | |
| return {'error': 'nolitsa library not available'} | |
| results = {} | |
| series_array = series.dropna().values | |
| if len(series_array) < 100: | |
| return {'error': 'Insufficient data for chaos analysis'} | |
| try: | |
| # 估计嵌入维数 | |
| embedding_dimension = self._estimate_embedding_dimension(series_array, max_dim) | |
| results['embedding_dimension'] = embedding_dimension | |
| # 估计延迟时间 | |
| delay_time = self._estimate_delay_time(series_array) | |
| results['delay_time'] = delay_time | |
| # 重构相空间 | |
| embedded = dimension.embed(series_array, dim=embedding_dimension, tau=delay_time) | |
| results['phase_space_dimension'] = embedded.shape | |
| # 计算最大Lyapunov指数 | |
| lle = self._calculate_lyapunov_exponent(embedded) | |
| results['lyapunov_exponent'] = lle | |
| results['is_chaotic'] = lle > 0.01 | |
| # 计算关联维数 | |
| correlation_dimension = self._calculate_correlation_dimension(embedded) | |
| results['correlation_dimension'] = correlation_dimension | |
| # 计算Hurst指数 | |
| hurst_exponent = self._calculate_hurst_exponent(series_array) | |
| results['hurst_exponent'] = hurst_exponent | |
| results['persistence_type'] = self._interpret_hurst(hurst_exponent) | |
| except Exception as e: | |
| results['error'] = f"Chaos analysis failed: {str(e)}" | |
| return results | |
| def _estimate_embedding_dimension(self, series: np.ndarray, max_dim: int) -> int: | |
| """估计嵌入维数""" | |
| try: | |
| # 使用false nearest neighbors方法 | |
| dimensions = range(1, max_dim + 1) | |
| fnn_percentages = [] | |
| for dim in dimensions: | |
| embedded = dimension.embed(series, dim=dim, tau=1) | |
| if len(embedded) > 50: | |
| # 简化的false nearest neighbors计算 | |
| fnn_pct = self._calculate_fnn_percentage(embedded, dim) | |
| fnn_percentages.append(fnn_pct) | |
| else: | |
| fnn_percentages.append(100) # 数据不足 | |
| # 找到FNN百分比显著下降的点 | |
| optimal_dim = 1 | |
| for i in range(1, len(fnn_percentages)): | |
| if fnn_percentages[i] < 10 and fnn_percentages[i-1] - fnn_percentages[i] > 20: | |
| optimal_dim = i + 1 | |
| break | |
| return min(optimal_dim, max_dim) | |
| except: | |
| return 3 # 默认值 | |
| def _estimate_delay_time(self, series: np.ndarray) -> int: | |
| """估计延迟时间""" | |
| try: | |
| # 使用自相关函数的第一个零点 | |
| autocorr = np.correlate(series, series, mode='full') | |
| autocorr = autocorr[autocorr.size // 2:] | |
| autocorr = autocorr / autocorr[0] | |
| # 找到第一个接近零的点 | |
| for i in range(1, min(len(autocorr), 50)): | |
| if autocorr[i] <= 0.1: | |
| return i | |
| return 1 # 默认值 | |
| except: | |
| return 1 | |
| def _calculate_fnn_percentage(self, embedded: np.ndarray, dim: int) -> float: | |
| """计算虚假最近邻百分比""" | |
| if len(embedded) < 100: | |
| return 100 | |
| # 简化实现 | |
| n_points = min(len(embedded), 500) | |
| sample_indices = np.random.choice(len(embedded), n_points, replace=False) | |
| false_neighbors = 0 | |
| for i in sample_indices[:100]: # 限制计算量 | |
| point = embedded[i] | |
| distances = np.linalg.norm(embedded - point, axis=1) | |
| nearest_idx = np.argsort(distances)[1] # 第二小的距离(排除自己) | |
| # 简单的虚假邻居判断 | |
| if distances[nearest_idx] > np.std(embedded.flatten()): | |
| false_neighbors += 1 | |
| return (false_neighbors / min(100, len(sample_indices))) * 100 | |
| def _calculate_lyapunov_exponent(self, embedded: np.ndarray) -> float: | |
| """计算最大Lyapunov指数""" | |
| try: | |
| # 使用nolitsa计算 | |
| lle = lyapunov.mle(embedded, maxt=50, window=10) | |
| return float(lle) if not np.isnan(lle) else 0.0 | |
| except: | |
| # 简化实现 | |
| return self._simple_lyapunov_estimate(embedded) | |
| def _simple_lyapunov_estimate(self, embedded: np.ndarray) -> float: | |
| """简化的Lyapunov指数估计""" | |
| if len(embedded) < 50: | |
| return 0.0 | |
| # 计算相邻点的平均发散速度 | |
| divergences = [] | |
| for i in range(len(embedded) - 10): | |
| initial_separation = np.linalg.norm(embedded[i+1] - embedded[i]) | |
| if initial_separation > 1e-10: | |
| final_separation = np.linalg.norm(embedded[i+10] - embedded[i+9]) | |
| if final_separation > 1e-10: | |
| divergence = np.log(final_separation / initial_separation) / 10 | |
| if not np.isnan(divergence) and np.isfinite(divergence): | |
| divergences.append(divergence) | |
| return np.mean(divergences) if divergences else 0.0 | |
| def _calculate_correlation_dimension(self, embedded: np.ndarray) -> float: | |
| """计算关联维数""" | |
| try: | |
| n_points = min(len(embedded), 1000) | |
| sample_data = embedded[np.random.choice(len(embedded), n_points, replace=False)] | |
| # 计算不同半径下的关联积分 | |
| radii = np.logspace(-3, 0, 20) | |
| correlation_integrals = [] | |
| for r in radii: | |
| distances = np.linalg.norm(sample_data[:, np.newaxis] - sample_data, axis=2) | |
| correlation_sum = np.sum(distances < r) - n_points # 排除对角线 | |
| correlation_integral = correlation_sum / (n_points * (n_points - 1)) | |
| correlation_integrals.append(correlation_integral + 1e-10) | |
| # 使用线性回归估计维数 | |
| log_r = np.log(radii) | |
| log_c = np.log(correlation_integrals) | |
| # 选择线性区域 | |
| valid_indices = np.isfinite(log_c) & np.isfinite(log_r) | |
| if np.sum(valid_indices) > 5: | |
| slope, _ = np.polyfit(log_r[valid_indices], log_c[valid_indices], 1) | |
| return max(0, min(slope, 10)) # 限制在合理范围内 | |
| return 1.0 | |
| except: | |
| return 1.0 | |
| def _calculate_hurst_exponent(self, series: np.ndarray) -> float: | |
| """计算Hurst指数""" | |
| try: | |
| n = len(series) | |
| if n < 50: | |
| return 0.5 | |
| # R/S分析 | |
| max_k = int(n / 4) | |
| rs_values = [] | |
| time_scales = [] | |
| for k in range(10, max_k, max(1, max_k // 20)): | |
| # 将序列分割成长度为k的子序列 | |
| n_subseries = n // k | |
| rs_subseries = [] | |
| for i in range(n_subseries): | |
| subseries = series[i*k:(i+1)*k] | |
| mean_sub = np.mean(subseries) | |
| # 计算累积离差 | |
| cumulative_deviations = np.cumsum(subseries - mean_sub) | |
| # 计算R和S | |
| R = np.max(cumulative_deviations) - np.min(cumulative_deviations) | |
| S = np.std(subseries) | |
| if S > 1e-10: | |
| rs_subseries.append(R / S) | |
| if rs_subseries: | |
| rs_values.append(np.mean(rs_subseries)) | |
| time_scales.append(k) | |
| if len(rs_values) >= 5: | |
| # 对数线性回归 | |
| log_rs = np.log(rs_values) | |
| log_scales = np.log(time_scales) | |
| valid_indices = np.isfinite(log_rs) & np.isfinite(log_scales) | |
| if np.sum(valid_indices) >= 5: | |
| hurst, _ = np.polyfit(log_scales[valid_indices], log_rs[valid_indices], 1) | |
| return max(0, min(1, hurst)) # 限制在[0,1]范围内 | |
| return 0.5 | |
| except: | |
| return 0.5 | |
| def _interpret_hurst(self, hurst: float) -> str: | |
| """解释Hurst指数""" | |
| if hurst < 0.4: | |
| return "Mean-reverting (Anti-persistent)" | |
| elif hurst > 0.6: | |
| return "Trending (Persistent)" | |
| else: | |
| return "Random walk" | |
| # 示例和测试 | |
| if __name__ == "__main__": | |
| # 创建示例数据 | |
| np.random.seed(42) | |
| dates = pd.date_range('2023-01-01', periods=1000, freq='D') | |
| # 生成模拟的金融时间序列 | |
| returns = np.random.normal(0.001, 0.02, 1000) | |
| prices = 100 * np.exp(np.cumsum(returns)) | |
| # 创建多变量数据 | |
| data = pd.DataFrame({ | |
| 'Price': prices, | |
| 'Volume': np.random.lognormal(10, 0.5, 1000), | |
| 'Returns': returns, | |
| 'Volatility': pd.Series(returns).rolling(20).std().fillna(0.02) | |
| }, index=dates) | |
| print("Testing Causal & Risk Dynamics...") | |
| # 1. 因果发现测试 | |
| causal_discovery = CausalDiscovery() | |
| causal_results = causal_discovery.discover_causal_structure(data, method='var') | |
| print(f"Causal analysis type: {causal_results.get('type', 'failed')}") | |
| if 'causal_matrix' in causal_results: | |
| print(f"Causal matrix shape: {causal_results['causal_matrix'].shape}") | |
| # 2. 梯度动力学模拟测试 | |
| simulator = GradientDynamicsSimulator() | |
| vrp_potential = simulator.create_vrp_potential() | |
| initial_state = np.array([0.25, 0.15]) # VIX^2, RV | |
| dynamics_results = simulator.simulate_gradient_dynamics( | |
| initial_state, vrp_potential, steps=500 | |
| ) | |
| print(f"Dynamics simulation final state: {dynamics_results['final_state']}") | |
| print(f"Convergence time: {dynamics_results['convergence_time']}") | |
| # 3. 风险计量测试 | |
| risk_metrics = RiskMetrics() | |
| returns_series = pd.Series(returns, index=dates) | |
| var_es_results = risk_metrics.calculate_var_es(returns_series) | |
| print("VaR/ES Results:") | |
| for method, results in var_es_results.items(): | |
| print(f" {method}: VaR_95 = {results['VaR_95']:.4f}, ES_95 = {results['ES_95']:.4f}") | |
| # 4. 混沌分析测试 | |
| chaos_analyzer = ChaosAnalysis() | |
| chaos_results = chaos_analyzer.analyze_chaos_indicators(returns_series) | |
| if 'error' not in chaos_results: | |
| print("Chaos Analysis Results:") | |
| print(f" Lyapunov exponent: {chaos_results.get('lyapunov_exponent', 'N/A'):.4f}") | |
| print(f" Hurst exponent: {chaos_results.get('hurst_exponent', 'N/A'):.4f}") | |
| print(f" Is chaotic: {chaos_results.get('is_chaotic', False)}") | |
| print(f" Persistence type: {chaos_results.get('persistence_type', 'N/A')}") | |
| else: | |
| print(f"Chaos analysis error: {chaos_results['error']}") | |