# app.py — Screenshot Anonymizer backend (HF Space; commit f231103 by ysharma)
"""
Screenshot Anonymizer
=====================
Backend : gr.Server (Gradio + FastAPI)
Frontend: Canvas-based image editor (served from app_v2.py)
PII : openai/privacy-filter (1.5B sparse-MoE, 50M active, apache-2.0)
OCR : pytesseract (Tesseract 5)
Drag in a screenshot of a chat / email / document. Tesseract extracts words
with pixel positions, the privacy-filter model finds PII character spans,
we map the spans back to pixel rectangles and draw black bars on top. The
canvas editor lets the user toggle, move, add, or delete bars, then export
the redacted screenshot as a PNG.
"""
# ── stdlib ───────────────────────────────────────────────────────
import base64
import functools
import io
import json
import os
from pathlib import Path
# ── third-party ──────────────────────────────────────────────────
import gradio as gr
import pytesseract
import spaces
import torch
from fastapi import File, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
from PIL import Image
# ── configuration ────────────────────────────────────────────────
# Model repo is overridable via env var (e.g. to point at a private fork).
PII_MODEL_REPO = os.getenv("PII_MODEL_ID", "openai/privacy-filter")
# Optional Hub auth token; None means anonymous access.
HF_TOKEN = os.getenv("HF_TOKEN", None)
# Categories returned by openai/privacy-filter, colored for the UI.
# Keys are the model's entity labels; values drive the legend color and
# the human-readable display name in the canvas editor.
CATEGORIES_META = {
    "private_person": {"color": "#ef4444", "label": "Person"},
    "private_address": {"color": "#06b6d4", "label": "Address"},
    "private_email": {"color": "#3b82f6", "label": "Email"},
    "private_phone": {"color": "#22c55e", "label": "Phone"},
    "private_url": {"color": "#eab308", "label": "URL"},
    "private_date": {"color": "#a855f7", "label": "Date"},
    "account_number": {"color": "#f97316", "label": "Account"},
    "secret": {"color": "#dc2626", "label": "Secret"},
}
# =====================================================================
# PRIVACY FILTER (openai/privacy-filter)
# =====================================================================
@functools.lru_cache(maxsize=1)
def get_pii_pipeline():
    """Build (once) and return the token-classification pipeline for the
    privacy-filter model.

    The lru_cache keeps the pipeline alive for the lifetime of the process,
    so repeated calls inside a single ZeroGPU slot reuse the already-loaded
    weights instead of re-downloading / re-moving them.
    """
    from transformers import pipeline

    pipeline_kwargs = dict(
        task="token-classification",
        model=PII_MODEL_REPO,
        # "simple" merges BIOES sub-token tags into character-level spans.
        aggregation_strategy="simple",
        device=0,
        torch_dtype=torch.bfloat16,
        token=HF_TOKEN,
    )
    return pipeline(**pipeline_kwargs)
@spaces.GPU
def run_pii_analysis(text: str):
    """Detect PII in `text` on the GPU.

    Returns a (source_text, spans) pair; each span is a dict
    {label, start, end, text} whose offsets index into `text`.
    Blank input short-circuits without touching the model.
    """
    if not text.strip():
        return text, []

    entities = get_pii_pipeline()(text)
    n = len(text)
    spans = []
    for ent in entities:
        label = ent.get("entity_group") or ent.get("entity")
        # Skip unlabeled results and the "outside" (non-entity) tag.
        if not label or label == "O":
            continue
        start, end = int(ent["start"]), int(ent["end"])
        # Defensively drop malformed or out-of-range character offsets.
        if not (0 <= start < end <= n):
            continue
        spans.append({
            "label": label,
            "start": start,
            "end": end,
            "text": text[start:end],
        })
    return text, spans
# =====================================================================
# OCR + SPAN β†’ BOX MAPPING
# =====================================================================
def ocr_image(img: Image.Image) -> dict:
    """Run Tesseract over `img` and return {"text": str, "words": [...]}.

    The text is rebuilt word by word — a single space between words on the
    same line and a newline between lines — so that each word's start/end
    character offsets index straight into the returned text. That makes the
    later char-span → pixel-box mapping a pure lookup.
    """
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
    words = []
    parts = []
    cursor = 0
    prev_line = None
    for i, raw in enumerate(data["text"]):
        if raw is None:
            continue
        token = raw.strip()
        if not token:
            continue
        try:
            confidence = float(data["conf"][i])
        except (TypeError, ValueError):
            confidence = -1.0
        # Tesseract reports negative confidence for non-word structural rows.
        if confidence < 0:
            continue
        line = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        if prev_line is not None:
            # One separator char between consecutive words: newline on a
            # line change, plain space otherwise.
            parts.append("\n" if line != prev_line else " ")
            cursor += 1
        prev_line = line
        begin = cursor
        parts.append(token)
        cursor += len(token)
        words.append({
            "text": token, "start": begin, "end": cursor,
            "x": int(data["left"][i]), "y": int(data["top"][i]),
            "w": int(data["width"][i]), "h": int(data["height"][i]),
        })
    return {"text": "".join(parts), "words": words}
