Spaces:
Running
Running
| """ | |
| PDF PII Redactor — Detects and redacts sensitive information from PDF documents. | |
| Dual-mode AI backend: | |
| - GPU (ZeroGPU): openai/privacy-filter (1.4B MoE, Viterbi decoding, 128k context) | |
| - CPU fallback: GLiNER-PII (zero-shot, 60+ entity types, F1=81%, Apache 2.0) | |
| Also uses: | |
| - PyMuPDF (fitz) for PDF text extraction with bounding boxes and redaction | |
| - EasyOCR as fallback for scanned/image-based PDFs | |
| - Regex patterns for additional sensitive data (credentials, financial, medical codes) | |
| """ | |
| import dataclasses | |
| import functools | |
| import json | |
| import math | |
| import os | |
| import re | |
| import tempfile | |
| import time | |
| from bisect import bisect_left, bisect_right | |
| from collections.abc import Sequence | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Final | |
| import fitz # PyMuPDF | |
| import gradio as gr | |
| import torch | |
| import torch.nn.functional as F | |
# ─── Detect ZeroGPU / CUDA availability ─────────────────────────────────────────
# On ZeroGPU Spaces, the `spaces` library is installed AND torch.cuda.is_available()
# returns True at startup (CUDA emulation mode). On plain CPU Spaces, the `spaces`
# library is also installed but torch.cuda.is_available() returns False.
# Both signals are recorded here; later code combines them to pick the GPU path.
try:
    import spaces
except ImportError:
    ZEROGPU_AVAILABLE = False
    print("ZeroGPU: `spaces` library not available.")
else:
    ZEROGPU_AVAILABLE = True
    print("ZeroGPU: `spaces` library detected.")

# The critical check: on ZeroGPU, CUDA emulation is active at startup so this is True.
# On CPU-only Spaces, this is False even though `spaces` is importable.
CUDA_AVAILABLE = torch.cuda.is_available()
print(f"CUDA available at startup: {CUDA_AVAILABLE}")
# ─── Conditional heavy imports (only needed for the GPU path) ───────────────────
# The flag ends up True only if all three optional packages import cleanly.
try:
    from safetensors import safe_open
    import tiktoken
    from huggingface_hub import snapshot_download
except ImportError:
    HAS_PRIVACY_FILTER_DEPS = False
else:
    HAS_PRIVACY_FILTER_DEPS = True
# ─── Download models at startup based on what's available ───────────────────────
# Only attempt the 2.8GB privacy-filter download if we have CUDA (real or emulated).
# MODEL_DIR stays None unless the snapshot download succeeds; USE_PRIVACY_FILTER is
# flipped to True only when the checkpoint directory contains *.safetensors files.
MODEL_DIR = None
USE_PRIVACY_FILTER = False
if HAS_PRIVACY_FILTER_DEPS and CUDA_AVAILABLE:
    try:
        print("Downloading openai/privacy-filter model...")
        _root = snapshot_download("openai/privacy-filter", allow_patterns=["original/*"])
        MODEL_DIR = Path(_root) / "original"
        if MODEL_DIR.exists() and any(MODEL_DIR.glob("*.safetensors")):
            USE_PRIVACY_FILTER = True
            print("openai/privacy-filter checkpoint ready.")
        else:
            print("openai/privacy-filter: no safetensors found — falling back to CPU.")
    except Exception as e:
        # Startup boundary: any download failure degrades to the CPU model
        # instead of crashing the app.
        print(f"Could not download openai/privacy-filter: {e} — falling back to CPU.")
elif not CUDA_AVAILABLE:
    print("No CUDA available — skipping openai/privacy-filter download, will use CPU model.")
# CPU fallback: load GLiNER-PII (zero-shot, 60+ entity types, F1=81%)
# `gliner_model` stays None when the privacy-filter checkpoint is in use.
gliner_model = None

# Entity labels requested from GLiNER on every prediction call.
GLINER_PII_LABELS = [
    "name",
    "email address",
    "phone number",
    "location address",
    "location city",
    "location zip",
    "ssn",
    "credit card number",
    "date of birth",
    "password",
    "ip address",
    "account number",
    "passport number",
    "driver license",
    "username",
    "url",
]

if not USE_PRIVACY_FILTER:
    print("Loading CPU model (GLiNER-PII-base-v1.0)...")
    # Imported lazily so the GPU path does not require the `gliner` package.
    from gliner import GLiNER
    gliner_model = GLiNER.from_pretrained("knowledgator/gliner-pii-base-v1.0")
    print("GLiNER-PII loaded on CPU.")

if USE_PRIVACY_FILTER:
    ACTIVE_MODEL = "openai/privacy-filter"
else:
    ACTIVE_MODEL = "GLiNER-PII"
print(f"Active PII model: {ACTIVE_MODEL}")
# ─── Model constants (privacy-filter) ───────────────────────────────────────────
PRIVACY_FILTER_MODEL_TYPE: Final[str] = "privacy_filter"

# Keys that validate_model_config_contract requires in the checkpoint config.json.
REQUIRED_MODEL_CONFIG_KEYS: Final[tuple[str, ...]] = (
    "model_type",
    "encoding",
    "num_hidden_layers",
    "num_experts",
    "experts_per_token",
    "vocab_size",
    "num_labels",
    "hidden_size",
    "intermediate_size",
    "head_dim",
    "num_attention_heads",
    "num_key_value_heads",
    "sliding_window",
    "bidirectional_context",
    "bidirectional_left_context",
    "bidirectional_right_context",
    "default_n_ctx",
    "initial_context_length",
    "rope_theta",
    "rope_scaling_factor",
    "rope_ntk_alpha",
    "rope_ntk_beta",
    "param_dtype",
)

BACKGROUND_CLASS_LABEL: Final[str] = "O"
BOUNDARY_PREFIXES: Final[tuple[str, ...]] = ("B", "I", "E", "S")

# Span-level classes; index 0 is the background class.
SPAN_CLASS_NAMES: Final[tuple[str, ...]] = (
    BACKGROUND_CLASS_LABEL,
    "account_number",
    "private_address",
    "private_date",
    "private_email",
    "private_person",
    "private_phone",
    "private_url",
    "secret",
)

# Token-level classes: background first, then one "<prefix>-<span>" entry per
# boundary prefix for every non-background span class (1 + 8 * 4 = 33 labels).
NER_CLASS_NAMES: Final[tuple[str, ...]] = (
    BACKGROUND_CLASS_LABEL,
    *(
        f"{boundary}-{entity}"
        for entity in SPAN_CLASS_NAMES
        if entity != BACKGROUND_CLASS_LABEL
        for boundary in BOUNDARY_PREFIXES
    ),
)

