| """ |
| Domain Tokenizer Builder — assembles per-field tokenizers into an |
| HF-compatible PreTrainedTokenizerFast. |
| |
| This is the core of domainTokenizer: it takes a DomainSchema, builds |
| per-field tokenizers, fits data-dependent ones, and produces a single |
| HuggingFace tokenizer that can encode domain events as token ID sequences. |
| |
| The output tokenizer is fully compatible with HF Trainer, push_to_hub, |
| from_pretrained, etc. |
| |
| References: |
| - Nubank nuFormer: V = V_special(97) U V_BPE -- ~14 tokens/transaction |
| - ActionPiece: items as unordered feature sets -> tokenized sequences |
| """ |
|
|
| import json |
| import os |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union |
|
|
| import numpy as np |
| from tokenizers import Tokenizer, decoders, models, pre_tokenizers, trainers |
| from transformers import PreTrainedTokenizerFast |
|
|
| from ..schema import DomainSchema, FieldSpec, FieldType |
| from .field_tokenizers import ( |
| BaseFieldTokenizer, |
| CalendarTokenizer, |
| CategoricalTokenizer, |
| DiscreteNumericalTokenizer, |
| MagnitudeBucketTokenizer, |
| SignTokenizer, |
| create_field_tokenizer, |
| ) |
|
|
|
|
| |
| CONTROL_TOKENS = ["[PAD]", "[UNK]", "[BOS]", "[EOS]", "[MASK]", "[CLS]", "[SEP]"] |
|
|
|
|
| class DomainTokenizerBuilder: |
| """Builds an HF-compatible tokenizer from a DomainSchema. |
| |
| Workflow: |
| 1. builder = DomainTokenizerBuilder(schema) |
| 2. builder.fit(events) # fit magnitude bins etc. |
| 3. hf_tok = builder.build(text_corpus) # build HF tokenizer |
| 4. tokens = builder.tokenize_event(event) # tokenize a single event |
| 5. ids = hf_tok(tokens_str) # convert to IDs |
| |
| Or use the convenience method: |
| 6. ids = builder.encode_event(event, hf_tok) # event -> IDs in one call |
| 7. ids = builder.encode_sequence(events, hf_tok) # full sequence -> IDs |
| |
| Example (finance): |
| >>> from domain_tokenizer.schemas.predefined import FINANCE_SCHEMA |
| >>> builder = DomainTokenizerBuilder(FINANCE_SCHEMA) |
| >>> builder.fit(training_events) |
| >>> hf_tokenizer = builder.build(text_corpus=descriptions) |
| >>> token_ids = builder.encode_sequence(user_transactions, hf_tokenizer, max_length=2048) |
| """ |
|
|
| def __init__(self, schema: DomainSchema): |
| self.schema = schema |
| self.field_tokenizers: Dict[str, Optional[BaseFieldTokenizer]] = {} |
| self._is_fitted = False |
| self._build_field_tokenizers() |
|
|
| def _build_field_tokenizers(self): |
| """Instantiate a field tokenizer for each field in the schema.""" |
| for spec in self.schema.fields: |
| self.field_tokenizers[spec.name] = create_field_tokenizer(spec) |
|
|
| def fit(self, events: Sequence[Dict[str, Any]]) -> "DomainTokenizerBuilder": |
| """Fit data-dependent tokenizers on training events. |
| |
| Currently fits: NUMERICAL_CONTINUOUS fields (magnitude bucket bins). |
| |
| Args: |
| events: Iterable of event dicts, e.g. [{"amount": 79.99, ...}, ...] |
| |
| Returns: |
| self (for chaining) |
| """ |
| for spec in self.schema.fields: |
| if spec.field_type == FieldType.NUMERICAL_CONTINUOUS: |
| tok = self.field_tokenizers[spec.name] |
| values = [] |
| for event in events: |
| v = event.get(spec.name) if isinstance(event, dict) else getattr(event, spec.name, None) |
| if v is not None: |
| values.append(float(v)) |
| if values: |
| tok.fit(np.array(values)) |
| else: |
| raise ValueError(f"No values found for field '{spec.name}' during fitting") |
| self._is_fitted = True |
| return self |
|
|
| @property |
| def is_fitted(self) -> bool: |
| """Whether all data-dependent tokenizers have been fitted.""" |
| if not self.schema.fittable_field_names: |
| return True |
| return self._is_fitted |
|
|
| def _collect_special_tokens(self) -> List[str]: |
| """Collect all special tokens: control + event separator + per-field domain tokens.""" |
| tokens = list(CONTROL_TOKENS) |
| tokens.append(self.schema.event_separator) |
| for spec in self.schema.fields: |
| tok = self.field_tokenizers.get(spec.name) |
| if tok is not None and hasattr(tok, "vocab"): |
| tokens.extend(tok.vocab) |
| seen = set() |
| unique = [] |
| for t in tokens: |
| if t not in seen: |
| seen.add(t) |
| unique.append(t) |
| return unique |
|
|
| def build( |
| self, |
| text_corpus: Optional[Iterator[str]] = None, |
| bpe_vocab_size: int = 8000, |
| min_frequency: int = 2, |
| ) -> PreTrainedTokenizerFast: |
| """Build a complete HuggingFace-compatible tokenizer. |
| |
| 1. Collects all domain special tokens from field tokenizers |
| 2. Trains BPE on text corpus (if schema has text fields) |
| 3. Merges into a single PreTrainedTokenizerFast |
| |
| Args: |
| text_corpus: Iterator of text strings for BPE training. |
| bpe_vocab_size: Target BPE vocabulary size (including special tokens). |
| min_frequency: Minimum frequency for BPE merges. |
| |
| Returns: |
| A PreTrainedTokenizerFast ready for use with HF Trainer. |
| """ |
| for name in self.schema.fittable_field_names: |
| tok = self.field_tokenizers[name] |
| if isinstance(tok, MagnitudeBucketTokenizer) and not tok._is_fitted: |
| raise RuntimeError( |
| f"Field '{name}' requires fitting. Call builder.fit(events) first." |
| ) |
| all_special_tokens = self._collect_special_tokens() |
| if self.schema.has_text_fields and text_corpus is not None: |
| base_tokenizer = Tokenizer(models.BPE(unk_token="[UNK]")) |
| base_tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False) |
| base_tokenizer.decoder = decoders.ByteLevel() |
| trainer_obj = trainers.BpeTrainer( |
| vocab_size=bpe_vocab_size, |
| special_tokens=all_special_tokens, |
| min_frequency=min_frequency, |
| show_progress=True, |
| ) |
| if isinstance(text_corpus, (list, tuple)): |
| base_tokenizer.train_from_iterator(iter(text_corpus), trainer=trainer_obj) |
| else: |
| base_tokenizer.train_from_iterator(text_corpus, trainer=trainer_obj) |
| else: |
| vocab = {token: i for i, token in enumerate(all_special_tokens)} |
| merges = [] |
| base_tokenizer = Tokenizer(models.BPE(vocab=vocab, merges=merges, unk_token="[UNK]")) |
| base_tokenizer.pre_tokenizer = pre_tokenizers.Whitespace() |
| base_tokenizer.decoder = decoders.BPEDecoder() |
| hf_tokenizer = PreTrainedTokenizerFast( |
| tokenizer_object=base_tokenizer, |
| bos_token="[BOS]", |
| eos_token="[EOS]", |
| pad_token="[PAD]", |
| unk_token="[UNK]", |
| mask_token="[MASK]", |
| cls_token="[CLS]", |
| sep_token="[SEP]", |
| ) |
| return hf_tokenizer |
|
|
| def tokenize_event(self, event: Union[Dict[str, Any], Any]) -> List[str]: |
| """Convert a single domain event into a list of token strings.""" |
| tokens = [] |
| for spec in self.schema.fields: |
| if isinstance(event, dict): |
| value = event.get(spec.name) |
| else: |
| value = getattr(event, spec.name, None) |
| if spec.field_type == FieldType.TEXT: |
| if value is not None: |
| tokens.append(str(value)) |
| continue |
| tok = self.field_tokenizers.get(spec.name) |
| if tok is None: |
| continue |
| if value is None: |
| tokens.append("[UNK]") |
| continue |
| result = tok(value) |
| if isinstance(result, list): |
| tokens.extend(result) |
| else: |
| tokens.append(result) |
| return tokens |
|
|
| def tokenize_sequence( |
| self, |
| events: Sequence[Union[Dict[str, Any], Any]], |
| add_bos: bool = True, |
| add_eos: bool = True, |
| ) -> List[str]: |
| """Tokenize a full sequence of events into token strings.""" |
| all_tokens = [] |
| if add_bos: |
| all_tokens.append("[BOS]") |
| for i, event in enumerate(events): |
| if i > 0: |
| all_tokens.append(self.schema.event_separator) |
| event_tokens = self.tokenize_event(event) |
| all_tokens.extend(event_tokens) |
| if add_eos: |
| all_tokens.append("[EOS]") |
| return all_tokens |
|
|
| def encode_sequence( |
| self, |
| events: Sequence[Union[Dict[str, Any], Any]], |
| hf_tokenizer: PreTrainedTokenizerFast, |
| max_length: int = 2048, |
| add_bos: bool = True, |
| add_eos: bool = True, |
| return_tensors: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| """Full pipeline: events -> token strings -> token IDs.""" |
| token_strings = self.tokenize_sequence(events, add_bos=add_bos, add_eos=add_eos) |
| token_text = " ".join(token_strings) |
| encoding = hf_tokenizer( |
| token_text, |
| max_length=max_length, |
| truncation=True, |
| padding="max_length", |
| return_tensors=return_tensors, |
| ) |
| return encoding |
|
|
| def save(self, directory: str): |
| """Save the builder state (fitted bins, schema, etc.) to a directory.""" |
| os.makedirs(directory, exist_ok=True) |
| state = { |
| "schema_name": self.schema.name, |
| "is_fitted": self._is_fitted, |
| "field_tokenizers": {}, |
| } |
| for name, tok in self.field_tokenizers.items(): |
| if tok is not None: |
| state["field_tokenizers"][name] = tok.to_dict() |
| with open(os.path.join(directory, "domain_tokenizer_builder.json"), "w") as f: |
| json.dump(state, f, indent=2) |
|
|
| def get_stats(self) -> Dict[str, Any]: |
| """Return statistics about the tokenizer configuration.""" |
| return { |
| "schema_name": self.schema.name, |
| "total_fields": len(self.schema.fields), |
| "special_token_count": self.schema.special_token_count, |
| "fixed_tokens_per_event": self.schema.fixed_tokens_per_event, |
| "has_text_fields": self.schema.has_text_fields, |
| "is_fitted": self.is_fitted, |
| "field_details": { |
| spec.name: { |
| "type": spec.field_type.value, |
| "vocab_tokens": spec.token_count, |
| "tokens_per_event": spec.tokens_per_event, |
| } |
| for spec in self.schema.fields |
| }, |
| } |
|
|