# alianassmaaa's picture
# Fix: upload actual app.py content (was literal string from previous upload)
# a5f4663 verified
"""
PDF PII Redactor — Detects and redacts sensitive information from PDF documents.
Dual-mode AI backend:
- GPU (ZeroGPU): openai/privacy-filter (1.4B MoE, Viterbi decoding, 128k context)
- CPU fallback: GLiNER-PII (zero-shot, 60+ entity types, F1=81%, Apache 2.0)
Also uses:
- PyMuPDF (fitz) for PDF text extraction with bounding boxes and redaction
- EasyOCR as fallback for scanned/image-based PDFs
- Regex patterns for additional sensitive data (credentials, financial, medical codes)
"""
import dataclasses
import functools
import json
import math
import os
import re
import tempfile
import time
from bisect import bisect_left, bisect_right
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Final
import fitz # PyMuPDF
import gradio as gr
import torch
import torch.nn.functional as F
# ─── Detect ZeroGPU / CUDA availability ─────────────────────────────────────────
# On ZeroGPU Spaces, the `spaces` library is installed AND torch.cuda.is_available()
# returns True at startup (CUDA emulation mode). On plain CPU Spaces, the `spaces`
# library is also installed but torch.cuda.is_available() returns False.
# We use BOTH checks to decide whether to use the GPU path.
# `spaces` is importable on both ZeroGPU and plain CPU Spaces, so its presence
# alone does not prove a GPU is usable.
try:
    import spaces
except ImportError:
    ZEROGPU_AVAILABLE = False
    print("ZeroGPU: `spaces` library not available.")
else:
    ZEROGPU_AVAILABLE = True
    print("ZeroGPU: `spaces` library detected.")

# The deciding signal: ZeroGPU reports CUDA at startup (emulation mode),
# whereas a CPU-only Space reports no CUDA even though `spaces` imports fine.
CUDA_AVAILABLE = torch.cuda.is_available()
print(f"CUDA available at startup: {CUDA_AVAILABLE}")
# ─── Conditional heavy imports (only needed for the GPU path) ────────────────────
# These packages are only required by the openai/privacy-filter (GPU) path;
# their absence simply disables that path.
try:
    from safetensors import safe_open
    import tiktoken
    from huggingface_hub import snapshot_download
except ImportError:
    HAS_PRIVACY_FILTER_DEPS = False
else:
    HAS_PRIVACY_FILTER_DEPS = True
# ─── Download models at startup based on what's available ───────────────────────
# Only attempt the 2.8GB privacy-filter download if we have CUDA (real or emulated)
# MODEL_DIR points at the downloaded checkpoint directory once it is ready;
# USE_PRIVACY_FILTER selects the GPU model for the rest of the module.
MODEL_DIR = None
USE_PRIVACY_FILTER = False
if not CUDA_AVAILABLE:
    print("No CUDA available — skipping openai/privacy-filter download, will use CPU model.")
elif not HAS_PRIVACY_FILTER_DEPS:
    # Previously this case fell through without any message, which made the
    # fallback to the CPU model look unexplained in the startup log.
    print("openai/privacy-filter dependencies are not installed — falling back to CPU.")
else:
    try:
        print("Downloading openai/privacy-filter model...")
        _root = snapshot_download("openai/privacy-filter", allow_patterns=["original/*"])
        MODEL_DIR = Path(_root) / "original"
        if MODEL_DIR.exists() and any(MODEL_DIR.glob("*.safetensors")):
            USE_PRIVACY_FILTER = True
            print("openai/privacy-filter checkpoint ready.")
        else:
            print("openai/privacy-filter: no safetensors found — falling back to CPU.")
    except Exception as e:
        # Startup boundary: any download failure (network, auth, disk) must
        # degrade to the CPU model rather than stop the app from launching.
        print(f"Could not download openai/privacy-filter: {e} — falling back to CPU.")
# CPU fallback: load GLiNER-PII (zero-shot, 60+ entity types, F1=81%)
# Entity labels requested from GLiNER on every prediction call.
GLINER_PII_LABELS = [
    "name",
    "email address",
    "phone number",
    "location address",
    "location city",
    "location zip",
    "ssn",
    "credit card number",
    "date of birth",
    "password",
    "ip address",
    "account number",
    "passport number",
    "driver license",
    "username",
    "url",
]

# The CPU model is loaded only when the GPU model is not in use.
gliner_model = None
if not USE_PRIVACY_FILTER:
    print("Loading CPU model (GLiNER-PII-base-v1.0)...")
    from gliner import GLiNER

    gliner_model = GLiNER.from_pretrained("knowledgator/gliner-pii-base-v1.0")
    print("GLiNER-PII loaded on CPU.")

if USE_PRIVACY_FILTER:
    ACTIVE_MODEL = "openai/privacy-filter"
else:
    ACTIVE_MODEL = "GLiNER-PII"
print(f"Active PII model: {ACTIVE_MODEL}")
# ─── Model constants (privacy-filter) ───────────────────────────────────────────
PRIVACY_FILTER_MODEL_TYPE: Final[str] = "privacy_filter"

# Keys that must be present in the checkpoint's config.json.
REQUIRED_MODEL_CONFIG_KEYS: Final[tuple[str, ...]] = (
    "model_type",
    "encoding",
    "num_hidden_layers",
    "num_experts",
    "experts_per_token",
    "vocab_size",
    "num_labels",
    "hidden_size",
    "intermediate_size",
    "head_dim",
    "num_attention_heads",
    "num_key_value_heads",
    "sliding_window",
    "bidirectional_context",
    "bidirectional_left_context",
    "bidirectional_right_context",
    "default_n_ctx",
    "initial_context_length",
    "rope_theta",
    "rope_scaling_factor",
    "rope_ntk_alpha",
    "rope_ntk_beta",
    "param_dtype",
)

BACKGROUND_CLASS_LABEL: Final[str] = "O"
BOUNDARY_PREFIXES: Final[tuple[str, ...]] = ("B", "I", "E", "S")

# Span-level classes; index 0 is the background class.
SPAN_CLASS_NAMES: Final[tuple[str, ...]] = (
    BACKGROUND_CLASS_LABEL,
    "account_number",
    "private_address",
    "private_date",
    "private_email",
    "private_person",
    "private_phone",
    "private_url",
    "secret",
)

# Token-level classes: background first, then every non-background span class
# expanded with each boundary prefix (grouped by span class).
NER_CLASS_NAMES: Final[tuple[str, ...]] = (
    BACKGROUND_CLASS_LABEL,
    *(
        f"{prefix}-{base_label}"
        for base_label in SPAN_CLASS_NAMES
        if base_label != BACKGROUND_CLASS_LABEL
        for prefix in BOUNDARY_PREFIXES
    ),
)

