Spaces:
Running
Running
| """Screenshot capture using Playwright with browser pooling.""" | |
| import atexit | |
| from playwright.sync_api import sync_playwright | |
| from PIL import Image | |
| import tempfile | |
| import time | |
| import os | |
| import io | |
| import threading | |
| class BrowserPool: | |
| """Thread-safe browser pool that keeps a Chromium instance per thread. | |
| Each Flask worker thread gets its own Playwright + Chromium pair via | |
| ``threading.local``. To support clean process shutdown we additionally | |
| keep a global registry (thread-id -> (playwright, browser)) so an | |
| ``atexit`` hook can close every Chromium even on threads that aren't | |
| currently executing β without it, ``shutdown()`` only ever cleaned up | |
| the thread that called it and we leaked Chromium processes. | |
| """ | |
| def __init__(self): | |
| self._local = threading.local() | |
| self._lock = threading.Lock() | |
| # tid -> {"playwright": pw, "browser": br} | |
| self._registry: dict = {} | |
| def _ensure_browser(self): | |
| """Launch browser if not already running in this thread.""" | |
| tid = threading.get_ident() | |
| if not hasattr(self._local, 'playwright') or self._local.playwright is None: | |
| self._local.playwright = sync_playwright().start() | |
| self._local.browser = self._local.playwright.chromium.launch(headless=True) | |
| self._registry[tid] = { | |
| 'playwright': self._local.playwright, | |
| 'browser': self._local.browser, | |
| } | |
| print(f"π Browser launched for thread {threading.current_thread().name}") | |
| elif not self._local.browser.is_connected(): | |
| self._local.browser = self._local.playwright.chromium.launch(headless=True) | |
| self._registry[tid] = { | |
| 'playwright': self._local.playwright, | |
| 'browser': self._local.browser, | |
| } | |
| print(f"π Browser reconnected for thread {threading.current_thread().name}") | |
| def get_page(self, logical_width, logical_height, zoom): | |
| """Get a new page from the thread-local browser.""" | |
| with self._lock: | |
| self._ensure_browser() | |
| page = self._local.browser.new_page( | |
| viewport={"width": logical_width, "height": logical_height}, | |
| device_scale_factor=zoom, | |
| ) | |
| return page | |
| def shutdown(self): | |
| """Clean up browser and playwright resources for current thread.""" | |
| with self._lock: | |
| tid = threading.get_ident() | |
| if hasattr(self._local, 'browser') and self._local.browser: | |
| try: | |
| self._local.browser.close() | |
| except Exception: | |
| pass | |
| self._local.browser = None | |
| if hasattr(self._local, 'playwright') and self._local.playwright: | |
| try: | |
| self._local.playwright.stop() | |
| except Exception: | |
| pass | |
| self._local.playwright = None | |
| self._registry.pop(tid, None) | |
| print( | |
| f"π Browser pool shut down for thread " | |
| f"{threading.current_thread().name}", | |
| flush=True, | |
| ) | |
| def shutdown_all(self): | |
| """Close every Chromium instance we ever launched, regardless of thread. | |
| Called from an ``atexit`` hook on process exit. Closing a Playwright | |
| object created on another thread is technically off-pattern, but in | |
| practice the Playwright sync API tolerates it on shutdown and this | |
| prevents zombie Chromium processes. | |
| """ | |
| with self._lock: | |
| for tid, refs in list(self._registry.items()): | |
| try: | |
| if refs.get('browser') is not None: | |
| refs['browser'].close() | |
| except Exception: | |
| pass | |
| try: | |
| if refs.get('playwright') is not None: | |
| refs['playwright'].stop() | |
| except Exception: | |
| pass | |
| self._registry.pop(tid, None) | |
| print("π BrowserPool.shutdown_all: all Chromium instances closed.", flush=True) | |
| # Module-level browser pool instance | |
| _browser_pool = BrowserPool() | |
| # Best-effort cleanup on process exit. Without this, hot-reload + Ctrl+C | |
| # leak headless Chromium processes (one per worker thread that ever ran). | |
| def _shutdown_browser_pool(): | |
| try: | |
| _browser_pool.shutdown_all() | |
| except Exception: | |
| pass | |
| def get_browser_pool(): | |
| """Get the module-level browser pool.""" | |
| return _browser_pool | |
| def take_screenshot_playwright( | |
| html_content, | |
| save_path, | |
| zoom=2.1, # Single zoom parameter (replaces font_size + device_scale) | |
| overlap=20, # Logical pixels of overlap between consecutive shots | |
| viewport_width=1920, # Final output width in physical pixels | |
| viewport_height=1080, # Final output height in physical pixels | |
| max_screenshots=50, | |
| progress_callback=None, # Optional callback for SSE progress updates | |
| cancel_event=None # Optional threading.Event to abort generation | |
| ): | |
| """ | |
| Take multiple 1920Γ1080 screenshots of HTML content with configurable zoom. | |
| The zoom is achieved purely via device_scale_factor: | |
| - logical viewport = (1920/zoom) Γ (1080/zoom) | |
| - device_scale_factor = zoom | |
| - output image = 1920 Γ 1080 (no resize needed) | |
| Args: | |
| html_content: Full HTML string to render | |
| save_path: Path for the first screenshot (e.g., "output.png") | |
| zoom: Zoom level (2.5 = 250%) | |
| overlap: Overlap between shots in logical pixels | |
| viewport_width: Output image width | |
| viewport_height: Output image height | |
| max_screenshots: Safety cap | |
| progress_callback: Optional callable(message, progress_pct) for progress updates | |
| cancel_event: Optional threading.Event to monitor for user cancellation | |
| """ | |
| screenshots = [] | |
| temp_fd = None | |
| temp_path = None | |
| page = None | |
| try: | |
| # Write HTML to unique temp file (fixes race condition) | |
| temp_fd, temp_path = tempfile.mkstemp(suffix='.html', prefix='screenshot_') | |
| with os.fdopen(temp_fd, 'w', encoding='utf-8') as f: | |
| f.write(html_content) | |
| temp_fd = None # fd is now closed by os.fdopen | |
| file_url = f"file:///{os.path.abspath(temp_path)}" | |
| # βββ Single zoom mechanism βββ | |
| logical_width = int(viewport_width / zoom) | |
| logical_height = int(viewport_height / zoom) | |
| # Clamp overlap below the logical viewport so (viewport_h - overlap) | |
| # in the num_est division is always positive. Without this, a user | |
| # sending overlap >= viewport_h causes ZeroDivisionError or negative | |
| # step-size crashes in the capture loop. | |
| max_overlap = max(0, logical_height - 1) | |
| if overlap > max_overlap: | |
| print( | |
| f"β οΈ overlap={overlap} exceeds logical viewport height " | |
| f"{logical_height}; clamping to {max_overlap}", | |
| flush=True, | |
| ) | |
| overlap = max_overlap | |
| # Use pooled browser | |
| page = _browser_pool.get_page(logical_width, logical_height, zoom) | |
| page.goto(file_url) | |
| page.wait_for_load_state("networkidle") | |
| time.sleep(1) | |
| # Get page dimensions (clean, undistorted values) | |
| dimensions = page.evaluate(""" | |
| () => ({ | |
| scrollHeight: document.documentElement.scrollHeight, | |
| clientHeight: window.innerHeight | |
| }) | |
| """) | |
| total_height = dimensions["scrollHeight"] | |
| viewport_h = dimensions["clientHeight"] | |
| print(f"π Page: {total_height}px logical | Viewport: {viewport_h}px logical", flush=True) | |
| print(f"π Zoom: {zoom}x β logical {logical_width}Γ{logical_height} β " | |
| f"output {viewport_width}Γ{viewport_height}", flush=True) | |
| # Re-clamp against the real clientHeight (may differ from logical_height | |
| # when scrollbars take space) so the divisor below is always >= 1. | |
| if overlap >= viewport_h: | |
| overlap = max(0, viewport_h - 1) | |
| num_est = max(1, -(-total_height // (viewport_h - overlap))) | |
| print(f"πΈ Estimated {num_est} screenshot(s) (overlap={overlap}px)", flush=True) | |
| if progress_callback: | |
| progress_callback(f"Estimated {num_est} screenshot(s)", 10) | |
| # Prepare filenames | |
| base_path, extension = ( | |
| (save_path.rsplit(".", 1)[0], save_path.rsplit(".", 1)[1]) | |
| if "." in save_path | |
| else (save_path, "png") | |
| ) | |
| screenshot_count = 0 | |
| scroll_position = 0 | |
| while screenshot_count < max_screenshots: | |
| # Check for early cancellation | |
| if cancel_event and cancel_event.is_set(): | |
| print("π Screenshot generation aborted by user.") | |
| break | |
| screenshot_count += 1 | |
| # Scroll to position | |
| page.evaluate(f"window.scrollTo(0, {scroll_position})") | |
| time.sleep(0.3) | |
| actual_scroll = page.evaluate("window.pageYOffset") | |
| current_total = page.evaluate( | |
| "document.documentElement.scrollHeight" | |
| ) | |
| print( | |
| f"π Shot {screenshot_count}: " | |
| f"scroll={actual_scroll}/{current_total - viewport_h}", flush=True | |
| ) | |
| # βββ Consistent naming: base(1).png, base(2).png, ... βββ | |
| screenshot_path = f"{base_path}({screenshot_count}).{extension}" | |
| # Capture screenshot | |
| screenshot_bytes = page.screenshot(full_page=False) | |
| img = Image.open(io.BytesIO(screenshot_bytes)) | |
| # Verify exact target size | |
| target_size = (viewport_width, viewport_height) | |
| if img.size != target_size: | |
| print(f" β οΈ Raw {img.size} β resizing to {target_size}") | |
| img = img.resize(target_size, Image.Resampling.LANCZOS) | |
| img.save(screenshot_path, "PNG") | |
| screenshots.append(screenshot_path) | |
| print( | |
| f" β {os.path.basename(screenshot_path)} " | |
| f"({img.size[0]}Γ{img.size[1]})", flush=True | |
| ) | |
| if progress_callback: | |
| pct = 10 + int((screenshot_count / max(num_est, 1)) * 80) | |
| progress_callback( | |
| f"Captured screenshot {screenshot_count}/{num_est}", | |
| min(pct, 90) | |
| ) | |
| # βββ Check if we've reached the bottom βββ | |
| max_scroll = current_total - viewport_h | |
| if actual_scroll >= max_scroll - 2: | |
| print("π Reached bottom of page.", flush=True) | |
| break | |
| # Advance with overlap | |
| scroll_position += viewport_h - overlap | |
| # Clamp to capture the very last strip | |
| if scroll_position > max_scroll: | |
| scroll_position = max_scroll | |
| print(f"\nβ Done β {screenshot_count} screenshot(s) saved.") | |
| finally: | |
| # Close page (but browser stays alive in pool) | |
| if page: | |
| try: | |
| page.close() | |
| except Exception: | |
| pass | |
| # Clean up temp file | |
| if temp_path and os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return screenshots | |
| # Alias for compatibility | |
| take_screenshot_selenium = take_screenshot_playwright |