# oilverse-api / core / engine.py
# Author: 孙家明
# Commit fab9847 — "deploy: OilVerse for HuggingFace (Node.js 18 fix)"
"""
engine.py — Walk-Forward 预测引擎
=====================================
核心组件:
1. 数据加载与目标构建
2. Walk-Forward 主循环
├─ 特征选择 (MI)
├─ 波动率预测 (EWMA + Ridge residual)
├─ 区间预测 (QR + LightGBM) × 1M/3M
├─ Vol-Adaptive 宽度调整
├─ LightGBM 特征重要性
├─ 风险等级 / 偏置推断
├─ Regime 检测 (余弦相似度)
└─ 因子归因 (Ridge)
"""
import pandas as pd
import numpy as np
import warnings
from sklearn.linear_model import Ridge, QuantileRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import mutual_info_regression
import lightgbm as lgb
# TFT (Temporal Fusion Transformer) — frontier deep learning model
try:
from core.tft_model import tft_predict_step
_TFT_AVAILABLE = True
except ImportError:
_TFT_AVAILABLE = False
from config import (
FEATURES, FACTOR_GROUPS, PRICE_COL, TRAIN_WINDOW,
MAX_FEATURES, MIN_TRAIN_SAMPLES,
REGIME_SIGNATURES, REGIME_TYPE_MAP,
)
warnings.filterwarnings('ignore')
# ═══════════════════════════════════════════════════════════
# DATA LOADING
# ═══════════════════════════════════════════════════════════
def load_panel(panel_path, price_col=None):
    """Load the monthly panel, attach geopolitical features, and build targets.

    Args:
        panel_path: CSV path of the monthly feature panel (dates in column 0).
        price_col: optional price column to forecast (defaults to
            config.PRICE_COL). When the target is switched (e.g. to Brent),
            that column is removed from the feature list automatically to
            prevent target leakage.

    Returns:
        (panel, features): trimmed DataFrame (all rows kept, including the
        target-less latest months used for live forecasting) and the list of
        usable feature names.
    """
    target_col = PRICE_COL if price_col is None else price_col
    df = pd.read_csv(panel_path, index_col=0, parse_dates=True)

    # Geopolitical shock features — best effort: on any failure fall back to
    # all-zero columns so downstream code never sees missing columns.
    try:
        geo = pd.read_csv('data/csv_raw/geopolitical_shocks.csv')
        meta_cols = {'date_str', 'date_num', 'year', 'month', 'day'}
        event_cols = [c for c in geo.columns if c not in meta_cols]
        geo['date'] = pd.to_datetime(geo[['year', 'month', 'day']])
        by_month = geo.set_index('date')[event_cols].resample('ME').sum()
        by_month['geo_shock_count'] = by_month[event_cols].sum(axis=1)
        by_month['geo_active_events'] = (by_month[event_cols] > 0).sum(axis=1)
        df = df.merge(
            by_month[['geo_shock_count', 'geo_active_events']],
            left_index=True, right_index=True, how='left',
        )
        for gcol in ('geo_shock_count', 'geo_active_events'):
            df[gcol] = df[gcol].fillna(0)
        print("✓ 地缘政治特征已添加")
    except Exception as e:
        print(f"⚠ GPR 数据未加载: {e}")
        df['geo_shock_count'] = 0
        df['geo_active_events'] = 0

    # Forward-looking targets derived from the chosen price column.
    ret_1m = df[target_col].pct_change(1)
    df['target_ret_1m'] = ret_1m.shift(-1)
    df['target_ret_3m'] = df[target_col].pct_change(3).shift(-3)
    df['target_abs_ret_1m'] = df['target_ret_1m'].abs()
    df['target_abs_ret_3m'] = df['target_ret_3m'].abs()
    df['target_up_1m'] = (df['target_ret_1m'] > 0).astype(int)
    df['ewma_vol'] = ret_1m.abs().ewm(alpha=0.06).mean()

    # Remove the target price column itself from the features (leakage guard).
    usable = [f for f in FEATURES if f in df.columns and f != target_col]
    # If the target is not WTI, WTI_spot becomes a legitimate predictor.
    if target_col != 'WTI_spot' and 'WTI_spot' in df.columns and 'WTI_spot' not in usable:
        usable.append('WTI_spot')

    target_names = ['target_ret_1m', 'target_ret_3m', 'target_abs_ret_1m',
                    'target_abs_ret_3m', 'target_up_1m', 'ewma_vol']
    keep = [c for c in usable + target_names + [target_col] if c in df.columns]
    panel = df[keep].copy()

    # Keep every row (including target-less latest months, used for live forecasts).
    n_with_target = panel['target_ret_1m'].notna().sum()
    n_total = len(panel)
    print(f"面板 [{target_col}]: {n_total} 月 ({n_with_target} 有目标, {n_total - n_with_target} live), {len(usable)} 特征")
    return panel, usable