VITERBI_TRANSITION_BIAS_KEYS: Final[tuple[str, ...]] = (
    "transition_bias_background_stay",
    "transition_bias_background_to_start",
    "transition_bias_inside_to_continue",
    "transition_bias_inside_to_end",
    "transition_bias_end_to_background",
    "transition_bias_end_to_start",
)
DEFAULT_VITERBI_CALIBRATION_PRESET: Final[str] = "default"
# ─── Regex patterns ─────────────────────────────────────────────────────────────
# Label -> regular expression for PII that is recognisable by shape alone.
# Every pattern is applied with re.IGNORECASE by get_pii_entities_regex.
REGEX_PATTERNS = {
    "SSN": r"\b\d{3}[-\u2013]\d{2}[-\u2013]\d{4}\b",
    "CREDIT_CARD": r"\b(?:\d{4}[-\u2013 ]?){3}\d{4}\b",
    # No leading \b here: "+" is a non-word character, so "\b\+" only matched
    # when a letter/digit came directly before the plus sign and missed the
    # normal cases (start of text, after a space or punctuation).
    "PHONE_INTL": r"\+\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b",
    "IP_ADDRESS": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
    "DATE_OF_BIRTH": r"\b(?:DOB|Date of Birth|Born)[:\s]*\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
    "PASSPORT": r"\b(?:passport|Passport)[:\s]*[A-Z]{1,2}\d{6,9}\b",
    # The optional tail is written as a plain bounded class; it accepts the
    # same strings as the previous "(?:[A-Z0-9]?){0,16}" without the nested
    # optional quantifier.
    "IBAN": r"\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}[A-Z0-9]{0,16}\b",
    "DRIVER_LICENSE": r"\b(?:DL|Driver'?s?\s*(?:License|Licence))[:\s]*[A-Z0-9]{5,15}\b",
    "MEDICAL_RECORD": r"\b(?:MRN|Medical Record|Patient ID)[:\s#]*[\w\-]+\b",
    "API_KEY": r"\b(?:sk|pk|api[_-]?key|token|secret)[_\-]?[A-Za-z0-9]{20,}\b",
    "AWS_KEY": r"\b(?:AKIA|ABIA|ACCA|ASIA)[A-Z0-9]{16}\b",
    "PRIVATE_KEY": r"-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----",
    "BITCOIN_ADDR": r"\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b",
    "MAC_ADDRESS": r"\b(?:[0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}\b",
}
# ─── Label descriptions (superset of both models) ───────────────────────────────
# Human-readable description for every label any detector can emit; used when
# rendering the report table.
NER_LABEL_DESCRIPTIONS = {
    # openai/privacy-filter labels
    "private_person": "Person Name",
    "private_address": "Physical Address",
    "private_email": "Email Address",
    "private_phone": "Phone Number",
    "private_url": "Personal URL",
    "private_date": "Date (tied to a person)",
    "account_number": "Account Number",
    "secret": "Secret (password, API key, token)",
    # GLiNER-PII labels
    "name": "Person Name",
    "email address": "Email Address",
    "phone number": "Phone Number",
    "location address": "Physical Address",
    "location city": "City",
    "location zip": "Zip Code",
    "ssn": "Social Security Number",
    "credit card number": "Credit Card Number",
    "date of birth": "Date of Birth",
    "password": "Password",
    "ip address": "IP Address",
    "passport number": "Passport Number",
    "driver license": "Driver License",
    "username": "Username",
    "url": "URL",
    # Regex labels
    "SSN": "Social Security Number (regex)",
    "CREDIT_CARD": "Credit Card (regex)",
    "PHONE_INTL": "International Phone (regex)",
    "IP_ADDRESS": "IP Address",
    "DATE_OF_BIRTH": "Date of Birth (regex)",
    "PASSPORT": "Passport Number",
    "IBAN": "IBAN",
    "DRIVER_LICENSE": "Driver License (regex)",
    "MEDICAL_RECORD": "Medical Record Number",
    "API_KEY": "API Key / Token",
    "AWS_KEY": "AWS Access Key",
    "PRIVATE_KEY": "Private Key",
    "BITCOIN_ADDR": "Bitcoin Address",
    "MAC_ADDRESS": "MAC Address",
}
# ═══════════════════════════════════════════════════════════════════════════════
# GPU PATH: openai/privacy-filter
# ═══════════════════════════════════════════════════════════════════════════════
def validate_model_config_contract(cfg, *, context):
    """Check that a checkpoint config satisfies what this module expects.

    Args:
        cfg: Mapping loaded from the checkpoint's config.json.
        context: Text identifying the config, used in the missing-keys error.

    Raises:
        ValueError: if required keys are missing, the left/right context sizes
            are not equal non-negative integers, the sliding window is not
            ``2 * context + 1``, the label count is not 33, or the parameter
            dtype is not ``"bfloat16"``.
    """
    missing = [key for key in REQUIRED_MODEL_CONFIG_KEYS if key not in cfg]
    if missing:
        raise ValueError(f"{context} missing: {missing}")

    def is_plain_int(value):
        # bool is a subclass of int, so it has to be excluded explicitly.
        return isinstance(value, int) and not isinstance(value, bool)

    left = cfg.get("bidirectional_left_context")
    right = cfg.get("bidirectional_right_context")
    if not (is_plain_int(left) and is_plain_int(right) and left == right and left >= 0):
        raise ValueError("ctx")

    expected_window = 2 * left + 1
    if cfg.get("sliding_window") != expected_window:
        raise ValueError("sw")
    if cfg["num_labels"] != 33:
        raise ValueError("labels")
    if cfg["param_dtype"] != "bfloat16":
        raise ValueError("dtype")
def expert_linear(x, w, b):
    """Apply a separate linear map to every (row, expert) pair.

    Args:
        x: Tensor unpacked as (rows, experts, in_features).
        w: 4-D tensor whose last dimension is out_features; it is reshaped to
            (rows * experts, in_features, out_features).
        b: Optional bias added to the result, or None.

    Returns:
        Tensor of shape (rows, experts, out_features).
    """
    num_rows, num_experts, in_features = x.shape
    _, _, _, out_features = w.shape
    batch = num_rows * num_experts

    # Flatten (row, expert) into one batch axis so a single bmm covers all pairs.
    flat_inputs = x.reshape(batch, 1, in_features)
    flat_weights = w.reshape(batch, in_features, out_features)
    result = torch.bmm(flat_inputs, flat_weights).reshape(num_rows, num_experts, out_features)

    if b is None:
        return result
    return result + b
@dataclass
class ModelConfig:
    """Subset of the checkpoint configuration needed to build the model."""

    num_hidden_layers: int
    num_experts: int
    experts_per_token: int
    vocab_size: int
    num_labels: int
    hidden_size: int
    intermediate_size: int
    head_dim: int
    num_attention_heads: int
    num_key_value_heads: int
    bidirectional_context_size: int
    initial_context_length: int
    rope_theta: float
    rope_scaling_factor: float
    rope_ntk_alpha: float
    rope_ntk_beta: float

    @classmethod
    def from_checkpoint_config(cls, c, *, context):
        """Build a ModelConfig from a raw checkpoint config mapping.

        ``bidirectional_context_size`` is taken from the config's
        ``bidirectional_left_context``; keys that are not dataclass fields are
        ignored. The input mapping is copied, not modified. ``context`` is
        accepted but not used here.
        """
        settings = dict(c)
        settings["bidirectional_context_size"] = settings["bidirectional_left_context"]
        field_names = {field.name for field in dataclasses.fields(cls)}
        accepted = {key: value for key, value in settings.items() if key in field_names}
        return cls(**accepted)
