""" 因子工程引擎 Factor Engineering Engine - Traditional and Geometric Factors """ import pandas as pd import numpy as np from sklearn.manifold import Isomap from sklearn.preprocessing import StandardScaler, RobustScaler from sklearn.decomposition import PCA, FastICA from sklearn.feature_selection import mutual_info_regression import umap from typing import Dict, List, Optional, Tuple, Union import warnings warnings.filterwarnings('ignore') class TechnicalFactorCalculator: """技术因子计算器""" def __init__(self): self.calculated_factors = {} def calculate_all_factors(self, price_data: pd.DataFrame) -> pd.DataFrame: """ 计算所有技术因子 Args: price_data: 包含OHLCV数据的DataFrame Returns: DataFrame: 包含所有技术因子的DataFrame """ factors = pd.DataFrame(index=price_data.index) # 价格因子 factors = pd.concat([factors, self.calculate_price_factors(price_data)], axis=1) # 动量因子 factors = pd.concat([factors, self.calculate_momentum_factors(price_data)], axis=1) # 波动率因子 factors = pd.concat([factors, self.calculate_volatility_factors(price_data)], axis=1) # 成交量因子 factors = pd.concat([factors, self.calculate_volume_factors(price_data)], axis=1) # 技术指标因子 factors = pd.concat([factors, self.calculate_technical_indicators(price_data)], axis=1) return factors.dropna() def calculate_price_factors(self, df: pd.DataFrame) -> pd.DataFrame: """计算价格相关因子""" factors = pd.DataFrame(index=df.index) # 基础价格因子 factors['Close_Open_Ratio'] = df['Close'] / df['Open'] factors['High_Low_Ratio'] = df['High'] / df['Low'] factors['Close_High_Ratio'] = df['Close'] / df['High'] factors['Close_Low_Ratio'] = df['Close'] / df['Low'] # 价格位置因子 factors['Price_Position'] = (df['Close'] - df['Low']) / (df['High'] - df['Low'] + 1e-8) factors['Body_Size'] = np.abs(df['Close'] - df['Open']) / (df['High'] - df['Low'] + 1e-8) factors['Upper_Shadow'] = (df['High'] - np.maximum(df['Open'], df['Close'])) / (df['High'] - df['Low'] + 1e-8) factors['Lower_Shadow'] = (np.minimum(df['Open'], df['Close']) - df['Low']) / (df['High'] - df['Low'] + 1e-8) # 价格差异因子 factors['HL_Spread'] = (df['High'] - df['Low']) / df['Close'] factors['OC_Spread'] = (df['Close'] - df['Open']) / df['Open'] return factors def calculate_momentum_factors(self, df: pd.DataFrame, periods: List[int] = [3, 5, 10, 20]) -> pd.DataFrame: """计算动量因子""" factors = pd.DataFrame(index=df.index) for period in periods: # 价格动量 factors[f'Price_Momentum_{period}'] = df['Close'].pct_change(period) factors[f'Log_Momentum_{period}'] = np.log(df['Close'] / df['Close'].shift(period)) # 成交量加权动量 vwap = self._calculate_vwap(df, period) factors[f'VWAP_Momentum_{period}'] = (df['Close'] - vwap) / vwap # 相对强度 up_moves = df['Close'].diff().clip(lower=0).rolling(period).sum() down_moves = -df['Close'].diff().clip(upper=0).rolling(period).sum() factors[f'RS_{period}'] = up_moves / (down_moves + 1e-8) # 动量加速度 if period >= 5: momentum = df['Close'].pct_change(period) factors[f'Momentum_Accel_{period}'] = momentum - momentum.shift(period // 2) return factors def calculate_volatility_factors(self, df: pd.DataFrame, periods: List[int] = [5, 10, 20, 60]) -> pd.DataFrame: """计算波动率因子""" factors = pd.DataFrame(index=df.index) returns = df['Close'].pct_change() for period in periods: # 历史波动率 factors[f'HV_{period}'] = returns.rolling(period).std() * np.sqrt(252) # 范围波动率 range_vol = np.log(df['High'] / df['Low']).rolling(period).mean() factors[f'Range_Vol_{period}'] = range_vol # Parkinson波动率 parkinson_vol = np.log(df['High'] / df['Low']).pow(2).rolling(period).mean() factors[f'Parkinson_Vol_{period}'] = np.sqrt(parkinson_vol * 252 / (4 * np.log(2))) # Garman-Klass波动率 ln_hl = np.log(df['High'] / df['Low']) ln_co = np.log(df['Close'] / df['Open']) gk_vol = (0.5 * ln_hl.pow(2) - (2 * np.log(2) - 1) * ln_co.pow(2)).rolling(period).mean() factors[f'GK_Vol_{period}'] = np.sqrt(gk_vol * 252) # 波动率的波动率 factors['Vol_of_Vol'] = factors['HV_20'].rolling(20).std() # 波动率偏度和峰度 factors['Returns_Skew'] = returns.rolling(60).skew() factors['Returns_Kurt'] = returns.rolling(60).kurtosis() return factors def calculate_volume_factors(self, df: pd.DataFrame, periods: List[int] = [5, 10, 20]) -> pd.DataFrame: """计算成交量因子""" factors = pd.DataFrame(index=df.index) for period in periods: # 成交量均线 vol_ma = df['Volume'].rolling(period).mean() factors[f'Vol_MA_{period}'] = vol_ma factors[f'Vol_Ratio_{period}'] = df['Volume'] / vol_ma # 成交量标准化 vol_std = df['Volume'].rolling(period).std() factors[f'Vol_Zscore_{period}'] = (df['Volume'] - vol_ma) / (vol_std + 1e-8) # 价量关系 price_change = df['Close'].pct_change() vol_price_corr = price_change.rolling(period).corr(df['Volume']) factors[f'Vol_Price_Corr_{period}'] = vol_price_corr # On Balance Volume obv = (np.sign(df['Close'].diff()) * df['Volume']).cumsum() factors['OBV'] = obv factors['OBV_MA_20'] = obv.rolling(20).mean() factors['OBV_Signal'] = obv - factors['OBV_MA_20'] # 成交金额 factors['Turnover'] = df['Close'] * df['Volume'] factors['Turnover_MA_20'] = factors['Turnover'].rolling(20).mean() factors['Turnover_Ratio'] = factors['Turnover'] / factors['Turnover_MA_20'] return factors def calculate_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """计算技术指标因子""" factors = pd.DataFrame(index=df.index) # RSI factors['RSI_14'] = self._calculate_rsi(df['Close'], 14) factors['RSI_30'] = self._calculate_rsi(df['Close'], 30) # MACD macd, signal, histogram = self._calculate_macd(df['Close']) factors['MACD'] = macd factors['MACD_Signal'] = signal factors['MACD_Histogram'] = histogram # Bollinger Bands bb_upper, bb_middle, bb_lower = self._calculate_bollinger_bands(df['Close'], 20, 2) factors['BB_Upper'] = bb_upper factors['BB_Middle'] = bb_middle factors['BB_Lower'] = bb_lower factors['BB_Width'] = (bb_upper - bb_lower) / bb_middle factors['BB_Position'] = (df['Close'] - bb_lower) / (bb_upper - bb_lower) # Stochastic factors['Stoch_K'], factors['Stoch_D'] = self._calculate_stochastic(df, 14, 3) # Williams %R factors['Williams_R'] = self._calculate_williams_r(df, 14) # Average True Range factors['ATR'] = self._calculate_atr(df, 14) factors['ATR_Ratio'] = factors['ATR'] / df['Close'] return factors def _calculate_vwap(self, df: pd.DataFrame, period: int) -> pd.Series: """计算VWAP""" typical_price = (df['High'] + df['Low'] + df['Close']) / 3 vwap = (typical_price * df['Volume']).rolling(period).sum() / df['Volume'].rolling(period).sum() return vwap def _calculate_rsi(self, prices: pd.Series, period: int) -> pd.Series: """计算RSI""" delta = prices.diff() gain = delta.where(delta > 0, 0).rolling(period).mean() loss = -delta.where(delta < 0, 0).rolling(period).mean() rs = gain / (loss + 1e-8) rsi = 100 - (100 / (1 + rs)) return rsi def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]: """计算MACD""" ema_fast = prices.ewm(span=fast).mean() ema_slow = prices.ewm(span=slow).mean() macd = ema_fast - ema_slow signal_line = macd.ewm(span=signal).mean() histogram = macd - signal_line return macd, signal_line, histogram def _calculate_bollinger_bands(self, prices: pd.Series, period: int, std_dev: float) -> Tuple[pd.Series, pd.Series, pd.Series]: """计算布林带""" middle = prices.rolling(period).mean() std = prices.rolling(period).std() upper = middle + std_dev * std lower = middle - std_dev * std return upper, middle, lower def _calculate_stochastic(self, df: pd.DataFrame, k_period: int, d_period: int) -> Tuple[pd.Series, pd.Series]: """计算随机指标""" low_min = df['Low'].rolling(k_period).min() high_max = df['High'].rolling(k_period).max() k_percent = 100 * (df['Close'] - low_min) / (high_max - low_min) d_percent = k_percent.rolling(d_period).mean() return k_percent, d_percent def _calculate_williams_r(self, df: pd.DataFrame, period: int) -> pd.Series: """计算Williams %R""" high_max = df['High'].rolling(period).max() low_min = df['Low'].rolling(period).min() williams_r = -100 * (high_max - df['Close']) / (high_max - low_min) return williams_r def _calculate_atr(self, df: pd.DataFrame, period: int) -> pd.Series: """计算ATR""" high_low = df['High'] - df['Low'] high_close = np.abs(df['High'] - df['Close'].shift()) low_close = np.abs(df['Low'] - df['Close'].shift()) tr = np.maximum(high_low, np.maximum(high_close, low_close)) atr = tr.rolling(period).mean() return atr class GeometricFactorEngine: """几何因子工程引擎""" def __init__(self, n_components: int = 10, random_state: int = 42): self.n_components = n_components self.random_state = random_state self.scaler = StandardScaler() self.robust_scaler = RobustScaler() self.pca = None self.umap_model = None self.isomap = None self.ica = None def fit_transform_all_methods(self, data: pd.DataFrame) -> Dict[str, np.ndarray]: """ 使用多种方法进行几何因子提取 Args: data: 输入因子数据 Returns: Dict: 包含不同方法结果的字典 """ results = {} # 标准化数据 scaled_data = self.scaler.fit_transform(data) robust_scaled_data = self.robust_scaler.fit_transform(data) # PCA分解 results['pca'] = self.fit_transform_pca(scaled_data) # UMAP降维 results['umap'] = self.fit_transform_umap(scaled_data) # Isomap流形学习 results['isomap'] = self.fit_transform_isomap(scaled_data) # ICA独立成分分析 results['ica'] = self.fit_transform_ica(robust_scaled_data) # 主成分旋转 results['rotated_pca'] = self.fit_transform_rotated_pca(scaled_data) return results def fit_transform_pca(self, data: np.ndarray, variance_threshold: float = 0.95) -> np.ndarray: """PCA降维""" self.pca = PCA(n_components=variance_threshold, random_state=self.random_state) pca_result = self.pca.fit_transform(data) # 调整到目标维度 if pca_result.shape[1] > self.n_components: pca_result = pca_result[:, :self.n_components] return pca_result def fit_transform_umap(self, data: np.ndarray) -> np.ndarray: """UMAP非线性降维""" self.umap_model = umap.UMAP( n_components=self.n_components, n_neighbors=min(50, data.shape[0] // 4), min_dist=0.1, metric='euclidean', random_state=self.random_state ) return self.umap_model.fit_transform(data) def fit_transform_isomap(self, data: np.ndarray) -> np.ndarray: """Isomap流形学习""" n_neighbors = min(30, data.shape[0] // 3) self.isomap = Isomap(n_components=self.n_components, n_neighbors=n_neighbors) return self.isomap.fit_transform(data) def fit_transform_ica(self, data: np.ndarray) -> np.ndarray: """独立成分分析""" n_components = min(self.n_components, data.shape[1]) self.ica = FastICA(n_components=n_components, random_state=self.random_state, max_iter=1000) return self.ica.fit_transform(data) def fit_transform_rotated_pca(self, data: np.ndarray) -> np.ndarray: """旋转主成分分析""" from sklearn.decomposition import PCA from scipy.stats import special_ortho_group # 先进行PCA pca = PCA(n_components=self.n_components, random_state=self.random_state) pca_result = pca.fit_transform(data) # 应用随机正交变换 rotation_matrix = special_ortho_group.rvs(self.n_components, random_state=self.random_state) rotated_result = np.dot(pca_result, rotation_matrix) return rotated_result def calculate_geometric_features(self, data: pd.DataFrame) -> pd.DataFrame: """ 计算几何特征 Args: data: 输入数据 Returns: DataFrame: 几何特征 """ features = pd.DataFrame(index=data.index) # 计算距离特征 features['Euclidean_Norm'] = np.sqrt(np.sum(data**2, axis=1)) features['Manhattan_Norm'] = np.sum(np.abs(data), axis=1) # 计算角度特征 if data.shape[1] >= 2: features['Angle_First_Two'] = np.arctan2(data.iloc[:, 1], data.iloc[:, 0]) features['Radius_First_Two'] = np.sqrt(data.iloc[:, 0]**2 + data.iloc[:, 1]**2) # 计算重心距离 centroid = data.mean() features['Distance_To_Centroid'] = np.sqrt(np.sum((data - centroid)**2, axis=1)) # 计算相对位置 features['Relative_Position'] = (features['Distance_To_Centroid'] - features['Distance_To_Centroid'].rolling(20).mean()) / \ (features['Distance_To_Centroid'].rolling(20).std() + 1e-8) return features def select_informative_factors(self, factors: pd.DataFrame, target: pd.Series, method: str = 'mutual_info', k: int = 20) -> List[str]: """ 选择信息量最大的因子 Args: factors: 因子数据 target: 目标变量 method: 选择方法 k: 选择的因子数量 Returns: List: 选中的因子名称 """ if method == 'mutual_info': # 使用互信息选择特征 mi_scores = mutual_info_regression(factors.fillna(0), target.fillna(0)) factor_scores = pd.Series(mi_scores, index=factors.columns) selected_factors = factor_scores.nlargest(k).index.tolist() elif method == 'correlation': # 使用相关系数选择特征 correlations = factors.corrwith(target).abs() selected_factors = correlations.nlargest(k).index.tolist() elif method == 'variance': # 使用方差选择特征 variances = factors.var() selected_factors = variances.nlargest(k).index.tolist() else: raise ValueError(f"Unknown selection method: {method}") return selected_factors class FactorCombiner: """因子合成器""" def __init__(self): self.weights = {} def combine_factors_weighted(self, factors: Dict[str, pd.DataFrame], weights: Optional[Dict[str, float]] = None) -> pd.DataFrame: """ 加权合成因子 Args: factors: 因子字典 weights: 权重字典 Returns: DataFrame: 合成后的因子 """ if weights is None: weights = {key: 1.0 for key in factors.keys()} combined = pd.DataFrame() for factor_type, factor_data in factors.items(): weight = weights.get(factor_type, 1.0) # 标准化因子 normalized_factors = (factor_data - factor_data.mean()) / (factor_data.std() + 1e-8) # 添加权重 weighted_factors = normalized_factors * weight # 添加前缀 weighted_factors.columns = [f"{factor_type}_{col}" for col in weighted_factors.columns] combined = pd.concat([combined, weighted_factors], axis=1) return combined def create_composite_factors(self, technical_factors: pd.DataFrame, geometric_factors: Dict[str, np.ndarray]) -> pd.DataFrame: """ 创建复合因子 Args: technical_factors: 技术因子 geometric_factors: 几何因子 Returns: DataFrame: 复合因子 """ composite = technical_factors.copy() # 添加几何因子 for method, geo_factors in geometric_factors.items(): geo_df = pd.DataFrame( geo_factors, index=technical_factors.index[:len(geo_factors)], columns=[f"Geo_{method}_{i}" for i in range(geo_factors.shape[1])] ) composite = pd.concat([composite, geo_df], axis=1) # 创建交互因子 composite = self._create_interaction_factors(composite) # 创建时间序列因子 composite = self._create_time_series_factors(composite) return composite.dropna() def _create_interaction_factors(self, factors: pd.DataFrame) -> pd.DataFrame: """创建交互因子""" interaction_factors = factors.copy() # 选择前几个最重要的因子进行交互 important_factors = factors.columns[:min(10, len(factors.columns))] for i, factor1 in enumerate(important_factors[:5]): for factor2 in important_factors[i+1:6]: # 乘积交互 interaction_factors[f"Interact_{factor1}_{factor2}"] = factors[factor1] * factors[factor2] # 比值交互 interaction_factors[f"Ratio_{factor1}_{factor2}"] = factors[factor1] / (factors[factor2] + 1e-8) return interaction_factors def _create_time_series_factors(self, factors: pd.DataFrame) -> pd.DataFrame: """创建时间序列因子""" ts_factors = factors.copy() # 选择几个重要因子 important_factors = factors.columns[:min(5, len(factors.columns))] for factor in important_factors: # 移动平均 ts_factors[f"{factor}_MA_5"] = factors[factor].rolling(5).mean() ts_factors[f"{factor}_MA_20"] = factors[factor].rolling(20).mean() # 动量 ts_factors[f"{factor}_Momentum_5"] = factors[factor] - factors[factor].shift(5) # 标准化 ts_factors[f"{factor}_Zscore"] = (factors[factor] - factors[factor].rolling(20).mean()) / \ (factors[factor].rolling(20).std() + 1e-8) return ts_factors # 使用示例 if __name__ == "__main__": # 创建示例数据 dates = pd.date_range('2023-01-01', periods=1000, freq='15min') np.random.seed(42) price_data = pd.DataFrame({ 'Open': 100 + np.cumsum(np.random.randn(1000) * 0.1), 'High': 100 + np.cumsum(np.random.randn(1000) * 0.1), 'Low': 100 + np.cumsum(np.random.randn(1000) * 0.1), 'Close': 100 + np.cumsum(np.random.randn(1000) * 0.1), 'Volume': np.random.randint(1000, 10000, 1000) }, index=dates) # 确保OHLC逻辑正确 price_data['High'] = price_data[['Open', 'High', 'Low', 'Close']].max(axis=1) price_data['Low'] = price_data[['Open', 'High', 'Low', 'Close']].min(axis=1) print("Testing Factor Engineering...") # 技术因子计算 tech_calculator = TechnicalFactorCalculator() technical_factors = tech_calculator.calculate_all_factors(price_data) print(f"Technical factors shape: {technical_factors.shape}") print(f"Technical factors columns: {technical_factors.columns.tolist()}") # 几何因子工程 geo_engine = GeometricFactorEngine(n_components=8) geometric_factors = geo_engine.fit_transform_all_methods(technical_factors) print(f"Geometric factors methods: {geometric_factors.keys()}") for method, factors in geometric_factors.items(): print(f"{method} shape: {factors.shape}") # 因子合成 combiner = FactorCombiner() composite_factors = combiner.create_composite_factors(technical_factors, geometric_factors) print(f"Composite factors shape: {composite_factors.shape}") print(f"Sample composite factor names: {composite_factors.columns.tolist()[:20]}")