File size: 9,481 Bytes
859678d
 
 
 
f231103
 
859678d
 
f231103
 
 
 
 
859678d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f231103
859678d
 
f231103
859678d
 
 
 
 
 
 
 
 
 
 
f231103
859678d
f231103
859678d
 
 
f231103
 
 
 
 
 
 
 
 
 
 
 
859678d
 
 
f231103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
859678d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f231103
859678d
 
f231103
 
 
 
 
 
 
 
859678d
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
"""
Screenshot Anonymizer
=====================
Backend : gr.Server (Gradio + FastAPI)
Frontend: Canvas-based image editor (served from app_v2.py)
PII     : openai/privacy-filter (1.5B sparse-MoE, 50M active, apache-2.0)
OCR     : pytesseract (Tesseract 5)

Drag in a screenshot of a chat / email / document. Tesseract extracts words
with pixel positions, the privacy-filter model finds PII character spans,
we map the spans back to pixel rectangles and draw black bars on top. The
canvas editor lets the user toggle, move, add, or delete bars, then export
the redacted screenshot as a PNG.
"""

# ── stdlib ───────────────────────────────────────────────────────
import base64
import functools
import io
import json
import os
from pathlib import Path

# ── third-party ──────────────────────────────────────────────────
import gradio as gr
import pytesseract
import spaces
import torch
from fastapi import File, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
from PIL import Image

# ── configuration ────────────────────────────────────────────────
# Model repo is overridable via env (e.g. to point at a fine-tuned variant).
PII_MODEL_REPO = os.getenv("PII_MODEL_ID", "openai/privacy-filter")
# Optional Hugging Face token for gated / private model access.
HF_TOKEN = os.getenv("HF_TOKEN", None)

# Categories returned by openai/privacy-filter, colored for the UI.
# Keys must match the label strings the model emits; values feed the
# frontend legend (swatch color + human-readable label).
CATEGORIES_META = {
    "private_person":  {"color": "#ef4444", "label": "Person"},
    "private_address": {"color": "#06b6d4", "label": "Address"},
    "private_email":   {"color": "#3b82f6", "label": "Email"},
    "private_phone":   {"color": "#22c55e", "label": "Phone"},
    "private_url":     {"color": "#eab308", "label": "URL"},
    "private_date":    {"color": "#a855f7", "label": "Date"},
    "account_number":  {"color": "#f97316", "label": "Account"},
    "secret":          {"color": "#dc2626", "label": "Secret"},
}


# =====================================================================
# PRIVACY FILTER  (openai/privacy-filter)
# =====================================================================

@functools.lru_cache(maxsize=1)
def get_pii_pipeline():
    """Build (once) and return the token-classification pipeline for the
    privacy-filter model.

    The ``lru_cache`` keeps a single instance alive so repeated calls within
    one ZeroGPU slot reuse the already-loaded weights instead of downloading
    and moving them to the device again.
    """
    from transformers import pipeline

    config = {
        "task": "token-classification",
        "model": PII_MODEL_REPO,
        # "simple" merges BIOES sub-token tags into character-level spans.
        "aggregation_strategy": "simple",
        "device": 0,
        "torch_dtype": torch.bfloat16,
        "token": HF_TOKEN,
    }
    return pipeline(**config)


@spaces.GPU
def run_pii_analysis(text: str):
    """Detect PII spans in *text* on the GPU.

    Returns a ``(source_text, spans)`` tuple where each span is a dict
    ``{label, start, end, text}`` with character offsets into ``text``.
    Blank input short-circuits without touching the model.
    """
    if not text.strip():
        return text, []

    spans = []
    for result in get_pii_pipeline()(text):
        label = result.get("entity_group") or result.get("entity")
        # Skip unlabeled results and the "outside" tag.
        if not label or label == "O":
            continue
        start, end = int(result["start"]), int(result["end"])
        # Discard degenerate or out-of-range offsets the pipeline may emit.
        if start < 0 or end <= start or end > len(text):
            continue
        spans.append({
            "label": label,
            "start": start,
            "end": end,
            "text": text[start:end],
        })
    return text, spans


# =====================================================================
# OCR  +  SPAN β†’ BOX MAPPING
# =====================================================================

def ocr_image(img: Image.Image) -> dict:
    """Run Tesseract over *img* and return ``{"text": ..., "words": [...]}``.

    The concatenated text uses a single space between words on the same line
    and a newline between lines. The ``start``/``end`` offsets stored with
    each word index into that exact text, so the later char-span → box
    mapping is a pure lookup.
    """
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
    words, parts = [], []
    cursor = 0
    prev_line = None
    for idx, raw in enumerate(data["text"]):
        if raw is None:
            continue
        token = raw.strip()
        if not token:
            continue
        try:
            confidence = float(data["conf"][idx])
        except (TypeError, ValueError):
            confidence = -1.0
        # Negative confidence marks structural (non-word) entries; skip them.
        if confidence < 0:
            continue
        line_id = (data["block_num"][idx], data["par_num"][idx], data["line_num"][idx])
        if prev_line is not None:
            # Newline between lines, single space between words on one line.
            parts.append("\n" if line_id != prev_line else " ")
            cursor += 1
        prev_line = line_id
        begin = cursor
        parts.append(token)
        cursor += len(token)
        words.append({
            "text": token, "start": begin, "end": cursor,
            "x": int(data["left"][idx]), "y": int(data["top"][idx]),
            "w": int(data["width"][idx]), "h": int(data["height"][idx]),
        })
    return {"text": "".join(parts), "words": words}