class RMSNorm(torch.nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned scale."""

    def __init__(self, nf, eps=1e-05, device=None):
        super().__init__()
        self.eps = eps
        initial_scale = torch.ones(nf, device=device, dtype=torch.float32)
        self.scale = torch.nn.Parameter(initial_scale)

    def forward(self, x):
        """Normalise ``x`` in float32 and return the result in ``x``'s dtype."""
        values = x.float()
        mean_square = values.pow(2).mean(-1, keepdim=True)
        inverse_rms = torch.rsqrt(mean_square + self.eps)
        normalised = values * inverse_rms * self.scale
        return normalised.to(x.dtype)
def apply_rope(x, cos, sin):
    """Rotate adjacent (even, odd) feature pairs of ``x`` by the given angles.

    ``cos`` and ``sin`` get an extra axis inserted before their last dimension
    and are cast to ``x``'s dtype. The output has the same shape as ``x``.
    """
    cos_b = cos.unsqueeze(-2).to(x.dtype)
    sin_b = sin.unsqueeze(-2).to(x.dtype)
    even = x[..., ::2]
    odd = x[..., 1::2]
    rotated_even = even * cos_b - odd * sin_b
    rotated_odd = odd * cos_b + even * sin_b
    # Stacking on a new last axis and reshaping re-interleaves the pairs.
    interleaved = torch.stack((rotated_even, rotated_odd), dim=-1)
    return interleaved.reshape(x.shape)
class RotaryEmbedding(torch.nn.Module):
    """Rotary position embedding backed by a cached cos/sin table.

    The table is computed on CPU at construction time, stored in the
    non-persistent buffers ``cc`` (cos) and ``sc`` (sin), and regrown in
    ``forward`` when a longer sequence arrives.
    """
    def __init__(self,hd,base,dtype,*,initial_context_length=4096,scaling_factor=1.0,ntk_alpha=1.0,ntk_beta=32.0,device=None):
        # hd: head dimension; base: frequency base; dtype: dtype of the cached tables.
        super().__init__();self.head_dim,self.base,self.dtype=hd,base,dtype
        self.initial_context_length,self.scaling_factor=initial_context_length,scaling_factor
        self.ntk_alpha,self.ntk_beta,self.device=ntk_alpha,ntk_beta,device
        # Pre-compute at least initial_context_length positions (more when the
        # scaling factor enlarges the context). Always built on CPU first.
        c,s=self._cs(max(int(initial_context_length*scaling_factor),initial_context_length),device=torch.device("cpu"))
        # Buffers are non-persistent, so they are not part of the state dict.
        td=device or torch.device("cpu");self.register_buffer("cc",c.to(td),persistent=False);self.register_buffer("sc",s.to(td),persistent=False)
    def _inv(self,device=None):
        """Return ``(concentration, inverse_frequencies)`` for the table."""
        # f holds base ** (2i / head_dim) for each feature pair i.
        device=device or self.device;f=self.base**(torch.arange(0,self.head_dim,2,dtype=torch.float,device=device)/self.head_dim)
        if self.scaling_factor>1.0:
            # cn scales both cos and sin tables (applied in _cs).
            cn=0.1*math.log(self.scaling_factor)+1.0;dh=self.head_dim/2
            # lo/hi bound the linear ramp over the pair index, derived from ntk_beta and ntk_alpha.
            lo=dh*math.log(self.initial_context_length/(self.ntk_beta*2*math.pi))/math.log(self.base)
            hi=dh*math.log(self.initial_context_length/(self.ntk_alpha*2*math.pi))/math.log(self.base)
            # ip: frequencies divided by the scaling factor; ep: unscaled frequencies.
            ip=1.0/(self.scaling_factor*f);ep=1.0/f;r=(torch.arange(dh,dtype=torch.float32,device=device)-lo)/(hi-lo)
            # m is 1 below lo and 0 above hi, so pairs below the ramp keep the
            # unscaled frequency and pairs above it use the scaled one.
            m=1-r.clamp(0,1);return cn,ip*(1-m)+ep*m
        # No scaling: plain inverse frequencies and a neutral concentration.
        return 1.0,1.0/f
    def _cs(self,n,device=None):
        """Return cos and sin tables for positions ``0..n-1`` in ``self.dtype``."""
        cn,iv=self._inv(device=device);device=device or self.device;t=torch.arange(n,dtype=torch.float32,device=device)
        # Outer product of positions and inverse frequencies gives the angles.
        fr=torch.einsum("i,j->ij",t,iv);return(fr.cos()*cn).to(self.dtype),(fr.sin()*cn).to(self.dtype)
    def forward(self,q,k):
        """Apply the rotation to ``q`` and ``k``; dimension 0 is the position axis."""
        n=q.shape[0]
        # Grow the cache when the sequence is longer than what is stored.
        if n>self.cc.shape[0]:c,s=self._cs(n,device=torch.device("cpu"));self.cc,self.sc=c.to(q.device),s.to(q.device)
        # Move the tables to q's device only when they live elsewhere.
        cc=self.cc.to(q.device) if self.cc.device!=q.device else self.cc;sc=self.sc.to(q.device) if self.sc.device!=q.device else self.sc
        # Heads are exposed as a middle axis for apply_rope, then the original shape is restored.
        c,s=cc[:n],sc[:n];qs=q.shape;q=apply_rope(q.view(n,-1,self.head_dim),c,s).reshape(qs)
        ks=k.shape;k=apply_rope(k.view(n,-1,self.head_dim),c,s).reshape(ks);return q,k
def sdpa(Q,K,V,S,sm,ctx):
    """Windowed attention with an extra per-head sink logit.

    Each position attends to the ``2*ctx+1`` positions centred on itself.

    Args:
        Q: 4-D tensor unpacked as (n, nh, qm, hd).
        K, V: padded along dimension 0 and windowed; assumes shape
            (n, nh, hd) as passed by AttentionBlock.forward — TODO confirm
            for other callers.
        S: sink values, reshaped to (nh, qm).
        sm: multiplier applied to the attention scores.
        ctx: number of positions visible on each side.

    Returns:
        Tensor of shape (n, nh*qm*hd).
    """
    # Pad ctx positions before and after along dim 0 so every position has a full window.
    n,nh,qm,hd=Q.shape;w=2*ctx+1;Kp=F.pad(K,(0,0,0,0,ctx,ctx));Vp=F.pad(V,(0,0,0,0,ctx,ctx))
    # unfold yields one window of width w per position; permute puts the window axis second.
    Kw=Kp.unfold(0,w,1).permute(0,3,1,2);Vw=Vp.unfold(0,w,1).permute(0,3,1,2)
    # pos[i, j] is the absolute position of window slot j for query position i.
    idx=torch.arange(w,device=Q.device)-ctx;pos=torch.arange(n,device=Q.device)[:,None]+idx[None,:]
    # Slots that fall outside [0, n) are padding and are masked to -inf.
    v=(pos>=0)&(pos<n);sc=torch.einsum("nhqd,nwhd->nhqw",Q,Kw).float()*sm;sc=sc.masked_fill(~v[:,None,None,:],-float("inf"))
    # Append the sink (scaled by ln 2) as one extra logit column per head.
    ss=(S*math.log(2.0)).reshape(nh,qm)[None,:,:,None].expand(n,-1,-1,1);sc=torch.cat([sc,ss],dim=-1)
    # The sink takes part in the softmax but its weight is dropped before mixing values.
    wt=torch.softmax(sc,dim=-1)[...,:-1].to(V.dtype);return torch.einsum("nhqw,nwhd->nhqd",wt,Vw).reshape(n,-1)
class AttentionBlock(torch.nn.Module):
    """Pre-norm windowed attention layer with a residual connection.

    Attribute names (``sinks``, ``norm``, ``qkv``, ``out``) double as parameter
    names when weights are copied from the checkpoint, so they must not change.
    """
    def __init__(self,cfg,device=None):
        super().__init__();pd=torch.bfloat16;self.hd,self.nah,self.nkvh=cfg.head_dim,cfg.num_attention_heads,cfg.num_key_value_heads
        # bcs: one-sided window size; sinks: one float32 value per attention head.
        self.bcs=int(cfg.bidirectional_context_size);self.sinks=torch.nn.Parameter(torch.empty(cfg.num_attention_heads,device=device,dtype=torch.float32))
        # qd: width of the fused projection (queries + keys + values).
        self.norm=RMSNorm(cfg.hidden_size,device=device);qd=cfg.head_dim*(cfg.num_attention_heads+2*cfg.num_key_value_heads)
        self.qkv=torch.nn.Linear(cfg.hidden_size,qd,device=device,dtype=pd);self.out=torch.nn.Linear(cfg.head_dim*cfg.num_attention_heads,cfg.hidden_size,device=device,dtype=pd)
        # head_dim ** -0.25, applied to q and k separately, so q·k is scaled by head_dim ** -0.5.
        self.qk_scale=1/math.sqrt(math.sqrt(cfg.head_dim))
        self.rope=RotaryEmbedding(cfg.head_dim,int(cfg.rope_theta),torch.float32,initial_context_length=cfg.initial_context_length,scaling_factor=cfg.rope_scaling_factor,ntk_alpha=cfg.rope_ntk_alpha,ntk_beta=cfg.rope_ntk_beta,device=device)
    def forward(self,x):
        """Return ``x`` plus the attention output; dimension 0 of ``x`` is the position axis."""
        # Normalise, cast to the projection dtype if needed, and run the fused q/k/v projection.
        t=self.norm(x);t=t.to(self.qkv.weight.dtype) if t.dtype!=self.qkv.weight.dtype else t;qkv=F.linear(t,self.qkv.weight,self.qkv.bias)
        # Column layout of the fused output: [queries | keys | values].
        q=qkv[:,:self.nah*self.hd].contiguous();ks=self.nah*self.hd;km=(self.nah+self.nkvh)*self.hd;ke=(self.nah+2*self.nkvh)*self.hd
        k=qkv[:,ks:km].contiguous();v=qkv[:,km:ke].contiguous();q,k=self.rope(q,k);q=q*self.qk_scale;k=k*self.qk_scale;n=q.shape[0]
        # Queries are grouped per key/value head: (n, kv_heads, queries_per_kv_head, head_dim).
        q=q.view(n,self.nkvh,self.nah//self.nkvh,self.hd);k=k.view(n,self.nkvh,self.hd);v=v.view(n,self.nkvh,self.hd)
        # Score multiplier is 1.0 because the scaling was already folded into q and k.
        ao=sdpa(q,k,v,self.sinks,1.0,self.bcs);ao=ao.to(self.out.weight.dtype) if ao.dtype!=self.out.weight.dtype else ao
        return x+F.linear(ao,self.out.weight,self.out.bias).to(x.dtype)
def swiglu(x, alpha=1.702, limit=7.0):
    """Gated activation over the two halves of the last dimension.

    The first half (gate) is clamped from above at ``limit``; the second half
    (linear) is clamped to ``[-limit, limit]``. The result is
    ``gate * sigmoid(alpha * gate) * (linear + 1)`` and has half the input width.
    """
    gate, linear = x.chunk(2, dim=-1)
    gate = torch.clamp(gate, max=limit)
    linear = torch.clamp(linear, min=-limit, max=limit)
    activated_gate = gate * torch.sigmoid(alpha * gate)
    return activated_gate * (linear + 1)
class MLPBlock(torch.nn.Module):
    """Pre-norm mixture-of-experts feed-forward layer with a residual connection.

    Attribute names (``norm``, ``gate``, ``m1w``, ``m1b``, ``m2w``, ``m2b``)
    are the parameter names used when loading the checkpoint (see
    Checkpoint.build_param_name_map), so they must not change.
    """
    def __init__(self,cfg,device=None):
        # ne: number of experts; ept: experts used per token; sl: clamp limit passed to swiglu.
        super().__init__();pd=torch.bfloat16;self.ne,self.ept,self.sl=cfg.num_experts,cfg.experts_per_token,7.0
        self.norm=RMSNorm(cfg.hidden_size,device=device);self.gate=torch.nn.Linear(cfg.hidden_size,cfg.num_experts,device=device,dtype=pd)
        # First expert projection outputs 2 * intermediate_size (the two halves swiglu splits).
        self.m1w=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.hidden_size,cfg.intermediate_size*2),device=device,dtype=pd))
        self.m1b=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.intermediate_size*2),device=device,dtype=pd))
        # Second expert projection maps back to hidden_size.
        self.m2w=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.intermediate_size,cfg.hidden_size),device=device,dtype=pd))
        self.m2b=torch.nn.Parameter(torch.empty((cfg.num_experts,cfg.hidden_size),device=device,dtype=pd))
    def forward(self,x):
        """Return ``x`` plus the weighted output of each token's selected experts."""
        # Router logits are computed in float32.
        t=self.norm(x);gs=F.linear(t.float(),self.gate.weight.float(),self.gate.bias.float())
        # Softmax over the selected experts only; the /ept here is undone by the *ept in _c.
        exp=torch.topk(gs,k=self.ept,dim=-1,sorted=True);ew=torch.softmax(exp.values,dim=-1)/self.ept;ei=exp.indices;ept=self.ept
        def _c(tc,eic,ewc):
            # Run one chunk of tokens (tc) through its selected experts (eic) with weights ewc.
            # Each token is repeated once per selected expert; expert weights are gathered by index.
            te=tc.float().unsqueeze(1).expand(-1,eic.shape[1],-1);o=expert_linear(te,self.m1w[eic].float(),self.m1b[eic].float())
            o=swiglu(o,limit=self.sl);o=expert_linear(o.float(),self.m2w[eic].float(),self.m2b[eic].float())
            # Weighted sum over the selected experts, returned in the input dtype.
            o=o.to(ewc.dtype) if o.dtype!=ewc.dtype else o;return(torch.einsum("bec,be->bc",o,ewc)*ept).to(x.dtype)
        # Tokens are processed 32 at a time — presumably to bound the memory
        # used by the gathered per-token expert weights (NOTE(review): confirm).
        cs=32
        if t.shape[0]>cs:t=torch.cat([_c(t[s:s+cs],ei[s:s+cs],ew[s:s+cs]) for s in range(0,t.shape[0],cs)],dim=0)
        else:t=_c(t,ei,ew)
        return x+t
