| """ |
| 算法单签名检测器 (Algorithm Order Signature Detector) |
| |
| 从原始无标签的Level-2委托单数据中,通过规则引擎检测主力常用的算法单模式, |
| 生成伪标签用于训练深度学习模型。 |
| |
| 检测的5种模式: |
| 0: TWAP - 时间加权平均价算法 (等量等间隔下单) |
| 1: VWAP - 成交量加权平均价算法 (跟随市场成交量节奏) |
| 2: ICEBERG - 冰山订单 (显示小量,实际大量,一档反复补单) |
| 3: SUPPORT - 护盘/支撑 (关键价位持续大单挂单) |
| 4: NORMAL - 正常/散户 (无明显算法特征) |
| |
| 数据输入格式 (原始Level-2行情): |
| - 10档买卖委托 (ask_price_1..10, ask_size_1..10, bid_price_1..10, bid_size_1..10) |
| - 逐笔委托 (ORDER_ID, PRICE, SIZE, BUY_SELL_FLAG, TYPE) |
| |
| 参考文献: |
| - MarS (arxiv:2409.07486): TWAP签名定义 |
| - PULSE (arxiv:2312.05827): 多时钟特征工程 |
| - Hautsch & Huang (2012): 冰山订单识别 |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy.signal import find_peaks |
| from scipy.stats import pearsonr |
|
|
|
|
# ============================================================
# Signature features
# ============================================================
|
|
def compute_order_size_cv(sizes, window=20):
    """
    Rolling coefficient of variation (CV) of order sizes.
    TWAP signature: CV < 0.15 indicates equal-sized child orders.
    """
    N = len(sizes)
    cv = np.ones(N) * 999  # sentinel value for warm-up rows
    for i in range(window, N):
        w = sizes[i - window:i]
        mean_w = w.mean()
        if mean_w > 0:
            cv[i] = w.std() / mean_w
    return cv
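
# Illustrative sanity check (hypothetical helper on synthetic data, not part
# of the detector): equal-sized child orders should drive the rolling CV
# toward 0, well below the 0.15 TWAP threshold, while widely dispersed sizes
# should not.
def _example_order_size_cv():
    rng = np.random.default_rng(0)
    twap_sizes = np.full(40, 100.0, dtype=np.float32)
    noisy_sizes = rng.uniform(10, 500, size=40).astype(np.float32)
    assert compute_order_size_cv(twap_sizes)[-1] < 0.15   # equal sizes -> CV ~ 0
    assert compute_order_size_cv(noisy_sizes)[-1] > 0.15  # random sizes -> high CV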
|
|
|
|
def compute_periodicity(timestamps, window=20):
    """
    Rolling order-submission regularity score.
    TWAP signature: equal spacing between orders, i.e. an autocorrelation
    peak at lag = Δt. Approximated here as 1 - CV of the inter-order
    intervals.
    """
    N = len(timestamps)
    periodicity = np.zeros(N)

    for i in range(window, N):
        intervals = np.diff(timestamps[i - window:i])
        if len(intervals) < 3:
            continue

        mean_interval = intervals.mean()
        if mean_interval > 0:
            # A zero interval std means perfectly regular spacing -> score 1.
            periodicity[i] = max(0.0, 1.0 - intervals.std() / mean_interval)

    return periodicity
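
# Illustrative sanity check (hypothetical helper, synthetic timestamps):
# perfectly regular 5-tick spacing should score ~1, while jittered spacing
# drawn from uniform(1, 10) should score noticeably lower.
def _example_periodicity():
    regular = np.arange(0, 200, 5, dtype=np.float32)  # 40 evenly spaced events
    rng = np.random.default_rng(1)
    jittered = np.cumsum(rng.uniform(1, 10, size=40)).astype(np.float32)
    assert compute_periodicity(regular)[-1] > 0.9
    assert compute_periodicity(jittered)[-1] < compute_periodicity(regular)[-1]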
|
|
|
|
def compute_cancel_burst_ratio(types, window=20, boundary_frac=0.2):
    """
    Rolling concentration of cancellations at the end of the window.
    TWAP signature: child orders are cancelled in a burst at the end of
    each Δt slice.
    """
    N = len(types)
    cancel_burst = np.zeros(N)
    is_cancel = (types == 'ORDER_CANCELLED').astype(float)

    for i in range(window, N):
        total_cancel = is_cancel[i - window:i].sum()
        if total_cancel == 0:
            continue

        # Fraction of cancels falling in the trailing boundary_frac of the window.
        boundary_start = int(window * (1 - boundary_frac))
        boundary_cancel = is_cancel[i - window + boundary_start:i].sum()
        cancel_burst[i] = boundary_cancel / total_cancel

    return cancel_burst
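
# Illustrative sanity check (hypothetical helper, synthetic event types):
# when every cancellation falls in the trailing fifth of the window, the
# burst ratio should be exactly 1.
def _example_cancel_burst():
    types = np.array(['ORDER_ADDED'] * 36 + ['ORDER_CANCELLED'] * 4)
    burst = compute_cancel_burst_ratio(types, window=20)
    assert burst[-1] == 1.0  # all cancels sit at the window boundary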
|
|
|
|
def compute_passive_aggressive_ratio(prices, mid_prices, buy_sell, window=20):
    """
    Rolling fraction of aggressive (marketable) orders.
    TWAP signature: alternating passive/aggressive phases
    (e.g. 25 s resting passively at bid1, then 5 s sweeping the ask).

    Passive:    buy price <= mid (resting on the bid side) or
                sell price >= mid (resting on the ask side)
    Aggressive: buy price > mid (sweeping the ask side) or
                sell price < mid (sweeping the bid side)
    """
    N = len(prices)
    pa_ratio = np.zeros(N)

    # Vectorized classification; strict inequalities match the definition above.
    is_aggressive = np.where(
        buy_sell.astype(bool),
        (prices > mid_prices).astype(float),   # aggressive buy crosses the mid
        (prices < mid_prices).astype(float),   # aggressive sell crosses the mid
    )

    cum_agg = np.cumsum(is_aggressive)
    for i in range(window, N):
        pa_ratio[i] = (cum_agg[i] - cum_agg[i - window]) / window

    return pa_ratio
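
# Illustrative sanity check (hypothetical helper, synthetic quotes): buy
# orders resting below the mid are passive by the definition above, so the
# rolling aggressive fraction should stay at 0.
def _example_pa_ratio():
    n = 30
    prices = np.full(n, 9.99, dtype=np.float32)   # quoted below the mid
    mids = np.full(n, 10.00, dtype=np.float32)
    buys = np.ones(n, dtype=np.float32)
    assert compute_passive_aggressive_ratio(prices, mids, buys)[-1] == 0.0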
|
|
|
|
def compute_participation_rate(sizes, total_market_volume, window=20):
    """
    Rolling participation-rate stability (CV across sub-windows).
    VWAP signature: participation rate ≈ constant (typically 10-20%).
    """
    N = len(sizes)
    participation_stability = np.ones(N) * 999  # sentinel value for warm-up rows

    cum_sizes = np.cumsum(sizes)
    cum_market = np.cumsum(total_market_volume)

    for i in range(window, N):
        # Split the window into 5 sub-windows and compare their participation rates.
        sub_window = max(1, window // 5)
        rates = []
        for j in range(5):
            start = i - window + j * sub_window
            end = min(start + sub_window, i)
            if end <= start:
                continue
            vol = cum_sizes[end] - cum_sizes[start]
            market_vol = cum_market[end] - cum_market[start]
            if market_vol > 0:
                rates.append(vol / market_vol)

        if len(rates) >= 3:
            rates = np.array(rates)
            mean_rate = rates.mean()
            if mean_rate > 0:
                participation_stability[i] = rates.std() / mean_rate

    return participation_stability
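
# Illustrative sanity check (hypothetical helper, synthetic volumes): a child
# flow pinned at a constant 10% of market volume should give a near-zero CV
# across sub-windows (the 999 sentinel only marks warm-up rows).
def _example_participation_stability():
    market = np.full(100, 1000.0, dtype=np.float32)
    child = market * 0.10
    stab = compute_participation_rate(child, market, window=50)
    assert stab[-1] < 0.1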
|
|
|
|
def compute_volume_correlation(sizes, total_market_volume, window=50):
    """
    Rolling correlation between child-order volume and market volume.
    VWAP signature: Pearson(child_vol, market_vol) > 0.7.
    """
    N = len(sizes)
    vol_corr = np.zeros(N)

    for i in range(window, N):
        child_vols = sizes[i - window:i]
        market_vols = total_market_volume[i - window:i]

        if child_vols.std() > 0 and market_vols.std() > 0:
            corr, _ = pearsonr(child_vols, market_vols)
            vol_corr[i] = max(0, corr)

    return vol_corr
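
# Illustrative sanity check (hypothetical helper, synthetic volumes): child
# volume held at a fixed fraction of market volume is perfectly correlated
# with it, clearing the 0.7 VWAP threshold easily.
def _example_volume_correlation():
    rng = np.random.default_rng(2)
    market = rng.uniform(100, 1000, size=80).astype(np.float32)
    child = (market * 0.15).astype(np.float32)
    assert compute_volume_correlation(child, market, window=50)[-1] > 0.95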
|
|
|
|
def compute_refill_ratio(ask_sizes_1, bid_sizes_1, window=20, refill_threshold=0.7):
    """
    Rolling rate of level-1 refill events.
    Iceberg signature: displayed level-1 size snaps back right after an execution.

    Detection: V_level1(t) drops sharply, then quickly recovers to near its
    previous value.
    """
    N = len(ask_sizes_1)
    refill_score = np.zeros(N)

    for side_sizes in [ask_sizes_1, bid_sizes_1]:
        for i in range(2, N):
            prev = side_sizes[i - 2]
            curr = side_sizes[i - 1]
            next_v = side_sizes[i]

            # Drop of more than 50% followed by recovery above refill_threshold.
            if prev > 0 and curr < prev * 0.5 and next_v > prev * refill_threshold:
                refill_score[i] += 1

    # Rolling mean of refill events.
    cum_refill = np.cumsum(refill_score)
    result = np.zeros(N)
    for i in range(window, N):
        result[i] = (cum_refill[i] - cum_refill[i - window]) / window

    return result
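
# Illustrative sanity check (hypothetical helper, synthetic level-1 sizes):
# a >50% drop at the ask followed by an immediate refill above 70% of the
# prior size is one refill event, which the rolling mean then picks up.
def _example_refill_ratio():
    ask1 = np.full(40, 500.0, dtype=np.float32)
    ask1[10] = 100.0  # a sweep removes most of the displayed quantity...
    ask1[11] = 480.0  # ...and the hidden reserve immediately replenishes it
    bid1 = np.full(40, 300.0, dtype=np.float32)
    refill = compute_refill_ratio(ask1, bid1, window=20)
    assert refill[25] > 0  # the window ending at t=25 contains the event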
|
|
|
|
def compute_hidden_volume_ratio(sizes, ask_sizes_1, bid_sizes_1, window=50):
    """
    Rolling ratio of traded volume to the largest displayed level-1 size.
    Iceberg signature: total_executed / max_displayed > 3.0.
    """
    N = len(sizes)
    hidden_ratio = np.zeros(N)

    for i in range(window, N):
        total_vol = sizes[i - window:i].sum()

        max_displayed = max(
            ask_sizes_1[i - window:i].max(),
            bid_sizes_1[i - window:i].max(),
            1,
        )

        hidden_ratio[i] = total_vol / max_displayed

    return hidden_ratio
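
# Illustrative sanity check (hypothetical helper, synthetic data): steady
# executions of 400 against a displayed level-1 size of 1000 give a hidden
# volume ratio of 20, far above the 3.0 iceberg rule of thumb.
def _example_hidden_volume():
    sizes = np.full(60, 400.0, dtype=np.float32)
    ask1 = np.full(60, 1000.0, dtype=np.float32)
    bid1 = np.full(60, 1000.0, dtype=np.float32)
    assert compute_hidden_volume_ratio(sizes, ask1, bid1, window=50)[-1] > 3.0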
|
|
|
|
def compute_level_persistence(lob_sizes, window=50, big_order_percentile=90):
    """
    Rolling persistence score of large orders at any single price level.
    Support/resistance signature: one level holds a large order for a long time.

    lob_sizes: (N, 20) - 10-level sizes on both sides [ask_s_1..10, bid_s_1..10]
    """
    N = lob_sizes.shape[0]
    positive = lob_sizes[lob_sizes > 0]
    if positive.size == 0:
        return np.zeros(N)
    threshold = np.percentile(positive, big_order_percentile)

    persistence = np.zeros(N)
    for i in range(window, N):
        w = lob_sizes[i - window:i]

        # Highest fraction of the window during which any single level stays big.
        max_persistence = 0
        for level in range(20):
            level_big = (w[:, level] > threshold).sum() / window
            max_persistence = max(max_persistence, level_big)

        persistence[i] = max_persistence

    return persistence
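
# Illustrative sanity check (hypothetical helper, synthetic book): one level
# holding a standing wall far above the 90th size percentile should push the
# persistence score to 1 once the window fills.
def _example_level_persistence():
    rng = np.random.default_rng(3)
    lob = rng.uniform(10, 100, size=(80, 20)).astype(np.float32)
    lob[:, 12] = 10000.0  # a persistent oversized order on the bid side
    assert compute_level_persistence(lob, window=50)[-1] >= 0.99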
|
|
|
|
def compute_depth_imbalance(ask_sizes, bid_sizes, top_levels=3):
    """
    Depth imbalance over the top levels of the book.
    Support signature: bids piled up -> imbalance > 0.
    Resistance signature: asks piled up -> imbalance < 0.
    """
    bid_depth = bid_sizes[:, :top_levels].sum(axis=1)
    ask_depth = ask_sizes[:, :top_levels].sum(axis=1)

    total = bid_depth + ask_depth + 1e-8
    return (bid_depth - ask_depth) / total
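
# Illustrative sanity check (hypothetical helper, synthetic book): bid depth
# nine times the ask depth over the top 3 levels gives an imbalance of 0.8,
# the kind of reading the SUPPORT score looks for.
def _example_depth_imbalance():
    ask = np.full((10, 10), 100.0, dtype=np.float32)
    bid = np.full((10, 10), 900.0, dtype=np.float32)
    imb = compute_depth_imbalance(ask, bid, top_levels=3)
    assert np.all(imb > 0.7)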
|
|
|
|
def compute_ofi_multi_scale(ask_sizes_1, bid_sizes_1, windows=(5, 10, 20, 50)):
    """
    Multi-scale order-flow imbalance (OFI) from level-1 sizes.
    Core feature in the PULSE paper.
    """
    N = len(ask_sizes_1)
    features = {}

    imb = (bid_sizes_1 - ask_sizes_1) / (bid_sizes_1 + ask_sizes_1 + 1e-8)
    cum = np.cumsum(imb)
    cum_sq = np.cumsum(imb ** 2)

    for w in windows:
        # Rolling mean of the imbalance.
        roll_mean = np.zeros(N)
        roll_mean[w:] = (cum[w:] - cum[:-w]) / w
        features[f'ofi_{w}'] = roll_mean

        # Rolling volatility via E[x^2] - E[x]^2, clipped at 0 for stability.
        roll_var = np.zeros(N)
        roll_var[w:] = (cum_sq[w:] - cum_sq[:-w]) / w - roll_mean[w:] ** 2
        features[f'ofi_vol_{w}'] = np.sqrt(np.maximum(roll_var, 0))

    return features
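
# Illustrative sanity check (hypothetical helper, synthetic level-1 sizes):
# a book with the bid persistently three times the ask has a constant
# imbalance of 0.5, so the rolling mean sits at 0.5 and the rolling
# volatility at ~0.
def _example_ofi_multi_scale():
    ask1 = np.full(60, 100.0, dtype=np.float32)
    bid1 = np.full(60, 300.0, dtype=np.float32)
    feats = compute_ofi_multi_scale(ask1, bid1, windows=(5, 20))
    assert feats['ofi_20'][-1] > 0.4
    assert feats['ofi_vol_20'][-1] < 1e-3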
|
|
|
|
# ============================================================
# Pseudo-label generation
# ============================================================
|
|
def generate_pseudo_labels(df, verbose=True):
    """
    Generate pseudo-labels from raw Level-2 data.

    Input: DataFrame with ORDER_ID, PRICE, SIZE, BUY_SELL_FLAG, TYPE,
           ask_price_1..10, ask_size_1..10, bid_price_1..10, bid_size_1..10

    Returns:
        labels:   (N,) int64, 0=TWAP, 1=VWAP, 2=ICEBERG, 3=SUPPORT, 4=NORMAL
        scores:   (N, 4) float32, confidence score per algorithm class
        features: (N, F) float32, all extracted features
    """
    N = len(df)

    sizes = df['SIZE'].values.astype(np.float32)
    buy_sell = df['BUY_SELL_FLAG'].values.astype(np.float32)
    types = df['TYPE'].values
    prices = df['PRICE'].values.astype(np.float32)

    ask_sizes_1 = df['ask_size_1'].values.astype(np.float32)
    bid_sizes_1 = df['bid_size_1'].values.astype(np.float32)
    ask_price_1 = df['ask_price_1'].values.astype(np.float32)
    bid_price_1 = df['bid_price_1'].values.astype(np.float32)

    # Zero out sentinel/overflow values.
    for arr in [ask_sizes_1, bid_sizes_1, ask_price_1, bid_price_1]:
        arr[np.abs(arr) > 1e9] = 0

    # Mid price, forward-filled across invalid quotes.
    valid = (ask_price_1 > 0) & (bid_price_1 > 0)
    mid_prices = np.where(valid, (ask_price_1 + bid_price_1) / 2.0, 0.0)
    for i in range(1, N):
        if mid_prices[i] == 0 and mid_prices[i - 1] != 0:
            mid_prices[i] = mid_prices[i - 1]

    # Full 10-level size matrices.
    ask_sizes = np.zeros((N, 10), dtype=np.float32)
    bid_sizes = np.zeros((N, 10), dtype=np.float32)
    for i in range(10):
        ask_s = df[f'ask_size_{i+1}'].values.astype(np.float32)
        bid_s = df[f'bid_size_{i+1}'].values.astype(np.float32)
        ask_s[np.abs(ask_s) > 1e9] = 0
        bid_s[np.abs(bid_s) > 1e9] = 0
        ask_sizes[:, i] = ask_s
        bid_sizes[:, i] = bid_s

    # Event index as the clock (no wall-clock timestamps in this dataset).
    # Caveat: with an index clock all inter-order intervals are equal, so the
    # periodicity feature saturates; plug in real timestamps when available.
    timestamps = np.arange(N, dtype=np.float32)

    if verbose:
        print("Computing algorithm signatures...")

    # --- TWAP features ---
    order_cv = compute_order_size_cv(sizes, window=20)
    periodicity = compute_periodicity(timestamps, window=20)
    cancel_burst = compute_cancel_burst_ratio(types, window=20)
    pa_ratio = compute_passive_aggressive_ratio(prices, mid_prices, buy_sell, window=20)

    twap_score = np.zeros(N)
    twap_score += np.clip(1 - order_cv / 0.3, 0, 1) * 0.35
    twap_score += periodicity * 0.30
    twap_score += np.clip(cancel_burst / 0.5, 0, 1) * 0.20
    twap_score += np.clip(1 - pa_ratio / 0.3, 0, 1) * 0.15

    # --- VWAP features ---
    # Smoothed order flow as a market-volume proxy. Note the centered window
    # ('same' mode) peeks a few events ahead; acceptable for offline labeling.
    market_vol = np.convolve(sizes, np.ones(10) / 10, mode='same')
    part_stability = compute_participation_rate(sizes, market_vol, window=50)
    vol_corr = compute_volume_correlation(sizes, market_vol, window=50)

    vwap_score = np.zeros(N)
    vwap_score += np.clip(1 - part_stability / 0.5, 0, 1) * 0.50
    vwap_score += vol_corr * 0.50

    # --- ICEBERG features ---
    refill = compute_refill_ratio(ask_sizes_1, bid_sizes_1, window=20)
    hidden_vol = compute_hidden_volume_ratio(sizes, ask_sizes_1, bid_sizes_1, window=50)

    iceberg_score = np.zeros(N)
    iceberg_score += np.clip(refill / 0.5, 0, 1) * 0.50
    iceberg_score += np.clip(hidden_vol / 5.0, 0, 1) * 0.50

    # --- SUPPORT features ---
    lob_sizes = np.concatenate([ask_sizes, bid_sizes], axis=1)
    persistence = compute_level_persistence(lob_sizes, window=50)
    depth_imb = compute_depth_imbalance(ask_sizes, bid_sizes, top_levels=3)

    support_score = np.zeros(N)
    support_score += persistence * 0.50
    support_score += np.clip(np.abs(depth_imb) / 0.5, 0, 1) * 0.50

    # --- Auxiliary multi-scale OFI features ---
    ofi_features = compute_ofi_multi_scale(ask_sizes_1, bid_sizes_1, windows=(5, 10, 20, 50))

    scores = np.stack([twap_score, vwap_score, iceberg_score, support_score],
                      axis=1).astype(np.float32)

    # Default to NORMAL; a row is relabeled when its class score is both in
    # the top 20% for that class and above an absolute floor of 0.2. Classes
    # are applied in order, so later classes take precedence on overlaps.
    labels = np.full(N, 4, dtype=np.int64)
    for cls in range(4):
        cls_scores = scores[:, cls]
        valid_scores = cls_scores[cls_scores > 0.01]
        if len(valid_scores) > 0:
            thr = np.percentile(valid_scores, 80)
            labels[(cls_scores >= thr) & (cls_scores > 0.2)] = cls

    # Feature matrix; OFI columns in deterministic (lexicographic) key order.
    all_features = np.column_stack([
        order_cv, periodicity, cancel_burst, pa_ratio,
        part_stability, vol_corr,
        refill, hidden_vol,
        persistence, depth_imb,
        *[ofi_features[k] for k in sorted(ofi_features.keys())]
    ]).astype(np.float32)

    all_features = np.nan_to_num(all_features, nan=0.0, posinf=0.0, neginf=0.0)

    if verbose:
        label_names = {0: 'TWAP', 1: 'VWAP', 2: 'ICEBERG', 3: 'SUPPORT', 4: 'NORMAL'}
        unique, counts = np.unique(labels, return_counts=True)
        print("Pseudo-label distribution:")
        for u, c in zip(unique, counts):
            print(f"  {u} ({label_names[u]}): {c} ({c/N*100:.1f}%)")
        print(f"Feature matrix shape: {all_features.shape}")

    return labels, scores, all_features
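
# Illustrative end-to-end sketch (hypothetical helper): runs the full
# pipeline on random synthetic data, so it only exercises shapes and dtypes;
# the labels themselves carry no meaning here. Column names follow the input
# format documented in generate_pseudo_labels.
def _example_generate_pseudo_labels(n=300):
    rng = np.random.default_rng(4)
    data = {
        'ORDER_ID': np.arange(n),
        'PRICE': rng.uniform(9.9, 10.1, n),
        'SIZE': rng.integers(100, 1000, n),
        'BUY_SELL_FLAG': rng.integers(0, 2, n),
        'TYPE': rng.choice(['ORDER_ADDED', 'ORDER_CANCELLED'], n),
    }
    for lvl in range(1, 11):
        data[f'ask_price_{lvl}'] = 10.0 + 0.01 * lvl
        data[f'bid_price_{lvl}'] = 10.0 - 0.01 * lvl
        data[f'ask_size_{lvl}'] = rng.uniform(100, 5000, n)
        data[f'bid_size_{lvl}'] = rng.uniform(100, 5000, n)
    df = pd.DataFrame(data)
    labels, scores, features = generate_pseudo_labels(df, verbose=False)
    assert labels.shape == (n,) and scores.shape == (n, 4)
    return labels, scores, features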
|
|
|
|
# ============================================================
# Demo
# ============================================================
|
|
| if __name__ == "__main__": |
| from datasets import load_dataset |
| |
| print("Loading TRADES-LOB dataset...") |
| ds = load_dataset("LeonardoBerti/TRADES-LOB", split="train") |
| df = ds.to_pandas() |
| print(f"Dataset: {len(df)} rows") |
| |
| labels, scores, features = generate_pseudo_labels(df) |
| |
| print(f"\nLabel shape: {labels.shape}") |
| print(f"Score shape: {scores.shape}") |
| print(f"Feature shape: {features.shape}") |
| |
| |
| label_names = {0: 'TWAP', 1: 'VWAP', 2: 'ICEBERG', 3: 'SUPPORT', 4: 'NORMAL'} |
| for cls in range(4): |
| cls_mask = labels == cls |
| if cls_mask.sum() > 0: |
| top_idx = np.where(cls_mask)[0] |
| top_scores = scores[top_idx, cls] |
| best = top_idx[top_scores.argmax()] |
| print(f"\n{label_names[cls]} 最高置信度样本 (idx={best}, score={scores[best, cls]:.3f}):") |
| print(f" SIZE={df.iloc[best]['SIZE']}, PRICE={df.iloc[best]['PRICE']}, " |
| f"BUY_SELL={'Buy' if df.iloc[best]['BUY_SELL_FLAG'] else 'Sell'}") |
|
|