lob-pattern-net / algo_detector.py
kangkangchen's picture
Upload folder using huggingface_hub
8171f7d verified
"""
算法单签名检测器 (Algorithm Order Signature Detector)
从原始无标签的Level-2委托单数据中,通过规则引擎检测主力常用的算法单模式,
生成伪标签用于训练深度学习模型。
检测的5种模式:
0: TWAP - 时间加权平均价算法 (等量等间隔下单)
1: VWAP - 成交量加权平均价算法 (跟随市场成交量节奏)
2: ICEBERG - 冰山订单 (显示小量,实际大量,一档反复补单)
3: SUPPORT - 护盘/支撑 (关键价位持续大单挂单)
4: NORMAL - 正常/散户 (无明显算法特征)
数据输入格式 (原始Level-2行情):
- 10档买卖委托 (ask_price_1..10, ask_size_1..10, bid_price_1..10, bid_size_1..10)
- 逐笔委托 (ORDER_ID, PRICE, SIZE, BUY_SELL_FLAG, TYPE)
参考文献:
- MarS (arxiv:2409.07486): TWAP签名定义
- PULSE (arxiv:2312.05827): 多时钟特征工程
- Hautsch & Huang (2012): 冰山订单识别
"""
import numpy as np
import pandas as pd
from scipy.signal import find_peaks
from scipy.stats import pearsonr
# ============================================================
# 特征提取器 (Feature Extractors)
# ============================================================
def compute_order_size_cv(sizes, window=20):
    """Rolling coefficient of variation (std / mean) of order sizes.

    TWAP signature: a CV below roughly 0.15 indicates equal-sized orders.

    The value at index ``i`` is computed over the ``window`` orders that
    precede it, ``sizes[i - window:i]``; the order at ``i`` is not included.

    Args:
        sizes: 1-D array-like of order sizes (ndarray, list or pandas Series).
        window: Number of preceding orders in each rolling window.

    Returns:
        float64 ndarray of length ``len(sizes)``. Entries without a full
        window, or whose window mean is not positive, keep the sentinel
        value 999 (high variation, i.e. not TWAP).
    """
    # Normalise to an ndarray: plain lists have no .mean()/.std(), and a
    # pandas Series would silently switch .std() to the sample estimator
    # (ddof=1) instead of the population one used here.
    sizes = np.asarray(sizes)
    n = len(sizes)
    cv = np.full(n, 999.0)
    for i in range(window, n):
        recent = sizes[i - window:i]
        mean_size = recent.mean()
        if mean_size > 0:
            cv[i] = recent.std() / mean_size
    return cv
def compute_periodicity(timestamps, window=20, expected_lag=None):
    """Rolling regularity score of the gaps between consecutive orders.

    TWAP signature: orders are placed at (nearly) equal intervals.

    For every index ``i`` the gaps between the ``window`` timestamps that
    precede it (``timestamps[i - window:i]``) are measured and scored as
    ``max(0, 1 - std(gaps) / mean(gaps))``: 1.0 means perfectly even
    spacing, 0.0 means highly irregular spacing.

    Args:
        timestamps: 1-D array-like of order times.
        window: Number of preceding timestamps per rolling window. At least
            three gaps are required, so windows shorter than 4 always give 0.
        expected_lag: Unused; accepted only so existing callers keep working.

    Returns:
        float64 ndarray of length ``len(timestamps)``; entries without a
        full window or with a non-positive mean gap are 0.
    """
    timestamps = np.asarray(timestamps)
    n = len(timestamps)
    periodicity = np.zeros(n)
    # A window of `window` timestamps yields `window - 1` gaps.
    if window - 1 < 3:
        return periodicity
    for i in range(window, n):
        gaps = np.diff(timestamps[i - window:i])
        mean_gap = gaps.mean()
        if mean_gap <= 0:
            continue
        # std == 0 is the perfectly periodic case and must score 1.0;
        # it needs no special-casing because std is only a numerator.
        periodicity[i] = max(0.0, 1.0 - gaps.std() / mean_gap)
    return periodicity