class TransformerBlock(torch.nn.Module):
    """One layer: an attention block followed by an MLP block.

    The attribute names ``attn`` and ``mlp`` form part of the parameter names
    used when loading the checkpoint.
    """

    def __init__(self, cfg, device=None):
        super().__init__()
        self.attn = AttentionBlock(cfg, device=device)
        self.mlp = MLPBlock(cfg, device=device)

    def forward(self, x):
        """Apply attention, then the MLP (both add their own residual)."""
        attended = self.attn(x)
        return self.mlp(attended)
class Checkpoint:
    """Lazy reader for the tensors stored in a directory of .safetensors files."""

    @staticmethod
    def build_param_name_map(n):
        """Map this module's MLP parameter names to checkpoint names for ``n`` layers."""
        renames = (
            ("m1b", "swiglu.bias"),
            ("m1w", "swiglu.weight"),
            ("m2b", "out.bias"),
            ("m2w", "out.weight"),
        )
        return {
            f"block.{layer}.mlp.{local}": f"block.{layer}.mlp.{stored}"
            for layer in range(n)
            for local, stored in renames
        }

    def __init__(self, path, device, num_hidden_layers):
        self.pnm = self.build_param_name_map(num_hidden_layers)
        # safetensors takes the device as a string such as "cuda" or "cuda:0".
        if device.index is None:
            self.ds = device.type
        else:
            self.ds = f"{device.type}:{device.index}"

        # Index which file holds each tensor; tensors are read only on demand.
        self.tnf = {}
        shard_paths = [
            os.path.join(path, filename)
            for filename in os.listdir(path)
            if filename.endswith(".safetensors")
        ]
        for shard in shard_paths:
            with safe_open(shard, framework="pt", device=self.ds) as handle:
                for key in handle.keys():
                    self.tnf[key] = shard

    def get(self, name):
        """Load and return the tensor for model parameter ``name``."""
        stored_name = self.pnm.get(name, name)
        shard = self.tnf[stored_name]
        with safe_open(shard, framework="pt", device=self.ds) as handle:
            return handle.get_tensor(stored_name)
class PrivacyFilterTransformer(torch.nn.Module):
    """Token classifier: embedding, a stack of transformer blocks, norm, label projection.

    The attribute names ``embedding``, ``block``, ``norm`` and ``unembedding``
    are used as checkpoint parameter names.
    """

    def __init__(self, config, device):
        super().__init__()
        param_dtype = torch.bfloat16
        self.embedding = torch.nn.Embedding(
            config.vocab_size, config.hidden_size, device=device, dtype=param_dtype
        )
        layers = [TransformerBlock(config, device=device) for _ in range(config.num_hidden_layers)]
        self.block = torch.nn.ModuleList(layers)
        self.norm = RMSNorm(config.hidden_size, device=device)
        self.unembedding = torch.nn.Linear(
            config.hidden_size, config.num_labels, bias=False, device=device, dtype=param_dtype
        )

    def forward(self, tid):
        """Return per-token label logits for the token ids ``tid``."""
        hidden = self.embedding(tid)
        for layer in self.block:
            hidden = layer(hidden)
        return F.linear(self.norm(hidden), self.unembedding.weight, None)

    @classmethod
    def from_checkpoint(cls, cd, *, device):
        """Build the model from checkpoint directory ``cd`` and load its weights.

        Side effect: disables TF32 and sets float32 matmul precision to
        "highest" for the whole process.
        """
        torch.backends.cuda.matmul.allow_tf32 = False
        torch.backends.cudnn.allow_tf32 = False
        torch.set_float32_matmul_precision("highest")

        with (Path(cd) / "config.json").open("r") as config_file:
            raw_config = json.load(config_file)
        validate_model_config_contract(raw_config, context=str(cd))
        model_config = ModelConfig.from_checkpoint_config(raw_config, context=str(cd))

        checkpoint = Checkpoint(cd, device, num_hidden_layers=model_config.num_hidden_layers)
        model = cls(config=model_config, device=device)
        model.eval()
        for param_name, parameter in model.named_parameters():
            parameter.data.copy_(checkpoint.get(param_name))
        return model