def map_spans_to_boxes(words, spans, pad=3):
    """Convert character spans into padded pixel rectangles.

    A span that crosses a line break yields one box per visual line; words
    are bucketed into lines by vertical-center proximity rather than exact
    y, which tolerates small baseline jitter in the OCR output.
    """
    boxes = []
    for span in spans:
        span_start, span_end = span["start"], span["end"]
        label = span["label"]
        # Any word whose [start, end) range intersects the span counts.
        overlapping = [
            w for w in words
            if w["start"] < span_end and w["end"] > span_start
        ]
        if not overlapping:
            continue
        # Group words into visual lines keyed by the first seen y-center.
        lines = {}
        for word in overlapping:
            center = word["y"] + word["h"] // 2
            tolerance = max(word["h"] * 0.6, 10)
            bucket = next(
                (k for k in lines if abs(k - center) < tolerance), None
            )
            if bucket is None:
                bucket = center
            lines.setdefault(bucket, []).append(word)
        # Emit one padded bounding box per line, clamped to the image origin.
        for group in lines.values():
            left = min(w["x"] for w in group) - pad
            top = min(w["y"] for w in group) - pad
            right = max(w["x"] + w["w"] for w in group) + pad
            bottom = max(w["y"] + w["h"] for w in group) + pad
            boxes.append({
                "x": max(0, left), "y": max(0, top),
                "w": max(1, right - left), "h": max(1, bottom - top),
                "label": label,
                "text": " ".join(w["text"] for w in group),
            })
    return boxes
# =====================================================================
# SERVER
# =====================================================================
server = gr.Server()

@server.get("/", response_class=HTMLResponse)
async def homepage():
    # Serve the minimal fallback landing page at "/". FRONTEND_HTML is a
    # module-level constant defined further down this file (bound before
    # the server launches, so the forward reference is safe at runtime).
    return FRONTEND_HTML
@server.post("/api/detect")
async def detect(file: UploadFile = File(...)):
    """Accept an uploaded screenshot, run OCR + PII detection, and return
    the image (as a PNG data URL) plus detected boxes, text, and spans.

    Errors (bad extension, unreadable image, no text, analysis failure)
    come back as JSON {"error": ...} with a 4xx/5xx status code.
    """
    allowed = (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff")
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in allowed:
        return JSONResponse({"error": f"Unsupported image type: {suffix or '(none)'}"}, 400)

    try:
        payload = await file.read()
        img = Image.open(io.BytesIO(payload)).convert("RGB")
    except Exception as e:
        return JSONResponse({"error": f"Could not read image: {e}"}, 400)

    ocr = ocr_image(img)
    full_text = ocr["text"]
    if not full_text.strip():
        return JSONResponse({"error": "No text detected in the image."}, 400)

    try:
        source_text, spans = run_pii_analysis(full_text)
    except Exception as e:
        return JSONResponse({"error": f"PII analysis failed: {e}"}, 500)

    # Defensive: if the analyzer echoed back different text, keep only the
    # spans whose offsets still fit inside the OCR text we mapped boxes for.
    if source_text != full_text:
        spans = [s for s in spans if s["end"] <= len(full_text)]

    boxes = map_spans_to_boxes(ocr["words"], spans)

    # Re-encode as PNG so the frontend gets a stable, lossless data URL.
    png_buf = io.BytesIO()
    img.save(png_buf, format="PNG")
    encoded = base64.b64encode(png_buf.getvalue()).decode()

    return JSONResponse({
        "filename": file.filename,
        "image": "data:image/png;base64," + encoded,
        "width": img.width, "height": img.height,
        "boxes": boxes,
        "text": full_text,
        "spans": spans,
        "categories_meta": {k: {"color": v["color"], "label": v["label"]}
                            for k, v in CATEGORIES_META.items()},
    })
@server.api(name="anonymize_screenshot")
def anonymize_screenshot_api(image_path: str) -> str:
    """Gradio API: takes a path to an image, returns JSON with detected boxes."""
    img = Image.open(image_path).convert("RGB")
    ocr = ocr_image(img)
    text = ocr["text"]
    # Nothing readable in the image → empty result, not an error.
    if not text.strip():
        return json.dumps({"boxes": [], "text": "", "spans": []})
    spans = run_pii_analysis(text)[1]
    payload = {
        "width": img.width, "height": img.height,
        "boxes": map_spans_to_boxes(ocr["words"], spans),
        "text": text, "spans": spans,
    }
    return json.dumps(payload, ensure_ascii=False)
# =====================================================================
# FRONTEND (standalone fallback — the main UI lives in app_v2.py)
# =====================================================================
# Minimal landing page served at "/" — it only points API users at the
# available endpoints; the real canvas editor is a separate app.
FRONTEND_HTML = """<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Screenshot Anonymizer</title></head>
<body style="font-family:system-ui;padding:2rem;max-width:640px;margin:0 auto">
<h1>Screenshot Anonymizer</h1>
<p>This endpoint serves the backend API only. The editor UI lives in
<code>app_v2.py</code>. Use <code>POST /api/detect</code> or the
<code>/anonymize_screenshot</code> Gradio API.</p>
</body></html>"""
if __name__ == "__main__":
    # Bind on all interfaces; 7860 is the conventional HF Spaces port.
    server.launch(server_name="0.0.0.0", server_port=7860)