def map_spans_to_boxes(words, spans, pad=3):
    """Convert character spans into padded pixel rectangles.

    A span may cover words on several visual lines, so overlapping words are
    clustered by vertical center and one bounding box is emitted per cluster.
    Returns a list of ``{x, y, w, h, label, text}`` dicts; boxes are clamped
    to non-negative origins and at least 1px in each dimension.
    """
    boxes = []
    for span in spans:
        start, end, label = span["start"], span["end"], span["label"]
        overlapping = [w for w in words if w["start"] < end and w["end"] > start]
        if not overlapping:
            continue
        # Cluster the hit words by vertical center so multi-line spans get
        # one box per line instead of one giant rectangle.
        clusters = {}
        for word in overlapping:
            center = word["y"] + word["h"] // 2
            chosen = next(
                (k for k in clusters if abs(k - center) < max(word["h"] * 0.6, 10)),
                None,
            )
            clusters.setdefault(center if chosen is None else chosen, []).append(word)
        for group in clusters.values():
            left = min(w["x"] for w in group) - pad
            top = min(w["y"] for w in group) - pad
            right = max(w["x"] + w["w"] for w in group) + pad
            bottom = max(w["y"] + w["h"] for w in group) + pad
            boxes.append({
                "x": max(0, left), "y": max(0, top),
                "w": max(1, right - left), "h": max(1, bottom - top),
                "label": label,
                "text": " ".join(w["text"] for w in group),
            })
    return boxes


# =====================================================================
# SERVER
# =====================================================================

server = gr.Server()  # all routes below are registered against this instance


@server.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve the minimal fallback landing page (the real editor UI is in app_v2.py)."""
    return FRONTEND_HTML


@server.post("/api/detect")
async def detect(file: UploadFile = File(...)):
    """Full detection pipeline for an uploaded screenshot.

    Validates the file extension, decodes the image, OCRs it, runs PII
    analysis, maps character spans back to pixel boxes, and returns
    everything the canvas editor needs (including the image re-encoded as
    a base64 PNG data URL). Errors come back as JSON with a 4xx/5xx status.
    """
    allowed = (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff")
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in allowed:
        return JSONResponse({"error": f"Unsupported image type: {suffix or '(none)'}"}, 400)

    try:
        payload = await file.read()
        img = Image.open(io.BytesIO(payload)).convert("RGB")
    except Exception as e:
        return JSONResponse({"error": f"Could not read image: {e}"}, 400)

    ocr = ocr_image(img)
    text = ocr["text"]
    if not text.strip():
        return JSONResponse({"error": "No text detected in the image."}, 400)

    try:
        source_text, spans = run_pii_analysis(text)
    except Exception as e:
        return JSONResponse({"error": f"PII analysis failed: {e}"}, 500)

    # Defensive: if the analyzer ever returns text diverging from the OCR
    # text, drop spans whose end offsets fall outside the OCR text.
    if source_text != text:
        spans = [s for s in spans if s["end"] <= len(text)]

    boxes = map_spans_to_boxes(ocr["words"], spans)

    # Re-encode as PNG so the frontend receives one predictable format.
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    data_url = "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()

    return JSONResponse({
        "filename": file.filename,
        "image": data_url,
        "width": img.width, "height": img.height,
        "boxes": boxes,
        "text": text,
        "spans": spans,
        "categories_meta": {k: {"color": v["color"], "label": v["label"]}
                            for k, v in CATEGORIES_META.items()},
    })


@server.api(name="anonymize_screenshot")
def anonymize_screenshot_api(image_path: str) -> str:
    """Gradio API: takes a path to an image, returns JSON with detected boxes.

    The payload mirrors the ``/api/detect`` response minus the base64 image:
    ``{width, height, boxes, text, spans}``. Fix over the previous version:
    the no-text early return now also carries ``width``/``height`` and uses
    ``ensure_ascii=False``, so both branches produce a consistent schema and
    encoding (the old early return emitted only ASCII, so this is
    backward-compatible for existing consumers).
    """
    img = Image.open(image_path).convert("RGB")
    ocr = ocr_image(img)
    if not ocr["text"].strip():
        return json.dumps({
            "width": img.width, "height": img.height,
            "boxes": [], "text": "", "spans": [],
        }, ensure_ascii=False)
    _, spans = run_pii_analysis(ocr["text"])
    boxes = map_spans_to_boxes(ocr["words"], spans)
    return json.dumps({
        "width": img.width, "height": img.height,
        "boxes": boxes, "text": ocr["text"], "spans": spans,
    }, ensure_ascii=False)


# =====================================================================
# FRONTEND (standalone fallback β€” the main UI lives in app_v2.py)
# =====================================================================

# Minimal placeholder page pointing API consumers at the real endpoints.
FRONTEND_HTML = """<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Screenshot Anonymizer</title></head>
<body style="font-family:system-ui;padding:2rem;max-width:640px;margin:0 auto">
<h1>Screenshot Anonymizer</h1>
<p>This endpoint serves the backend API only. The editor UI lives in
<code>app_v2.py</code>. Use <code>POST /api/detect</code> or the
<code>/anonymize_screenshot</code> Gradio API.</p>
</body></html>"""


if __name__ == "__main__":
    # Bind on all interfaces at the conventional Spaces port.
    server.launch(server_name="0.0.0.0", server_port=7860)