@dataclass(frozen=True)
class LabelInfo:
    """Immutable lookup tables relating token-level labels to span-level labels."""

    # span class name -> {boundary tag -> token label id}
    boundary_label_lookup: dict
    # token label id -> span label id
    token_to_span_label: dict
    # token label id -> boundary tag (None for the background label)
    token_boundary_tags: dict
    # span class names indexed by span label id
    span_class_names: tuple
    # span class name -> span label id
    span_label_lookup: dict
    # token label id of the background class
    background_token_label: int
    # span label id of the background class
    background_span_label: int
def labels_to_spans(lbi,li):
    """Convert per-token labels into ``(span_label, start_token, end_token)`` tuples.

    Args:
        lbi: dict mapping token index -> token label id.
        li: object providing ``token_to_span_label``, ``token_boundary_tags``
            and ``background_span_label``.

    Returns:
        List of spans with an exclusive end token index, in emission order.
    """
    # cl: label of the open span; si: its start token; pi: previous token index seen.
    spans=[];cl=si=pi=None;bsl=li.background_span_label
    for ti in sorted(lbi):
        lid=lbi[ti];sl=li.token_to_span_label.get(lid);bt=li.token_boundary_tags.get(lid)
        # A gap in the token indices closes whatever span is open.
        if pi is not None and ti!=pi+1:
            if cl is not None and si is not None:spans.append((cl,si,pi+1))
            cl=si=None
        # Unknown label id: skip the token without touching the open span.
        if sl is None:pi=ti;continue
        # Background token: close the open span just before this token.
        if sl==bsl:
            if cl is not None and si is not None:spans.append((cl,si,ti))
            cl=si=None;pi=ti;continue
        if bt=="S":
            # Single-token span: close any open span, then emit this token alone.
            if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1))
            spans.append((sl,ti,ti+1));cl=si=None
        elif bt=="B":
            # Begin: close any open span and open a new one here.
            if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1))
            cl=sl;si=ti
        elif bt=="I":
            # Inside: continues a span of the same label, otherwise starts a new one.
            if cl is None or cl!=sl:
                if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1))
                cl=sl;si=ti
        elif bt=="E":
            # End: closes a matching open span; without one it becomes a single-token span.
            if cl is None or cl!=sl or si is None:
                if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1))
                spans.append((sl,ti,ti+1));cl=si=None
            else:spans.append((cl,si,ti+1));cl=si=None
        else:
            # Any other tag closes the open span.
            if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1))
            cl=si=None
        pi=ti
    # Flush a span that is still open after the last token.
    if cl is not None and si is not None and pi is not None:spans.append((cl,si,pi+1))
    return spans
def token_spans_to_char_spans(sp, cs, ce):
    """Translate token spans into character spans.

    Args:
        sp: iterable of ``(label, start_token, end_token)`` with exclusive end.
        cs: character start offset of each token.
        ce: character end offset of each token.

    Returns:
        List of ``(label, char_start, char_end)``; spans with token indices out
        of range or with an empty character range are dropped.
    """
    token_count = len(cs)
    char_spans = []
    for label, first_token, end_token in sp:
        if not (0 <= first_token < end_token <= token_count):
            continue
        char_start = cs[first_token]
        char_end = ce[end_token - 1]
        if char_end > char_start:
            char_spans.append((label, char_start, char_end))
    return char_spans
def trim_char_spans_whitespace(sp, text):
    """Shrink each character span so it neither starts nor ends with whitespace.

    Spans outside ``text`` and spans that contain only whitespace are dropped.
    """
    trimmed = []
    text_length = len(text)
    for label, start, end in sp:
        if not (0 <= start < end <= text_length):
            continue
        segment = text[start:end]
        # Work out how many whitespace characters sit at each edge of the span.
        leading = len(segment) - len(segment.lstrip())
        start += leading
        end -= len(segment) - leading - len(segment.strip())
        if end > start:
            trimmed.append((label, start, end))
    return trimmed
@dataclass(frozen=True)
class InferenceRuntime:
    """Everything needed to run the GPU model, bundled as one immutable object."""

    # Loaded model used for inference.
    model: PrivacyFilterTransformer
    # Tokenizer encoding object.
    encoding: object
    # Label lookup tables used when decoding predictions.
    label_info: LabelInfo
    # Device the model runs on.
    device: torch.device
    # Number of tokens processed per forward pass.
    n_ctx: int
@functools.lru_cache(maxsize=1)
def get_viterbi_transition_biases():
    """Return the Viterbi transition biases, read once from the checkpoint directory.

    Reads ``viterbi_calibration.json`` from MODEL_DIR. The biases are taken
    from ``operating_points[<default preset>]["biases"]`` when that exists,
    otherwise from the top level of the file. All-zero biases are returned if
    the file is absent or any bias value is missing or not a real number.
    """
    zero_biases = {key: 0.0 for key in VITERBI_TRANSITION_BIAS_KEYS}
    calibration_path = MODEL_DIR / "viterbi_calibration.json"
    if not calibration_path.is_file():
        return zero_biases

    payload = json.loads(calibration_path.read_text(encoding="utf-8"))
    raw_biases = payload
    operating_points = payload.get("operating_points")
    if operating_points:
        preset = operating_points.get(DEFAULT_VITERBI_CALIBRATION_PRESET)
        if isinstance(preset, dict):
            raw_biases = preset.get("biases", raw_biases)
    if not isinstance(raw_biases, dict):
        return zero_biases

    biases = {}
    for key in VITERBI_TRANSITION_BIAS_KEYS:
        value = raw_biases.get(key)
        # bool is an int subclass and must not be accepted as a number.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return zero_biases
        biases[key] = float(value)
    return biases
@functools.lru_cache(maxsize=1)
def get_runtime():
    """Load the GPU model, tokenizer and label tables once and cache the result."""
    config_path = MODEL_DIR / "config.json"
    checkpoint_config = json.loads(config_path.read_text(encoding="utf-8"))
    validate_model_config_contract(checkpoint_config, context=str(config_path))

    device = torch.device("cuda")
    encoding = tiktoken.get_encoding(checkpoint_config["encoding"].strip())

    # Build the token-label <-> span-label tables from NER_CLASS_NAMES.
    span_names = [BACKGROUND_CLASS_LABEL]
    span_index_by_name = {BACKGROUND_CLASS_LABEL: 0}
    boundary_lookup = {}
    token_to_span = {}
    token_tags = {}
    background_token = None
    for token_label, class_name in enumerate(NER_CLASS_NAMES):
        if class_name == BACKGROUND_CLASS_LABEL:
            background_token = token_label
            token_to_span[token_label] = 0
            token_tags[token_label] = None
            continue
        boundary_tag, base_name = class_name.split("-", 1)
        if base_name not in span_index_by_name:
            # First time this span class is seen: give it the next span id.
            span_index_by_name[base_name] = len(span_names)
            span_names.append(base_name)
        span_index = span_index_by_name[base_name]
        token_to_span[token_label] = span_index
        token_tags[token_label] = boundary_tag
        boundary_lookup.setdefault(base_name, {})[boundary_tag] = token_label

    label_info = LabelInfo(
        boundary_label_lookup=boundary_lookup,
        token_to_span_label=token_to_span,
        token_boundary_tags=token_tags,
        span_class_names=tuple(span_names),
        span_label_lookup=span_index_by_name,
        background_token_label=background_token,
        background_span_label=0,
    )
    model = PrivacyFilterTransformer.from_checkpoint(MODEL_DIR, device=device)
    return InferenceRuntime(
        model=model,
        encoding=encoding,
        label_info=label_info,
        device=device,
        n_ctx=int(checkpoint_config["default_n_ctx"]),
    )
