"""
HuggingFace-compatible wrapper around BertGATPIIModel.

Self-contained on purpose: the Hub repo doesn't import ``pii_redactor``,
so we redeclare the architecture here. This is the file ``transformers``
loads when a user does::

    from transformers import AutoModel
    model = AutoModel.from_pretrained(
        "your-username/pii-redactor-bert-gat", trust_remote_code=True
    )
"""

from typing import List, Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoConfig, AutoModel, AutoTokenizer, PreTrainedModel

# Dual-mode import: works both when this file is loaded as part of a
# package (HuggingFace's ``trust_remote_code=True`` flow) and when it's
# imported as a sibling module by a script like ``convert_checkpoint.py``.
try:
    from .configuration_bert_gat import BertGATConfig
except ImportError:
    from configuration_bert_gat import BertGATConfig

try:
    from torch_geometric.nn import GATConv
except ImportError as e:  # pragma: no cover
    raise ImportError(
        "torch-geometric is required. Install with: pip install torch-geometric"
    ) from e


# --------------------------------------------------------------------------- #
# Label space (kept in sync with pii_redactor.config)
# --------------------------------------------------------------------------- #
PII_TYPES = [
    "SSN",
    "BANK_ACCOUNT",
    "ROUTING_NUMBER",
    "CREDIT_CARD",
    "CVV",
    "CARD_EXPIRY",
    "IBAN",
    "DOB",
    "FULL_NAME",
    "EMAIL",
    "PHONE",
    "ADDRESS",
    "PASSPORT",
    "DRIVERS_LICENSE",
    "TAX_ID",
]
# BIO tagging scheme: "O" first, then a B-/I- pair per PII type, in order.
LABELS = ["O"] + [f"{prefix}-{t}" for t in PII_TYPES for prefix in ("B", "I")]
ID2LABEL = dict(enumerate(LABELS))


