""" engine.py — Walk-Forward 预测引擎 ===================================== 核心组件: 1. 数据加载与目标构建 2. Walk-Forward 主循环 ├─ 特征选择 (MI) ├─ 波动率预测 (EWMA + Ridge residual) ├─ 区间预测 (QR + LightGBM) × 1M/3M ├─ Vol-Adaptive 宽度调整 ├─ LightGBM 特征重要性 ├─ 风险等级 / 偏置推断 ├─ Regime 检测 (余弦相似度) └─ 因子归因 (Ridge) """ import pandas as pd import numpy as np import warnings from sklearn.linear_model import Ridge, QuantileRegressor from sklearn.preprocessing import StandardScaler from sklearn.feature_selection import mutual_info_regression import lightgbm as lgb # TFT (Temporal Fusion Transformer) — frontier deep learning model try: from core.tft_model import tft_predict_step _TFT_AVAILABLE = True except ImportError: _TFT_AVAILABLE = False from config import ( FEATURES, FACTOR_GROUPS, PRICE_COL, TRAIN_WINDOW, MAX_FEATURES, MIN_TRAIN_SAMPLES, REGIME_SIGNATURES, REGIME_TYPE_MAP, ) warnings.filterwarnings('ignore') # ═══════════════════════════════════════════════════════════ # DATA LOADING # ═══════════════════════════════════════════════════════════ def load_panel(panel_path, price_col=None): """加载月度面板并构建目标变量和地缘特征。 Args: price_col: 可选,指定价格列(默认 config.PRICE_COL)。 当切换目标时(如 Brent),自动从特征列表中移除该列。 """ if price_col is None: price_col = PRICE_COL monthly = pd.read_csv(panel_path, index_col=0, parse_dates=True) # Geopolitical features try: geo = pd.read_csv('data/csv_raw/geopolitical_shocks.csv') event_cols = [c for c in geo.columns if c not in ['date_str', 'date_num', 'year', 'month', 'day']] geo['date'] = pd.to_datetime(geo[['year', 'month', 'day']]) geo_monthly = geo.set_index('date')[event_cols].resample('ME').sum() geo_monthly['geo_shock_count'] = geo_monthly[event_cols].sum(axis=1) geo_monthly['geo_active_events'] = (geo_monthly[event_cols] > 0).sum(axis=1) monthly = monthly.merge( geo_monthly[['geo_shock_count', 'geo_active_events']], left_index=True, right_index=True, how='left' ) monthly['geo_shock_count'] = monthly['geo_shock_count'].fillna(0) monthly['geo_active_events'] = monthly['geo_active_events'].fillna(0) print("✓ 地缘政治特征已添加") except Exception as e: print(f"⚠ GPR 数据未加载: {e}") monthly['geo_shock_count'] = 0 monthly['geo_active_events'] = 0 # Targets (基于指定价格列) monthly['target_ret_1m'] = monthly[price_col].pct_change(1).shift(-1) monthly['target_ret_3m'] = monthly[price_col].pct_change(3).shift(-3) monthly['target_abs_ret_1m'] = monthly['target_ret_1m'].abs() monthly['target_abs_ret_3m'] = monthly['target_ret_3m'].abs() monthly['target_up_1m'] = (monthly['target_ret_1m'] > 0).astype(int) monthly['ewma_vol'] = monthly[price_col].pct_change(1).abs().ewm(alpha=0.06).mean() # Filter features: 移除目标价格列自身,防止 target leakage features = [f for f in FEATURES if f in monthly.columns and f != price_col] # 如果目标是 Brent,把 WTI_spot 加到特征(如果不在列表里) if price_col != 'WTI_spot' and 'WTI_spot' in monthly.columns and 'WTI_spot' not in features: features.append('WTI_spot') targets = ['target_ret_1m', 'target_ret_3m', 'target_abs_ret_1m', 'target_abs_ret_3m', 'target_up_1m', 'ewma_vol'] keep_cols = [c for c in features + targets + [price_col] if c in monthly.columns] panel = monthly[keep_cols].copy() # 保留所有行(含无目标的最新月份,用于 live 预测) n_with_target = panel['target_ret_1m'].notna().sum() n_total = len(panel) print(f"面板 [{price_col}]: {n_total} 月 ({n_with_target} 有目标, {n_total - n_with_target} live), {len(features)} 特征") return panel, features # ═══════════════════════════════════════════════════════════ # REGIME DETECTION # ═══════════════════════════════════════════════════════════ def detect_regime(sel, coefs, x_vals, pred_vol): """基于因子贡献计算当前 regime 向量,与历史时期做余弦相似度匹配。""" regime_features = { 'supply_stress': 0, 'demand_stress': 0, 'geopolitical_stress': 0, 'price_momentum': 0, 'vol_level': 0, } supply_feats = {'rig_count_us_new', 'supply_saudi', 'us_oil_inventory_total'} demand_feats = {'pmi_us_mfg', 'usd_index', 'nonfarm_us', 'ipi_us'} geo_feats = {'vix_lag1', 'vix_lag2', 'geo_shock_count', 'geo_active_events'} tech_feats = {'mom1m_lag1', 'hist_vol_12m', 'rsi12m'} for f in sel: coef_val = coefs.get(f, 0) * x_vals.get(f, 0) if f in supply_feats: regime_features['supply_stress'] += abs(coef_val) elif f in demand_feats: regime_features['demand_stress'] += abs(coef_val) elif f in geo_feats: regime_features['geopolitical_stress'] += abs(coef_val) elif f in tech_feats: regime_features['price_momentum'] += abs(coef_val) regime_features['vol_level'] = pred_vol # Cosine similarity curr_vec = np.array(list(regime_features.values())) curr_norm = np.linalg.norm(curr_vec) best_regime, best_sim = '常态/低波动', 0 regime_sims = {} for rname, rsig in REGIME_SIGNATURES.items(): ref_vec = np.array(list(rsig.values())) ref_norm = np.linalg.norm(ref_vec) sim = float(np.dot(curr_vec, ref_vec) / (curr_norm * ref_norm)) if curr_norm > 0 and ref_norm > 0 else 0 regime_sims[rname] = sim if sim > best_sim: best_sim = sim best_regime = rname return { 'regime_match': best_regime, 'regime_similarity': round(best_sim, 3), 'regime_type': REGIME_TYPE_MAP.get(best_regime, 'normal'), 'regime_sims': regime_sims, } # ═══════════════════════════════════════════════════════════ # WALK-FORWARD ENGINE # ═══════════════════════════════════════════════════════════ def run_walk_forward(panel, features, train_window=TRAIN_WINDOW): """执行完整的 walk-forward 预测循环。""" results = [] shap_records = [] # Walk-forward: 有目标的月份用于训练/评估 last_target_idx = panel['target_ret_1m'].last_valid_index() last_target_pos = panel.index.get_loc(last_target_idx) if last_target_idx is not None else len(panel) - 4 wf_end = min(last_target_pos + 1, len(panel)) for i in range(train_window, wf_end): train_df = panel.iloc[max(0, i - train_window):i] # QR: rolling window train_expand = panel.iloc[:i] # LGB: expanding window (all history) test_df = panel.iloc[i:i + 1] test_date = panel.index[i] avail = [f for f in features if train_df[f].notna().mean() > 0.8] if len(avail) < 3: continue # ── Impute + Scale ── X_tr = train_df[avail].copy() X_te = test_df[avail].copy() for col in avail: med = X_tr[col].median() X_tr[col] = X_tr[col].fillna(med) X_te[col] = X_te[col].fillna(med) scaler = StandardScaler() X_tr_s = pd.DataFrame(scaler.fit_transform(X_tr), columns=avail, index=train_df.index) X_te_s = pd.DataFrame(scaler.transform(X_te), columns=avail, index=test_df.index) # ── Feature selection (MI, inner split) ── inner_n = int(len(X_tr_s) * 0.8) y_sel = train_df['target_abs_ret_1m'].iloc[:inner_n] X_sel = X_tr_s.iloc[:inner_n] valid = y_sel.notna() & X_sel.notna().all(axis=1) if valid.sum() < MIN_TRAIN_SAMPLES: continue try: mi = mutual_info_regression(X_sel.loc[valid], y_sel.loc[valid], random_state=42, n_neighbors=5) except: mi = np.ones(len(avail)) K = min(MAX_FEATURES, len(avail)) sel = [avail[j] for j in np.argsort(mi)[-K:]] X_tr_f = X_tr_s[sel] X_te_f = X_te_s[sel] # ── Actuals ── actual_ret_1m = panel['target_ret_1m'].iloc[i] actual_ret_3m = panel['target_ret_3m'].iloc[i] if pd.notna(panel['target_ret_3m'].iloc[i]) else np.nan actual_vol = panel['target_abs_ret_1m'].iloc[i] actual_up = panel['target_up_1m'].iloc[i] ewma_now = test_df['ewma_vol'].values[0] rec = { 'test_date': test_date, 'actual_ret_1m': actual_ret_1m, 'actual_ret_3m': actual_ret_3m, 'actual_vol': actual_vol, 'actual_up': actual_up, } y_ret = train_df['target_ret_1m'] y_ret_3m = train_df['target_ret_3m'] y_vol = train_df['target_abs_ret_1m'] mask_r = y_ret.notna() mask_3m = y_ret_3m.notna() # ════ VOL HEAD ════ baseline_vol = float(ewma_now) if not np.isnan(ewma_now) else 0.05 rec['baseline_ewma'] = baseline_vol valid_vol = y_vol.notna() & train_df['ewma_vol'].notna() & (train_df['ewma_vol'] > 0.001) if valid_vol.sum() > MIN_TRAIN_SAMPLES: log_ratio = np.log(y_vol[valid_vol] / train_df['ewma_vol'][valid_vol]) log_ratio = log_ratio.replace([np.inf, -np.inf], np.nan).dropna() if len(log_ratio) > 15: try: ridge = Ridge(alpha=10.0) ridge.fit(X_tr_f.loc[log_ratio.index], log_ratio) plr = np.clip(ridge.predict(X_te_f)[0], -2.0, 2.0) rec['pred_vol'] = float(np.clip(baseline_vol * np.exp(plr), 0.005, 0.50)) except: rec['pred_vol'] = baseline_vol else: rec['pred_vol'] = baseline_vol else: rec['pred_vol'] = baseline_vol # ════ INTERVAL HEAD: 1M (QR + LightGBM) ════ for q, alpha_q in [('q10', 0.10), ('q50', 0.50), ('q90', 0.90)]: try: qr = QuantileRegressor(quantile=alpha_q, alpha=0.1, solver='highs') qr.fit(X_tr_f.loc[mask_r], y_ret.loc[mask_r]) rec[f'qr_{q}_1m'] = qr.predict(X_te_f)[0] except: rec[f'qr_{q}_1m'] = float(y_ret.quantile(alpha_q)) # LGB: expanding window (all history → more samples → better trees) try: X_tr_lgb = train_expand[avail].copy() for col in avail: X_tr_lgb[col] = X_tr_lgb[col].fillna(X_tr_lgb[col].median()) X_tr_lgb_s = pd.DataFrame(scaler.transform(X_tr_lgb), columns=avail, index=train_expand.index) y_ret_lgb = train_expand['target_ret_1m'] mask_lgb = y_ret_lgb.notna() & X_tr_lgb_s[sel].notna().all(axis=1) lgb_params = { 'objective': 'quantile', 'alpha': alpha_q, 'n_estimators': 100, 'max_depth': 3, 'learning_rate': 0.05, 'num_leaves': 8, 'min_child_samples': max(10, int(mask_lgb.sum() * 0.05)), 'subsample': 0.8, 'colsample_bytree': 0.8, 'reg_alpha': 0.3, 'reg_lambda': 0.5, 'verbose': -1, 'random_state': 42, } lgb_model = lgb.LGBMRegressor(**lgb_params) lgb_model.fit(X_tr_lgb_s[sel].loc[mask_lgb], y_ret_lgb.loc[mask_lgb]) rec[f'lgb_{q}_1m'] = lgb_model.predict(X_te_f)[0] except: rec[f'lgb_{q}_1m'] = rec[f'qr_{q}_1m'] # Enforce monotonicity for pfx in ['qr', 'lgb']: vals = [rec[f'{pfx}_q10_1m'], rec[f'{pfx}_q50_1m'], rec[f'{pfx}_q90_1m']] rec[f'{pfx}_q10_1m'] = min(vals) rec[f'{pfx}_q50_1m'] = np.median(vals) rec[f'{pfx}_q90_1m'] = max(vals) # ════ TFT HEAD (Temporal Fusion Transformer) ════ tft_ok = False if _TFT_AVAILABLE and i >= train_window + 30: # Need enough history try: tft_result = tft_predict_step( train_df, test_df, sel, y_col='target_ret_1m' ) if tft_result is not None: rec['tft_q10_1m'] = tft_result['tft_q10_1m'] rec['tft_q50_1m'] = tft_result['tft_q50_1m'] rec['tft_q90_1m'] = tft_result['tft_q90_1m'] tft_ok = True except: pass if not tft_ok: rec['tft_q10_1m'] = rec['qr_q10_1m'] rec['tft_q50_1m'] = rec['qr_q50_1m'] rec['tft_q90_1m'] = rec['qr_q90_1m'] # ════ INTERVAL HEAD: 3M ════ for q, alpha_q in [('q10', 0.10), ('q50', 0.50), ('q90', 0.90)]: try: qr3 = QuantileRegressor(quantile=alpha_q, alpha=0.1, solver='highs') qr3.fit(X_tr_f.loc[mask_3m], y_ret_3m.loc[mask_3m]) rec[f'qr_{q}_3m'] = qr3.predict(X_te_f)[0] except: rec[f'qr_{q}_3m'] = float(y_ret_3m.quantile(alpha_q)) if mask_3m.sum() > 5 else 0.0 vals3 = [rec['qr_q10_3m'], rec['qr_q50_3m'], rec['qr_q90_3m']] rec['qr_q10_3m'] = min(vals3) rec['qr_q50_3m'] = np.median(vals3) rec['qr_q90_3m'] = max(vals3) # ════ QR + LGB + TFT ENSEMBLE ════ # Three-way blend: QR(50%) + LGB(25%) + TFT(25%) w_qr, w_lgb, w_tft = 0.50, 0.25, 0.25 ens_q10 = w_qr * rec['qr_q10_1m'] + w_lgb * rec['lgb_q10_1m'] + w_tft * rec['tft_q10_1m'] ens_q50 = w_qr * rec['qr_q50_1m'] + w_lgb * rec['lgb_q50_1m'] + w_tft * rec['tft_q50_1m'] ens_q90 = w_qr * rec['qr_q90_1m'] + w_lgb * rec['lgb_q90_1m'] + w_tft * rec['tft_q90_1m'] # ════ VOL-ADAPTIVE WIDTH (on blended ensemble) ════ center = ens_q50 raw_upper = ens_q90 - center raw_lower = center - ens_q10 train_vol_median = float(y_vol.median()) vol_ratio = rec['pred_vol'] / train_vol_median if train_vol_median > 0.001 else 1.0 vol_ratio = np.clip(vol_ratio, 0.85, 2.5) # grid-searched optimal rec['pred_q10_1m'] = center - raw_lower * vol_ratio rec['pred_q50_1m'] = center rec['pred_q90_1m'] = center + raw_upper * vol_ratio rec['interval_width_1m'] = (raw_upper + raw_lower) * vol_ratio rec['vol_ratio'] = vol_ratio # ════ 3M: INDEPENDENT VOL-ADAPTIVE (lb=1.0, doesn't narrow) ════ center3 = rec['qr_q50_3m'] raw_upper3 = rec['qr_q90_3m'] - center3 raw_lower3 = center3 - rec['qr_q10_3m'] vol_ratio_3m = np.clip(vol_ratio, 1.0, 2.5) # 3M keeps lb=1.0 rec['pred_q10_3m'] = center3 - raw_lower3 * vol_ratio_3m rec['pred_q50_3m'] = center3 rec['pred_q90_3m'] = center3 + raw_upper3 * vol_ratio_3m rec['interval_width_3m'] = (raw_upper3 + raw_lower3) * vol_ratio_3m # ════ CONFORMAL QR (CQR) ════ # Split training into proper_train + calibration for conformity scores cal_size = max(15, int(len(X_tr_f) * 0.2)) proper_n = len(X_tr_f) - cal_size if proper_n > MIN_TRAIN_SAMPLES: X_proper = X_tr_f.iloc[:proper_n] X_cal = X_tr_f.iloc[proper_n:] y_proper = y_ret.iloc[:proper_n] y_cal = y_ret.iloc[proper_n:] mask_proper = y_proper.notna() mask_cal = y_cal.notna() try: # Fit QR on proper training set qr_lo = QuantileRegressor(quantile=0.10, alpha=0.1, solver='highs') qr_hi = QuantileRegressor(quantile=0.90, alpha=0.1, solver='highs') qr_lo.fit(X_proper.loc[mask_proper], y_proper.loc[mask_proper]) qr_hi.fit(X_proper.loc[mask_proper], y_proper.loc[mask_proper]) # Conformity scores on calibration set cal_lo = qr_lo.predict(X_cal.loc[mask_cal]) cal_hi = qr_hi.predict(X_cal.loc[mask_cal]) y_cal_vals = y_cal.loc[mask_cal].values scores = np.maximum(cal_lo - y_cal_vals, y_cal_vals - cal_hi) # Quantile of scores for coverage guarantee alpha = 0.20 # Desired miscoverage = 20% → 80% coverage n_cal = len(scores) q_level = min(1.0, (1 - alpha) * (1 + 1 / n_cal)) Q_hat = np.quantile(scores, q_level) # Apply to test point cqr_lo = qr_lo.predict(X_te_f)[0] - Q_hat cqr_hi = qr_hi.predict(X_te_f)[0] + Q_hat cqr_md = (cqr_lo + cqr_hi) / 2 rec['cqr_q10_1m'] = float(cqr_lo) rec['cqr_q50_1m'] = float(cqr_md) rec['cqr_q90_1m'] = float(cqr_hi) rec['cqr_Q_hat'] = float(Q_hat) except: rec['cqr_q10_1m'] = rec['pred_q10_1m'] rec['cqr_q50_1m'] = rec['pred_q50_1m'] rec['cqr_q90_1m'] = rec['pred_q90_1m'] rec['cqr_Q_hat'] = 0.0 else: rec['cqr_q10_1m'] = rec['pred_q10_1m'] rec['cqr_q50_1m'] = rec['pred_q50_1m'] rec['cqr_q90_1m'] = rec['pred_q90_1m'] rec['cqr_Q_hat'] = 0.0 # Save pre-blend QR intervals for risk_bias (CQR should not affect directional signal) qr_q10_prebend = rec['pred_q10_1m'] qr_q50_prebend = rec['pred_q50_1m'] qr_q90_prebend = rec['pred_q90_1m'] # ════ BLEND: 80% QR(VA) + 20% CQR → final 1M intervals ════ # CQR boosts hi-vol coverage (+5pp) while keeping WIS optimal w_cqr = 0.20 rec['pred_q10_1m'] = (1 - w_cqr) * rec['pred_q10_1m'] + w_cqr * rec['cqr_q10_1m'] rec['pred_q90_1m'] = (1 - w_cqr) * rec['pred_q90_1m'] + w_cqr * rec['cqr_q90_1m'] rec['interval_width_1m'] = rec['pred_q90_1m'] - rec['pred_q10_1m'] # ════ FEATURE IMPORTANCE (LightGBM) ════ try: lgb50 = lgb.LGBMRegressor( objective='quantile', alpha=0.50, n_estimators=100, max_depth=3, learning_rate=0.05, num_leaves=8, min_child_samples=10, verbose=-1, random_state=42 ) lgb50.fit(X_tr_f.loc[mask_r], y_ret.loc[mask_r]) importances = lgb50.feature_importances_ imp_dict = dict(zip(sel, importances)) total_imp = sum(importances) if sum(importances) > 0 else 1 for group, members in FACTOR_GROUPS.items(): group_imp = sum(imp_dict.get(f, 0) for f in members if f in imp_dict) rec[f'shap_{group}'] = float(group_imp / total_imp) rec['shap_base'] = float(lgb50.predict(X_te_f)[0]) shap_records.append({ 'date': str(test_date), 'features': sel, 'importances': importances.tolist(), 'normalized': (importances / total_imp).tolist(), 'lgb_pred': float(lgb50.predict(X_te_f)[0]), }) except: for g in FACTOR_GROUPS: rec[f'shap_{g}'] = 0.0 rec['shap_base'] = 0.0 # ════ RISK LEVEL ════ # Use pre-blend QR intervals for bias (CQR widens symmetrically, distorts bias signal) upside_tail = qr_q90_prebend - qr_q50_prebend downside_tail = qr_q50_prebend - qr_q10_prebend vol_pctile = (y_vol < rec['pred_vol']).mean() rec['risk_level'] = 'High' if vol_pctile >= 0.70 else ('Medium' if vol_pctile >= 0.40 else 'Low') rec['risk_bias'] = 'Upward' if upside_tail > 1.15 * downside_tail else ( 'Downward' if downside_tail > 1.15 * upside_tail else 'Balanced') rec['vol_pctile'] = vol_pctile # ════ FACTOR ATTRIBUTION (Ridge) ════ factor_map = {} for f in sel: for group, members in FACTOR_GROUPS.items(): if f in members: factor_map.setdefault(group, []).append(f) break attr_window = min(60, len(X_tr_f)) X_attr = X_tr_f.iloc[-attr_window:] y_attr = y_ret.iloc[-attr_window:] valid_attr = y_attr.notna() coefs = {} x_vals = {} if valid_attr.sum() > MIN_TRAIN_SAMPLES: try: lr_attr = Ridge(alpha=1.0) lr_attr.fit(X_attr.loc[valid_attr], y_attr.loc[valid_attr]) coefs = dict(zip(sel, lr_attr.coef_)) x_vals = dict(X_te_f.iloc[0]) for group in FACTOR_GROUPS: gf = factor_map.get(group, []) contrib = sum(coefs.get(f, 0) * x_vals.get(f, 0) for f in gf) rec[f'factor_{group}'] = float(contrib) abs_contribs = {g: abs(rec.get(f'factor_{g}', 0)) for g in factor_map} rec['top_factor'] = max(abs_contribs, key=abs_contribs.get) if abs_contribs else 'Unknown' except: rec['top_factor'] = 'Unknown' else: rec['top_factor'] = 'Unknown' # ════ REGIME DETECTION ════ regime_info = detect_regime(sel, coefs, x_vals, rec['pred_vol']) rec['regime_match'] = regime_info['regime_match'] rec['regime_similarity'] = regime_info['regime_similarity'] rec['regime_type'] = regime_info['regime_type'] # ════ SCENARIOS ════ scenarios = {} for scen_name, modifications in [ ('vix_shock', {'vix_lag1': 2.0, 'vix_lag2': 1.5}), ('supply_cut', {'supply_saudi': -1.5, 'rig_count_us_new': -1.0}), ('demand_crash', {'pmi_us_mfg': -2.0, 'ipi_us': -1.5}), ]: X_stress = X_te_f.copy() for feat, mult in modifications.items(): if feat in X_stress.columns: X_stress[feat] = X_stress[feat] + mult try: stress_q50 = lgb50.predict(X_stress)[0] scenarios[scen_name] = float(stress_q50) except: scenarios[scen_name] = rec.get('pred_q50_1m', 0.0) rec['scenario_base'] = rec['pred_q50_1m'] rec['scenario_vix_shock'] = scenarios.get('vix_shock', 0.0) rec['scenario_supply_cut'] = scenarios.get('supply_cut', 0.0) rec['scenario_demand_crash'] = scenarios.get('demand_crash', 0.0) results.append(rec) # ════ LIVE FORECAST: 预测无目标的最新月份 ════ if wf_end < len(panel) and len(results) > 0: print(f"\n Live Forecast: {len(panel) - wf_end} 个最新月份 (无实际值)") last_rec = results[-1] for li in range(wf_end, len(panel)): test_df = panel.iloc[li:li + 1] test_date = panel.index[li] train_df = panel.iloc[max(0, li - train_window):li] avail = [f for f in features if train_df[f].notna().mean() > 0.8] if len(avail) < 3: continue X_tr = train_df[avail].copy() X_te = test_df[avail].copy() for col in avail: med = X_tr[col].median() X_tr[col] = X_tr[col].fillna(med) X_te[col] = X_te[col].fillna(med) scaler = StandardScaler() X_tr_s = pd.DataFrame(scaler.fit_transform(X_tr), columns=avail, index=train_df.index) X_te_s = pd.DataFrame(scaler.transform(X_te), columns=avail, index=test_df.index) # 使用固定特征 (从最后一次 walk-forward 的 sel) sel_live = [f for f in sel if f in avail] if len(sel_live) < 2: sel_live = avail[:MAX_FEATURES] X_tr_f = X_tr_s[sel_live] X_te_f = X_te_s[sel_live] ewma_now = test_df['ewma_vol'].values[0] if 'ewma_vol' in test_df.columns else 0.05 baseline_vol = float(ewma_now) if not np.isnan(ewma_now) else 0.05 live_rec = { 'test_date': test_date, 'actual_ret_1m': np.nan, 'actual_ret_3m': np.nan, 'actual_vol': np.nan, 'actual_up': np.nan, 'baseline_ewma': baseline_vol, 'is_live': True, } # 用最新的训练数据重新拟合轻量模型 y_ret = train_df['target_ret_1m'].dropna() y_vol = train_df['target_abs_ret_1m'].dropna() try: # QR 1M for alpha, suffix in [(0.05, 'q10'), (0.5, 'q50'), (0.95, 'q90')]: qr = QuantileRegressor(quantile=alpha, alpha=0.1, solver='highs') mask_qr = y_ret.index.isin(X_tr_f.index) y_qr = y_ret[mask_qr] X_qr = X_tr_f.loc[y_qr.index] if len(X_qr) > 10: qr.fit(X_qr, y_qr) val = float(qr.predict(X_te_f)[0]) live_rec[f'pred_{suffix}_1m'] = val # LightGBM mask_lgb = y_ret.index.isin(X_tr_f.index) y_lgb = y_ret[mask_lgb] X_lgb = X_tr_f.loc[y_lgb.index] if len(X_lgb) > 10: lgb50 = lgb.LGBMRegressor(objective='quantile', alpha=0.5, n_estimators=200, max_depth=4, verbose=-1) lgb50.fit(X_lgb, y_lgb) live_rec['lgb_q50'] = float(lgb50.predict(X_te_f)[0]) # Vol mask_vol = y_vol.index.isin(X_tr_f.index) y_v = y_vol[mask_vol] X_v = X_tr_f.loc[y_v.index] if len(X_v) > 10: ridge = Ridge(alpha=1.0) ridge.fit(X_v, y_v) live_rec['pred_vol'] = max(float(ridge.predict(X_te_f)[0]), baseline_vol * 0.5) else: live_rec['pred_vol'] = baseline_vol except Exception as e: live_rec['pred_vol'] = baseline_vol # Risk level + bias lo = live_rec.get('pred_q10_1m', -0.05) hi = live_rec.get('pred_q90_1m', 0.05) q50 = live_rec.get('pred_q50_1m', 0.0) width = abs(hi - lo) live_rec['risk_level'] = 'High' if width > 0.15 else 'Medium' if width > 0.08 else 'Low' live_rec['risk_score'] = width upside = hi - q50 downside = q50 - lo live_rec['risk_bias'] = 'Upward' if upside > 1.15 * downside else ( 'Downward' if downside > 1.15 * upside else 'Balanced') live_rec['vol_pctile'] = 0.5 # 无法计算,默认中等 # Copy structure from last record for key in ['regime_match', 'regime_similarity', 'regime_type', 'scenario_base', 'scenario_vix_shock', 'scenario_supply_cut', 'scenario_demand_crash', 'top_factor']: if key not in live_rec and key in last_rec: live_rec[key] = last_rec[key] # Regime detection x_vals = {f: float(X_te_f[f].values[0]) for f in sel_live} coefs = {} regime_info = detect_regime(sel_live, coefs, x_vals, live_rec.get('pred_vol', 0.05)) live_rec.update(regime_info) # Scenarios for scen_name, modifications in [ ('vix_shock', {'vix_lag1': 2.0, 'vix_lag2': 1.5}), ('supply_cut', {'supply_saudi': -1.5, 'rig_count_us_new': -1.0}), ('demand_crash', {'pmi_us_mfg': -2.0, 'ipi_us': -1.5}), ]: X_stress = X_te_f.copy() for feat, mult in modifications.items(): if feat in X_stress.columns: X_stress[feat] = X_stress[feat] + mult try: live_rec[f'scenario_{scen_name}'] = float(lgb50.predict(X_stress)[0]) except: live_rec[f'scenario_{scen_name}'] = live_rec.get('pred_q50_1m', 0.0) live_rec['scenario_base'] = live_rec.get('pred_q50_1m', 0.0) # Factor attribution (simplified) for group in FACTOR_GROUPS: live_rec[f'factor_{group}'] = last_rec.get(f'factor_{group}', 0.0) results.append(live_rec) print(f" ✓ {test_date.strftime('%Y-%m')}: Q50={live_rec.get('pred_q50_1m', 0):.3f}, " f"Risk={live_rec['risk_level']}") return pd.DataFrame(results), shap_records