"""Parity tests for the Triton SAE losses against their reference implementations."""

from typing import Callable

import pytest
import torch

# torch.cuda imports successfully even on CPU-only builds, so gate on an
# actual device being available before importing the Triton-backed kernels.
if not torch.cuda.is_available():
    pytest.skip("CUDA is required for these tests", allow_module_level=True)

from .test_setup import DTYPES, DTYPE_TO_TOLS, PARAMS, SEED
from flex_sae import (
    hierarchical_sae_loss,
    topk_sae_loss,
    triton_hierarchical_sae_loss,
    triton_topk_sae_loss,
)


@pytest.fixture(autouse=True)
def _set_cuda_default_device():
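    # Route torch factory calls (randn, randint, ...) to the GPU for every
    # test in this module, then restore the stock CPU default on teardown.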
    torch.set_default_device("cuda")
    yield
    torch.set_default_device("cpu")


def run_funcs(B, K, F, D, dtype, *, kernel_fn: Callable, ref_fn: Callable):
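    """Check that kernel_fn and ref_fn agree on the forward loss and on the
    gradients of every differentiable input.

    B: batch rows, K: active features per row, F: dictionary size, D: output dim.
    """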
    if dtype is torch.bfloat16 and not torch.cuda.is_bf16_supported():
        pytest.skip("BF16 not supported on this GPU")
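
    # Seed so failures are reproducible from run to run.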
    torch.manual_seed(SEED)
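
    # A sparse code: K feature indices per row with positive activation
    # values, plus a dense decoder, bias, and reconstruction target.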
    indices = torch.randint(0, F, (B, K), dtype=torch.long, device="cuda")
    vals = torch.randn(B, K, dtype=dtype, device="cuda").abs().requires_grad_()
    decoder = torch.randn(F, D, dtype=dtype, device="cuda", requires_grad=True)
    bias = torch.randn(D, dtype=dtype, device="cuda", requires_grad=True)
    target = torch.randn(B, D, dtype=dtype, device="cuda")
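
    # Independent leaf copies so the reference path accumulates its own grads.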
    sv_ref = vals.detach().clone().requires_grad_()
    dec_ref = decoder.detach().clone().requires_grad_()
    bias_ref = bias.detach().clone().requires_grad_()

    loss_f = kernel_fn(indices, decoder, vals, bias, target)
    loss_r = ref_fn(indices, dec_ref, sv_ref, bias_ref, target)

    torch.testing.assert_close(loss_f, loss_r, **DTYPE_TO_TOLS[dtype])
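
    # Backpropagate a random upstream gradient rather than the implicit 1.0
    # so gradient scaling in the backward pass is exercised as well.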
    grad_out = torch.randn((), device="cuda", dtype=torch.float32)
    loss_f.backward(grad_out)
    loss_r.backward(grad_out.clone())

    torch.testing.assert_close(vals.grad, sv_ref.grad, **DTYPE_TO_TOLS[dtype])
    torch.testing.assert_close(decoder.grad, dec_ref.grad, **DTYPE_TO_TOLS[dtype])
    torch.testing.assert_close(bias.grad, bias_ref.grad, **DTYPE_TO_TOLS[dtype])
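
    # Integer index tensors are non-differentiable and must stay grad-free.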
    assert indices.grad is None


@pytest.mark.parametrize("B, K, F, D", PARAMS)
@pytest.mark.parametrize("dtype", DTYPES)
def test_triton_hierarchical_sae_loss_and_grads(B, K, F, D, dtype):
    run_funcs(
        B, K, F, D, dtype,
        kernel_fn=triton_hierarchical_sae_loss,
        ref_fn=hierarchical_sae_loss,
    )
    # Release cached allocations before the next parametrized case.
    torch.cuda.empty_cache()


@pytest.mark.parametrize("B, K, F, D", PARAMS)
@pytest.mark.parametrize("dtype", DTYPES)
def test_topk_sae_loss_and_grads(B, K, F, D, dtype):
    run_funcs(
        B, K, F, D, dtype, kernel_fn=triton_topk_sae_loss, ref_fn=topk_sae_loss
    )
    torch.cuda.empty_cache()