| """ |
| Watermark Remover Module |
| Removes text watermarks from PDF pages using image processing |
| Optimized for file size and quality |
| """ |
|
|
| import io |
| import fitz |
| import numpy as np |
| from PIL import Image |
| from concurrent.futures import ThreadPoolExecutor |
| import time |
|
|
| try: |
| import cv2 |
| CV2_AVAILABLE = True |
| except ImportError: |
| CV2_AVAILABLE = False |
|
|
|
|
def remove_watermark_from_pdf(
    pdf_bytes: bytes,
    watermark_text: str = "Educated Nepal",
    method: str = "inpaint",
    intensity: int = 50,
    dpi: int = 150,
    jpeg_quality: int = 75,
    max_workers: int = 4
) -> bytes:
    """
    Remove watermark from PDF pages with optimized output size.

    Every page is rasterized, cleaned with the selected image filter,
    re-encoded as JPEG and placed on a new page that has the same size as
    the source page. The result is therefore an image-only PDF.

    Args:
        pdf_bytes: Input PDF as bytes
        watermark_text: Text to remove (not used in current methods)
        method: 'inpaint', 'threshold', or 'color'; any other value falls
            back to 'inpaint'
        intensity: 0-100, higher = more aggressive removal
        dpi: Resolution for processing (lower = smaller file, 100-150 recommended)
        jpeg_quality: JPEG quality 10-100 (lower = smaller file, 60-80 recommended)
        max_workers: Parallel processing threads (values below 1 are treated as 1)

    Returns:
        The processed PDF as bytes.

    Raises:
        ImportError: If OpenCV is not installed.
        ValueError: If dpi is not positive.
    """
    if not CV2_AVAILABLE:
        raise ImportError("OpenCV not installed. Run: pip install opencv-python-headless")
    if dpi <= 0:
        raise ValueError(f"dpi must be positive, got {dpi}")

    start_time = time.time()
    original_size = len(pdf_bytes)

    removers = {
        "inpaint": remove_by_inpainting,
        "threshold": remove_by_threshold,
        "color": remove_by_color,
    }
    # Unknown method names fall back to inpainting.
    remover = removers.get(method, remove_by_inpainting)

    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to one worker.
    workers = max(1, max_workers)
    zoom = fitz.Matrix(dpi / 72, dpi / 72)

    def render_page(page):
        """Rasterize one page and return it as a BGR array for OpenCV."""
        pix = page.get_pixmap(matrix=zoom)
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
        if pix.n == 4:
            return cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
        if pix.n == 3:
            return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        return img

    def clean_and_encode(img):
        """Apply the selected filter and return the page as JPEG bytes."""
        result_rgb = cv2.cvtColor(remover(img, intensity), cv2.COLOR_BGR2RGB)
        jpeg_buffer = io.BytesIO()
        Image.fromarray(result_rgb).save(
            jpeg_buffer, format='JPEG', quality=jpeg_quality, optimize=True
        )
        return jpeg_buffer.getvalue()

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        output_doc = fitz.open()
        try:
            page_count = len(doc)
            with ThreadPoolExecutor(max_workers=workers) as executor:
                # PyMuPDF objects are only touched on the calling thread: the
                # library does not support concurrent access to a document.
                # Only the image filtering and JPEG encoding run in the pool.
                # Working in batches bounds how many raw page images are
                # held in memory at once.
                for batch_start in range(0, page_count, workers):
                    batch_end = min(batch_start + workers, page_count)
                    sizes = []
                    images = []
                    for page_num in range(batch_start, batch_end):
                        page = doc[page_num]
                        sizes.append((page.rect.width, page.rect.height))
                        images.append(render_page(page))

                    # executor.map yields results in submission order, so the
                    # output pages keep the order of the input pages.
                    encoded = executor.map(clean_and_encode, images)
                    for (orig_width, orig_height), jpeg_bytes in zip(sizes, encoded):
                        pdf_page = output_doc.new_page(width=orig_width, height=orig_height)
                        rect = fitz.Rect(0, 0, orig_width, orig_height)
                        pdf_page.insert_image(rect, stream=jpeg_bytes)

            output_bytes = output_doc.tobytes(deflate=True, garbage=4, clean=True)
        finally:
            output_doc.close()
    finally:
        doc.close()

    elapsed = time.time() - start_time
    output_size = len(output_bytes)
    ratio = output_size / original_size if original_size > 0 else 1
    print(f"Watermark removal: {page_count} pages in {elapsed:.1f}s, "
          f"{original_size/1024:.0f}KB -> {output_size/1024:.0f}KB ({ratio:.1%})")

    return output_bytes
