| """ |
| HuggingFace-compatible wrappers for SentinelBrain. |
| |
| Provides: |
| - SentinelBrainConfig(PretrainedConfig) — serializes to config.json |
| - SentinelBrainForCausalLM(PreTrainedModel) — from_pretrained / save_pretrained |
| - Auto-registration for AutoConfig / AutoModelForCausalLM |
| |
| Usage: |
| from hf_model import SentinelBrainForCausalLM, SentinelBrainConfig |
| model = SentinelBrainForCausalLM.from_pretrained("qubitpage/sentinel-prime-nano") |
| """ |
|
|
| import math |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from typing import Optional, Tuple, Union |
| from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModelForCausalLM |
| from transformers.generation import GenerationMixin |
| from transformers.modeling_outputs import CausalLMOutputWithPast |
|
|
|
|
| |
| |
| |
|
|
class SentinelBrainConfig(PretrainedConfig):
    """HuggingFace-compatible config for SentinelBrain."""

    model_type = "sentinel_brain"

    def __init__(
        self,
        vocab_size: int = 100277,
        d_model: int = 768,
        n_layers: int = 12,
        n_heads: int = 12,
        n_kv_heads: int = 4,
        d_ff: int = 2048,
        max_seq_len: int = 1024,
        n_experts: int = 4,
        n_active_experts: int = 2,
        expert_capacity_factor: float = 1.25,
        router_aux_loss_coeff: float = 0.01,
        router_z_loss_coeff: float = 0.001,
        rope_theta: float = 500000.0,
        norm_eps: float = 1e-5,
        dropout: float = 0.0,
        expert_dropout: float = 0.0,
        tie_embeddings: bool = True,
        routing_mode: str = "token_choice",
        # Special token ids
        bos_token_id: Optional[int] = None,
        eos_token_id: int = 100257,
        pad_token_id: int = 100257,
        **kwargs,
    ):
        # Drop any incoming tie_word_embeddings kwarg so it is not passed twice:
        # the local tie_embeddings flag is forwarded under that name below.
        kwargs.pop("tie_word_embeddings", None)
        super().__init__(
            bos_token_id=bos_token_id,
            eos_token_id=eos_token_id,
            pad_token_id=pad_token_id,
            tie_word_embeddings=tie_embeddings,
            **kwargs,
        )
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.n_heads = n_heads
        self.n_kv_heads = n_kv_heads
        self.d_ff = d_ff
        self.max_seq_len = max_seq_len
        self.n_experts = n_experts
        self.n_active_experts = n_active_experts
        self.expert_capacity_factor = expert_capacity_factor
        self.router_aux_loss_coeff = router_aux_loss_coeff
        self.router_z_loss_coeff = router_z_loss_coeff
        self.rope_theta = rope_theta
        self.norm_eps = norm_eps
        self.dropout = dropout
        self.expert_dropout = expert_dropout
        self.tie_embeddings = tie_embeddings
        self.routing_mode = routing_mode
        # Standard HuggingFace aliases so generic utilities that expect
        # hidden_size / num_hidden_layers / num_attention_heads keep working.
        self.hidden_size = d_model
        self.num_hidden_layers = n_layers
        self.num_attention_heads = n_heads

    @property
    def head_dim(self) -> int:
        return self.d_model // self.n_heads

    @property
    def kv_dim(self) -> int:
        return self.n_kv_heads * self.head_dim


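# Illustrative sketch (not part of the model): with the defaults above, the
# derived attention geometry works out as follows. head_dim = d_model / n_heads
# = 768 / 12 = 64, and kv_dim = n_kv_heads * head_dim = 4 * 64 = 256, so the
# KV cache stores 256 values per token per layer for keys (and again for values)
# instead of the full 768.
#
#     cfg = SentinelBrainConfig()
#     assert cfg.head_dim == 64
#     assert cfg.kv_dim == 256
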
class _RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-5):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        norm = torch.rsqrt(x.float().pow(2).mean(-1, keepdim=True) + self.eps)
        return (x.float() * norm).type_as(x) * self.weight


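# For reference, the normalization computed above is, per feature vector x:
#     y_i = x_i / sqrt(mean_j(x_j^2) + eps) * weight_i
# i.e. RMS normalization with a learned per-channel scale, no bias, and no
# mean-centering; the statistics are computed in float32 and cast back.
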
class _RotaryEmbedding(nn.Module):
    def __init__(self, head_dim: int, max_seq_len: int = 8192,
                 theta: float = 500000.0):
        super().__init__()
        self.head_dim = head_dim
        self.max_seq_len = max_seq_len
        self.theta = theta
        self._build_cache(max_seq_len)

    def _build_cache(self, seq_len: int):
        freqs = 1.0 / (self.theta ** (
            torch.arange(0, self.head_dim, 2).float() / self.head_dim
        ))
        t = torch.arange(seq_len).float()
        angles = torch.outer(t, freqs)
        self.register_buffer("cos_cached", angles.cos().unsqueeze(0).unsqueeze(0),
                             persistent=False)
        self.register_buffer("sin_cached", angles.sin().unsqueeze(0).unsqueeze(0),
                             persistent=False)

    def forward(self, seq_len: int):
        # Grow the cached tables if a longer sequence than seen so far is requested.
        if seq_len > self.max_seq_len:
            self._build_cache(seq_len * 2)
            self.max_seq_len = seq_len * 2
        return self.cos_cached[:, :, :seq_len], self.sin_cached[:, :, :seq_len]


def _apply_rope(x, cos, sin):
    # Rotate-by-halves RoPE: split the head dimension in two and rotate each
    # (x1, x2) pair by the position-dependent angle.
    d2 = x.shape[-1] // 2
    x1, x2 = x[..., :d2], x[..., d2:]
    return torch.cat([x1 * cos - x2 * sin, x1 * sin + x2 * cos], dim=-1)


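# Shape sketch (illustrative): _apply_rope expects x of shape
# (batch, heads, seq, head_dim) and cos/sin of shape (1, 1, seq, head_dim // 2)
# as returned by _RotaryEmbedding, so the rotation broadcasts over batch and
# heads. With head_dim = 64, positions are encoded with 32 frequencies
# theta ** (-2i / 64) for i in 0..31.
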
class _SwiGLUFFN(nn.Module):
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.0):
        super().__init__()
        self.w_gate = nn.Linear(d_model, d_ff, bias=False)
        self.w_up = nn.Linear(d_model, d_ff, bias=False)
        self.w_down = nn.Linear(d_ff, d_model, bias=False)
        self.dropout = nn.Dropout(dropout) if dropout > 0 else nn.Identity()

    def forward(self, x):
        return self.dropout(self.w_down(F.silu(self.w_gate(x)) * self.w_up(x)))


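# The feed-forward block above is the standard SwiGLU variant:
#     FFN(x) = W_down( SiLU(W_gate x) * W_up x )
# with no biases. The gate and up projections each map d_model -> d_ff and the
# down projection maps back, so one such FFN holds 3 * d_model * d_ff weights
# (about 4.7M parameters at d_model=768, d_ff=2048).
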
class _GQA(nn.Module):
    def __init__(self, d_model, n_heads, n_kv_heads, head_dim, dropout=0.0):
        super().__init__()
        self.n_heads = n_heads
        self.n_kv_heads = n_kv_heads
        self.head_dim = head_dim
        self.n_rep = n_heads // n_kv_heads
        self.wq = nn.Linear(d_model, n_heads * head_dim, bias=False)
        self.wk = nn.Linear(d_model, n_kv_heads * head_dim, bias=False)
        self.wv = nn.Linear(d_model, n_kv_heads * head_dim, bias=False)
        self.wo = nn.Linear(n_heads * head_dim, d_model, bias=False)
        self.attn_dropout = dropout

    def forward(self, x, rope_cos, rope_sin, mask=None, kv_cache=None):
        B, T, _ = x.shape
        q = self.wq(x).view(B, T, self.n_heads, self.head_dim).transpose(1, 2)
        k = self.wk(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)
        v = self.wv(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)

        q = _apply_rope(q, rope_cos, rope_sin)
        k = _apply_rope(k, rope_cos, rope_sin)

        # Prepend cached keys/values (if any) and return the updated cache.
        if kv_cache is not None:
            k = torch.cat([kv_cache[0], k], dim=2)
            v = torch.cat([kv_cache[1], v], dim=2)
        new_kv = (k, v)

        # Expand the shared KV heads so every query head has a matching KV head.
        if self.n_rep > 1:
            k = k.repeat_interleave(self.n_rep, dim=1)
            v = v.repeat_interleave(self.n_rep, dim=1)

        # Causal masking is only needed during prefill (no cache, T > 1);
        # single-token decoding already attends to the full cached prefix.
        out = F.scaled_dot_product_attention(
            q, k, v, is_causal=(kv_cache is None and T > 1),
            dropout_p=self.attn_dropout if self.training else 0.0,
        )
        out = out.transpose(1, 2).contiguous().view(B, T, -1)
        return self.wo(out), new_kv


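# Illustrative numbers for the default config (n_heads=12, n_kv_heads=4,
# head_dim=64): n_rep = 3, so each K/V head is shared by three query heads via
# repeat_interleave. A per-layer cache entry therefore stores only
# n_kv_heads * head_dim = 256 values per token for K and again for V, a 3x
# reduction over full multi-head attention.
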
class _ExpertRouter(nn.Module):
    def __init__(self, d_model, n_experts, n_active, aux_coeff=0.01, z_coeff=0.001):
        super().__init__()
        self.n_experts = n_experts
        self.n_active = n_active
        self.aux_coeff = aux_coeff
        self.z_coeff = z_coeff
        self.gate = nn.Linear(d_model, n_experts, bias=False)

    def forward(self, x):
        logits = self.gate(x)
        probs = F.softmax(logits, dim=-1)
        topk_w, topk_idx = torch.topk(probs, self.n_active, dim=-1)
        topk_w = topk_w / (topk_w.sum(dim=-1, keepdim=True) + 1e-9)

        # Load-balancing auxiliary loss (Switch-Transformer style): penalizes
        # agreement between routed token fraction and mean router probability.
        B, T, E = probs.shape
        flat_probs = probs.view(-1, E)
        flat_idx = topk_idx.view(-1, self.n_active)
        one_hot = F.one_hot(flat_idx, E).float()
        f = one_hot.sum(1).mean(0)
        P = flat_probs.mean(0)
        aux = self.aux_coeff * E * (f * P).sum()

        # Router z-loss: keeps the gate logits from drifting to large magnitudes.
        log_z = torch.logsumexp(logits, dim=-1)
        z = self.z_coeff * log_z.square().mean()

        return topk_w, topk_idx, aux, z


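# In symbols, with E experts, f_e the fraction of tokens whose top-k picks
# include expert e, and P_e the mean router probability assigned to expert e:
#     aux = aux_coeff * E * sum_e f_e * P_e
#     z   = z_coeff * mean_tokens( logsumexp(logits)^2 )
# Both terms are returned to the caller and folded into the training loss in
# SentinelBrainForCausalLM.forward.
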
class _SparseMoE(nn.Module):
    def __init__(self, d_model, d_ff, n_experts, n_active, dropout=0.0,
                 aux_coeff=0.01, z_coeff=0.001):
        super().__init__()
        self.n_experts = n_experts
        self.n_active = n_active
        self.router = _ExpertRouter(d_model, n_experts, n_active, aux_coeff, z_coeff)
        self.experts = nn.ModuleList([
            _SwiGLUFFN(d_model, d_ff, dropout) for _ in range(n_experts)
        ])

    def forward(self, x):
        B, T, D = x.shape
        weights, indices, aux, z = self.router(x)

        flat_x = x.view(-1, D)
        flat_w = weights.view(-1, self.n_active)
        flat_idx = indices.view(-1, self.n_active)
        out = torch.zeros_like(flat_x)

        # Dispatch: for each top-k slot, run every expert on the tokens routed
        # to it and accumulate the weighted outputs.
        for k in range(self.n_active):
            expert_idx = flat_idx[:, k]
            w = flat_w[:, k].unsqueeze(-1)
            for e in range(self.n_experts):
                mask = (expert_idx == e)
                if mask.any():
                    out[mask] += w[mask] * self.experts[e](flat_x[mask])

        return out.view(B, T, D), aux, z


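# Per token, the layer output is a convex combination of its selected experts:
#     y = sum_{k=1..n_active} w_k * Expert_{idx_k}(x),   with sum_k w_k = 1
# With the defaults (4 experts, 2 active) every token pays the compute of two
# SwiGLU FFNs while the layer holds the parameters of four.
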
class _TransformerBlock(nn.Module):
    def __init__(self, cfg: SentinelBrainConfig, layer_idx: int):
        super().__init__()
        self.attn_norm = _RMSNorm(cfg.d_model, cfg.norm_eps)
        self.attn = _GQA(cfg.d_model, cfg.n_heads, cfg.n_kv_heads,
                         cfg.head_dim, cfg.dropout)
        self.ffn_norm = _RMSNorm(cfg.d_model, cfg.norm_eps)

        # Use a sparse MoE feed-forward when more than one expert is configured,
        # otherwise fall back to a single dense SwiGLU FFN.
        if cfg.n_experts > 1:
            self.ffn = _SparseMoE(cfg.d_model, cfg.d_ff, cfg.n_experts,
                                  cfg.n_active_experts, cfg.expert_dropout,
                                  cfg.router_aux_loss_coeff, cfg.router_z_loss_coeff)
            self.is_moe = True
        else:
            self.ffn = _SwiGLUFFN(cfg.d_model, cfg.d_ff, cfg.dropout)
            self.is_moe = False

    def forward(self, x, rope_cos, rope_sin, kv_cache=None):
        residual = x
        x = self.attn_norm(x)
        attn_out, new_kv = self.attn(x, rope_cos, rope_sin, kv_cache=kv_cache)
        x = residual + attn_out

        residual = x
        x = self.ffn_norm(x)
        aux, z = 0.0, 0.0
        if self.is_moe:
            ffn_out, aux, z = self.ffn(x)
        else:
            ffn_out = self.ffn(x)
        x = residual + ffn_out
        return x, new_kv, aux, z


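# Each block uses the usual pre-norm residual layout:
#     x = x + Attn(RMSNorm(x))
#     x = x + FFN(RMSNorm(x))
# Dense blocks return aux = z = 0.0 so the caller can sum router losses uniformly.
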
class SentinelBrainForCausalLM(PreTrainedModel, GenerationMixin):
    """HuggingFace-compatible wrapper for SentinelBrain causal LM."""

    config_class = SentinelBrainConfig
    supports_gradient_checkpointing = True
    _no_split_modules = ["_TransformerBlock"]

    def __init__(self, config: SentinelBrainConfig):
        super().__init__(config)

        self.tok_emb = nn.Embedding(config.vocab_size, config.d_model)
        self.rope = _RotaryEmbedding(config.head_dim, config.max_seq_len * 2,
                                     config.rope_theta)
        self.layers = nn.ModuleList([
            _TransformerBlock(config, i) for i in range(config.n_layers)
        ])
        self.norm = _RMSNorm(config.d_model, config.norm_eps)
        self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)

        # Tie the output projection to the input embedding when requested.
        if getattr(config, "tie_embeddings", True) and getattr(config, "tie_word_embeddings", True):
            self.lm_head.weight = self.tok_emb.weight

        self.post_init()

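    # Rough size sketch (illustrative arithmetic, not a measured count): the tied
    # embedding / output matrix is vocab_size * d_model = 100277 * 768, about 77M
    # parameters, stored once thanks to the tying above. With the default config,
    # the 12 MoE blocks add roughly 12 * 4 * 4.7M (about 226M) expert parameters,
    # of which only half are active for any given token.
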
    def get_input_embeddings(self):
        return self.tok_emb

    def set_input_embeddings(self, value):
        self.tok_emb = value

    def get_output_embeddings(self):
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        self.lm_head = new_embeddings

    def get_expanded_tied_weights_keys(self, all_submodels=False):
        # Tying is handled manually in __init__, so report no tied keys here.
        return {}

    def tie_weights(self, *args, **kwargs):
        # No-op: the lm_head / tok_emb tie is established once in __init__.
        return

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        past_key_values: Optional[list] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **kwargs,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        use_cache = use_cache if use_cache is not None else False

        B, T = input_ids.shape
        x = self.tok_emb(input_ids)

        # Normalize past_key_values into a legacy list of per-layer (k, v) tuples,
        # accepting either a transformers Cache object or the legacy format.
        has_past = False
        past_len = 0
        _legacy_past = None

        if past_key_values is not None:
            # New-style Cache objects expose to_legacy_cache().
            if hasattr(past_key_values, "to_legacy_cache"):
                try:
                    legacy = past_key_values.to_legacy_cache()
                    if legacy is not None and len(legacy) > 0:
                        _legacy_past = list(legacy)
                        first = _legacy_past[0]
                        if first is not None and len(first) > 0 and first[0] is not None:
                            has_past = True
                            past_len = first[0].shape[2]
                except Exception:
                    pass
            # Legacy format: list/tuple of per-layer (k, v) tensors.
            elif isinstance(past_key_values, (list, tuple)) and len(past_key_values) > 0:
                _legacy_past = list(past_key_values)
                first = _legacy_past[0]
                if first is not None:
                    if isinstance(first, (tuple, list)) and len(first) > 0 and first[0] is not None:
                        has_past = True
                        past_len = first[0].shape[2]
                    elif hasattr(first, "shape"):
                        has_past = True
                        past_len = first.shape[2]

        # RoPE tables for the absolute positions of the new tokens only.
        rope_cos, rope_sin = self.rope(past_len + T)
        rope_cos = rope_cos[:, :, past_len:past_len + T].to(x.device)
        rope_sin = rope_sin[:, :, past_len:past_len + T].to(x.device)

        new_kv_caches = []
        total_aux = 0.0
        total_z = 0.0

        for i, layer in enumerate(self.layers):
            kv_cache = _legacy_past[i] if (has_past and _legacy_past is not None and i < len(_legacy_past)) else None
            x, new_kv, aux, z = layer(x, rope_cos, rope_sin, kv_cache=kv_cache)
            new_kv_caches.append(new_kv)
            total_aux += aux
            total_z += z

        x = self.norm(x)
        logits = self.lm_head(x)

        loss = None
        if labels is not None:
            # Standard next-token objective: predict token t+1 from position t.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss = F.cross_entropy(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1),
                ignore_index=-100,
            )
            # Add the averaged MoE router losses on top of the LM loss.
            n_moe = sum(1 for l in self.layers if l.is_moe)
            if n_moe > 0:
                loss = loss + total_aux / n_moe + total_z / n_moe

        if not return_dict:
            output = (logits, new_kv_caches if use_cache else None)
            return ((loss,) + output) if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=new_kv_caches if use_cache else None,
        )

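    # Illustrative training-step sketch (hypothetical names: `batch` is a
    # pre-tokenized LongTensor of shape (B, T), `optimizer` any torch optimizer):
    #
    #     out = model(input_ids=batch, labels=batch)
    #     out.loss.backward()   # cross-entropy + averaged router aux/z losses
    #     optimizer.step()
    #
    # Padding positions can be excluded by setting the corresponding labels to -100.
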
    def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
        # With a cache present, only the newest token needs to be fed forward.
        if past_key_values is not None:
            input_ids = input_ids[:, -1:]
        return {
            "input_ids": input_ids,
            "past_key_values": past_key_values,
            "use_cache": True,
        }


# Register with the Auto* factories so AutoConfig / AutoModelForCausalLM can
# resolve the "sentinel_brain" model_type once this module is imported.
AutoConfig.register("sentinel_brain", SentinelBrainConfig)
AutoModelForCausalLM.register(SentinelBrainConfig, SentinelBrainForCausalLM)
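# Loading sketch (illustrative): once this module has been imported, the
# registrations above let the generic Auto classes resolve SentinelBrain
# checkpoints saved with save_pretrained:
#
#     from transformers import AutoModelForCausalLM
#     import hf_model  # registers the "sentinel_brain" model_type
#
#     model = AutoModelForCausalLM.from_pretrained("qubitpage/sentinel-prime-nano")
#     output_ids = model.generate(input_ids, max_new_tokens=32)
#
# `input_ids` here stands for a LongTensor produced by a matching tokenizer,
# which is not defined in this file.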