File size: 3,446 Bytes
b4b2877
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""Models for T8 v3 — privileged future-pressure conditioning.

Extends the TransformerForecast (DAF) design to accept future pressure as
side-channel context; the backbone is rebuilt in this module rather than
imported. The future pressure trajectory is encoded into T_fut tokens that
are appended to the past memory, and the future queries cross-attend over
the union (past sensors + future pressure). Future pressure is privileged
(oracle) information that would not be available at test time, so this is
a hypothesis-test setup, not a deployable forecaster.
"""
from __future__ import annotations
from typing import Dict

import torch
import torch.nn as nn


class _PerModalityProj(nn.Module):
    """Project every modality to ``d_model`` and average the projections.

    Each modality owns an ``nn.Linear`` layer and one learned row of
    ``mod_emb`` that is added to its projection. ``forward`` returns the
    mean of those per-modality terms.

    Args:
        modality_dims: Mapping from modality name to its input feature size.
            Its iteration order decides which row of ``mod_emb`` belongs to
            which modality.
        d_model: Output feature size shared by all projections.

    Raises:
        ValueError: If ``modality_dims`` is empty.
    """

    def __init__(self, modality_dims: Dict[str, int], d_model: int) -> None:
        super().__init__()
        if not modality_dims:
            # Fail here rather than letting forward() hit an opaque
            # ``None / 0`` TypeError on the first call.
            raise ValueError("modality_dims must contain at least one modality")
        self.proj = nn.ModuleDict({
            m: nn.Linear(d, d_model) for m, d in modality_dims.items()
        })
        self.mod_emb = nn.Parameter(torch.zeros(len(modality_dims), d_model))
        nn.init.trunc_normal_(self.mod_emb, std=0.02)
        # Plain list of names; position i pairs modality i with mod_emb[i].
        self.mods = list(modality_dims.keys())

    def forward(self, x: Dict[str, torch.Tensor]) -> torch.Tensor:
        """Return the mean over modalities of ``proj[m](x[m]) + mod_emb[i]``.

        Args:
            x: Mapping that holds one tensor per modality name in
                ``self.mods``; the last dimension of ``x[m]`` must match the
                size given for ``m`` at construction. Extra keys are ignored.

        Returns:
            Tensor whose last dimension is ``d_model``.

        Raises:
            KeyError: If a modality listed in ``self.mods`` is absent from ``x``.
        """
        first, *rest = self.mods
        total = self.proj[first](x[first]) + self.mod_emb[0]
        # Accumulate left to right so the floating-point summation order is
        # the same for every call.
        for i, m in enumerate(rest, start=1):
            total = total + (self.proj[m](x[m]) + self.mod_emb[i])
        return total / len(self.mods)


class DAFFuturePressure(nn.Module):
    """DAF backbone + future-pressure conditioning.

    Past observations are embedded per modality, offset by a learned
    positional embedding and passed through a Transformer encoder. The future
    pressure trajectory is linearly projected into ``t_fut`` tokens (with
    their own positional and segment embeddings) and concatenated to the
    encoded past along the time axis. ``t_fut`` learned queries cross-attend
    over that combined memory, and a linear head maps every query to
    ``target_dim`` outputs.

    Args:
        modality_dims: Mapping from modality name to its input feature size.
        target_dim: Feature size of each predicted step.
        t_obs: Number of observed (past) time steps.
        t_fut: Number of future time steps to predict.
        future_pressure_dim: Feature size of each future-pressure step.
        d_model: Width of the encoder, the memory tokens and the queries.
        n_heads: Attention heads used by the encoder and the cross-attention.
        n_layers: Number of Transformer encoder layers.
        dropout: Dropout rate of the encoder layers and the cross-attention.
    """

    def __init__(self, modality_dims: Dict[str, int], target_dim: int,
                 t_obs: int, t_fut: int, future_pressure_dim: int = 50,
                 d_model: int = 128, n_heads: int = 4, n_layers: int = 2,
                 dropout: float = 0.1):
        super().__init__()
        self.t_obs = t_obs
        self.t_fut = t_fut
        # past-observation encoder
        self.embed = _PerModalityProj(modality_dims, d_model)
        self.pos = nn.Parameter(torch.zeros(1, t_obs, d_model))
        nn.init.trunc_normal_(self.pos, std=0.02)
        layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads, dim_feedforward=4 * d_model,
            dropout=dropout, batch_first=True, activation="gelu",
        )
        self.encoder = nn.TransformerEncoder(layer, num_layers=n_layers)
        # future-pressure encoder
        self.fp_proj = nn.Linear(future_pressure_dim, d_model)
        self.fp_pos = nn.Parameter(torch.zeros(1, t_fut, d_model))
        nn.init.trunc_normal_(self.fp_pos, std=0.02)
        self.fp_seg = nn.Parameter(torch.zeros(1, 1, d_model))             # segment id
        nn.init.trunc_normal_(self.fp_seg, std=0.02)
        # decoder side: one learned query per future step
        self.queries = nn.Parameter(torch.zeros(1, t_fut, d_model))
        nn.init.trunc_normal_(self.queries, std=0.02)
        self.cross_attn = nn.MultiheadAttention(
            d_model, n_heads, dropout=dropout, batch_first=True
        )
        self.norm = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, target_dim)

    def forward(self, x: Dict[str, torch.Tensor],
                future_pressure: torch.Tensor) -> torch.Tensor:
        """Predict ``t_fut`` future steps from past sensors and future pressure.

        Args:
            x: One tensor per modality, each ``(B, t_obs, modality_dim)``.
            future_pressure: ``(B, t_fut, future_pressure_dim)`` tensor.

        Returns:
            Tensor of shape ``(B, t_fut, target_dim)``.

        Raises:
            ValueError: If the embedded past does not have ``t_obs`` time
                steps, or ``future_pressure`` does not have ``t_fut`` steps.
        """
        h_in = self.embed(x)
        # A missing or length-1 time axis would broadcast against ``pos``
        # without any error, so the length is checked explicitly.
        if h_in.dim() < 2 or h_in.size(-2) != self.t_obs:
            raise ValueError(
                f"expected {self.t_obs} observed time steps, but the embedded "
                f"input has shape {tuple(h_in.shape)}"
            )
        # Same hazard for the future-pressure tokens and ``fp_pos``.
        if future_pressure.dim() < 2 or future_pressure.size(-2) != self.t_fut:
            raise ValueError(
                f"expected {self.t_fut} future-pressure time steps, but "
                f"future_pressure has shape {tuple(future_pressure.shape)}"
            )
        h_past = self.encoder(h_in + self.pos)                   # (B, T_obs, D)
        h_fp = self.fp_proj(future_pressure) + self.fp_pos + self.fp_seg
        memory = torch.cat([h_past, h_fp], dim=1)                # (B, T_obs+T_fut, D)
        q = self.queries.expand(memory.size(0), -1, -1)          # (B, T_fut, D)
        out, _ = self.cross_attn(q, memory, memory, need_weights=False)
        out = self.norm(out)
        return self.head(out)                                    # (B, T_fut, target_dim)