def compute_cancel_burst_ratio(types, timestamps, window=20, boundary_frac=0.2):
    """Share of a window's cancellations that fall in its final segment.

    TWAP signature: cancellations cluster at the end of each time slice.

    For every index ``i`` the window is the ``window`` events preceding it
    (``[i - window, i)``). The result is the number of ``'ORDER_CANCELLED'``
    events in the last ``boundary_frac`` of that window divided by the
    number of cancellations in the whole window.

    Args:
        types: 1-D array-like of event type labels.
        timestamps: Unused; the window is measured in events, not in time.
        window: Number of preceding events per rolling window.
        boundary_frac: Fraction of the window (at its end) treated as the
            boundary segment.

    Returns:
        float64 ndarray of length ``len(types)``; 0 where there is no full
        window or the window contains no cancellation.
    """
    # dtype=object forces an element-wise comparison for every input kind
    # (list, Series, ndarray); comparing a plain list to a str would yield a
    # single bool instead.
    is_cancel = (np.asarray(types, dtype=object) == 'ORDER_CANCELLED').astype(float)
    n = len(is_cancel)
    cancel_burst = np.zeros(n)
    if window < 1 or n <= window:
        return cancel_burst

    # Offset of the boundary segment inside the window; clamped so that an
    # out-of-range boundary_frac cannot index outside the window.
    boundary_start = min(max(int(window * (1 - boundary_frac)), 0), window)

    # prefix[k] = number of cancellations among the first k events, so any
    # window count is a difference of two prefix values (exact: the counts
    # are small integers).
    prefix = np.concatenate(([0.0], np.cumsum(is_cancel)))
    idx = np.arange(window, n)
    total_cancel = prefix[idx] - prefix[idx - window]
    boundary_cancel = prefix[idx] - prefix[idx - window + boundary_start]
    np.divide(boundary_cancel, total_cancel,
              out=cancel_burst[window:], where=total_cancel > 0)
    return cancel_burst