# ═══════════════════════════════════════════════════════════
# REGIME DETECTION
# ═══════════════════════════════════════════════════════════
def detect_regime(sel, coefs, x_vals, pred_vol):
    """Build a factor-contribution regime vector and cosine-match it against
    historical regime signatures.

    Args:
        sel: selected feature names for the current step.
        coefs: {feature: ridge coefficient} from the attribution fit.
        x_vals: {feature: standardized value} at the test point.
        pred_vol: predicted 1M volatility level.

    Returns:
        dict with the best-matching regime name, its similarity, its type
        (via REGIME_TYPE_MAP), and the similarity for every known regime.
    """
    # Feature buckets that feed each regime dimension.
    buckets = {
        'supply_stress': {'rig_count_us_new', 'supply_saudi', 'us_oil_inventory_total'},
        'demand_stress': {'pmi_us_mfg', 'usd_index', 'nonfarm_us', 'ipi_us'},
        'geopolitical_stress': {'vix_lag1', 'vix_lag2', 'geo_shock_count', 'geo_active_events'},
        'price_momentum': {'mom1m_lag1', 'hist_vol_12m', 'rsi12m'},
    }
    # Insertion order must mirror REGIME_SIGNATURES so the vectors align.
    vec = {
        'supply_stress': 0, 'demand_stress': 0,
        'geopolitical_stress': 0, 'price_momentum': 0, 'vol_level': 0,
    }
    for feat in sel:
        weight = abs(coefs.get(feat, 0) * x_vals.get(feat, 0))
        for bucket, members in buckets.items():
            if feat in members:
                vec[bucket] += weight
                break
    vec['vol_level'] = pred_vol

    # Cosine similarity against each historical regime signature.
    curr = np.array(list(vec.values()))
    curr_norm = np.linalg.norm(curr)
    best_name, best_score = '常态/低波动', 0
    sims = {}
    for name, signature in REGIME_SIGNATURES.items():
        ref = np.array(list(signature.values()))
        ref_norm = np.linalg.norm(ref)
        if curr_norm > 0 and ref_norm > 0:
            score = float(np.dot(curr, ref) / (curr_norm * ref_norm))
        else:
            score = 0
        sims[name] = score
        if score > best_score:
            best_name, best_score = name, score

    return {
        'regime_match': best_name,
        'regime_similarity': round(best_score, 3),
        'regime_type': REGIME_TYPE_MAP.get(best_name, 'normal'),
        'regime_sims': sims,
    }
