"""
Algorithm Order Signature Detector.

Detects institutional execution-algorithm patterns from raw, unlabeled
Level-2 order data via a rule engine, producing pseudo-labels for training
a deep-learning model.

Five detected patterns:
    0: TWAP    - time-weighted average price (equal size, equal spacing)
    1: VWAP    - volume-weighted average price (follows market volume)
    2: ICEBERG - iceberg orders (small displayed size, repeated level-1 refills)
    3: SUPPORT - price support (persistent large orders at key levels)
    4: NORMAL  - no obvious algorithmic signature (retail / other)

Expected raw Level-2 input:
    - 10-level book (ask_price_1..10, ask_size_1..10, bid_price_1..10, bid_size_1..10)
    - per-order events (ORDER_ID, PRICE, SIZE, BUY_SELL_FLAG, TYPE)

References:
    - MarS (arXiv:2409.07486): TWAP signature definition
    - PULSE (arXiv:2312.05827): multi-clock feature engineering
    - Hautsch & Huang (2012): iceberg order identification
"""

import numpy as np
import pandas as pd
from scipy.signal import find_peaks
from scipy.stats import pearsonr


# ============================================================
# Feature extractors
# ============================================================

def compute_order_size_cv(sizes, window=20):
    """Rolling coefficient of variation of order sizes.

    TWAP signature: CV < 0.15 suggests equally sized child orders.
    Entries before the first full window keep the sentinel 999
    ("high variation, not TWAP").
    """
    n = len(sizes)
    cv = np.ones(n) * 999  # sentinel: high variation (non-TWAP)
    for idx in range(window, n):
        chunk = sizes[idx - window:idx]
        mu = chunk.mean()
        if mu > 0:
            cv[idx] = chunk.std() / mu
    return cv


def compute_periodicity(timestamps, window=20, expected_lag=None):
    """Rolling regularity score of order arrival spacing.

    TWAP signature: equally spaced submissions give near-constant
    inter-arrival gaps; scored as max(0, 1 - CV(gaps)), so 1 means
    perfectly regular. `expected_lag` is accepted but unused here.
    """
    n = len(timestamps)
    score = np.zeros(n)
    for idx in range(window, n):
        gaps = np.diff(timestamps[idx - window:idx])
        if len(gaps) < 3 or gaps.std() == 0:
            continue
        mu = gaps.mean()
        sigma = gaps.std()
        if mu > 0:
            score[idx] = max(0, 1 - sigma / mu)
    return score


def compute_cancel_burst_ratio(types, timestamps, window=20, boundary_frac=0.2):
    """Rolling share of cancellations falling in the window's trailing edge.

    TWAP signature: cancels cluster at the end of each execution interval.
    `timestamps` is accepted for interface symmetry but not used here.
    """
    n = len(types)
    burst = np.zeros(n)
    cancel_flags = (types == 'ORDER_CANCELLED').astype(float)
    # Offset of the trailing boundary_frac slice; loop-invariant.
    tail_offset = int(window * (1 - boundary_frac))
    for idx in range(window, n):
        cancels_total = cancel_flags[idx - window:idx].sum()
        if cancels_total == 0:
            continue
        cancels_tail = cancel_flags[idx - window + tail_offset:idx].sum()
        burst[idx] = cancels_tail / cancels_total
    return burst


def compute_passive_aggressive_ratio(prices, mid_prices, buy_sell, window=20):
    """Rolling fraction of aggressive (marketable) orders.

    Aggressive buy: price >= mid; aggressive sell: price <= mid.
    TWAP signature: mostly passive posting with short aggressive sweeps
    (e.g. 25s resting at bid1, 5s crossing the spread).
    """
    n = len(prices)
    # Per-order aggressiveness flag, chosen by side (nonzero flag = buy).
    aggressive = np.where(
        buy_sell != 0,
        prices >= mid_prices,   # buy side: at/above mid crosses toward asks
        prices <= mid_prices,   # sell side: at/below mid crosses toward bids
    ).astype(float)
    ratio = np.zeros(n)
    running = np.cumsum(aggressive)
    # Rolling aggressive share over the trailing window.
    ratio[window:] = (running[window:] - running[:-window]) / window
    return ratio


