| """ |
| Data pipeline for domain sequence CLM pre-training. |
| |
| Pipeline: |
| 1. Raw events -> DomainTokenizerBuilder.tokenize_sequence() -> token strings |
| 2. Token strings -> HF tokenizer -> token IDs (variable length) |
| 3. Token ID sequences -> pack into fixed-length blocks (group_texts pattern) |
| 4. Packed blocks -> DataCollatorForLanguageModeling -> {input_ids, labels, attention_mask} |
| |
Packing follows the official HF run_clm.py pattern: concatenate all tokenized
sequences into one stream, then split it into fixed-length blocks. No padding
is needed, so every token in a block is a real training token; only the trailing
remainder (at most block_size - 1 tokens) is dropped.
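
Shape sketch (toy values; the token strings and IDs are illustrative, the
real mapping comes from DomainTokenizerBuilder and the trained HF tokenizer):

    step 1: [{"action": "view"}, ...]     -> ["<bos>", "action_view", ..., "<eos>"]
    step 2: ["<bos>", "action_view", ...] -> [0, 17, ..., 1]      (variable length)
    step 3: N ragged ID lists             -> M lists of exactly block_size IDs
    step 4: batches of {input_ids, labels, attention_mask}, each (batch, block_size);
            with mlm=False the collator copies input_ids into labels, and the
            model applies the causal shift internally when computing the loss.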
| """ |
|
|
import logging
from typing import Any, Dict, List, Sequence

from datasets import Dataset as HFDataset

from ..tokenizers.domain_tokenizer import DomainTokenizerBuilder

logger = logging.getLogger(__name__)
|
|
|
|
def tokenize_user_sequences(
    user_sequences: Sequence[Sequence[Dict[str, Any]]],
    builder: DomainTokenizerBuilder,
    hf_tokenizer,
    add_bos: bool = True,
    add_eos: bool = True,
) -> List[List[int]]:
    """Tokenize user event sequences into token ID lists (serially).

    BOS/EOS tokens are emitted by the builder, so the HF tokenizer is called
    with add_special_tokens=False to avoid adding them a second time.
    """
    all_token_ids: List[List[int]] = []
    for events in user_sequences:
        token_strings = builder.tokenize_sequence(events, add_bos=add_bos, add_eos=add_eos)
        # Join with single spaces; this assumes a whitespace-based (e.g.
        # word-level) HF tokenizer, so each domain token string round-trips
        # to its own vocabulary ID.
        token_text = " ".join(token_strings)
        encoding = hf_tokenizer(token_text, add_special_tokens=False)
        all_token_ids.append(encoding["input_ids"])
    return all_token_ids
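
# Illustrative only: the event schema and token strings below are hypothetical;
# the actual mapping lives in DomainTokenizerBuilder.tokenize_sequence().
#
#   events        = [{"action": "view", "item": "A12"}, {"action": "buy", "item": "A12"}]
#   token_strings = ["<bos>", "action_view", "item_A12", "action_buy", "item_A12", "<eos>"]
#   token_text    = "<bos> action_view item_A12 action_buy item_A12 <eos>"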
|
|
|
|
def pack_sequences(token_id_sequences: List[List[int]], block_size: int = 512) -> HFDataset:
    """Pack variable-length token sequences into fixed-length blocks.

    Follows the official HF run_clm.py pattern: concatenate all sequences
    into one long stream and split it into fixed-length blocks. No padding
    is used, so every token in a block is a real training token; only the
    trailing remainder (at most block_size - 1 tokens) is dropped.

    Returns a dataset with a single "input_ids" column whose rows all have
    length block_size.
    """
    # Concatenate all sequences into one flat token stream.
    concatenated: List[int] = []
    for seq in token_id_sequences:
        concatenated.extend(seq)

    total_tokens = len(concatenated)
    n_blocks = total_tokens // block_size
    dropped = total_tokens - n_blocks * block_size

    if n_blocks == 0:
        raise ValueError(
            f"Not enough tokens ({total_tokens}) to form even one block of size {block_size}. "
            f"Reduce block_size or add more data."
        )

    logger.info(
        f"Packing: {total_tokens:,} tokens -> {n_blocks:,} blocks of {block_size} "
        f"({dropped} tokens dropped, {dropped / total_tokens * 100:.1f}% waste)"
    )

    # Slice the stream into contiguous, non-overlapping blocks.
    packed = [concatenated[i * block_size : (i + 1) * block_size] for i in range(n_blocks)]
    return HFDataset.from_dict({"input_ids": packed})
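
# Worked example of the packing arithmetic (illustrative numbers): sequences of
# lengths 300, 400, and 500 concatenate to 1,200 tokens; with block_size=512
# that gives 1200 // 512 = 2 blocks and 1200 - 2 * 512 = 176 dropped tokens
# (~14.7% waste). On a large corpus at most block_size - 1 tokens are ever
# dropped, so the waste fraction shrinks toward zero.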
|
|
|
|
def prepare_clm_dataset(
    user_sequences: Sequence[Sequence[Dict[str, Any]]],
    builder: DomainTokenizerBuilder,
    hf_tokenizer,
    block_size: int = 512,
    add_bos: bool = True,
    add_eos: bool = True,
) -> HFDataset:
    """Full pipeline: user event sequences -> packed CLM training dataset.

    Note that packed blocks may span user boundaries: sequences are
    concatenated before splitting, and with add_bos/add_eos enabled the
    BOS/EOS tokens mark where one user's sequence ends and the next begins.

    Example:
        >>> dataset = prepare_clm_dataset(user_sequences, builder, hf_tokenizer, block_size=512)
        >>> collator = DataCollatorForLanguageModeling(tokenizer=hf_tokenizer, mlm=False)
        >>> trainer = Trainer(model=model, train_dataset=dataset, data_collator=collator, ...)
    """
    token_id_sequences = tokenize_user_sequences(
        user_sequences, builder, hf_tokenizer, add_bos=add_bos, add_eos=add_eos,
    )
    total_tokens = sum(len(seq) for seq in token_id_sequences)
    avg_tokens = total_tokens / max(len(token_id_sequences), 1)
    logger.info(
        f"Tokenized {len(token_id_sequences)} user sequences -> "
        f"{total_tokens:,} tokens (avg {avg_tokens:.1f} tokens/sequence)"
    )
    return pack_sequences(token_id_sequences, block_size=block_size)
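

if __name__ == "__main__":
    # Minimal smoke test for the packing step only (run via `python -m ...` so
    # the relative import at the top of this module resolves). Synthetic token
    # IDs stand in for real tokenizer output; this is a sanity check, not a
    # training entry point.
    import random

    logging.basicConfig(level=logging.INFO)
    random.seed(0)
    fake_sequences = [
        [random.randrange(100) for _ in range(random.randrange(5, 40))]
        for _ in range(200)
    ]
    demo = pack_sequences(fake_sequences, block_size=64)
    assert all(len(block) == 64 for block in demo["input_ids"])
    print(f"OK: {len(demo)} blocks of 64 tokens")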
|
|