| """ |
| Tests for domainTokenizer Phase 2C: Pre-training Pipeline. |
| 19 tests covering tokenization, packing, collation, integration, and Trainer smoke test. |
| |
| Run: pytest tests/test_training.py -v |
| """ |
|
|
| import logging |
| import random |
| from datetime import datetime, timedelta |
| from typing import Any, Dict, List |
|
|
| import numpy as np |
| import torch |
| import pytest |
|
|
| from datasets import Dataset as HFDataset |
| from transformers import DataCollatorForLanguageModeling |
|
|
| from domain_tokenizer.schemas.predefined import FINANCE_SCHEMA |
| from domain_tokenizer.tokenizers.domain_tokenizer import DomainTokenizerBuilder |
| from domain_tokenizer.models.configuration import DomainTransformerConfig |
| from domain_tokenizer.models.modeling import DomainTransformerForCausalLM |
| from domain_tokenizer.training.data_pipeline import ( |
| tokenize_user_sequences, pack_sequences, prepare_clm_dataset, |
| ) |
| from domain_tokenizer.training.pretrain import pretrain_domain_model |
|
|
| logging.basicConfig(level=logging.INFO) |
|
|
|
|
| def make_finance_events(n_events=10, seed=42): |
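    """Generate n_events synthetic finance transactions from a seeded RNG."""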
    rng = random.Random(seed)
    merchants = ["AMAZON", "UBER", "SALARY", "GROCERY", "NETFLIX", "GAS", "RESTAURANT", "PHARMACY"]
    base_date = datetime(2025, 1, 1)
    events = []
    for _ in range(n_events):
        # Draw one signed amount and reuse it for both amount fields so they agree.
        amt = rng.uniform(5, 5000) * rng.choice([1, -1])
        events.append({
            "amount_sign": amt,
            "amount": amt,
            "timestamp": base_date + timedelta(days=rng.randint(0, 365), hours=rng.randint(0, 23)),
            "description": rng.choice(merchants),
        })
    return events


def make_user_sequences(n_users=20, min_events=5, max_events=30, seed=42):
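    """Build one event sequence per user, with lengths drawn from [min_events, max_events]."""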
    rng = random.Random(seed)
    # Offset the seed per user so each user gets a distinct but reproducible stream.
    return [make_finance_events(rng.randint(min_events, max_events), seed + i) for i in range(n_users)]


def build_finance_tokenizer(events_flat):
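    """Fit a DomainTokenizerBuilder on the events and build a tokenizer with a small BPE vocab."""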
    builder = DomainTokenizerBuilder(FINANCE_SCHEMA)
    builder.fit(events_flat)
    # Sort so the corpus is deterministic (set iteration order varies across runs);
    # repeat it so the tiny BPE trainer sees each merchant string multiple times.
    text_corpus = sorted(set(e["description"] for e in events_flat)) * 20
    return builder, builder.build(text_corpus=text_corpus, bpe_vocab_size=500)


class TestTokenizeUserSequences:
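    """tokenize_user_sequences should return one variable-length token list per user."""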
    @pytest.fixture
    def setup(self):
        seqs = make_user_sequences(5, 3, 10)
        flat = [e for s in seqs for e in s]
        b, t = build_finance_tokenizer(flat)
        return seqs, b, t

    def test_returns_list_of_lists(self, setup):
        seqs, b, t = setup
        r = tokenize_user_sequences(seqs, b, t)
        assert len(r) == 5 and all(isinstance(s, list) for s in r)

    def test_variable_lengths(self, setup):
        seqs, b, t = setup
        r = tokenize_user_sequences(seqs, b, t)
        assert len(set(len(s) for s in r)) > 1

    def test_bos_eos_present(self, setup):
        seqs, b, t = setup
        r = tokenize_user_sequences(seqs, b, t, add_bos=True, add_eos=True)
        bos = t.convert_tokens_to_ids("[BOS]")
        eos = t.convert_tokens_to_ids("[EOS]")
        # Check both markers, as the test name promises (assumes an "[EOS]"
        # special token mirroring "[BOS]"): BOS near the head, EOS near the tail.
        assert all(bos in s[:5] for s in r)
        assert all(eos in s[-5:] for s in r)

    def test_no_bos_eos(self, setup):
        seqs, b, t = setup
        with_ = tokenize_user_sequences(seqs[:1], b, t, add_bos=True, add_eos=True)
        without = tokenize_user_sequences(seqs[:1], b, t, add_bos=False, add_eos=False)
        assert len(without[0]) < len(with_[0])


class TestPackSequences:
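    """pack_sequences should concatenate token lists and chunk them into fixed-size blocks."""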
    def test_fixed_length(self):
        ds = pack_sequences([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15]], block_size=5)
        assert len(ds) == 3 and all(len(r["input_ids"]) == 5 for r in ds)

    def test_concat(self):
        ds = pack_sequences([[1, 2, 3], [4, 5, 6]], block_size=3)
        assert ds[0]["input_ids"] == [1, 2, 3] and ds[1]["input_ids"] == [4, 5, 6]

    def test_drops_remainder(self):
        # 7 tokens at block_size=3 -> two full blocks; the trailing token is dropped.
        ds = pack_sequences([[1, 2, 3, 4, 5, 6, 7]], block_size=3)
        assert len(ds) == 2

    def test_too_few(self):
        with pytest.raises(ValueError):
            pack_sequences([[1, 2]], block_size=10)

    def test_hf_dataset(self):
        assert isinstance(pack_sequences([list(range(100))], block_size=10), HFDataset)

    def test_no_padding(self):
        ds = pack_sequences([list(range(50)) for _ in range(10)], block_size=25)
        assert all(len(r["input_ids"]) == 25 for r in ds)


class TestPrepareCLMDataset:
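    """prepare_clm_dataset should run tokenization and packing end to end."""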
    def test_full(self):
        seqs = make_user_sequences(10, 5, 15)
        flat = [e for s in seqs for e in s]
        b, t = build_finance_tokenizer(flat)
        ds = prepare_clm_dataset(seqs, b, t, block_size=64)
        assert len(ds) > 0 and all(len(r["input_ids"]) == 64 for r in ds)

    def test_block_sizes(self):
        seqs = make_user_sequences(10, 10, 20)
        flat = [e for s in seqs for e in s]
        b, t = build_finance_tokenizer(flat)
        # The same token stream cut into smaller blocks must yield more blocks.
        ds32 = prepare_clm_dataset(seqs, b, t, block_size=32)
        ds64 = prepare_clm_dataset(seqs, b, t, block_size=64)
        assert len(ds32) > len(ds64)


class TestDataCollator:
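    """DataCollatorForLanguageModeling(mlm=False) should add labels and an all-ones mask to packed blocks."""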
    @pytest.fixture
    def setup(self):
        seqs = make_user_sequences(5, 5, 15)
        flat = [e for s in seqs for e in s]
        b, t = build_finance_tokenizer(flat)
        ds = prepare_clm_dataset(seqs, b, t, block_size=32)
        return ds, DataCollatorForLanguageModeling(tokenizer=t, mlm=False), t

    def test_adds_labels(self, setup):
        ds, c, _ = setup
        batch = c([ds[i] for i in range(min(4, len(ds)))])
        assert all(k in batch for k in ["input_ids", "labels", "attention_mask"])

    def test_labels_eq_ids(self, setup):
        ds, c, _ = setup
        batch = c([ds[0]])
        assert torch.equal(batch["input_ids"], batch["labels"])

    def test_shapes(self, setup):
        ds, c, _ = setup
        n = min(4, len(ds))
        batch = c([ds[i] for i in range(n)])
        assert batch["input_ids"].shape == (n, 32)

    def test_all_ones_mask(self, setup):
        ds, c, _ = setup
        batch = c([ds[0]])
        assert batch["attention_mask"].sum() == 32


class TestTrainingIntegration:
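    """A tiny model should produce a positive loss and nonzero gradients on a collated batch."""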
    def test_forward(self):
        seqs = make_user_sequences(10, 5, 15)
        flat = [e for s in seqs for e in s]
        b, t = build_finance_tokenizer(flat)
        ds = prepare_clm_dataset(seqs, b, t, block_size=32)
        c = DataCollatorForLanguageModeling(tokenizer=t, mlm=False)
        batch = c([ds[i] for i in range(min(4, len(ds)))])
        config = DomainTransformerConfig(
            vocab_size=t.vocab_size, hidden_size=64,
            num_hidden_layers=2, num_attention_heads=4, intermediate_size=128,
        )
        model = DomainTransformerForCausalLM(config)
        out = model(**batch)
        assert out.loss.item() > 0
        out.loss.backward()
        assert sum(p.grad.norm().item() for p in model.parameters() if p.grad is not None) > 0


class TestPretrainDomainModel:
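    """pretrain_domain_model should complete a short Trainer run and validate the tokenizer's pad token."""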
    def test_smoke(self, tmp_path):
        seqs = make_user_sequences(20, 5, 15)
        flat = [e for s in seqs for e in s]
        b, t = build_finance_tokenizer(flat)
        ds = prepare_clm_dataset(seqs, b, t, block_size=32)
        config = DomainTransformerConfig(
            vocab_size=t.vocab_size, hidden_size=64,
            num_hidden_layers=2, num_attention_heads=4, intermediate_size=128,
        )
        model = DomainTransformerForCausalLM(config)
        trainer = pretrain_domain_model(
            model=model, tokenizer=t, train_dataset=ds,
            output_dir=str(tmp_path / "ck"), hub_model_id=None,
            num_epochs=1, per_device_batch_size=4, gradient_accumulation_steps=1,
            learning_rate=1e-3, warmup_steps=0, logging_steps=1,
            save_steps=999999, report_to="none", seed=42,
        )
        assert trainer.state.global_step > 0

    def test_no_pad_raises(self, tmp_path):
        # Imported locally: only needed to build a bare tokenizer that has no pad token.
        from tokenizers import Tokenizer
        from tokenizers.models import BPE
        from transformers import PreTrainedTokenizerFast

        hf = PreTrainedTokenizerFast(tokenizer_object=Tokenizer(BPE(unk_token="[UNK]")), unk_token="[UNK]")
        config = DomainTransformerConfig(vocab_size=100, hidden_size=32, num_hidden_layers=1, num_attention_heads=2)
        with pytest.raises(ValueError, match="pad_token"):
            pretrain_domain_model(
                model=DomainTransformerForCausalLM(config), tokenizer=hf,
                train_dataset=HFDataset.from_dict({"input_ids": [[1, 2, 3]]}),
                output_dir=str(tmp_path),
            )