"""
feature_selection.py — feature-selection funnel: 329 raw indicators → 17 selected features
=================================================================
Full feature-screening pipeline:

  Stage 0: Inventory of every raw indicator found in the raw CSV files
  Stage 1: Data availability (missing rate < 30%, at least 120 valid observations)
  Stage 2: Univariate correlation (|corr| with the oil-price return > 0.03,
           a threshold relaxed for monthly data)
  Stage 3: Collinearity filter (pairwise |corr| de-duplication)
  Stage 4: Mutual information with the return (non-linear information content)
  Stage 5: Economic-meaning check (factor-group assignment of the configured
           FEATURES list)
"""

import glob
import os

import numpy as np
import pandas as pd

from config import DATA_DIR, FEATURES, FACTOR_GROUPS, PRICE_COL

# Columns of the raw CSV files that are bookkeeping fields, not indicators.
_NON_FEATURE_COLUMNS = frozenset(
    ('date', 'date_str', 'date_num', 'year', 'month', 'day', 'unnamed: 0')
)

# Screening thresholds. The progress messages and the 'rule' texts of the
# returned summary spell these values out literally, so keep them in sync.
_MAX_MISSING_RATE = 0.30   # Stage 1: strict upper bound on the NaN share
_MIN_VALID_OBS = 120       # Stage 1: minimum number of non-NaN observations
_MIN_ABS_CORR = 0.03       # Stage 2: strict lower bound on |corr(feature, return)|
_MAX_PAIR_CORR = 0.85      # Stage 3: pairs above this |corr| count as duplicates
_MIN_MI_SAMPLES = 50       # Stage 4: MI is estimated only with more rows than this
_MAX_REJECTED_EXAMPLES = 10
_MAX_INVENTORY_FILES = 15


def _inventory_raw_features(raw_dir):
    """Collect the indicator column names found in the CSV files of *raw_dir*.

    Returns a tuple ``(csv_files, all_features, file_info)``: the sorted list
    of CSV paths, the set of distinct indicator names, and one summary dict
    per readable file (file name, indicator count, first ten indicator names).
    """
    # Sorted so that the truncated inventory in the summary is reproducible;
    # glob.glob() returns paths in arbitrary order.
    csv_files = sorted(glob.glob(os.path.join(raw_dir, '*.csv')))
    all_features = set()
    file_info = []
    for path in csv_files:
        try:
            # Only the header matters here; a few rows keep the read cheap.
            header = pd.read_csv(path, nrows=5)
        except (OSError, ValueError) as exc:
            # pandas' parser / empty-data / decoding errors are ValueError
            # subclasses. The inventory is best-effort: report and move on.
            print(f"Stage 0: skipped unreadable CSV {os.path.basename(path)}: {exc}")
            continue
        cols = [c for c in header.columns
                if str(c).lower() not in _NON_FEATURE_COLUMNS]
        all_features.update(cols)
        file_info.append({
            'file': os.path.basename(path),
            'n_cols': len(cols),
            'cols': cols[:10],
        })
    return csv_files, all_features, file_info


def _drop_collinear(panel, ranked_features):
    """Return *ranked_features* without near-duplicates, preserving order.

    *ranked_features* must be ordered from strongest to weakest |corr| with
    the return. A feature is kept unless its absolute pairwise correlation
    with an already-kept (hence stronger or equal) feature exceeds
    ``_MAX_PAIR_CORR``. NaN correlations never trigger a drop.
    """
    pair_corr = panel[ranked_features].corr().abs()
    kept = []
    for feat in ranked_features:
        if any(pair_corr.at[feat, other] > _MAX_PAIR_CORR for other in kept):
            continue
        kept.append(feat)
    return kept


