# hk-trading-platform / modules / factor_engine.py
# Uploaded by Humphreykowl ("Upload 7 files", commit 85fdeb5, verified)
"""
因子工程引擎
Factor Engineering Engine - Traditional and Geometric Factors
"""
import pandas as pd
import numpy as np
from sklearn.manifold import Isomap
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.decomposition import PCA, FastICA
from sklearn.feature_selection import mutual_info_regression
import umap
from typing import Dict, List, Optional, Tuple, Union
import warnings
warnings.filterwarnings('ignore')
class TechnicalFactorCalculator:
"""技术因子计算器"""
def __init__(self):
self.calculated_factors = {}
def calculate_all_factors(self, price_data: pd.DataFrame) -> pd.DataFrame:
"""
计算所有技术因子
Args:
price_data: 包含OHLCV数据的DataFrame
Returns:
DataFrame: 包含所有技术因子的DataFrame
"""
factors = pd.DataFrame(index=price_data.index)
# 价格因子
factors = pd.concat([factors, self.calculate_price_factors(price_data)], axis=1)
# 动量因子
factors = pd.concat([factors, self.calculate_momentum_factors(price_data)], axis=1)
# 波动率因子
factors = pd.concat([factors, self.calculate_volatility_factors(price_data)], axis=1)
# 成交量因子
factors = pd.concat([factors, self.calculate_volume_factors(price_data)], axis=1)
# 技术指标因子
factors = pd.concat([factors, self.calculate_technical_indicators(price_data)], axis=1)
return factors.dropna()
def calculate_price_factors(self, df: pd.DataFrame) -> pd.DataFrame:
"""计算价格相关因子"""
factors = pd.DataFrame(index=df.index)
# 基础价格因子
factors['Close_Open_Ratio'] = df['Close'] / df['Open']
factors['High_Low_Ratio'] = df['High'] / df['Low']
factors['Close_High_Ratio'] = df['Close'] / df['High']
factors['Close_Low_Ratio'] = df['Close'] / df['Low']
# 价格位置因子
factors['Price_Position'] = (df['Close'] - df['Low']) / (df['High'] - df['Low'] + 1e-8)
factors['Body_Size'] = np.abs(df['Close'] - df['Open']) / (df['High'] - df['Low'] + 1e-8)
factors['Upper_Shadow'] = (df['High'] - np.maximum(df['Open'], df['Close'])) / (df['High'] - df['Low'] + 1e-8)
factors['Lower_Shadow'] = (np.minimum(df['Open'], df['Close']) - df['Low']) / (df['High'] - df['Low'] + 1e-8)
# 价格差异因子
factors['HL_Spread'] = (df['High'] - df['Low']) / df['Close']
factors['OC_Spread'] = (df['Close'] - df['Open']) / df['Open']
return factors
def calculate_momentum_factors(self, df: pd.DataFrame, periods: List[int] = [3, 5, 10, 20]) -> pd.DataFrame:
"""计算动量因子"""
factors = pd.DataFrame(index=df.index)
for period in periods:
# 价格动量
factors[f'Price_Momentum_{period}'] = df['Close'].pct_change(period)
factors[f'Log_Momentum_{period}'] = np.log(df['Close'] / df['Close'].shift(period))
# 成交量加权动量
vwap = self._calculate_vwap(df, period)
factors[f'VWAP_Momentum_{period}'] = (df['Close'] - vwap) / vwap
# 相对强度
up_moves = df['Close'].diff().clip(lower=0).rolling(period).sum()
down_moves = -df['Close'].diff().clip(upper=0).rolling(period).sum()
factors[f'RS_{period}'] = up_moves / (down_moves + 1e-8)
# 动量加速度
if period >= 5:
momentum = df['Close'].pct_change(period)
factors[f'Momentum_Accel_{period}'] = momentum - momentum.shift(period // 2)
return factors
def calculate_volatility_factors(self, df: pd.DataFrame, periods: List[int] = [5, 10, 20, 60]) -> pd.DataFrame:
"""计算波动率因子"""
factors = pd.DataFrame(index=df.index)
returns = df['Close'].pct_change()
for period in periods:
# 历史波动率
factors[f'HV_{period}'] = returns.rolling(period).std() * np.sqrt(252)
# 范围波动率
range_vol = np.log(df['High'] / df['Low']).rolling(period).mean()
factors[f'Range_Vol_{period}'] = range_vol
# Parkinson波动率
parkinson_vol = np.log(df['High'] / df['Low']).pow(2).rolling(period).mean()
factors[f'Parkinson_Vol_{period}'] = np.sqrt(parkinson_vol * 252 / (4 * np.log(2)))
# Garman-Klass波动率
ln_hl = np.log(df['High'] / df['Low'])
ln_co = np.log(df['Close'] / df['Open'])
gk_vol = (0.5 * ln_hl.pow(2) - (2 * np.log(2) - 1) * ln_co.pow(2)).rolling(period).mean()
factors[f'GK_Vol_{period}'] = np.sqrt(gk_vol * 252)
# 波动率的波动率
factors['Vol_of_Vol'] = factors['HV_20'].rolling(20).std()
# 波动率偏度和峰度
factors['Returns_Skew'] = returns.rolling(60).skew()
factors['Returns_Kurt'] = returns.rolling(60).kurtosis()
return factors
def calculate_volume_factors(self, df: pd.DataFrame, periods: List[int] = [5, 10, 20]) -> pd.DataFrame:
"""计算成交量因子"""
factors = pd.DataFrame(index=df.index)
for period in periods:
# 成交量均线
vol_ma = df['Volume'].rolling(period).mean()
factors[f'Vol_MA_{period}'] = vol_ma
factors[f'Vol_Ratio_{period}'] = df['Volume'] / vol_ma
# 成交量标准化
vol_std = df['Volume'].rolling(period).std()
factors[f'Vol_Zscore_{period}'] = (df['Volume'] - vol_ma) / (vol_std + 1e-8)
# 价量关系
price_change = df['Close'].pct_change()
vol_price_corr = price_change.rolling(period).corr(df['Volume'])
factors[f'Vol_Price_Corr_{period}'] = vol_price_corr
# On Balance Volume
obv = (np.sign(df['Close'].diff()) * df['Volume']).cumsum()
factors['OBV'] = obv
factors['OBV_MA_20'] = obv.rolling(20).mean()
factors['OBV_Signal'] = obv - factors['OBV_MA_20']
# 成交金额
factors['Turnover'] = df['Close'] * df['Volume']
factors['Turnover_MA_20'] = factors['Turnover'].rolling(20).mean()
factors['Turnover_Ratio'] = factors['Turnover'] / factors['Turnover_MA_20']
return factors
def calculate_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""计算技术指标因子"""
factors = pd.DataFrame(index=df.index)
# RSI
factors['RSI_14'] = self._calculate_rsi(df['Close'], 14)
factors['RSI_30'] = self._calculate_rsi(df['Close'], 30)
# MACD
macd, signal, histogram = self._calculate_macd(df['Close'])
factors['MACD'] = macd
factors['MACD_Signal'] = signal
factors['MACD_Histogram'] = histogram
# Bollinger Bands
bb_upper, bb_middle, bb_lower = self._calculate_bollinger_bands(df['Close'], 20, 2)
factors['BB_Upper'] = bb_upper
factors['BB_Middle'] = bb_middle
factors['BB_Lower'] = bb_lower
factors['BB_Width'] = (bb_upper - bb_lower) / bb_middle
factors['BB_Position'] = (df['Close'] - bb_lower) / (bb_upper - bb_lower)
# Stochastic
factors['Stoch_K'], factors['Stoch_D'] = self._calculate_stochastic(df, 14, 3)
# Williams %R
factors['Williams_R'] = self._calculate_williams_r(df, 14)
# Average True Range
factors['ATR'] = self._calculate_atr(df, 14)
factors['ATR_Ratio'] = factors['ATR'] / df['Close']
return factors
def _calculate_vwap(self, df: pd.DataFrame, period: int) -> pd.Series:
"""计算VWAP"""
typical_price = (df['High'] + df['Low'] + df['Close']) / 3
vwap = (typical_price * df['Volume']).rolling(period).sum() / df['Volume'].rolling(period).sum()
return vwap
def _calculate_rsi(self, prices: pd.Series, period: int) -> pd.Series:
"""计算RSI"""
delta = prices.diff()
gain = delta.where(delta > 0, 0).rolling(period).mean()
loss = -delta.where(delta < 0, 0).rolling(period).mean()
rs = gain / (loss + 1e-8)
rsi = 100 - (100 / (1 + rs))
return rsi
def _calculate_macd(self, prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""计算MACD"""
ema_fast = prices.ewm(span=fast).mean()
ema_slow = prices.ewm(span=slow).mean()
macd = ema_fast - ema_slow
signal_line = macd.ewm(span=signal).mean()
histogram = macd - signal_line
return macd, signal_line, histogram
def _calculate_bollinger_bands(self, prices: pd.Series, period: int, std_dev: float) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""计算布林带"""
middle = prices.rolling(period).mean()
std = prices.rolling(period).std()
upper = middle + std_dev * std
lower = middle - std_dev * std
return upper, middle, lower
def _calculate_stochastic(self, df: pd.DataFrame, k_period: int, d_period: int) -> Tuple[pd.Series, pd.Series]:
"""计算随机指标"""
low_min = df['Low'].rolling(k_period).min()
high_max = df['High'].rolling(k_period).max()
k_percent = 100 * (df['Close'] - low_min) / (high_max - low_min)
d_percent = k_percent.rolling(d_period).mean()
return k_percent, d_percent
def _calculate_williams_r(self, df: pd.DataFrame, period: int) -> pd.Series:
"""计算Williams %R"""
high_max = df['High'].rolling(period).max()
low_min = df['Low'].rolling(period).min()
williams_r = -100 * (high_max - df['Close']) / (high_max - low_min)
return williams_r
def _calculate_atr(self, df: pd.DataFrame, period: int) -> pd.Series:
"""计算ATR"""
high_low = df['High'] - df['Low']
high_close = np.abs(df['High'] - df['Close'].shift())
low_close = np.abs(df['Low'] - df['Close'].shift())
tr = np.maximum(high_low, np.maximum(high_close, low_close))
atr = tr.rolling(period).mean()
return atr
class GeometricFactorEngine:
    """Geometric factor engineering engine.

    Projects a factor matrix into lower-dimensional representations (PCA,
    UMAP, Isomap, ICA, randomly rotated PCA), derives simple geometric
    features from a factor matrix, and ranks factors against a target.
    """

    def __init__(self, n_components: int = 10, random_state: int = 42):
        self.n_components = n_components
        self.random_state = random_state
        self.scaler = StandardScaler()
        self.robust_scaler = RobustScaler()
        # Fitted models are stored here by the fit_transform_* methods.
        self.pca = None
        self.umap_model = None
        self.isomap = None
        self.ica = None

    @staticmethod
    def _neighbor_count(n_samples: int, cap: int, divisor: int, floor: int) -> int:
        """Return ``n_samples // divisor`` clamped to ``[floor, cap]``.

        The lower bound keeps very small samples from producing a neighbour
        count of 0 (or 1), which the neighbour-graph estimators reject.
        """
        return max(floor, min(cap, n_samples // divisor))

    def fit_transform_all_methods(self, data: pd.DataFrame) -> Dict[str, np.ndarray]:
        """Run every extraction method on ``data``.

        Args:
            data: Input factor data (rows are samples, columns are factors).

        Returns:
            Dict mapping ``'pca'``, ``'umap'``, ``'isomap'``, ``'ica'`` and
            ``'rotated_pca'`` to the corresponding transformed array.
        """
        results = {}

        # Scale the data; ICA receives the robust-scaled copy, the other
        # methods the standard-scaled one.
        scaled_data = self.scaler.fit_transform(data)
        robust_scaled_data = self.robust_scaler.fit_transform(data)

        results['pca'] = self.fit_transform_pca(scaled_data)                  # PCA
        results['umap'] = self.fit_transform_umap(scaled_data)                # UMAP
        results['isomap'] = self.fit_transform_isomap(scaled_data)            # Isomap
        results['ica'] = self.fit_transform_ica(robust_scaled_data)           # ICA
        results['rotated_pca'] = self.fit_transform_rotated_pca(scaled_data)  # rotated PCA
        return results

    def fit_transform_pca(self, data: np.ndarray, variance_threshold: float = 0.95) -> np.ndarray:
        """Fit PCA with ``n_components=variance_threshold`` and return the scores.

        The result is truncated to at most ``self.n_components`` columns.
        The fitted model is stored on ``self.pca``.
        """
        self.pca = PCA(n_components=variance_threshold, random_state=self.random_state)
        pca_result = self.pca.fit_transform(data)
        # Trim to the target dimensionality
        if pca_result.shape[1] > self.n_components:
            pca_result = pca_result[:, :self.n_components]
        return pca_result

    def fit_transform_umap(self, data: np.ndarray) -> np.ndarray:
        """Fit UMAP (euclidean metric) and return the embedding.

        ``n_neighbors`` is a quarter of the sample count, capped at 50 and
        floored at 2 so that small inputs do not request fewer than two
        neighbours.
        """
        self.umap_model = umap.UMAP(
            n_components=self.n_components,
            n_neighbors=self._neighbor_count(data.shape[0], cap=50, divisor=4, floor=2),
            min_dist=0.1,
            metric='euclidean',
            random_state=self.random_state
        )
        return self.umap_model.fit_transform(data)

    def fit_transform_isomap(self, data: np.ndarray) -> np.ndarray:
        """Fit Isomap and return the embedding.

        ``n_neighbors`` is a third of the sample count, capped at 30 and
        floored at 1 so that tiny inputs do not request zero neighbours.
        """
        n_neighbors = self._neighbor_count(data.shape[0], cap=30, divisor=3, floor=1)
        self.isomap = Isomap(n_components=self.n_components, n_neighbors=n_neighbors)
        return self.isomap.fit_transform(data)

    def fit_transform_ica(self, data: np.ndarray) -> np.ndarray:
        """Fit FastICA with at most one component per input column."""
        n_components = min(self.n_components, data.shape[1])
        self.ica = FastICA(n_components=n_components, random_state=self.random_state, max_iter=1000)
        return self.ica.fit_transform(data)

    def fit_transform_rotated_pca(self, data: np.ndarray) -> np.ndarray:
        """Return PCA scores multiplied by a seeded random rotation matrix.

        The component count is capped by both data dimensions, the same way
        the ICA method caps it by the column count, so narrow or short
        inputs no longer fail inside PCA. With fewer than two components the
        unrotated PCA scores are returned.
        """
        from scipy.stats import special_ortho_group

        n_components = min(self.n_components, data.shape[0], data.shape[1])

        # Run PCA first
        pca = PCA(n_components=n_components, random_state=self.random_state)
        pca_result = pca.fit_transform(data)

        # A one-dimensional rotation is the identity, and special_ortho_group
        # may reject dim < 2, so skip the rotation in that case.
        if n_components < 2:
            return pca_result

        # Apply a random orthogonal transform
        rotation_matrix = special_ortho_group.rvs(n_components, random_state=self.random_state)
        return np.dot(pca_result, rotation_matrix)

    def calculate_geometric_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Compute row-wise geometric features.

        Args:
            data: Numeric input data; each row is treated as a point.

        Returns:
            DataFrame on ``data.index`` with the Euclidean and Manhattan
            norms, the angle and radius in the plane of the first two
            columns (only when there are at least two columns), the distance
            to the column-mean centroid, and a 20-row rolling z-score of
            that distance.
        """
        features = pd.DataFrame(index=data.index)

        # Distance features
        features['Euclidean_Norm'] = np.sqrt((data ** 2).sum(axis=1))
        features['Manhattan_Norm'] = data.abs().sum(axis=1)

        # Angle features
        if data.shape[1] >= 2:
            first, second = data.iloc[:, 0], data.iloc[:, 1]
            features['Angle_First_Two'] = np.arctan2(second, first)
            features['Radius_First_Two'] = np.sqrt(first ** 2 + second ** 2)

        # Distance to the centroid
        centroid = data.mean()
        distance = np.sqrt(((data - centroid) ** 2).sum(axis=1))
        features['Distance_To_Centroid'] = distance

        # Relative position: rolling z-score of the centroid distance
        rolling = distance.rolling(20)
        features['Relative_Position'] = (distance - rolling.mean()) / (rolling.std() + 1e-8)
        return features

    def select_informative_factors(self, factors: pd.DataFrame, target: pd.Series,
                                   method: str = 'mutual_info', k: int = 20) -> List[str]:
        """Select the ``k`` highest-scoring factors.

        Args:
            factors: Factor data, one column per factor.
            target: Target variable.
            method: ``'mutual_info'``, ``'correlation'`` or ``'variance'``.
            k: Number of factors to select.

        Returns:
            List of selected factor names, best first.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method == 'mutual_info':
            # Mutual information; NaNs are filled with 0 first. The engine's
            # seed is passed so that the ranking is reproducible.
            mi_scores = mutual_info_regression(
                factors.fillna(0), target.fillna(0), random_state=self.random_state
            )
            scores = pd.Series(mi_scores, index=factors.columns)
        elif method == 'correlation':
            # Absolute correlation with the target
            scores = factors.corrwith(target).abs()
        elif method == 'variance':
            # Per-factor variance
            scores = factors.var()
        else:
            raise ValueError(f"Unknown selection method: {method}")
        return scores.nlargest(k).index.tolist()
class FactorCombiner:
    """Factor combiner.

    Merges factor groups into a single frame and derives interaction and
    rolling time-series factors from the leading columns.
    """

    def __init__(self):
        self.weights = {}

    def combine_factors_weighted(self, factors: Dict[str, pd.DataFrame],
                                 weights: Optional[Dict[str, float]] = None) -> pd.DataFrame:
        """Standardize, weight and concatenate the factor groups.

        Args:
            factors: Mapping of group name to that group's factor frame.
            weights: Mapping of group name to weight. Groups missing from
                the mapping, and every group when it is None, get 1.0.

        Returns:
            DataFrame holding each group's z-scored columns multiplied by
            the group weight and renamed to ``"<group>_<column>"``.
        """
        if weights is None:
            weights = {name: 1.0 for name in factors}

        result = pd.DataFrame()
        for group_name, group_frame in factors.items():
            group_weight = weights.get(group_name, 1.0)
            # Standardize each column, then apply the group weight
            centered = group_frame - group_frame.mean()
            scaled = centered / (group_frame.std() + 1e-8)
            block = scaled * group_weight
            # Prefix the column names with the group name
            block.columns = [f"{group_name}_{column}" for column in block.columns]
            result = pd.concat([result, block], axis=1)
        return result

    def create_composite_factors(self, technical_factors: pd.DataFrame,
                                 geometric_factors: Dict[str, np.ndarray]) -> pd.DataFrame:
        """Build the composite factor frame.

        Args:
            technical_factors: Technical factor frame.
            geometric_factors: Mapping of method name to a 2-D array of
                geometric factors.

        Returns:
            DataFrame of technical, geometric, interaction and time-series
            factors with every row containing a NaN removed.
        """
        merged = technical_factors.copy()

        # Append the geometric factors. Each array is labelled with the first
        # len(array) entries of the technical index.
        for method_name, values in geometric_factors.items():
            column_names = [f"Geo_{method_name}_{i}" for i in range(values.shape[1])]
            row_labels = technical_factors.index[:len(values)]
            geo_frame = pd.DataFrame(values, index=row_labels, columns=column_names)
            merged = pd.concat([merged, geo_frame], axis=1)

        # Interaction factors first, then time-series factors on the result
        with_interactions = self._create_interaction_factors(merged)
        with_time_series = self._create_time_series_factors(with_interactions)
        return with_time_series.dropna()

    def _create_interaction_factors(self, factors: pd.DataFrame) -> pd.DataFrame:
        """Add product and ratio columns for pairs of the leading columns.

        Pairs are drawn from the first six columns in left-to-right order;
        inputs are always read from the original ``factors`` frame.
        """
        enriched = factors.copy()

        # The leading columns are treated as the most important factors
        candidates = factors.columns[:min(10, len(factors.columns))]
        pairs = [
            (left, right)
            for position, left in enumerate(candidates[:5])
            for right in candidates[position + 1:6]
        ]
        for left, right in pairs:
            # Product interaction
            enriched[f"Interact_{left}_{right}"] = factors[left] * factors[right]
            # Ratio interaction
            enriched[f"Ratio_{left}_{right}"] = factors[left] / (factors[right] + 1e-8)
        return enriched

    def _create_time_series_factors(self, factors: pd.DataFrame) -> pd.DataFrame:
        """Add rolling mean, momentum and z-score columns for the first five columns."""
        enriched = factors.copy()

        # The leading columns are treated as the important factors
        leading = factors.columns[:min(5, len(factors.columns))]
        for name in leading:
            series = factors[name]
            window_20 = series.rolling(20)
            mean_20 = window_20.mean()

            # Moving averages
            enriched[f"{name}_MA_5"] = series.rolling(5).mean()
            enriched[f"{name}_MA_20"] = mean_20
            # Momentum
            enriched[f"{name}_Momentum_5"] = series - series.shift(5)
            # Rolling standardization
            enriched[f"{name}_Zscore"] = (series - mean_20) / (window_20.std() + 1e-8)
        return enriched
# Usage example
if __name__ == "__main__":
    # Build synthetic sample data: four independent random walks plus volume.
    n_bars = 1000
    dates = pd.date_range('2023-01-01', periods=n_bars, freq='15min')
    np.random.seed(42)
    ohlc_columns = ['Open', 'High', 'Low', 'Close']
    # Draw order (Open, High, Low, Close, then Volume) fixes the seeded output.
    sample_columns = {
        column: 100 + np.cumsum(np.random.randn(n_bars) * 0.1)
        for column in ohlc_columns
    }
    sample_columns['Volume'] = np.random.randint(1000, 10000, n_bars)
    price_data = pd.DataFrame(sample_columns, index=dates)

    # Make the OHLC values consistent. High is repaired before Low, so Low
    # is taken over the already-corrected High.
    price_data['High'] = price_data[ohlc_columns].max(axis=1)
    price_data['Low'] = price_data[ohlc_columns].min(axis=1)

    print("Testing Factor Engineering...")

    # Technical factor calculation
    tech_calculator = TechnicalFactorCalculator()
    technical_factors = tech_calculator.calculate_all_factors(price_data)
    print(f"Technical factors shape: {technical_factors.shape}")
    print(f"Technical factors columns: {technical_factors.columns.tolist()}")

    # Geometric factor engineering
    geo_engine = GeometricFactorEngine(n_components=8)
    geometric_factors = geo_engine.fit_transform_all_methods(technical_factors)
    print(f"Geometric factors methods: {geometric_factors.keys()}")
    for method, factors in geometric_factors.items():
        print(f"{method} shape: {factors.shape}")

    # Factor combination
    combiner = FactorCombiner()
    composite_factors = combiner.create_composite_factors(technical_factors, geometric_factors)
    print(f"Composite factors shape: {composite_factors.shape}")
    sample_names = composite_factors.columns.tolist()[:20]
    print(f"Sample composite factor names: {sample_names}")