def compute_passive_aggressive_ratio(prices, mid_prices, buy_sell, window=20):
    """Rolling share of aggressive orders.

    TWAP signature: alternating passive / aggressive phases (e.g. 25 s
    resting on bid 1, then 5 s sweeping the ask).

    Classification of a single order:
        passive:    buy price <= mid price, or sell price >= mid price
        aggressive: buy price >  mid price, or sell price <  mid price
    An order priced exactly at the mid does not cross the spread and is
    therefore passive.

    Args:
        prices: 1-D array-like of order prices.
        mid_prices: 1-D array-like of mid prices aligned with ``prices``.
        buy_sell: 1-D array-like of side flags; truthy means buy.
        window: Number of orders per rolling window (must be >= 1).

    Returns:
        float64 ndarray of length ``len(prices)``. Entry ``i`` is the
        fraction of aggressive orders among indices ``i - window + 1 .. i``;
        the first ``window`` entries are 0.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")
    prices = np.asarray(prices, dtype=np.float64)
    mid_prices = np.asarray(mid_prices, dtype=np.float64)
    is_buy = np.asarray(buy_sell).astype(bool)

    # Strict inequalities: equality with the mid price counts as passive.
    is_aggressive = np.where(is_buy, prices > mid_prices, prices < mid_prices)

    pa_ratio = np.zeros(len(prices))
    cum_agg = np.cumsum(is_aggressive, dtype=np.float64)
    # cum[i] - cum[i - window] sums the `window` orders ending at i.
    pa_ratio[window:] = (cum_agg[window:] - cum_agg[:-window]) / window
    return pa_ratio
def compute_participation_rate(sizes, total_market_volume, window=20):
    """Rolling stability of the participation rate (own volume / market volume).

    VWAP signature: the participation rate is roughly constant (10-20%).

    Each window is split into up to five consecutive sub-windows of
    ``max(1, window // 5)`` elements. For each sub-window with positive market
    volume the rate ``volume / market_volume`` is computed; the result is the
    coefficient of variation (std / mean) of those rates, so lower values
    mean a more stable participation rate.

    Sub-window sums are taken as prefix-sum differences
    ``cum[end] - cum[start]``, i.e. they cover indices ``start + 1 .. end``.

    Args:
        sizes: 1-D array-like of order sizes.
        total_market_volume: 1-D array-like of market volume aligned with
            ``sizes``.
        window: Number of elements per rolling window.

    Returns:
        float64 ndarray of length ``len(sizes)``. Entries keep the sentinel
        value 999 when there is no full window, fewer than three usable
        sub-windows, or a non-positive mean rate.
    """
    n = len(sizes)
    participation_stability = np.full(n, 999.0)
    # Accumulate in float64: a float32 running total stops resolving
    # individual orders once it grows large, which would turn every
    # sub-window difference into rounding noise.
    cum_sizes = np.cumsum(sizes, dtype=np.float64)
    cum_market = np.cumsum(total_market_volume, dtype=np.float64)
    sub_window = max(1, window // 5)

    for i in range(window, n):
        rates = []
        for j in range(5):
            start = i - window + j * sub_window
            end = min(start + sub_window, i)
            if end <= start:
                continue
            market_vol = cum_market[end] - cum_market[start]
            if market_vol > 0:
                rates.append((cum_sizes[end] - cum_sizes[start]) / market_vol)
        if len(rates) >= 3:
            rates = np.asarray(rates)
            mean_rate = rates.mean()
            if mean_rate > 0:
                participation_stability[i] = rates.std() / mean_rate
    return participation_stability
def compute_volume_correlation(sizes, buy_sell, total_market_volume, window=50):
    """Rolling Pearson correlation between order sizes and market volume.

    VWAP signature: Pearson(child volume, market volume) > 0.7.

    The value at index ``i`` is computed over the ``window`` elements that
    precede it (``[i - window, i)``). Negative correlations are clipped to 0.

    Args:
        sizes: 1-D array-like of order (child) sizes.
        buy_sell: Unused; accepted only so existing callers keep working.
        total_market_volume: 1-D array-like of market volume aligned with
            ``sizes``.
        window: Number of preceding elements per rolling window.

    Returns:
        float64 ndarray of length ``len(sizes)``; 0 where there is no full
        window or either series is constant inside the window.
    """
    # Normalise inputs so lists / Series work and the variance guards below
    # are evaluated in float64.
    sizes = np.asarray(sizes, dtype=np.float64)
    total_market_volume = np.asarray(total_market_volume, dtype=np.float64)
    n = len(sizes)
    vol_corr = np.zeros(n)
    for i in range(window, n):
        child_vols = sizes[i - window:i]
        market_vols = total_market_volume[i - window:i]
        # Pearson r is undefined for a constant series; leave those at 0.
        if child_vols.std() > 0 and market_vols.std() > 0:
            corr, _ = pearsonr(child_vols, market_vols)
            vol_corr[i] = max(0, corr)
    return vol_corr
def compute_refill_ratio(ask_sizes_1, bid_sizes_1, window=20, refill_threshold=0.7,
                         drop_threshold=0.5):
    """Rolling frequency of "dip then refill" events at the best quotes.

    Iceberg signature: after an execution the level-1 size is restored
    almost immediately.

    A refill event is recorded at index ``i`` for a side when, over three
    consecutive snapshots ``(i - 2, i - 1, i)``, the size goes
    large -> small -> large:
        size[i - 2] > 0
        size[i - 1] < size[i - 2] * drop_threshold
        size[i]     > size[i - 2] * refill_threshold
    Ask and bid events are added together, so one index can score up to 2.

    Args:
        ask_sizes_1: 1-D array-like of best-ask sizes.
        bid_sizes_1: 1-D array-like of best-bid sizes, same length.
        window: Number of snapshots per rolling window (must be >= 1).
        refill_threshold: Fraction of the original size that must be
            restored for the dip to count as refilled.
        drop_threshold: Fraction of the original size below which the
            middle snapshot counts as a dip.

    Returns:
        float64 ndarray of length ``len(ask_sizes_1)``. Entry ``i`` is the
        number of refill events in indices ``i - window + 1 .. i`` divided
        by ``window``; the first ``window`` entries are 0.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")
    n = len(ask_sizes_1)
    refill_score = np.zeros(n)
    for side_sizes in (ask_sizes_1, bid_sizes_1):
        side_sizes = np.asarray(side_sizes, dtype=np.float64)
        before, dipped, after = side_sizes[:-2], side_sizes[1:-1], side_sizes[2:]
        refilled = (
            (before > 0)
            & (dipped < before * drop_threshold)
            & (after > before * refill_threshold)
        )
        # The event is attributed to the snapshot where the size is restored.
        refill_score[2:] += refilled

    cum_refill = np.cumsum(refill_score)
    result = np.zeros(n)
    result[window:] = (cum_refill[window:] - cum_refill[:-window]) / window
    return result
def compute_hidden_volume_ratio(sizes, ask_sizes_1, bid_sizes_1, buy_sell, window=50):
    """Rolling ratio of order volume to the largest displayed level-1 size.

    Iceberg signature: total executed / max displayed > 3.0.

    For every index ``i`` the window is the ``window`` rows preceding it
    (``[i - window, i)``). The ratio is the sum of ``sizes`` in the window
    divided by the largest best-ask or best-bid size seen in the window;
    the denominator is floored at 1 to avoid division by zero.

    Args:
        sizes: 1-D array-like of order sizes.
        ask_sizes_1: 1-D array-like of best-ask sizes, same length.
        bid_sizes_1: 1-D array-like of best-bid sizes, same length.
        buy_sell: Unused; accepted only so existing callers keep working.
        window: Number of preceding rows per rolling window (must be >= 1).

    Returns:
        float64 ndarray of length ``len(sizes)``; the first ``window``
        entries are 0.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")
    sizes = np.asarray(sizes, dtype=np.float64)
    ask = np.asarray(ask_sizes_1, dtype=np.float64)
    bid = np.asarray(bid_sizes_1, dtype=np.float64)
    n = len(sizes)
    hidden_ratio = np.zeros(n)
    if n <= window:
        return hidden_ratio

    rolling = np.lib.stride_tricks.sliding_window_view
    # rolling(x, window)[k] is x[k:k + window]. The last window ends on the
    # final row and would belong to index n, so it is dropped.
    total_vol = rolling(sizes, window)[:-1].sum(axis=1)
    # max(ask window max, bid window max) == window max of the row-wise max.
    best_displayed = rolling(np.maximum(ask, bid), window)[:-1].max(axis=1)
    max_displayed = np.maximum(best_displayed, 1.0)

    hidden_ratio[window:] = total_vol / max_displayed
    return hidden_ratio
def compute_level_persistence(lob_sizes, window=50, big_order_percentile=90):
    """Rolling persistence of large resting orders at any single book level.

    Support / resistance signature: one price level keeps a large order for
    a long time.

    A size is "big" when it exceeds the ``big_order_percentile``-th
    percentile of all strictly positive sizes in ``lob_sizes`` (one global
    threshold for the whole input). For every index ``i`` and every level
    (column) the fraction of the ``window`` preceding rows
    (``[i - window, i)``) holding a big size is computed; the result is the
    maximum of that fraction over all levels.

    Args:
        lob_sizes: 2-D array-like of shape (N, L) with one column per book
            level, e.g. (N, 20) for ``[ask_s_1..10, bid_s_1..10]``.
        window: Number of preceding rows per rolling window (must be >= 1).
        big_order_percentile: Percentile (0-100) defining a big order.

    Returns:
        float64 ndarray of length N with values in [0, 1]; the first
        ``window`` entries are 0, and the whole result is 0 when the book
        contains no positive size.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")
    lob_sizes = np.asarray(lob_sizes)
    # Use the actual number of columns instead of assuming 20 levels.
    n, n_levels = lob_sizes.shape
    persistence = np.zeros(n)

    positive_sizes = lob_sizes[lob_sizes > 0]
    # No positive size means no percentile to take and no big orders.
    if positive_sizes.size == 0 or n <= window:
        return persistence
    threshold = np.percentile(positive_sizes, big_order_percentile)
    is_big = lob_sizes > threshold

    # Per level, count big rows in each window via prefix sums (exact
    # integers) and keep the running maximum across levels.
    best_count = np.zeros(n - window)
    for level in range(n_levels):
        prefix = np.concatenate(([0], np.cumsum(is_big[:, level])))
        counts = prefix[window:n] - prefix[:n - window]
        np.maximum(best_count, counts, out=best_count)

    persistence[window:] = best_count / window
    return persistence
def compute_depth_imbalance(ask_sizes, bid_sizes, top_levels=3):
    """Depth imbalance over the first ``top_levels`` book levels.

    Support signature: heavy stacking on the bid side -> imbalance > 0.
    Resistance signature: heavy stacking on the ask side -> imbalance < 0.

    Computed per row as ``(bid_depth - ask_depth) / (bid_depth + ask_depth)``
    where each depth is the sum of the first ``top_levels`` columns. A tiny
    constant (1e-8) is added to the denominator so that an empty book gives
    0 instead of a division by zero.

    Args:
        ask_sizes: 2-D array-like (N, levels) of ask sizes, best level first.
        bid_sizes: 2-D array-like (N, levels) of bid sizes, best level first.
        top_levels: Number of leading levels included in each depth.

    Returns:
        float64 ndarray of length N.
    """
    # Normalise to float64 ndarrays: tuple slicing below is ndarray-only,
    # and the sums should not be accumulated in a narrower input dtype.
    bid = np.asarray(bid_sizes, dtype=np.float64)
    ask = np.asarray(ask_sizes, dtype=np.float64)
    bid_depth = bid[:, :top_levels].sum(axis=1)
    ask_depth = ask[:, :top_levels].sum(axis=1)
    return (bid_depth - ask_depth) / (bid_depth + ask_depth + 1e-8)
def compute_ofi_multi_scale(ask_sizes_1, bid_sizes_1, windows=(5, 10, 20, 50)):
    """Multi-scale order flow imbalance (OFI) features.

    The per-row level-1 imbalance is
    ``(bid_size - ask_size) / (bid_size + ask_size + 1e-8)``. For every
    window length ``w`` two rolling statistics over rows ``i - w + 1 .. i``
    are produced:
        ``ofi_{w}``      rolling mean of the imbalance
        ``ofi_vol_{w}``  rolling standard deviation of the imbalance

    Args:
        ask_sizes_1: 1-D array-like of best-ask sizes.
        bid_sizes_1: 1-D array-like of best-bid sizes, same length.
        windows: Iterable of positive window lengths.

    Returns:
        dict mapping feature name to a float64 ndarray of length
        ``len(ask_sizes_1)``; the first ``w`` entries of each array are 0.
    """
    # Work in float64: the rolling statistics are differences of running
    # totals, and a float32 running total loses the small increments once
    # it grows, which corrupts every short-window mean on long series.
    ask = np.asarray(ask_sizes_1, dtype=np.float64)
    bid = np.asarray(bid_sizes_1, dtype=np.float64)
    n = len(ask)
    imb = (bid - ask) / (bid + ask + 1e-8)

    # The running totals do not depend on the window length.
    cum = np.cumsum(imb)
    cum_sq = np.cumsum(imb ** 2)

    features = {}
    for w in windows:
        roll_mean = np.zeros(n)
        roll_mean[w:] = (cum[w:] - cum[:-w]) / w
        features[f'ofi_{w}'] = roll_mean

        # Var = E[x^2] - E[x]^2; clamp at 0 to absorb rounding error.
        roll_var = np.zeros(n)
        roll_var[w:] = (cum_sq[w:] - cum_sq[:-w]) / w - roll_mean[w:] ** 2
        features[f'ofi_vol_{w}'] = np.sqrt(np.maximum(roll_var, 0))
    return features
# ============================================================
# 伪标签生成器 (Pseudo-Label Generator)
# ============================================================
def generate_pseudo_labels(df, verbose=True):
    """Generate pseudo-labels, class scores and features from raw Level-2 data.

    Args:
        df: DataFrame with the columns ORDER_ID, PRICE, SIZE, BUY_SELL_FLAG,
            TYPE, ask_price_1..10, ask_size_1..10, bid_price_1..10 and
            bid_size_1..10. Only PRICE, SIZE, BUY_SELL_FLAG, TYPE,
            ask_price_1, bid_price_1 and the twenty size columns are read.
        verbose: When true, print progress and the label distribution.

    Returns:
        Tuple ``(labels, scores, features)``:
            labels: (N,) int64; 0=TWAP, 1=VWAP, 2=ICEBERG, 3=SUPPORT,
                4=NORMAL.
            scores: (N, 4) float64; rule-based score per class in the order
                TWAP, VWAP, ICEBERG, SUPPORT.
            features: (N, 18) float32; the ten signature features followed
                by the OFI features in lexicographic key order, with
                NaN / +-Inf replaced by 0.

    Notes:
        * A row receives class ``c`` when its score is in the top 20% of
          that class's scores above 0.01 and also exceeds 0.2. Classes are
          applied in index order, so when a row qualifies for several
          classes the highest class index wins.
        * Timestamps are a synthetic row index, so the periodicity feature
          measures spacing in events, not in wall-clock time.
        * Market volume for the VWAP features is approximated by a
          10-element moving average of SIZE.
    """
    N = len(df)

    def _book_column(name):
        # Book values with magnitude above 1e9 are sentinels for "no quote"
        # and are replaced by 0.
        values = df[name].values.astype(np.float32)
        values[np.abs(values) > 1e9] = 0
        return values

    # ---- Raw columns ----
    sizes = df['SIZE'].values.astype(np.float32)
    buy_sell = df['BUY_SELL_FLAG'].values.astype(np.float32)
    types = df['TYPE'].values
    prices = df['PRICE'].values.astype(np.float32)
    ask_price_1 = _book_column('ask_price_1')
    bid_price_1 = _book_column('bid_price_1')

    # ---- Ten levels of sizes per side, shape (N, 10) ----
    ask_sizes = np.column_stack([_book_column(f'ask_size_{k}') for k in range(1, 11)])
    bid_sizes = np.column_stack([_book_column(f'bid_size_{k}') for k in range(1, 11)])
    ask_sizes_1 = ask_sizes[:, 0].copy()
    bid_sizes_1 = bid_sizes[:, 0].copy()

    # ---- Mid price, forward-filled across rows without a two-sided quote ----
    valid = (ask_price_1 > 0) & (bid_price_1 > 0)
    mid_prices = np.where(valid, (ask_price_1 + bid_price_1) / 2.0, 0.0)
    # For each row, the index of the most recent row with a non-zero mid
    # (rows before the first quote map to row 0 and therefore stay 0).
    last_known = np.where(mid_prices != 0, np.arange(N), 0)
    last_known = np.maximum.accumulate(last_known)
    mid_prices = mid_prices[last_known]

    # Row index used as a stand-in for time. float64 keeps every index
    # exact; float32 cannot represent consecutive integers beyond 2**24.
    timestamps = np.arange(N, dtype=np.float64)

    if verbose:
        print("Computing algorithm signatures...")

    # ============ TWAP features ============
    order_cv = compute_order_size_cv(sizes, window=20)
    periodicity = compute_periodicity(timestamps, window=20)
    cancel_burst = compute_cancel_burst_ratio(types, timestamps, window=20)
    pa_ratio = compute_passive_aggressive_ratio(prices, mid_prices, buy_sell, window=20)

    # TWAP score: low size variation + regular spacing + cancels bunched at
    # the window boundary + mostly passive.
    twap_score = np.zeros(N)
    twap_score += np.clip(1 - order_cv / 0.3, 0, 1) * 0.35
    twap_score += periodicity * 0.30
    twap_score += np.clip(cancel_burst / 0.5, 0, 1) * 0.20
    twap_score += np.clip(1 - pa_ratio / 0.3, 0, 1) * 0.15

    # ============ VWAP features ============
    # Smoothed own sizes act as a proxy for market volume.
    market_vol = np.convolve(sizes, np.ones(10) / 10, mode='same')
    part_stability = compute_participation_rate(sizes, market_vol, window=50)
    vol_corr = compute_volume_correlation(sizes, buy_sell, market_vol, window=50)

    # VWAP score: stable participation rate + high volume correlation.
    vwap_score = np.zeros(N)
    vwap_score += np.clip(1 - part_stability / 0.5, 0, 1) * 0.50
    vwap_score += vol_corr * 0.50

    # ============ Iceberg features ============
    refill = compute_refill_ratio(ask_sizes_1, bid_sizes_1, window=20)
    hidden_vol = compute_hidden_volume_ratio(sizes, ask_sizes_1, bid_sizes_1, buy_sell, window=50)

    # Iceberg score: frequent refills + high hidden-volume ratio.
    iceberg_score = np.zeros(N)
    iceberg_score += np.clip(refill / 0.5, 0, 1) * 0.50
    iceberg_score += np.clip(hidden_vol / 5.0, 0, 1) * 0.50

    # ============ Support / resistance features ============
    lob_sizes = np.concatenate([ask_sizes, bid_sizes], axis=1)  # (N, 20)
    persistence = compute_level_persistence(lob_sizes, window=50)
    depth_imb = compute_depth_imbalance(ask_sizes, bid_sizes, top_levels=3)

    # Support score: persistent big orders + strong depth imbalance.
    support_score = np.zeros(N)
    support_score += persistence * 0.50
    support_score += np.clip(np.abs(depth_imb) / 0.5, 0, 1) * 0.50

    # ============ Multi-scale OFI (generic features) ============
    ofi_features = compute_ofi_multi_scale(ask_sizes_1, bid_sizes_1, windows=[5, 10, 20, 50])

    # ============ Scores and labels ============
    scores = np.stack([twap_score, vwap_score, iceberg_score, support_score], axis=1)

    labels = np.full(N, 4, dtype=np.int64)  # default: NORMAL
    for cls in range(4):
        cls_scores = scores[:, cls]
        valid_scores = cls_scores[cls_scores > 0.01]
        if len(valid_scores) > 0:
            thr = np.percentile(valid_scores, 80)  # top 20% of this class
            # Later classes overwrite earlier ones on rows that qualify
            # for more than one class.
            labels[(cls_scores >= thr) & (cls_scores > 0.2)] = cls

    # ============ Feature matrix ============
    all_features = np.column_stack([
        order_cv, periodicity, cancel_burst, pa_ratio,
        part_stability, vol_corr,
        refill, hidden_vol,
        persistence, depth_imb,
        *[ofi_features[k] for k in sorted(ofi_features.keys())]
    ]).astype(np.float32)
    all_features = np.nan_to_num(all_features, nan=0.0, posinf=0.0, neginf=0.0)

    if verbose:
        label_names = {0: 'TWAP', 1: 'VWAP', 2: 'ICEBERG', 3: 'SUPPORT', 4: 'NORMAL'}
        unique, counts = np.unique(labels, return_counts=True)
        print("Pseudo-label distribution:")
        for u, c in zip(unique, counts):
            print(f" {u} ({label_names[u]}): {c} ({c/N*100:.1f}%)")
        print(f"Feature matrix shape: {all_features.shape}")

    return labels, scores, all_features
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
    # Example run: label the public TRADES-LOB training split and show the
    # highest-scoring row of every algorithmic class.
    from datasets import load_dataset

    print("Loading TRADES-LOB dataset...")
    df = load_dataset("LeonardoBerti/TRADES-LOB", split="train").to_pandas()
    print(f"Dataset: {len(df)} rows")

    labels, scores, features = generate_pseudo_labels(df)
    print(f"\nLabel shape: {labels.shape}")
    print(f"Score shape: {scores.shape}")
    print(f"Feature shape: {features.shape}")

    label_names = {0: 'TWAP', 1: 'VWAP', 2: 'ICEBERG', 3: 'SUPPORT', 4: 'NORMAL'}
    for cls in range(4):
        members = np.where(labels == cls)[0]
        if members.size == 0:
            # No row was assigned to this class; nothing to show.
            continue
        best = members[scores[members, cls].argmax()]
        row = df.iloc[best]
        side = 'Buy' if row['BUY_SELL_FLAG'] else 'Sell'
        print(f"\n{label_names[cls]} 最高置信度样本 (idx={best}, score={scores[best, cls]:.3f}):")
        print(f" SIZE={row['SIZE']}, PRICE={row['PRICE']}, "
              f"BUY_SELL={side}")