""" PDF PII Redactor — Detects and redacts sensitive information from PDF documents. Dual-mode AI backend: - GPU (ZeroGPU): openai/privacy-filter (1.4B MoE, Viterbi decoding, 128k context) - CPU fallback: GLiNER-PII (zero-shot, 60+ entity types, F1=81%, Apache 2.0) Also uses: - PyMuPDF (fitz) for PDF text extraction with bounding boxes and redaction - EasyOCR as fallback for scanned/image-based PDFs - Regex patterns for additional sensitive data (credentials, financial, medical codes) """ import dataclasses import functools import json import math import os import re import tempfile import time from bisect import bisect_left, bisect_right from collections.abc import Sequence from dataclasses import dataclass from pathlib import Path from typing import Final import fitz # PyMuPDF import gradio as gr import torch import torch.nn.functional as F # ─── Detect ZeroGPU / CUDA availability ───────────────────────────────────────── # On ZeroGPU Spaces, the `spaces` library is installed AND torch.cuda.is_available() # returns True at startup (CUDA emulation mode). On plain CPU Spaces, the `spaces` # library is also installed but torch.cuda.is_available() returns False. # We use BOTH checks to decide whether to use the GPU path. ZEROGPU_AVAILABLE = False try: import spaces ZEROGPU_AVAILABLE = True print("ZeroGPU: `spaces` library detected.") except ImportError: print("ZeroGPU: `spaces` library not available.") # The critical check: on ZeroGPU, CUDA emulation is active at startup so this is True. # On CPU-only Spaces, this is False even though `spaces` is importable. CUDA_AVAILABLE = torch.cuda.is_available() print(f"CUDA available at startup: {CUDA_AVAILABLE}") # ─── Conditional heavy imports (only needed for the GPU path) ──────────────────── HAS_PRIVACY_FILTER_DEPS = False try: from safetensors import safe_open import tiktoken from huggingface_hub import snapshot_download HAS_PRIVACY_FILTER_DEPS = True except ImportError: pass # ─── Download models at startup based on what's available ─────────────────────── # Only attempt the 2.8GB privacy-filter download if we have CUDA (real or emulated) MODEL_DIR = None USE_PRIVACY_FILTER = False if HAS_PRIVACY_FILTER_DEPS and CUDA_AVAILABLE: try: print("Downloading openai/privacy-filter model...") _root = snapshot_download("openai/privacy-filter", allow_patterns=["original/*"]) MODEL_DIR = Path(_root) / "original" if MODEL_DIR.exists() and any(MODEL_DIR.glob("*.safetensors")): USE_PRIVACY_FILTER = True print("openai/privacy-filter checkpoint ready.") else: print("openai/privacy-filter: no safetensors found — falling back to CPU.") except Exception as e: print(f"Could not download openai/privacy-filter: {e} — falling back to CPU.") elif not CUDA_AVAILABLE: print("No CUDA available — skipping openai/privacy-filter download, will use CPU model.") # CPU fallback: load GLiNER-PII (zero-shot, 60+ entity types, F1=81%) gliner_model = None GLINER_PII_LABELS = [ "name", "email address", "phone number", "location address", "location city", "location zip", "ssn", "credit card number", "date of birth", "password", "ip address", "account number", "passport number", "driver license", "username", "url", ] if not USE_PRIVACY_FILTER: print("Loading CPU model (GLiNER-PII-base-v1.0)...") from gliner import GLiNER gliner_model = GLiNER.from_pretrained("knowledgator/gliner-pii-base-v1.0") print("GLiNER-PII loaded on CPU.") ACTIVE_MODEL = "openai/privacy-filter" if USE_PRIVACY_FILTER else "GLiNER-PII" print(f"Active PII model: {ACTIVE_MODEL}") # ─── Model constants (privacy-filter) ─────────────────────────────────────────── PRIVACY_FILTER_MODEL_TYPE: Final[str] = "privacy_filter" REQUIRED_MODEL_CONFIG_KEYS: Final[tuple[str, ...]] = ( "model_type", "encoding", "num_hidden_layers", "num_experts", "experts_per_token", "vocab_size", "num_labels", "hidden_size", "intermediate_size", "head_dim", "num_attention_heads", "num_key_value_heads", "sliding_window", "bidirectional_context", "bidirectional_left_context", "bidirectional_right_context", "default_n_ctx", "initial_context_length", "rope_theta", "rope_scaling_factor", "rope_ntk_alpha", "rope_ntk_beta", "param_dtype", ) BACKGROUND_CLASS_LABEL: Final[str] = "O" BOUNDARY_PREFIXES: Final[tuple[str, ...]] = ("B", "I", "E", "S") SPAN_CLASS_NAMES: Final[tuple[str, ...]] = ( BACKGROUND_CLASS_LABEL, "account_number", "private_address", "private_date", "private_email", "private_person", "private_phone", "private_url", "secret", ) NER_CLASS_NAMES: Final[tuple[str, ...]] = (BACKGROUND_CLASS_LABEL,) + tuple( f"{prefix}-{base_label}" for base_label in SPAN_CLASS_NAMES if base_label != BACKGROUND_CLASS_LABEL for prefix in BOUNDARY_PREFIXES ) VITERBI_TRANSITION_BIAS_KEYS: Final[tuple[str, ...]] = ( "transition_bias_background_stay", "transition_bias_background_to_start", "transition_bias_inside_to_continue", "transition_bias_inside_to_end", "transition_bias_end_to_background", "transition_bias_end_to_start", ) DEFAULT_VITERBI_CALIBRATION_PRESET: Final[str] = "default" # ─── Regex patterns ───────────────────────────────────────────────────────────── REGEX_PATTERNS = { "SSN": r"\b\d{3}[-\u2013]\d{2}[-\u2013]\d{4}\b", "CREDIT_CARD": r"\b(?:\d{4}[-\u2013 ]?){3}\d{4}\b", "PHONE_INTL": r"\b\+\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b", "IP_ADDRESS": r"\b(?:\d{1,3}\.){3}\d{1,3}\b", "DATE_OF_BIRTH": r"\b(?:DOB|Date of Birth|Born)[:\s]*\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b", "PASSPORT": r"\b(?:passport|Passport)[:\s]*[A-Z]{1,2}\d{6,9}\b", "IBAN": r"\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}(?:[A-Z0-9]?){0,16}\b", "DRIVER_LICENSE": r"\b(?:DL|Driver'?s?\s*(?:License|Licence))[:\s]*[A-Z0-9]{5,15}\b", "MEDICAL_RECORD": r"\b(?:MRN|Medical Record|Patient ID)[:\s#]*[\w\-]+\b", "API_KEY": r"\b(?:sk|pk|api[_-]?key|token|secret)[_\-]?[A-Za-z0-9]{20,}\b", "AWS_KEY": r"\b(?:AKIA|ABIA|ACCA|ASIA)[A-Z0-9]{16}\b", "PRIVATE_KEY": r"-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----", "BITCOIN_ADDR": r"\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b", "MAC_ADDRESS": r"\b(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}\b", } # ─── Label descriptions (superset of both models) ─────────────────────────────── NER_LABEL_DESCRIPTIONS = { "private_person": "Person Name", "private_address": "Physical Address", "private_email": "Email Address", "private_phone": "Phone Number", "private_url": "Personal URL", "private_date": "Date (tied to a person)", "account_number": "Account Number", "secret": "Secret (password, API key, token)", # GLiNER-PII labels "name": "Person Name", "email address": "Email Address", "phone number": "Phone Number", "location address": "Physical Address", "location city": "City", "location zip": "Zip Code", "ssn": "Social Security Number", "credit card number": "Credit Card Number", "date of birth": "Date of Birth", "password": "Password", "ip address": "IP Address", "passport number": "Passport Number", "driver license": "Driver License", "username": "Username", "url": "URL", # Regex labels "SSN": "Social Security Number (regex)", "CREDIT_CARD": "Credit Card (regex)", "PHONE_INTL": "International Phone (regex)", "IP_ADDRESS": "IP Address", "DATE_OF_BIRTH": "Date of Birth (regex)", "PASSPORT": "Passport Number", "IBAN": "IBAN", "DRIVER_LICENSE": "Driver License (regex)", "MEDICAL_RECORD": "Medical Record Number", "API_KEY": "API Key / Token", "AWS_KEY": "AWS Access Key", "PRIVATE_KEY": "Private Key", "BITCOIN_ADDR": "Bitcoin Address", "MAC_ADDRESS": "MAC Address", } # ═══════════════════════════════════════════════════════════════════════════════ # GPU PATH: openai/privacy-filter # ═══════════════════════════════════════════════════════════════════════════════ def validate_model_config_contract(cfg, *, context): missing = [k for k in REQUIRED_MODEL_CONFIG_KEYS if k not in cfg] if missing: raise ValueError(f"{context} missing: {missing}") rl=cfg.get("bidirectional_left_context"); rr=cfg.get("bidirectional_right_context") if not isinstance(rl,int) or isinstance(rl,bool) or not isinstance(rr,int) or isinstance(rr,bool) or rl!=rr or rl<0: raise ValueError("ctx") if cfg.get("sliding_window")!=2*rl+1: raise ValueError("sw") if cfg["num_labels"]!=33: raise ValueError("labels") if cfg["param_dtype"]!="bfloat16": raise ValueError("dtype") def expert_linear(x,w,b): nr,exp,kd=x.shape;_,_,_,od=w.shape o=torch.bmm(x.reshape(nr*exp,1,kd),w.reshape(nr*exp,kd,od)).reshape(nr,exp,od) if b is not None: o=o+b return o @dataclass class ModelConfig: num_hidden_layers:int;num_experts:int;experts_per_token:int;vocab_size:int;num_labels:int hidden_size:int;intermediate_size:int;head_dim:int;num_attention_heads:int;num_key_value_heads:int bidirectional_context_size:int;initial_context_length:int;rope_theta:float;rope_scaling_factor:float rope_ntk_alpha:float;rope_ntk_beta:float @classmethod def from_checkpoint_config(cls,c,*,context): c=dict(c);c["bidirectional_context_size"]=c["bidirectional_left_context"] fs={f.name:f for f in dataclasses.fields(cls)};return cls(**{k:v for k,v in c.items() if k in fs}) class RMSNorm(torch.nn.Module): def __init__(self,nf,eps=1e-05,device=None): super().__init__();self.eps=eps;self.scale=torch.nn.Parameter(torch.ones(nf,device=device,dtype=torch.float32)) def forward(self,x):t=x.float();return(t*torch.rsqrt(t.pow(2).mean(-1,keepdim=True)+self.eps)*self.scale).to(x.dtype) def apply_rope(x,cos,sin): cos=cos.unsqueeze(-2).to(x.dtype);sin=sin.unsqueeze(-2).to(x.dtype) x1,x2=x[...,::2],x[...,1::2];return torch.stack((x1*cos-x2*sin,x2*cos+x1*sin),dim=-1).reshape(x.shape) class RotaryEmbedding(torch.nn.Module): def __init__(self,hd,base,dtype,*,initial_context_length=4096,scaling_factor=1.0,ntk_alpha=1.0,ntk_beta=32.0,device=None): super().__init__();self.head_dim,self.base,self.dtype=hd,base,dtype self.initial_context_length,self.scaling_factor=initial_context_length,scaling_factor self.ntk_alpha,self.ntk_beta,self.device=ntk_alpha,ntk_beta,device c,s=self._cs(max(int(initial_context_length*scaling_factor),initial_context_length),device=torch.device("cpu")) td=device or torch.device("cpu");self.register_buffer("cc",c.to(td),persistent=False);self.register_buffer("sc",s.to(td),persistent=False) def _inv(self,device=None): device=device or self.device;f=self.base**(torch.arange(0,self.head_dim,2,dtype=torch.float,device=device)/self.head_dim) if self.scaling_factor>1.0: cn=0.1*math.log(self.scaling_factor)+1.0;dh=self.head_dim/2 lo=dh*math.log(self.initial_context_length/(self.ntk_beta*2*math.pi))/math.log(self.base) hi=dh*math.log(self.initial_context_length/(self.ntk_alpha*2*math.pi))/math.log(self.base) ip=1.0/(self.scaling_factor*f);ep=1.0/f;r=(torch.arange(dh,dtype=torch.float32,device=device)-lo)/(hi-lo) m=1-r.clamp(0,1);return cn,ip*(1-m)+ep*m return 1.0,1.0/f def _cs(self,n,device=None): cn,iv=self._inv(device=device);device=device or self.device;t=torch.arange(n,dtype=torch.float32,device=device) fr=torch.einsum("i,j->ij",t,iv);return(fr.cos()*cn).to(self.dtype),(fr.sin()*cn).to(self.dtype) def forward(self,q,k): n=q.shape[0] if n>self.cc.shape[0]:c,s=self._cs(n,device=torch.device("cpu"));self.cc,self.sc=c.to(q.device),s.to(q.device) cc=self.cc.to(q.device) if self.cc.device!=q.device else self.cc;sc=self.sc.to(q.device) if self.sc.device!=q.device else self.sc c,s=cc[:n],sc[:n];qs=q.shape;q=apply_rope(q.view(n,-1,self.head_dim),c,s).reshape(qs) ks=k.shape;k=apply_rope(k.view(n,-1,self.head_dim),c,s).reshape(ks);return q,k def sdpa(Q,K,V,S,sm,ctx): n,nh,qm,hd=Q.shape;w=2*ctx+1;Kp=F.pad(K,(0,0,0,0,ctx,ctx));Vp=F.pad(V,(0,0,0,0,ctx,ctx)) Kw=Kp.unfold(0,w,1).permute(0,3,1,2);Vw=Vp.unfold(0,w,1).permute(0,3,1,2) idx=torch.arange(w,device=Q.device)-ctx;pos=torch.arange(n,device=Q.device)[:,None]+idx[None,:] v=(pos>=0)&(posnhqw",Q,Kw).float()*sm;sc=sc.masked_fill(~v[:,None,None,:],-float("inf")) ss=(S*math.log(2.0)).reshape(nh,qm)[None,:,:,None].expand(n,-1,-1,1);sc=torch.cat([sc,ss],dim=-1) wt=torch.softmax(sc,dim=-1)[...,:-1].to(V.dtype);return torch.einsum("nhqw,nwhd->nhqd",wt,Vw).reshape(n,-1) class AttentionBlock(torch.nn.Module): def __init__(self,cfg,device=None): super().__init__();pd=torch.bfloat16;self.hd,self.nah,self.nkvh=cfg.head_dim,cfg.num_attention_heads,cfg.num_key_value_heads self.bcs=int(cfg.bidirectional_context_size);self.sinks=torch.nn.Parameter(torch.empty(cfg.num_attention_heads,device=device,dtype=torch.float32)) self.norm=RMSNorm(cfg.hidden_size,device=device);qd=cfg.head_dim*(cfg.num_attention_heads+2*cfg.num_key_value_heads) self.qkv=torch.nn.Linear(cfg.hidden_size,qd,device=device,dtype=pd);self.out=torch.nn.Linear(cfg.head_dim*cfg.num_attention_heads,cfg.hidden_size,device=device,dtype=pd) self.qk_scale=1/math.sqrt(math.sqrt(cfg.head_dim)) self.rope=RotaryEmbedding(cfg.head_dim,int(cfg.rope_theta),torch.float32,initial_context_length=cfg.initial_context_length,scaling_factor=cfg.rope_scaling_factor,ntk_alpha=cfg.rope_ntk_alpha,ntk_beta=cfg.rope_ntk_beta,device=device) def forward(self,x): t=self.norm(x);t=t.to(self.qkv.weight.dtype) if t.dtype!=self.qkv.weight.dtype else t;qkv=F.linear(t,self.qkv.weight,self.qkv.bias) q=qkv[:,:self.nah*self.hd].contiguous();ks=self.nah*self.hd;km=(self.nah+self.nkvh)*self.hd;ke=(self.nah+2*self.nkvh)*self.hd k=qkv[:,ks:km].contiguous();v=qkv[:,km:ke].contiguous();q,k=self.rope(q,k);q=q*self.qk_scale;k=k*self.qk_scale;n=q.shape[0] q=q.view(n,self.nkvh,self.nah//self.nkvh,self.hd);k=k.view(n,self.nkvh,self.hd);v=v.view(n,self.nkvh,self.hd) ao=sdpa(q,k,v,self.sinks,1.0,self.bcs);ao=ao.to(self.out.weight.dtype) if ao.dtype!=self.out.weight.dtype else ao return x+F.linear(ao,self.out.weight,self.out.bias).to(x.dtype) def swiglu(x,alpha=1.702,limit=7.0):g,l=x.chunk(2,dim=-1);g=g.clamp(max=limit);l=l.clamp(min=-limit,max=limit);return g*torch.sigmoid(alpha*g)*(l+1) class MLPBlock(torch.nn.Module): def __init__(self,cfg,device=None): super().__init__();pd=torch.bfloat16;self.ne,self.ept,self.sl=cfg.num_experts,cfg.experts_per_token,7.0 self.norm=RMSNorm(cfg.hidden_size,device=device);self.gate=torch.nn.Linear(cfg.hidden_size,cfg.num_experts,device=device,dtype=pd) self.m1w=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.hidden_size,cfg.intermediate_size*2),device=device,dtype=pd)) self.m1b=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.intermediate_size*2),device=device,dtype=pd)) self.m2w=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.intermediate_size,cfg.hidden_size),device=device,dtype=pd)) self.m2b=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.hidden_size),device=device,dtype=pd)) def forward(self,x): t=self.norm(x);gs=F.linear(t.float(),self.gate.weight.float(),self.gate.bias.float()) exp=torch.topk(gs,k=self.ept,dim=-1,sorted=True);ew=torch.softmax(exp.values,dim=-1)/self.ept;ei=exp.indices;ept=self.ept def _c(tc,eic,ewc): te=tc.float().unsqueeze(1).expand(-1,eic.shape[1],-1);o=expert_linear(te,self.m1w[eic].float(),self.m1b[eic].float()) o=swiglu(o,limit=self.sl);o=expert_linear(o.float(),self.m2w[eic].float(),self.m2b[eic].float()) o=o.to(ewc.dtype) if o.dtype!=ewc.dtype else o;return(torch.einsum("bec,be->bc",o,ewc)*ept).to(x.dtype) cs=32 if t.shape[0]>cs:t=torch.cat([_c(t[s:s+cs],ei[s:s+cs],ew[s:s+cs]) for s in range(0,t.shape[0],cs)],dim=0) else:t=_c(t,ei,ew) return x+t class TransformerBlock(torch.nn.Module): def __init__(self,cfg,device=None):super().__init__();self.attn=AttentionBlock(cfg,device=device);self.mlp=MLPBlock(cfg,device=device) def forward(self,x):return self.mlp(self.attn(x)) class Checkpoint: @staticmethod def build_param_name_map(n): m={} for i in range(n):m[f"block.{i}.mlp.m1b"]=f"block.{i}.mlp.swiglu.bias";m[f"block.{i}.mlp.m1w"]=f"block.{i}.mlp.swiglu.weight";m[f"block.{i}.mlp.m2b"]=f"block.{i}.mlp.out.bias";m[f"block.{i}.mlp.m2w"]=f"block.{i}.mlp.out.weight" return m def __init__(self,path,device,num_hidden_layers): self.pnm=self.build_param_name_map(num_hidden_layers);self.ds=device.type if device.index is None else f"{device.type}:{device.index}" self.tnf={};sfs=[os.path.join(path,f) for f in os.listdir(path) if f.endswith(".safetensors")] for sf in sfs: with safe_open(sf,framework="pt",device=self.ds) as h: for k in h.keys():self.tnf[k]=sf def get(self,name): m=self.pnm.get(name,name) with safe_open(self.tnf[m],framework="pt",device=self.ds) as h:return h.get_tensor(m) class PrivacyFilterTransformer(torch.nn.Module): def __init__(self,config,device): super().__init__();pd=torch.bfloat16;self.embedding=torch.nn.Embedding(config.vocab_size,config.hidden_size,device=device,dtype=pd) self.block=torch.nn.ModuleList([TransformerBlock(config,device=device) for _ in range(config.num_hidden_layers)]) self.norm=RMSNorm(config.hidden_size,device=device);self.unembedding=torch.nn.Linear(config.hidden_size,config.num_labels,bias=False,device=device,dtype=pd) def forward(self,tid): x=self.embedding(tid) for b in self.block:x=b(x) return F.linear(self.norm(x),self.unembedding.weight,None) @classmethod def from_checkpoint(cls,cd,*,device): torch.backends.cuda.matmul.allow_tf32=False;torch.backends.cudnn.allow_tf32=False;torch.set_float32_matmul_precision("highest") with(Path(cd)/"config.json").open("r") as f:cc=json.load(f) validate_model_config_contract(cc,context=str(cd));cfg=ModelConfig.from_checkpoint_config(cc,context=str(cd)) ckpt=Checkpoint(cd,device,num_hidden_layers=cfg.num_hidden_layers);model=cls(config=cfg,device=device);model.eval() for name,param in model.named_parameters():param.data.copy_(ckpt.get(name)) return model @dataclass(frozen=True) class LabelInfo: boundary_label_lookup:dict;token_to_span_label:dict;token_boundary_tags:dict;span_class_names:tuple;span_label_lookup:dict;background_token_label:int;background_span_label:int def labels_to_spans(lbi,li): spans=[];cl=si=pi=None;bsl=li.background_span_label for ti in sorted(lbi): lid=lbi[ti];sl=li.token_to_span_label.get(lid);bt=li.token_boundary_tags.get(lid) if pi is not None and ti!=pi+1: if cl is not None and si is not None:spans.append((cl,si,pi+1)) cl=si=None if sl is None:pi=ti;continue if sl==bsl: if cl is not None and si is not None:spans.append((cl,si,ti)) cl=si=None;pi=ti;continue if bt=="S": if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1)) spans.append((sl,ti,ti+1));cl=si=None elif bt=="B": if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1)) cl=sl;si=ti elif bt=="I": if cl is None or cl!=sl: if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1)) cl=sl;si=ti elif bt=="E": if cl is None or cl!=sl or si is None: if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1)) spans.append((sl,ti,ti+1));cl=si=None else:spans.append((cl,si,ti+1));cl=si=None else: if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1)) cl=si=None pi=ti if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1)) return spans def token_spans_to_char_spans(sp,cs,ce):return[(l,cs[ts],ce[te-1]) for l,ts,te in sp if 0<=tscs[ts]] def trim_char_spans_whitespace(sp,text): o=[] for l,s,e in sp: if not(0<=ss and text[e-1].isspace():e-=1 if e>s:o.append((l,s,e)) return o @dataclass(frozen=True) class InferenceRuntime: model:PrivacyFilterTransformer;encoding:object;label_info:LabelInfo;device:torch.device;n_ctx:int @functools.lru_cache(maxsize=1) def get_viterbi_transition_biases(): cp=MODEL_DIR/"viterbi_calibration.json";default={k:0.0 for k in VITERBI_TRANSITION_BIAS_KEYS} if not cp.is_file():return default p=json.loads(cp.read_text(encoding="utf-8"));rb=p ops=p.get("operating_points") if ops:pe=ops.get(DEFAULT_VITERBI_CALIBRATION_PRESET);rb=pe.get("biases",rb) if isinstance(pe,dict) else rb if not isinstance(rb,dict):return default res={} for k in VITERBI_TRANSITION_BIAS_KEYS: v=rb.get(k) if isinstance(v,bool) or not isinstance(v,(int,float)):return default res[k]=float(v) return res @functools.lru_cache(maxsize=1) def get_runtime(): cc=json.loads((MODEL_DIR/"config.json").read_text(encoding="utf-8")) validate_model_config_contract(cc,context=str(MODEL_DIR/"config.json")) device=torch.device("cuda");enc=tiktoken.get_encoding(cc["encoding"].strip()) scn=[BACKGROUND_CLASS_LABEL];sll={BACKGROUND_CLASS_LABEL:0};bll={};tsl={};tbt={};bgi=None for i,name in enumerate(NER_CLASS_NAMES): if name==BACKGROUND_CLASS_LABEL:bgi=i;tsl[i]=0;tbt[i]=None;continue bd,bl=name.split("-",1);si=sll.get(bl) if si is None:si=len(scn);scn.append(bl);sll[bl]=si tsl[i]=si;tbt[i]=bd;bll.setdefault(bl,{})[bd]=i li=LabelInfo(boundary_label_lookup=bll,token_to_span_label=tsl,token_boundary_tags=tbt,span_class_names=tuple(scn),span_label_lookup=sll,background_token_label=bgi,background_span_label=0) model=PrivacyFilterTransformer.from_checkpoint(MODEL_DIR,device=device) return InferenceRuntime(model=model,encoding=enc,label_info=li,device=device,n_ctx=int(cc["default_n_ctx"])) class Decoder: def __init__(self,li): nc=len(li.token_to_span_label);self._ss=torch.full((nc,),-1e9,dtype=torch.float32);self._es=torch.full((nc,),-1e9,dtype=torch.float32) self._ts=torch.full((nc,nc),-1e9,dtype=torch.float32);tb=get_viterbi_transition_biases() bt,bs=li.background_token_label,li.background_span_label;tags,smap=li.token_boundary_tags,li.token_to_span_label for i in range(nc): tg=tags.get(i) if tg in{"B","S"} or i==bt:self._ss[i]=0.0 if tg in{"E","S"} or i==bt:self._es[i]=0.0 for j in range(nc): ntg,nsp,sp=tags.get(j),smap.get(j),smap.get(i);nib=nsp==bs or j==bt if(nsp is None or ntg is None) and not nib:continue if sp is None or tg is None: if nib or ntg in{"B","S"}:self._ts[i,j]=self._tb(tg,sp,ntg,nsp,bs,tb) elif sp==bs or tg in{"E","S"}: if nib or ntg in{"B","S"}:self._ts[i,j]=self._tb(tg,sp,ntg,nsp,bs,tb) elif tg in{"B","I"}: if sp==nsp and ntg in{"I","E"}:self._ts[i,j]=self._tb(tg,sp,ntg,nsp,bs,tb) @staticmethod def _tb(pt,ps,nt,ns,bs,b): nib=ns==bs;pib=ps==bs if pib:return b["transition_bias_background_stay"] if nib else b["transition_bias_background_to_start"] if pt in{"B","I"}:return b["transition_bias_inside_to_continue"] if nt=="I" else b["transition_bias_inside_to_end"] return b["transition_bias_end_to_background"] if nib else b["transition_bias_end_to_start"] def decode(self,lp): sl,nc=lp.shape if sl==0:return[] ss=self._ss.to(device=lp.device,dtype=lp.dtype);es=self._es.to(device=lp.device,dtype=lp.dtype);ts=self._ts.to(device=lp.device,dtype=lp.dtype) sc=lp[0]+ss;bp=torch.empty((sl-1,nc),device=lp.device,dtype=torch.int64) for i in range(1,sl):tr=sc.unsqueeze(1)+ts;bs_,bi=tr.max(dim=0);sc=bs_+lp[i];bp[i-1]=bi if not torch.isfinite(sc).any():return lp.argmax(dim=1).tolist() sc=sc+es;ll=sc.argmax();path=torch.empty((sl,),device=lp.device,dtype=torch.int64);path[-1]=ll for i in range(sl-2,-1,-1):ll=bp[i,ll];path[i]=ll return path.tolist() @torch.inference_mode() def predict_text_gpu(rt,text,dec): tids=tuple(int(t) for t in rt.encoding.encode(text,allowed_special="all")) if not tids:return text,[] svs=[] for s in range(0,len(tids),rt.n_ctx): e=min(s+rt.n_ctx,len(tids));wt=torch.tensor(tids[s:e],device=rt.device,dtype=torch.int32) lp=F.log_softmax(rt.model(wt).float(),dim=-1);svs.extend(lp.unbind(0)) if not svs:return text,[] stacked=torch.stack(svs,dim=0);dl=dec.decode(stacked) if len(dl)!=len(tids):dl=stacked.argmax(dim=1).tolist() pli={i:int(l) for i,l in enumerate(dl)};pts=labels_to_spans(pli,rt.label_info) tbs=[rt.encoding.decode_single_token_bytes(t) for t in tids];dt=b"".join(tbs).decode("utf-8",errors="replace") cbs,cbe=[],[];bc=0 for ch in dt:cbs.append(bc);bc+=len(ch.encode("utf-8"));cbe.append(bc) cs,ce=[],[];tbc=0 for rb in tbs: s0=tbc;s1=s0+len(rb);tbc=s1;si=bisect_right(cbe,s0);ei=bisect_left(cbs,s1) if eia["start"]] if ov: rs=r["end"]-r["start"];ms=max(ai[i]["end"]-ai[i]["start"] for i in ov) if rs>ms:combined.append(r);[used.__setitem__(i,True) for i in ov] else:combined.append(r) for i,a in enumerate(ai): if not used[i]:combined.append(a) return sorted(combined,key=lambda e:e["start"]) # ─── EasyOCR ───────────────────────────────────────────────────────────────────── ocr_reader=None try: import easyocr;ocr_reader=easyocr.Reader(["en"],gpu=False,verbose=False);print("EasyOCR loaded.") except Exception as e:print(f"EasyOCR not available: {e}") # ─── PDF helpers ───────────────────────────────────────────────────────────────── def build_char_to_bbox_map(page): words=page.get_text("words");ft=page.get_text("text");cm={};ss=0 for w in words: wt=w[4];idx=ft.find(wt,ss) if idx==-1:continue r=fitz.Rect(w[0],w[1],w[2],w[3]) for ci in range(idx,idx+len(wt)):cm[ci]=r ss=idx+len(wt) return cm,ft def get_redact_rects(cm,s,e): rects={} for ci in range(s,e): if ci in cm:r=cm[ci];k=(r.x0,r.y0,r.x1,r.y1);rects.setdefault(k,r) return list(rects.values()) def ocr_page(page): if ocr_reader is None:return[] import numpy as np;pix=page.get_pixmap(matrix=fitz.Matrix(2,2)) img=np.frombuffer(pix.samples,dtype=np.uint8).reshape(pix.h,pix.w,pix.n) if pix.n==4:img=img[:,:,:3] results=ocr_reader.readtext(img);wb=[] for(pts,text,conf) in results: if conf<0.3 or not text.strip():continue xs=[p[0]*0.5 for p in pts];ys=[p[1]*0.5 for p in pts] wb.append((fitz.Rect(min(xs),min(ys),max(xs),max(ys)),text)) return wb # ─── Main redaction pipeline ──────────────────────────────────────────────────── def redact_pdf(input_file,sensitivity="Standard",use_regex=True,use_ocr=True,progress=gr.Progress()): if input_file is None:raise gr.Error("Please upload a PDF file first.") input_path=input_file.name if hasattr(input_file,'name') else input_file if isinstance(input_file,str) else None if not input_path or not os.path.exists(input_path):raise gr.Error("File not found.") min_score={"Conservative":0.7,"Standard":0.4,"Aggressive":0.2}.get(sensitivity,0.4) if not USE_PRIVACY_FILTER else {"Conservative":0.85,"Standard":0.60,"Aggressive":0.35}.get(sensitivity,0.60) start_time=time.time() try:doc=fitz.open(input_path) except Exception as e:raise gr.Error(f"Failed to open PDF: {e}") total_pages=len(doc);all_report=[];total_redactions=0;type_counts={};ocr_pages=0 ml="openai/privacy-filter (GPU)" if USE_PRIVACY_FILTER else "GLiNER-PII (CPU)" progress(0,desc=f"Starting PII detection with {ml}...") for pi in range(total_pages): page=doc[pi];page_ents=[];progress(pi/total_pages,desc=f"Processing page {pi+1}/{total_pages}...") text=page.get_text("text");is_scanned=len(text.strip())<10 if is_scanned and use_ocr: ocr_pages+=1;wb=ocr_page(page) if not wb:continue ft=" ".join(t for _,t in wb) ai_ents=get_pii_entities_ai(ft) if USE_PRIVACY_FILTER else get_pii_entities_ai(ft,min_score) re_ents=get_pii_entities_regex(ft) if use_regex else[];ents=merge_entities(ai_ents,re_ents) for ent in ents: if ent["score"]3 else "***","confidence":round(e["score"],3),"source":e["source"]} for e in page_ents]}) output_path=tempfile.mktemp(suffix="_redacted.pdf");doc.save(output_path,garbage=4,deflate=True);doc.close() elapsed=time.time()-start_time mr="[openai/privacy-filter](https://huggingface.co/openai/privacy-filter) (1.4B params, GPU)" if USE_PRIVACY_FILTER else "[GLiNER-PII](https://huggingface.co/knowledgator/gliner-pii-base-v1.0) (zero-shot, 60+ types, CPU)" lines=["# \U0001f4c4 PDF PII Redaction Report","",f"**Processing time:** {elapsed:.1f}s",f"**Pages processed:** {total_pages}",f"**Pages with OCR:** {ocr_pages}",f"**Sensitivity:** {sensitivity} (threshold: {min_score})",f"**Regex:** {'On' if use_regex else 'Off'}",f"**AI Model:** {mr}",f"**Entities detected:** {sum(type_counts.values())}",f"**Redaction boxes:** {total_redactions}","","## Entity Types",""] if type_counts: lines+=["| Type | Description | Count |","|------|-------------|-------|"] for l,c in sorted(type_counts.items(),key=lambda x:-x[1]):lines.append(f"| {l} | {NER_LABEL_DESCRIPTIONS.get(l,l)} | {c} |") else:lines.append("*No PII detected.*") lines+=["","## Details by Page",""] for pr in all_report: lines.append(f"### Page {pr['page']}") for e in pr["entities"]:lines.append(f"- **{e['type']}**: `{e['text']}` (conf: {e['confidence']}, src: {e['source']})") lines.append("") return output_path,"\n".join(lines) # ─── Gradio UI ────────────────────────────────────────────────────────────────── _badge="\U0001f7e2 **GPU mode** \u2014 [openai/privacy-filter](https://huggingface.co/openai/privacy-filter) (1.4B MoE)" if USE_PRIVACY_FILTER else "\U0001f7e1 **CPU mode** \u2014 [GLiNER-PII](https://huggingface.co/knowledgator/gliner-pii-base-v1.0) (zero-shot, 60+ entity types, F1=81%)" DESCRIPTION=f""" # \U0001f512 PDF PII Redactor **Automatically detect and redact sensitive information from PDF documents.** {_badge} When running on ZeroGPU, uses OpenAI Privacy Filter (1.4B params, 128k context). On CPU, uses GLiNER-PII (zero-shot, 60+ entity types, F1=81%). ### \U0001f6e1\ufe0f What gets redacted: | Category | Types | |----------|-------| | **Personal** | Names, Addresses, URLs, Dates | | **Financial** | Account Numbers, Credit Cards, IBANs | | **Contact** | Emails, Phone Numbers | | **Credentials** | Passwords, API Keys, Tokens | | **Identifiers** | SSN, Driver's License, Passport | | **Medical** | Medical Record Numbers | > \u26a0\ufe0f Redaction is **permanent** \u2014 keep a backup of your original. """ with gr.Blocks(title="PDF PII Redactor") as demo: gr.Markdown(DESCRIPTION) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### \U0001f4e4 Upload & Configure") input_pdf=gr.File(label="Upload PDF",file_types=[".pdf"],type="filepath") with gr.Row(): sensitivity=gr.Radio(choices=["Conservative","Standard","Aggressive"],value="Standard",label="Sensitivity") with gr.Row(): use_regex=gr.Checkbox(value=True,label="Regex Patterns") use_ocr=gr.Checkbox(value=True,label="OCR for Scanned Pages") redact_btn=gr.Button("\U0001f512 Redact",variant="primary",size="lg") with gr.Column(scale=1): gr.Markdown("### \U0001f4e5 Output") output_pdf=gr.File(label="Redacted PDF") gr.Markdown("### \U0001f4ca Report") report_output=gr.Markdown(value="*Upload a PDF and click Redact.*") redact_btn.click(fn=redact_pdf,inputs=[input_pdf,sensitivity,use_regex,use_ocr],outputs=[output_pdf,report_output]) gr.Markdown("---") gr.Markdown(f"### \u2139\ufe0f About\n**Active:** {_badge}\n\n**PDF Engine:** PyMuPDF \u2022 **OCR:** EasyOCR") if __name__=="__main__": demo.launch(server_name="0.0.0.0",server_port=7860)