# oilverse-api/core/feature_selection.py
# 孙家明
# deploy: OilVerse for HuggingFace (Node.js 18 fix)
# fab9847
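"""
feature_selection.py — Feature-selection funnel: 329 raw indicators → 17 selected features
=================================================================
The complete feature-selection pipeline:
Stage 1: Data availability filter (missing rate < 30%, at least 120 months of valid observations)
Stage 2: Univariate correlation (|corr| with oil-price returns > 0.03, relaxed for monthly data)
Stage 3: Collinearity filter (VIF / corr-cluster de-duplication)
Stage 4: MI / Granger causality (non-linear information content)
Stage 5: Economic-meaning validation (factor-group assignment)
"""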
import pandas as pd
import numpy as np
import os
import glob
from config import DATA_DIR, FEATURES, FACTOR_GROUPS, PRICE_COL
def run_feature_funnel(panel_path, raw_dir=None):
    """Run the feature-selection funnel and return a per-stage summary.

    The funnel narrows the candidate columns of the panel step by step:

    * Stage 0 - inventory the column names found in every ``*.csv`` under
      ``raw_dir`` (only the first rows of each file are read).
    * Stage 1 - keep panel columns with a missing rate below 30% and at
      least 120 non-null observations.
    * Stage 2 - keep columns whose absolute correlation with the
      one-period return of ``PRICE_COL`` exceeds 0.03.
    * Stage 3 - for every pair with absolute correlation above 0.85, drop
      the member that is less correlated with the return.
    * Stage 4 - score the survivors with mutual information against the
      return (skipped when 50 or fewer usable rows exist).
    * Stage 5 - report the configured ``FEATURES`` present in the panel
      together with their factor group from ``FACTOR_GROUPS``.

    Args:
        panel_path: Path of the panel CSV; its first column is used as a
            date-parsed index.
        raw_dir: Directory scanned for raw CSV files. Falls back to
            ``DATA_DIR`` when omitted or empty.

    Returns:
        dict with the keys ``total_raw``, ``n_csv_files``, ``stages`` (one
        entry per stage with ``stage``, ``name``, ``count`` and ``rule``),
        ``final_features``, ``rejected_examples`` (at most 10 names) and
        ``file_inventory`` (at most 15 files).

    Progress for each stage is printed to stdout.
    """
    raw_dir = raw_dir or DATA_DIR
    panel = pd.read_csv(panel_path, index_col=0, parse_dates=True)

    # ── Stage 0: Inventory all raw features ──
    non_feature_cols = {
        'date', 'date_str', 'date_num', 'year', 'month', 'day', 'unnamed: 0',
    }
    all_features = set()
    file_info = []
    csv_files = glob.glob(os.path.join(raw_dir, '*.csv'))
    for csv_path in csv_files:
        try:
            header = pd.read_csv(csv_path, nrows=5)
        except (OSError, ValueError) as exc:
            # Best-effort inventory: an unreadable, empty or malformed file
            # is reported and skipped instead of aborting the whole funnel.
            # (pandas parser / empty-data errors derive from ValueError.)
            print(f"Stage 0: 跳过无法读取的文件 {os.path.basename(csv_path)}: {exc}")
            continue
        cols = [c for c in header.columns
                if str(c).lower() not in non_feature_cols]
        all_features.update(cols)
        file_info.append({
            'file': os.path.basename(csv_path),
            'n_cols': len(cols),
            'cols': cols[:10],
        })
    total_raw = len(all_features)
    print(f"Stage 0: 原始指标 {total_raw} 个 (来自 {len(csv_files)} 个CSV)")

    # ── Stage 1: Availability filter ──
    stage1 = []
    for col in panel.columns:
        lowered = str(col).lower()
        # The price itself and derived target / EWMA columns are not candidates.
        if col == PRICE_COL or 'target' in lowered or 'ewma' in lowered:
            continue
        series = panel[col]
        missing_rate = series.isna().mean()
        n_valid = series.notna().sum()
        if not (missing_rate < 0.30 and n_valid >= 120):
            continue
        try:
            mean_val = round(float(series.mean()), 4)
        except (TypeError, ValueError):
            # A column without a numeric mean cannot be used by the
            # correlation-based stages below, so it is dropped here.
            continue
        stage1.append({
            'feature': col,
            'missing_rate': round(missing_rate, 3),
            'n_valid': int(n_valid),
            'mean': mean_val,
        })
    stage1_features = [s['feature'] for s in stage1]
    print(f"Stage 1: 数据可用性 → {len(stage1_features)} 个 (缺失率<30%, 覆盖>120月)")

    # ── Stage 2: Univariate correlation filter ──
    ret = panel[PRICE_COL].pct_change(1)
    stage2 = []
    for feat in stage1_features:
        try:
            corr = float(panel[feat].corr(ret))
        except (TypeError, ValueError):
            continue  # correlation is undefined for this column
        abs_corr = abs(corr)
        # A NaN correlation fails this comparison and is therefore excluded.
        if abs_corr > 0.03:  # Relaxed threshold for monthly data
            stage2.append({
                'feature': feat,
                'corr_with_return': round(corr, 4),
                'abs_corr': round(abs_corr, 4),
            })
    stage2.sort(key=lambda x: x['abs_corr'], reverse=True)
    stage2_features = [s['feature'] for s in stage2]
    print(f"Stage 2: 单变量相关性 → {len(stage2_features)} 个 (|corr|>0.03)")

    # ── Stage 3: Collinearity filter ──
    # Within each highly correlated pair keep the feature with the higher
    # absolute correlation to the return.
    candidates = list(stage2_features)
    abs_pair = panel[candidates].corr().abs().to_numpy()
    strength = {s['feature']: s['abs_corr'] for s in stage2}
    to_drop = set()
    for i, f_i in enumerate(candidates):
        if f_i in to_drop:
            continue
        for j in range(i + 1, len(candidates)):
            f_j = candidates[j]
            if f_j in to_drop:
                continue
            if abs_pair[i, j] > 0.85:
                if strength.get(f_i, 0) >= strength.get(f_j, 0):
                    to_drop.add(f_j)
                else:
                    # Once the anchor itself is dropped it must not
                    # eliminate any further features.
                    to_drop.add(f_i)
                    break
    stage3_features = [f for f in candidates if f not in to_drop]
    print(f"Stage 3: 共线性过滤 → {len(stage3_features)} 个 (pair |corr|<0.85)")

    # ── Stage 4: MI score ──
    stage4 = []
    if stage3_features:
        # Reuse the one-period return computed on the full panel. Taking
        # pct_change on the row subset left after dropping incomplete rows
        # would span the removed rows and mislabel multi-period changes as
        # one-period returns.
        complete = panel[stage3_features].notna().all(axis=1)
        usable = complete & np.isfinite(ret)
        if usable.sum() > 50:
            from sklearn.feature_selection import mutual_info_regression
            mi_scores = mutual_info_regression(
                panel.loc[usable, stage3_features],
                ret[usable],
                random_state=42,
                n_neighbors=5,
            )
            corr_by_feature = {s['feature']: s['corr_with_return'] for s in stage2}
            ranked = sorted(zip(stage3_features, mi_scores),
                            key=lambda x: x[1], reverse=True)
            for feat, mi_val in ranked:
                stage4.append({
                    'feature': feat,
                    'mi_score': round(float(mi_val), 4),
                    'corr': corr_by_feature[feat],
                })
    stage4_features = [s['feature'] for s in stage4]
    print(f"Stage 4: MI 非线性筛选 → {len(stage4_features)} 个")

    # ── Stage 5: Final selection (match with FEATURES list) ──
    final_selected = [f for f in FEATURES if f in panel.columns]
    selected_set = set(final_selected)
    final_rejected = [f for f in stage4_features if f not in selected_set][:10]

    # Build factor assignment
    mi_lookup = {s['feature']: s['mi_score'] for s in stage4}
    corr_lookup = {s['feature']: s['corr'] for s in stage4}
    stage5 = []
    for feat in final_selected:
        # First factor group listing the feature wins; 'Other' otherwise.
        group = next(
            (g for g, members in FACTOR_GROUPS.items() if feat in members),
            'Other',
        )
        stage5.append({
            'feature': feat,
            'factor_group': group,
            # Features that never reached Stage 4 are reported with 0.
            'mi_score': mi_lookup.get(feat, 0),
            'corr': corr_lookup.get(feat, 0),
        })
    print(f"Stage 5: 最终选择 → {len(final_selected)} 个 (经济学意义+因子分配)")

    # ── Build funnel summary ──
    funnel = {
        'total_raw': total_raw,
        'n_csv_files': len(csv_files),
        'stages': [
            {'stage': 0, 'name': '原始指标', 'count': total_raw, 'rule': f'{len(csv_files)}个CSV文件'},
            {'stage': 1, 'name': '数据可用性', 'count': len(stage1_features), 'rule': '缺失率<30%, 覆盖>120月'},
            {'stage': 2, 'name': '单变量相关性', 'count': len(stage2_features), 'rule': '|corr(feature, return)|>0.03'},
            {'stage': 3, 'name': '共线性去重', 'count': len(stage3_features), 'rule': '组内pair |corr|<0.85'},
            {'stage': 4, 'name': 'MI非线性筛选', 'count': len(stage4_features), 'rule': 'MI(feature; return)排序'},
            {'stage': 5, 'name': '最终选择', 'count': len(final_selected), 'rule': '经济学意义+因子组分配'},
        ],
        'final_features': stage5,
        'rejected_examples': final_rejected,
        'file_inventory': file_info[:15],  # Top 15 files
    }
    return funnel