Spaces:
Configuration error
Configuration error
| """ | |
| 因子工程引擎 | |
| Factor Engineering Engine - Traditional and Geometric Factors | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.manifold import Isomap | |
| from sklearn.preprocessing import StandardScaler, RobustScaler | |
| from sklearn.decomposition import PCA, FastICA | |
| from sklearn.feature_selection import mutual_info_regression | |
| import umap | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
class TechnicalFactorCalculator:
    """Compute traditional technical factors from OHLCV price data.

    Each ``calculate_*`` method takes a DataFrame holding (some of) the
    columns ``Open``, ``High``, ``Low``, ``Close`` and ``Volume`` and returns
    a new DataFrame of factors on the same index. Rows that fall inside a
    rolling warm-up window contain NaN; only ``calculate_all_factors`` drops
    them.
    """

    # Added to denominators that can be exactly zero (a flat bar or a flat
    # look-back window) so the ratio is a finite number instead of NaN/inf.
    _EPS = 1e-8

    def __init__(self):
        # Not populated by any method of this class; kept for callers that
        # may read or fill it.
        self.calculated_factors = {}

    def calculate_all_factors(self, price_data: pd.DataFrame) -> pd.DataFrame:
        """Compute every factor group and join them column-wise.

        Args:
            price_data: DataFrame with Open/High/Low/Close/Volume columns.

        Returns:
            DataFrame with all factor columns, restricted to the rows where
            every factor is non-NaN.
        """
        # Order matters for the resulting column order: price, momentum,
        # volatility, volume, technical indicators.
        groups = [
            self.calculate_price_factors(price_data),
            self.calculate_momentum_factors(price_data),
            self.calculate_volatility_factors(price_data),
            self.calculate_volume_factors(price_data),
            self.calculate_technical_indicators(price_data),
        ]
        return pd.concat(groups, axis=1).dropna()

    def calculate_price_factors(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute single-bar price ratios, candle-shape and spread factors."""
        factors = pd.DataFrame(index=df.index)
        open_, high, low, close = df['Open'], df['High'], df['Low'], df['Close']

        # Basic price ratios
        factors['Close_Open_Ratio'] = close / open_
        factors['High_Low_Ratio'] = high / low
        factors['Close_High_Ratio'] = close / high
        factors['Close_Low_Ratio'] = close / low

        # Position of the close / body / shadows inside the bar's range
        bar_range = high - low + self._EPS
        factors['Price_Position'] = (close - low) / bar_range
        factors['Body_Size'] = np.abs(close - open_) / bar_range
        factors['Upper_Shadow'] = (high - np.maximum(open_, close)) / bar_range
        factors['Lower_Shadow'] = (np.minimum(open_, close) - low) / bar_range

        # Relative spreads
        factors['HL_Spread'] = (high - low) / close
        factors['OC_Spread'] = (close - open_) / open_
        return factors

    def calculate_momentum_factors(self, df: pd.DataFrame,
                                   periods: Optional[List[int]] = None) -> pd.DataFrame:
        """Compute momentum factors for each look-back period.

        Args:
            df: DataFrame with High/Low/Close/Volume columns.
            periods: Look-back lengths in bars; defaults to ``[3, 5, 10, 20]``.

        Returns:
            DataFrame with, per period, ``Price_Momentum_*``,
            ``Log_Momentum_*``, ``VWAP_Momentum_*``, ``RS_*`` and (for
            periods of at least 5) ``Momentum_Accel_*``.
        """
        if periods is None:
            periods = [3, 5, 10, 20]

        factors = pd.DataFrame(index=df.index)
        close = df['Close']

        # Bar-to-bar changes split into gains and losses (loop-invariant).
        close_change = close.diff()
        gains = close_change.clip(lower=0)
        losses = close_change.clip(upper=0)

        for period in periods:
            # Price momentum
            momentum = close.pct_change(period)
            factors[f'Price_Momentum_{period}'] = momentum
            factors[f'Log_Momentum_{period}'] = np.log(close / close.shift(period))

            # Distance of the close from the rolling VWAP
            vwap = self._calculate_vwap(df, period)
            factors[f'VWAP_Momentum_{period}'] = (close - vwap) / vwap

            # Relative strength: summed gains over summed losses
            up_moves = gains.rolling(period).sum()
            down_moves = -losses.rolling(period).sum()
            factors[f'RS_{period}'] = up_moves / (down_moves + self._EPS)

            # Momentum acceleration: change in momentum over half a period
            if period >= 5:
                factors[f'Momentum_Accel_{period}'] = momentum - momentum.shift(period // 2)
        return factors

    def calculate_volatility_factors(self, df: pd.DataFrame,
                                     periods: Optional[List[int]] = None) -> pd.DataFrame:
        """Compute volatility estimators for each look-back period.

        Args:
            df: DataFrame with Open/High/Low/Close columns.
            periods: Look-back lengths in bars; defaults to ``[5, 10, 20, 60]``.

        Returns:
            DataFrame with, per period, ``HV_*``, ``Range_Vol_*``,
            ``Parkinson_Vol_*`` and ``GK_Vol_*``, plus the fixed-window
            columns ``Vol_of_Vol``, ``Returns_Skew`` and ``Returns_Kurt``.
        """
        if periods is None:
            periods = [5, 10, 20, 60]

        factors = pd.DataFrame(index=df.index)
        returns = df['Close'].pct_change()

        # Per-bar building blocks, independent of the look-back period.
        ln_hl = np.log(df['High'] / df['Low'])
        ln_co = np.log(df['Close'] / df['Open'])
        gk_bar = 0.5 * ln_hl.pow(2) - (2 * np.log(2) - 1) * ln_co.pow(2)

        for period in periods:
            # Historical volatility: rolling std of returns scaled by sqrt(252)
            factors[f'HV_{period}'] = returns.rolling(period).std() * np.sqrt(252)
            # Range volatility: mean log high/low range
            factors[f'Range_Vol_{period}'] = ln_hl.rolling(period).mean()
            # Parkinson volatility
            parkinson_vol = ln_hl.pow(2).rolling(period).mean()
            factors[f'Parkinson_Vol_{period}'] = np.sqrt(parkinson_vol * 252 / (4 * np.log(2)))
            # Garman-Klass volatility
            factors[f'GK_Vol_{period}'] = np.sqrt(gk_bar.rolling(period).mean() * 252)

        # Volatility of volatility. The 20-bar historical volatility is
        # computed here directly so this column does not depend on 20 being
        # one of the requested periods.
        hv_20 = returns.rolling(20).std() * np.sqrt(252)
        factors['Vol_of_Vol'] = hv_20.rolling(20).std()

        # Skewness and kurtosis of returns (pandas rolling windows expose
        # kurtosis as ``kurt``).
        factors['Returns_Skew'] = returns.rolling(60).skew()
        factors['Returns_Kurt'] = returns.rolling(60).kurt()
        return factors

    def calculate_volume_factors(self, df: pd.DataFrame,
                                 periods: Optional[List[int]] = None) -> pd.DataFrame:
        """Compute volume, on-balance-volume and turnover factors.

        Args:
            df: DataFrame with Close/Volume columns.
            periods: Look-back lengths in bars; defaults to ``[5, 10, 20]``.

        Returns:
            DataFrame with, per period, ``Vol_MA_*``, ``Vol_Ratio_*``,
            ``Vol_Zscore_*`` and ``Vol_Price_Corr_*``, plus the OBV and
            turnover columns.
        """
        if periods is None:
            periods = [5, 10, 20]

        factors = pd.DataFrame(index=df.index)
        close, volume = df['Close'], df['Volume']
        price_change = close.pct_change()

        for period in periods:
            # Volume moving average and ratio to it
            vol_ma = volume.rolling(period).mean()
            factors[f'Vol_MA_{period}'] = vol_ma
            factors[f'Vol_Ratio_{period}'] = volume / vol_ma
            # Rolling z-score of volume
            vol_std = volume.rolling(period).std()
            factors[f'Vol_Zscore_{period}'] = (volume - vol_ma) / (vol_std + self._EPS)
            # Rolling correlation between returns and volume
            factors[f'Vol_Price_Corr_{period}'] = price_change.rolling(period).corr(volume)

        # On Balance Volume: volume signed by the direction of the close
        obv = (np.sign(close.diff()) * volume).cumsum()
        factors['OBV'] = obv
        factors['OBV_MA_20'] = obv.rolling(20).mean()
        factors['OBV_Signal'] = obv - factors['OBV_MA_20']

        # Traded value (close * volume)
        factors['Turnover'] = close * volume
        factors['Turnover_MA_20'] = factors['Turnover'].rolling(20).mean()
        factors['Turnover_Ratio'] = factors['Turnover'] / factors['Turnover_MA_20']
        return factors

    def calculate_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute RSI, MACD, Bollinger, stochastic, Williams %R and ATR factors."""
        factors = pd.DataFrame(index=df.index)
        close = df['Close']

        # RSI
        factors['RSI_14'] = self._calculate_rsi(close, 14)
        factors['RSI_30'] = self._calculate_rsi(close, 30)

        # MACD
        macd, signal, histogram = self._calculate_macd(close)
        factors['MACD'] = macd
        factors['MACD_Signal'] = signal
        factors['MACD_Histogram'] = histogram

        # Bollinger Bands
        bb_upper, bb_middle, bb_lower = self._calculate_bollinger_bands(close, 20, 2)
        factors['BB_Upper'] = bb_upper
        factors['BB_Middle'] = bb_middle
        factors['BB_Lower'] = bb_lower
        factors['BB_Width'] = (bb_upper - bb_lower) / bb_middle
        # The band width is zero when the window is flat, hence the guard.
        factors['BB_Position'] = (close - bb_lower) / (bb_upper - bb_lower + self._EPS)

        # Stochastic
        factors['Stoch_K'], factors['Stoch_D'] = self._calculate_stochastic(df, 14, 3)

        # Williams %R
        factors['Williams_R'] = self._calculate_williams_r(df, 14)

        # Average True Range
        factors['ATR'] = self._calculate_atr(df, 14)
        factors['ATR_Ratio'] = factors['ATR'] / close
        return factors

    def _calculate_vwap(self, df: pd.DataFrame, period: int) -> pd.Series:
        """Return the rolling volume-weighted average of the typical price."""
        typical_price = (df['High'] + df['Low'] + df['Close']) / 3
        weighted_sum = (typical_price * df['Volume']).rolling(period).sum()
        return weighted_sum / df['Volume'].rolling(period).sum()

    def _calculate_rsi(self, prices: pd.Series, period: int) -> pd.Series:
        """Return the RSI built from simple rolling means of gains and losses."""
        delta = prices.diff()
        gain = delta.where(delta > 0, 0).rolling(period).mean()
        loss = -delta.where(delta < 0, 0).rolling(period).mean()
        rs = gain / (loss + self._EPS)
        return 100 - (100 / (1 + rs))

    def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26,
                        signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:
        """Return ``(macd, signal_line, histogram)`` from EWM means of ``prices``."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        signal_line = macd.ewm(span=signal).mean()
        histogram = macd - signal_line
        return macd, signal_line, histogram

    def _calculate_bollinger_bands(self, prices: pd.Series, period: int,
                                   std_dev: float) -> Tuple[pd.Series, pd.Series, pd.Series]:
        """Return ``(upper, middle, lower)``: rolling mean +/- ``std_dev`` rolling stds."""
        middle = prices.rolling(period).mean()
        band = std_dev * prices.rolling(period).std()
        return middle + band, middle, middle - band

    def _calculate_stochastic(self, df: pd.DataFrame, k_period: int,
                              d_period: int) -> Tuple[pd.Series, pd.Series]:
        """Return stochastic ``(%K, %D)``; %D is the rolling mean of %K."""
        low_min = df['Low'].rolling(k_period).min()
        high_max = df['High'].rolling(k_period).max()
        # Guard the range so a flat window yields 0 rather than NaN.
        k_percent = 100 * (df['Close'] - low_min) / (high_max - low_min + self._EPS)
        d_percent = k_percent.rolling(d_period).mean()
        return k_percent, d_percent

    def _calculate_williams_r(self, df: pd.DataFrame, period: int) -> pd.Series:
        """Return Williams %R over a rolling ``period``-bar high/low range."""
        high_max = df['High'].rolling(period).max()
        low_min = df['Low'].rolling(period).min()
        # Guard the range so a flat window yields 0 rather than NaN.
        return -100 * (high_max - df['Close']) / (high_max - low_min + self._EPS)

    def _calculate_atr(self, df: pd.DataFrame, period: int) -> pd.Series:
        """Return the rolling mean of the true range."""
        prev_close = df['Close'].shift()
        high_low = df['High'] - df['Low']
        high_close = np.abs(df['High'] - prev_close)
        low_close = np.abs(df['Low'] - prev_close)
        tr = np.maximum(high_low, np.maximum(high_close, low_close))
        return tr.rolling(period).mean()
class GeometricFactorEngine:
    """Derive low-dimensional "geometric" factors from a factor matrix.

    Wraps several decomposition / manifold-learning models (PCA, UMAP,
    Isomap, FastICA, randomly rotated PCA) behind a common interface and
    offers simple norm/angle features and factor selection helpers.
    """

    def __init__(self, n_components: int = 10, random_state: int = 42):
        """Store the settings and create the scalers.

        Args:
            n_components: Target number of output dimensions per method.
            random_state: Seed passed to every model that accepts one.
        """
        self.n_components = n_components
        self.random_state = random_state
        self.scaler = StandardScaler()
        self.robust_scaler = RobustScaler()
        # Fitted models; each is set by the matching fit_transform_* method.
        self.pca = None
        self.umap_model = None
        self.isomap = None
        self.ica = None

    def fit_transform_all_methods(self, data: pd.DataFrame) -> Dict[str, np.ndarray]:
        """Run every extraction method on ``data``.

        Args:
            data: Input factor matrix (rows are samples, columns are factors).

        Returns:
            Dict with keys ``'pca'``, ``'umap'``, ``'isomap'``, ``'ica'`` and
            ``'rotated_pca'`` mapping to each method's transformed array.
        """
        # Scale the data; ICA is fed the robust-scaled copy, everything
        # else the standard-scaled one.
        scaled_data = self.scaler.fit_transform(data)
        robust_scaled_data = self.robust_scaler.fit_transform(data)

        return {
            'pca': self.fit_transform_pca(scaled_data),
            'umap': self.fit_transform_umap(scaled_data),
            'isomap': self.fit_transform_isomap(scaled_data),
            'ica': self.fit_transform_ica(robust_scaled_data),
            'rotated_pca': self.fit_transform_rotated_pca(scaled_data),
        }

    def fit_transform_pca(self, data: np.ndarray, variance_threshold: float = 0.95) -> np.ndarray:
        """Fit PCA and return at most ``self.n_components`` leading components.

        ``variance_threshold`` is passed to ``PCA`` as ``n_components``. The
        fitted model is kept in ``self.pca`` and may hold more components
        than the returned array.
        """
        self.pca = PCA(n_components=variance_threshold, random_state=self.random_state)
        pca_result = self.pca.fit_transform(data)
        # Truncate to the target dimension (a no-op when already narrower).
        return pca_result[:, :self.n_components]

    def fit_transform_umap(self, data: np.ndarray) -> np.ndarray:
        """Fit UMAP on ``data`` and return the embedding."""
        # A quarter of the samples, capped at 50. The floor of 2 keeps the
        # value usable on small inputs, where the quotient would be 0 or 1.
        n_neighbors = max(2, min(50, data.shape[0] // 4))
        self.umap_model = umap.UMAP(
            n_components=self.n_components,
            n_neighbors=n_neighbors,
            min_dist=0.1,
            metric='euclidean',
            random_state=self.random_state
        )
        return self.umap_model.fit_transform(data)

    def fit_transform_isomap(self, data: np.ndarray) -> np.ndarray:
        """Fit Isomap on ``data`` and return the embedding."""
        # A third of the samples, capped at 30, but never 0 on tiny inputs.
        n_neighbors = max(1, min(30, data.shape[0] // 3))
        self.isomap = Isomap(n_components=self.n_components, n_neighbors=n_neighbors)
        return self.isomap.fit_transform(data)

    def fit_transform_ica(self, data: np.ndarray) -> np.ndarray:
        """Fit FastICA with at most as many components as input columns."""
        n_components = min(self.n_components, data.shape[1])
        self.ica = FastICA(n_components=n_components, random_state=self.random_state, max_iter=1000)
        return self.ica.fit_transform(data)

    def fit_transform_rotated_pca(self, data: np.ndarray) -> np.ndarray:
        """Fit PCA and apply a seeded random orthogonal rotation to the scores.

        The component count is clamped to the data's shape, as the ICA
        method already does for columns. With a single component there is
        nothing to rotate, so the PCA scores are returned as-is.
        """
        from scipy.stats import special_ortho_group

        n_samples, n_features = data.shape
        n_components = min(self.n_components, n_samples, n_features)

        # Run PCA first
        pca = PCA(n_components=n_components, random_state=self.random_state)
        pca_result = pca.fit_transform(data)
        if n_components < 2:
            return pca_result

        # Apply a random orthogonal transform of matching dimension
        rotation_matrix = special_ortho_group.rvs(n_components, random_state=self.random_state)
        return pca_result @ rotation_matrix

    def calculate_geometric_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Compute per-row norm, angle and centroid-distance features.

        Args:
            data: Numeric DataFrame; each row is treated as a point.

        Returns:
            DataFrame on ``data.index`` with ``Euclidean_Norm``,
            ``Manhattan_Norm``, ``Distance_To_Centroid``,
            ``Relative_Position`` and, when there are at least two columns,
            ``Angle_First_Two`` and ``Radius_First_Two``.
        """
        features = pd.DataFrame(index=data.index)

        # Norm features
        features['Euclidean_Norm'] = np.sqrt(np.sum(data**2, axis=1))
        features['Manhattan_Norm'] = np.sum(np.abs(data), axis=1)

        # Polar coordinates of the first two columns
        if data.shape[1] >= 2:
            first, second = data.iloc[:, 0], data.iloc[:, 1]
            features['Angle_First_Two'] = np.arctan2(second, first)
            features['Radius_First_Two'] = np.sqrt(first**2 + second**2)

        # Distance to the column-wise mean of the whole input
        centroid = data.mean()
        distance = np.sqrt(np.sum((data - centroid)**2, axis=1))
        features['Distance_To_Centroid'] = distance

        # Rolling 20-row z-score of that distance
        rolling = distance.rolling(20)
        features['Relative_Position'] = (distance - rolling.mean()) / (rolling.std() + 1e-8)
        return features

    def select_informative_factors(self, factors: pd.DataFrame, target: pd.Series,
                                   method: str = 'mutual_info', k: int = 20) -> List[str]:
        """Return the names of the ``k`` highest-scoring factor columns.

        Args:
            factors: Candidate factor columns.
            target: Target variable.
            method: ``'mutual_info'`` (mutual information with the target,
                NaNs filled with 0), ``'correlation'`` (absolute correlation
                with the target) or ``'variance'`` (column variance).
            k: Number of factors to select.

        Returns:
            Column names ordered from highest to lowest score.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method == 'mutual_info':
            # Seed the estimator so repeated calls select the same factors.
            mi_scores = mutual_info_regression(
                factors.fillna(0), target.fillna(0), random_state=self.random_state
            )
            scores = pd.Series(mi_scores, index=factors.columns)
        elif method == 'correlation':
            scores = factors.corrwith(target).abs()
        elif method == 'variance':
            scores = factors.var()
        else:
            raise ValueError(f"Unknown selection method: {method}")
        return scores.nlargest(k).index.tolist()
class FactorCombiner:
    """Combine factor groups into weighted or composite factor tables."""

    def __init__(self):
        self.weights = {}

    def combine_factors_weighted(self, factors: Dict[str, pd.DataFrame],
                                 weights: Optional[Dict[str, float]] = None) -> pd.DataFrame:
        """Z-score each factor group, scale it by its weight and join them.

        Args:
            factors: Mapping of group name to that group's factor DataFrame.
            weights: Mapping of group name to weight; a missing group (or
                ``None`` for the whole mapping) means a weight of 1.0.

        Returns:
            DataFrame whose columns are named ``"<group>_<column>"``.
        """
        if weights is None:
            weights = dict.fromkeys(factors.keys(), 1.0)

        merged = pd.DataFrame()
        for group_name, group_frame in factors.items():
            scale = weights.get(group_name, 1.0)

            # Column-wise standardisation with a guarded denominator
            spread = group_frame.std() + 1e-8
            zscores = (group_frame - group_frame.mean()) / spread

            # Apply the group weight, then prefix the columns with the group name
            scaled = zscores * scale
            scaled.columns = [f"{group_name}_{col}" for col in scaled.columns]

            merged = pd.concat([merged, scaled], axis=1)
        return merged

    def create_composite_factors(self, technical_factors: pd.DataFrame,
                                 geometric_factors: Dict[str, np.ndarray]) -> pd.DataFrame:
        """Join technical and geometric factors and add derived columns.

        Args:
            technical_factors: Technical factor DataFrame.
            geometric_factors: Mapping of method name to a 2-D array whose
                rows are aligned with the leading rows of
                ``technical_factors``.

        Returns:
            DataFrame of the joined factors plus interaction and
            time-series columns, with every row containing NaN removed.
        """
        joined = technical_factors.copy()

        # Append each method's array as "Geo_<method>_<i>" columns
        for method_name, embedding in geometric_factors.items():
            row_index = technical_factors.index[:len(embedding)]
            column_names = [f"Geo_{method_name}_{dim}" for dim in range(embedding.shape[1])]
            embedding_frame = pd.DataFrame(embedding, index=row_index, columns=column_names)
            joined = pd.concat([joined, embedding_frame], axis=1)

        # Interaction columns first, then time-series columns on top of them
        with_interactions = self._create_interaction_factors(joined)
        with_time_series = self._create_time_series_factors(with_interactions)
        return with_time_series.dropna()

    def _create_interaction_factors(self, factors: pd.DataFrame) -> pd.DataFrame:
        """Add product and ratio columns for every pair among the first six columns."""
        result = factors.copy()

        # Only the leading columns take part; pairs are (earlier, later).
        leading = list(factors.columns[:6])
        for pos, left in enumerate(leading[:5]):
            for right in leading[pos + 1:]:
                left_values, right_values = factors[left], factors[right]
                # Product interaction
                result[f"Interact_{left}_{right}"] = left_values * right_values
                # Ratio interaction
                result[f"Ratio_{left}_{right}"] = left_values / (right_values + 1e-8)
        return result

    def _create_time_series_factors(self, factors: pd.DataFrame) -> pd.DataFrame:
        """Add rolling-mean, 5-step difference and z-score columns for the first five columns."""
        result = factors.copy()

        for name in factors.columns[:5]:
            series = factors[name]
            window_20 = series.rolling(20)
            mean_20 = window_20.mean()

            # Moving averages
            result[f"{name}_MA_5"] = series.rolling(5).mean()
            result[f"{name}_MA_20"] = mean_20
            # Momentum
            result[f"{name}_Momentum_5"] = series - series.shift(5)
            # Rolling z-score
            result[f"{name}_Zscore"] = (series - mean_20) / (window_20.std() + 1e-8)
        return result
# Usage example
if __name__ == "__main__":
    # Build a reproducible synthetic OHLCV frame of 15-minute bars.
    # The draws below must stay in Open/High/Low/Close/Volume order so the
    # seeded random stream is consumed in the same sequence.
    np.random.seed(42)
    bar_index = pd.date_range('2023-01-01', periods=1000, freq='15min')
    open_px = 100 + np.cumsum(np.random.randn(1000) * 0.1)
    high_px = 100 + np.cumsum(np.random.randn(1000) * 0.1)
    low_px = 100 + np.cumsum(np.random.randn(1000) * 0.1)
    close_px = 100 + np.cumsum(np.random.randn(1000) * 0.1)
    volume = np.random.randint(1000, 10000, 1000)
    price_data = pd.DataFrame(
        {
            'Open': open_px,
            'High': high_px,
            'Low': low_px,
            'Close': close_px,
            'Volume': volume,
        },
        index=bar_index,
    )

    # Make the bars consistent: High is the row maximum, Low the row minimum.
    ohlc_columns = ['Open', 'High', 'Low', 'Close']
    price_data['High'] = price_data[ohlc_columns].max(axis=1)
    price_data['Low'] = price_data[ohlc_columns].min(axis=1)

    print("Testing Factor Engineering...")

    # Technical factors
    technical_factors = TechnicalFactorCalculator().calculate_all_factors(price_data)
    print(f"Technical factors shape: {technical_factors.shape}")
    print(f"Technical factors columns: {technical_factors.columns.tolist()}")

    # Geometric factors
    geo_engine = GeometricFactorEngine(n_components=8)
    geometric_factors = geo_engine.fit_transform_all_methods(technical_factors)
    print(f"Geometric factors methods: {geometric_factors.keys()}")
    for method_name, embedding in geometric_factors.items():
        print(f"{method_name} shape: {embedding.shape}")

    # Factor combination
    composite_factors = FactorCombiner().create_composite_factors(technical_factors, geometric_factors)
    print(f"Composite factors shape: {composite_factors.shape}")
    print(f"Sample composite factor names: {composite_factors.columns.tolist()[:20]}")