import torch
import torch.nn as nn

class BilinearNorm(nn.Module):
    """Gated temporal normalization.

    Standardizes each of the d features over the time axis, applies a
    learned affine transform, then blends the result with the raw input
    through a per-feature sigmoid gate.
    """
    def __init__(self, d):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(1, 1, d))  # affine scale
        self.beta = nn.Parameter(torch.zeros(1, 1, d))  # affine shift
        self.gate = nn.Parameter(torch.ones(1, 1, d))   # gate logits

    def forward(self, x):
        # x: (B, T, d) -- normalize each feature across the time dimension
        m = x.mean(1, keepdim=True)
        s = x.std(1, keepdim=True) + 1e-8
        xn = (x - m) / s
        g = torch.sigmoid(self.gate)
        # Per-feature convex blend of the normalized and raw signals
        return g * (self.gamma * xn + self.beta) + (1 - g) * x


class LOBAlgoNet(nn.Module):
    """
    CNN + Transformer model for algorithmic order pattern detection.
    Input: (B, T=100, 40) normalized LOB snapshots
    Output: (B, 5) logits for [TWAP, VWAP, ICEBERG, SUPPORT, NORMAL]
    """
    def __init__(self, num_classes=5, d_model=128, nhead=4, dropout=0.25):
        super().__init__()
        self.norm = BilinearNorm(40)
        
        # Spatial CNN: cross-level patterns
        self.spatial = nn.Sequential(
            nn.Conv2d(1, 32, (1,2), stride=(1,2)),  # 40→20
            nn.BatchNorm2d(32), nn.LeakyReLU(0.01),
            nn.Conv2d(32, 32, (1,2), stride=(1,2)), # 20→10
            nn.BatchNorm2d(32), nn.LeakyReLU(0.01),
            nn.Conv2d(32, 32, (1,10)),               # 10→1
            nn.BatchNorm2d(32), nn.LeakyReLU(0.01),
        )
        
        # Temporal CNN: multi-scale temporal features
        self.temporal = nn.Sequential(
            nn.Conv1d(32, 64, 3, padding=1), nn.BatchNorm1d(64), nn.LeakyReLU(0.01), nn.Dropout(dropout),
            nn.Conv1d(64, 64, 5, padding=2), nn.BatchNorm1d(64), nn.LeakyReLU(0.01), nn.Dropout(dropout),
            nn.Conv1d(64, d_model, 3, padding=1), nn.BatchNorm1d(d_model), nn.LeakyReLU(0.01), nn.Dropout(dropout),
        )
        
        # Transformer attention
        enc_layer = nn.TransformerEncoderLayer(d_model, nhead, d_model*2, dropout, batch_first=True, activation='gelu')
        self.attention = nn.TransformerEncoder(enc_layer, num_layers=2)
        
        # Classifier
        self.head = nn.Sequential(
            nn.LayerNorm(d_model), nn.Dropout(dropout),
            nn.Linear(d_model, 64), nn.GELU(), nn.Dropout(dropout),
            nn.Linear(64, num_classes)
        )
        
        self._init()
    
    def _init(self):
        for m in self.modules():
            if isinstance(m, (nn.Linear, nn.Conv1d, nn.Conv2d)):
                # Kaiming init matched to the LeakyReLU(0.01) activations used above
                nn.init.kaiming_normal_(m.weight, a=0.01, nonlinearity='leaky_relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
    
    def forward(self, x):
        x = self.norm(x)                              # (B, T, 40)
        x = self.spatial(x.unsqueeze(1)).squeeze(-1)  # (B, 32, T)
        x = self.temporal(x)                          # (B, d_model, T)
        x = self.attention(x.permute(0, 2, 1))        # (B, T, d_model)
        x = x.mean(dim=1)                             # (B, d_model) mean-pool over time
        return self.head(x)
    
    def get_embeddings(self, x):
        """Extract feature vectors for clustering / visualization analysis."""
        x = self.norm(x)
        x = self.spatial(x.unsqueeze(1)).squeeze(-1)
        x = self.temporal(x)
        x = self.attention(x.permute(0, 2, 1))
        return x.mean(dim=1)  # (B, d_model)
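

# A minimal smoke test, sketched as an assumed usage pattern: the batch size
# and random inputs below are illustrative, while the (B, 100, 40) input and
# (B, 5) output shapes follow the LOBAlgoNet docstring.
if __name__ == "__main__":
    # BilinearNorm preserves its input shape
    bn = BilinearNorm(40)
    assert bn(torch.randn(8, 100, 40)).shape == (8, 100, 40)

    model = LOBAlgoNet()
    model.eval()  # use BatchNorm running stats for a deterministic pass
    x = torch.randn(8, 100, 40)  # 8 synthetic windows: 100 LOB snapshots x 40 features
    with torch.no_grad():
        logits = model(x)              # (8, 5) class logits
        emb = model.get_embeddings(x)  # (8, 128) pooled features
    print(logits.shape, emb.shape)     # torch.Size([8, 5]) torch.Size([8, 128])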