Text Generation
Transformers
Safetensors
PyTorch
nemotron_labs_diffusion
feature-extraction
nvidia
conversational
custom_code
Instructions to use nvidia/Nemotron-Labs-Diffusion-14B with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use nvidia/Nemotron-Labs-Diffusion-14B with Transformers:
# Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("text-generation", model="nvidia/Nemotron-Labs-Diffusion-14B", trust_remote_code=True) messages = [ {"role": "user", "content": "Who are you?"}, ] pipe(messages)# Load model directly from transformers import AutoModel model = AutoModel.from_pretrained("nvidia/Nemotron-Labs-Diffusion-14B", trust_remote_code=True, dtype="auto") - Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use nvidia/Nemotron-Labs-Diffusion-14B with vLLM:
Install from pip and serve model
# Install vLLM from pip: pip install vllm # Start the vLLM server: vllm serve "nvidia/Nemotron-Labs-Diffusion-14B" # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:8000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "nvidia/Nemotron-Labs-Diffusion-14B", "messages": [ { "role": "user", "content": "What is the capital of France?" } ] }'Use Docker
docker model run hf.co/nvidia/Nemotron-Labs-Diffusion-14B
- SGLang
How to use nvidia/Nemotron-Labs-Diffusion-14B with SGLang:
Install from pip and serve model
# Install SGLang from pip: pip install sglang # Start the SGLang server: python3 -m sglang.launch_server \ --model-path "nvidia/Nemotron-Labs-Diffusion-14B" \ --host 0.0.0.0 \ --port 30000 # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:30000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "nvidia/Nemotron-Labs-Diffusion-14B", "messages": [ { "role": "user", "content": "What is the capital of France?" } ] }'Use Docker images
docker run --gpus all \ --shm-size 32g \ -p 30000:30000 \ -v ~/.cache/huggingface:/root/.cache/huggingface \ --env "HF_TOKEN=<secret>" \ --ipc=host \ lmsysorg/sglang:latest \ python3 -m sglang.launch_server \ --model-path "nvidia/Nemotron-Labs-Diffusion-14B" \ --host 0.0.0.0 \ --port 30000 # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:30000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "nvidia/Nemotron-Labs-Diffusion-14B", "messages": [ { "role": "user", "content": "What is the capital of France?" } ] }' - Docker Model Runner
How to use nvidia/Nemotron-Labs-Diffusion-14B with Docker Model Runner:
docker model run hf.co/nvidia/Nemotron-Labs-Diffusion-14B
| import copy | |
| from dataclasses import dataclass | |
| from typing import Optional, Tuple | |
| import numpy as np | |
| import torch | |
| import torch.nn.functional as F | |
| from torch import nn | |
| from transformers.modeling_outputs import CausalLMOutputWithPast, BaseModelOutput | |
| from transformers.utils import ModelOutput | |
| from torch.nn.attention.flex_attention import flex_attention, create_block_mask | |
| from transformers.modeling_flash_attention_utils import FlashAttentionKwargs | |
| from transformers.processing_utils import Unpack | |
| from transformers.cache_utils import Cache, DynamicCache | |
| from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss | |
| from transformers.generation import GenerationMixin | |
| import math | |
| from .modeling_ministral import Ministral3Model, Ministral3PreTrainedModel, Ministral3Attention, apply_rotary_pos_emb, repeat_kv, _get_llama_4_attn_scale | |
| from .configuration_nemotron_labs_diffusion import NemotronLabsDiffusionConfig | |
| __all__ = ["NemotronLabsDiffusionModel", "NemotronLabsDiffusionFlexAttention"] | |
| class NemotronLabsDiffusionOutputWithPast(ModelOutput): | |
| loss: torch.FloatTensor | None = None | |
| logits: torch.FloatTensor | None = None | |
| causal_logits: torch.FloatTensor | None = None | |
| past_key_values: Cache | None = None | |
| hidden_states: tuple[torch.FloatTensor, ...] | None = None | |
| attentions: tuple[torch.FloatTensor, ...] | None = None | |
| def fused_flex_attention(q, k, v, block_mask=None): | |
| return flex_attention(q, k, v, block_mask=block_mask) | |
| class NemotronLabsDiffusionFlexAttention(Ministral3Attention): | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self.block_size = self.config.block_size | |
| self.block_diff_mask = None | |
| import torch._dynamo.config as dcfg | |
| dcfg.cache_size_limit = 512 | |
| def compute_block_mask(self, mode, q_len, block_size=None): | |
| def block_diff_mask(block_size, b, h, q_idx, kv_idx, n): | |
| x0_flag_q = (q_idx >= n) | |
| x0_flag_kv = (kv_idx >= n) | |
| # Compute block indices | |
| block_q = torch.where(x0_flag_q == 1, | |
| (q_idx - n) // block_size, | |
| q_idx // block_size) | |
| block_kv = torch.where(x0_flag_kv == 1, | |
| (kv_idx - n) // block_size, | |
| kv_idx // block_size) | |
| # **1. Block Diagonal Mask (M_BD) ** | |
| block_diagonal = (block_q == block_kv) & (x0_flag_kv == 0) & (x0_flag_q == 0) | |
| # **2. Offset Block-Causal Mask (M_OBC) ** | |
| offset_block_causal = ( | |
| (block_q > block_kv) | |
| & (x0_flag_kv == 1) | |
| & (x0_flag_q == 0) | |
| ) | |
| # **3. Fully Causal Mask (M_BC) ** | |
| fully_causal = (q_idx >= kv_idx) & (x0_flag_kv == 1) & (x0_flag_q == 1) | |
| # **4. Combine Masks ** | |
| return block_diagonal | offset_block_causal | fully_causal | |
| attn_mask = lambda b, h, q, kv: block_diff_mask(block_size, b, h, q, kv, q_len//2) | |
| block_mask = create_block_mask( | |
| attn_mask, B=None, H=None, Q_LEN=q_len, KV_LEN=q_len | |
| ) | |
| return block_mask | |
| def forward( | |
| self, | |
| hidden_states: torch.Tensor, | |
| position_embeddings: Tuple[torch.Tensor, torch.Tensor], | |
| attention_mask: Optional[torch.Tensor], | |
| past_key_values: Optional[Cache] = None, | |
| cache_position: Optional[torch.LongTensor] = None, | |
| is_training: bool = True, | |
| **kwargs: Unpack[FlashAttentionKwargs], | |
| ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: | |
| bsz, q_len, _ = hidden_states.size() | |
| input_shape = hidden_states.shape[:-1] | |
| hidden_shape = (*input_shape, -1, self.head_dim) | |
| query_states = self.q_proj(hidden_states).view(hidden_shape).transpose(1, 2) | |
| key_states = self.k_proj(hidden_states).view(hidden_shape).transpose(1, 2) | |
| value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2) | |
| cos, sin = position_embeddings | |
| if is_training: | |
| # Split query and key states in half along sequence length dimension | |
| q1, q2 = query_states.chunk(2, dim=2) | |
| k1, k2 = key_states.chunk(2, dim=2) | |
| # Apply RoPE independently to each half | |
| q1, k1 = apply_rotary_pos_emb(q1, k1, cos, sin) | |
| q2, k2 = apply_rotary_pos_emb(q2, k2, cos, sin) | |
| # Recombine the halves | |
| query_states = torch.cat([q1, q2], dim=2) | |
| key_states = torch.cat([k1, k2], dim=2) | |
| else: | |
| query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) | |
| query_states = query_states * _get_llama_4_attn_scale( | |
| cache_position, | |
| self.config.rope_parameters.get("llama_4_scaling_beta"), | |
| self.config.rope_parameters.get("original_max_position_embeddings"), | |
| ).to(query_states.dtype) | |
| if past_key_values is not None: | |
| # sin and cos are specific to RoPE models; cache_position needed for the static cache | |
| cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} | |
| key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs) | |
| key_states = repeat_kv(key_states, self.num_key_value_groups) | |
| value_states = repeat_kv(value_states, self.num_key_value_groups) | |
| if self.block_diff_mask is None or q_len != self.block_diff_mask.shape[-2]: | |
| block_mask = self.compute_block_mask(mode='block_diff', block_size=self.block_size, q_len=q_len) | |
| else: | |
| block_mask = self.block_diff_mask | |
| attn_output = fused_flex_attention(query_states, key_states, value_states, block_mask=block_mask) | |
| attn_output = attn_output.transpose(1, 2).reshape(*input_shape, -1).contiguous() | |
| attn_output = self.o_proj(attn_output) | |
| return attn_output, None | |
| class NemotronLabsDiffusionModel(Ministral3PreTrainedModel, GenerationMixin): | |
| """ | |
| A single model with: | |
| - a bidirectional encoder + diffusion‐LM head over A | |
| - a causal decoder + LM head over B, conditioned on F_A | |
| """ | |
| def __init__(self, config: NemotronLabsDiffusionConfig): | |
| super().__init__(config) | |
| self.mask_token_id = config.mask_token_id | |
| diffusion_config = copy.deepcopy(config) | |
| diffusion_config.diffusion_lm = True | |
| if config.dlm_paradigm == 'block_diff': | |
| diffusion_config.attn_class = NemotronLabsDiffusionFlexAttention | |
| elif config.dlm_paradigm in ['bidirectional', 'autoregressive']: | |
| diffusion_config.attn_class = Ministral3Attention | |
| if config.dlm_paradigm == 'autoregressive': | |
| diffusion_config.diffusion_lm = False | |
| else: | |
| raise ValueError(f"Unsupported DLM paradigm: {config.dlm_paradigm}") | |
| self.encoder = Ministral3Model(diffusion_config) | |
| self.diffusion_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) | |
| self.vocab_size = config.vocab_size | |
| self.post_init() | |
| def get_input_embeddings(self): | |
| return self.encoder.embed_tokens | |
| def set_input_embeddings(self, value): | |
| self.encoder.embed_tokens = value | |
| def get_output_embeddings(self): | |
| return self.diffusion_head | |
| def set_output_embeddings(self, new_embeddings): | |
| self.diffusion_head = new_embeddings | |
| def forward_process(self, input_ids, eps=1e-3, block_size=None, loss_mask=None): | |
| b, l = input_ids.shape | |
| device = input_ids.device | |
| if self.config.dp_varying_mask_ratio: | |
| # Enable different random seeds for each DP rank during sampling | |
| import torch.distributed as dist | |
| dp_rank = 0 | |
| if dist.is_initialized(): | |
| try: | |
| dp_rank = dist.get_rank() | |
| except Exception: | |
| dp_rank = 0 | |
| # Use a local generator to avoid affecting global RNG state | |
| generator = torch.Generator(device=device) | |
| generator.manual_seed(torch.seed() + dp_rank) | |
| else: | |
| generator = None | |
| t = torch.rand(b, device=device, generator=generator) | |
| p_mask = (1 - eps) * t + eps # shape: (b,) | |
| p_mask = p_mask[:, None].expand(-1, l) # shape: (b, l) | |
| masked_indices = torch.rand((b, l), device=device) < p_mask | |
| if loss_mask is not None: | |
| masked_indices[loss_mask == 0] = 0 | |
| noisy_batch = torch.where(masked_indices, self.mask_token_id, input_ids) | |
| return noisy_batch, masked_indices, p_mask | |
| def forward( | |
| self, | |
| input_ids: torch.LongTensor, | |
| attention_mask: Optional[torch.Tensor] = None, | |
| position_ids: Optional[torch.LongTensor] = None, | |
| labels: Optional[torch.LongTensor] = None, | |
| split_len: Optional[int] = None, | |
| past_key_values: Optional[Cache] = None, | |
| block_size: Optional[int] = None, | |
| eps: float = 1e-3, | |
| is_teacher: bool = False, | |
| masked_indices: Optional[torch.Tensor] = None, | |
| p_mask: Optional[torch.Tensor] = None, | |
| teacher_logits: Optional[torch.Tensor] = None, | |
| masked_indices_teacher: Optional[torch.Tensor] = None, | |
| loss_mask: Optional[torch.Tensor] = None, | |
| ce_loss_weight: float = 1.0, | |
| output_last_hidden_states_only: bool = False, | |
| skip_loss: bool = False, | |
| **kwargs, | |
| ) -> CausalLMOutputWithPast: | |
| batch_size, seq_len = input_ids.shape | |
| if self.config.dlm_paradigm == 'block_diff': | |
| if labels is not None and block_size is None: | |
| block_size = self.config.block_size | |
| elif self.config.dlm_paradigm not in ('bidirectional', 'autoregressive'): | |
| raise ValueError(f"Unknown dLM paradigm: {self.config.dlm_paradigm}") | |
| if labels is not None and self.config.dlm_paradigm != 'autoregressive': | |
| if masked_indices is not None: | |
| # assert p_mask is not None | |
| if loss_mask is not None: | |
| masked_indices[loss_mask == 0] = 0 | |
| noisy_inputs = torch.where(masked_indices, self.mask_token_id, input_ids) | |
| else: | |
| noisy_inputs, masked_indices, p_mask = self.forward_process(input_ids, eps=eps, block_size=block_size, loss_mask=loss_mask) | |
| else: | |
| noisy_inputs = input_ids | |
| masked_indices = None | |
| p_mask = None | |
| input_ids_len = noisy_inputs.shape[1] | |
| if labels is not None and self.config.dlm_paradigm == 'block_diff': | |
| if position_ids is None: | |
| position_ids = torch.arange(input_ids_len, device=noisy_inputs.device).unsqueeze(0) | |
| noisy_inputs = torch.cat([noisy_inputs, input_ids], dim=1) | |
| enc_out = self.encoder( | |
| past_key_values=past_key_values, | |
| input_ids=noisy_inputs, | |
| attention_mask=attention_mask, | |
| position_ids=position_ids, | |
| is_training=(labels is not None), | |
| **kwargs, | |
| ) | |
| if output_last_hidden_states_only: | |
| return BaseModelOutput(last_hidden_state=enc_out.last_hidden_state) | |
| logits = self.diffusion_head(enc_out.last_hidden_state) # (batch, len_B, vocab) | |
| causal_logits = None | |
| if labels is not None and self.config.dlm_paradigm == 'block_diff': | |
| causal_logits = logits[:, input_ids_len:] | |
| logits = logits[:, :input_ids_len] | |
| loss = None | |
| if labels is not None and not skip_loss: | |
| if self.config.dlm_paradigm == 'autoregressive': | |
| shift_logits = logits[..., :-1, :].contiguous() | |
| shift_labels = labels[..., 1:].contiguous() | |
| if loss_mask is None: | |
| loss_fct = CrossEntropyLoss() | |
| shift_logits = shift_logits.view(-1, shift_logits.size(-1)) | |
| shift_labels = shift_labels.view(-1) | |
| loss = loss_fct(shift_logits, shift_labels) | |
| else: | |
| loss_mask = loss_mask[..., 1:].contiguous() | |
| loss_fct = CrossEntropyLoss(reduction='none') | |
| shift_logits = shift_logits.view(-1, shift_logits.size(-1)) | |
| shift_labels = shift_labels.view(-1) | |
| shift_labels = shift_labels.to(shift_logits.device) | |
| token_losses = loss_fct(shift_logits, shift_labels) | |
| flat_loss_mask = loss_mask.reshape(-1) | |
| loss = token_losses[flat_loss_mask == 1].sum() / flat_loss_mask.sum() | |
| else: | |
| # LLaDA-style diffusion loss on masked positions. | |
| # Token-wise cross entropy loss on masked positions. | |
| token_loss = torch.nn.functional.cross_entropy( | |
| logits[masked_indices], | |
| labels[masked_indices], | |
| reduction='none' | |
| ) / p_mask[masked_indices] | |
| num_mask_tokens = masked_indices.sum() | |
| # global_loss_avg=True: loss is reduced externally by global token count. | |
| loss = token_loss.sum() | |
| if self.config.dlm_loss_weight is not None: | |
| loss = self.config.dlm_loss_weight * loss | |
| if self.config.dlm_paradigm == 'block_diff': | |
| # AR-side loss for block-diffusion paradigm. | |
| causal_logits = causal_logits[..., :-1, :].contiguous() | |
| causal_logits = causal_logits.view(-1, causal_logits.size(-1)) | |
| causal_labels = labels[..., 1:].contiguous().view(-1) | |
| loss_fct = CrossEntropyLoss(reduction='sum') | |
| ar_loss = loss_fct(causal_logits, causal_labels) | |
| self.loss_diffusion = loss.detach().item() / num_mask_tokens | |
| self.loss_ar = ar_loss.detach().item() / seq_len | |
| loss = loss + self.config.ar_loss_weight * ar_loss | |
| # global_loss_avg=True: return (sum_loss, token_count) for external mean. | |
| if self.config.dlm_paradigm == 'block_diff': | |
| loss = (loss, num_mask_tokens + int(self.config.ar_loss_weight * seq_len)) | |
| else: | |
| loss = (loss, num_mask_tokens) | |
| return NemotronLabsDiffusionOutputWithPast( | |
| loss=loss if not is_teacher else logits, | |
| logits=logits, | |
| causal_logits=causal_logits, | |
| past_key_values=enc_out.past_key_values, | |
| hidden_states=None, | |
| attentions=None, | |
| ) | |
| def generate( | |
| self, | |
| prompt_ids: torch.Tensor, | |
| max_new_tokens: int, | |
| block_length: int, | |
| threshold: Optional[float] = None, | |
| causal_context: bool = True, | |
| temperature: float = 0.0, | |
| eos_token_id: Optional[int] = None, | |
| max_thinking_tokens: Optional[int] = None, | |
| end_think_token_id: Optional[int] = None, | |
| ): | |
| """Block-wise diffusion decoding with prefix-cached KV (LLaDA-style). | |
| Each block: append `block_length` mask tokens, then iteratively unmask | |
| by confidence top-k (with optional threshold). When `causal_context`, | |
| the KV cache and the next-block seed are produced via a causal forward | |
| between blocks (flipping `self_attn.diffusion_lm`), matching the AR | |
| objective at block boundaries. | |
| Returns (output_ids, nfe) — output_ids includes the prompt. | |
| """ | |
| if eos_token_id is None: | |
| eos_token_id = getattr(self.config, "eos_token_id", None) | |
| mask_id = self.mask_token_id | |
| x_accum = prompt_ids.clone() | |
| B = prompt_ids.shape[0] | |
| assert max_new_tokens % block_length == 0 | |
| num_blocks = max_new_tokens // block_length | |
| # one denoising step per generated token (matches legacy chat_utils call) | |
| steps_per_block = block_length | |
| nfe = 0 | |
| def _set_diffusion_lm(val: bool): | |
| for layer in self.encoder.layers: | |
| if hasattr(layer.self_attn, "diffusion_lm"): | |
| layer.self_attn.diffusion_lm = val | |
| # Initial causal prefill produces the KV cache and the next-block seed. | |
| if causal_context: | |
| _set_diffusion_lm(False) | |
| output = self(prompt_ids, use_cache=True, use_causal_mask=causal_context) | |
| past_key_values = output.past_key_values | |
| if causal_context: | |
| _set_diffusion_lm(True) | |
| next_token = None | |
| if causal_context: | |
| last_logit = output.logits[:, -1, :] | |
| if temperature > 0: | |
| next_token = torch.multinomial(torch.softmax(last_logit / temperature, dim=-1), num_samples=1) | |
| else: | |
| next_token = torch.argmax(last_logit, dim=-1, keepdim=True) | |
| for num_block in range(num_blocks): | |
| mask_block = torch.full( | |
| (B, block_length), mask_id, dtype=prompt_ids.dtype, device=prompt_ids.device, | |
| ) | |
| if causal_context: | |
| mask_block[:, 0] = next_token[:, 0] | |
| x_accum = torch.cat([x_accum, mask_block], dim=1) | |
| block_start = prompt_ids.size(1) + num_block * block_length | |
| block_slice = slice(block_start, block_start + block_length) | |
| # Thinking-budget enforcement: if we've passed max_thinking_tokens | |
| # without an end-think marker, inject one into this block. | |
| if end_think_token_id is not None and max_thinking_tokens is not None: | |
| tokens_before = num_block * block_length | |
| tokens_after = tokens_before + block_length | |
| if tokens_after > max_thinking_tokens: | |
| gen_so_far = x_accum[:, prompt_ids.size(1):block_start] | |
| has_end_think = ( | |
| (gen_so_far == end_think_token_id).any(dim=1) | |
| if gen_so_far.size(1) > 0 | |
| else torch.zeros(B, dtype=torch.bool, device=prompt_ids.device) | |
| ) | |
| if not has_end_think.all(): | |
| offset = max(0, max_thinking_tokens - tokens_before) | |
| inject_pos = block_start + offset | |
| for b in range(B): | |
| if not has_end_think[b]: | |
| x_accum[b, inject_pos] = end_think_token_id | |
| mask_block_idx0 = x_accum[:, block_slice] == mask_id | |
| num_transfer_tokens = _get_num_transfer_tokens(mask_block_idx0, steps_per_block) | |
| # Denoise the current block by repeated confidence-based unmasking. | |
| for i in range(steps_per_block): | |
| mask_block_idx = x_accum[:, block_slice] == mask_id | |
| if mask_block_idx.sum() == 0: | |
| break | |
| nfe += 1 | |
| logits_block = self( | |
| x_accum[:, block_slice], | |
| past_key_values=past_key_values, | |
| use_cache=False, | |
| ).logits | |
| x0, transfer_idx = _get_transfer_index( | |
| logits_block, temperature, mask_block_idx, x_accum[:, block_slice], | |
| num_transfer_tokens=num_transfer_tokens[:, i], threshold=threshold, | |
| ) | |
| cur = x_accum[:, block_slice].clone() | |
| cur[transfer_idx] = x0[transfer_idx] | |
| x_accum[:, block_slice] = cur | |
| if eos_token_id is not None: | |
| block_tokens = x_accum[:, block_slice] | |
| eos_mask = block_tokens == eos_token_id | |
| if eos_mask.any(dim=1).any(): | |
| after_eos = eos_mask.cumsum(dim=1).bool() | |
| mask_before = (block_tokens == mask_id) & ~after_eos | |
| if (eos_mask.any(dim=1) & ~mask_before.any(dim=1)).any(): | |
| break | |
| # Post-block: causal forward over the block to update the KV cache | |
| # and (when causal_context) sample the seed for the next block. | |
| if causal_context: | |
| _set_diffusion_lm(False) | |
| output = self( | |
| x_accum[:, block_slice], | |
| past_key_values=past_key_values, | |
| use_cache=True, | |
| use_causal_mask=causal_context, | |
| ) | |
| past_key_values = output.past_key_values | |
| nfe += 1 | |
| if causal_context: | |
| _set_diffusion_lm(True) | |
| last_logit = output.logits[:, -1, :] | |
| if temperature > 0: | |
| next_token = torch.multinomial(torch.softmax(last_logit / temperature, dim=-1), num_samples=1) | |
| else: | |
| next_token = torch.argmax(last_logit, dim=-1, keepdim=True) | |
| if eos_token_id is not None: | |
| gen_so_far = x_accum[:, prompt_ids.size(1):] | |
| is_eos = gen_so_far == eos_token_id | |
| if is_eos.any(dim=1).all(): | |
| first_eos = is_eos.to(torch.int64).argmax(dim=1) | |
| max_eos = first_eos.max().item() | |
| return x_accum[:, : prompt_ids.size(1) + max_eos + 1], nfe | |
| return x_accum, nfe | |
| def ar_generate( | |
| self, | |
| prompt_ids: torch.Tensor, | |
| max_new_tokens: int = 128, | |
| temperature: float = 0.0, | |
| eos_token_id: Optional[int] = None, | |
| max_thinking_tokens: Optional[int] = None, | |
| end_think_token_id: Optional[int] = None, | |
| ) -> tuple: | |
| """Autoregressive generation calling the encoder directly (injected by build_hf_tidar_repo). | |
| Bypasses NemotronLabsDiffusionModel.forward() to avoid diffusion-specific | |
| code paths. Calls self.encoder (Ministral3Model) with explicit cache_position, | |
| position_ids, and use_cache so the KV cache and causal masking behave | |
| identically to MistralForCausalLM / vLLM. | |
| Returns: | |
| (output_ids, nfe) where output_ids includes the prompt. | |
| """ | |
| for layer in self.encoder.layers: | |
| if hasattr(layer.self_attn, 'diffusion_lm'): | |
| layer.self_attn.diffusion_lm = False | |
| if eos_token_id is None: | |
| eos_token_id = getattr(self.config, 'eos_token_id', None) | |
| device = prompt_ids.device | |
| batch_size, prompt_len = prompt_ids.shape | |
| past_key_values = DynamicCache() | |
| cache_position = torch.arange(prompt_len, device=device) | |
| position_ids = cache_position.unsqueeze(0).expand(batch_size, -1) | |
| enc_out = self.encoder( | |
| input_ids=prompt_ids, | |
| position_ids=position_ids, | |
| past_key_values=past_key_values, | |
| use_cache=True, | |
| cache_position=cache_position, | |
| ) | |
| past_key_values = enc_out.past_key_values | |
| next_logit = self.diffusion_head(enc_out.last_hidden_state[:, -1:, :]).squeeze(1) | |
| generated_tokens = [] | |
| nfe = 0 | |
| for step in range(max_new_tokens): | |
| nfe += 1 | |
| if temperature > 0: | |
| probs = torch.softmax(next_logit / temperature, dim=-1) | |
| next_token = torch.multinomial(probs, num_samples=1) | |
| else: | |
| next_token = torch.argmax(next_logit, dim=-1, keepdim=True) | |
| # ---- thinking budget enforcement ---- | |
| if end_think_token_id is not None and max_thinking_tokens is not None: | |
| if step >= max_thinking_tokens: | |
| if generated_tokens: | |
| gen_tensor = torch.cat(generated_tokens, dim=1) | |
| has_end_think = (gen_tensor == end_think_token_id).any(dim=1) | |
| else: | |
| has_end_think = torch.zeros(batch_size, dtype=torch.bool, device=device) | |
| for b in range(batch_size): | |
| if not has_end_think[b]: | |
| next_token[b] = end_think_token_id | |
| generated_tokens.append(next_token) | |
| if eos_token_id is not None and (next_token == eos_token_id).all(): | |
| break | |
| if step < max_new_tokens - 1: | |
| cur_pos = prompt_len + step | |
| step_cache_pos = torch.tensor([cur_pos], device=device) | |
| step_pos_ids = step_cache_pos.unsqueeze(0).expand(batch_size, -1) | |
| enc_out = self.encoder( | |
| input_ids=next_token, | |
| position_ids=step_pos_ids, | |
| past_key_values=past_key_values, | |
| use_cache=True, | |
| cache_position=step_cache_pos, | |
| ) | |
| past_key_values = enc_out.past_key_values | |
| next_logit = self.diffusion_head(enc_out.last_hidden_state[:, -1:, :]).squeeze(1) | |
| all_generated = torch.cat(generated_tokens, dim=1) | |
| output_ids = torch.cat([prompt_ids, all_generated], dim=1) | |
| return output_ids, nfe | |
| def linear_spec_generate( | |
| self, | |
| prompt_ids: torch.Tensor, | |
| max_new_tokens: int = 128, | |
| block_length: int = 32, | |
| temperature: float = 0.0, | |
| mask_token_id: Optional[int] = None, | |
| eos_token_id: Optional[int] = None, | |
| max_thinking_tokens: Optional[int] = None, | |
| end_think_token_id: Optional[int] = None, | |
| threshold: float = 0.0, | |
| ): | |
| """Linear speculative decoding: diffusion draft + AR verify. | |
| Each iteration: (1) draft the next block under bidirectional attention, | |
| (2) verify the drafted block under causal attention, accept the longest | |
| prefix where draft matches AR + one bonus token, advance the KV cache. | |
| LoRA-aware: when a PEFT adapter is attached to the model (e.g. | |
| ``linear_spec_lora``), it is toggled ON for the bidirectional draft | |
| phase and OFF for the causal prefill / verify phases — so the adapter | |
| only specializes the diffusion-mode forward and AR semantics are | |
| preserved. With no adapter loaded the calls are no-ops. | |
| Returns ``(output_ids, nfe)`` — ``output_ids`` includes the prompt. | |
| """ | |
| if prompt_ids.shape[0] != 1: | |
| raise ValueError("Linear speculative decoding requires batch_size == 1") | |
| token_mask_id = mask_token_id if mask_token_id is not None else self.config.mask_token_id | |
| if eos_token_id is None: | |
| eos_token_id = getattr(self.config, "eos_token_id", None) | |
| device = prompt_ids.device | |
| def _set_diffusion_lm(val: bool): | |
| for layer in self.encoder.layers: | |
| if hasattr(layer.self_attn, "diffusion_lm"): | |
| layer.self_attn.diffusion_lm = val | |
| def _toggle_adapters(enable: bool): | |
| # No-op when no PEFT/LoRA modules are attached. | |
| for module in self.modules(): | |
| if hasattr(module, "_disable_adapters"): | |
| module._disable_adapters = not enable | |
| # Prefill (causal, LoRA OFF). | |
| _set_diffusion_lm(False) | |
| _toggle_adapters(False) | |
| enc_out = self.encoder( | |
| input_ids=prompt_ids, | |
| past_key_values=DynamicCache(), | |
| use_cache=True, | |
| use_causal_mask=True, | |
| ) | |
| past_key_values = enc_out.past_key_values | |
| last_logit = self.diffusion_head(enc_out.last_hidden_state[:, -1:, :]).squeeze(1) | |
| nfe = 1 | |
| if temperature > 0: | |
| next_token = torch.multinomial(torch.softmax(last_logit / temperature, dim=-1), num_samples=1) | |
| else: | |
| next_token = torch.argmax(last_logit, dim=-1, keepdim=True) | |
| if eos_token_id is not None and next_token.item() == eos_token_id: | |
| return torch.cat([prompt_ids, next_token], dim=1), nfe | |
| generated = [next_token] | |
| total_gen = 1 | |
| while total_gen < max_new_tokens: | |
| cache_len = past_key_values.get_seq_length() | |
| block = torch.full((1, block_length), token_mask_id, dtype=torch.long, device=device) | |
| block[0, 0] = next_token.item() | |
| # Draft phase (bidirectional, LoRA ON) — iterate at threshold>0 so | |
| # that even low-confidence blocks make progress. | |
| _set_diffusion_lm(True) | |
| _toggle_adapters(True) | |
| while True: | |
| is_mask = block == token_mask_id | |
| if not is_mask.any(): | |
| break | |
| enc_out = self.encoder(input_ids=block, past_key_values=past_key_values, use_cache=False) | |
| nfe += 1 | |
| draft_logits = self.diffusion_head(enc_out.last_hidden_state) | |
| # LLaDA: logit[i] directly predicts position i — no shift needed. | |
| if temperature > 0: | |
| draft_probs = torch.softmax(draft_logits / temperature, dim=-1) | |
| draft_tokens = torch.multinomial( | |
| draft_probs.view(-1, draft_probs.shape[-1]), num_samples=1 | |
| ).view(1, block_length) | |
| else: | |
| draft_tokens = draft_logits.argmax(dim=-1) | |
| draft_probs = torch.softmax(draft_logits, dim=-1) | |
| if threshold > 0: | |
| draft_conf = torch.gather(draft_probs, -1, draft_tokens.unsqueeze(-1)).squeeze(-1) | |
| draft_conf = torch.where(is_mask, draft_conf, -torch.inf) | |
| unmask = draft_conf >= threshold | |
| # Force progress even when every masked position is below threshold. | |
| if not unmask.any(): | |
| best_idx = draft_conf.view(-1).argmax() | |
| unmask = torch.zeros_like(is_mask, dtype=torch.bool) | |
| unmask.view(-1)[best_idx] = True | |
| block[unmask] = draft_tokens[unmask] | |
| else: | |
| block[is_mask] = draft_tokens[is_mask] | |
| break | |
| # Verify phase (causal, LoRA OFF). | |
| _set_diffusion_lm(False) | |
| _toggle_adapters(False) | |
| enc_out = self.encoder( | |
| input_ids=block, | |
| past_key_values=past_key_values, | |
| use_cache=True, | |
| use_causal_mask=True, | |
| ) | |
| past_key_values = enc_out.past_key_values | |
| nfe += 1 | |
| verify_logits = self.diffusion_head(enc_out.last_hidden_state) | |
| if temperature > 0: | |
| ar_tokens = torch.multinomial( | |
| torch.softmax(verify_logits / temperature, dim=-1).view(-1, verify_logits.shape[-1]), | |
| num_samples=1, | |
| ).view(1, block_length) | |
| else: | |
| ar_tokens = verify_logits.argmax(dim=-1) | |
| # Accept consecutive matches; AR also gives one bonus token at the tail. | |
| accepted = 0 | |
| for i in range(block_length - 1): | |
| if ar_tokens[0, i].item() == block[0, i + 1].item(): | |
| accepted += 1 | |
| else: | |
| break | |
| accepted += 1 | |
| accepted_toks = ar_tokens[:, :accepted] | |
| generated.append(accepted_toks) | |
| total_gen += accepted | |
| _crop_dynamic_cache(past_key_values, cache_len + accepted) | |
| next_token = ar_tokens[:, accepted - 1 : accepted] | |
| if eos_token_id is not None: | |
| eos_pos = (accepted_toks[0] == eos_token_id).nonzero(as_tuple=True)[0] | |
| if len(eos_pos) > 0: | |
| first_eos = eos_pos[0].item() | |
| generated[-1] = accepted_toks[:, : first_eos + 1] | |
| total_gen = total_gen - accepted + first_eos + 1 | |
| break | |
| # Thinking-budget enforcement: force end-think as next seed if budget exhausted. | |
| if end_think_token_id is not None and max_thinking_tokens is not None: | |
| if total_gen > max_thinking_tokens: | |
| all_gen = torch.cat(generated, dim=1) | |
| if not (all_gen == end_think_token_id).any(): | |
| next_token = torch.tensor([[end_think_token_id]], device=device) | |
| if total_gen >= max_new_tokens: | |
| break | |
| all_generated = torch.cat(generated, dim=1) | |
| output_ids = torch.cat([prompt_ids, all_generated], dim=1) | |
| return output_ids, nfe | |
| # ─── Module-level helpers used by `generate` and `linear_spec_generate` ── | |
| def _crop_dynamic_cache(past_key_values: DynamicCache, max_length: int): | |
| """Crop a DynamicCache to max_length, compatible with both old and new transformers.""" | |
| if hasattr(past_key_values, 'crop'): | |
| past_key_values.crop(max_length) | |
| else: | |
| for layer_idx in range(len(past_key_values)): | |
| past_key_values.key_cache[layer_idx] = past_key_values.key_cache[layer_idx][:, :, :max_length] | |
| past_key_values.value_cache[layer_idx] = past_key_values.value_cache[layer_idx][:, :, :max_length] | |
| past_key_values._seen_tokens = max_length | |
| def _add_gumbel_noise(logits, temperature): | |
| """Gumbel-max sampling in float64 (low-precision Gumbel hurts MDM quality).""" | |
| if temperature == 0: | |
| return logits | |
| logits = logits.to(torch.float64) | |
| noise = torch.rand_like(logits, dtype=torch.float64) | |
| gumbel_noise = (- torch.log(noise)) ** temperature | |
| return logits.exp() / gumbel_noise | |
| def _get_num_transfer_tokens(mask_index, steps: int): | |
| """Even split of masked positions across `steps`, with remainder front-loaded.""" | |
| mask_num = mask_index.sum(dim=1, keepdim=True) | |
| base = mask_num // steps | |
| remainder = mask_num % steps | |
| num_transfer_tokens = torch.zeros(mask_num.size(0), steps, device=mask_index.device, dtype=torch.int64) + base | |
| for i in range(mask_num.size(0)): | |
| num_transfer_tokens[i, : int(remainder[i])] += 1 | |
| return num_transfer_tokens | |
| def _get_transfer_index(logits, temperature, mask_index, x, num_transfer_tokens, threshold=None): | |
| """Pick which masked positions to commit this denoising step. | |
| Returns (x0, transfer_index): x0 is argmax tokens (clamped to original x at | |
| non-masked positions); transfer_index is a bool mask over positions to | |
| finalize, chosen by top-k confidence (and filtered by `threshold` if given). | |
| """ | |
| logits_with_noise = _add_gumbel_noise(logits, temperature=temperature) | |
| x0 = torch.argmax(logits_with_noise, dim=-1) | |
| p = F.softmax(logits, dim=-1) | |
| x0_p = torch.squeeze(torch.gather(p, dim=-1, index=torch.unsqueeze(x0, -1)), -1) | |
| x0 = torch.where(mask_index, x0, x) | |
| confidence = torch.where(mask_index, x0_p, -np.inf) | |
| transfer_index = torch.zeros_like(x0, dtype=torch.bool, device=x0.device) | |
| if threshold is not None: | |
| num_transfer_tokens = mask_index.sum(dim=1, keepdim=True) | |
| for j in range(confidence.shape[0]): | |
| _, select_index = torch.topk(confidence[j], k=num_transfer_tokens[j]) | |
| transfer_index[j, select_index] = True | |
| if threshold is not None: | |
| for k in range(1, num_transfer_tokens[j]): | |
| if confidence[j, select_index[k]] < threshold: | |
| transfer_index[j, select_index[k]] = False | |
| return x0, transfer_index | |
| def gumbel_topk(log_w: torch.Tensor, k: int) -> torch.Tensor: | |
| """Return a Bool mask of length len(log_w) with exactly k True.""" | |
| g = -torch.log(-torch.log(torch.rand_like(log_w) + 1e-9) + 1e-9) | |
| topk = torch.topk(log_w + g, k).indices | |
| mask = torch.zeros_like(log_w, dtype=torch.bool) | |
| mask[topk] = True | |
| return mask | |