Spaces:
Build error
Build error
| """ | |
| feature_selection.py — 特征筛选漏斗:329个原始指标 → 17个入选特征 | |
| ================================================================= | |
| 完整的特征筛选流程: | |
| Stage 1: 数据可用性筛选(缺失率 < 30%,时间覆盖 > 120月) | |
| Stage 2: 单变量相关性(与油价收益率 |corr| > 0.03) | |
| Stage 3: 共线性过滤(VIF / corr-cluster 去重) | |
| Stage 4: MI / Granger 因果(非线性信息量) | |
| Stage 5: 经济学意义验证(因子组分配) | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import glob | |
| from config import DATA_DIR, FEATURES, FACTOR_GROUPS, PRICE_COL | |
def run_feature_funnel(panel_path, raw_dir=None):
    """Run the full feature-selection funnel and return a per-stage summary.

    Stages:
      0. Inventory the column names of every ``*.csv`` under ``raw_dir``.
      1. Availability: keep panel columns with missing rate < 30% and at
         least 120 non-null observations.
      2. Univariate correlation with the 1-period return of ``PRICE_COL``
         (|corr| > 0.03).
      3. Collinearity: for every pair with |corr| > 0.85, drop the member
         that is less correlated with the return.
      4. Mutual information between each surviving feature and the return.
      5. Final selection: the configured ``FEATURES`` that exist in the
         panel, each tagged with its ``FACTOR_GROUPS`` group. This stage
         reads the configuration only; stages 1-4 merely supply the
         ``mi_score`` / ``corr`` annotations and the rejected examples.

    Progress for every stage is printed to stdout.

    Args:
        panel_path: CSV file holding the panel; its first column is read as
            a date-parsed index.
        raw_dir: Directory scanned for raw CSV files. Falls back to
            ``DATA_DIR`` when falsy.

    Returns:
        dict with keys ``total_raw``, ``n_csv_files``, ``stages``,
        ``final_features``, ``rejected_examples`` (at most 10 names) and
        ``file_inventory`` (at most 15 files).
    """
    raw_dir = raw_dir or DATA_DIR
    panel = pd.read_csv(panel_path, index_col=0, parse_dates=True)

    # ── Stage 0: inventory all raw features ──
    csv_files, all_features, file_info = _inventory_raw_features(raw_dir)
    total_raw = len(all_features)
    print(f"Stage 0: 原始指标 {total_raw} 个 (来自 {len(csv_files)} 个CSV)")

    # ── Stage 1: availability filter ──
    stage1_features = _availability_filter(panel)
    print(f"Stage 1: 数据可用性 → {len(stage1_features)} 个 (缺失率<30%, 覆盖>120月)")

    # ── Stage 2: univariate correlation filter ──
    # One return series, computed on the full panel, is shared by stages 2-4.
    ret = panel[PRICE_COL].pct_change(1)
    stage2 = _correlation_filter(panel, stage1_features, ret)
    stage2_features = [item['feature'] for item in stage2]
    print(f"Stage 2: 单变量相关性 → {len(stage2_features)} 个 (|corr|>0.03)")

    # ── Stage 3: collinearity filter ──
    stage3_features = _collinearity_filter(panel, stage2)
    print(f"Stage 3: 共线性过滤 → {len(stage3_features)} 个 (pair |corr|<0.85)")

    # ── Stage 4: mutual-information score ──
    signed_corr = {item['feature']: item['corr_with_return'] for item in stage2}
    stage4 = _mutual_information_stage(panel, stage3_features, ret, signed_corr)
    stage4_features = [item['feature'] for item in stage4]
    print(f"Stage 4: MI 非线性筛选 → {len(stage4_features)} 个")

    # ── Stage 5: final selection (match with FEATURES list) ──
    final_selected = [f for f in FEATURES if f in panel.columns]
    selected_lookup = set(final_selected)
    final_rejected = [f for f in stage4_features if f not in selected_lookup][:10]
    stage5 = _assign_factor_groups(final_selected, stage4)
    print(f"Stage 5: 最终选择 → {len(final_selected)} 个 (经济学意义+因子分配)")

    # ── Build funnel summary ──
    return {
        'total_raw': total_raw,
        'n_csv_files': len(csv_files),
        'stages': [
            {'stage': 0, 'name': '原始指标', 'count': total_raw, 'rule': f'{len(csv_files)}个CSV文件'},
            {'stage': 1, 'name': '数据可用性', 'count': len(stage1_features), 'rule': '缺失率<30%, 覆盖>120月'},
            {'stage': 2, 'name': '单变量相关性', 'count': len(stage2_features), 'rule': '|corr(feature, return)|>0.03'},
            {'stage': 3, 'name': '共线性去重', 'count': len(stage3_features), 'rule': '组内pair |corr|<0.85'},
            {'stage': 4, 'name': 'MI非线性筛选', 'count': len(stage4_features), 'rule': 'MI(feature; return)排序'},
            {'stage': 5, 'name': '最终选择', 'count': len(final_selected), 'rule': '经济学意义+因子组分配'},
        ],
        'final_features': stage5,
        'rejected_examples': final_rejected,
        'file_inventory': file_info[:15],  # first 15 files in sorted order
    }


def _inventory_raw_features(raw_dir):
    """Collect the distinct non-date column names of every CSV in ``raw_dir``.

    Returns ``(csv_files, all_features, file_info)``. Files that cannot be
    read or parsed are reported and skipped, but still counted in
    ``csv_files``.
    """
    ignored = {'date', 'date_str', 'date_num', 'year', 'month', 'day', 'unnamed: 0'}
    # Sorted so that the truncated inventory is the same on every platform.
    csv_files = sorted(glob.glob(os.path.join(raw_dir, '*.csv')))
    all_features = set()
    file_info = []
    for path in csv_files:
        try:
            # Only the header is needed; a few rows keep the read cheap.
            header = pd.read_csv(path, nrows=5)
        except (OSError, ValueError) as exc:
            # ValueError covers pandas ParserError / EmptyDataError and
            # UnicodeDecodeError; the inventory stays best-effort.
            print(f"Stage 0: skipped {os.path.basename(path)} ({exc})")
            continue
        cols = [c for c in header.columns if c.lower() not in ignored]
        all_features.update(cols)
        file_info.append({'file': os.path.basename(path), 'n_cols': len(cols), 'cols': cols[:10]})
    return csv_files, all_features, file_info


def _availability_filter(panel, max_missing=0.30, min_valid=120):
    """Return candidate panel columns that have enough non-null data.

    The price column and any column whose name contains ``target`` or
    ``ewma`` (case-insensitive) are never candidates.
    """
    kept = []
    for col in panel.columns:
        lowered = col.lower()
        if col == PRICE_COL or 'target' in lowered or 'ewma' in lowered:
            continue
        present = panel[col].notna()
        if (1 - present.mean()) < max_missing and present.sum() >= min_valid:
            kept.append(col)
    return kept


def _correlation_filter(panel, features, ret, min_abs_corr=0.03):
    """Keep features whose |corr| with ``ret`` exceeds ``min_abs_corr``.

    Returns a list of dicts (``feature``, ``corr_with_return``,
    ``abs_corr``) sorted by ``abs_corr`` descending. Features whose
    correlation cannot be computed are reported and skipped.
    """
    kept = []
    for feat in features:
        try:
            corr = float(panel[feat].corr(ret))
        except (TypeError, ValueError) as exc:
            print(f"Stage 2: skipped {feat!r} ({exc})")
            continue
        abs_corr = abs(corr)
        # A NaN correlation fails this comparison and is dropped as well.
        if abs_corr > min_abs_corr:
            kept.append({
                'feature': feat,
                'corr_with_return': round(corr, 4),
                'abs_corr': round(abs_corr, 4),
            })
    kept.sort(key=lambda item: item['abs_corr'], reverse=True)
    return kept


def _collinearity_filter(panel, ranked, max_pair_corr=0.85):
    """Drop the weaker member of every highly correlated feature pair.

    ``ranked`` is the stage-2 output; "weaker" means the lower ``abs_corr``
    with the return. Returns the surviving feature names in ranked order.
    """
    features = [item['feature'] for item in ranked]
    if not features:
        return []
    strength = {item['feature']: item['abs_corr'] for item in ranked}
    pair_corr = panel[features].corr().abs().to_numpy()
    dropped = set()
    for i, f_i in enumerate(features):
        if f_i in dropped:
            continue
        for j in range(i + 1, len(features)):
            f_j = features[j]
            if f_j in dropped:
                continue
            if pair_corr[i, j] > max_pair_corr:
                weaker = f_j if strength[f_i] >= strength[f_j] else f_i
                dropped.add(weaker)
                if weaker == f_i:
                    # f_i is gone; it must not eliminate anything else.
                    break
    return [f for f in features if f not in dropped]


def _mutual_information_stage(panel, features, ret, corr_lookup, min_samples=50):
    """Score ``features`` by mutual information with ``ret``.

    Only rows where every feature and the return are present are used.
    The return is the same series as in stage 2, so dropping incomplete
    rows never turns a 1-period return into a multi-period one. Returns an
    empty list when there are no features or no more than ``min_samples``
    usable rows; otherwise a list of dicts (``feature``, ``mi_score``,
    ``corr``) sorted by ``mi_score`` descending.
    """
    if not features:
        return []
    usable = panel[features].notna().all(axis=1) & ret.notna()
    if usable.sum() <= min_samples:
        return []
    # Imported lazily so scikit-learn is only required when MI is computed.
    from sklearn.feature_selection import mutual_info_regression
    mi_scores = mutual_info_regression(
        panel.loc[usable, features], ret[usable], random_state=42, n_neighbors=5)
    ranked = sorted(zip(features, mi_scores), key=lambda pair: pair[1], reverse=True)
    return [
        {'feature': feat, 'mi_score': round(float(mi_val), 4), 'corr': corr_lookup[feat]}
        for feat, mi_val in ranked
    ]


def _assign_factor_groups(final_selected, stage4):
    """Tag each selected feature with its factor group and stage-4 scores.

    The group is the first ``FACTOR_GROUPS`` entry listing the feature, or
    ``'Other'``. ``mi_score`` and ``corr`` default to 0 for features that
    did not reach stage 4.
    """
    scored = {item['feature']: item for item in stage4}
    assigned = []
    for feat in final_selected:
        group = next((g for g, members in FACTOR_GROUPS.items() if feat in members), 'Other')
        scores = scored.get(feat, {})
        assigned.append({
            'feature': feat,
            'factor_group': group,
            'mi_score': scores.get('mi_score', 0),
            'corr': scores.get('corr', 0),
        })
    return assigned