""" HuggingFace-compatible wrappers for SentinelBrain. Provides: - SentinelBrainConfig(PretrainedConfig) — serializes to config.json - SentinelBrainForCausalLM(PreTrainedModel) — from_pretrained / save_pretrained - Auto-registration for AutoConfig / AutoModelForCausalLM Usage: from hf_model import SentinelBrainForCausalLM, SentinelBrainConfig model = SentinelBrainForCausalLM.from_pretrained("qubitpage/sentinel-prime-nano") """ import math import torch import torch.nn as nn import torch.nn.functional as F from typing import Optional, Tuple, Union from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModelForCausalLM from transformers.generation import GenerationMixin from transformers.modeling_outputs import CausalLMOutputWithPast # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- class SentinelBrainConfig(PretrainedConfig): """HuggingFace-compatible config for SentinelBrain.""" model_type = "sentinel_brain" def __init__( self, vocab_size: int = 100277, d_model: int = 768, n_layers: int = 12, n_heads: int = 12, n_kv_heads: int = 4, d_ff: int = 2048, max_seq_len: int = 1024, n_experts: int = 4, n_active_experts: int = 2, expert_capacity_factor: float = 1.25, router_aux_loss_coeff: float = 0.01, router_z_loss_coeff: float = 0.001, rope_theta: float = 500000.0, norm_eps: float = 1e-5, dropout: float = 0.0, expert_dropout: float = 0.0, tie_embeddings: bool = True, routing_mode: str = "token_choice", # Standard HF fields bos_token_id: int = None, eos_token_id: int = 100257, pad_token_id: int = 100257, **kwargs, ): # Pop reserved kwargs that we set explicitly to avoid duplicate kwarg conflict on reload kwargs.pop("tie_word_embeddings", None) super().__init__( bos_token_id=bos_token_id, eos_token_id=eos_token_id, pad_token_id=pad_token_id, tie_word_embeddings=tie_embeddings, **kwargs, ) self.vocab_size = vocab_size self.d_model = d_model self.n_layers = n_layers self.n_heads = n_heads self.n_kv_heads = n_kv_heads self.d_ff = d_ff self.max_seq_len = max_seq_len self.n_experts = n_experts self.n_active_experts = n_active_experts self.expert_capacity_factor = expert_capacity_factor self.router_aux_loss_coeff = router_aux_loss_coeff self.router_z_loss_coeff = router_z_loss_coeff self.rope_theta = rope_theta self.norm_eps = norm_eps self.dropout = dropout self.expert_dropout = expert_dropout self.tie_embeddings = tie_embeddings self.routing_mode = routing_mode # Derived self.hidden_size = d_model # HF convention self.num_hidden_layers = n_layers self.num_attention_heads = n_heads @property def head_dim(self) -> int: return self.d_model // self.n_heads @property def kv_dim(self) -> int: return self.n_kv_heads * self.head_dim # --------------------------------------------------------------------------- # Layers (self-contained, no cross-imports) # --------------------------------------------------------------------------- class _RMSNorm(nn.Module): def __init__(self, dim: int, eps: float = 1e-5): super().__init__() self.eps = eps self.weight = nn.Parameter(torch.ones(dim)) def forward(self, x: torch.Tensor) -> torch.Tensor: norm = torch.rsqrt(x.float().pow(2).mean(-1, keepdim=True) + self.eps) return (x.float() * norm).type_as(x) * self.weight class _RotaryEmbedding(nn.Module): def __init__(self, head_dim: int, max_seq_len: int = 8192, theta: float = 500000.0): super().__init__() self.head_dim = head_dim self.max_seq_len = max_seq_len self.theta = theta self._build_cache(max_seq_len) def _build_cache(self, seq_len: int): freqs = 1.0 / (self.theta ** ( torch.arange(0, self.head_dim, 2).float() / self.head_dim )) t = torch.arange(seq_len).float() angles = torch.outer(t, freqs) self.register_buffer("cos_cached", angles.cos().unsqueeze(0).unsqueeze(0), persistent=False) self.register_buffer("sin_cached", angles.sin().unsqueeze(0).unsqueeze(0), persistent=False) def forward(self, seq_len: int): if seq_len > self.max_seq_len: self._build_cache(seq_len * 2) self.max_seq_len = seq_len * 2 return self.cos_cached[:, :, :seq_len], self.sin_cached[:, :, :seq_len] def _apply_rope(x, cos, sin): d2 = x.shape[-1] // 2 x1, x2 = x[..., :d2], x[..., d2:] return torch.cat([x1 * cos - x2 * sin, x1 * sin + x2 * cos], dim=-1) class _SwiGLUFFN(nn.Module): def __init__(self, d_model: int, d_ff: int, dropout: float = 0.0): super().__init__() self.w_gate = nn.Linear(d_model, d_ff, bias=False) self.w_up = nn.Linear(d_model, d_ff, bias=False) self.w_down = nn.Linear(d_ff, d_model, bias=False) self.dropout = nn.Dropout(dropout) if dropout > 0 else nn.Identity() def forward(self, x): return self.dropout(self.w_down(F.silu(self.w_gate(x)) * self.w_up(x))) class _GQA(nn.Module): def __init__(self, d_model, n_heads, n_kv_heads, head_dim, dropout=0.0): super().__init__() self.n_heads = n_heads self.n_kv_heads = n_kv_heads self.head_dim = head_dim self.n_rep = n_heads // n_kv_heads self.wq = nn.Linear(d_model, n_heads * head_dim, bias=False) self.wk = nn.Linear(d_model, n_kv_heads * head_dim, bias=False) self.wv = nn.Linear(d_model, n_kv_heads * head_dim, bias=False) self.wo = nn.Linear(n_heads * head_dim, d_model, bias=False) self.attn_dropout = dropout def forward(self, x, rope_cos, rope_sin, mask=None, kv_cache=None): B, T, _ = x.shape q = self.wq(x).view(B, T, self.n_heads, self.head_dim).transpose(1, 2) k = self.wk(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2) v = self.wv(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2) q = _apply_rope(q, rope_cos, rope_sin) k = _apply_rope(k, rope_cos, rope_sin) if kv_cache is not None: k = torch.cat([kv_cache[0], k], dim=2) v = torch.cat([kv_cache[1], v], dim=2) new_kv = (k, v) if self.n_rep > 1: k = k.repeat_interleave(self.n_rep, dim=1) v = v.repeat_interleave(self.n_rep, dim=1) out = F.scaled_dot_product_attention( q, k, v, is_causal=(kv_cache is None and T > 1), dropout_p=self.attn_dropout if self.training else 0.0, ) out = out.transpose(1, 2).contiguous().view(B, T, -1) return self.wo(out), new_kv class _ExpertRouter(nn.Module): def __init__(self, d_model, n_experts, n_active, aux_coeff=0.01, z_coeff=0.001): super().__init__() self.n_experts = n_experts self.n_active = n_active self.aux_coeff = aux_coeff self.z_coeff = z_coeff self.gate = nn.Linear(d_model, n_experts, bias=False) def forward(self, x): logits = self.gate(x) probs = F.softmax(logits, dim=-1) topk_w, topk_idx = torch.topk(probs, self.n_active, dim=-1) topk_w = topk_w / (topk_w.sum(dim=-1, keepdim=True) + 1e-9) # Aux loss B, T, E = probs.shape flat_probs = probs.view(-1, E) flat_idx = topk_idx.view(-1, self.n_active) one_hot = F.one_hot(flat_idx, E).float() f = one_hot.sum(1).mean(0) P = flat_probs.mean(0) aux = self.aux_coeff * E * (f * P).sum() # Z loss log_z = torch.logsumexp(logits, dim=-1) z = self.z_coeff * log_z.square().mean() return topk_w, topk_idx, aux, z class _SparseMoE(nn.Module): def __init__(self, d_model, d_ff, n_experts, n_active, dropout=0.0, aux_coeff=0.01, z_coeff=0.001): super().__init__() self.n_experts = n_experts self.n_active = n_active self.router = _ExpertRouter(d_model, n_experts, n_active, aux_coeff, z_coeff) self.experts = nn.ModuleList([ _SwiGLUFFN(d_model, d_ff, dropout) for _ in range(n_experts) ]) def forward(self, x): B, T, D = x.shape weights, indices, aux, z = self.router(x) flat_x = x.view(-1, D) flat_w = weights.view(-1, self.n_active) flat_idx = indices.view(-1, self.n_active) out = torch.zeros_like(flat_x) for k in range(self.n_active): expert_idx = flat_idx[:, k] w = flat_w[:, k].unsqueeze(-1) for e in range(self.n_experts): mask = (expert_idx == e) if mask.any(): out[mask] += w[mask] * self.experts[e](flat_x[mask]) return out.view(B, T, D), aux, z class _TransformerBlock(nn.Module): def __init__(self, cfg: SentinelBrainConfig, layer_idx: int): super().__init__() self.attn_norm = _RMSNorm(cfg.d_model, cfg.norm_eps) self.attn = _GQA(cfg.d_model, cfg.n_heads, cfg.n_kv_heads, cfg.head_dim, cfg.dropout) self.ffn_norm = _RMSNorm(cfg.d_model, cfg.norm_eps) if cfg.n_experts > 1: self.ffn = _SparseMoE(cfg.d_model, cfg.d_ff, cfg.n_experts, cfg.n_active_experts, cfg.expert_dropout, cfg.router_aux_loss_coeff, cfg.router_z_loss_coeff) self.is_moe = True else: self.ffn = _SwiGLUFFN(cfg.d_model, cfg.d_ff, cfg.dropout) self.is_moe = False def forward(self, x, rope_cos, rope_sin, kv_cache=None): residual = x x = self.attn_norm(x) attn_out, new_kv = self.attn(x, rope_cos, rope_sin, kv_cache=kv_cache) x = residual + attn_out residual = x x = self.ffn_norm(x) aux, z = 0.0, 0.0 if self.is_moe: ffn_out, aux, z = self.ffn(x) else: ffn_out = self.ffn(x) x = residual + ffn_out return x, new_kv, aux, z # --------------------------------------------------------------------------- # HF PreTrainedModel # --------------------------------------------------------------------------- class SentinelBrainForCausalLM(PreTrainedModel, GenerationMixin): """HuggingFace-compatible wrapper for SentinelBrain causal LM.""" config_class = SentinelBrainConfig supports_gradient_checkpointing = True _no_split_modules = ["_TransformerBlock"] def __init__(self, config: SentinelBrainConfig): super().__init__(config) self.tok_emb = nn.Embedding(config.vocab_size, config.d_model) self.rope = _RotaryEmbedding(config.head_dim, config.max_seq_len * 2, config.rope_theta) self.layers = nn.ModuleList([ _TransformerBlock(config, i) for i in range(config.n_layers) ]) self.norm = _RMSNorm(config.d_model, config.norm_eps) self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False) if getattr(config, "tie_embeddings", True) and getattr(config, "tie_word_embeddings", True): self.lm_head.weight = self.tok_emb.weight self.post_init() def get_input_embeddings(self): return self.tok_emb def set_input_embeddings(self, value): self.tok_emb = value def get_output_embeddings(self): return self.lm_head def set_output_embeddings(self, new_embeddings): self.lm_head = new_embeddings def get_expanded_tied_weights_keys(self, all_submodels=False): # Return empty dict — manual tying in __init__ return {} def tie_weights(self, *args, **kwargs): # No-op — manual tying handled in __init__; bypass transformers tying machinery entirely return def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, past_key_values: Optional[list] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, **kwargs, ) -> Union[Tuple, CausalLMOutputWithPast]: return_dict = return_dict if return_dict is not None else self.config.use_return_dict use_cache = use_cache if use_cache is not None else False B, T = input_ids.shape x = self.tok_emb(input_ids) # Determine if we have valid past KV caches. # Support: list-of-tuples (legacy), tuple-of-tuples, and DynamicCache (new transformers). has_past = False past_len = 0 _legacy_past = None # normalized to list-of-tuples form if past_key_values is not None: # New API: DynamicCache or similar Cache object if hasattr(past_key_values, "to_legacy_cache"): try: legacy = past_key_values.to_legacy_cache() if legacy is not None and len(legacy) > 0: _legacy_past = list(legacy) first = _legacy_past[0] if first is not None and len(first) > 0 and first[0] is not None: has_past = True past_len = first[0].shape[2] except Exception: pass # Legacy API: list/tuple of (k, v) tuples elif isinstance(past_key_values, (list, tuple)) and len(past_key_values) > 0: _legacy_past = list(past_key_values) first = _legacy_past[0] if first is not None: if isinstance(first, (tuple, list)) and len(first) > 0 and first[0] is not None: has_past = True past_len = first[0].shape[2] elif hasattr(first, "shape"): has_past = True past_len = first.shape[2] rope_cos, rope_sin = self.rope(past_len + T) rope_cos = rope_cos[:, :, past_len:past_len + T].to(x.device) rope_sin = rope_sin[:, :, past_len:past_len + T].to(x.device) new_kv_caches = [] total_aux = 0.0 total_z = 0.0 for i, layer in enumerate(self.layers): kv_cache = _legacy_past[i] if (has_past and _legacy_past is not None and i < len(_legacy_past)) else None x, new_kv, aux, z = layer(x, rope_cos, rope_sin, kv_cache=kv_cache) new_kv_caches.append(new_kv) total_aux += aux total_z += z x = self.norm(x) logits = self.lm_head(x) loss = None if labels is not None: shift_logits = logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() loss = F.cross_entropy( shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1), ignore_index=-100, ) # Add MoE losses n_moe = sum(1 for l in self.layers if l.is_moe) if n_moe > 0: loss = loss + total_aux / n_moe + total_z / n_moe if not return_dict: output = (logits, new_kv_caches if use_cache else None) return ((loss,) + output) if loss is not None else output return CausalLMOutputWithPast( loss=loss, logits=logits, past_key_values=new_kv_caches if use_cache else None, ) def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs): if past_key_values is not None: input_ids = input_ids[:, -1:] return { "input_ids": input_ids, "past_key_values": past_key_values, "use_cache": True, } # --------------------------------------------------------------------------- # Auto-registration # --------------------------------------------------------------------------- AutoConfig.register("sentinel_brain", SentinelBrainConfig) AutoModelForCausalLM.register(SentinelBrainConfig, SentinelBrainForCausalLM)