""" Data pipeline for domain sequence CLM pre-training. Pipeline: 1. Raw events -> DomainTokenizerBuilder.tokenize_sequence() -> token strings 2. Token strings -> HF tokenizer -> token IDs (variable length) 3. Token ID sequences -> pack into fixed-length blocks (group_texts pattern) 4. Packed blocks -> DataCollatorForLanguageModeling -> {input_ids, labels, attention_mask} Packing follows the official HF run_clm.py pattern: concatenate all tokenized sequences, split into fixed-length blocks. Zero padding waste, 100% token utilization. """ import logging from typing import Any, Dict, List, Optional, Sequence from datasets import Dataset as HFDataset from ..tokenizers.domain_tokenizer import DomainTokenizerBuilder logger = logging.getLogger(__name__) def tokenize_user_sequences( user_sequences: Sequence[Sequence[Dict[str, Any]]], builder: DomainTokenizerBuilder, hf_tokenizer, add_bos: bool = True, add_eos: bool = True, num_proc: int = 1, ) -> List[List[int]]: """Tokenize user event sequences into token ID lists.""" all_token_ids = [] for events in user_sequences: token_strings = builder.tokenize_sequence(events, add_bos=add_bos, add_eos=add_eos) token_text = " ".join(token_strings) encoding = hf_tokenizer(token_text, add_special_tokens=False) all_token_ids.append(encoding["input_ids"]) return all_token_ids def pack_sequences(token_id_sequences: List[List[int]], block_size: int = 512) -> HFDataset: """Pack variable-length token sequences into fixed-length blocks. Follows the official HF run_clm.py pattern: concatenate all sequences into one long stream, split into fixed-length blocks, drop remainder. Achieves 100% token utilization with zero padding waste. """ concatenated = [] for seq in token_id_sequences: concatenated.extend(seq) total_tokens = len(concatenated) n_blocks = total_tokens // block_size dropped = total_tokens - n_blocks * block_size if n_blocks == 0: raise ValueError( f"Not enough tokens ({total_tokens}) to form even one block of size {block_size}. " f"Reduce block_size or add more data." ) logger.info(f"Packing: {total_tokens:,} tokens -> {n_blocks:,} blocks of {block_size} " f"({dropped} tokens dropped, {dropped/total_tokens*100:.1f}% waste)") packed = [concatenated[i * block_size : (i + 1) * block_size] for i in range(n_blocks)] return HFDataset.from_dict({"input_ids": packed}) def prepare_clm_dataset( user_sequences: Sequence[Sequence[Dict[str, Any]]], builder: DomainTokenizerBuilder, hf_tokenizer, block_size: int = 512, add_bos: bool = True, add_eos: bool = True, ) -> HFDataset: """Full pipeline: user event sequences -> packed CLM training dataset. Example: >>> dataset = prepare_clm_dataset(user_sequences, builder, hf_tokenizer, block_size=512) >>> collator = DataCollatorForLanguageModeling(tokenizer=hf_tokenizer, mlm=False) >>> trainer = Trainer(model=model, train_dataset=dataset, data_collator=collator, ...) """ token_id_sequences = tokenize_user_sequences( user_sequences, builder, hf_tokenizer, add_bos=add_bos, add_eos=add_eos, ) total_tokens = sum(len(seq) for seq in token_id_sequences) avg_tokens = total_tokens / max(len(token_id_sequences), 1) logger.info(f"Tokenized {len(token_id_sequences)} user sequences -> " f"{total_tokens:,} tokens (avg {avg_tokens:.1f} tokens/sequence)") return pack_sequences(token_id_sequences, block_size=block_size)