class Decoder:
    """Viterbi decoder that only allows valid B/I/E/S label sequences.

    Disallowed starts, ends and transitions carry a score of -1e9; allowed
    ones carry 0.0 (start/end) or a calibrated bias (transitions).
    """
    def __init__(self,li):
        # li: LabelInfo-like object. _ss/_es: start/end scores per label; _ts[i, j]: score of i -> j.
        nc=len(li.token_to_span_label);self._ss=torch.full((nc,),-1e9,dtype=torch.float32);self._es=torch.full((nc,),-1e9,dtype=torch.float32)
        self._ts=torch.full((nc,nc),-1e9,dtype=torch.float32);tb=get_viterbi_transition_biases()
        bt,bs=li.background_token_label,li.background_span_label;tags,smap=li.token_boundary_tags,li.token_to_span_label
        for i in range(nc):
            tg=tags.get(i)
            # A sequence may start on background, B or S ...
            if tg in{"B","S"} or i==bt:self._ss[i]=0.0
            # ... and may end on background, E or S.
            if tg in{"E","S"} or i==bt:self._es[i]=0.0
            for j in range(nc):
                # nib: the next label j is background.
                ntg,nsp,sp=tags.get(j),smap.get(j),smap.get(i);nib=nsp==bs or j==bt
                # Skip next labels that have no span/tag information and are not background.
                if(nsp is None or ntg is None) and not nib:continue
                if sp is None or tg is None:
                    # Previous label without a tag or span: may go to background or start a span.
                    if nib or ntg in{"B","S"}:self._ts[i,j]=self._tb(tg,sp,ntg,nsp,bs,tb)
                elif sp==bs or tg in{"E","S"}:
                    # After background or a finished span: background or a new span start.
                    if nib or ntg in{"B","S"}:self._ts[i,j]=self._tb(tg,sp,ntg,nsp,bs,tb)
                elif tg in{"B","I"}:
                    # Inside a span: must continue (I) or end (E) the same span class.
                    if sp==nsp and ntg in{"I","E"}:self._ts[i,j]=self._tb(tg,sp,ntg,nsp,bs,tb)
    @staticmethod
    def _tb(pt,ps,nt,ns,bs,b):
        """Pick the bias for a transition from (tag pt, span ps) to (tag nt, span ns)."""
        nib=ns==bs;pib=ps==bs
        if pib:return b["transition_bias_background_stay"] if nib else b["transition_bias_background_to_start"]
        if pt in{"B","I"}:return b["transition_bias_inside_to_continue"] if nt=="I" else b["transition_bias_inside_to_end"]
        return b["transition_bias_end_to_background"] if nib else b["transition_bias_end_to_start"]
    def decode(self,lp):
        """Return the best label id per position for the 2-D score matrix ``lp``.

        ``lp`` is unpacked as (sequence_length, num_labels). An empty sequence
        yields an empty list.
        """
        sl,nc=lp.shape
        if sl==0:return[]
        # Bring the constraint tensors to lp's device and dtype.
        ss=self._ss.to(device=lp.device,dtype=lp.dtype);es=self._es.to(device=lp.device,dtype=lp.dtype);ts=self._ts.to(device=lp.device,dtype=lp.dtype)
        # sc: best score ending in each label; bp: back-pointers for positions 1..sl-1.
        sc=lp[0]+ss;bp=torch.empty((sl-1,nc),device=lp.device,dtype=torch.int64)
        for i in range(1,sl):tr=sc.unsqueeze(1)+ts;bs_,bi=tr.max(dim=0);sc=bs_+lp[i];bp[i-1]=bi
        # If no score is finite, fall back to independent per-position argmax.
        if not torch.isfinite(sc).any():return lp.argmax(dim=1).tolist()
        sc=sc+es;ll=sc.argmax();path=torch.empty((sl,),device=lp.device,dtype=torch.int64);path[-1]=ll
        # Follow the back-pointers from the last position to the first.
        for i in range(sl-2,-1,-1):ll=bp[i,ll];path[i]=ll
        return path.tolist()
@torch.inference_mode()
def predict_text_gpu(rt,text,dec):
    """Detect PII spans in ``text`` with the GPU model.

    Args:
        rt: InferenceRuntime providing the model, tokenizer, labels, device and n_ctx.
        text: input text.
        dec: decoder whose ``decode`` turns per-token log-probabilities into label ids.

    Returns:
        ``(text_used, detections)``: the text the offsets refer to and a list
        of dicts with keys entity, word, start, end, score and source.
    """
    tids=tuple(int(t) for t in rt.encoding.encode(text,allowed_special="all"))
    if not tids:return text,[]
    svs=[]
    # The model is run on consecutive, non-overlapping windows of n_ctx tokens.
    for s in range(0,len(tids),rt.n_ctx):
        e=min(s+rt.n_ctx,len(tids));wt=torch.tensor(tids[s:e],device=rt.device,dtype=torch.int32)
        lp=F.log_softmax(rt.model(wt).float(),dim=-1);svs.extend(lp.unbind(0))
    if not svs:return text,[]
    # Decoding runs over the whole sequence at once, across window boundaries.
    stacked=torch.stack(svs,dim=0);dl=dec.decode(stacked)
    # Safety net: use per-token argmax if the decoder returned the wrong length.
    if len(dl)!=len(tids):dl=stacked.argmax(dim=1).tolist()
    pli={i:int(l) for i,l in enumerate(dl)};pts=labels_to_spans(pli,rt.label_info)
    # Rebuild the text from token bytes so token offsets can be mapped to characters.
    tbs=[rt.encoding.decode_single_token_bytes(t) for t in tids];dt=b"".join(tbs).decode("utf-8",errors="replace")
    # cbs/cbe: byte start/end offset of every character of the decoded text.
    # NOTE(review): if errors="replace" substitutes characters, these offsets
    # may not line up with the token byte offsets — confirm whether that can occur.
    cbs,cbe=[],[];bc=0
    for ch in dt:cbs.append(bc);bc+=len(ch.encode("utf-8"));cbe.append(bc)
    # cs/ce: character start/end index of every token, found by bisecting the byte offsets.
    cs,ce=[],[];tbc=0
    for rb in tbs:
        s0=tbc;s1=s0+len(rb);tbc=s1;si=bisect_right(cbe,s0);ei=bisect_left(cbs,s1)
        if ei<si:ei=si
        cs.append(si);ce.append(ei)
    # st always has the value of the decoded text dt.
    st=dt if dt!=text else text;pcs=trim_char_spans_whitespace(token_spans_to_char_spans(pts,cs,ce),st)
    det=[]
    for li,s,e in pcs:
        if not(0<=s<e<=len(st)):continue
        label=rt.label_info.span_class_names[li] if 0<=li<len(rt.label_info.span_class_names) else f"label_{li}"
        # The score is fixed at 1.0 for every detection from this path.
        det.append({"entity":label,"word":st[s:e],"start":int(s),"end":int(e),"score":1.0,"source":"AI"})
    return st,det
# ═══════════════════════════════════════════════════════════════════════════════
# CPU PATH: GLiNER-PII (zero-shot, 60+ entity types)
# ═══════════════════════════════════════════════════════════════════════════════
def get_pii_entities_cpu(text, min_score=0.3):
    """Detect PII in ``text`` with the GLiNER model.

    Args:
        text: text to scan.
        min_score: threshold passed to the model.

    Returns:
        List of entity dicts (entity, start, end, word, score, source). If the
        model call raises, the error is printed and an empty list is returned.
    """
    try:
        predictions = gliner_model.predict_entities(text, GLINER_PII_LABELS, threshold=min_score)
    except Exception as e:
        print(f"GLiNER error: {e}")
        return []
    return [
        {
            "entity": prediction["label"],
            "start": prediction["start"],
            "end": prediction["end"],
            "word": prediction["text"],
            "score": prediction["score"],
            "source": "GLiNER",
        }
        for prediction in predictions
    ]
# ═══════════════════════════════════════════════════════════════════════════════
# UNIFIED PII DETECTION
# ═══════════════════════════════════════════════════════════════════════════════
def _get_pii_entities_gpu_inner(text):
    """Run the GPU model on ``text`` and return its list of detected entities."""
    rt = get_runtime()
    # Decoder.__init__ names its parameter `li`, so the label info has to be
    # passed positionally; the previous `label_info=` keyword raised TypeError
    # on every call.
    dec = Decoder(rt.label_info)
    _, ents = predict_text_gpu(rt, text, dec)
    return ents