# Keys looked up in the Viterbi calibration payload.
VITERBI_TRANSITION_BIAS_KEYS: Final[tuple[str, ...]] = (
    "transition_bias_background_stay",
    "transition_bias_background_to_start",
    "transition_bias_inside_to_continue",
    "transition_bias_inside_to_end",
    "transition_bias_end_to_background",
    "transition_bias_end_to_start",
)
DEFAULT_VITERBI_CALIBRATION_PRESET: Final[str] = "default"
# ─── Regex patterns ─────────────────────────────────────────────────────────────
# Label -> regular-expression source. The patterns are passed to re.finditer
# elsewhere in this file; keep them as raw strings.
REGEX_PATTERNS = {
    "SSN": r"\b\d{3}[-\u2013]\d{2}[-\u2013]\d{4}\b",
    "CREDIT_CARD": r"\b(?:\d{4}[-\u2013 ]?){3}\d{4}\b",
    # A leading `\b` can never match immediately before "+" at the start of the
    # text or after whitespace (both sides are non-word characters), so the
    # original pattern missed almost every real phone number. A negative
    # look-behind keeps the "not glued to a preceding word" intent.
    "PHONE_INTL": r"(?<!\w)\+\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b",
    "IP_ADDRESS": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
    "DATE_OF_BIRTH": r"\b(?:DOB|Date of Birth|Born)[:\s]*\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
    "PASSPORT": r"\b(?:passport|Passport)[:\s]*[A-Z]{1,2}\d{6,9}\b",
    "IBAN": r"\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}(?:[A-Z0-9]?){0,16}\b",
    "DRIVER_LICENSE": r"\b(?:DL|Driver'?s?\s*(?:License|Licence))[:\s]*[A-Z0-9]{5,15}\b",
    "MEDICAL_RECORD": r"\b(?:MRN|Medical Record|Patient ID)[:\s#]*[\w\-]+\b",
    "API_KEY": r"\b(?:sk|pk|api[_-]?key|token|secret)[_\-]?[A-Za-z0-9]{20,}\b",
    "AWS_KEY": r"\b(?:AKIA|ABIA|ACCA|ASIA)[A-Z0-9]{16}\b",
    "PRIVATE_KEY": r"-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----",
    "BITCOIN_ADDR": r"\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b",
    "MAC_ADDRESS": r"\b(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}\b",
}
# ─── Label descriptions (superset of both models) ───────────────────────────────
# Human-readable names for every label the detectors in this file can emit.
NER_LABEL_DESCRIPTIONS = {
    # openai/privacy-filter span classes
    "private_person": "Person Name",
    "private_address": "Physical Address",
    "private_email": "Email Address",
    "private_phone": "Phone Number",
    "private_url": "Personal URL",
    "private_date": "Date (tied to a person)",
    "account_number": "Account Number",
    "secret": "Secret (password, API key, token)",
    # GLiNER-PII labels
    "name": "Person Name",
    "email address": "Email Address",
    "phone number": "Phone Number",
    "location address": "Physical Address",
    "location city": "City",
    "location zip": "Zip Code",
    "ssn": "Social Security Number",
    "credit card number": "Credit Card Number",
    "date of birth": "Date of Birth",
    "password": "Password",
    "ip address": "IP Address",
    # "account number" is requested from GLiNER (see GLINER_PII_LABELS) but was
    # missing from this table.
    "account number": "Account Number",
    "passport number": "Passport Number",
    "driver license": "Driver License",
    "username": "Username",
    "url": "URL",
    # Regex labels
    "SSN": "Social Security Number (regex)",
    "CREDIT_CARD": "Credit Card (regex)",
    "PHONE_INTL": "International Phone (regex)",
    "IP_ADDRESS": "IP Address",
    "DATE_OF_BIRTH": "Date of Birth (regex)",
    "PASSPORT": "Passport Number",
    "IBAN": "IBAN",
    "DRIVER_LICENSE": "Driver License (regex)",
    "MEDICAL_RECORD": "Medical Record Number",
    "API_KEY": "API Key / Token",
    "AWS_KEY": "AWS Access Key",
    "PRIVATE_KEY": "Private Key",
    "BITCOIN_ADDR": "Bitcoin Address",
    "MAC_ADDRESS": "MAC Address",
}
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # GPU PATH: openai/privacy-filter | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
def validate_model_config_contract(cfg, *, context):
    """Check that a checkpoint config mapping matches what this file expects.

    Args:
        cfg: Parsed ``config.json`` mapping.
        context: Label (the callers pass a path string) prefixed to error
            messages so the offending checkpoint can be identified.

    Raises:
        ValueError: If a required key is missing, the left/right bidirectional
            context sizes are not equal non-negative integers,
            ``sliding_window`` is not ``2 * context + 1``, ``num_labels`` is not
            33, or ``param_dtype`` is not ``"bfloat16"``.
    """
    missing = [k for k in REQUIRED_MODEL_CONFIG_KEYS if k not in cfg]
    if missing:
        raise ValueError(f"{context} missing: {missing}")

    def _is_plain_int(value):
        # bool is a subclass of int; reject it explicitly.
        return isinstance(value, int) and not isinstance(value, bool)

    left = cfg.get("bidirectional_left_context")
    right = cfg.get("bidirectional_right_context")
    if not (_is_plain_int(left) and _is_plain_int(right)) or left != right or left < 0:
        raise ValueError(
            f"{context}: bidirectional_left_context and bidirectional_right_context "
            f"must be equal non-negative integers (got {left!r} and {right!r})"
        )

    expected_window = 2 * left + 1
    if cfg.get("sliding_window") != expected_window:
        raise ValueError(
            f"{context}: sliding_window must be {expected_window} "
            f"(got {cfg.get('sliding_window')!r})"
        )
    if cfg["num_labels"] != 33:
        raise ValueError(f"{context}: num_labels must be 33 (got {cfg['num_labels']!r})")
    if cfg["param_dtype"] != "bfloat16":
        raise ValueError(
            f"{context}: param_dtype must be 'bfloat16' (got {cfg['param_dtype']!r})"
        )
def expert_linear(x, w, b):
    """Apply a separate linear map to every (row, expert) pair.

    ``x`` is unpacked as ``(rows, experts, in_dim)`` and ``w`` as a 4-D tensor
    whose last dimension is the output size; the two leading dimensions are
    flattened so a single batched matmul handles all pairs. ``b`` is added to
    the result when it is not None.
    """
    rows, experts, in_dim = x.shape
    _, _, _, out_dim = w.shape
    pairs = rows * experts
    lhs = x.reshape(pairs, 1, in_dim)
    rhs = w.reshape(pairs, in_dim, out_dim)
    result = torch.bmm(lhs, rhs).reshape(rows, experts, out_dim)
    if b is None:
        return result
    return result + b
@dataclass
class ModelConfig:
    """Hyper-parameters consumed by the model classes in this file.

    The class must be a dataclass: ``from_checkpoint_config`` relies on
    ``dataclasses.fields`` and on the generated keyword constructor.
    """

    num_hidden_layers: int
    num_experts: int
    experts_per_token: int
    vocab_size: int
    num_labels: int
    hidden_size: int
    intermediate_size: int
    head_dim: int
    num_attention_heads: int
    num_key_value_heads: int
    bidirectional_context_size: int
    initial_context_length: int
    rope_theta: float
    rope_scaling_factor: float
    rope_ntk_alpha: float
    rope_ntk_beta: float

    @classmethod
    def from_checkpoint_config(cls, c, *, context):
        """Build a config from a checkpoint ``config.json`` mapping.

        ``bidirectional_context_size`` is taken from the mapping's
        ``bidirectional_left_context`` entry; keys that are not fields of this
        class are ignored. The input mapping is not modified.

        Args:
            c: Mapping of raw config values.
            context: Accepted for parity with the validator; not used here.

        Raises:
            KeyError: If ``bidirectional_left_context`` is absent.
            TypeError: If a required field is absent from ``c``.
        """
        raw = dict(c)  # copy so the caller's mapping is left untouched
        raw["bidirectional_context_size"] = raw["bidirectional_left_context"]
        field_names = {f.name for f in dataclasses.fields(cls)}
        return cls(**{key: value for key, value in raw.items() if key in field_names})