|
|
|
|
def remove_by_inpainting(img: np.ndarray, intensity: int) -> np.ndarray:
    """Remove watermark using inpainting - best for handwritten notes.

    Pixels whose gray level lies above an intensity-dependent threshold are
    marked, except those brighter than 248 or darker than 200. The marked
    area is grown slightly and filled in from its surroundings with the
    Telea inpainting algorithm.

    Because everything darker than 200 is always excluded, a threshold
    below 200 cannot widen the mask any further.

    Args:
        img: BGR image to clean.
        intensity: Removal strength; each step lowers the threshold by 0.4
            (truncated) starting from 220.

    Returns:
        The inpainted BGR image.
    """
    luminance = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    cutoff = 220 - int(intensity * 0.4)
    candidate = cv2.threshold(luminance, cutoff, 255, cv2.THRESH_BINARY)[1]

    # Drop near-white and anything darker than 200 from the mask.
    excluded = (luminance > 248) | (luminance < 200)
    candidate[excluded] = 0

    # Grow the mask by a 2x2 structuring element so the fringe of the
    # marked pixels is inpainted as well.
    candidate = cv2.dilate(candidate, np.ones((2, 2), np.uint8), iterations=1)

    return cv2.inpaint(img, candidate, inpaintRadius=3, flags=cv2.INPAINT_TELEA)
|
|
|
|
def remove_by_threshold(img: np.ndarray, intensity: int) -> np.ndarray:
    """Remove watermark by converting light gray to white - fast method.

    Args:
        img: BGR image to clean.
        intensity: Removal strength; each step lowers the lower gray bound
            by 0.5 (truncated) starting from 230.

    Returns:
        A copy of ``img`` in which every pixel whose gray level is strictly
        between the lower bound and 250 has been set to white.
    """
    luminance = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    lower_bound = 230 - int(intensity * 0.5)

    light_gray = np.logical_and(luminance > lower_bound, luminance < 250)

    cleaned = img.copy()
    cleaned[light_gray] = [255, 255, 255]
    return cleaned
|
|
|
|
def remove_by_color(img: np.ndarray, intensity: int) -> np.ndarray:
    """Remove watermark by targeting gray color range.

    Pixels are selected in HSV space: any hue, saturation up to
    ``40 + intensity // 3`` and value between ``200 - intensity // 2`` and
    250. Pixels whose gray level is below 150 or above 250 are then
    removed from the selection. The selection is grown slightly and
    painted white.

    Args:
        img: BGR image to clean.
        intensity: Removal strength; widens the saturation and value range.

    Returns:
        A copy of ``img`` with the selected pixels set to white.
    """
    luminance = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    range_low = np.array([0, 0, 200 - intensity // 2])
    range_high = np.array([180, 40 + intensity // 3, 250])
    selection = cv2.inRange(hsv_img, range_low, range_high)

    # Drop pixels whose gray level is below 150 or above 250.
    selection[(luminance < 150) | (luminance > 250)] = 0

    selection = cv2.dilate(selection, np.ones((2, 2), np.uint8), iterations=1)

    cleaned = img.copy()
    cleaned[selection > 0] = [255, 255, 255]
    return cleaned
|
|
|
|
def preview_single_page(
    pdf_bytes: bytes,
    page_num: int = 0,
    method: str = "inpaint",
    intensity: int = 50,
    dpi: int = 100
) -> tuple[bytes, bytes]:
    """
    Preview watermark removal on a single page.

    Args:
        pdf_bytes: Input PDF as bytes
        page_num: Zero-based page index; an index past the last page
            falls back to the first page
        method: 'inpaint', 'threshold', or 'color'; any other value falls
            back to 'inpaint', matching remove_watermark_from_pdf
        intensity: 0-100, higher = more aggressive removal
        dpi: Resolution used to render the preview

    Returns:
        (original_jpeg, processed_jpeg) for comparison, both encoded at
        JPEG quality 85.

    Raises:
        ImportError: If OpenCV is not installed.
        ValueError: If dpi is not positive.
    """
    if not CV2_AVAILABLE:
        raise ImportError("OpenCV not installed")
    if dpi <= 0:
        raise ValueError(f"dpi must be positive, got {dpi}")

    def to_jpeg(rgb_array):
        """Encode an RGB array as JPEG bytes at preview quality."""
        buffer = io.BytesIO()
        Image.fromarray(rgb_array).save(buffer, format='JPEG', quality=85)
        return buffer.getvalue()

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        if page_num >= len(doc):
            page_num = 0

        page = doc[page_num]
        mat = fitz.Matrix(dpi / 72, dpi / 72)
        pix = page.get_pixmap(matrix=mat)

        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)

        if pix.n == 4:
            img_bgr = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
            img_rgb = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
        else:
            img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            img_rgb = img

        original_bytes = to_jpeg(img_rgb)

        # Unknown methods use inpainting, the same fallback as the full
        # conversion, so the preview shows what the conversion will produce.
        if method == "threshold":
            result = remove_by_threshold(img_bgr, intensity)
        elif method == "color":
            result = remove_by_color(img_bgr, intensity)
        else:
            result = remove_by_inpainting(img_bgr, intensity)

        processed_bytes = to_jpeg(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
    finally:
        # Close the document even when rendering or processing fails.
        doc.close()

    return original_bytes, processed_bytes
|
|