# Bind get_pii_entities_ai to whichever model was selected at startup.
# The choice depends on USE_PRIVACY_FILTER alone: previously a machine with
# CUDA but without the `spaces` package got the CPU variant while the GLiNER
# model had never been loaded, so AI detection silently returned nothing.
if USE_PRIVACY_FILTER:
    def get_pii_entities_ai(text):
        """Return entities detected by openai/privacy-filter ([] for blank text)."""
        if not text.strip():
            return []
        return _get_pii_entities_gpu_inner(text)

    if ZEROGPU_AVAILABLE:
        # Equivalent to decorating with @spaces.GPU.
        get_pii_entities_ai = spaces.GPU(get_pii_entities_ai)
else:
    def get_pii_entities_ai(text, min_score=0.5):
        """Return entities detected by GLiNER-PII ([] for blank text)."""
        if not text.strip():
            return []
        return get_pii_entities_cpu(text, min_score)
# ─── Regex + merge ───────────────────────────────────────────────────────────────
def get_pii_entities_regex(text):
    """Return one entity dict per case-insensitive match of every REGEX_PATTERNS entry."""
    return [
        {
            "entity": label,
            "start": match.start(),
            "end": match.end(),
            "word": match.group(),
            "score": 1.0,
            "source": "REGEX",
        }
        for label, pattern in REGEX_PATTERNS.items()
        for match in re.finditer(pattern, text, re.IGNORECASE)
    ]
def merge_entities(ai, rx):
    """Combine AI and regex detections into one list sorted by start offset.

    A regex entity that overlaps AI entities is kept only when it is strictly
    longer than the longest of them, in which case those AI entities are
    dropped; otherwise the regex entity is dropped. Regex entities without any
    overlap are always kept.
    """
    superseded = set()
    merged = []
    for regex_ent in rx:
        overlapping = [
            index
            for index, ai_ent in enumerate(ai)
            if regex_ent["start"] < ai_ent["end"] and regex_ent["end"] > ai_ent["start"]
        ]
        if not overlapping:
            merged.append(regex_ent)
            continue
        regex_length = regex_ent["end"] - regex_ent["start"]
        longest_ai = max(ai[index]["end"] - ai[index]["start"] for index in overlapping)
        if regex_length > longest_ai:
            merged.append(regex_ent)
            superseded.update(overlapping)

    merged.extend(ai_ent for index, ai_ent in enumerate(ai) if index not in superseded)
    merged.sort(key=lambda entity: entity["start"])
    return merged
# ─── EasyOCR ─────────────────────────────────────────────────────────────────────
# OCR is optional: ocr_reader stays None when EasyOCR cannot be imported or
# initialised, and ocr_page then returns no text.
ocr_reader = None
try:
    import easyocr

    ocr_reader = easyocr.Reader(["en"], gpu=False, verbose=False)
    print("EasyOCR loaded.")
except Exception as e:
    print(f"EasyOCR not available: {e}")
# ─── PDF helpers ─────────────────────────────────────────────────────────────────
def build_char_to_bbox_map(page):
    """Map character offsets of the page text to the rectangle of their word.

    Returns:
        ``(char_map, full_text)`` where ``char_map`` maps an offset in
        ``full_text`` to the ``fitz.Rect`` of the word containing it. Words are
        located left to right; a word not found after the previous match is
        skipped.
    """
    word_entries = page.get_text("words")
    full_text = page.get_text("text")
    char_map = {}
    search_from = 0
    for entry in word_entries:
        word_text = entry[4]
        found_at = full_text.find(word_text, search_from)
        if found_at == -1:
            continue
        word_end = found_at + len(word_text)
        word_rect = fitz.Rect(entry[0], entry[1], entry[2], entry[3])
        for offset in range(found_at, word_end):
            char_map[offset] = word_rect
        search_from = word_end
    return char_map, full_text
def get_redact_rects(cm, s, e):
    """Return the distinct rectangles covering character offsets ``s`` to ``e - 1``.

    Rectangles are de-duplicated by their coordinates and returned in order of
    first appearance; offsets missing from ``cm`` are ignored.
    """
    unique_rects = {}
    for rect in (cm[offset] for offset in range(s, e) if offset in cm):
        coordinates = (rect.x0, rect.y0, rect.x1, rect.y1)
        unique_rects.setdefault(coordinates, rect)
    return list(unique_rects.values())
def ocr_page(page):
    """OCR a page and return ``(fitz.Rect, text)`` pairs in page coordinates.

    Returns an empty list when no OCR reader is available. Results with
    confidence below 0.3 or blank text are discarded.
    """
    if ocr_reader is None:
        return []
    import numpy as np

    # Render at 2x so small print is easier to recognise.
    pixmap = page.get_pixmap(matrix=fitz.Matrix(2, 2))
    image = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(pixmap.h, pixmap.w, pixmap.n)
    if pixmap.n == 4:
        # Keep only the first three channels.
        image = image[:, :, :3]

    boxes = []
    for corners, text, conf in ocr_reader.readtext(image):
        if conf < 0.3 or not text.strip():
            continue
        # Corner points are in rendered pixels; halve them to undo the 2x render.
        xs = [point[0] * 0.5 for point in corners]
        ys = [point[1] * 0.5 for point in corners]
        boxes.append((fitz.Rect(min(xs), min(ys), max(xs), max(ys)), text))
    return boxes
# ─── Main redaction pipeline ────────────────────────────────────────────────────
def _ocr_box_spans(boxes):
    """Return the (start, end) offsets each OCR box occupies in the space-joined page text."""
    spans = []
    cursor = 0
    for _, box_text in boxes:
        spans.append((cursor, cursor + len(box_text)))
        cursor += len(box_text) + 1  # +1 for the joining space
    return spans


def _ocr_rects_for_entity(ent, boxes, spans):
    """Return the rectangle of every OCR box that an entity's character span touches."""
    hits = [
        rect
        for (rect, _), (start, end) in zip(boxes, spans)
        if start < ent["end"] and ent["start"] < end
    ]
    if hits:
        return hits
    # Offsets did not land on any box: fall back to matching by text content.
    needle = ent["word"].strip().lower()
    for rect, box_text in boxes:
        haystack = box_text.lower()
        if needle in haystack or haystack in needle:
            return [rect]
    return []


def _detect_page_entities(page_text, min_score, use_regex):
    """Run the active detectors on one page's text and apply the score threshold."""
    if USE_PRIVACY_FILTER:
        ai_entities = get_pii_entities_ai(page_text)
    else:
        ai_entities = get_pii_entities_ai(page_text, min_score)
    regex_entities = get_pii_entities_regex(page_text) if use_regex else []
    merged = merge_entities(ai_entities, regex_entities)
    return [ent for ent in merged if ent["score"] >= min_score]


def _build_redaction_report(elapsed, total_pages, ocr_pages, sensitivity, min_score,
                            use_regex, type_counts, total_redactions, all_report):
    """Render the Markdown report shown next to the redacted PDF."""
    mr="[openai/privacy-filter](https://huggingface.co/openai/privacy-filter) (1.4B params, GPU)" if USE_PRIVACY_FILTER else "[GLiNER-PII](https://huggingface.co/knowledgator/gliner-pii-base-v1.0) (zero-shot, 60+ types, CPU)"
    lines = [
        "# \U0001f4c4 PDF PII Redaction Report",
        "",
        f"**Processing time:** {elapsed:.1f}s",
        f"**Pages processed:** {total_pages}",
        f"**Pages with OCR:** {ocr_pages}",
        f"**Sensitivity:** {sensitivity} (threshold: {min_score})",
        f"**Regex:** {'On' if use_regex else 'Off'}",
        f"**AI Model:** {mr}",
        f"**Entities detected:** {sum(type_counts.values())}",
        f"**Redaction boxes:** {total_redactions}",
        "",
        "## Entity Types",
        "",
    ]
    if type_counts:
        lines += ["| Type | Description | Count |", "|------|-------------|-------|"]
        for l, c in sorted(type_counts.items(), key=lambda x: -x[1]):
            lines.append(f"| {l} | {NER_LABEL_DESCRIPTIONS.get(l,l)} | {c} |")
    else:
        lines.append("*No PII detected.*")
    lines += ["", "## Details by Page", ""]
    for pr in all_report:
        lines.append(f"### Page {pr['page']}")
        for e in pr["entities"]:
            lines.append(f"- **{e['type']}**: `{e['text']}` (conf: {e['confidence']}, src: {e['source']})")
        lines.append("")
    return "\n".join(lines)