class RMSNorm(torch.nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned scale."""

    def __init__(self, nf, eps=1e-05, device=None):
        super().__init__()
        self.eps = eps
        # The scale is kept in float32 regardless of the activation dtype.
        self.scale = torch.nn.Parameter(
            torch.ones(nf, device=device, dtype=torch.float32)
        )

    def forward(self, x):
        """Normalise ``x`` in float32 and return the result in ``x``'s dtype."""
        x32 = x.float()
        inv_rms = torch.rsqrt(x32.pow(2).mean(-1, keepdim=True) + self.eps)
        return (x32 * inv_rms * self.scale).to(x.dtype)
def apply_rope(x, cos, sin):
    """Rotate interleaved (even, odd) feature pairs of ``x`` by the given angles.

    ``cos`` and ``sin`` get an extra axis inserted before their last dimension
    so they broadcast over it, and are cast to ``x``'s dtype. The rotated pairs
    are re-interleaved and the result has ``x``'s shape.
    """
    cos = cos.unsqueeze(-2).to(x.dtype)
    sin = sin.unsqueeze(-2).to(x.dtype)
    even = x[..., ::2]
    odd = x[..., 1::2]
    rotated_even = even * cos - odd * sin
    rotated_odd = odd * cos + even * sin
    return torch.stack((rotated_even, rotated_odd), dim=-1).reshape(x.shape)
class RotaryEmbedding(torch.nn.Module):
    """Rotary position embedding with optional frequency rescaling.

    Cosine/sine tables are computed on CPU at construction time, stored as
    non-persistent buffers (``cc`` / ``sc``), and regrown on demand when a
    longer sequence is seen in ``forward``.
    """

    def __init__(
        self,
        hd,
        base,
        dtype,
        *,
        initial_context_length=4096,
        scaling_factor=1.0,
        ntk_alpha=1.0,
        ntk_beta=32.0,
        device=None,
    ):
        super().__init__()
        self.head_dim = hd
        self.base = base
        self.dtype = dtype
        self.initial_context_length = initial_context_length
        self.scaling_factor = scaling_factor
        self.ntk_alpha = ntk_alpha
        self.ntk_beta = ntk_beta
        self.device = device

        table_len = max(int(initial_context_length * scaling_factor), initial_context_length)
        cos_table, sin_table = self._cs(table_len, device=torch.device("cpu"))
        target = device or torch.device("cpu")
        self.register_buffer("cc", cos_table.to(target), persistent=False)
        self.register_buffer("sc", sin_table.to(target), persistent=False)

    def _inv(self, device=None):
        """Return ``(concentration, inverse_frequencies)`` for the tables."""
        device = device or self.device
        exponents = torch.arange(0, self.head_dim, 2, dtype=torch.float, device=device)
        freq = self.base ** (exponents / self.head_dim)
        if not self.scaling_factor > 1.0:
            return 1.0, 1.0 / freq

        concentration = 0.1 * math.log(self.scaling_factor) + 1.0
        half_dim = self.head_dim / 2
        log_base = math.log(self.base)
        low = half_dim * math.log(self.initial_context_length / (self.ntk_beta * 2 * math.pi)) / log_base
        high = half_dim * math.log(self.initial_context_length / (self.ntk_alpha * 2 * math.pi)) / log_base
        interpolated = 1.0 / (self.scaling_factor * freq)
        extrapolated = 1.0 / freq
        ramp = (torch.arange(half_dim, dtype=torch.float32, device=device) - low) / (high - low)
        # Per-dimension blend weight between the two frequency sets.
        mask = 1 - ramp.clamp(0, 1)
        return concentration, interpolated * (1 - mask) + extrapolated * mask

    def _cs(self, n, device=None):
        """Compute cosine and sine tables for positions ``0 .. n-1``."""
        concentration, inv_freq = self._inv(device=device)
        device = device or self.device
        positions = torch.arange(n, dtype=torch.float32, device=device)
        angles = torch.einsum("i,j->ij", positions, inv_freq)
        cos_table = (angles.cos() * concentration).to(self.dtype)
        sin_table = (angles.sin() * concentration).to(self.dtype)
        return cos_table, sin_table

    def forward(self, q, k):
        """Rotate ``q`` and ``k`` (first dimension = position) and keep their shapes."""
        n = q.shape[0]
        if n > self.cc.shape[0]:
            # Sequence is longer than the cached tables: rebuild them on CPU.
            cos_table, sin_table = self._cs(n, device=torch.device("cpu"))
            self.cc, self.sc = cos_table.to(q.device), sin_table.to(q.device)

        cos_all = self.cc if self.cc.device == q.device else self.cc.to(q.device)
        sin_all = self.sc if self.sc.device == q.device else self.sc.to(q.device)
        cos, sin = cos_all[:n], sin_all[:n]

        q_shape = q.shape
        q = apply_rope(q.view(n, -1, self.head_dim), cos, sin).reshape(q_shape)
        k_shape = k.shape
        k = apply_rope(k.view(n, -1, self.head_dim), cos, sin).reshape(k_shape)
        return q, k
def sdpa(Q, K, V, S, sm, ctx):
    """Windowed attention over ``ctx`` positions on each side, plus a sink logit.

    ``Q`` is unpacked as ``(n, heads, q_mult, head_dim)``; ``K`` and ``V`` are
    padded and unfolded along dimension 0 into windows of ``2 * ctx + 1``
    positions. Window slots that fall outside ``[0, n)`` are masked out. ``S``
    (scaled by ln 2) contributes one extra logit per head to the softmax
    denominator and is dropped from the weights afterwards. Returns a tensor of
    shape ``(n, heads * q_mult * head_dim)``.
    """
    n, n_heads, q_mult, _head_dim = Q.shape
    window = 2 * ctx + 1

    # Pad the position axis so every token has a full window, then unfold.
    pad_spec = (0, 0, 0, 0, ctx, ctx)
    k_windows = F.pad(K, pad_spec).unfold(0, window, 1).permute(0, 3, 1, 2)
    v_windows = F.pad(V, pad_spec).unfold(0, window, 1).permute(0, 3, 1, 2)

    # Absolute position of every window slot; slots outside the sequence are invalid.
    offsets = torch.arange(window, device=Q.device) - ctx
    positions = torch.arange(n, device=Q.device)[:, None] + offsets[None, :]
    valid = (positions >= 0) & (positions < n)

    scores = torch.einsum("nhqd,nwhd->nhqw", Q, k_windows).float() * sm
    scores = scores.masked_fill(~valid[:, None, None, :], -float("inf"))

    sink = (S * math.log(2.0)).reshape(n_heads, q_mult)[None, :, :, None].expand(n, -1, -1, 1)
    scores = torch.cat([scores, sink], dim=-1)

    weights = torch.softmax(scores, dim=-1)[..., :-1].to(V.dtype)
    return torch.einsum("nhqw,nwhd->nhqd", weights, v_windows).reshape(n, -1)
class AttentionBlock(torch.nn.Module):
    """Pre-norm attention block with rotary embeddings, sinks and a residual add."""

    def __init__(self, cfg, device=None):
        super().__init__()
        param_dtype = torch.bfloat16
        self.hd = cfg.head_dim
        self.nah = cfg.num_attention_heads
        self.nkvh = cfg.num_key_value_heads
        self.bcs = int(cfg.bidirectional_context_size)
        # One sink value per attention head; filled when weights are loaded.
        self.sinks = torch.nn.Parameter(
            torch.empty(cfg.num_attention_heads, device=device, dtype=torch.float32)
        )
        self.norm = RMSNorm(cfg.hidden_size, device=device)
        qkv_dim = cfg.head_dim * (cfg.num_attention_heads + 2 * cfg.num_key_value_heads)
        self.qkv = torch.nn.Linear(cfg.hidden_size, qkv_dim, device=device, dtype=param_dtype)
        self.out = torch.nn.Linear(
            cfg.head_dim * cfg.num_attention_heads,
            cfg.hidden_size,
            device=device,
            dtype=param_dtype,
        )
        # The scale is applied to both q and k, hence the fourth root of head_dim.
        self.qk_scale = 1 / math.sqrt(math.sqrt(cfg.head_dim))
        self.rope = RotaryEmbedding(
            cfg.head_dim,
            int(cfg.rope_theta),
            torch.float32,
            initial_context_length=cfg.initial_context_length,
            scaling_factor=cfg.rope_scaling_factor,
            ntk_alpha=cfg.rope_ntk_alpha,
            ntk_beta=cfg.rope_ntk_beta,
            device=device,
        )

    def forward(self, x):
        """Return ``x`` plus the attention output; ``x``'s first dim is position."""
        normed = self.norm(x).to(self.qkv.weight.dtype)
        qkv = F.linear(normed, self.qkv.weight, self.qkv.bias)

        # Column layout of the fused projection: [queries | keys | values].
        q_end = self.nah * self.hd
        k_end = (self.nah + self.nkvh) * self.hd
        v_end = (self.nah + 2 * self.nkvh) * self.hd
        q = qkv[:, :q_end].contiguous()
        k = qkv[:, q_end:k_end].contiguous()
        v = qkv[:, k_end:v_end].contiguous()

        q, k = self.rope(q, k)
        q = q * self.qk_scale
        k = k * self.qk_scale

        n = q.shape[0]
        q = q.view(n, self.nkvh, self.nah // self.nkvh, self.hd)
        k = k.view(n, self.nkvh, self.hd)
        v = v.view(n, self.nkvh, self.hd)

        attended = sdpa(q, k, v, self.sinks, 1.0, self.bcs).to(self.out.weight.dtype)
        projected = F.linear(attended, self.out.weight, self.out.bias)
        return x + projected.to(x.dtype)
def swiglu(x, alpha=1.702, limit=7.0):
    """Clamped gated activation over the two halves of ``x``'s last dimension.

    The first half is the gate (clamped from above at ``limit``), the second
    half is the linear part (clamped to ``[-limit, limit]``). The result is
    ``gate * sigmoid(alpha * gate) * (linear + 1)``.
    """
    gate, linear = x.chunk(2, dim=-1)
    gate = gate.clamp(max=limit)
    linear = linear.clamp(min=-limit, max=limit)
    activated = gate * torch.sigmoid(alpha * gate)
    return activated * (linear + 1)
class MLPBlock(torch.nn.Module):
    """Pre-norm mixture-of-experts feed-forward block with a residual add."""

    def __init__(self, cfg, device=None):
        super().__init__()
        param_dtype = torch.bfloat16
        self.ne = cfg.num_experts
        self.ept = cfg.experts_per_token
        self.sl = 7.0  # clamp limit handed to swiglu
        self.norm = RMSNorm(cfg.hidden_size, device=device)
        self.gate = torch.nn.Linear(
            cfg.hidden_size, cfg.num_experts, device=device, dtype=param_dtype
        )
        # Per-expert weights; contents are filled when the checkpoint is loaded.
        self.m1w = torch.nn.Parameter(
            torch.empty(
                (cfg.num_experts, cfg.hidden_size, cfg.intermediate_size * 2),
                device=device,
                dtype=param_dtype,
            )
        )
        self.m1b = torch.nn.Parameter(
            torch.empty((cfg.num_experts, cfg.intermediate_size * 2), device=device, dtype=param_dtype)
        )
        self.m2w = torch.nn.Parameter(
            torch.empty(
                (cfg.num_experts, cfg.intermediate_size, cfg.hidden_size),
                device=device,
                dtype=param_dtype,
            )
        )
        self.m2b = torch.nn.Parameter(
            torch.empty((cfg.num_experts, cfg.hidden_size), device=device, dtype=param_dtype)
        )

    def forward(self, x):
        """Route each row to its top-k experts and add the mixed output to ``x``."""
        normed = self.norm(x)
        gate_scores = F.linear(
            normed.float(), self.gate.weight.float(), self.gate.bias.float()
        )
        top = torch.topk(gate_scores, k=self.ept, dim=-1, sorted=True)
        expert_weights = torch.softmax(top.values, dim=-1) / self.ept
        expert_ids = top.indices
        experts_per_token = self.ept

        def run_chunk(rows, ids, weights):
            # Gather the selected experts' parameters and compute in float32.
            expanded = rows.float().unsqueeze(1).expand(-1, ids.shape[1], -1)
            hidden = expert_linear(expanded, self.m1w[ids].float(), self.m1b[ids].float())
            hidden = swiglu(hidden, limit=self.sl)
            hidden = expert_linear(hidden.float(), self.m2w[ids].float(), self.m2b[ids].float())
            if hidden.dtype != weights.dtype:
                hidden = hidden.to(weights.dtype)
            mixed = torch.einsum("bec,be->bc", hidden, weights) * experts_per_token
            return mixed.to(x.dtype)

        # Rows are processed in fixed-size chunks, which bounds the size of the
        # gathered expert weight tensors.
        chunk_size = 32
        n_rows = normed.shape[0]
        if n_rows > chunk_size:
            pieces = [
                run_chunk(
                    normed[start:start + chunk_size],
                    expert_ids[start:start + chunk_size],
                    expert_weights[start:start + chunk_size],
                )
                for start in range(0, n_rows, chunk_size)
            ]
            mlp_out = torch.cat(pieces, dim=0)
        else:
            mlp_out = run_chunk(normed, expert_ids, expert_weights)
        return x + mlp_out
class TransformerBlock(torch.nn.Module):
    """One model layer: an attention block followed by an MLP block."""

    def __init__(self, cfg, device=None):
        super().__init__()
        self.attn = AttentionBlock(cfg, device=device)
        self.mlp = MLPBlock(cfg, device=device)

    def forward(self, x):
        """Run attention, then the MLP, on ``x``."""
        attended = self.attn(x)
        return self.mlp(attended)
class Checkpoint:
    """Index of tensors stored across the ``*.safetensors`` files of a directory."""

    @staticmethod
    def build_param_name_map(n):
        """Map this file's MLP parameter names to checkpoint tensor names.

        Only the four per-layer MLP parameters are renamed; any other parameter
        name is looked up unchanged by ``get``.

        Args:
            n: Number of transformer layers.
        """
        renames = {}
        for i in range(n):
            prefix = f"block.{i}.mlp"
            renames[f"{prefix}.m1b"] = f"{prefix}.swiglu.bias"
            renames[f"{prefix}.m1w"] = f"{prefix}.swiglu.weight"
            renames[f"{prefix}.m2b"] = f"{prefix}.out.bias"
            renames[f"{prefix}.m2w"] = f"{prefix}.out.weight"
        return renames

    def __init__(self, path, device, num_hidden_layers):
        """Scan ``path`` and record which file holds each tensor name."""
        self.pnm = self.build_param_name_map(num_hidden_layers)
        # Device string handed to safe_open, e.g. "cpu", "cuda" or "cuda:0".
        self.ds = device.type if device.index is None else f"{device.type}:{device.index}"
        self.tnf = {}
        shard_paths = [
            os.path.join(path, fname)
            for fname in os.listdir(path)
            if fname.endswith(".safetensors")
        ]
        for shard in shard_paths:
            with safe_open(shard, framework="pt", device=self.ds) as handle:
                for key in handle.keys():
                    self.tnf[key] = shard

    def get(self, name):
        """Load and return the tensor for model parameter ``name``.

        Raises:
            KeyError: If no scanned file contains the (renamed) tensor.
        """
        tensor_name = self.pnm.get(name, name)
        try:
            shard = self.tnf[tensor_name]
        except KeyError:
            raise KeyError(
                f"tensor {tensor_name!r} (parameter {name!r}) not found in checkpoint"
            ) from None
        with safe_open(shard, framework="pt", device=self.ds) as handle:
            return handle.get_tensor(tensor_name)
class PrivacyFilterTransformer(torch.nn.Module):
    """Token classifier: embedding, stacked transformer blocks, norm, label head."""

    def __init__(self, config, device):
        super().__init__()
        param_dtype = torch.bfloat16
        self.embedding = torch.nn.Embedding(
            config.vocab_size, config.hidden_size, device=device, dtype=param_dtype
        )
        self.block = torch.nn.ModuleList(
            [TransformerBlock(config, device=device) for _ in range(config.num_hidden_layers)]
        )
        self.norm = RMSNorm(config.hidden_size, device=device)
        self.unembedding = torch.nn.Linear(
            config.hidden_size, config.num_labels, bias=False, device=device, dtype=param_dtype
        )

    def forward(self, tid):
        """Return per-token label logits for the token-id tensor ``tid``."""
        x = self.embedding(tid)
        for layer in self.block:
            x = layer(x)
        return F.linear(self.norm(x), self.unembedding.weight, None)

    @classmethod
    def from_checkpoint(cls, cd, *, device):
        """Build the model from checkpoint directory ``cd`` and load its weights.

        Reads and validates ``config.json``, instantiates the model on
        ``device`` in eval mode, and copies every named parameter from the
        checkpoint. TF32 matmuls are disabled globally as a side effect.
        """
        torch.backends.cuda.matmul.allow_tf32 = False
        torch.backends.cudnn.allow_tf32 = False
        torch.set_float32_matmul_precision("highest")

        with (Path(cd) / "config.json").open("r") as f:
            raw_config = json.load(f)
        validate_model_config_contract(raw_config, context=str(cd))
        cfg = ModelConfig.from_checkpoint_config(raw_config, context=str(cd))

        ckpt = Checkpoint(cd, device, num_hidden_layers=cfg.num_hidden_layers)
        model = cls(config=cfg, device=device)
        model.eval()
        for name, param in model.named_parameters():
            # Writing through .data bypasses autograd tracking of the copy.
            param.data.copy_(ckpt.get(name))
        return model
@dataclass
class LabelInfo:
    """Lookup tables relating token-level labels to span-level classes.

    Declared as a dataclass because instances are built with keyword arguments
    elsewhere in this file.
    """

    boundary_label_lookup: dict  # span name -> {boundary tag -> token label id}
    token_to_span_label: dict  # token label id -> span class index
    token_boundary_tags: dict  # token label id -> boundary tag, None for background
    span_class_names: tuple  # span class index -> span name
    span_label_lookup: dict  # span name -> span class index
    background_token_label: int
    background_span_label: int
def labels_to_spans(lbi, li):
    """Convert per-token boundary labels into ``(span_label, start, end)`` spans.

    Args:
        lbi: Mapping of token index -> token label id.
        li: Object exposing ``background_span_label``, ``token_to_span_label``
            and ``token_boundary_tags``.

    Returns:
        List of spans with an exclusive ``end`` token index, in the order they
        are closed while scanning token indices in ascending order.
    """
    spans = []
    background = li.background_span_label
    open_label = None
    open_start = None
    prev_idx = None

    def close_open_span(end):
        # Emit the span currently being built (if any), then reset the state.
        nonlocal open_label, open_start
        if open_label is not None and open_start is not None and end is not None:
            spans.append((open_label, open_start, end))
        open_label = None
        open_start = None

    for idx in sorted(lbi):
        label_id = lbi[idx]
        span_label = li.token_to_span_label.get(label_id)
        tag = li.token_boundary_tags.get(label_id)
        prev_end = None if prev_idx is None else prev_idx + 1

        # A gap in the token indices terminates whatever span was open.
        if prev_idx is not None and idx != prev_idx + 1:
            close_open_span(prev_end)

        if span_label is None:
            # Unknown label id: skipped without touching the open span.
            prev_idx = idx
            continue

        if span_label == background:
            close_open_span(idx)
            prev_idx = idx
            continue

        if tag == "S":
            close_open_span(prev_end)
            spans.append((span_label, idx, idx + 1))
        elif tag == "B":
            close_open_span(prev_end)
            open_label, open_start = span_label, idx
        elif tag == "I":
            # An inside tag with no matching open span starts a new span here.
            if open_label is None or open_label != span_label:
                close_open_span(prev_end)
                open_label, open_start = span_label, idx
        elif tag == "E":
            if open_label is None or open_label != span_label or open_start is None:
                # An end tag with no matching open span becomes a one-token span.
                close_open_span(prev_end)
                spans.append((span_label, idx, idx + 1))
            else:
                close_open_span(idx + 1)
        else:
            close_open_span(prev_end)
        prev_idx = idx

    if open_label is not None and open_start is not None and prev_idx is not None:
        spans.append((open_label, open_start, prev_idx + 1))
    return spans
def token_spans_to_char_spans(sp, cs, ce):
    """Translate token-index spans into character-offset spans.

    ``cs[i]`` / ``ce[i]`` are the character start / end of token ``i``. Spans
    whose token range is empty or out of bounds, or whose resulting character
    range is empty, are dropped.
    """
    char_spans = []
    n_tokens = len(cs)
    for label, tok_start, tok_end in sp:
        if not (0 <= tok_start < tok_end <= n_tokens):
            continue
        char_start = cs[tok_start]
        char_end = ce[tok_end - 1]
        if char_end > char_start:
            char_spans.append((label, char_start, char_end))
    return char_spans
def trim_char_spans_whitespace(sp, text):
    """Shrink each ``(label, start, end)`` span so it has no outer whitespace.

    Spans that are out of range for ``text`` or that contain only whitespace
    are dropped.
    """
    trimmed = []
    text_len = len(text)
    for label, start, end in sp:
        if not (0 <= start < end <= text_len):
            continue
        segment = text[start:end]
        # Count how many characters strip away from each side of the slice.
        start += len(segment) - len(segment.lstrip())
        if start >= end:
            continue
        end -= len(segment) - len(segment.rstrip())
        if end > start:
            trimmed.append((label, start, end))
    return trimmed
@dataclass
class InferenceRuntime:
    """Everything needed to run the privacy-filter model on a piece of text.

    Declared as a dataclass because instances are built with keyword arguments
    elsewhere in this file. Project types are quoted so the annotations are
    forward references.
    """

    model: "PrivacyFilterTransformer"
    encoding: object  # tokenizer object; must offer encode / decode_single_token_bytes
    label_info: "LabelInfo"
    device: torch.device
    n_ctx: int  # maximum number of tokens fed to the model per window
def get_viterbi_transition_biases():
    """Load Viterbi transition biases from ``MODEL_DIR/viterbi_calibration.json``.

    The biases are read from ``operating_points[<default preset>]["biases"]``
    when that structure is present, otherwise from the top level of the
    payload. An all-zero mapping is returned when the file is missing, the
    payload has an unexpected shape, or any bias is missing or non-numeric.

    Returns:
        Dict mapping every key in ``VITERBI_TRANSITION_BIAS_KEYS`` to a float.
    """
    calibration_path = MODEL_DIR / "viterbi_calibration.json"
    default = {key: 0.0 for key in VITERBI_TRANSITION_BIAS_KEYS}
    if not calibration_path.is_file():
        return default

    payload = json.loads(calibration_path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        # e.g. a JSON list or scalar: nothing usable, fall back to zeros.
        return default

    raw_biases = payload
    operating_points = payload.get("operating_points")
    if isinstance(operating_points, dict) and operating_points:
        preset = operating_points.get(DEFAULT_VITERBI_CALIBRATION_PRESET)
        if isinstance(preset, dict):
            raw_biases = preset.get("biases", raw_biases)
    if not isinstance(raw_biases, dict):
        return default

    biases = {}
    for key in VITERBI_TRANSITION_BIAS_KEYS:
        value = raw_biases.get(key)
        # bool is an int subclass; treat it as invalid rather than as 0/1.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return default
        biases[key] = float(value)
    return biases
@functools.lru_cache(maxsize=1)
def get_runtime():
    """Build (once per process) the runtime for the privacy-filter model.

    Validates ``MODEL_DIR/config.json``, builds the tokenizer and label lookup
    tables, and loads the model onto CUDA. The result is memoised so callers
    that request the runtime repeatedly do not reload the checkpoint.

    Returns:
        An ``InferenceRuntime`` instance.
    """
    config_path = MODEL_DIR / "config.json"
    raw_config = json.loads(config_path.read_text(encoding="utf-8"))
    validate_model_config_contract(raw_config, context=str(config_path))

    device = torch.device("cuda")
    encoding = tiktoken.get_encoding(raw_config["encoding"].strip())

    # Derive span-level tables from the "<tag>-<span>" token label names.
    span_names = [BACKGROUND_CLASS_LABEL]
    span_lookup = {BACKGROUND_CLASS_LABEL: 0}
    boundary_lookup = {}
    token_to_span = {}
    token_tags = {}
    background_token = None
    for token_label, name in enumerate(NER_CLASS_NAMES):
        if name == BACKGROUND_CLASS_LABEL:
            background_token = token_label
            token_to_span[token_label] = 0
            token_tags[token_label] = None
            continue
        tag, span_name = name.split("-", 1)
        span_index = span_lookup.get(span_name)
        if span_index is None:
            span_index = len(span_names)
            span_names.append(span_name)
            span_lookup[span_name] = span_index
        token_to_span[token_label] = span_index
        token_tags[token_label] = tag
        boundary_lookup.setdefault(span_name, {})[tag] = token_label

    label_info = LabelInfo(
        boundary_label_lookup=boundary_lookup,
        token_to_span_label=token_to_span,
        token_boundary_tags=token_tags,
        span_class_names=tuple(span_names),
        span_label_lookup=span_lookup,
        background_token_label=background_token,
        background_span_label=0,
    )
    model = PrivacyFilterTransformer.from_checkpoint(MODEL_DIR, device=device)
    return InferenceRuntime(
        model=model,
        encoding=encoding,
        label_info=label_info,
        device=device,
        n_ctx=int(raw_config["default_n_ctx"]),
    )
class Decoder:
    """Constrained Viterbi decoder over token-level boundary labels.

    Start, end and transition score tables are initialised to a large negative
    value; only label sequences that form well-formed spans receive a usable
    score (0 for start/end, a calibrated bias for transitions).
    """

    def __init__(self, li):
        """Build the score tables from label metadata ``li``."""
        n_classes = len(li.token_to_span_label)
        blocked = -1e9
        self._ss = torch.full((n_classes,), blocked, dtype=torch.float32)
        self._es = torch.full((n_classes,), blocked, dtype=torch.float32)
        self._ts = torch.full((n_classes, n_classes), blocked, dtype=torch.float32)
        biases = get_viterbi_transition_biases()
        bg_token, bg_span = li.background_token_label, li.background_span_label
        tags, span_of = li.token_boundary_tags, li.token_to_span_label

        for i in range(n_classes):
            tag = tags.get(i)
            span = span_of.get(i)
            # A sequence may start on background or on a span opener ...
            if tag in {"B", "S"} or i == bg_token:
                self._ss[i] = 0.0
            # ... and may end on background or on a span closer.
            if tag in {"E", "S"} or i == bg_token:
                self._es[i] = 0.0
            for j in range(n_classes):
                next_tag, next_span = tags.get(j), span_of.get(j)
                next_is_bg = next_span == bg_span or j == bg_token
                if (next_span is None or next_tag is None) and not next_is_bg:
                    continue
                if span is None or tag is None or span == bg_span or tag in {"E", "S"}:
                    # Outside a span: stay in background or open a new span.
                    allowed = next_is_bg or next_tag in {"B", "S"}
                elif tag in {"B", "I"}:
                    # Inside a span: continue or close the same span class.
                    allowed = span == next_span and next_tag in {"I", "E"}
                else:
                    allowed = False
                if allowed:
                    self._ts[i, j] = self._tb(tag, span, next_tag, next_span, bg_span, biases)

    @staticmethod
    def _tb(pt, ps, nt, ns, bs, b):
        """Select the calibrated bias for one allowed transition.

        Args:
            pt, ps: Boundary tag and span label of the previous token.
            nt, ns: Boundary tag and span label of the next token.
            bs: Background span label.
            b: Mapping of transition-bias keys to values.
        """
        next_is_bg = ns == bs
        prev_is_bg = ps == bs
        if prev_is_bg:
            if next_is_bg:
                return b["transition_bias_background_stay"]
            return b["transition_bias_background_to_start"]
        if pt in {"B", "I"}:
            if nt == "I":
                return b["transition_bias_inside_to_continue"]
            return b["transition_bias_inside_to_end"]
        if next_is_bg:
            return b["transition_bias_end_to_background"]
        return b["transition_bias_end_to_start"]

    def decode(self, lp):
        """Return the best label path for log-probabilities ``lp``.

        ``lp`` has shape ``(sequence_length, n_classes)``. An empty sequence
        yields ``[]``. If no path has a finite score, the per-token argmax is
        returned instead.
        """
        seq_len, n_classes = lp.shape
        if seq_len == 0:
            return []
        start = self._ss.to(device=lp.device, dtype=lp.dtype)
        end = self._es.to(device=lp.device, dtype=lp.dtype)
        trans = self._ts.to(device=lp.device, dtype=lp.dtype)

        scores = lp[0] + start
        backptr = torch.empty((seq_len - 1, n_classes), device=lp.device, dtype=torch.int64)
        for t in range(1, seq_len):
            # candidates[i, j] = best score ending in i, then moving to j.
            candidates = scores.unsqueeze(1) + trans
            best, best_prev = candidates.max(dim=0)
            scores = best + lp[t]
            backptr[t - 1] = best_prev
        if not torch.isfinite(scores).any():
            return lp.argmax(dim=1).tolist()

        scores = scores + end
        last = scores.argmax()
        path = torch.empty((seq_len,), device=lp.device, dtype=torch.int64)
        path[-1] = last
        for t in range(seq_len - 2, -1, -1):
            last = backptr[t, last]
            path[t] = last
        return path.tolist()
def predict_text_gpu(rt, text, dec):
    """Detect PII spans in ``text`` with the privacy-filter model.

    Args:
        rt: Runtime providing ``encoding``, ``model``, ``device``, ``n_ctx`` and
            ``label_info``.
        text: Input text.
        dec: Decoder exposing ``decode(log_probs) -> list[int]``.

    Returns:
        ``(decoded_text, detections)``. ``decoded_text`` is the text rebuilt
        from the tokens; detection offsets index into it. Each detection is a
        dict with ``entity``, ``word``, ``start``, ``end``, ``score`` (always
        1.0) and ``source`` (``"AI"``).
    """
    token_ids = tuple(int(t) for t in rt.encoding.encode(text, allowed_special="all"))
    if not token_ids:
        return text, []

    # Pure inference: disable autograd so no graph is retained for the model.
    with torch.inference_mode():
        window_log_probs = []
        for start in range(0, len(token_ids), rt.n_ctx):
            window = torch.tensor(
                token_ids[start:start + rt.n_ctx], device=rt.device, dtype=torch.int32
            )
            window_log_probs.append(F.log_softmax(rt.model(window).float(), dim=-1))
        log_probs = torch.cat(window_log_probs, dim=0)
        labels = dec.decode(log_probs)
        if len(labels) != len(token_ids):
            # Decoder returned an unexpected length: fall back to argmax.
            labels = log_probs.argmax(dim=1).tolist()

    labels_by_index = {i: int(label) for i, label in enumerate(labels)}
    token_spans = labels_to_spans(labels_by_index, rt.label_info)

    # Rebuild the text from raw token bytes so token <-> character offsets can
    # be derived from byte positions.
    token_bytes = [rt.encoding.decode_single_token_bytes(t) for t in token_ids]
    decoded = b"".join(token_bytes).decode("utf-8", errors="replace")

    # Byte start/end of every character in the decoded text.
    # NOTE(review): with errors="replace", substituted characters may not line
    # up with the original byte positions — confirm this is acceptable.
    char_byte_starts, char_byte_ends = [], []
    byte_cursor = 0
    for ch in decoded:
        char_byte_starts.append(byte_cursor)
        byte_cursor += len(ch.encode("utf-8"))
        char_byte_ends.append(byte_cursor)

    # Character start/end of every token, found by bisecting the byte tables.
    tok_char_starts, tok_char_ends = [], []
    byte_cursor = 0
    for raw in token_bytes:
        byte_start = byte_cursor
        byte_end = byte_start + len(raw)
        byte_cursor = byte_end
        first_char = bisect_right(char_byte_ends, byte_start)
        last_char = bisect_left(char_byte_starts, byte_end)
        if last_char < first_char:
            last_char = first_char
        tok_char_starts.append(first_char)
        tok_char_ends.append(last_char)

    char_spans = trim_char_spans_whitespace(
        token_spans_to_char_spans(token_spans, tok_char_starts, tok_char_ends), decoded
    )

    detections = []
    span_names = rt.label_info.span_class_names
    for label_index, start, end in char_spans:
        if not (0 <= start < end <= len(decoded)):
            continue
        if 0 <= label_index < len(span_names):
            label = span_names[label_index]
        else:
            label = f"label_{label_index}"
        detections.append(
            {
                "entity": label,
                "word": decoded[start:end],
                "start": int(start),
                "end": int(end),
                "score": 1.0,
                "source": "AI",
            }
        )
    return decoded, detections
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # CPU PATH: GLiNER-PII (zero-shot, 60+ entity types) | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
def get_pii_entities_cpu(text, min_score=0.3):
    """Run GLiNER-PII on CPU — zero-shot detection with 60+ PII categories.

    Returns a list of entity dicts (``entity``, ``start``, ``end``, ``word``,
    ``score``, ``source``). Any error raised by the model call is printed and
    an empty list is returned.
    """
    try:
        predictions = gliner_model.predict_entities(
            text, GLINER_PII_LABELS, threshold=min_score
        )
    except Exception as e:
        # Best-effort: a model failure must not abort the caller.
        print(f"GLiNER error: {e}")
        return []
    return [
        {
            "entity": item["label"],
            "start": item["start"],
            "end": item["end"],
            "word": item["text"],
            "score": item["score"],
            "source": "GLiNER",
        }
        for item in predictions
    ]
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # UNIFIED PII DETECTION | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
def _get_pii_entities_gpu_inner(text):
    """Run the privacy-filter model on ``text`` and return its detections."""
    rt = get_runtime()
    # Passed positionally: Decoder.__init__ has no `label_info` keyword parameter.
    dec = Decoder(rt.label_info)
    _, ents = predict_text_gpu(rt, text, dec)
    return ents


if USE_PRIVACY_FILTER and ZEROGPU_AVAILABLE:
    # On ZeroGPU a GPU is only attached while a `spaces.GPU`-wrapped function
    # runs, so the CUDA work has to go through the wrapper.
    _get_pii_entities_gpu_inner = spaces.GPU(_get_pii_entities_gpu_inner)


def get_pii_entities_ai(text, min_score=0.5):
    """Detect PII in ``text`` with whichever AI backend is active.

    Args:
        text: Text to analyse; blank text yields an empty list.
        min_score: Threshold forwarded to the GLiNER backend. The
            privacy-filter backend ignores it.

    Returns:
        List of entity dicts.
    """
    if not text.strip():
        return []
    if USE_PRIVACY_FILTER:
        # Covers both ZeroGPU and plain CUDA hosts. Previously a CUDA host
        # without `spaces` fell through to GLiNER, which is not loaded then.
        return _get_pii_entities_gpu_inner(text)
    return get_pii_entities_cpu(text, min_score)
| # ─── Regex + merge ────────────────────────────────────────────────────────────── | |
def get_pii_entities_regex(text):
    """Return one entity dict per case-insensitive match of every regex pattern.

    Matches are listed pattern by pattern in ``REGEX_PATTERNS`` order, each
    with score 1.0 and source ``"REGEX"``.
    """
    return [
        {
            "entity": label,
            "start": match.start(),
            "end": match.end(),
            "word": match.group(),
            "score": 1.0,
            "source": "REGEX",
        }
        for label, pattern in REGEX_PATTERNS.items()
        for match in re.finditer(pattern, text, re.IGNORECASE)
    ]
def merge_entities(ai, rx):
    """Merge AI and regex detections, resolving overlaps by span length.

    A regex entity that overlaps no AI entity is kept. One that overlaps AI
    entities is kept only if it is strictly longer than the longest of them,
    in which case those AI entities are dropped; otherwise the regex entity
    is dropped.

    Args:
        ai: Entity dicts from the AI model (need ``start`` / ``end``).
        rx: Entity dicts from the regex pass (need ``start`` / ``end``).

    Returns:
        The surviving entities sorted by ``start``.
    """
    combined = []
    superseded = [False] * len(ai)
    for regex_ent in rx:
        overlapping = [
            i
            for i, ai_ent in enumerate(ai)
            if regex_ent["start"] < ai_ent["end"] and regex_ent["end"] > ai_ent["start"]
        ]
        if not overlapping:
            combined.append(regex_ent)
            continue
        regex_len = regex_ent["end"] - regex_ent["start"]
        longest_ai = max(ai[i]["end"] - ai[i]["start"] for i in overlapping)
        if regex_len > longest_ai:
            combined.append(regex_ent)
            # Plain loop instead of a comprehension used only for side effects.
            for i in overlapping:
                superseded[i] = True
    combined.extend(ai_ent for i, ai_ent in enumerate(ai) if not superseded[i])
    return sorted(combined, key=lambda e: e["start"])
# ─── EasyOCR ────────────────────────────────────────────────────────────────────
# Optional dependency: `ocr_reader` stays None when EasyOCR cannot be imported
# or its reader cannot be constructed.
ocr_reader = None
try:
    import easyocr
    ocr_reader = easyocr.Reader(["en"], gpu=False, verbose=False)
    print("EasyOCR loaded.")
except Exception as e:
    print(f"EasyOCR not available: {e}")
| # ─── PDF helpers ──────────────────────────────────────────────────────────────── | |
def build_char_to_bbox_map(page):
    """Map character offsets in the page text to the rectangle of their word.

    Each word from ``page.get_text("words")`` is located in
    ``page.get_text("text")`` by searching forward from the end of the
    previously located word; words that cannot be found are skipped.

    Returns:
        ``(char_map, full_text)`` where ``char_map`` maps a character index in
        ``full_text`` to a ``fitz.Rect``.
    """
    full_text = page.get_text("text")
    char_map = {}
    cursor = 0
    for word in page.get_text("words"):
        # Word tuples are read as (x0, y0, x1, y1, text, ...).
        x0, y0, x1, y1, word_text = word[0], word[1], word[2], word[3], word[4]
        position = full_text.find(word_text, cursor)
        if position == -1:
            continue
        rect = fitz.Rect(x0, y0, x1, y1)
        word_end = position + len(word_text)
        char_map.update(dict.fromkeys(range(position, word_end), rect))
        cursor = word_end
    return char_map, full_text
def get_redact_rects(cm, s, e):
    """Collect the distinct rectangles covering character offsets ``[s, e)``.

    Rectangles are de-duplicated by their coordinates; the first one seen for
    a coordinate set is kept, and first-seen order is preserved.
    """
    by_coords = {}
    for offset in range(s, e):
        if offset not in cm:
            continue
        rect = cm[offset]
        coords = (rect.x0, rect.y0, rect.x1, rect.y1)
        if coords not in by_coords:
            by_coords[coords] = rect
    return list(by_coords.values())
def ocr_page(page, zoom=2.0, min_confidence=0.3):
    """OCR a rendered PDF page and return text boxes in page coordinates.

    Args:
        page: PyMuPDF page to render.
        zoom: Render scale factor. OCR coordinates are divided by the same
            factor to map them back to page space. Defaults to the previously
            hard-coded value of 2.
        min_confidence: OCR results below this confidence are discarded.
            Defaults to the previously hard-coded value of 0.3.

    Returns:
        List of ``(fitz.Rect, text)`` tuples; empty when no OCR reader is
        available.

    Raises:
        ValueError: If ``zoom`` is not positive.
    """
    if ocr_reader is None:
        return []
    if zoom <= 0:
        raise ValueError(f"zoom must be positive, got {zoom!r}")
    import numpy as np

    pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
    if pix.n == 4:
        # Keep only the first three channels of a 4-channel pixmap.
        img = img[:, :, :3]

    to_page = 1.0 / zoom  # inverse of the render scale
    boxes = []
    for points, text, confidence in ocr_reader.readtext(img):
        if confidence < min_confidence or not text.strip():
            continue
        xs = [p[0] * to_page for p in points]
        ys = [p[1] * to_page for p in points]
        boxes.append((fitz.Rect(min(xs), min(ys), max(xs), max(ys)), text))
    return boxes
| # ─── Main redaction pipeline ──────────────────────────────────────────────────── | |
def _detect_entities(text, min_score, use_regex):
    """Run the active AI detector (plus optional regex pass) on *text* and merge the results."""
    # The call shape mirrors the original: in privacy-filter mode the detector is
    # called with the text only; otherwise the score threshold is passed as well.
    if USE_PRIVACY_FILTER:
        ai_entities = get_pii_entities_ai(text)
    else:
        ai_entities = get_pii_entities_ai(text, min_score)
    regex_entities = get_pii_entities_regex(text) if use_regex else []
    return merge_entities(ai_entities, regex_entities)


def _redact_scanned_page(page, entities, word_boxes, min_score):
    """Add redaction annotations for entities matched against OCR word boxes.

    Returns ``(matched_entities, box_count)``. An entity is reported only when
    one of the OCR boxes matched it; the first matching box is redacted.
    """
    matched = []
    box_count = 0
    for ent in entities:
        if ent["score"] < min_score:
            continue
        needle = ent["word"].strip().lower()
        if not needle:
            # An empty string is a substring of everything; without this guard
            # the first OCR box on the page would be blacked out for no reason.
            continue
        for rect, box_text in word_boxes:
            haystack = box_text.lower()
            if needle in haystack or haystack in needle:
                page.add_redact_annot(rect, fill=(0, 0, 0))
                box_count += 1
                matched.append(ent)
                break
    return matched, box_count


def _redact_text_page(page, entities, char_map, min_score):
    """Add redaction annotations for entities located via the char-to-bbox map.

    Falls back to ``page.search_for`` when the map yields no rectangles.
    Returns ``(reported_entities, box_count)``; every entity above the
    threshold is reported, whether or not a rectangle was found for it.
    """
    reported = []
    box_count = 0
    for ent in entities:
        if ent["score"] < min_score:
            continue
        rects = get_redact_rects(char_map, ent["start"], ent["end"])
        if not rects:
            needle = ent["word"].strip()
            # `or []` keeps the loop below safe if either lookup returns None.
            rects = (page.search_for(needle) if needle else []) or []
        for r in rects:
            # Pad by one point on each side so glyph edges do not peek out.
            padded = fitz.Rect(r.x0 - 1, r.y0 - 1, r.x1 + 1, r.y1 + 1)
            page.add_redact_annot(padded, fill=(0, 0, 0))
            box_count += 1
        reported.append(ent)
    return reported, box_count


def _build_redaction_report(elapsed, total_pages, ocr_pages, sensitivity, min_score,
                            use_regex, type_counts, total_redactions, page_reports):
    """Render the Markdown summary shown next to the redacted PDF."""
    if USE_PRIVACY_FILTER:
        model_link = "[openai/privacy-filter](https://huggingface.co/openai/privacy-filter) (1.4B params, GPU)"
    else:
        model_link = "[GLiNER-PII](https://huggingface.co/knowledgator/gliner-pii-base-v1.0) (zero-shot, 60+ types, CPU)"
    lines = [
        "# \U0001f4c4 PDF PII Redaction Report",
        "",
        f"**Processing time:** {elapsed:.1f}s",
        f"**Pages processed:** {total_pages}",
        f"**Pages with OCR:** {ocr_pages}",
        f"**Sensitivity:** {sensitivity} (threshold: {min_score})",
        f"**Regex:** {'On' if use_regex else 'Off'}",
        f"**AI Model:** {model_link}",
        f"**Entities detected:** {sum(type_counts.values())}",
        f"**Redaction boxes:** {total_redactions}",
        "",
        "## Entity Types",
        "",
    ]
    if type_counts:
        lines += ["| Type | Description | Count |", "|------|-------------|-------|"]
        # Most frequent entity types first.
        for label, count in sorted(type_counts.items(), key=lambda item: -item[1]):
            lines.append(f"| {label} | {NER_LABEL_DESCRIPTIONS.get(label, label)} | {count} |")
    else:
        lines.append("*No PII detected.*")
    lines += ["", "## Details by Page", ""]
    for page_report in page_reports:
        lines.append(f"### Page {page_report['page']}")
        for e in page_report["entities"]:
            lines.append(f"- **{e['type']}**: `{e['text']}` (conf: {e['confidence']}, src: {e['source']})")
        lines.append("")
    return "\n".join(lines)


def redact_pdf(input_file, sensitivity="Standard", use_regex=True, use_ocr=True, progress=gr.Progress()):
    """Detect PII in a PDF, black it out, and return the result with a report.

    Args:
        input_file: Path string, or an object exposing the path as ``.name``.
        sensitivity: "Conservative", "Standard" or "Aggressive"; selects the
            minimum confidence score. Unknown values use the "Standard" threshold.
        use_regex: Also run the regex-based detector and merge its findings.
        use_ocr: OCR pages that carry (almost) no extractable text.
        progress: Gradio progress callback.

    Returns:
        ``(output_path, report_markdown)`` - path of the redacted PDF written
        to a fresh temporary file, and a Markdown summary of what was found.

    Raises:
        gr.Error: If no file was supplied, the path does not exist, or the
            PDF cannot be opened.
    """
    if input_file is None:
        raise gr.Error("Please upload a PDF file first.")
    if hasattr(input_file, "name"):
        input_path = input_file.name
    elif isinstance(input_file, str):
        input_path = input_file
    else:
        input_path = None
    if not input_path or not os.path.exists(input_path):
        raise gr.Error("File not found.")

    # The two backends use different score thresholds per sensitivity level.
    if USE_PRIVACY_FILTER:
        min_score = {"Conservative": 0.85, "Standard": 0.60, "Aggressive": 0.35}.get(sensitivity, 0.60)
    else:
        min_score = {"Conservative": 0.7, "Standard": 0.4, "Aggressive": 0.2}.get(sensitivity, 0.4)

    start_time = time.time()
    try:
        doc = fitz.open(input_path)
    except Exception as e:
        raise gr.Error(f"Failed to open PDF: {e}") from e

    page_reports = []
    type_counts = {}
    total_redactions = 0
    ocr_pages = 0
    model_label = "openai/privacy-filter (GPU)" if USE_PRIVACY_FILTER else "GLiNER-PII (CPU)"

    # try/finally guarantees the document handle is released even when
    # detection, redaction or saving raises.
    try:
        total_pages = len(doc)
        progress(0, desc=f"Starting PII detection with {model_label}...")
        for page_index, page in enumerate(doc):
            progress(page_index / total_pages, desc=f"Processing page {page_index + 1}/{total_pages}...")
            # Pages with fewer than 10 non-blank characters are treated as scans.
            is_scanned = len(page.get_text("text").strip()) < 10

            if is_scanned and use_ocr:
                ocr_pages += 1
                word_boxes = ocr_page(page)
                if not word_boxes:
                    continue
                full_text = " ".join(box_text for _, box_text in word_boxes)
                entities = _detect_entities(full_text, min_score, use_regex)
                page_ents, box_count = _redact_scanned_page(page, entities, word_boxes, min_score)
                # On a scanned page the sensitive text lives in the image itself.
                # PDF_REDACT_IMAGE_NONE would leave those pixels intact under the
                # black box (recoverable by extracting the image), so blank them.
                image_mode = fitz.PDF_REDACT_IMAGE_PIXELS
            else:
                char_map, full_text = build_char_to_bbox_map(page)
                if not full_text.strip():
                    continue
                entities = _detect_entities(full_text, min_score, use_regex)
                page_ents, box_count = _redact_text_page(page, entities, char_map, min_score)
                # Text pages: remove the text layer only and keep images untouched.
                image_mode = fitz.PDF_REDACT_IMAGE_NONE

            total_redactions += box_count
            for ent in page_ents:
                type_counts[ent["entity"]] = type_counts.get(ent["entity"], 0) + 1
            page.apply_redactions(images=image_mode)

            if page_ents:
                page_reports.append({
                    "page": page_index + 1,
                    "entities": [
                        {
                            "type": e["entity"],
                            # Only a short masked preview goes into the report.
                            "text": e["word"][:3] + "***" if len(e["word"]) > 3 else "***",
                            "confidence": round(e["score"], 3),
                            "source": e["source"],
                        }
                        for e in page_ents
                    ],
                })

        # mkstemp creates the file atomically; mktemp only returns a name that
        # another process could claim before we write to it.
        fd, output_path = tempfile.mkstemp(suffix="_redacted.pdf")
        os.close(fd)
        doc.save(output_path, garbage=4, deflate=True)
    finally:
        doc.close()

    elapsed = time.time() - start_time
    report = _build_redaction_report(
        elapsed, total_pages, ocr_pages, sensitivity, min_score,
        use_regex, type_counts, total_redactions, page_reports,
    )
    return output_path, report
| # ─── Gradio UI ──────────────────────────────────────────────────────────────── | |
# Status badge naming the active detection backend; it is interpolated into
# the page header below and reused in the "About" footer of the UI.
if USE_PRIVACY_FILTER:
    _badge = "\U0001f7e2 **GPU mode** \u2014 [openai/privacy-filter](https://huggingface.co/openai/privacy-filter) (1.4B MoE)"
else:
    _badge = "\U0001f7e1 **CPU mode** \u2014 [GLiNER-PII](https://huggingface.co/knowledgator/gliner-pii-base-v1.0) (zero-shot, 60+ entity types, F1=81%)"

# Markdown shown at the top of the app: title, active backend, and an
# overview of the categories of data that get redacted.
DESCRIPTION = f"""
# \U0001f512 PDF PII Redactor
**Automatically detect and redact sensitive information from PDF documents.**
{_badge}
When running on ZeroGPU, uses OpenAI Privacy Filter (1.4B params, 128k context). On CPU, uses GLiNER-PII (zero-shot, 60+ entity types, F1=81%).
### \U0001f6e1\ufe0f What gets redacted:
| Category | Types |
|----------|-------|
| **Personal** | Names, Addresses, URLs, Dates |
| **Financial** | Account Numbers, Credit Cards, IBANs |
| **Contact** | Emails, Phone Numbers |
| **Credentials** | Passwords, API Keys, Tokens |
| **Identifiers** | SSN, Driver's License, Passport |
| **Medical** | Medical Record Numbers |
> \u26a0\ufe0f Redaction is **permanent** \u2014 keep a backup of your original.
"""
# Two-column layout: upload and options on the left, results on the right.
with gr.Blocks(title="PDF PII Redactor") as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Row():
        # Left column: input file and detection settings.
        with gr.Column(scale=1):
            gr.Markdown("### \U0001f4e4 Upload & Configure")
            input_pdf = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath",
            )
            with gr.Row():
                sensitivity = gr.Radio(
                    choices=["Conservative", "Standard", "Aggressive"],
                    value="Standard",
                    label="Sensitivity",
                )
            with gr.Row():
                use_regex = gr.Checkbox(value=True, label="Regex Patterns")
                use_ocr = gr.Checkbox(value=True, label="OCR for Scanned Pages")
            redact_btn = gr.Button(
                "\U0001f512 Redact",
                variant="primary",
                size="lg",
            )

        # Right column: redacted file download and the Markdown report.
        with gr.Column(scale=1):
            gr.Markdown("### \U0001f4e5 Output")
            output_pdf = gr.File(label="Redacted PDF")
            gr.Markdown("### \U0001f4ca Report")
            report_output = gr.Markdown(value="*Upload a PDF and click Redact.*")

    # Wire the button to the pipeline; the outputs match redact_pdf's
    # (path, report) return order.
    redact_btn.click(
        fn=redact_pdf,
        inputs=[input_pdf, sensitivity, use_regex, use_ocr],
        outputs=[output_pdf, report_output],
    )

    gr.Markdown("---")
    gr.Markdown(
        f"### \u2139\ufe0f About\n**Active:** {_badge}\n\n**PDF Engine:** PyMuPDF \u2022 **OCR:** EasyOCR"
    )
# Script entry point: serve the app on all interfaces at port 7860.
if __name__ == "__main__":
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)