# ═══════════════════════════════════════════════════════════
# WALK-FORWARD ENGINE
# ═══════════════════════════════════════════════════════════
def run_walk_forward(panel, features, train_window=TRAIN_WINDOW):
    """Run the complete walk-forward forecasting loop.

    For each month that has a realized 1M target, every model head is fit
    using only data prior to that month, and predictions plus diagnostics
    are recorded. After the loop, "live" forecasts are produced for the
    trailing months that have no realized target yet.

    Args:
        panel: monthly DataFrame from load_panel() (features + target columns).
        features: candidate feature column names.
        train_window: rolling-window length in months for the QR/Ridge heads
            (LightGBM uses an expanding window instead).

    Returns:
        (results_df, shap_records): per-month DataFrame of predictions and
        diagnostics, plus a list of per-month LightGBM importance records.
    """
    results = []
    shap_records = []
    # Walk-forward range: only months with a realized target are evaluated.
    last_target_idx = panel['target_ret_1m'].last_valid_index()
    last_target_pos = panel.index.get_loc(last_target_idx) if last_target_idx is not None else len(panel) - 4
    wf_end = min(last_target_pos + 1, len(panel))
    for i in range(train_window, wf_end):
        train_df = panel.iloc[max(0, i - train_window):i]  # QR: rolling window
        train_expand = panel.iloc[:i]  # LGB: expanding window (all history)
        test_df = panel.iloc[i:i + 1]
        test_date = panel.index[i]
        # Keep only features that are >80% non-missing in this training window.
        avail = [f for f in features if train_df[f].notna().mean() > 0.8]
        if len(avail) < 3:
            continue
        # ── Impute + Scale ──
        # Median-impute from TRAIN only (same medians applied to test),
        # then standardize with a scaler fit on train only.
        X_tr = train_df[avail].copy()
        X_te = test_df[avail].copy()
        for col in avail:
            med = X_tr[col].median()
            X_tr[col] = X_tr[col].fillna(med)
            X_te[col] = X_te[col].fillna(med)
        scaler = StandardScaler()
        X_tr_s = pd.DataFrame(scaler.fit_transform(X_tr), columns=avail, index=train_df.index)
        X_te_s = pd.DataFrame(scaler.transform(X_te), columns=avail, index=test_df.index)
        # ── Feature selection (MI, inner split) ──
        # MI is computed on the first 80% of the window against |1M return|.
        inner_n = int(len(X_tr_s) * 0.8)
        y_sel = train_df['target_abs_ret_1m'].iloc[:inner_n]
        X_sel = X_tr_s.iloc[:inner_n]
        valid = y_sel.notna() & X_sel.notna().all(axis=1)
        if valid.sum() < MIN_TRAIN_SAMPLES:
            continue
        try:
            mi = mutual_info_regression(X_sel.loc[valid], y_sel.loc[valid], random_state=42, n_neighbors=5)
        except:
            # Fallback: uniform scores keep all features equally ranked.
            mi = np.ones(len(avail))
        K = min(MAX_FEATURES, len(avail))
        sel = [avail[j] for j in np.argsort(mi)[-K:]]  # top-K by MI
        X_tr_f = X_tr_s[sel]
        X_te_f = X_te_s[sel]
        # ── Actuals ──
        actual_ret_1m = panel['target_ret_1m'].iloc[i]
        actual_ret_3m = panel['target_ret_3m'].iloc[i] if pd.notna(panel['target_ret_3m'].iloc[i]) else np.nan
        actual_vol = panel['target_abs_ret_1m'].iloc[i]
        actual_up = panel['target_up_1m'].iloc[i]
        ewma_now = test_df['ewma_vol'].values[0]
        rec = {
            'test_date': test_date,
            'actual_ret_1m': actual_ret_1m,
            'actual_ret_3m': actual_ret_3m,
            'actual_vol': actual_vol,
            'actual_up': actual_up,
        }
        y_ret = train_df['target_ret_1m']
        y_ret_3m = train_df['target_ret_3m']
        y_vol = train_df['target_abs_ret_1m']
        mask_r = y_ret.notna()
        mask_3m = y_ret_3m.notna()
        # ════ VOL HEAD ════
        # Baseline is current EWMA vol; a Ridge on log(actual/EWMA) predicts
        # a multiplicative correction, clipped to keep the forecast sane.
        baseline_vol = float(ewma_now) if not np.isnan(ewma_now) else 0.05
        rec['baseline_ewma'] = baseline_vol
        valid_vol = y_vol.notna() & train_df['ewma_vol'].notna() & (train_df['ewma_vol'] > 0.001)
        if valid_vol.sum() > MIN_TRAIN_SAMPLES:
            log_ratio = np.log(y_vol[valid_vol] / train_df['ewma_vol'][valid_vol])
            log_ratio = log_ratio.replace([np.inf, -np.inf], np.nan).dropna()
            if len(log_ratio) > 15:
                try:
                    ridge = Ridge(alpha=10.0)
                    ridge.fit(X_tr_f.loc[log_ratio.index], log_ratio)
                    plr = np.clip(ridge.predict(X_te_f)[0], -2.0, 2.0)
                    rec['pred_vol'] = float(np.clip(baseline_vol * np.exp(plr), 0.005, 0.50))
                except:
                    rec['pred_vol'] = baseline_vol
            else:
                rec['pred_vol'] = baseline_vol
        else:
            rec['pred_vol'] = baseline_vol
        # ════ INTERVAL HEAD: 1M (QR + LightGBM) ════
        for q, alpha_q in [('q10', 0.10), ('q50', 0.50), ('q90', 0.90)]:
            try:
                qr = QuantileRegressor(quantile=alpha_q, alpha=0.1, solver='highs')
                qr.fit(X_tr_f.loc[mask_r], y_ret.loc[mask_r])
                rec[f'qr_{q}_1m'] = qr.predict(X_te_f)[0]
            except:
                # Fallback: unconditional empirical quantile of the window.
                rec[f'qr_{q}_1m'] = float(y_ret.quantile(alpha_q))
            # LGB: expanding window (all history → more samples → better trees)
            # NOTE(review): the scaler was fit on the rolling window only;
            # it is reused here to transform the full expanding history —
            # presumably intentional for consistency with X_te_f. Confirm.
            try:
                X_tr_lgb = train_expand[avail].copy()
                for col in avail:
                    X_tr_lgb[col] = X_tr_lgb[col].fillna(X_tr_lgb[col].median())
                X_tr_lgb_s = pd.DataFrame(scaler.transform(X_tr_lgb), columns=avail, index=train_expand.index)
                y_ret_lgb = train_expand['target_ret_1m']
                mask_lgb = y_ret_lgb.notna() & X_tr_lgb_s[sel].notna().all(axis=1)
                lgb_params = {
                    'objective': 'quantile', 'alpha': alpha_q,
                    'n_estimators': 100, 'max_depth': 3, 'learning_rate': 0.05,
                    'num_leaves': 8,
                    'min_child_samples': max(10, int(mask_lgb.sum() * 0.05)),
                    'subsample': 0.8, 'colsample_bytree': 0.8,
                    'reg_alpha': 0.3, 'reg_lambda': 0.5,
                    'verbose': -1, 'random_state': 42,
                }
                lgb_model = lgb.LGBMRegressor(**lgb_params)
                lgb_model.fit(X_tr_lgb_s[sel].loc[mask_lgb], y_ret_lgb.loc[mask_lgb])
                rec[f'lgb_{q}_1m'] = lgb_model.predict(X_te_f)[0]
            except:
                rec[f'lgb_{q}_1m'] = rec[f'qr_{q}_1m']
        # Enforce monotonicity (q10 <= q50 <= q90) per model family.
        for pfx in ['qr', 'lgb']:
            vals = [rec[f'{pfx}_q10_1m'], rec[f'{pfx}_q50_1m'], rec[f'{pfx}_q90_1m']]
            rec[f'{pfx}_q10_1m'] = min(vals)
            rec[f'{pfx}_q50_1m'] = np.median(vals)
            rec[f'{pfx}_q90_1m'] = max(vals)
        # ════ TFT HEAD (Temporal Fusion Transformer) ════
        # Optional deep-learning head; falls back to the QR quantiles when
        # the module is unavailable, history is short, or prediction fails.
        tft_ok = False
        if _TFT_AVAILABLE and i >= train_window + 30:  # Need enough history
            try:
                tft_result = tft_predict_step(
                    train_df, test_df, sel, y_col='target_ret_1m'
                )
                if tft_result is not None:
                    rec['tft_q10_1m'] = tft_result['tft_q10_1m']
                    rec['tft_q50_1m'] = tft_result['tft_q50_1m']
                    rec['tft_q90_1m'] = tft_result['tft_q90_1m']
                    tft_ok = True
            except:
                pass
        if not tft_ok:
            rec['tft_q10_1m'] = rec['qr_q10_1m']
            rec['tft_q50_1m'] = rec['qr_q50_1m']
            rec['tft_q90_1m'] = rec['qr_q90_1m']
        # ════ INTERVAL HEAD: 3M ════
        for q, alpha_q in [('q10', 0.10), ('q50', 0.50), ('q90', 0.90)]:
            try:
                qr3 = QuantileRegressor(quantile=alpha_q, alpha=0.1, solver='highs')
                qr3.fit(X_tr_f.loc[mask_3m], y_ret_3m.loc[mask_3m])
                rec[f'qr_{q}_3m'] = qr3.predict(X_te_f)[0]
            except:
                rec[f'qr_{q}_3m'] = float(y_ret_3m.quantile(alpha_q)) if mask_3m.sum() > 5 else 0.0
        # Monotonicity for the 3M quantiles.
        vals3 = [rec['qr_q10_3m'], rec['qr_q50_3m'], rec['qr_q90_3m']]
        rec['qr_q10_3m'] = min(vals3)
        rec['qr_q50_3m'] = np.median(vals3)
        rec['qr_q90_3m'] = max(vals3)
        # ════ QR + LGB + TFT ENSEMBLE ════
        # Three-way blend: QR(50%) + LGB(25%) + TFT(25%)
        w_qr, w_lgb, w_tft = 0.50, 0.25, 0.25
        ens_q10 = w_qr * rec['qr_q10_1m'] + w_lgb * rec['lgb_q10_1m'] + w_tft * rec['tft_q10_1m']
        ens_q50 = w_qr * rec['qr_q50_1m'] + w_lgb * rec['lgb_q50_1m'] + w_tft * rec['tft_q50_1m']
        ens_q90 = w_qr * rec['qr_q90_1m'] + w_lgb * rec['lgb_q90_1m'] + w_tft * rec['tft_q90_1m']
        # ════ VOL-ADAPTIVE WIDTH (on blended ensemble) ════
        # Scale interval half-widths by predicted vol relative to the
        # training-window median vol.
        center = ens_q50
        raw_upper = ens_q90 - center
        raw_lower = center - ens_q10
        train_vol_median = float(y_vol.median())
        vol_ratio = rec['pred_vol'] / train_vol_median if train_vol_median > 0.001 else 1.0
        vol_ratio = np.clip(vol_ratio, 0.85, 2.5)  # grid-searched optimal
        rec['pred_q10_1m'] = center - raw_lower * vol_ratio
        rec['pred_q50_1m'] = center
        rec['pred_q90_1m'] = center + raw_upper * vol_ratio
        rec['interval_width_1m'] = (raw_upper + raw_lower) * vol_ratio
        rec['vol_ratio'] = vol_ratio
        # ════ 3M: INDEPENDENT VOL-ADAPTIVE (lb=1.0, doesn't narrow) ════
        center3 = rec['qr_q50_3m']
        raw_upper3 = rec['qr_q90_3m'] - center3
        raw_lower3 = center3 - rec['qr_q10_3m']
        vol_ratio_3m = np.clip(vol_ratio, 1.0, 2.5)  # 3M keeps lb=1.0
        rec['pred_q10_3m'] = center3 - raw_lower3 * vol_ratio_3m
        rec['pred_q50_3m'] = center3
        rec['pred_q90_3m'] = center3 + raw_upper3 * vol_ratio_3m
        rec['interval_width_3m'] = (raw_upper3 + raw_lower3) * vol_ratio_3m
        # ════ CONFORMAL QR (CQR) ════
        # Split training into proper_train + calibration for conformity scores
        cal_size = max(15, int(len(X_tr_f) * 0.2))
        proper_n = len(X_tr_f) - cal_size
        if proper_n > MIN_TRAIN_SAMPLES:
            X_proper = X_tr_f.iloc[:proper_n]
            X_cal = X_tr_f.iloc[proper_n:]
            y_proper = y_ret.iloc[:proper_n]
            y_cal = y_ret.iloc[proper_n:]
            mask_proper = y_proper.notna()
            mask_cal = y_cal.notna()
            try:
                # Fit QR on proper training set
                qr_lo = QuantileRegressor(quantile=0.10, alpha=0.1, solver='highs')
                qr_hi = QuantileRegressor(quantile=0.90, alpha=0.1, solver='highs')
                qr_lo.fit(X_proper.loc[mask_proper], y_proper.loc[mask_proper])
                qr_hi.fit(X_proper.loc[mask_proper], y_proper.loc[mask_proper])
                # Conformity scores on calibration set
                cal_lo = qr_lo.predict(X_cal.loc[mask_cal])
                cal_hi = qr_hi.predict(X_cal.loc[mask_cal])
                y_cal_vals = y_cal.loc[mask_cal].values
                scores = np.maximum(cal_lo - y_cal_vals, y_cal_vals - cal_hi)
                # Quantile of scores for coverage guarantee
                alpha = 0.20  # Desired miscoverage = 20% → 80% coverage
                n_cal = len(scores)
                q_level = min(1.0, (1 - alpha) * (1 + 1 / n_cal))
                Q_hat = np.quantile(scores, q_level)
                # Apply to test point: widen both sides by Q_hat.
                cqr_lo = qr_lo.predict(X_te_f)[0] - Q_hat
                cqr_hi = qr_hi.predict(X_te_f)[0] + Q_hat
                cqr_md = (cqr_lo + cqr_hi) / 2
                rec['cqr_q10_1m'] = float(cqr_lo)
                rec['cqr_q50_1m'] = float(cqr_md)
                rec['cqr_q90_1m'] = float(cqr_hi)
                rec['cqr_Q_hat'] = float(Q_hat)
            except:
                rec['cqr_q10_1m'] = rec['pred_q10_1m']
                rec['cqr_q50_1m'] = rec['pred_q50_1m']
                rec['cqr_q90_1m'] = rec['pred_q90_1m']
                rec['cqr_Q_hat'] = 0.0
        else:
            rec['cqr_q10_1m'] = rec['pred_q10_1m']
            rec['cqr_q50_1m'] = rec['pred_q50_1m']
            rec['cqr_q90_1m'] = rec['pred_q90_1m']
            rec['cqr_Q_hat'] = 0.0
        # Save pre-blend QR intervals for risk_bias (CQR should not affect directional signal)
        qr_q10_prebend = rec['pred_q10_1m']
        qr_q50_prebend = rec['pred_q50_1m']
        qr_q90_prebend = rec['pred_q90_1m']
        # ════ BLEND: 80% QR(VA) + 20% CQR → final 1M intervals ════
        # CQR boosts hi-vol coverage (+5pp) while keeping WIS optimal
        w_cqr = 0.20
        rec['pred_q10_1m'] = (1 - w_cqr) * rec['pred_q10_1m'] + w_cqr * rec['cqr_q10_1m']
        rec['pred_q90_1m'] = (1 - w_cqr) * rec['pred_q90_1m'] + w_cqr * rec['cqr_q90_1m']
        rec['interval_width_1m'] = rec['pred_q90_1m'] - rec['pred_q10_1m']
        # ════ FEATURE IMPORTANCE (LightGBM) ════
        # A median-quantile LGBM provides split-count importances, aggregated
        # into factor groups and normalized to sum to 1.
        try:
            lgb50 = lgb.LGBMRegressor(
                objective='quantile', alpha=0.50, n_estimators=100,
                max_depth=3, learning_rate=0.05, num_leaves=8,
                min_child_samples=10, verbose=-1, random_state=42
            )
            lgb50.fit(X_tr_f.loc[mask_r], y_ret.loc[mask_r])
            importances = lgb50.feature_importances_
            imp_dict = dict(zip(sel, importances))
            total_imp = sum(importances) if sum(importances) > 0 else 1
            for group, members in FACTOR_GROUPS.items():
                group_imp = sum(imp_dict.get(f, 0) for f in members if f in imp_dict)
                rec[f'shap_{group}'] = float(group_imp / total_imp)
            rec['shap_base'] = float(lgb50.predict(X_te_f)[0])
            shap_records.append({
                'date': str(test_date),
                'features': sel,
                'importances': importances.tolist(),
                'normalized': (importances / total_imp).tolist(),
                'lgb_pred': float(lgb50.predict(X_te_f)[0]),
            })
        except:
            for g in FACTOR_GROUPS:
                rec[f'shap_{g}'] = 0.0
            rec['shap_base'] = 0.0
        # ════ RISK LEVEL ════
        # Use pre-blend QR intervals for bias (CQR widens symmetrically, distorts bias signal)
        upside_tail = qr_q90_prebend - qr_q50_prebend
        downside_tail = qr_q50_prebend - qr_q10_prebend
        vol_pctile = (y_vol < rec['pred_vol']).mean()
        rec['risk_level'] = 'High' if vol_pctile >= 0.70 else ('Medium' if vol_pctile >= 0.40 else 'Low')
        rec['risk_bias'] = 'Upward' if upside_tail > 1.15 * downside_tail else (
            'Downward' if downside_tail > 1.15 * upside_tail else 'Balanced')
        rec['vol_pctile'] = vol_pctile
        # ════ FACTOR ATTRIBUTION (Ridge) ════
        # Map each selected feature to its factor group, then attribute the
        # Ridge prediction as coef * standardized value, summed per group.
        factor_map = {}
        for f in sel:
            for group, members in FACTOR_GROUPS.items():
                if f in members:
                    factor_map.setdefault(group, []).append(f)
                    break
        attr_window = min(60, len(X_tr_f))
        X_attr = X_tr_f.iloc[-attr_window:]
        y_attr = y_ret.iloc[-attr_window:]
        valid_attr = y_attr.notna()
        coefs = {}
        x_vals = {}
        if valid_attr.sum() > MIN_TRAIN_SAMPLES:
            try:
                lr_attr = Ridge(alpha=1.0)
                lr_attr.fit(X_attr.loc[valid_attr], y_attr.loc[valid_attr])
                coefs = dict(zip(sel, lr_attr.coef_))
                x_vals = dict(X_te_f.iloc[0])
                for group in FACTOR_GROUPS:
                    gf = factor_map.get(group, [])
                    contrib = sum(coefs.get(f, 0) * x_vals.get(f, 0) for f in gf)
                    rec[f'factor_{group}'] = float(contrib)
                abs_contribs = {g: abs(rec.get(f'factor_{g}', 0)) for g in factor_map}
                rec['top_factor'] = max(abs_contribs, key=abs_contribs.get) if abs_contribs else 'Unknown'
            except:
                rec['top_factor'] = 'Unknown'
        else:
            rec['top_factor'] = 'Unknown'
        # ════ REGIME DETECTION ════
        regime_info = detect_regime(sel, coefs, x_vals, rec['pred_vol'])
        rec['regime_match'] = regime_info['regime_match']
        rec['regime_similarity'] = regime_info['regime_similarity']
        rec['regime_type'] = regime_info['regime_type']
        # ════ SCENARIOS ════
        # Stress the standardized test row by fixed z-score shifts and re-score
        # with the importance-head LGBM.
        # NOTE(review): lgb50 comes from the FEATURE IMPORTANCE try-block; if
        # that fit failed, this either reuses a previous iteration's model or
        # raises NameError (caught below, falling back to pred_q50_1m). Confirm.
        scenarios = {}
        for scen_name, modifications in [
            ('vix_shock', {'vix_lag1': 2.0, 'vix_lag2': 1.5}),
            ('supply_cut', {'supply_saudi': -1.5, 'rig_count_us_new': -1.0}),
            ('demand_crash', {'pmi_us_mfg': -2.0, 'ipi_us': -1.5}),
        ]:
            X_stress = X_te_f.copy()
            for feat, mult in modifications.items():
                if feat in X_stress.columns:
                    X_stress[feat] = X_stress[feat] + mult
            try:
                stress_q50 = lgb50.predict(X_stress)[0]
                scenarios[scen_name] = float(stress_q50)
            except:
                scenarios[scen_name] = rec.get('pred_q50_1m', 0.0)
        rec['scenario_base'] = rec['pred_q50_1m']
        rec['scenario_vix_shock'] = scenarios.get('vix_shock', 0.0)
        rec['scenario_supply_cut'] = scenarios.get('supply_cut', 0.0)
        rec['scenario_demand_crash'] = scenarios.get('demand_crash', 0.0)
        results.append(rec)
    # ════ LIVE FORECAST: predict the latest months that have no target ════
    # NOTE(review): `sel` and `last_rec` carry over from the final walk-forward
    # iteration; the `len(results) > 0` guard ensures they exist.
    if wf_end < len(panel) and len(results) > 0:
        print(f"\n Live Forecast: {len(panel) - wf_end} 个最新月份 (无实际值)")
        last_rec = results[-1]
        for li in range(wf_end, len(panel)):
            test_df = panel.iloc[li:li + 1]
            test_date = panel.index[li]
            train_df = panel.iloc[max(0, li - train_window):li]
            avail = [f for f in features if train_df[f].notna().mean() > 0.8]
            if len(avail) < 3:
                continue
            # Same impute + scale procedure as the main loop.
            X_tr = train_df[avail].copy()
            X_te = test_df[avail].copy()
            for col in avail:
                med = X_tr[col].median()
                X_tr[col] = X_tr[col].fillna(med)
                X_te[col] = X_te[col].fillna(med)
            scaler = StandardScaler()
            X_tr_s = pd.DataFrame(scaler.fit_transform(X_tr), columns=avail, index=train_df.index)
            X_te_s = pd.DataFrame(scaler.transform(X_te), columns=avail, index=test_df.index)
            # Reuse the fixed feature set from the last walk-forward step.
            sel_live = [f for f in sel if f in avail]
            if len(sel_live) < 2:
                sel_live = avail[:MAX_FEATURES]
            X_tr_f = X_tr_s[sel_live]
            X_te_f = X_te_s[sel_live]
            ewma_now = test_df['ewma_vol'].values[0] if 'ewma_vol' in test_df.columns else 0.05
            baseline_vol = float(ewma_now) if not np.isnan(ewma_now) else 0.05
            live_rec = {
                'test_date': test_date,
                'actual_ret_1m': np.nan,
                'actual_ret_3m': np.nan,
                'actual_vol': np.nan,
                'actual_up': np.nan,
                'baseline_ewma': baseline_vol,
                'is_live': True,
            }
            # Refit lightweight models on the most recent training data.
            y_ret = train_df['target_ret_1m'].dropna()
            y_vol = train_df['target_abs_ret_1m'].dropna()
            try:
                # QR 1M
                # NOTE(review): quantiles 0.05/0.95 are stored under the
                # q10/q90 keys here, while the main loop uses 0.10/0.90 —
                # live intervals are wider than backtest ones. Confirm intent.
                for alpha, suffix in [(0.05, 'q10'), (0.5, 'q50'), (0.95, 'q90')]:
                    qr = QuantileRegressor(quantile=alpha, alpha=0.1, solver='highs')
                    mask_qr = y_ret.index.isin(X_tr_f.index)
                    y_qr = y_ret[mask_qr]
                    X_qr = X_tr_f.loc[y_qr.index]
                    if len(X_qr) > 10:
                        qr.fit(X_qr, y_qr)
                        val = float(qr.predict(X_te_f)[0])
                        live_rec[f'pred_{suffix}_1m'] = val
                # LightGBM
                mask_lgb = y_ret.index.isin(X_tr_f.index)
                y_lgb = y_ret[mask_lgb]
                X_lgb = X_tr_f.loc[y_lgb.index]
                if len(X_lgb) > 10:
                    lgb50 = lgb.LGBMRegressor(objective='quantile', alpha=0.5,
                                              n_estimators=200, max_depth=4, verbose=-1)
                    lgb50.fit(X_lgb, y_lgb)
                    live_rec['lgb_q50'] = float(lgb50.predict(X_te_f)[0])
                # Vol
                # NOTE(review): unlike the main loop (Ridge on log-ratio of
                # vol/EWMA), this fits Ridge directly on |returns| and floors
                # at half the EWMA baseline — simpler by design, presumably.
                mask_vol = y_vol.index.isin(X_tr_f.index)
                y_v = y_vol[mask_vol]
                X_v = X_tr_f.loc[y_v.index]
                if len(X_v) > 10:
                    ridge = Ridge(alpha=1.0)
                    ridge.fit(X_v, y_v)
                    live_rec['pred_vol'] = max(float(ridge.predict(X_te_f)[0]), baseline_vol * 0.5)
                else:
                    live_rec['pred_vol'] = baseline_vol
            except Exception as e:
                live_rec['pred_vol'] = baseline_vol
            # Risk level + bias, derived from interval width instead of the
            # vol percentile used in the main loop (no realized vol here).
            lo = live_rec.get('pred_q10_1m', -0.05)
            hi = live_rec.get('pred_q90_1m', 0.05)
            q50 = live_rec.get('pred_q50_1m', 0.0)
            width = abs(hi - lo)
            live_rec['risk_level'] = 'High' if width > 0.15 else 'Medium' if width > 0.08 else 'Low'
            live_rec['risk_score'] = width
            upside = hi - q50
            downside = q50 - lo
            live_rec['risk_bias'] = 'Upward' if upside > 1.15 * downside else (
                'Downward' if downside > 1.15 * upside else 'Balanced')
            live_rec['vol_pctile'] = 0.5  # cannot be computed live; default to medium
            # Copy structure from the last backtest record for missing keys.
            for key in ['regime_match', 'regime_similarity', 'regime_type',
                        'scenario_base', 'scenario_vix_shock', 'scenario_supply_cut',
                        'scenario_demand_crash', 'top_factor']:
                if key not in live_rec and key in last_rec:
                    live_rec[key] = last_rec[key]
            # Regime detection (no attribution fit → empty coefs, so only the
            # vol_level dimension of the regime vector is non-zero).
            x_vals = {f: float(X_te_f[f].values[0]) for f in sel_live}
            coefs = {}
            regime_info = detect_regime(sel_live, coefs, x_vals, live_rec.get('pred_vol', 0.05))
            live_rec.update(regime_info)
            # Scenarios (same z-score stresses as the main loop).
            for scen_name, modifications in [
                ('vix_shock', {'vix_lag1': 2.0, 'vix_lag2': 1.5}),
                ('supply_cut', {'supply_saudi': -1.5, 'rig_count_us_new': -1.0}),
                ('demand_crash', {'pmi_us_mfg': -2.0, 'ipi_us': -1.5}),
            ]:
                X_stress = X_te_f.copy()
                for feat, mult in modifications.items():
                    if feat in X_stress.columns:
                        X_stress[feat] = X_stress[feat] + mult
                try:
                    # lgb50 may be undefined if the LightGBM fit was skipped;
                    # the except below falls back to the live median forecast.
                    live_rec[f'scenario_{scen_name}'] = float(lgb50.predict(X_stress)[0])
                except:
                    live_rec[f'scenario_{scen_name}'] = live_rec.get('pred_q50_1m', 0.0)
            live_rec['scenario_base'] = live_rec.get('pred_q50_1m', 0.0)
            # Factor attribution (simplified: carried over from last backtest step).
            for group in FACTOR_GROUPS:
                live_rec[f'factor_{group}'] = last_rec.get(f'factor_{group}', 0.0)
            results.append(live_rec)
            print(f" ✓ {test_date.strftime('%Y-%m')}: Q50={live_rec.get('pred_q50_1m', 0):.3f}, "
                  f"Risk={live_rec['risk_level']}")
    return pd.DataFrame(results), shap_records