def redact_pdf(input_file,sensitivity="Standard",use_regex=True,use_ocr=True,progress=gr.Progress()):
    """Detect PII in a PDF, black it out, and return the result with a report.

    Args:
        input_file: path string or an object with a ``name`` attribute holding the path.
        sensitivity: "Conservative", "Standard" or "Aggressive"; selects the score threshold.
        use_regex: also run the regex detectors.
        use_ocr: OCR pages that have (almost) no extractable text.
        progress: Gradio progress callback.

    Returns:
        ``(output_path, report_markdown)``.

    Raises:
        gr.Error: if no file was given, the file does not exist, or it cannot be opened.
    """
    if input_file is None:
        raise gr.Error("Please upload a PDF file first.")
    if hasattr(input_file, "name"):
        input_path = input_file.name
    elif isinstance(input_file, str):
        input_path = input_file
    else:
        input_path = None
    if not input_path or not os.path.exists(input_path):
        raise gr.Error("File not found.")

    if USE_PRIVACY_FILTER:
        min_score = {"Conservative": 0.85, "Standard": 0.60, "Aggressive": 0.35}.get(sensitivity, 0.60)
    else:
        min_score = {"Conservative": 0.7, "Standard": 0.4, "Aggressive": 0.2}.get(sensitivity, 0.4)

    start_time = time.time()
    try:
        doc = fitz.open(input_path)
    except Exception as e:
        raise gr.Error(f"Failed to open PDF: {e}") from e

    all_report = []
    total_redactions = 0
    type_counts = {}
    ocr_pages = 0
    # try/finally so the document is closed even when a page fails midway.
    try:
        total_pages = len(doc)
        ml = "openai/privacy-filter (GPU)" if USE_PRIVACY_FILTER else "GLiNER-PII (CPU)"
        progress(0, desc=f"Starting PII detection with {ml}...")
        for page_index in range(total_pages):
            page = doc[page_index]
            page_ents = []
            progress(page_index / total_pages, desc=f"Processing page {page_index+1}/{total_pages}...")
            is_scanned = len(page.get_text("text").strip()) < 10
            used_ocr = is_scanned and use_ocr

            if used_ocr:
                ocr_pages += 1
                boxes = ocr_page(page)
                if not boxes:
                    continue
                page_text = " ".join(box_text for _, box_text in boxes)
                spans = _ocr_box_spans(boxes)
                for ent in _detect_page_entities(page_text, min_score, use_regex):
                    # Locate boxes by character offset so that repeated values
                    # and entities spanning several boxes are all covered; the
                    # old first-substring-match left later occurrences visible.
                    rects = _ocr_rects_for_entity(ent, boxes, spans)
                    if not rects:
                        continue
                    for rect in rects:
                        page.add_redact_annot(rect, fill=(0, 0, 0))
                    total_redactions += len(rects)
                    type_counts[ent["entity"]] = type_counts.get(ent["entity"], 0) + 1
                    page_ents.append(ent)
            else:
                char_map, page_text = build_char_to_bbox_map(page)
                if not page_text.strip():
                    continue
                for ent in _detect_page_entities(page_text, min_score, use_regex):
                    rects = get_redact_rects(char_map, ent["start"], ent["end"])
                    if not rects:
                        # Offsets did not map to any word box: search the page for the text.
                        rects = page.search_for(ent["word"].strip()) or []
                    for r in rects:
                        # Pad by one point on each side so glyph edges are covered.
                        page.add_redact_annot(fitz.Rect(r.x0-1, r.y0-1, r.x1+1, r.y1+1), fill=(0, 0, 0))
                    total_redactions += len(rects)
                    type_counts[ent["entity"]] = type_counts.get(ent["entity"], 0) + 1
                    page_ents.append(ent)

            # On OCR'd pages the sensitive content lives in the page image, so
            # the covered pixels must be blanked; leaving images untouched
            # would keep the original scan recoverable under the black box.
            image_mode = fitz.PDF_REDACT_IMAGE_PIXELS if used_ocr else fitz.PDF_REDACT_IMAGE_NONE
            page.apply_redactions(images=image_mode)
            if page_ents:
                all_report.append({
                    "page": page_index + 1,
                    "entities": [
                        {
                            "type": e["entity"],
                            "text": e["word"][:3] + "***" if len(e["word"]) > 3 else "***",
                            "confidence": round(e["score"], 3),
                            "source": e["source"],
                        }
                        for e in page_ents
                    ],
                })

        # mkstemp creates the file atomically; mktemp only returned a name that
        # another process could claim before the save.
        fd, output_path = tempfile.mkstemp(suffix="_redacted.pdf")
        os.close(fd)
        doc.save(output_path, garbage=4, deflate=True)
    finally:
        doc.close()

    elapsed = time.time() - start_time
    report = _build_redaction_report(
        elapsed, total_pages, ocr_pages, sensitivity, min_score,
        use_regex, type_counts, total_redactions, all_report,
    )
    return output_path, report
# ─── Gradio UI ──────────────────────────────────────────────────────────────────
# Status line naming the model that is active in this process.
if USE_PRIVACY_FILTER:
    _badge = "\U0001f7e2 **GPU mode** \u2014 [openai/privacy-filter](https://huggingface.co/openai/privacy-filter) (1.4B MoE)"
else:
    _badge = "\U0001f7e1 **CPU mode** \u2014 [GLiNER-PII](https://huggingface.co/knowledgator/gliner-pii-base-v1.0) (zero-shot, 60+ entity types, F1=81%)"

# Markdown shown at the top of the page.
DESCRIPTION = f"""
# \U0001f512 PDF PII Redactor
**Automatically detect and redact sensitive information from PDF documents.**
{_badge}
When running on ZeroGPU, uses OpenAI Privacy Filter (1.4B params, 128k context). On CPU, uses GLiNER-PII (zero-shot, 60+ entity types, F1=81%).
### \U0001f6e1\ufe0f What gets redacted:
| Category | Types |
|----------|-------|
| **Personal** | Names, Addresses, URLs, Dates |
| **Financial** | Account Numbers, Credit Cards, IBANs |
| **Contact** | Emails, Phone Numbers |
| **Credentials** | Passwords, API Keys, Tokens |
| **Identifiers** | SSN, Driver's License, Passport |
| **Medical** | Medical Record Numbers |
> \u26a0\ufe0f Redaction is **permanent** \u2014 keep a backup of your original.
"""
# Two-column layout: upload and options on the left, results on the right.
with gr.Blocks(title="PDF PII Redactor") as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### \U0001f4e4 Upload & Configure")
            input_pdf = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath",
            )
            with gr.Row():
                sensitivity = gr.Radio(
                    choices=["Conservative", "Standard", "Aggressive"],
                    value="Standard",
                    label="Sensitivity",
                )
            with gr.Row():
                use_regex = gr.Checkbox(value=True, label="Regex Patterns")
                use_ocr = gr.Checkbox(value=True, label="OCR for Scanned Pages")
            redact_btn = gr.Button(
                "\U0001f512 Redact",
                variant="primary",
                size="lg",
            )
        with gr.Column(scale=1):
            gr.Markdown("### \U0001f4e5 Output")
            output_pdf = gr.File(label="Redacted PDF")
            gr.Markdown("### \U0001f4ca Report")
            report_output = gr.Markdown(value="*Upload a PDF and click Redact.*")

    # The button runs the pipeline and fills both the file and report outputs.
    redact_btn.click(
        fn=redact_pdf,
        inputs=[input_pdf, sensitivity, use_regex, use_ocr],
        outputs=[output_pdf, report_output],
    )
    gr.Markdown("---")
    gr.Markdown(f"### \u2139\ufe0f About\n**Active:** {_badge}\n\n**PDF Engine:** PyMuPDF \u2022 **OCR:** EasyOCR")

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)