sentinel-prime-350m / hf_model.py
qubitpage's picture
Upload hf_model.py with huggingface_hub
3ba2208 verified
"""
HuggingFace-compatible wrappers for SentinelBrain.
Provides:
- SentinelBrainConfig(PretrainedConfig) — serializes to config.json
- SentinelBrainForCausalLM(PreTrainedModel) — from_pretrained / save_pretrained
- Auto-registration for AutoConfig / AutoModelForCausalLM
Usage:
from hf_model import SentinelBrainForCausalLM, SentinelBrainConfig
model = SentinelBrainForCausalLM.from_pretrained("qubitpage/sentinel-prime-nano")
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple, Union
from transformers import PretrainedConfig, PreTrainedModel, AutoConfig, AutoModelForCausalLM
from transformers.generation import GenerationMixin
from transformers.modeling_outputs import CausalLMOutputWithPast
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
class SentinelBrainConfig(PretrainedConfig):
"""HuggingFace-compatible config for SentinelBrain."""
model_type = "sentinel_brain"
def __init__(
self,
vocab_size: int = 100277,
d_model: int = 768,
n_layers: int = 12,
n_heads: int = 12,
n_kv_heads: int = 4,
d_ff: int = 2048,
max_seq_len: int = 1024,
n_experts: int = 4,
n_active_experts: int = 2,
expert_capacity_factor: float = 1.25,
router_aux_loss_coeff: float = 0.01,
router_z_loss_coeff: float = 0.001,
rope_theta: float = 500000.0,
norm_eps: float = 1e-5,
dropout: float = 0.0,
expert_dropout: float = 0.0,
tie_embeddings: bool = True,
routing_mode: str = "token_choice",
# Standard HF fields
bos_token_id: int = None,
eos_token_id: int = 100257,
pad_token_id: int = 100257,
**kwargs,
):
# Pop reserved kwargs that we set explicitly to avoid duplicate kwarg conflict on reload
kwargs.pop("tie_word_embeddings", None)
super().__init__(
bos_token_id=bos_token_id,
eos_token_id=eos_token_id,
pad_token_id=pad_token_id,
tie_word_embeddings=tie_embeddings,
**kwargs,
)
self.vocab_size = vocab_size
self.d_model = d_model
self.n_layers = n_layers
self.n_heads = n_heads
self.n_kv_heads = n_kv_heads
self.d_ff = d_ff
self.max_seq_len = max_seq_len
self.n_experts = n_experts
self.n_active_experts = n_active_experts
self.expert_capacity_factor = expert_capacity_factor
self.router_aux_loss_coeff = router_aux_loss_coeff
self.router_z_loss_coeff = router_z_loss_coeff
self.rope_theta = rope_theta
self.norm_eps = norm_eps
self.dropout = dropout
self.expert_dropout = expert_dropout
self.tie_embeddings = tie_embeddings
self.routing_mode = routing_mode
# Derived
self.hidden_size = d_model # HF convention
self.num_hidden_layers = n_layers
self.num_attention_heads = n_heads
@property
def head_dim(self) -> int:
return self.d_model // self.n_heads
@property
def kv_dim(self) -> int:
return self.n_kv_heads * self.head_dim
# ---------------------------------------------------------------------------
# Layers (self-contained, no cross-imports)
# ---------------------------------------------------------------------------
class _RMSNorm(nn.Module):
def __init__(self, dim: int, eps: float = 1e-5):
super().__init__()
self.eps = eps
self.weight = nn.Parameter(torch.ones(dim))
def forward(self, x: torch.Tensor) -> torch.Tensor:
norm = torch.rsqrt(x.float().pow(2).mean(-1, keepdim=True) + self.eps)
return (x.float() * norm).type_as(x) * self.weight
class _RotaryEmbedding(nn.Module):
def __init__(self, head_dim: int, max_seq_len: int = 8192,
theta: float = 500000.0):
super().__init__()
self.head_dim = head_dim
self.max_seq_len = max_seq_len
self.theta = theta
self._build_cache(max_seq_len)
def _build_cache(self, seq_len: int):
freqs = 1.0 / (self.theta ** (
torch.arange(0, self.head_dim, 2).float() / self.head_dim
))
t = torch.arange(seq_len).float()
angles = torch.outer(t, freqs)
self.register_buffer("cos_cached", angles.cos().unsqueeze(0).unsqueeze(0),
persistent=False)
self.register_buffer("sin_cached", angles.sin().unsqueeze(0).unsqueeze(0),
persistent=False)
def forward(self, seq_len: int):
if seq_len > self.max_seq_len:
self._build_cache(seq_len * 2)
self.max_seq_len = seq_len * 2
return self.cos_cached[:, :, :seq_len], self.sin_cached[:, :, :seq_len]
def _apply_rope(x, cos, sin):
d2 = x.shape[-1] // 2
x1, x2 = x[..., :d2], x[..., d2:]
return torch.cat([x1 * cos - x2 * sin, x1 * sin + x2 * cos], dim=-1)
class _SwiGLUFFN(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout: float = 0.0):
super().__init__()
self.w_gate = nn.Linear(d_model, d_ff, bias=False)
self.w_up = nn.Linear(d_model, d_ff, bias=False)
self.w_down = nn.Linear(d_ff, d_model, bias=False)
self.dropout = nn.Dropout(dropout) if dropout > 0 else nn.Identity()
def forward(self, x):
return self.dropout(self.w_down(F.silu(self.w_gate(x)) * self.w_up(x)))
class _GQA(nn.Module):
def __init__(self, d_model, n_heads, n_kv_heads, head_dim, dropout=0.0):
super().__init__()
self.n_heads = n_heads
self.n_kv_heads = n_kv_heads
self.head_dim = head_dim
self.n_rep = n_heads // n_kv_heads
self.wq = nn.Linear(d_model, n_heads * head_dim, bias=False)
self.wk = nn.Linear(d_model, n_kv_heads * head_dim, bias=False)
self.wv = nn.Linear(d_model, n_kv_heads * head_dim, bias=False)
self.wo = nn.Linear(n_heads * head_dim, d_model, bias=False)
self.attn_dropout = dropout
def forward(self, x, rope_cos, rope_sin, mask=None, kv_cache=None):
B, T, _ = x.shape
q = self.wq(x).view(B, T, self.n_heads, self.head_dim).transpose(1, 2)
k = self.wk(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)
v = self.wv(x).view(B, T, self.n_kv_heads, self.head_dim).transpose(1, 2)
q = _apply_rope(q, rope_cos, rope_sin)
k = _apply_rope(k, rope_cos, rope_sin)
if kv_cache is not None:
k = torch.cat([kv_cache[0], k], dim=2)
v = torch.cat([kv_cache[1], v], dim=2)
new_kv = (k, v)
if self.n_rep > 1:
k = k.repeat_interleave(self.n_rep, dim=1)
v = v.repeat_interleave(self.n_rep, dim=1)
out = F.scaled_dot_product_attention(
q, k, v, is_causal=(kv_cache is None and T > 1),
dropout_p=self.attn_dropout if self.training else 0.0,
)
out = out.transpose(1, 2).contiguous().view(B, T, -1)
return self.wo(out), new_kv
class _ExpertRouter(nn.Module):
def __init__(self, d_model, n_experts, n_active, aux_coeff=0.01, z_coeff=0.001):
super().__init__()
self.n_experts = n_experts
self.n_active = n_active
self.aux_coeff = aux_coeff
self.z_coeff = z_coeff
self.gate = nn.Linear(d_model, n_experts, bias=False)
def forward(self, x):
logits = self.gate(x)
probs = F.softmax(logits, dim=-1)
topk_w, topk_idx = torch.topk(probs, self.n_active, dim=-1)
topk_w = topk_w / (topk_w.sum(dim=-1, keepdim=True) + 1e-9)
# Aux loss
B, T, E = probs.shape
flat_probs = probs.view(-1, E)
flat_idx = topk_idx.view(-1, self.n_active)
one_hot = F.one_hot(flat_idx, E).float()
f = one_hot.sum(1).mean(0)
P = flat_probs.mean(0)
aux = self.aux_coeff * E * (f * P).sum()
# Z loss
log_z = torch.logsumexp(logits, dim=-1)
z = self.z_coeff * log_z.square().mean()
return topk_w, topk_idx, aux, z
class _SparseMoE(nn.Module):
def __init__(self, d_model, d_ff, n_experts, n_active, dropout=0.0,
aux_coeff=0.01, z_coeff=0.001):
super().__init__()
self.n_experts = n_experts
self.n_active = n_active
self.router = _ExpertRouter(d_model, n_experts, n_active, aux_coeff, z_coeff)
self.experts = nn.ModuleList([
_SwiGLUFFN(d_model, d_ff, dropout) for _ in range(n_experts)
])
def forward(self, x):
B, T, D = x.shape
weights, indices, aux, z = self.router(x)
flat_x = x.view(-1, D)
flat_w = weights.view(-1, self.n_active)
flat_idx = indices.view(-1, self.n_active)
out = torch.zeros_like(flat_x)
for k in range(self.n_active):
expert_idx = flat_idx[:, k]
w = flat_w[:, k].unsqueeze(-1)
for e in range(self.n_experts):
mask = (expert_idx == e)
if mask.any():
out[mask] += w[mask] * self.experts[e](flat_x[mask])
return out.view(B, T, D), aux, z
class _TransformerBlock(nn.Module):
def __init__(self, cfg: SentinelBrainConfig, layer_idx: int):
super().__init__()
self.attn_norm = _RMSNorm(cfg.d_model, cfg.norm_eps)
self.attn = _GQA(cfg.d_model, cfg.n_heads, cfg.n_kv_heads,
cfg.head_dim, cfg.dropout)
self.ffn_norm = _RMSNorm(cfg.d_model, cfg.norm_eps)
if cfg.n_experts > 1:
self.ffn = _SparseMoE(cfg.d_model, cfg.d_ff, cfg.n_experts,
cfg.n_active_experts, cfg.expert_dropout,
cfg.router_aux_loss_coeff, cfg.router_z_loss_coeff)
self.is_moe = True
else:
self.ffn = _SwiGLUFFN(cfg.d_model, cfg.d_ff, cfg.dropout)
self.is_moe = False
def forward(self, x, rope_cos, rope_sin, kv_cache=None):
residual = x
x = self.attn_norm(x)
attn_out, new_kv = self.attn(x, rope_cos, rope_sin, kv_cache=kv_cache)
x = residual + attn_out
residual = x
x = self.ffn_norm(x)
aux, z = 0.0, 0.0
if self.is_moe:
ffn_out, aux, z = self.ffn(x)
else:
ffn_out = self.ffn(x)
x = residual + ffn_out
return x, new_kv, aux, z
# ---------------------------------------------------------------------------
# HF PreTrainedModel
# ---------------------------------------------------------------------------
class SentinelBrainForCausalLM(PreTrainedModel, GenerationMixin):
"""HuggingFace-compatible wrapper for SentinelBrain causal LM."""
config_class = SentinelBrainConfig
supports_gradient_checkpointing = True
_no_split_modules = ["_TransformerBlock"]
def __init__(self, config: SentinelBrainConfig):
super().__init__(config)
self.tok_emb = nn.Embedding(config.vocab_size, config.d_model)
self.rope = _RotaryEmbedding(config.head_dim, config.max_seq_len * 2,
config.rope_theta)
self.layers = nn.ModuleList([
_TransformerBlock(config, i) for i in range(config.n_layers)
])
self.norm = _RMSNorm(config.d_model, config.norm_eps)
self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)
if getattr(config, "tie_embeddings", True) and getattr(config, "tie_word_embeddings", True):
self.lm_head.weight = self.tok_emb.weight
self.post_init()
def get_input_embeddings(self):
return self.tok_emb
def set_input_embeddings(self, value):
self.tok_emb = value
def get_output_embeddings(self):
return self.lm_head
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
def get_expanded_tied_weights_keys(self, all_submodels=False):
# Return empty dict — manual tying in __init__
return {}
def tie_weights(self, *args, **kwargs):
# No-op — manual tying handled in __init__; bypass transformers tying machinery entirely
return
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
past_key_values: Optional[list] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
**kwargs,
) -> Union[Tuple, CausalLMOutputWithPast]:
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
use_cache = use_cache if use_cache is not None else False
B, T = input_ids.shape
x = self.tok_emb(input_ids)
# Determine if we have valid past KV caches.
# Support: list-of-tuples (legacy), tuple-of-tuples, and DynamicCache (new transformers).
has_past = False
past_len = 0
_legacy_past = None # normalized to list-of-tuples form
if past_key_values is not None:
# New API: DynamicCache or similar Cache object
if hasattr(past_key_values, "to_legacy_cache"):
try:
legacy = past_key_values.to_legacy_cache()
if legacy is not None and len(legacy) > 0:
_legacy_past = list(legacy)
first = _legacy_past[0]
if first is not None and len(first) > 0 and first[0] is not None:
has_past = True
past_len = first[0].shape[2]
except Exception:
pass
# Legacy API: list/tuple of (k, v) tuples
elif isinstance(past_key_values, (list, tuple)) and len(past_key_values) > 0:
_legacy_past = list(past_key_values)
first = _legacy_past[0]
if first is not None:
if isinstance(first, (tuple, list)) and len(first) > 0 and first[0] is not None:
has_past = True
past_len = first[0].shape[2]
elif hasattr(first, "shape"):
has_past = True
past_len = first.shape[2]
rope_cos, rope_sin = self.rope(past_len + T)
rope_cos = rope_cos[:, :, past_len:past_len + T].to(x.device)
rope_sin = rope_sin[:, :, past_len:past_len + T].to(x.device)
new_kv_caches = []
total_aux = 0.0
total_z = 0.0
for i, layer in enumerate(self.layers):
kv_cache = _legacy_past[i] if (has_past and _legacy_past is not None and i < len(_legacy_past)) else None
x, new_kv, aux, z = layer(x, rope_cos, rope_sin, kv_cache=kv_cache)
new_kv_caches.append(new_kv)
total_aux += aux
total_z += z
x = self.norm(x)
logits = self.lm_head(x)
loss = None
if labels is not None:
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
loss = F.cross_entropy(
shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1),
ignore_index=-100,
)
# Add MoE losses
n_moe = sum(1 for l in self.layers if l.is_moe)
if n_moe > 0:
loss = loss + total_aux / n_moe + total_z / n_moe
if not return_dict:
output = (logits, new_kv_caches if use_cache else None)
return ((loss,) + output) if loss is not None else output
return CausalLMOutputWithPast(
loss=loss,
logits=logits,
past_key_values=new_kv_caches if use_cache else None,
)
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
if past_key_values is not None:
input_ids = input_ids[:, -1:]
return {
"input_ids": input_ids,
"past_key_values": past_key_values,
"use_cache": True,
}
# ---------------------------------------------------------------------------
# Auto-registration
# ---------------------------------------------------------------------------
AutoConfig.register("sentinel_brain", SentinelBrainConfig)
AutoModelForCausalLM.register(SentinelBrainConfig, SentinelBrainForCausalLM)