pii-redactor-bert-gat / modeling_bert_gat.py
Manikrishneshwar Sasidhar
Initial upload: BERT+GAT PII redactor
bfcecff verified
"""
HuggingFace-compatible wrapper around BertGATPIIModel.
Self-contained on purpose: the Hub repo doesn't import ``pii_redactor``,
so we redeclare the architecture here. This is the file ``transformers``
loads when a user does::
from transformers import AutoModel
model = AutoModel.from_pretrained(
"your-username/pii-redactor-bert-gat", trust_remote_code=True
)
"""
from typing import List, Optional, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoConfig, AutoModel, AutoTokenizer, PreTrainedModel
# Dual-mode import: works both when this file is loaded as part of a
# package (HuggingFace's ``trust_remote_code=True`` flow) and when it's
# imported as a sibling module by a script like ``convert_checkpoint.py``.
try:
from .configuration_bert_gat import BertGATConfig
except ImportError:
from configuration_bert_gat import BertGATConfig
try:
from torch_geometric.nn import GATConv
except ImportError as e: # pragma: no cover
raise ImportError(
"torch-geometric is required. Install with: pip install torch-geometric"
) from e
# --------------------------------------------------------------------------- #
# Label space (kept in sync with pii_redactor.config)
# --------------------------------------------------------------------------- #
PII_TYPES = [
"SSN", "BANK_ACCOUNT", "ROUTING_NUMBER", "CREDIT_CARD", "CVV",
"CARD_EXPIRY", "IBAN", "DOB", "FULL_NAME", "EMAIL", "PHONE",
"ADDRESS", "PASSPORT", "DRIVERS_LICENSE", "TAX_ID",
]
LABELS = ["O"] + sum(([f"B-{t}", f"I-{t}"] for t in PII_TYPES), [])
ID2LABEL = {i: l for i, l in enumerate(LABELS)}
# --------------------------------------------------------------------------- #
# Graph builder (mirrors pii_redactor.models.graph_builder)
# --------------------------------------------------------------------------- #
def _build_token_graph(
seq_len: int,
attn_weights: torch.Tensor,
window: int,
top_k: int,
device: torch.device,
) -> Tuple[torch.Tensor, torch.Tensor]:
src_list, dst_list, wt_list = [], [], []
for i in range(seq_len):
for j in range(max(0, i - window), min(seq_len, i + window + 1)):
if i != j:
src_list.append(i)
dst_list.append(j)
wt_list.append(1.0)
avg_attn = attn_weights.mean(dim=0)
topk_vals, topk_idx = avg_attn.topk(min(top_k, seq_len), dim=-1)
for i in range(seq_len):
for ki in range(topk_idx.shape[1]):
j = topk_idx[i, ki].item()
wt = topk_vals[i, ki].item()
if i != j and wt > 1e-4:
src_list.append(i)
dst_list.append(j)
wt_list.append(wt)
edge_index = torch.tensor([src_list, dst_list], dtype=torch.long, device=device)
edge_attr = torch.tensor(wt_list, dtype=torch.float, device=device).unsqueeze(1)
return edge_index, edge_attr
# --------------------------------------------------------------------------- #
# Model
# --------------------------------------------------------------------------- #
class BertGATForTokenClassification(PreTrainedModel):
config_class = BertGATConfig
base_model_prefix = "bert_gat_pii"
# This model has no tied weights (no shared embeddings, no encoder-
# decoder). Different transformers versions look for either the
# old ``_tied_weights_keys`` (list) or the newer
# ``all_tied_weights_keys`` (dict); declaring both empty keeps
# ``from_pretrained``'s post-load tied-weight bookkeeping happy.
_tied_weights_keys: list = []
all_tied_weights_keys: dict = {}
def __init__(self, config: BertGATConfig):
super().__init__(config)
# Instantiate the BERT trunk EMPTY (no weight download here). The
# outer ``from_pretrained`` populates everything — including these
# parameters — from the saved state dict. Calling
# ``AutoModel.from_pretrained`` here would clash with the meta-
# device context the outer loader sets up.
bert_config = AutoConfig.from_pretrained(config.bert_model_name)
bert_config.output_attentions = True
self.bert = AutoModel.from_config(bert_config)
bert_dim = self.bert.config.hidden_size
self.dropout = nn.Dropout(config.dropout)
self.window = config.window
self.top_k = config.top_k_attn
self.gat_layers = nn.ModuleList()
in_dim = bert_dim
for _ in range(config.gat_layers):
self.gat_layers.append(
GATConv(in_dim, config.gat_hidden, heads=config.gat_heads,
concat=True, dropout=config.dropout, edge_dim=1)
)
in_dim = config.gat_hidden * config.gat_heads
self.layer_norm = nn.LayerNorm(in_dim)
self.residual_proj = nn.Linear(bert_dim, in_dim)
self.classifier = nn.Linear(in_dim, config.num_labels)
def forward(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
labels: Optional[torch.Tensor] = None,
):
B, L = input_ids.shape
device = input_ids.device
bert_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
token_embs = bert_out.last_hidden_state
last_attn = bert_out.attentions[-1]
gat_outputs = []
for b in range(B):
seq_len = int(attention_mask[b].sum().item())
attn_b = last_attn[b, :, :seq_len, :seq_len]
edge_idx, edge_attr = _build_token_graph(
seq_len, attn_b, self.window, self.top_k, device,
)
h_real = token_embs[b, :seq_len]
h_res = h_real
for gat in self.gat_layers:
h_real = self.dropout(h_real)
h_real = gat(h_real, edge_idx, edge_attr=edge_attr)
h_real = F.elu(h_real)
h_real = self.layer_norm(h_real + self.residual_proj(h_res))
pad_len = L - seq_len
if pad_len > 0:
pad = torch.zeros(pad_len, h_real.shape[-1], device=device)
h_real = torch.cat([h_real, pad], dim=0)
gat_outputs.append(h_real)
gat_embs = torch.stack(gat_outputs, dim=0)
logits = self.classifier(self.dropout(gat_embs))
loss = None
if labels is not None:
loss = nn.CrossEntropyLoss(ignore_index=-100)(
logits.view(-1, self.config.num_labels), labels.view(-1)
)
return {"loss": loss, "logits": logits}
# ---- Convenience inference helpers -------------------------------------
@torch.no_grad()
def predict(
self,
text: str,
tokenizer: AutoTokenizer,
device: Optional[torch.device] = None,
) -> dict:
device = device or next(self.parameters()).device
enc = tokenizer(
text,
return_tensors="pt",
return_offsets_mapping=True,
truncation=True,
max_length=self.config.max_length,
)
input_ids = enc["input_ids"].to(device)
attention_mask = enc["attention_mask"].to(device)
offsets = enc["offset_mapping"].squeeze(0).tolist()
out = self(input_ids, attention_mask)
preds = out["logits"].squeeze(0).argmax(dim=-1).cpu().tolist()
# preds and offsets are aligned 1:1 by index; iterate them
# together (zip-style) so that special tokens — whose offset is
# (0, 0) — and their matching prediction are skipped as a pair.
spans: List[dict] = []
cur_lbl, cur_start, cur_end = None, None, None
for pred_id, (tok_s, tok_e) in zip(preds, offsets):
if tok_s == tok_e:
continue
pred_lbl = ID2LABEL[pred_id]
if pred_lbl.startswith("B-"):
if cur_lbl:
spans.append({"start": cur_start, "end": cur_end, "label": cur_lbl})
cur_lbl, cur_start, cur_end = pred_lbl[2:], tok_s, tok_e
elif pred_lbl.startswith("I-") and cur_lbl == pred_lbl[2:]:
cur_end = tok_e
else:
if cur_lbl:
spans.append({"start": cur_start, "end": cur_end, "label": cur_lbl})
cur_lbl, cur_start, cur_end = None, None, None
if cur_lbl:
spans.append({"start": cur_start, "end": cur_end, "label": cur_lbl})
for sp in spans:
sp["value"] = text[sp["start"]:sp["end"]]
redacted = text
for sp in sorted(spans, key=lambda s: s["start"], reverse=True):
redacted = redacted[:sp["start"]] + f"[{sp['label']}]" + redacted[sp["end"]:]
return {"original": text, "redacted": redacted, "spans": spans}
# Hooks so AutoModel can find this class via the config's auto_map.
BertGATConfig.register_for_auto_class("AutoConfig")
BertGATForTokenClassification.register_for_auto_class("AutoModel")