File size: 3,651 Bytes
1dfd4e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Data pipeline for domain sequence CLM pre-training.

Pipeline:
  1. Raw events -> DomainTokenizerBuilder.tokenize_sequence() -> token strings
  2. Token strings -> HF tokenizer -> token IDs (variable length)
  3. Token ID sequences -> pack into fixed-length blocks (group_texts pattern)
  4. Packed blocks -> DataCollatorForLanguageModeling -> {input_ids, labels, attention_mask}

Packing follows the official HF run_clm.py pattern: concatenate all tokenized
sequences, split into fixed-length blocks. Zero padding waste, 100% token utilization.
"""

import logging
from typing import Any, Dict, List, Optional, Sequence

from datasets import Dataset as HFDataset

from ..tokenizers.domain_tokenizer import DomainTokenizerBuilder

logger = logging.getLogger(__name__)


def tokenize_user_sequences(
    user_sequences: Sequence[Sequence[Dict[str, Any]]],
    builder: DomainTokenizerBuilder,
    hf_tokenizer,
    add_bos: bool = True,
    add_eos: bool = True,
    num_proc: int = 1,
) -> List[List[int]]:
    """Tokenize user event sequences into token ID lists."""
    all_token_ids = []
    for events in user_sequences:
        token_strings = builder.tokenize_sequence(events, add_bos=add_bos, add_eos=add_eos)
        token_text = " ".join(token_strings)
        encoding = hf_tokenizer(token_text, add_special_tokens=False)
        all_token_ids.append(encoding["input_ids"])
    return all_token_ids


def pack_sequences(token_id_sequences: List[List[int]], block_size: int = 512) -> HFDataset:
    """Pack variable-length token sequences into fixed-length blocks.

    Follows the official HF run_clm.py pattern: concatenate all sequences
    into one long stream, split into fixed-length blocks, drop remainder.
    Achieves 100% token utilization with zero padding waste.
    """
    concatenated = []
    for seq in token_id_sequences:
        concatenated.extend(seq)

    total_tokens = len(concatenated)
    n_blocks = total_tokens // block_size
    dropped = total_tokens - n_blocks * block_size

    if n_blocks == 0:
        raise ValueError(
            f"Not enough tokens ({total_tokens}) to form even one block of size {block_size}. "
            f"Reduce block_size or add more data."
        )

    logger.info(f"Packing: {total_tokens:,} tokens -> {n_blocks:,} blocks of {block_size} "
                f"({dropped} tokens dropped, {dropped/total_tokens*100:.1f}% waste)")

    packed = [concatenated[i * block_size : (i + 1) * block_size] for i in range(n_blocks)]
    return HFDataset.from_dict({"input_ids": packed})


def prepare_clm_dataset(
    user_sequences: Sequence[Sequence[Dict[str, Any]]],
    builder: DomainTokenizerBuilder,
    hf_tokenizer,
    block_size: int = 512,
    add_bos: bool = True,
    add_eos: bool = True,
) -> HFDataset:
    """Full pipeline: user event sequences -> packed CLM training dataset.

    Example:
        >>> dataset = prepare_clm_dataset(user_sequences, builder, hf_tokenizer, block_size=512)
        >>> collator = DataCollatorForLanguageModeling(tokenizer=hf_tokenizer, mlm=False)
        >>> trainer = Trainer(model=model, train_dataset=dataset, data_collator=collator, ...)
    """
    token_id_sequences = tokenize_user_sequences(
        user_sequences, builder, hf_tokenizer, add_bos=add_bos, add_eos=add_eos,
    )
    total_tokens = sum(len(seq) for seq in token_id_sequences)
    avg_tokens = total_tokens / max(len(token_id_sequences), 1)
    logger.info(f"Tokenized {len(token_id_sequences)} user sequences -> "
                f"{total_tokens:,} tokens (avg {avg_tokens:.1f} tokens/sequence)")
    return pack_sequences(token_id_sequences, block_size=block_size)