""" Screenshot Anonymizer ===================== Backend : gr.Server (Gradio + FastAPI) Frontend: Canvas-based image editor (served from app_v2.py) PII : openai/privacy-filter (1.5B sparse-MoE, 50M active, apache-2.0) OCR : pytesseract (Tesseract 5) Drag in a screenshot of a chat / email / document. Tesseract extracts words with pixel positions, the privacy-filter model finds PII character spans, we map the spans back to pixel rectangles and draw black bars on top. The canvas editor lets the user toggle, move, add, or delete bars, then export the redacted screenshot as a PNG. """ # ── stdlib ─────────────────────────────────────────────────────── import base64 import functools import io import json import os from pathlib import Path # ── third-party ────────────────────────────────────────────────── import gradio as gr import pytesseract import spaces import torch from fastapi import File, UploadFile from fastapi.responses import HTMLResponse, JSONResponse from PIL import Image # ── configuration ──────────────────────────────────────────────── PII_MODEL_REPO = os.getenv("PII_MODEL_ID", "openai/privacy-filter") HF_TOKEN = os.getenv("HF_TOKEN", None) # Categories returned by openai/privacy-filter, colored for the UI. CATEGORIES_META = { "private_person": {"color": "#ef4444", "label": "Person"}, "private_address": {"color": "#06b6d4", "label": "Address"}, "private_email": {"color": "#3b82f6", "label": "Email"}, "private_phone": {"color": "#22c55e", "label": "Phone"}, "private_url": {"color": "#eab308", "label": "URL"}, "private_date": {"color": "#a855f7", "label": "Date"}, "account_number": {"color": "#f97316", "label": "Account"}, "secret": {"color": "#dc2626", "label": "Secret"}, } # ===================================================================== # PRIVACY FILTER (openai/privacy-filter) # ===================================================================== @functools.lru_cache(maxsize=1) def get_pii_pipeline(): """Lazy-load the privacy filter on the GPU. 
Kept in a cache so repeated calls inside a single ZeroGPU slot don't re-download / re-move weights.""" from transformers import pipeline return pipeline( task="token-classification", model=PII_MODEL_REPO, aggregation_strategy="simple", # merges BIOES tags into char-level spans device=0, torch_dtype=torch.bfloat16, token=HF_TOKEN, ) @spaces.GPU def run_pii_analysis(text: str): """Return (source_text, spans) where each span is {label, start, end, text} with character offsets into `text`.""" if not text.strip(): return text, [] pipe = get_pii_pipeline() results = pipe(text) spans = [] for r in results: label = r.get("entity_group") or r.get("entity") if not label or label == "O": continue s, e = int(r["start"]), int(r["end"]) if e <= s or s < 0 or e > len(text): continue spans.append({ "label": label, "start": s, "end": e, "text": text[s:e], }) return text, spans # ===================================================================== # OCR + SPAN → BOX MAPPING # ===================================================================== def ocr_image(img: Image.Image) -> dict: """Run Tesseract and return the concatenated text plus per-word boxes. The text is reconstructed with a single space between words on the same line and a newline between lines, matching the character offsets we emit in the `words` list — so later char-span → box mapping is a pure lookup. 
""" data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT) words, parts = [], [] pos = 0 last_line_key = None for i in range(len(data["text"])): text = data["text"][i] if text is None: continue text = text.strip() if not text: continue try: conf = float(data["conf"][i]) except (TypeError, ValueError): conf = -1.0 if conf < 0: continue line_key = (data["block_num"][i], data["par_num"][i], data["line_num"][i]) if last_line_key is None: pass elif line_key != last_line_key: parts.append("\n"); pos += 1 else: parts.append(" "); pos += 1 last_line_key = line_key start = pos parts.append(text); pos += len(text) words.append({ "text": text, "start": start, "end": pos, "x": int(data["left"][i]), "y": int(data["top"][i]), "w": int(data["width"][i]), "h": int(data["height"][i]), }) return {"text": "".join(parts), "words": words} def map_spans_to_boxes(words, spans, pad=3): """Map each char span to one or more pixel boxes, splitting across lines.""" boxes = [] for span in spans: ss, se, lbl = span["start"], span["end"], span["label"] hits = [w for w in words if w["start"] < se and w["end"] > ss] if not hits: continue by_line = {} for w in hits: yc = w["y"] + w["h"] // 2 matched = None for key in by_line: if abs(key - yc) < max(w["h"] * 0.6, 10): matched = key; break key = matched if matched is not None else yc by_line.setdefault(key, []).append(w) for line_words in by_line.values(): x1 = min(w["x"] for w in line_words) - pad y1 = min(w["y"] for w in line_words) - pad x2 = max(w["x"] + w["w"] for w in line_words) + pad y2 = max(w["y"] + w["h"] for w in line_words) + pad boxes.append({ "x": max(0, x1), "y": max(0, y1), "w": max(1, x2 - x1), "h": max(1, y2 - y1), "label": lbl, "text": " ".join(w["text"] for w in line_words), }) return boxes # ===================================================================== # SERVER # ===================================================================== server = gr.Server() @server.get("/", response_class=HTMLResponse) 
async def homepage(): return FRONTEND_HTML @server.post("/api/detect") async def detect(file: UploadFile = File(...)): suffix = Path(file.filename or "").suffix.lower() if suffix not in (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff"): return JSONResponse({"error": f"Unsupported image type: {suffix or '(none)'}"}, 400) try: img_bytes = await file.read() img = Image.open(io.BytesIO(img_bytes)).convert("RGB") except Exception as e: return JSONResponse({"error": f"Could not read image: {e}"}, 400) ocr = ocr_image(img) if not ocr["text"].strip(): return JSONResponse({"error": "No text detected in the image."}, 400) try: source_text, spans = run_pii_analysis(ocr["text"]) except Exception as e: return JSONResponse({"error": f"PII analysis failed: {e}"}, 500) if source_text != ocr["text"]: spans = [s for s in spans if s["end"] <= len(ocr["text"])] boxes = map_spans_to_boxes(ocr["words"], spans) buf = io.BytesIO(); img.save(buf, format="PNG") data_url = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode() return JSONResponse({ "filename": file.filename, "image": data_url, "width": img.width, "height": img.height, "boxes": boxes, "text": ocr["text"], "spans": spans, "categories_meta": {k: {"color": v["color"], "label": v["label"]} for k, v in CATEGORIES_META.items()}, }) @server.api(name="anonymize_screenshot") def anonymize_screenshot_api(image_path: str) -> str: """Gradio API: takes a path to an image, returns JSON with detected boxes.""" img = Image.open(image_path).convert("RGB") ocr = ocr_image(img) if not ocr["text"].strip(): return json.dumps({"boxes": [], "text": "", "spans": []}) _, spans = run_pii_analysis(ocr["text"]) boxes = map_spans_to_boxes(ocr["words"], spans) return json.dumps({ "width": img.width, "height": img.height, "boxes": boxes, "text": ocr["text"], "spans": spans, }, ensure_ascii=False) # ===================================================================== # FRONTEND (standalone fallback — the main UI lives in app_v2.py) 
# =====================================================================

# Minimal fallback landing page served at "/" with HTMLResponse.
# Fix: the previous value contained no HTML markup at all (no doctype,
# no tags), so browsers received an invalid document rendered as bare
# text. This is now a small valid HTML page carrying the same message.
# The real canvas editor UI lives in app_v2.py.
FRONTEND_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Screenshot Anonymizer</title>
</head>
<body>
  <h1>Screenshot Anonymizer</h1>
  <p>
    This endpoint serves the backend API only. The editor UI lives in
    app_v2.py. Use <code>POST /api/detect</code> or the
    <code>/anonymize_screenshot</code> Gradio API.
  </p>
</body>
</html>
"""

if __name__ == "__main__":
    # Bind on all interfaces so the container / Space proxy can reach us.
    server.launch(server_name="0.0.0.0", server_port=7860)