| """ | |
| Screenshot Anonymizer | |
| ===================== | |
| Backend : gr.Server (Gradio + FastAPI) | |
| Frontend: Canvas-based image editor (served from app_v2.py) | |
| PII : openai/privacy-filter (1.5B sparse-MoE, 50M active, apache-2.0) | |
| OCR : pytesseract (Tesseract 5) | |
| Drag in a screenshot of a chat / email / document. Tesseract extracts words | |
| with pixel positions, the privacy-filter model finds PII character spans, | |
| we map the spans back to pixel rectangles and draw black bars on top. The | |
| canvas editor lets the user toggle, move, add, or delete bars, then export | |
| the redacted screenshot as a PNG. | |
| """ | |

# ── stdlib ───────────────────────────────────────────────────────
import base64
import functools
import io
import json
import os
from pathlib import Path

# ── third-party ──────────────────────────────────────────────────
import gradio as gr
import pytesseract
import spaces
import torch
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
from PIL import Image

# ── configuration ────────────────────────────────────────────────
PII_MODEL_REPO = os.getenv("PII_MODEL_ID", "openai/privacy-filter")
HF_TOKEN = os.getenv("HF_TOKEN", None)

# Categories returned by openai/privacy-filter, colored for the UI.
CATEGORIES_META = {
    "private_person": {"color": "#ef4444", "label": "Person"},
    "private_address": {"color": "#06b6d4", "label": "Address"},
    "private_email": {"color": "#3b82f6", "label": "Email"},
    "private_phone": {"color": "#22c55e", "label": "Phone"},
    "private_url": {"color": "#eab308", "label": "URL"},
    "private_date": {"color": "#a855f7", "label": "Date"},
    "account_number": {"color": "#f97316", "label": "Account"},
    "secret": {"color": "#dc2626", "label": "Secret"},
}

# =====================================================================
# PRIVACY FILTER (openai/privacy-filter)
# =====================================================================

@functools.lru_cache(maxsize=1)
def get_pii_pipeline():
    """Lazy-load the privacy filter on the GPU. Kept in a cache so repeated
    calls inside a single ZeroGPU slot don't re-download / re-move weights."""
    from transformers import pipeline

    return pipeline(
        task="token-classification",
        model=PII_MODEL_REPO,
        aggregation_strategy="simple",  # merges BIOES tags into char-level spans
        device=0,
        torch_dtype=torch.bfloat16,
        token=HF_TOKEN,
    )


@spaces.GPU
def run_pii_analysis(text: str):
    """Return (source_text, spans) where each span is
    {label, start, end, text} with character offsets into `text`."""
    if not text.strip():
        return text, []

    pipe = get_pii_pipeline()
    results = pipe(text)

    spans = []
    for r in results:
        label = r.get("entity_group") or r.get("entity")
        if not label or label == "O":
            continue
        s, e = int(r["start"]), int(r["end"])
        if e <= s or s < 0 or e > len(text):
            continue
        spans.append({
            "label": label,
            "start": s,
            "end": e,
            "text": text[s:e],
        })
    return text, spans
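
# Illustrative result only -- real labels and offsets depend on the model:
#   run_pii_analysis("Contact jane@acme.com tomorrow")
#   -> ("Contact jane@acme.com tomorrow",
#       [{"label": "private_email", "start": 8, "end": 21, "text": "jane@acme.com"}])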


# =====================================================================
# OCR + SPAN → BOX MAPPING
# =====================================================================

def ocr_image(img: Image.Image) -> dict:
    """Run Tesseract and return the concatenated text plus per-word boxes.

    The text is reconstructed with a single space between words on the same
    line and a newline between lines, matching the character offsets we emit
    in the `words` list, so later char-span → box mapping is a pure lookup.
    """
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)

    words, parts = [], []
    pos = 0
    last_line_key = None

    for i in range(len(data["text"])):
        text = data["text"][i]
        if text is None:
            continue
        text = text.strip()
        if not text:
            continue

        # Tesseract marks structural rows (block / paragraph / line) with a
        # confidence of -1; only keep real words.
        try:
            conf = float(data["conf"][i])
        except (TypeError, ValueError):
            conf = -1.0
        if conf < 0:
            continue

        # Emit "\n" when the word starts a new line, " " otherwise, keeping
        # `pos` in sync with the reconstructed string.
        line_key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        if last_line_key is None:
            pass
        elif line_key != last_line_key:
            parts.append("\n"); pos += 1
        else:
            parts.append(" "); pos += 1
        last_line_key = line_key

        start = pos
        parts.append(text); pos += len(text)
        words.append({
            "text": text, "start": start, "end": pos,
            "x": int(data["left"][i]), "y": int(data["top"][i]),
            "w": int(data["width"][i]), "h": int(data["height"][i]),
        })

    return {"text": "".join(parts), "words": words}


def map_spans_to_boxes(words, spans, pad=3):
    """Map each char span to one or more pixel boxes, splitting across lines."""
    boxes = []
    for span in spans:
        ss, se, lbl = span["start"], span["end"], span["label"]

        # Every OCR word whose character range overlaps this span.
        hits = [w for w in words if w["start"] < se and w["end"] > ss]
        if not hits:
            continue

        # Group hit words by vertical center so a span that wraps across
        # lines yields one bar per line instead of one huge rectangle.
        by_line = {}
        for w in hits:
            yc = w["y"] + w["h"] // 2
            matched = None
            for key in by_line:
                if abs(key - yc) < max(w["h"] * 0.6, 10):
                    matched = key; break
            key = matched if matched is not None else yc
            by_line.setdefault(key, []).append(w)

        # One padded bounding rectangle per line of hit words.
        for line_words in by_line.values():
            x1 = min(w["x"] for w in line_words) - pad
            y1 = min(w["y"] for w in line_words) - pad
            x2 = max(w["x"] + w["w"] for w in line_words) + pad
            y2 = max(w["y"] + w["h"] for w in line_words) + pad
            boxes.append({
                "x": max(0, x1), "y": max(0, y1),
                "w": max(1, x2 - x1), "h": max(1, y2 - y1),
                "label": lbl,
                "text": " ".join(w["text"] for w in line_words),
            })
    return boxes
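
# Each returned box is in source-image pixel coordinates, e.g. (made-up numbers):
#   {"x": 118, "y": 42, "w": 163, "h": 22,
#    "label": "private_email", "text": "jane@acme.com"}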


# =====================================================================
# SERVER
# =====================================================================

# Routes live on an explicit FastAPI app; the Gradio API is mounted onto it
# further down, and uvicorn serves the combined app in __main__.
app = FastAPI(title="Screenshot Anonymizer")


@app.get("/", response_class=HTMLResponse)
async def homepage():
    return HTMLResponse(FRONTEND_HTML)


@app.post("/api/detect")
async def detect(file: UploadFile = File(...)):
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff"):
        return JSONResponse({"error": f"Unsupported image type: {suffix or '(none)'}"}, 400)

    try:
        img_bytes = await file.read()
        img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
    except Exception as e:
        return JSONResponse({"error": f"Could not read image: {e}"}, 400)

    ocr = ocr_image(img)
    if not ocr["text"].strip():
        return JSONResponse({"error": "No text detected in the image."}, 400)

    try:
        source_text, spans = run_pii_analysis(ocr["text"])
    except Exception as e:
        return JSONResponse({"error": f"PII analysis failed: {e}"}, 500)

    # Defensive: drop spans that fall outside the text we derived boxes from.
    if source_text != ocr["text"]:
        spans = [s for s in spans if s["end"] <= len(ocr["text"])]

    boxes = map_spans_to_boxes(ocr["words"], spans)

    buf = io.BytesIO()
    img.save(buf, format="PNG")
    data_url = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()

    return JSONResponse({
        "filename": file.filename,
        "image": data_url,
        "width": img.width, "height": img.height,
        "boxes": boxes,
        "text": ocr["text"],
        "spans": spans,
        "categories_meta": {k: {"color": v["color"], "label": v["label"]}
                            for k, v in CATEGORIES_META.items()},
    })


def anonymize_screenshot_api(image_path: str) -> str:
    """Gradio API: takes a path to an image, returns JSON with detected boxes."""
    img = Image.open(image_path).convert("RGB")
    ocr = ocr_image(img)
    if not ocr["text"].strip():
        return json.dumps({"boxes": [], "text": "", "spans": []})
    _, spans = run_pii_analysis(ocr["text"])
    boxes = map_spans_to_boxes(ocr["words"], spans)
    return json.dumps({
        "width": img.width, "height": img.height,
        "boxes": boxes, "text": ocr["text"], "spans": spans,
    }, ensure_ascii=False)
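

# Minimal sketch for exposing anonymize_screenshot_api over the Gradio client
# protocol and attaching it to the FastAPI app. The component choices, the
# /gradio mount path, and the api_name are assumptions, not fixed by app_v2.py.
demo = gr.Interface(
    fn=anonymize_screenshot_api,
    inputs=gr.Image(type="filepath", label="Screenshot"),
    outputs=gr.Textbox(label="Detections (JSON)"),
    title="Screenshot Anonymizer (API)",
    api_name="anonymize_screenshot",
)
app = gr.mount_gradio_app(app, demo, path="/gradio")

# Client-side usage sketch (URL and filename are placeholders):
#   from gradio_client import Client, handle_file
#   raw = Client("http://localhost:7860/gradio").predict(
#       handle_file("screenshot.png"), api_name="/anonymize_screenshot")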


# =====================================================================
# FRONTEND (standalone fallback; the main UI lives in app_v2.py)
# =====================================================================

FRONTEND_HTML = """<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Screenshot Anonymizer</title></head>
<body style="font-family:system-ui;padding:2rem;max-width:640px;margin:0 auto">
<h1>Screenshot Anonymizer</h1>
<p>This endpoint serves the backend API only. The editor UI lives in
<code>app_v2.py</code>. Use <code>POST /api/detect</code> or the
<code>/anonymize_screenshot</code> Gradio API.</p>
</body></html>"""
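
# Quick smoke test against a locally running instance (filename is an example):
#   curl -F "file=@screenshot.png" http://localhost:7860/api/detect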

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)