def run_feature_funnel(panel_path, raw_dir=None):
    """Run the complete feature-selection funnel and return per-stage results.

    Args:
        panel_path: Path of the panel CSV. It is read with its first column
            as a parsed date index and must contain the ``PRICE_COL`` column.
        raw_dir: Directory holding the raw indicator CSV files; falls back to
            ``DATA_DIR`` when omitted or empty.

    Returns:
        A dict with the keys ``total_raw``, ``n_csv_files``, ``stages`` (one
        entry per stage with its name, surviving count and rule text),
        ``final_features`` (factor group, MI score and correlation of each
        selected feature), ``rejected_examples`` and ``file_inventory``.

    Note:
        The final selection is the configured ``FEATURES`` list restricted to
        the columns present in the panel; stages 1-4 supply the diagnostics
        (counts, MI scores, correlations) reported alongside it.
    """
    raw_dir = raw_dir or DATA_DIR
    panel = pd.read_csv(panel_path, index_col=0, parse_dates=True)

    # ── Stage 0: Inventory all raw features ──
    csv_files, all_features, file_info = _inventory_raw_features(raw_dir)
    total_raw = len(all_features)
    print(f"Stage 0: 原始指标 {total_raw} 个 (来自 {len(csv_files)} 个CSV)")

    # ── Stage 1: Availability filter ──
    # Skip the price itself and derived target / EWMA columns. Non-numeric
    # columns cannot be correlated with a return, so they are not candidates.
    stage1_features = []
    for col in panel.columns:
        lowered = str(col).lower()
        if col == PRICE_COL or 'target' in lowered or 'ewma' in lowered:
            continue
        series = panel[col]
        if not pd.api.types.is_numeric_dtype(series):
            continue
        if (series.isna().mean() < _MAX_MISSING_RATE
                and series.notna().sum() >= _MIN_VALID_OBS):
            stage1_features.append(col)
    print(f"Stage 1: 数据可用性 → {len(stage1_features)} 个 (缺失率<30%, 覆盖>120月)")

    # ── Stage 2: Univariate correlation filter ──
    ret = panel[PRICE_COL].pct_change(1)
    stage2 = []
    for feat in stage1_features:
        try:
            corr = float(panel[feat].corr(ret))
        except (TypeError, ValueError):
            # Best-effort: a column that cannot be correlated is not a candidate.
            continue
        abs_corr = abs(corr)
        # A NaN correlation (e.g. a constant series) fails this comparison.
        if abs_corr > _MIN_ABS_CORR:
            stage2.append({
                'feature': feat,
                'corr_with_return': round(corr, 4),
                'abs_corr': round(abs_corr, 4),
            })
    stage2.sort(key=lambda entry: entry['abs_corr'], reverse=True)
    stage2_features = [entry['feature'] for entry in stage2]
    signed_corr = {entry['feature']: entry['corr_with_return'] for entry in stage2}
    print(f"Stage 2: 单变量相关性 → {len(stage2_features)} 个 (|corr|>0.03)")

    # ── Stage 3: Collinearity filter ──
    # Of each highly correlated pair, keep the feature with the higher
    # |corr| to the return (stage2_features is already ranked that way).
    stage3_features = _drop_collinear(panel, stage2_features)
    print(f"Stage 3: 共线性过滤 → {len(stage3_features)} 个 (pair |corr|<0.85)")

    # ── Stage 4: MI score ──
    stage4 = []
    if stage3_features:
        X = panel[stage3_features]
        # Reuse the return computed on the full panel: recomputing
        # pct_change() after dropping incomplete rows would turn every gap
        # into a multi-period return.
        valid = X.notna().all(axis=1) & ret.notna()
        if valid.sum() > _MIN_MI_SAMPLES:
            # Imported lazily so scikit-learn is only required when MI runs.
            from sklearn.feature_selection import mutual_info_regression

            mi_scores = mutual_info_regression(
                X.loc[valid], ret.loc[valid], random_state=42, n_neighbors=5)
            ranked = sorted(zip(stage3_features, mi_scores),
                            key=lambda pair: pair[1], reverse=True)
            stage4 = [
                {
                    'feature': feat,
                    'mi_score': round(float(mi_val), 4),
                    'corr': signed_corr[feat],
                }
                for feat, mi_val in ranked
            ]
    stage4_features = [entry['feature'] for entry in stage4]
    print(f"Stage 4: MI 非线性筛选 → {len(stage4_features)} 个")

    # ── Stage 5: Final selection (match with FEATURES list) ──
    final_selected = [f for f in FEATURES if f in panel.columns]
    selected = set(final_selected)
    final_rejected = [f for f in stage4_features
                      if f not in selected][:_MAX_REJECTED_EXAMPLES]

    # Build factor assignment; features absent from Stage 4 report 0 scores.
    stage4_by_feature = {entry['feature']: entry for entry in stage4}
    stage5 = []
    for feat in final_selected:
        group = next(
            (g for g, members in FACTOR_GROUPS.items() if feat in members),
            'Other')
        scores = stage4_by_feature.get(feat, {})
        stage5.append({
            'feature': feat,
            'factor_group': group,
            'mi_score': scores.get('mi_score', 0),
            'corr': scores.get('corr', 0),
        })
    print(f"Stage 5: 最终选择 → {len(final_selected)} 个 (经济学意义+因子分配)")

    # ── Build funnel summary ──
    funnel = {
        'total_raw': total_raw,
        'n_csv_files': len(csv_files),
        'stages': [
            {'stage': 0, 'name': '原始指标', 'count': total_raw,
             'rule': f'{len(csv_files)}个CSV文件'},
            {'stage': 1, 'name': '数据可用性', 'count': len(stage1_features),
             'rule': '缺失率<30%, 覆盖>120月'},
            {'stage': 2, 'name': '单变量相关性', 'count': len(stage2_features),
             'rule': '|corr(feature, return)|>0.03'},
            {'stage': 3, 'name': '共线性去重', 'count': len(stage3_features),
             'rule': '组内pair |corr|<0.85'},
            {'stage': 4, 'name': 'MI非线性筛选', 'count': len(stage4_features),
             'rule': 'MI(feature; return)排序'},
            {'stage': 5, 'name': '最终选择', 'count': len(final_selected),
             'rule': '经济学意义+因子组分配'},
        ],
        'final_features': stage5,
        'rejected_examples': final_rejected,
        'file_inventory': file_info[:_MAX_INVENTORY_FILES],  # First 15 files
    }
    return funnel