# --------------------------------------------------------------------------- #
# Graph builder (mirrors pii_redactor.models.graph_builder)
# --------------------------------------------------------------------------- #
def _build_token_graph(
    seq_len: int,
    attn_weights: torch.Tensor,
    window: int,
    top_k: int,
    device: torch.device,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Build the directed token graph for one sequence.

    Two kinds of edges are emitted, in this order:

    1. Local edges ``i -> j`` for every ``j`` within ``window`` positions of
       ``i`` (self-loops excluded), each with weight ``1.0``.
    2. Attention edges ``i -> j`` for the ``top_k`` highest head-averaged
       attention scores in row ``i``, skipping self-loops and scores
       ``<= 1e-4``, weighted by the averaged score.

    A pair ``(i, j)`` can appear in both groups; duplicates are kept.

    Args:
        seq_len: Number of real (non-padding) tokens, i.e. graph nodes.
        attn_weights: Attention scores indexed ``[head, query, key]``; the
            caller passes the ``[:seq_len, :seq_len]`` crop per sequence.
        window: Half-width of the local neighbourhood.
        top_k: Attention edges per token (capped at ``seq_len``).
        device: Device on which the returned tensors are created.

    Returns:
        ``(edge_index, edge_attr)`` where ``edge_index`` is a ``long`` tensor
        of shape ``(2, E)`` (source row, destination row) and ``edge_attr``
        is a ``float`` tensor of shape ``(E, 1)``.
    """
    src_list: List[int] = []
    dst_list: List[int] = []
    wt_list: List[float] = []

    # Local sliding-window edges.
    for i in range(seq_len):
        for j in range(max(0, i - window), min(seq_len, i + window + 1)):
            if i != j:
                src_list.append(i)
                dst_list.append(j)
                wt_list.append(1.0)

    # Attention edges: average over heads, then take the top-k keys per query.
    avg_attn = attn_weights.mean(dim=0)
    topk_vals, topk_idx = avg_attn.topk(min(top_k, seq_len), dim=-1)

    # Pull both tensors to Python in one transfer each; calling ``.item()``
    # per element forces a host sync for every single entry.
    idx_rows = topk_idx.tolist()
    val_rows = topk_vals.tolist()
    for i in range(seq_len):
        for j, wt in zip(idx_rows[i], val_rows[i]):
            if i != j and wt > 1e-4:
                src_list.append(i)
                dst_list.append(j)
                wt_list.append(wt)

    edge_index = torch.tensor([src_list, dst_list], dtype=torch.long, device=device)
    edge_attr = torch.tensor(wt_list, dtype=torch.float, device=device).unsqueeze(1)
    return edge_index, edge_attr


# --------------------------------------------------------------------------- #
# Span decoding helpers
# --------------------------------------------------------------------------- #
def _decode_bio_spans(preds: List[int], offsets: List[List[int]]) -> List[dict]:
    """Merge per-token BIO predictions into character-level spans.

    ``preds`` and ``offsets`` are aligned 1:1 by index. Tokens whose offset
    is empty (``start == end``, e.g. special tokens) are skipped together
    with their prediction. An ``I-X`` tag only extends a span that is
    currently open with the same type ``X``; any other tag closes the open
    span.
    """
    spans: List[dict] = []
    cur_lbl, cur_start, cur_end = None, None, None

    for pred_id, (tok_s, tok_e) in zip(preds, offsets):
        if tok_s == tok_e:
            continue
        pred_lbl = ID2LABEL[pred_id]

        if pred_lbl.startswith("I-") and cur_lbl == pred_lbl[2:]:
            # Continuation of the open span.
            cur_end = tok_e
            continue

        # Anything else ends the open span (if any) ...
        if cur_lbl:
            spans.append({"start": cur_start, "end": cur_end, "label": cur_lbl})
        # ... and only a B- tag opens a new one.
        if pred_lbl.startswith("B-"):
            cur_lbl, cur_start, cur_end = pred_lbl[2:], tok_s, tok_e
        else:
            cur_lbl, cur_start, cur_end = None, None, None

    if cur_lbl:
        spans.append({"start": cur_start, "end": cur_end, "label": cur_lbl})
    return spans


def _redact(text: str, spans: List[dict]) -> str:
    """Return ``text`` with every span replaced by ``[LABEL]``.

    Spans are applied right-to-left so earlier character offsets stay valid
    while later parts of the string change length.
    """
    redacted = text
    for sp in sorted(spans, key=lambda s: s["start"], reverse=True):
        redacted = redacted[:sp["start"]] + f"[{sp['label']}]" + redacted[sp["end"]:]
    return redacted


# --------------------------------------------------------------------------- #
# Model
# --------------------------------------------------------------------------- #
class BertGATForTokenClassification(PreTrainedModel):
    """BERT encoder + GAT layers over a per-sequence token graph + linear tagger."""

    config_class = BertGATConfig
    base_model_prefix = "bert_gat_pii"

    # This model has no tied weights (no shared embeddings, no encoder-
    # decoder). Different transformers versions look for either the
    # old ``_tied_weights_keys`` (list) or the newer
    # ``all_tied_weights_keys`` (dict); declaring both empty keeps
    # ``from_pretrained``'s post-load tied-weight bookkeeping happy.
    _tied_weights_keys: list = []
    all_tied_weights_keys: dict = {}

    def __init__(self, config: BertGATConfig):
        """Build the (uninitialised) architecture described by ``config``."""
        super().__init__(config)
        # Instantiate the BERT trunk EMPTY (no weight download here). The
        # outer ``from_pretrained`` populates everything — including these
        # parameters — from the saved state dict. Calling
        # ``AutoModel.from_pretrained`` here would clash with the meta-
        # device context the outer loader sets up.
        bert_config = AutoConfig.from_pretrained(config.bert_model_name)
        # The graph builder needs the last layer's attention maps.
        bert_config.output_attentions = True
        self.bert = AutoModel.from_config(bert_config)
        bert_dim = self.bert.config.hidden_size

        self.dropout = nn.Dropout(config.dropout)
        self.window = config.window
        self.top_k = config.top_k_attn

        # Each GAT layer concatenates its heads, so the next layer's input
        # width is ``gat_hidden * gat_heads``.
        self.gat_layers = nn.ModuleList()
        in_dim = bert_dim
        for _ in range(config.gat_layers):
            self.gat_layers.append(
                GATConv(
                    in_dim,
                    config.gat_hidden,
                    heads=config.gat_heads,
                    concat=True,
                    dropout=config.dropout,
                    edge_dim=1,
                )
            )
            in_dim = config.gat_hidden * config.gat_heads

        self.layer_norm = nn.LayerNorm(in_dim)
        # Projects the BERT embeddings to the GAT output width for the
        # residual connection.
        self.residual_proj = nn.Linear(bert_dim, in_dim)
        self.classifier = nn.Linear(in_dim, config.num_labels)
        # NOTE(review): HF models conventionally end ``__init__`` with
        # ``self.post_init()``; it is not called here — confirm whether that
        # is intentional before adding it.

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
    ):
        """Run BERT, refine each sequence with the GAT stack, and classify tokens.

        Args:
            input_ids: Token ids of shape ``(batch, seq)``.
            attention_mask: ``1`` for real tokens, ``0`` for padding. The
                number of real tokens per row is taken as its sum and the
                first that many positions are used, so padding must be on
                the right. Defaults to all ones.
            labels: Optional target label ids; positions set to ``-100`` are
                ignored by the loss.

        Returns:
            ``{"loss": ..., "logits": ...}``. ``logits`` has shape
            ``(batch, seq, num_labels)``; ``loss`` is ``None`` when
            ``labels`` is not given.
        """
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)

        B, L = input_ids.shape
        device = input_ids.device

        bert_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        token_embs = bert_out.last_hidden_state
        last_attn = bert_out.attentions[-1]

        # One host sync for all sequence lengths instead of one per row.
        seq_lens = [int(n) for n in attention_mask.sum(dim=1).tolist()]

        gat_outputs = []
        for b, seq_len in enumerate(seq_lens):
            attn_b = last_attn[b, :, :seq_len, :seq_len]
            edge_idx, edge_attr = _build_token_graph(
                seq_len, attn_b, self.window, self.top_k, device,
            )
            # The builder always returns float32 weights; match the model's
            # working dtype so half-precision checkpoints don't hit a dtype
            # mismatch inside GATConv's edge projection.
            edge_attr = edge_attr.to(token_embs.dtype)

            h_real = token_embs[b, :seq_len]
            h_res = h_real
            for gat in self.gat_layers:
                h_real = self.dropout(h_real)
                h_real = gat(h_real, edge_idx, edge_attr=edge_attr)
                h_real = F.elu(h_real)
            h_real = self.layer_norm(h_real + self.residual_proj(h_res))

            # Re-pad to the batch length so rows can be stacked. ``new_zeros``
            # inherits dtype and device from ``h_real``; a plain
            # ``torch.zeros`` would be float32 regardless of the model dtype.
            pad_len = L - seq_len
            if pad_len > 0:
                pad = h_real.new_zeros(pad_len, h_real.shape[-1])
                h_real = torch.cat([h_real, pad], dim=0)
            gat_outputs.append(h_real)

        gat_embs = torch.stack(gat_outputs, dim=0)
        logits = self.classifier(self.dropout(gat_embs))

        loss = None
        if labels is not None:
            # ``reshape`` (not ``view``) so non-contiguous labels also work.
            loss = F.cross_entropy(
                logits.reshape(-1, self.config.num_labels),
                labels.reshape(-1),
                ignore_index=-100,
            )
        return {"loss": loss, "logits": logits}

    # ---- Convenience inference helpers -------------------------------------
    @torch.no_grad()
    def predict(
        self,
        text: str,
        tokenizer: AutoTokenizer,
        device: Optional[torch.device] = None,
    ) -> dict:
        """Detect and redact PII in a single string.

        The text is truncated to ``config.max_length`` tokens. This method
        does not switch the module to eval mode; call ``model.eval()`` first
        if the model might be in training mode, otherwise dropout is active.

        Args:
            text: Raw input text.
            tokenizer: Tokenizer matching the BERT trunk. It is called with
                ``return_offsets_mapping=True``, so it must support offset
                mappings.
            device: Device for the input tensors. Defaults to the device of
                the model's first parameter.

        Returns:
            A dict with ``"original"`` (the input text), ``"redacted"`` (text
            with each span replaced by ``[LABEL]``) and ``"spans"`` (list of
            dicts with ``start``, ``end``, ``label`` and ``value``).
        """
        if device is None:
            device = next(self.parameters()).device

        enc = tokenizer(
            text,
            return_tensors="pt",
            return_offsets_mapping=True,
            truncation=True,
            max_length=self.config.max_length,
        )
        input_ids = enc["input_ids"].to(device)
        attention_mask = enc["attention_mask"].to(device)
        offsets = enc["offset_mapping"].squeeze(0).tolist()

        out = self(input_ids, attention_mask)
        preds = out["logits"].squeeze(0).argmax(dim=-1).cpu().tolist()

        # preds and offsets are aligned 1:1 by index; the decoder walks them
        # together so that special tokens — whose offset is (0, 0) — and
        # their matching prediction are skipped as a pair.
        spans = _decode_bio_spans(preds, offsets)
        for sp in spans:
            sp["value"] = text[sp["start"]:sp["end"]]

        return {"original": text, "redacted": _redact(text, spans), "spans": spans}


# Hooks so AutoModel can find this class via the config's auto_map.
BertGATConfig.register_for_auto_class("AutoConfig")
BertGATForTokenClassification.register_for_auto_class("AutoModel")