| import torch |
|
|
def precompute_freqs_cis(head_dim: int, max_seq_len: int, theta: float = 10000.0):
    """Build the table of unit complex rotations used by rotary embeddings.

    One frequency ``theta ** -(i / head_dim)`` is produced for every even
    index ``i`` in ``range(0, head_dim, 2)``; entry ``[p, j]`` of the result
    is ``exp(1j * p * frequency_j)``.

    Args:
        head_dim: Size of one attention head; each pair of channels shares
            one rotation frequency.
        max_seq_len: Number of positions (rows) to precompute.
        theta: Base of the geometric frequency progression.

    Returns:
        Complex tensor with ``max_seq_len`` rows and one column per
        frequency, every entry having magnitude 1.
    """
    # Exponents i / head_dim for i = 0, 2, 4, ...
    exponents = torch.arange(0, head_dim, 2).float() / head_dim
    inv_freq = 1.0 / (theta ** exponents)

    # angle[p, j] = p * inv_freq[j]
    positions = torch.arange(max_seq_len, dtype=torch.float32)
    phase = torch.outer(positions, inv_freq)

    # Magnitude 1 everywhere: multiplying by an entry is a pure rotation.
    unit_magnitude = torch.ones_like(phase)
    return torch.polar(unit_magnitude, phase)
|
|
|
|
def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
    """Slice and reshape the rotation table so it broadcasts against ``x``.

    ``x`` is expected to be ``[B, n_heads, seq_len, half_dim]`` (the complex
    view of the queries/keys) and ``freqs_cis`` to be
    ``[max_seq_len, half_dim]``. The first ``seq_len`` rows of the table are
    returned as ``[1, 1, seq_len, half_dim]``.

    Args:
        freqs_cis: Precomputed rotation table, one row per position.
        x: Tensor whose dimension 2 is the sequence length and whose last
            dimension is the number of complex channels.

    Returns:
        A view of ``freqs_cis`` with shape ``[1, 1, seq_len, half_dim]``.

    Raises:
        ValueError: If the table has fewer than ``seq_len`` rows, or if its
            per-position width differs from ``x.shape[-1]``.
    """
    seq_len = x.shape[2]
    half_dim = x.shape[-1]

    # Validate explicitly: with view(..., -1) a table that is too short or
    # has the wrong width can be silently reinterpreted into a shape that
    # still broadcasts, producing wrong rotations instead of an error.
    if freqs_cis.shape[0] < seq_len:
        raise ValueError(
            f"freqs_cis covers {freqs_cis.shape[0]} positions but x has "
            f"sequence length {seq_len}"
        )
    table_width = freqs_cis.shape[1:].numel()
    if table_width != half_dim:
        raise ValueError(
            f"freqs_cis has {table_width} frequencies per position but x "
            f"has {half_dim} complex channels"
        )

    # An explicit last dimension (rather than -1) keeps the view well
    # defined even when seq_len is 0.
    return freqs_cis[:seq_len].view(1, 1, seq_len, half_dim)
|
|
|
|
def apply_rotary_emb(
    xq: torch.Tensor,
    xk: torch.Tensor,
    freqs_cis: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Rotate queries and keys by the precomputed position-dependent phases.

    The last dimension of each input is read as consecutive (real, imag)
    pairs, multiplied by the matching entries of ``freqs_cis``, and written
    back in the original layout.

    Args:
        xq: Query tensor; its last dimension must be even.
        xk: Key tensor; its last dimension must be even.
        freqs_cis: Complex rotation table, sliced and reshaped via
            ``reshape_for_broadcast`` using the complex view of ``xq``.

    Returns:
        ``(xq_rotated, xk_rotated)`` with the same shapes and dtypes as the
        corresponding inputs.
    """

    def _to_complex(t: torch.Tensor) -> torch.Tensor:
        # Compute in float32 and pair up the last dimension as (real, imag).
        paired = t.float().reshape(*t.shape[:-1], -1, 2)
        return torch.view_as_complex(paired)

    q_c = _to_complex(xq)
    k_c = _to_complex(xk)

    # The table is shaped from the query's complex view and reused for keys.
    rotation = reshape_for_broadcast(freqs_cis, q_c)

    q_rotated = torch.view_as_real(q_c * rotation).reshape(*xq.shape)
    k_rotated = torch.view_as_real(k_c * rotation).reshape(*xk.shape)

    # Cast back so callers get the dtype they passed in.
    return q_rotated.type_as(xq), k_rotated.type_as(xk)
|
|
|
|
def main():
    """Run four self-checks of the rotary-embedding helpers and print results.

    Every check uses ``theta=1.0``, which makes every frequency equal to 1,
    so the rotation angle at position ``p`` is exactly ``p`` radians.

    Raises:
        AssertionError: If any check fails.
    """
    import math
    from torch.testing import assert_close

    # Test 1: a single position (p = 0) has angle 0, so nothing changes.
    dim = 2
    # The parameter is named `head_dim`; passing `dim=` raises TypeError.
    freqs_cis = precompute_freqs_cis(head_dim=dim, max_seq_len=1, theta=1.0)
    xq = torch.tensor([[[[1.0, 0.0]]]])
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    assert_close(xq_out, xq, msg="Test 1 failed")
    print("Test 1 passed.")

    # Test 2: the unit vector (1, 0) at position p becomes (cos p, sin p).
    seq_len = 5
    freqs_cis = precompute_freqs_cis(head_dim=dim, max_seq_len=seq_len, theta=1.0)
    xq = torch.tensor([[[[1.0, 0.0] for _ in range(seq_len)]]])
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    expected = torch.tensor(
        [[[[math.cos(p), math.sin(p)] for p in range(seq_len)]]]
    )
    assert_close(xq_out, expected, rtol=1e-6, atol=1e-6, msg="Test 2 failed")
    print("Test 2 passed.")

    # Test 3: two channel pairs at position 0 are both left untouched.
    xq = torch.tensor([[[[1.0, 0.0, 1.0, 0.0]]]])
    freqs_cis = precompute_freqs_cis(head_dim=4, max_seq_len=1, theta=1.0)
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    assert_close(xq_out, xq, msg="Test 3 failed")
    print("Test 3 passed.")

    # Test 4: on random inputs, shapes are kept and, because each factor has
    # magnitude 1, the norm of every head vector is preserved.
    torch.manual_seed(1337)
    B, H, T, D = 2, 3, 5, 8
    xq = torch.randn(B, H, T, D)
    xk = torch.randn(B, H, T, D)
    freqs_cis = precompute_freqs_cis(head_dim=D, max_seq_len=T, theta=1.0)
    xq_out, xk_out = apply_rotary_emb(xq, xk, freqs_cis)
    assert xq_out.shape == (B, H, T, D), "Test 4 Q shape failed"
    assert xk_out.shape == (B, H, T, D), "Test 4 K shape failed"
    # Compare all per-vector norms at once instead of looping over b, h, t.
    assert torch.allclose(
        xq.norm(dim=-1), xq_out.norm(dim=-1), atol=1e-5
    ), "Test 4 Q norm failed"
    assert torch.allclose(
        xk.norm(dim=-1), xk_out.norm(dim=-1), atol=1e-5
    ), "Test 4 K norm failed"
    print("Test 4 passed.\nAll tests passed successfully!")
|
|
# Run the self-checks only when executed as a script, not when imported.
if __name__ == "__main__":
    main()