def compute_participation_rate(sizes, total_market_volume, window=20):
    """Rolling instability (CV) of the participation rate across sub-windows.

    VWAP signature: participation rate stays near-constant (10-20%), so a
    LOW value here (low CV across five sub-windows) indicates VWAP-like
    behavior. Entries without enough data keep the sentinel 999.
    """
    n = len(sizes)
    stability = np.ones(n) * 999
    own_cum = np.cumsum(sizes)
    market_cum = np.cumsum(total_market_volume)
    for idx in range(window, n):
        sub = max(1, window // 5)
        rates = []
        # Participation rate within each of five equal sub-windows.
        for k in range(5):
            lo = idx - window + k * sub
            hi = min(lo + sub, idx)
            if hi <= lo:
                continue
            own_vol = own_cum[hi] - own_cum[lo]
            market_vol = market_cum[hi] - market_cum[lo]
            if market_vol > 0:
                rates.append(own_vol / market_vol)
        if len(rates) >= 3:
            arr = np.array(rates)
            mu = arr.mean()
            if mu > 0:
                stability[idx] = arr.std() / mu
    return stability


def compute_volume_correlation(sizes, buy_sell, total_market_volume, window=50):
    """Rolling Pearson correlation of child order sizes vs. market volume.

    VWAP signature: Pearson(child_vol, market_vol) > 0.7. Negative
    correlations are floored at zero. `buy_sell` is accepted for interface
    symmetry but not used here.
    """
    n = len(sizes)
    corr_score = np.zeros(n)
    for idx in range(window, n):
        child = sizes[idx - window:idx]
        market = total_market_volume[idx - window:idx]
        # pearsonr requires non-constant inputs on both sides.
        if child.std() > 0 and market.std() > 0:
            r, _ = pearsonr(child, market)
            corr_score[idx] = max(0, r)
    return corr_score
def compute_refill_ratio(ask_sizes_1, bid_sizes_1, window=20, refill_threshold=0.7):
    """Rolling frequency of level-1 "refill" events on either side of the book.

    Iceberg signature: displayed level-1 size drops sharply after a fill and
    then snaps back close to its previous value (big -> small -> big).

    Args:
        ask_sizes_1, bid_sizes_1: level-1 displayed sizes per event.
        window: rolling window length (in events).
        refill_threshold: recovery must exceed this fraction of the pre-drop size.

    Returns:
        (N,) array: average number of refill events per step over the window.
    """
    N = len(ask_sizes_1)
    refill_score = np.zeros(N)
    for side_sizes in (ask_sizes_1, bid_sizes_1):
        # Vectorized big -> small -> big detection over consecutive triples:
        # prev > 0, curr < prev/2, next > prev * refill_threshold.
        prev = side_sizes[:-2]
        curr = side_sizes[1:-1]
        nxt = side_sizes[2:]
        hit = (prev > 0) & (curr < prev * 0.5) & (nxt > prev * refill_threshold)
        refill_score[2:] += hit
    # Rolling mean of refill events over the trailing window.
    cum_refill = np.cumsum(refill_score)
    result = np.zeros(N)
    result[window:] = (cum_refill[window:] - cum_refill[:-window]) / window
    return result


def compute_hidden_volume_ratio(sizes, ask_sizes_1, bid_sizes_1, buy_sell, window=50):
    """Rolling ratio of executed volume to the maximum displayed level-1 size.

    Iceberg signature: total_executed / max_displayed > 3.0 suggests hidden
    quantity behind the visible quote. `buy_sell` is accepted for interface
    symmetry but not used here.
    """
    N = len(sizes)
    hidden_ratio = np.zeros(N)
    for i in range(window, N):
        # Total volume submitted within the window.
        total_vol = sizes[i - window:i].sum()
        # Largest displayed level-1 size on either side; floor 1 avoids /0.
        max_displayed = max(
            ask_sizes_1[i - window:i].max(),
            bid_sizes_1[i - window:i].max(),
            1,
        )
        hidden_ratio[i] = total_vol / max_displayed
    return hidden_ratio


def compute_level_persistence(lob_sizes, window=50, big_order_percentile=90):
    """Rolling persistence score of large resting orders at any price level.

    Support/resistance signature: some level keeps a large order for most of
    the window. "Large" means above the given percentile of all positive
    displayed sizes.

    Args:
        lob_sizes: (N, 20) array [ask_size_1..10, bid_size_1..10].
        window: rolling window length (in events).
        big_order_percentile: percentile of positive sizes defining "large".

    Returns:
        (N,) array: max over levels of the fraction of window steps where
        that level held a size above the threshold. All zeros when the book
        never shows a positive size (previously this crashed, since
        np.percentile was applied to an empty array).
    """
    N = lob_sizes.shape[0]
    persistence = np.zeros(N)
    positive_sizes = lob_sizes[lob_sizes > 0]
    if positive_sizes.size == 0:
        # Empty/all-zero book: no threshold is definable; nothing persists.
        return persistence
    threshold = np.percentile(positive_sizes, big_order_percentile)
    for i in range(window, N):
        w = lob_sizes[i - window:i]  # (window, 20)
        # Per-level fraction of steps holding a "big" order; best level wins.
        level_big = (w > threshold).sum(axis=0) / window
        persistence[i] = level_big.max()
    return persistence


def compute_depth_imbalance(ask_sizes, bid_sizes, top_levels=3):
    """Depth imbalance over the top book levels, in [-1, 1].

    Support signature: bid-side stacking -> imbalance > 0.
    Resistance signature: ask-side stacking -> imbalance < 0.
    """
    bid_depth = bid_sizes[:, :top_levels].sum(axis=1)
    ask_depth = ask_sizes[:, :top_levels].sum(axis=1)
    total = bid_depth + ask_depth + 1e-8  # epsilon avoids division by zero
    return (bid_depth - ask_depth) / total


def compute_ofi_multi_scale(ask_sizes_1, bid_sizes_1, windows=(5, 10, 20, 50)):
    """Multi-scale order-flow imbalance (OFI) features (core PULSE features).

    For each window w, emits the rolling mean ('ofi_{w}') and rolling std
    ('ofi_vol_{w}') of the level-1 imbalance (bid - ask) / (bid + ask).

    Note: the default is a tuple (was a mutable list — an anti-pattern even
    though it was never mutated); callers passing their own list still work.
    """
    N = len(ask_sizes_1)
    features = {}
    imb = (bid_sizes_1 - ask_sizes_1) / (bid_sizes_1 + ask_sizes_1 + 1e-8)
    # Cumulative sums are window-independent: compute once, reuse per window.
    cum = np.cumsum(imb)
    cum_sq = np.cumsum(imb ** 2)
    for w in windows:
        # Rolling mean.
        roll_mean = np.zeros(N)
        roll_mean[w:] = (cum[w:] - cum[:-w]) / w
        features[f'ofi_{w}'] = roll_mean
        # Rolling std via E[x^2] - E[x]^2, clamped at 0 for float safety.
        roll_var = np.zeros(N)
        roll_var[w:] = (cum_sq[w:] - cum_sq[:-w]) / w - roll_mean[w:] ** 2
        features[f'ofi_vol_{w}'] = np.sqrt(np.maximum(roll_var, 0))
    return features


# ============================================================
# Pseudo-label generator
# ============================================================

def generate_pseudo_labels(df, verbose=True):
    """Generate pseudo-labels from raw Level-2 data.

    Args:
        df: DataFrame with ORDER_ID, PRICE, SIZE, BUY_SELL_FLAG, TYPE,
            ask_price_1..10, ask_size_1..10, bid_price_1..10, bid_size_1..10.
        verbose: print progress and the resulting label distribution.

    Returns:
        labels: (N,) int64, 0=TWAP, 1=VWAP, 2=ICEBERG, 3=SUPPORT, 4=NORMAL
        scores: (N, 4) float, confidence score per algorithm class
        features: (N, F) float32, all extracted features
    """
    N = len(df)

    # Base per-order columns.
    sizes = df['SIZE'].values.astype(np.float32)
    buy_sell = df['BUY_SELL_FLAG'].values.astype(np.float32)
    types = df['TYPE'].values
    prices = df['PRICE'].values.astype(np.float32)
    ask_sizes_1 = df['ask_size_1'].values.astype(np.float32)
    bid_sizes_1 = df['bid_size_1'].values.astype(np.float32)
    ask_price_1 = df['ask_price_1'].values.astype(np.float32)
    bid_price_1 = df['bid_price_1'].values.astype(np.float32)

    # Zero out sentinel values (|x| > 1e9 marks missing levels).
    for arr in [ask_sizes_1, bid_sizes_1, ask_price_1, bid_price_1]:
        arr[np.abs(arr) > 1e9] = 0

    # Mid price, forward-filled through one-sided/empty books.
    valid = (ask_price_1 > 0) & (bid_price_1 > 0)
    mid_prices = np.where(valid, (ask_price_1 + bid_price_1) / 2.0, 0.0)
    for i in range(1, N):
        if mid_prices[i] == 0 and mid_prices[i - 1] != 0:
            mid_prices[i] = mid_prices[i - 1]

    # Collect all 10 levels of displayed size per side.
    ask_sizes = np.zeros((N, 10), dtype=np.float32)
    bid_sizes = np.zeros((N, 10), dtype=np.float32)
    for i in range(10):
        ask_s = df[f'ask_size_{i+1}'].values.astype(np.float32)
        bid_s = df[f'bid_size_{i+1}'].values.astype(np.float32)
        ask_s[np.abs(ask_s) > 1e9] = 0
        bid_s[np.abs(bid_s) > 1e9] = 0
        ask_sizes[:, i] = ask_s
        bid_sizes[:, i] = bid_s

    # Event index as a timestamp proxy (row order approximates time).
    timestamps = np.arange(N, dtype=np.float32)

    if verbose:
        print("Computing algorithm signatures...")

    # ============ TWAP features ============
    order_cv = compute_order_size_cv(sizes, window=20)
    periodicity = compute_periodicity(timestamps, window=20)
    cancel_burst = compute_cancel_burst_ratio(types, timestamps, window=20)
    pa_ratio = compute_passive_aggressive_ratio(prices, mid_prices, buy_sell, window=20)

    # TWAP score: low size variation + high periodicity + boundary cancels + mostly passive.
    twap_score = np.zeros(N)
    twap_score += np.clip(1 - order_cv / 0.3, 0, 1) * 0.35   # CV < 0.3 scores high
    twap_score += periodicity * 0.30                          # regular spacing
    twap_score += np.clip(cancel_burst / 0.5, 0, 1) * 0.20    # cancel clustering
    twap_score += np.clip(1 - pa_ratio / 0.3, 0, 1) * 0.15    # passivity

    # ============ VWAP features ============
    # Smoothed global size stream as a market-volume proxy.
    market_vol = np.convolve(sizes, np.ones(10) / 10, mode='same')
    part_stability = compute_participation_rate(sizes, market_vol, window=50)
    vol_corr = compute_volume_correlation(sizes, buy_sell, market_vol, window=50)

    # VWAP score: stable participation rate + high volume correlation.
    vwap_score = np.zeros(N)
    vwap_score += np.clip(1 - part_stability / 0.5, 0, 1) * 0.50
    vwap_score += vol_corr * 0.50

    # ============ Iceberg features ============
    refill = compute_refill_ratio(ask_sizes_1, bid_sizes_1, window=20)
    hidden_vol = compute_hidden_volume_ratio(sizes, ask_sizes_1, bid_sizes_1, buy_sell, window=50)

    # Iceberg score: frequent level-1 refills + large hidden-volume ratio.
    iceberg_score = np.zeros(N)
    iceberg_score += np.clip(refill / 0.5, 0, 1) * 0.50
    iceberg_score += np.clip(hidden_vol / 5.0, 0, 1) * 0.50

    # ============ Support/resistance features ============
    lob_sizes = np.concatenate([ask_sizes, bid_sizes], axis=1)  # (N, 20)
    persistence = compute_level_persistence(lob_sizes, window=50)
    depth_imb = compute_depth_imbalance(ask_sizes, bid_sizes, top_levels=3)

    # Support score: persistent large orders + strong depth imbalance.
    support_score = np.zeros(N)
    support_score += persistence * 0.50
    support_score += np.clip(np.abs(depth_imb) / 0.5, 0, 1) * 0.50

    # ============ Multi-scale OFI (generic features) ============
    ofi_features = compute_ofi_multi_scale(ask_sizes_1, bid_sizes_1, windows=[5, 10, 20, 50])

    # ============ Combine scores and assign labels ============
    scores = np.stack([twap_score, vwap_score, iceberg_score, support_score], axis=1)

    labels = np.full(N, 4, dtype=np.int64)  # default: NORMAL
    # Per-class percentile threshold: roughly the top 20% of active scores,
    # with an absolute floor of 0.2 to reject weak maxima.
    for cls in range(4):
        cls_scores = scores[:, cls]
        valid_scores = cls_scores[cls_scores > 0.01]
        if len(valid_scores) > 0:
            thr = np.percentile(valid_scores, 80)
            labels[(cls_scores >= thr) & (cls_scores > 0.2)] = cls

    # Feature matrix: 10 signature features + OFI features (sorted by key
    # for a deterministic column order).
    all_features = np.column_stack([
        order_cv, periodicity, cancel_burst, pa_ratio,
        part_stability, vol_corr, refill, hidden_vol,
        persistence, depth_imb,
        *[ofi_features[k] for k in sorted(ofi_features.keys())]
    ]).astype(np.float32)
    # Sanitize sentinels/degenerate windows before feeding a model.
    all_features = np.nan_to_num(all_features, nan=0.0, posinf=0.0, neginf=0.0)

    if verbose:
        label_names = {0: 'TWAP', 1: 'VWAP', 2: 'ICEBERG', 3: 'SUPPORT', 4: 'NORMAL'}
        unique, counts = np.unique(labels, return_counts=True)
        print("Pseudo-label distribution:")
        for u, c in zip(unique, counts):
            print(f" {u} ({label_names[u]}): {c} ({c/N*100:.1f}%)")
        print(f"Feature matrix shape: {all_features.shape}")

    return labels, scores, all_features


# ============================================================
# Usage example
# ============================================================

if __name__ == "__main__":
    from datasets import load_dataset

    print("Loading TRADES-LOB dataset...")
    ds = load_dataset("LeonardoBerti/TRADES-LOB", split="train")
    df = ds.to_pandas()
    print(f"Dataset: {len(df)} rows")

    labels, scores, features = generate_pseudo_labels(df)

    print(f"\nLabel shape: {labels.shape}")
    print(f"Score shape: {scores.shape}")
    print(f"Feature shape: {features.shape}")

    # Show the highest-confidence example for each algorithmic class.
    label_names = {0: 'TWAP', 1: 'VWAP', 2: 'ICEBERG', 3: 'SUPPORT', 4: 'NORMAL'}
    for cls in range(4):
        cls_mask = labels == cls
        if cls_mask.sum() > 0:
            top_idx = np.where(cls_mask)[0]
            top_scores = scores[top_idx, cls]
            best = top_idx[top_scores.argmax()]
            print(f"\n{label_names[cls]} 最高置信度样本 (idx={best}, score={scores[best, cls]:.3f}):")
            print(f" SIZE={df.iloc[best]['SIZE']}, PRICE={df.iloc[best]['PRICE']}, "
                  f"BUY_SELL={'Buy' if df.iloc[best]['BUY_SELL_FLAG'] else 'Sell'}")