""" LocalTools Server - PDF Editor & Image Scraper FastAPI backend with async operations, logging, and cleanup """ import asyncio import io import json import logging import re import uuid import zipfile from contextlib import asynccontextmanager from datetime import datetime, timedelta from pathlib import Path from typing import Optional, List from urllib.parse import urljoin, urlparse import aiohttp import fitz # PyMuPDF from PIL import Image from pydantic import BaseModel, Field, HttpUrl from pydantic_settings import BaseSettings from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware # =============== Configuration =============== class Settings(BaseSettings): """Application settings with environment variable support""" app_name: str = "LocalTools" debug: bool = False # Paths work_dir: str = "work" output_dir: str = "outputs" static_dir: str = "static" # Limits max_pdf_size_mb: int = 100 max_image_size_mb: int = 50 min_image_dimension: int = 50 request_timeout: int = 60 # Cleanup cleanup_interval_hours: int = 24 file_retention_hours: int = 48 class Config: env_prefix = "LOCALTOOLS_" settings = Settings() # =============== Logging Setup =============== logging.basicConfig( level=logging.DEBUG if settings.debug else logging.INFO, format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) logger = logging.getLogger("localtools") # =============== Path Setup =============== APP_DIR = Path(__file__).parent STATIC_DIR = APP_DIR / settings.static_dir WORK_DIR = APP_DIR / settings.work_dir OUT_DIR = APP_DIR / settings.output_dir SCRAPE_DIR = WORK_DIR / "scrape_jobs" for directory in [WORK_DIR, OUT_DIR, SCRAPE_DIR]: directory.mkdir(exist_ok=True) # =============== Constants =============== UA_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/bmp"} # =============== Response Models =============== class ScrapeResult(BaseModel): job_id: str images: List[dict] total_found: int filtered_count: int class HealthResponse(BaseModel): status: str version: str = "1.0.0" uptime_seconds: float # =============== Cleanup Task =============== startup_time = datetime.now() async def cleanup_old_files(): """Remove files older than retention period""" retention = timedelta(hours=settings.file_retention_hours) cutoff = datetime.now() - retention removed = 0 for directory in [WORK_DIR, OUT_DIR]: for file_path in directory.glob("*"): if file_path.is_file(): mtime = datetime.fromtimestamp(file_path.stat().st_mtime) if mtime < cutoff: try: file_path.unlink() removed += 1 except Exception as e: logger.warning(f"Failed to remove {file_path}: {e}") # Clean old scrape jobs for job_dir in SCRAPE_DIR.glob("*"): if job_dir.is_dir(): mtime = datetime.fromtimestamp(job_dir.stat().st_mtime) if mtime < cutoff: try: for f in job_dir.glob("*"): f.unlink() job_dir.rmdir() removed += 1 except Exception as e: logger.warning(f"Failed to remove job {job_dir}: {e}") if removed > 0: logger.info(f"Cleanup: removed {removed} old files/directories") async def periodic_cleanup(): """Run cleanup periodically""" while True: await asyncio.sleep(settings.cleanup_interval_hours * 3600) await cleanup_old_files() # =============== Lifespan =============== @asynccontextmanager async def lifespan(app: 
FastAPI):
    """Startup and shutdown events"""
    logger.info(f"Starting {settings.app_name}")

    # Start cleanup task
    cleanup_task = asyncio.create_task(periodic_cleanup())

    # Run initial cleanup
    await cleanup_old_files()

    yield

    # Shutdown
    cleanup_task.cancel()
    logger.info("Shutting down")


# =============== App Setup ===============

app = FastAPI(
    title=settings.app_name,
    description="PDF Editor & Image Scraper API",
    version="1.0.0",
    lifespan=lifespan
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static files
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")


# =============== PDF Helpers ===============

def to_points(value: float, unit: str) -> float:
    """Convert measurement to PDF points"""
    conversions = {
        "pt": 1.0, "points": 1.0,
        "in": 72.0, "inch": 72.0, "inches": 72.0,
        "cm": 72.0 / 2.54, "centimeter": 72.0 / 2.54, "centimeters": 72.0 / 2.54,
        "mm": 72.0 / 25.4, "millimeter": 72.0 / 25.4, "millimeters": 72.0 / 25.4,
    }
    unit_lower = unit.lower().strip()
    if unit_lower not in conversions:
        raise ValueError(f"Invalid unit: {unit}. Use: pt, in, cm, or mm")
    return float(value) * conversions[unit_lower]


def parse_page_spec(spec: str, total_pages: int) -> set[int]:
    """
    Parse page specification string to set of 0-based page indices.
    Input: "1,3,5" or "2-6" or "1,3-5,9" (1-based)
    Output: Set of 0-based indices
    """
    spec = (spec or "").strip().replace(" ", "")
    if not spec:
        return set()
    result = set()
    for part in spec.split(","):
        if not part:
            continue
        try:
            if "-" in part:
                start_str, end_str = part.split("-", 1)
                start, end = int(start_str), int(end_str)
                if start > end:
                    start, end = end, start
                for p in range(start, end + 1):
                    if 1 <= p <= total_pages:
                        result.add(p - 1)
            else:
                p = int(part)
                if 1 <= p <= total_pages:
                    result.add(p - 1)
        except ValueError:
            logger.warning(f"Invalid page spec part: {part}")
            continue
    return result


def extract_drive_file_id(url: str) -> Optional[str]:
    """Extract Google Drive file ID from URL"""
    patterns = [
        r"/file/d/([^/]+)",
        r"[?&]id=([^&]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None


async def download_from_url(url: str, out_path: Path, is_drive: bool = False) -> Path:
    """Download file from URL using aiohttp"""
    timeout = aiohttp.ClientTimeout(total=settings.request_timeout)
    async with aiohttp.ClientSession(timeout=timeout, headers=UA_HEADERS) as session:
        if is_drive:
            file_id = extract_drive_file_id(url)
            if not file_id:
                raise ValueError("Invalid Google Drive URL. Use format: /file/d/FILE_ID/view")
            download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
            async with session.get(download_url) as resp:
                resp.raise_for_status()
                # Check for confirmation token (large files)
                cookies = resp.cookies
                confirm_token = None
                for key, cookie in cookies.items():
                    if key.startswith("download_warning"):
                        confirm_token = cookie.value
                        break
                if confirm_token:
                    download_url = f"{download_url}&confirm={confirm_token}"
                    async with session.get(download_url) as resp2:
                        resp2.raise_for_status()
                        content = await resp2.read()
                else:
                    content = await resp.read()
        else:
            async with session.get(url) as resp:
                resp.raise_for_status()
                content = await resp.read()

    # Validate PDF
    if not content[:5] == b"%PDF-":
        # Check if it's an HTML error page
        if b"<html" in content[:500].lower():
            raise ValueError("The URL returned an HTML page instead of a PDF. Check the link and its sharing settings.")
        raise ValueError("Downloaded content is not a valid PDF")

    out_path.write_bytes(content)
    return out_path


def validate_pdf_upload(data: bytes) -> None:
    """Validate uploaded PDF data"""
    if not data:
        raise ValueError("Empty file uploaded")
    if len(data) > settings.max_pdf_size_mb * 1024 * 1024:
        raise ValueError(f"File too large. Maximum size: {settings.max_pdf_size_mb}MB")
    if not data[:5] == b"%PDF-":
        raise ValueError("Invalid PDF file")


def add_text_watermark(page: fitz.Page, text: str, font_size: float, rotate: int) -> None:
    """Add text watermark to PDF page"""
    if not text.strip():
        return
    rect = page.rect
    box = fitz.Rect(
        rect.x0 + rect.width * 0.05,
        rect.y0 + rect.height * 0.05,
        rect.x1 - rect.width * 0.05,
        rect.y1 - rect.height * 0.05,
    )
    try:
        page.insert_textbox(
            box,
            text,
            fontsize=float(font_size),
            rotate=int(rotate),
            align=fitz.TEXT_ALIGN_CENTER,
            color=(0.55, 0.55, 0.55),
            overlay=True,
            fill_opacity=0.18,
            stroke_opacity=0.18,
        )
    except TypeError:
        # Fallback for older PyMuPDF versions
        page.insert_textbox(
            box,
            text,
            fontsize=float(font_size),
            rotate=int(rotate),
            align=fitz.TEXT_ALIGN_CENTER,
            color=(0.75, 0.75, 0.75),
            overlay=True,
        )


def process_pdf(
    input_path: Path,
    output_path: Path,
    remove_pages: str,
    crop_top: float,
    crop_bottom: float,
    crop_left: float,
    crop_right: float,
    unit: str,
    watermark_text: str,
    watermark_size: float,
    watermark_rotate: int,
) -> dict:
    """Process PDF with cropping, page removal, and watermark"""
    src = fitz.open(str(input_path))
    total = src.page_count
    remove_set = parse_page_spec(remove_pages, total)

    # Convert crop values to points
    T = to_points(crop_top, unit)
    B = to_points(crop_bottom, unit)
    L = to_points(crop_left, unit)
    R = to_points(crop_right, unit)

    out = fitz.open()
    kept = 0
    for i in range(total):
        if i in remove_set:
            continue
        out.insert_pdf(src, from_page=i, to_page=i)
        page = out.load_page(out.page_count - 1)
        rect = page.rect
        crop_rect = fitz.Rect(rect.x0 + L, rect.y0 + T, rect.x1 - R, rect.y1 - B)
        if crop_rect.is_empty or crop_rect.width <= 2 or crop_rect.height <= 2:
            raise ValueError(f"Invalid crop on page {i+1}. Reduce crop values.")
        page.set_cropbox(crop_rect)
        if watermark_text.strip():
            add_text_watermark(page, watermark_text, watermark_size, watermark_rotate)
        kept += 1

    if kept == 0:
        raise ValueError("All pages removed. Output would be empty.")

    output_path.parent.mkdir(parents=True, exist_ok=True)
    out.save(str(output_path))
    out.close()
    src.close()

    logger.info(f"Processed PDF: {kept}/{total} pages kept")
    return {
        "original_pages": total,
        "output_pages": kept,
        "removed_pages": len(remove_set)
    }


# =============== Image Scraper Helpers ===============

def normalize_url(url: str) -> str:
    """Normalize URL by removing fragment"""
    parsed = urlparse(url)
    return parsed._replace(fragment="").geturl()


def best_from_srcset(srcset: str, base_url: str) -> Optional[str]:
    """Extract best quality image URL from srcset attribute"""
    if not srcset:
        return None
    candidates = []
    for part in srcset.split(","):
        part = part.strip()
        if not part:
            continue
        bits = part.split()
        img_url = urljoin(base_url, bits[0])
        score = 0.0
        if len(bits) > 1:
            descriptor = bits[1].lower().strip()
            try:
                if descriptor.endswith("w"):
                    score = float(descriptor[:-1])
                elif descriptor.endswith("x"):
                    score = float(descriptor[:-1]) * 10000.0
            except ValueError:
                pass
        candidates.append((score, img_url))
    if not candidates:
        return None
    candidates.sort(key=lambda x: x[0], reverse=True)
    return candidates[0][1]


def safe_filename(url: str, fallback: str) -> str:
    """Generate safe filename from URL"""
    name = Path(urlparse(url).path).name or fallback
    name = re.sub(r"[^a-zA-Z0-9._-]+", "_", name)
    if "." not in name:
        name += ".jpg"
    return name[:120]


async def fetch_image(session: aiohttp.ClientSession, img_url: str) -> tuple[bytes, Optional[int], Optional[int], Optional[str]]:
    """Fetch image and return bytes with dimensions"""
    async with session.get(img_url) as resp:
        resp.raise_for_status()
        content_type = resp.headers.get("Content-Type", "")
        data = await resp.read()

    width = height = None
    try:
        with Image.open(io.BytesIO(data)) as img:
            width, height = img.size
    except Exception:
        pass
    return data, width, height, content_type


async def scrape_images(page_url: str) -> dict:
    """Scrape images from webpage"""
    page_url = page_url.strip()
    if not page_url:
        raise ValueError("Web page URL is required")

    timeout = aiohttp.ClientTimeout(total=settings.request_timeout)
    async with aiohttp.ClientSession(timeout=timeout, headers=UA_HEADERS) as session:
        # Fetch page HTML
        async with session.get(page_url) as resp:
            resp.raise_for_status()
            html = await resp.text()

        # Parse HTML (using regex for simplicity, avoiding lxml dependency issues)
        found_urls: List[str] = []

        # Find all img tags
        img_pattern = r'<img[^>]+>'
        lazy_attrs = ["data-src", "data-original", "data-lazy-src", "data-url",
                      "data-image", "data-srcset", "srcset", "src"]

        for img_match in re.finditer(img_pattern, html, re.IGNORECASE):
            img_tag = img_match.group()
            for attr in lazy_attrs:
                attr_pattern = rf'{attr}=["\']([^"\']+)["\']'
                attr_match = re.search(attr_pattern, img_tag, re.IGNORECASE)
                if attr_match:
                    value = attr_match.group(1)
                    if "srcset" in attr.lower():
                        best = best_from_srcset(value, page_url)
                        if best:
                            found_urls.append(best)
                    else:
                        found_urls.append(urljoin(page_url, value))

        # Deduplicate
        seen = set()
        deduped = []
        for url in found_urls:
            normalized = normalize_url(url)
            if normalized not in seen and normalized.startswith("http"):
                seen.add(normalized)
                deduped.append(normalized)

        logger.info(f"Found {len(deduped)} unique image URLs on {page_url}")

        # Create job directory
        job_id = uuid.uuid4().hex[:10]
        job_dir = SCRAPE_DIR / job_id
        job_dir.mkdir(parents=True, exist_ok=True)

        # Fetch images concurrently
        images = []
        filtered = 0

        async def process_image(idx: int, url: str):
            nonlocal filtered
            try:
                data, width, height, content_type = await fetch_image(session, url)

                # Filter small images
                if width and height:
                    if width < settings.min_image_dimension or height < settings.min_image_dimension:
                        filtered += 1
                        return None

                img_id = uuid.uuid4().hex[:10]
                filename = safe_filename(url, f"image_{idx}.jpg")

                # Save image data
                (job_dir / f"{img_id}.bin").write_bytes(data)

                return {
                    "id": img_id,
                    "url": url,
                    "filename": filename,
                    "width": width,
                    "height": height,
                    "bytes": len(data),
                }
            except Exception as e:
                logger.debug(f"Failed to fetch {url}: {e}")
                return None

        # Process images with concurrency limit
        semaphore = asyncio.Semaphore(10)

        async def limited_process(idx: int, url: str):
            async with semaphore:
                return await process_image(idx, url)

        tasks = [limited_process(idx, url) for idx, url in enumerate(deduped, 1)]
        results = await asyncio.gather(*tasks)
        images = [r for r in results if r is not None]

        # Save metadata
        meta = {
            "page_url": page_url,
            "scraped_at": datetime.now().isoformat(),
            "images": images
        }
        (job_dir / "meta.json").write_text(
            json.dumps(meta, ensure_ascii=False, indent=2),
            encoding="utf-8"
        )

        logger.info(f"Scraped {len(images)} images, filtered {filtered} small icons")
        return {
            "job_id": job_id,
            "images": images,
            "total_found": len(deduped),
            "filtered_count": filtered
        }


def load_scrape_job(job_id: str) -> dict:
    """Load scrape job metadata"""
    job_dir = SCRAPE_DIR / job_id
    meta_path 
= job_dir / "meta.json" if not job_dir.exists() or not meta_path.exists(): raise HTTPException(status_code=404, detail="Scrape job not found. Please scrape again.") try: return json.loads(meta_path.read_text(encoding="utf-8")) except Exception as e: logger.error(f"Failed to load job {job_id}: {e}") raise HTTPException(status_code=500, detail="Corrupted scrape job metadata") # =============== Utility Functions =============== def sanitize_filename(name: str, default: str = "file") -> str: """Sanitize filename for safe file system use""" name = (name or default).strip() name = re.sub(r"[^a-zA-Z0-9._-]+", "_", name) return name[:200] or default def generate_job_id() -> str: """Generate unique job ID""" return uuid.uuid4().hex[:10] # =============== API Routes =============== @app.get("/", response_class=HTMLResponse) async def home(): """Serve main HTML page""" index_path = STATIC_DIR / "index.html" if not index_path.exists(): raise HTTPException(status_code=500, detail="Frontend not found") return HTMLResponse(index_path.read_text(encoding="utf-8")) @app.get("/api/health", response_model=HealthResponse) async def health_check(): """Health check endpoint""" uptime = (datetime.now() - startup_time).total_seconds() return HealthResponse(status="healthy", uptime_seconds=uptime) @app.post("/api/fetch") async def api_fetch_pdf( url: str = Form(""), file: UploadFile = File(None), output_name: str = Form("original.pdf"), ): """Fetch/upload PDF for preview (no processing)""" output_name = sanitize_filename(output_name, "original.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" job_id = generate_job_id() input_path = WORK_DIR / f"original_{job_id}.pdf" try: if file is not None and file.filename: data = await file.read() validate_pdf_upload(data) input_path.write_bytes(data) logger.info(f"Uploaded PDF: {file.filename}") elif url.strip(): is_drive = "drive.google.com" in url await download_from_url(url, input_path, is_drive=is_drive) else: raise ValueError("Provide a PDF URL or upload a file") except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except aiohttp.ClientError as e: logger.error(f"Download failed: {e}") raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}") except Exception as e: logger.exception("Unexpected error in fetch") raise HTTPException(status_code=500, detail="Internal server error") return FileResponse( path=str(input_path), media_type="application/pdf", filename=output_name ) @app.post("/api/process") async def api_process_pdf( url: str = Form(""), file: UploadFile = File(None), output_name: str = Form("cropped.pdf"), remove_pages: str = Form(""), unit: str = Form("mm"), top: float = Form(0), bottom: float = Form(0), left: float = Form(0), right: float = Form(0), watermark_text: str = Form(""), watermark_size: float = Form(36), watermark_rotate: int = Form(45), ): """Process PDF with cropping, page removal, and watermark""" output_name = sanitize_filename(output_name, "cropped.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" job_id = generate_job_id() input_path = WORK_DIR / f"input_{job_id}.pdf" output_path = OUT_DIR / f"{job_id}_{output_name}" # Get input PDF try: if file is not None and file.filename: data = await file.read() validate_pdf_upload(data) input_path.write_bytes(data) elif url.strip(): is_drive = "drive.google.com" in url await download_from_url(url, input_path, is_drive=is_drive) else: raise ValueError("Provide a PDF URL or upload a file") except ValueError as e: raise 
HTTPException(status_code=400, detail=str(e)) except aiohttp.ClientError as e: raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}") # Process PDF try: process_pdf( input_path=input_path, output_path=output_path, remove_pages=remove_pages, crop_top=top, crop_bottom=bottom, crop_left=left, crop_right=right, unit=unit, watermark_text=watermark_text, watermark_size=watermark_size, watermark_rotate=watermark_rotate, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.exception("PDF processing failed") raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name ) @app.post("/api/scrape-images") async def api_scrape_images(page_url: str = Form(...)): """Scrape images from a webpage""" try: result = await scrape_images(page_url) return JSONResponse(result) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except aiohttp.ClientError as e: raise HTTPException(status_code=400, detail=f"Failed to fetch page: {str(e)}") except Exception as e: logger.exception("Scrape failed") raise HTTPException(status_code=500, detail="Scraping failed") @app.post("/api/download-zip") async def api_download_zip( job_id: str = Form(...), image_ids: str = Form(...), zip_name: str = Form("images.zip") ): """Download selected images as ZIP""" meta = load_scrape_job(job_id.strip()) job_dir = SCRAPE_DIR / job_id.strip() selected = [x.strip() for x in image_ids.split(",") if x.strip()] if not selected: raise HTTPException(status_code=400, detail="No images selected") zip_name = sanitize_filename(zip_name, "images.zip") if not zip_name.lower().endswith(".zip"): zip_name += ".zip" output_path = OUT_DIR / f"{generate_job_id()}_{zip_name}" id_to_meta = {img["id"]: img for img in meta.get("images", [])} with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: for img_id in selected: bin_path = job_dir / f"{img_id}.bin" if bin_path.exists(): filename = id_to_meta.get(img_id, {}).get("filename", f"{img_id}.jpg") zf.writestr(filename, bin_path.read_bytes()) logger.info(f"Created ZIP with {len(selected)} images") return FileResponse( path=str(output_path), media_type="application/zip", filename=zip_name ) @app.post("/api/download-pdf") async def api_download_pdf( job_id: str = Form(...), image_ids: str = Form(...), pdf_name: str = Form("images.pdf") ): """Download selected images as PDF""" meta = load_scrape_job(job_id.strip()) job_dir = SCRAPE_DIR / job_id.strip() selected = [x.strip() for x in image_ids.split(",") if x.strip()] if not selected: raise HTTPException(status_code=400, detail="No images selected") pdf_name = sanitize_filename(pdf_name, "images.pdf") if not pdf_name.lower().endswith(".pdf"): pdf_name += ".pdf" # Convert images to PDF pages pil_pages: List[Image.Image] = [] for img_id in selected: bin_path = job_dir / f"{img_id}.bin" if not bin_path.exists(): continue try: with Image.open(bin_path) as img: rgb = img.convert("RGB") pil_pages.append(rgb.copy()) except Exception as e: logger.warning(f"Failed to process image {img_id}: {e}") continue if not pil_pages: raise HTTPException(status_code=400, detail="No valid images to convert") output_path = OUT_DIR / f"{generate_job_id()}_{pdf_name}" first = pil_pages[0] rest = pil_pages[1:] if len(pil_pages) > 1 else [] first.save(output_path, "PDF", save_all=True, append_images=rest) logger.info(f"Created PDF with {len(pil_pages)} images") 
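    # Pillow emits a single multi-page PDF when save_all=True; append_images supplies
    # every page after the first, so the order of pil_pages determines page order.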
return FileResponse( path=str(output_path), media_type="application/pdf", filename=pdf_name ) @app.post("/api/cleanup") async def api_trigger_cleanup(background_tasks: BackgroundTasks): """Manually trigger cleanup (for admin use)""" background_tasks.add_task(cleanup_old_files) return {"message": "Cleanup scheduled"} @app.post("/api/remove-watermark") async def api_remove_watermark( url: str = Form(""), file: UploadFile = File(None), output_name: str = Form("cleaned.pdf"), watermark_text: str = Form("Educated Nepal"), method: str = Form("inpaint"), intensity: int = Form(50), dpi: int = Form(120), quality: int = Form(70), ): """Remove watermark from PDF using image processing""" try: from watermark_remover import remove_watermark_from_pdf, CV2_AVAILABLE if not CV2_AVAILABLE: raise HTTPException( status_code=500, detail="OpenCV not installed. Run: pip install opencv-python-headless" ) except ImportError as e: raise HTTPException(status_code=500, detail=str(e)) output_name = sanitize_filename(output_name, "cleaned.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" job_id = generate_job_id() input_path = WORK_DIR / f"input_{job_id}.pdf" output_path = OUT_DIR / f"{job_id}_{output_name}" # Get input PDF try: if file is not None and file.filename: data = await file.read() validate_pdf_upload(data) input_path.write_bytes(data) elif url.strip(): is_drive = "drive.google.com" in url await download_from_url(url, input_path, is_drive=is_drive) else: raise ValueError("Provide a PDF URL or upload a file") except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except aiohttp.ClientError as e: raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}") # Clamp values dpi = max(72, min(200, dpi)) quality = max(30, min(95, quality)) # Process watermark removal try: pdf_bytes = input_path.read_bytes() original_size = len(pdf_bytes) result_bytes = remove_watermark_from_pdf( pdf_bytes=pdf_bytes, watermark_text=watermark_text, method=method, intensity=intensity, dpi=dpi, jpeg_quality=quality ) output_path.write_bytes(result_bytes) output_size = len(result_bytes) logger.info(f"Watermark removed: {output_name}, {original_size/1024:.0f}KB -> {output_size/1024:.0f}KB") except Exception as e: logger.exception("Watermark removal failed") raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name, headers={ "X-Original-Size": str(original_size), "X-Output-Size": str(output_size) } ) @app.post("/api/watermark-preview") async def api_watermark_preview( url: str = Form(""), file: UploadFile = File(None), page: int = Form(0), method: str = Form("inpaint"), intensity: int = Form(50), ): """Preview watermark removal on a single page - returns original and processed images""" try: from watermark_remover import preview_single_page, CV2_AVAILABLE if not CV2_AVAILABLE: raise HTTPException(status_code=500, detail="OpenCV not installed") except ImportError as e: raise HTTPException(status_code=500, detail=str(e)) # Get input PDF try: if file is not None and file.filename: pdf_bytes = await file.read() validate_pdf_upload(pdf_bytes) elif url.strip(): job_id = generate_job_id() input_path = WORK_DIR / f"preview_{job_id}.pdf" is_drive = "drive.google.com" in url await download_from_url(url, input_path, is_drive=is_drive) pdf_bytes = input_path.read_bytes() else: raise ValueError("Provide a PDF URL or upload a file") except ValueError as e: raise 
HTTPException(status_code=400, detail=str(e)) # Generate preview try: import base64 original_png, processed_png = preview_single_page( pdf_bytes=pdf_bytes, page_num=page, method=method, intensity=intensity, dpi=100 ) return JSONResponse({ "original": base64.b64encode(original_png).decode(), "processed": base64.b64encode(processed_png).decode(), "page": page }) except Exception as e: logger.exception("Preview generation failed") raise HTTPException(status_code=500, detail=f"Preview failed: {str(e)}") # =============== Images to PDF =============== @app.post("/api/images-to-pdf") async def api_images_to_pdf( files: List[UploadFile] = File(...), order: str = Form(""), output_name: str = Form("images.pdf"), page_size: str = Form("a4"), margin: int = Form(20), ): """Convert multiple images to PDF with custom order""" if not files: raise HTTPException(status_code=400, detail="No images provided") output_name = sanitize_filename(output_name, "images.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" # Parse order if provided (comma-separated indices) if order.strip(): try: indices = [int(i.strip()) for i in order.split(",")] ordered_files = [files[i] for i in indices if 0 <= i < len(files)] except (ValueError, IndexError): ordered_files = files else: ordered_files = files # Page sizes in points page_sizes = { "a4": (595, 842), "letter": (612, 792), "a3": (842, 1191), "fit": None # Fit to image } try: pil_images = [] for f in ordered_files: data = await f.read() try: img = Image.open(io.BytesIO(data)) if img.mode in ('RGBA', 'P'): img = img.convert('RGB') pil_images.append(img) except Exception as e: logger.warning(f"Skipping invalid image {f.filename}: {e}") continue if not pil_images: raise HTTPException(status_code=400, detail="No valid images found") # Create PDF output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" if page_size == "fit": # Each page fits the image first = pil_images[0] rest = pil_images[1:] if len(pil_images) > 1 else [] first.save(output_path, "PDF", save_all=True, append_images=rest) else: # Fixed page size with margins page_w, page_h = page_sizes.get(page_size, page_sizes["a4"]) doc = fitz.open() for img in pil_images: # Save image to bytes img_buffer = io.BytesIO() img.save(img_buffer, format='JPEG', quality=90) img_bytes = img_buffer.getvalue() # Create page page = doc.new_page(width=page_w, height=page_h) # Calculate image rect with margins img_w, img_h = img.size available_w = page_w - 2 * margin available_h = page_h - 2 * margin # Scale to fit scale = min(available_w / img_w, available_h / img_h) new_w = img_w * scale new_h = img_h * scale # Center on page x = (page_w - new_w) / 2 y = (page_h - new_h) / 2 rect = fitz.Rect(x, y, x + new_w, y + new_h) page.insert_image(rect, stream=img_bytes) doc.save(str(output_path)) doc.close() logger.info(f"Created PDF from {len(pil_images)} images") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name ) except Exception as e: logger.exception("Images to PDF failed") raise HTTPException(status_code=500, detail=str(e)) # =============== Merge PDFs =============== @app.post("/api/merge-pdf") async def api_merge_pdf( files: List[UploadFile] = File(...), order: str = Form(""), output_name: str = Form("merged.pdf"), ): """Merge multiple PDFs into one""" if not files or len(files) < 2: raise HTTPException(status_code=400, detail="At least 2 PDF files required") output_name = sanitize_filename(output_name, "merged.pdf") if not output_name.lower().endswith(".pdf"): 
output_name += ".pdf" # Parse order if provided if order.strip(): try: indices = [int(i.strip()) for i in order.split(",")] ordered_files = [files[i] for i in indices if 0 <= i < len(files)] except (ValueError, IndexError): ordered_files = files else: ordered_files = files try: output_doc = fitz.open() total_pages = 0 for f in ordered_files: data = await f.read() if not data[:5] == b"%PDF-": logger.warning(f"Skipping non-PDF file: {f.filename}") continue src_doc = fitz.open(stream=data, filetype="pdf") output_doc.insert_pdf(src_doc) total_pages += len(src_doc) src_doc.close() if total_pages == 0: raise HTTPException(status_code=400, detail="No valid PDF files found") output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" output_doc.save(str(output_path), deflate=True) output_doc.close() logger.info(f"Merged {len(ordered_files)} PDFs, {total_pages} total pages") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name ) except Exception as e: logger.exception("PDF merge failed") raise HTTPException(status_code=500, detail=str(e)) # =============== Split PDF =============== @app.post("/api/split-pdf") async def api_split_pdf( file: UploadFile = File(...), mode: str = Form("all"), pages: str = Form(""), output_name: str = Form("split"), ): """ Split PDF into multiple files. Modes: 'all' (each page), 'range' (specific pages), 'chunks' (every N pages) """ data = await file.read() validate_pdf_upload(data) output_name = sanitize_filename(output_name, "split") try: src_doc = fitz.open(stream=data, filetype="pdf") total_pages = len(src_doc) if mode == "all": # Split into individual pages job_id = generate_job_id() zip_path = OUT_DIR / f"{job_id}_{output_name}.zip" with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: for i in range(total_pages): page_doc = fitz.open() page_doc.insert_pdf(src_doc, from_page=i, to_page=i) pdf_bytes = page_doc.tobytes() zf.writestr(f"{output_name}_page_{i+1}.pdf", pdf_bytes) page_doc.close() src_doc.close() logger.info(f"Split PDF into {total_pages} individual pages") return FileResponse( path=str(zip_path), media_type="application/zip", filename=f"{output_name}_pages.zip" ) elif mode == "range": # Extract specific pages page_set = parse_page_spec(pages, total_pages) if not page_set: raise HTTPException(status_code=400, detail="No valid pages specified") output_doc = fitz.open() for i in sorted(page_set): output_doc.insert_pdf(src_doc, from_page=i, to_page=i) output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.pdf" output_doc.save(str(output_path)) output_doc.close() src_doc.close() logger.info(f"Extracted {len(page_set)} pages from PDF") return FileResponse( path=str(output_path), media_type="application/pdf", filename=f"{output_name}.pdf" ) elif mode == "chunks": # Split into chunks of N pages try: chunk_size = int(pages) if pages else 1 chunk_size = max(1, chunk_size) except ValueError: chunk_size = 1 job_id = generate_job_id() zip_path = OUT_DIR / f"{job_id}_{output_name}.zip" with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: chunk_num = 1 for start in range(0, total_pages, chunk_size): end = min(start + chunk_size - 1, total_pages - 1) chunk_doc = fitz.open() chunk_doc.insert_pdf(src_doc, from_page=start, to_page=end) pdf_bytes = chunk_doc.tobytes() zf.writestr(f"{output_name}_part_{chunk_num}.pdf", pdf_bytes) chunk_doc.close() chunk_num += 1 src_doc.close() logger.info(f"Split PDF into {chunk_num-1} chunks of {chunk_size} pages") return FileResponse( path=str(zip_path), 
media_type="application/zip", filename=f"{output_name}_parts.zip" ) else: raise HTTPException(status_code=400, detail="Invalid split mode") except HTTPException: raise except Exception as e: logger.exception("PDF split failed") raise HTTPException(status_code=500, detail=str(e)) # =============== PDF to Images =============== @app.post("/api/pdf-to-images") async def api_pdf_to_images( file: UploadFile = File(...), format: str = Form("png"), dpi: int = Form(200), pages: str = Form(""), output_name: str = Form("pages"), ): """Convert PDF pages to high-quality images (PNG or JPG) DPI Guide: - 150: Fast, small files (web preview) - 200: Good quality (default) - 300: Print quality - 400: Ultra sharp (presentations) - 600: Maximum quality (OCR/archive) """ data = await file.read() validate_pdf_upload(data) output_name = sanitize_filename(output_name, "pages") format = format.lower() if format.lower() in ["png", "jpg", "jpeg"] else "png" if format == "jpeg": format = "jpg" # Allow higher DPI for ultra-sharp output (up to 600) dpi = max(72, min(600, dpi)) try: src_doc = fitz.open(stream=data, filetype="pdf") total_pages = len(src_doc) # Parse page selection if pages.strip(): page_set = parse_page_spec(pages, total_pages) else: page_set = set(range(total_pages)) if not page_set: raise HTTPException(status_code=400, detail="No valid pages specified") job_id = generate_job_id() zip_path = OUT_DIR / f"{job_id}_{output_name}.zip" # Vector render at specified DPI - handles portrait/landscape automatically zoom = dpi / 72.0 matrix = fitz.Matrix(zoom, zoom) with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: for i in sorted(page_set): page = src_doc.load_page(i) # alpha=False removes transparency for sharper text pix = page.get_pixmap(matrix=matrix, alpha=False) if format == "png": img_bytes = pix.tobytes("png") else: # Convert to JPG via PIL img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=95, optimize=True) img_bytes = buffer.getvalue() zf.writestr(f"{output_name}_page_{i+1}.{format}", img_bytes) src_doc.close() logger.info(f"Converted {len(page_set)} pages to {format.upper()} at {dpi} DPI") return FileResponse( path=str(zip_path), media_type="application/zip", filename=f"{output_name}_images.zip" ) except HTTPException: raise except Exception as e: logger.exception("PDF to images failed") raise HTTPException(status_code=500, detail=str(e)) # =============== Compress PDF =============== @app.post("/api/compress-pdf") async def api_compress_pdf( file: UploadFile = File(...), quality: int = Form(60), output_name: str = Form("compressed.pdf"), ): """Compress PDF by reducing image quality and cleaning metadata""" data = await file.read() validate_pdf_upload(data) output_name = sanitize_filename(output_name, "compressed.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" # Use quality directly (10-100 scale, same as image compression) jpeg_quality = max(10, min(100, quality)) # Calculate max dimension based on quality # Lower quality = smaller max dimension for more compression max_dim = int(1000 + (quality / 100) * 2000) # Range: 1000-3000 logger.info(f"Compressing PDF with quality={jpeg_quality}, max_dim={max_dim}") try: src_doc = fitz.open(stream=data, filetype="pdf") images_processed = 0 # Compress images within the PDF for page_num in range(len(src_doc)): page = src_doc.load_page(page_num) image_list = page.get_images(full=True) for img_index, img_info in enumerate(image_list): xref = 
img_info[0] try: # Extract image base_image = src_doc.extract_image(xref) if not base_image: continue image_bytes = base_image["image"] original_img_size = len(image_bytes) # Skip very small images if original_img_size < 5000: continue # Open and compress image img = Image.open(io.BytesIO(image_bytes)) # Convert to RGB if necessary if img.mode in ('RGBA', 'P'): background = Image.new('RGB', img.size, (255, 255, 255)) if img.mode == 'P': img = img.convert('RGBA') if img.mode == 'RGBA': background.paste(img, mask=img.split()[3]) img = background else: img = img.convert('RGB') elif img.mode != 'RGB': img = img.convert('RGB') # Resize if larger than max_dim if max(img.size) > max_dim: ratio = max_dim / max(img.size) new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio)) img = img.resize(new_size, Image.Resampling.LANCZOS) # Compress to JPEG with specified quality buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True) compressed_bytes = buffer.getvalue() # Always replace to apply quality setting (even if slightly larger) # This ensures quality slider actually affects output page.replace_image(xref, stream=compressed_bytes) images_processed += 1 except Exception as e: logger.debug(f"Could not compress image {xref}: {e}") continue logger.info(f"Processed {images_processed} images") output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" # Save with compression options src_doc.save( str(output_path), garbage=4, # Remove unused objects deflate=True, # Compress streams clean=True, # Clean content streams ) original_size = len(data) compressed_size = output_path.stat().st_size # If compression didn't help much, try alternative method if compressed_size >= original_size * 0.95: src_doc.close() # Fallback: just clean and deflate without image processing src_doc = fitz.open(stream=data, filetype="pdf") src_doc.save( str(output_path), garbage=4, deflate=True, clean=True, ) compressed_size = output_path.stat().st_size reduction = ((original_size - compressed_size) / original_size) * 100 if reduction < 0: reduction = 0 # If still larger, just copy original output_path.write_bytes(data) compressed_size = original_size src_doc.close() logger.info(f"Compressed PDF: {original_size} -> {compressed_size} bytes ({reduction:.1f}% reduction)") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name, headers={ "X-Original-Size": str(original_size), "X-Compressed-Size": str(compressed_size), "X-Reduction-Percent": f"{reduction:.1f}" } ) except Exception as e: logger.exception("PDF compression failed") raise HTTPException(status_code=500, detail=str(e)) # =============== Rotate PDF =============== @app.post("/api/rotate-pdf") async def api_rotate_pdf( file: UploadFile = File(...), rotation: int = Form(90), pages: str = Form(""), output_name: str = Form("rotated.pdf"), ): """Rotate PDF pages (90, 180, or 270 degrees)""" data = await file.read() validate_pdf_upload(data) output_name = sanitize_filename(output_name, "rotated.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" # Normalize rotation to 90, 180, or 270 rotation = int(rotation) % 360 if rotation not in [90, 180, 270]: rotation = 90 try: src_doc = fitz.open(stream=data, filetype="pdf") total_pages = len(src_doc) # Parse page selection (empty = all pages) if pages.strip(): page_set = parse_page_spec(pages, total_pages) else: page_set = set(range(total_pages)) # Rotate selected pages for i in page_set: page = src_doc.load_page(i) 
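        # set_rotation applies an absolute angle, so the page's current rotation is added
        # in to compose with (rather than overwrite) any existing orientation.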
page.set_rotation(page.rotation + rotation) output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" src_doc.save(str(output_path)) src_doc.close() logger.info(f"Rotated {len(page_set)} pages by {rotation}°") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name ) except Exception as e: logger.exception("PDF rotation failed") raise HTTPException(status_code=500, detail=str(e)) # =============== Add Page Numbers =============== @app.post("/api/add-page-numbers") async def api_add_page_numbers( file: UploadFile = File(...), position: str = Form("bottom-center"), format: str = Form("Page {n} of {total}"), start_number: int = Form(1), font_size: int = Form(11), margin: int = Form(30), output_name: str = Form("numbered.pdf"), ): """Add page numbers to PDF""" data = await file.read() validate_pdf_upload(data) output_name = sanitize_filename(output_name, "numbered.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" # Position mapping positions = { "top-left": ("left", "top"), "top-center": ("center", "top"), "top-right": ("right", "top"), "bottom-left": ("left", "bottom"), "bottom-center": ("center", "bottom"), "bottom-right": ("right", "bottom"), } h_align, v_align = positions.get(position, ("center", "bottom")) try: src_doc = fitz.open(stream=data, filetype="pdf") total_pages = len(src_doc) for i in range(total_pages): page = src_doc.load_page(i) rect = page.rect # Format page number text page_num = start_number + i text = format.replace("{n}", str(page_num)).replace("{total}", str(total_pages)) # Calculate position text_width = fitz.get_text_length(text, fontsize=font_size) if h_align == "left": x = margin elif h_align == "right": x = rect.width - margin - text_width else: # center x = (rect.width - text_width) / 2 if v_align == "top": y = margin + font_size else: # bottom y = rect.height - margin # Insert text page.insert_text( (x, y), text, fontsize=font_size, color=(0.3, 0.3, 0.3), ) output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" src_doc.save(str(output_path)) src_doc.close() logger.info(f"Added page numbers to {total_pages} pages") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name ) except Exception as e: logger.exception("Add page numbers failed") raise HTTPException(status_code=500, detail=str(e)) # =============== PDF OCR (Text Extraction) =============== # Check for Tesseract availability TESSERACT_AVAILABLE = False try: import pytesseract # Test if tesseract is installed pytesseract.get_tesseract_version() TESSERACT_AVAILABLE = True except Exception: logger.warning("Tesseract not available. 
OCR will use basic text extraction only.") @app.post("/api/pdf-ocr") async def api_pdf_ocr( file: UploadFile = File(...), language: str = Form("eng"), pages: str = Form(""), output_format: str = Form("txt"), dpi: int = Form(200), ): """Extract text from PDF using OCR (Tesseract) or native text extraction""" data = await file.read() validate_pdf_upload(data) dpi = max(100, min(400, dpi)) try: src_doc = fitz.open(stream=data, filetype="pdf") total_pages = len(src_doc) # Parse page selection if pages.strip(): page_set = parse_page_spec(pages, total_pages) else: page_set = set(range(total_pages)) if not page_set: raise HTTPException(status_code=400, detail="No valid pages specified") all_text = [] for page_num in sorted(page_set): page = src_doc.load_page(page_num) # First try native text extraction native_text = page.get_text("text").strip() # If native text is substantial, use it if len(native_text) > 50: all_text.append(f"--- Page {page_num + 1} ---\n{native_text}") elif TESSERACT_AVAILABLE: # Use OCR for scanned pages import pytesseract zoom = dpi / 72.0 mat = fitz.Matrix(zoom, zoom) pix = page.get_pixmap(matrix=mat) # Convert to PIL Image img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) # Run OCR ocr_text = pytesseract.image_to_string(img, lang=language) all_text.append(f"--- Page {page_num + 1} (OCR) ---\n{ocr_text.strip()}") else: # No OCR available, use whatever native text we got all_text.append(f"--- Page {page_num + 1} ---\n{native_text if native_text else '[No text detected - Tesseract not installed]'}") src_doc.close() combined_text = "\n\n".join(all_text) job_id = generate_job_id() if output_format == "json": # Return as JSON return JSONResponse({ "pages": len(page_set), "text": combined_text, "ocr_used": TESSERACT_AVAILABLE, }) else: # Return as text file output_path = OUT_DIR / f"{job_id}_extracted.txt" output_path.write_text(combined_text, encoding="utf-8") logger.info(f"Extracted text from {len(page_set)} pages") return FileResponse( path=str(output_path), media_type="text/plain", filename="extracted_text.txt" ) except HTTPException: raise except Exception as e: logger.exception("PDF OCR failed") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/ocr-status") async def api_ocr_status(): """Check if Tesseract OCR is available""" return {"available": TESSERACT_AVAILABLE} # =============== Enhance Quality =============== def enhance_image(img: Image.Image, level: str = "medium") -> Image.Image: """Enhance image quality with sharpening and contrast adjustment""" from PIL import ImageEnhance, ImageFilter # Level presets: (sharpness, contrast, color, brightness) presets = { "light": (1.2, 1.05, 1.05, 1.02), "medium": (1.4, 1.1, 1.1, 1.03), "strong": (1.6, 1.15, 1.15, 1.05), } sharpness, contrast, color, brightness = presets.get(level, presets["medium"]) # Convert to RGB if needed if img.mode in ('RGBA', 'P'): # Preserve alpha if present if img.mode == 'RGBA': alpha = img.split()[3] img = img.convert('RGB') else: img = img.convert('RGB') alpha = None else: img = img.convert('RGB') alpha = None # Apply slight unsharp mask for detail enhancement img = img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=50, threshold=3)) # Enhance sharpness enhancer = ImageEnhance.Sharpness(img) img = enhancer.enhance(sharpness) # Enhance contrast enhancer = ImageEnhance.Contrast(img) img = enhancer.enhance(contrast) # Enhance color saturation slightly enhancer = ImageEnhance.Color(img) img = enhancer.enhance(color) # Slight brightness adjustment enhancer = 
ImageEnhance.Brightness(img) img = enhancer.enhance(brightness) # Restore alpha channel if it existed if alpha: img = img.convert('RGBA') img.putalpha(alpha) return img @app.post("/api/enhance-image") async def api_enhance_image( file: UploadFile = File(...), level: str = Form("medium"), upscale: float = Form(1.0), output_name: str = Form("enhanced"), ): """Enhance image quality with sharpening and optional upscaling""" data = await file.read() original_size = len(data) if original_size > settings.max_image_size_mb * 1024 * 1024: raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") output_name = sanitize_filename(output_name, "enhanced") level = level if level in ["light", "medium", "strong"] else "medium" upscale = max(1.0, min(2.0, upscale)) # Limit upscale to 2x max try: img = Image.open(io.BytesIO(data)) original_format = file.filename.split('.')[-1].lower() if file.filename else 'jpg' # Upscale if requested (using LANCZOS for quality) if upscale > 1.0: new_size = (int(img.size[0] * upscale), int(img.size[1] * upscale)) img = img.resize(new_size, Image.Resampling.LANCZOS) # Apply enhancement img = enhance_image(img, level) # Determine output format and quality if original_format in ['jpg', 'jpeg']: output_format = 'JPEG' ext = 'jpg' mime = 'image/jpeg' if img.mode == 'RGBA': img = img.convert('RGB') # Use quality that keeps file size reasonable quality = 88 elif original_format == 'webp': output_format = 'WEBP' ext = 'webp' mime = 'image/webp' quality = 88 else: output_format = 'PNG' ext = 'png' mime = 'image/png' quality = None output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}" save_kwargs = {'optimize': True} if quality: save_kwargs['quality'] = quality img.save(str(output_path), output_format, **save_kwargs) enhanced_size = output_path.stat().st_size # If file got too large (more than 3x original), reduce quality if enhanced_size > original_size * 3 and output_format in ['JPEG', 'WEBP']: quality = 75 img.save(str(output_path), output_format, quality=quality, optimize=True) enhanced_size = output_path.stat().st_size logger.info(f"Enhanced image: {original_size} -> {enhanced_size} bytes") return FileResponse( path=str(output_path), media_type=mime, filename=f"{output_name}.{ext}", headers={ "X-Original-Size": str(original_size), "X-Enhanced-Size": str(enhanced_size), } ) except Exception as e: logger.exception("Image enhancement failed") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/enhance-pdf") async def api_enhance_pdf( file: UploadFile = File(...), level: str = Form("medium"), dpi: int = Form(150), output_name: str = Form("enhanced.pdf"), ): """Enhance PDF quality by improving embedded images""" data = await file.read() original_size = len(data) validate_pdf_upload(data) output_name = sanitize_filename(output_name, "enhanced.pdf") if not output_name.lower().endswith(".pdf"): output_name += ".pdf" level = level if level in ["light", "medium", "strong"] else "medium" dpi = max(100, min(200, dpi)) # Limit DPI to prevent bloat # Quality settings based on level jpeg_quality = {"light": 82, "medium": 85, "strong": 88}.get(level, 85) try: src_doc = fitz.open(stream=data, filetype="pdf") images_enhanced = 0 for page_num in range(len(src_doc)): page = src_doc.load_page(page_num) image_list = page.get_images(full=True) for img_info in image_list: xref = img_info[0] try: base_image = src_doc.extract_image(xref) if not base_image: continue image_bytes = base_image["image"] # Skip small images (icons, etc.) 
if len(image_bytes) < 5000: continue # Open and enhance image img = Image.open(io.BytesIO(image_bytes)) # Skip if already small dimensions if max(img.size) < 100: continue # Convert mode if needed if img.mode in ('RGBA', 'P'): img = img.convert('RGB') elif img.mode != 'RGB': img = img.convert('RGB') # Apply enhancement img = enhance_image(img, level) # Save as optimized JPEG buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True) enhanced_bytes = buffer.getvalue() # Only replace if not significantly larger if len(enhanced_bytes) <= len(image_bytes) * 1.5: page.replace_image(xref, stream=enhanced_bytes) images_enhanced += 1 except Exception as e: logger.debug(f"Could not enhance image {xref}: {e}") continue output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" # Save with optimization src_doc.save( str(output_path), garbage=3, deflate=True, clean=True, ) src_doc.close() enhanced_size = output_path.stat().st_size # Safety check: if file is way too large, just return original if enhanced_size > original_size * 4: output_path.write_bytes(data) enhanced_size = original_size logger.warning("Enhanced PDF was too large, returning original") size_change = ((enhanced_size - original_size) / original_size) * 100 logger.info(f"Enhanced PDF: {images_enhanced} images, {original_size} -> {enhanced_size} bytes ({size_change:+.1f}%)") return FileResponse( path=str(output_path), media_type="application/pdf", filename=output_name, headers={ "X-Original-Size": str(original_size), "X-Enhanced-Size": str(enhanced_size), "X-Images-Enhanced": str(images_enhanced), } ) except Exception as e: logger.exception("PDF enhancement failed") raise HTTPException(status_code=500, detail=str(e)) # =============== Image Tools =============== # Check for rembg availability REMBG_AVAILABLE = False rembg_remove = None def _check_rembg(): """Safely check if rembg is available""" global REMBG_AVAILABLE, rembg_remove try: import sys import io # Capture stderr to suppress rembg warnings old_stderr = sys.stderr sys.stderr = io.StringIO() try: from rembg import remove rembg_remove = remove REMBG_AVAILABLE = True finally: sys.stderr = old_stderr except (ImportError, Exception) as e: REMBG_AVAILABLE = False logger.info(f"rembg not available: {e}") # Don't check at import time - check lazily on first use # _check_rembg() @app.post("/api/remove-background") async def api_remove_background( file: UploadFile = File(...), output_name: str = Form("no-bg.png"), ): """Remove background from image using AI (rembg)""" global REMBG_AVAILABLE, rembg_remove # Lazy load rembg on first use if rembg_remove is None and not REMBG_AVAILABLE: try: from rembg import remove rembg_remove = remove REMBG_AVAILABLE = True except Exception as e: logger.warning(f"rembg not available: {e}") REMBG_AVAILABLE = False if not REMBG_AVAILABLE or rembg_remove is None: raise HTTPException( status_code=500, detail="rembg not installed. Run: pip install rembg[gpu] or pip install rembg" ) data = await file.read() if len(data) > settings.max_image_size_mb * 1024 * 1024: raise HTTPException(status_code=400, detail=f"Image too large. 
Max: {settings.max_image_size_mb}MB") output_name = sanitize_filename(output_name, "no-bg.png") if not output_name.lower().endswith(".png"): output_name += ".png" try: # Process with rembg result = rembg_remove(data) output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" output_path.write_bytes(result) logger.info(f"Background removed: {output_name}") return FileResponse( path=str(output_path), media_type="image/png", filename=output_name ) except Exception as e: logger.exception("Background removal failed") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/add-image-watermark") async def api_add_image_watermark( file: UploadFile = File(...), text: str = Form(""), position: str = Form("center"), opacity: int = Form(50), font_size: int = Form(36), color: str = Form("#000000"), rotation: int = Form(0), output_name: str = Form("watermarked"), ): """Add text watermark to image""" data = await file.read() if len(data) > settings.max_image_size_mb * 1024 * 1024: raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") if not text.strip(): raise HTTPException(status_code=400, detail="Watermark text is required") output_name = sanitize_filename(output_name, "watermarked") try: from PIL import ImageDraw, ImageFont img = Image.open(io.BytesIO(data)) if img.mode != 'RGBA': img = img.convert('RGBA') # Create watermark layer watermark = Image.new('RGBA', img.size, (0, 0, 0, 0)) draw = ImageDraw.Draw(watermark) # Try to use a font, fallback to default try: font = ImageFont.truetype("arial.ttf", font_size) except: font = ImageFont.load_default() # Parse color color_hex = color.lstrip('#') r, g, b = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4)) alpha = int(255 * opacity / 100) # Get text size bbox = draw.textbbox((0, 0), text, font=font) text_width = bbox[2] - bbox[0] text_height = bbox[3] - bbox[1] # Calculate position positions = { "top-left": (20, 20), "top-center": ((img.width - text_width) // 2, 20), "top-right": (img.width - text_width - 20, 20), "center": ((img.width - text_width) // 2, (img.height - text_height) // 2), "bottom-left": (20, img.height - text_height - 20), "bottom-center": ((img.width - text_width) // 2, img.height - text_height - 20), "bottom-right": (img.width - text_width - 20, img.height - text_height - 20), "tile": None, } if position == "tile": # Tile watermark across image for y in range(0, img.height, text_height + 100): for x in range(0, img.width, text_width + 100): draw.text((x, y), text, font=font, fill=(r, g, b, alpha)) else: pos = positions.get(position, positions["center"]) draw.text(pos, text, font=font, fill=(r, g, b, alpha)) # Rotate watermark if needed if rotation != 0: watermark = watermark.rotate(rotation, expand=False, center=(img.width//2, img.height//2)) # Composite result = Image.alpha_composite(img, watermark) # Determine output format original_format = file.filename.split('.')[-1].lower() if file.filename else 'png' if original_format in ['jpg', 'jpeg']: result = result.convert('RGB') output_format = 'JPEG' output_name += '.jpg' mime = 'image/jpeg' else: output_format = 'PNG' output_name += '.png' mime = 'image/png' output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" result.save(str(output_path), output_format, quality=95) logger.info(f"Watermark added: {output_name}") return FileResponse(path=str(output_path), media_type=mime, filename=output_name) except Exception as e: logger.exception("Add watermark failed") raise HTTPException(status_code=500, detail=str(e)) 
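# The image endpoints below accept multipart/form-data uploads. A minimal client sketch,
# assuming the server runs on localhost:8000, the `requests` package is installed, and
# the file names are placeholders (none of these assumptions are part of this module):
#
#     import requests
#     with open("photo.jpg", "rb") as fh:
#         resp = requests.post(
#             "http://localhost:8000/api/resize-image",
#             files={"file": fh},
#             data={"preset": "hd", "maintain_aspect": "true"},
#         )
#     Path("resized.jpg").write_bytes(resp.content)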
@app.post("/api/resize-image") async def api_resize_image( file: UploadFile = File(...), preset: str = Form("custom"), width: int = Form(0), height: int = Form(0), maintain_aspect: bool = Form(True), output_name: str = Form("resized"), ): """Resize image with presets or custom dimensions""" data = await file.read() if len(data) > settings.max_image_size_mb * 1024 * 1024: raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") output_name = sanitize_filename(output_name, "resized") # Presets presets = { "thumbnail": (150, 150), "small": (320, 240), "medium": (640, 480), "hd": (1280, 720), "fullhd": (1920, 1080), "4k": (3840, 2160), "square-sm": (500, 500), "square-lg": (1000, 1000), } try: img = Image.open(io.BytesIO(data)) original_width, original_height = img.size # Determine target size if preset != "custom" and preset in presets: target_width, target_height = presets[preset] else: target_width = width if width > 0 else original_width target_height = height if height > 0 else original_height # Maintain aspect ratio if maintain_aspect: ratio = min(target_width / original_width, target_height / original_height) target_width = int(original_width * ratio) target_height = int(original_height * ratio) # Resize resized = img.resize((target_width, target_height), Image.Resampling.LANCZOS) # Determine output format original_format = file.filename.split('.')[-1].lower() if file.filename else 'png' format_map = {'jpg': 'JPEG', 'jpeg': 'JPEG', 'png': 'PNG', 'webp': 'WEBP', 'gif': 'GIF'} output_format = format_map.get(original_format, 'PNG') ext = original_format if original_format in format_map else 'png' if output_format == 'JPEG' and resized.mode == 'RGBA': resized = resized.convert('RGB') output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}" resized.save(str(output_path), output_format, quality=95) mime_map = {'JPEG': 'image/jpeg', 'PNG': 'image/png', 'WEBP': 'image/webp', 'GIF': 'image/gif'} logger.info(f"Resized image: {original_width}x{original_height} -> {target_width}x{target_height}") return FileResponse( path=str(output_path), media_type=mime_map.get(output_format, 'image/png'), filename=f"{output_name}.{ext}", headers={ "X-Original-Size": f"{original_width}x{original_height}", "X-New-Size": f"{target_width}x{target_height}" } ) except Exception as e: logger.exception("Resize image failed") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/convert-image") async def api_convert_image( file: UploadFile = File(...), target_format: str = Form("png"), quality: int = Form(90), output_name: str = Form("converted"), ): """Convert image between formats (JPG, PNG, WebP, GIF)""" data = await file.read() if len(data) > settings.max_image_size_mb * 1024 * 1024: raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") output_name = sanitize_filename(output_name, "converted") target_format = target_format.lower() format_map = { 'jpg': ('JPEG', 'image/jpeg', 'jpg'), 'jpeg': ('JPEG', 'image/jpeg', 'jpg'), 'png': ('PNG', 'image/png', 'png'), 'webp': ('WEBP', 'image/webp', 'webp'), 'gif': ('GIF', 'image/gif', 'gif'), } if target_format not in format_map: raise HTTPException(status_code=400, detail="Unsupported format. 
@app.post("/api/convert-image")
async def api_convert_image(
    file: UploadFile = File(...),
    target_format: str = Form("png"),
    quality: int = Form(90),
    output_name: str = Form("converted"),
):
    """Convert image between formats (JPG, PNG, WebP, GIF)"""
    data = await file.read()
    if len(data) > settings.max_image_size_mb * 1024 * 1024:
        raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")

    output_name = sanitize_filename(output_name, "converted")
    target_format = target_format.lower()

    format_map = {
        'jpg': ('JPEG', 'image/jpeg', 'jpg'),
        'jpeg': ('JPEG', 'image/jpeg', 'jpg'),
        'png': ('PNG', 'image/png', 'png'),
        'webp': ('WEBP', 'image/webp', 'webp'),
        'gif': ('GIF', 'image/gif', 'gif'),
    }

    if target_format not in format_map:
        raise HTTPException(status_code=400, detail="Unsupported format. Use: jpg, png, webp, gif")

    pil_format, mime, ext = format_map[target_format]

    try:
        img = Image.open(io.BytesIO(data))

        # Handle transparency for JPEG
        if pil_format == 'JPEG' and img.mode in ('RGBA', 'P'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            if img.mode == 'P':
                img = img.convert('RGBA')
            background.paste(img, mask=img.split()[3] if len(img.split()) == 4 else None)
            img = background
        elif pil_format == 'JPEG' and img.mode != 'RGB':
            img = img.convert('RGB')

        output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"

        save_kwargs = {}
        if pil_format in ['JPEG', 'WEBP']:
            save_kwargs['quality'] = quality
        if pil_format == 'PNG':
            save_kwargs['optimize'] = True

        img.save(str(output_path), pil_format, **save_kwargs)

        logger.info(f"Converted image to {target_format.upper()}")
        return FileResponse(
            path=str(output_path),
            media_type=mime,
            filename=f"{output_name}.{ext}"
        )
    except Exception as e:
        logger.exception("Convert image failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/compress-image")
async def api_compress_image(
    file: UploadFile = File(...),
    quality: int = Form(70),
    output_name: str = Form("compressed"),
):
    """Compress image to reduce file size"""
    data = await file.read()
    original_size = len(data)
    if original_size > settings.max_image_size_mb * 1024 * 1024:
        raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")

    output_name = sanitize_filename(output_name, "compressed")
    quality = max(10, min(100, quality))

    try:
        img = Image.open(io.BytesIO(data))

        # Always convert to JPEG for effective compression with quality control
        # (PNG optimize doesn't use quality setting and gives minimal compression)
        if img.mode in ('RGBA', 'P'):
            # Handle transparency by compositing on white background
            background = Image.new('RGB', img.size, (255, 255, 255))
            if img.mode == 'P':
                img = img.convert('RGBA')
            if img.mode == 'RGBA':
                background.paste(img, mask=img.split()[3])
                img = background
            else:
                img = img.convert('RGB')
        elif img.mode != 'RGB':
            img = img.convert('RGB')

        output_format = 'JPEG'
        ext = 'jpg'
        mime = 'image/jpeg'
        save_kwargs = {'quality': quality, 'optimize': True}

        output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"
        img.save(str(output_path), output_format, **save_kwargs)

        compressed_size = output_path.stat().st_size
        reduction = ((original_size - compressed_size) / original_size) * 100

        logger.info(f"Compressed image: {original_size} -> {compressed_size} bytes ({reduction:.1f}% reduction)")
        return FileResponse(
            path=str(output_path),
            media_type=mime,
            filename=f"{output_name}.{ext}",
            headers={
                "X-Original-Size": str(original_size),
                "X-Compressed-Size": str(compressed_size),
                "X-Reduction-Percent": f"{reduction:.1f}"
            }
        )
    except Exception as e:
        logger.exception("Compress image failed")
        raise HTTPException(status_code=500, detail=str(e))
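
# Example (illustrative only): compressing an image from the command line.
# Assumes the server is running locally on port 8000; adjust file paths as needed.
#
#   curl -F "file=@large-photo.jpg" -F "quality=60" \
#        -o compressed.jpg http://127.0.0.1:8000/api/compress-image
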
# =============== Estimate File Size Endpoints ===============

@app.post("/api/estimate/compress-image")
async def api_estimate_compress_image(
    file: UploadFile = File(...),
    quality: int = Form(70),
):
    """Estimate compressed image file size - matches actual compression logic"""
    data = await file.read()
    original_size = len(data)
    if original_size > settings.max_image_size_mb * 1024 * 1024:
        raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")

    quality = max(10, min(100, quality))

    try:
        img = Image.open(io.BytesIO(data))

        # Always convert to JPEG for compression (matches actual compression logic)
        if img.mode in ('RGBA', 'P'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            if img.mode == 'P':
                img = img.convert('RGBA')
            if img.mode == 'RGBA':
                background.paste(img, mask=img.split()[3])
                img = background
            else:
                img = img.convert('RGB')
        elif img.mode != 'RGB':
            img = img.convert('RGB')

        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality, optimize=True)
        estimated_size = len(buffer.getvalue())

        reduction = ((original_size - estimated_size) / original_size) * 100 if original_size > 0 else 0

        return JSONResponse({
            "original_size": original_size,
            "estimated_size": estimated_size,
            "reduction_percent": round(max(0, reduction), 1)
        })
    except Exception as e:
        logger.exception("Estimate compress image failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/estimate/convert-image")
async def api_estimate_convert_image(
    file: UploadFile = File(...),
    target_format: str = Form("png"),
    quality: int = Form(90),
):
    """Estimate converted image file size"""
    data = await file.read()
    original_size = len(data)
    if original_size > settings.max_image_size_mb * 1024 * 1024:
        raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")

    target_format = target_format.lower()
    format_map = {
        'jpg': ('JPEG', 'jpg'),
        'jpeg': ('JPEG', 'jpg'),
        'png': ('PNG', 'png'),
        'webp': ('WEBP', 'webp'),
        'gif': ('GIF', 'gif'),
    }

    if target_format not in format_map:
        raise HTTPException(status_code=400, detail="Unsupported format")

    pil_format, ext = format_map[target_format]

    try:
        img = Image.open(io.BytesIO(data))

        # Handle transparency for JPEG
        if pil_format == 'JPEG' and img.mode in ('RGBA', 'P'):
            background = Image.new('RGB', img.size, (255, 255, 255))
            if img.mode == 'P':
                img = img.convert('RGBA')
            background.paste(img, mask=img.split()[3] if len(img.split()) == 4 else None)
            img = background
        elif pil_format == 'JPEG' and img.mode != 'RGB':
            img = img.convert('RGB')

        buffer = io.BytesIO()
        save_kwargs = {}
        if pil_format in ['JPEG', 'WEBP']:
            save_kwargs['quality'] = quality
        if pil_format == 'PNG':
            save_kwargs['optimize'] = True

        img.save(buffer, pil_format, **save_kwargs)
        estimated_size = len(buffer.getvalue())

        return JSONResponse({
            "original_size": original_size,
            "estimated_size": estimated_size,
            "target_format": target_format.upper()
        })
    except Exception as e:
        logger.exception("Estimate convert image failed")
        raise HTTPException(status_code=500, detail=str(e))
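
# Example response shape for the estimate endpoints above (values are illustrative):
#
#   {"original_size": 2457600, "estimated_size": 483201, "reduction_percent": 80.3}
#
# Clients can call these before the real /api/compress-image or /api/convert-image
# request to show a size preview without writing anything to disk.
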
@app.post("/api/estimate/compress-pdf")
async def api_estimate_compress_pdf(
    file: UploadFile = File(...),
    quality: int = Form(60),
):
    """Estimate compressed PDF file size by actually compressing it in memory"""
    data = await file.read()
    original_size = len(data)
    validate_pdf_upload(data)

    # Use quality directly (10-100 scale)
    jpeg_quality = max(10, min(100, quality))
    max_dim = int(1000 + (quality / 100) * 2000)  # Range: 1000-3000

    try:
        src_doc = fitz.open(stream=data, filetype="pdf")
        total_pages = len(src_doc)

        # Process ALL pages for accurate estimation
        for page_num in range(total_pages):
            page = src_doc.load_page(page_num)
            image_list = page.get_images(full=True)

            for img_info in image_list:
                xref = img_info[0]
                try:
                    base_image = src_doc.extract_image(xref)
                    if not base_image or len(base_image["image"]) < 5000:
                        continue

                    image_bytes = base_image["image"]
                    img = Image.open(io.BytesIO(image_bytes))

                    if img.mode in ('RGBA', 'P'):
                        background = Image.new('RGB', img.size, (255, 255, 255))
                        if img.mode == 'P':
                            img = img.convert('RGBA')
                        if img.mode == 'RGBA':
                            background.paste(img, mask=img.split()[3])
                            img = background
                        else:
                            img = img.convert('RGB')
                    elif img.mode != 'RGB':
                        img = img.convert('RGB')

                    # Resize if larger than max_dim
                    if max(img.size) > max_dim:
                        ratio = max_dim / max(img.size)
                        new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
                        img = img.resize(new_size, Image.Resampling.LANCZOS)

                    buffer = io.BytesIO()
                    img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True)
                    compressed_bytes = buffer.getvalue()

                    # Always replace to apply quality setting
                    page.replace_image(xref, stream=compressed_bytes)
                except Exception:
                    continue

        # Get actual compressed size
        compressed_bytes = src_doc.tobytes(garbage=4, deflate=True, clean=True)
        estimated_size = len(compressed_bytes)
        src_doc.close()

        reduction = ((original_size - estimated_size) / original_size) * 100 if original_size > 0 else 0

        return JSONResponse({
            "original_size": original_size,
            "estimated_size": estimated_size,
            "reduction_percent": round(max(0, reduction), 1),
            "quality": quality
        })
    except Exception as e:
        logger.exception("Estimate compress PDF failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/rembg-status")
async def api_rembg_status():
    """Check if rembg is available"""
    global REMBG_AVAILABLE, rembg_remove

    # Try to load rembg if not checked yet
    if rembg_remove is None and not REMBG_AVAILABLE:
        try:
            from rembg import remove
            rembg_remove = remove
            REMBG_AVAILABLE = True
        except Exception:
            REMBG_AVAILABLE = False

    return {"available": REMBG_AVAILABLE}


# =============== Preview Endpoints ===============

def image_to_base64(img: Image.Image, format: str = "PNG", quality: int = 95) -> str:
    """Convert PIL Image to base64 string"""
    import base64
    buffer = io.BytesIO()
    if format == "JPEG" and img.mode in ('RGBA', 'P'):
        img = img.convert('RGB')
    img.save(buffer, format=format, quality=quality if format == "JPEG" else None)
    return base64.b64encode(buffer.getvalue()).decode()


def prepare_preview_image(img: Image.Image, max_size: int = 1920) -> Image.Image:
    """Prepare image for preview - only resize if larger than max_size (HD)"""
    if max(img.size) > max_size:
        img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
    return img


def pdf_page_to_base64(pdf_bytes: bytes, page_num: int = 0, dpi: int = 200) -> str:
    """Convert PDF page to base64 PNG at high quality"""
    import base64
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    page = doc.load_page(page_num)
    zoom = dpi / 72.0
    mat = fitz.Matrix(zoom, zoom)
    pix = page.get_pixmap(matrix=mat)
    png_bytes = pix.tobytes("png")
    doc.close()
    return base64.b64encode(png_bytes).decode()
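
# Rendering math for pdf_page_to_base64 (illustrative): PDF user space is 72 points
# per inch, so dpi=150 gives zoom = 150 / 72 ≈ 2.08. A 595x842 pt page (A4) therefore
# rasterizes to roughly 1240x1754 px, which is the resolution the preview endpoints
# below request when they pass dpi=150.
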
@app.post("/api/preview/compress-pdf")
async def api_preview_compress_pdf(
    file: UploadFile = File(...),
    quality: int = Form(60),
):
    """Preview PDF compression - returns original and compressed first page"""
    data = await file.read()
    validate_pdf_upload(data)

    jpeg_quality = max(10, min(100, quality))
    max_dim = int(1000 + (quality / 100) * 2000)  # Range: 1000-3000

    try:
        # Get original first page (higher DPI for better quality)
        original_b64 = pdf_page_to_base64(data, 0, 150)

        # Compress PDF
        src_doc = fitz.open(stream=data, filetype="pdf")

        for page_num in range(min(1, len(src_doc))):  # Only process first page for preview
            page = src_doc.load_page(page_num)
            image_list = page.get_images(full=True)

            for img_info in image_list:
                xref = img_info[0]
                try:
                    base_image = src_doc.extract_image(xref)
                    if not base_image or len(base_image["image"]) < 5000:
                        continue

                    img = Image.open(io.BytesIO(base_image["image"]))

                    if img.mode in ('RGBA', 'P'):
                        background = Image.new('RGB', img.size, (255, 255, 255))
                        if img.mode == 'P':
                            img = img.convert('RGBA')
                        if img.mode == 'RGBA':
                            background.paste(img, mask=img.split()[3])
                            img = background
                        else:
                            img = img.convert('RGB')
                    elif img.mode != 'RGB':
                        img = img.convert('RGB')

                    # Resize if larger than max_dim
                    if max(img.size) > max_dim:
                        ratio = max_dim / max(img.size)
                        new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
                        img = img.resize(new_size, Image.Resampling.LANCZOS)

                    buffer = io.BytesIO()
                    img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True)

                    # Always replace to apply quality setting
                    page.replace_image(xref, stream=buffer.getvalue())
                except Exception:
                    continue

        compressed_bytes = src_doc.tobytes(garbage=4, deflate=True, clean=True)
        src_doc.close()

        processed_b64 = pdf_page_to_base64(compressed_bytes, 0, 150)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64,
            "original_size": len(data),
            "processed_size": len(compressed_bytes)
        })
    except Exception as e:
        logger.exception("Compress preview failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/preview/rotate-pdf")
async def api_preview_rotate_pdf(
    file: UploadFile = File(...),
    rotation: int = Form(90),
):
    """Preview PDF rotation - returns original and rotated first page"""
    data = await file.read()
    validate_pdf_upload(data)

    rotation = int(rotation) % 360
    if rotation not in [90, 180, 270]:
        rotation = 90

    try:
        original_b64 = pdf_page_to_base64(data, 0, 150)

        src_doc = fitz.open(stream=data, filetype="pdf")
        page = src_doc.load_page(0)
        page.set_rotation(page.rotation + rotation)
        rotated_bytes = src_doc.tobytes()
        src_doc.close()

        processed_b64 = pdf_page_to_base64(rotated_bytes, 0, 150)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64
        })
    except Exception as e:
        logger.exception("Rotate preview failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/preview/page-numbers")
async def api_preview_page_numbers(
    file: UploadFile = File(...),
    position: str = Form("bottom-center"),
    format: str = Form("Page {n} of {total}"),
    start_number: int = Form(1),
    font_size: int = Form(11),
):
    """Preview page numbers - returns original and numbered first page"""
    data = await file.read()
    validate_pdf_upload(data)

    positions = {
        "top-left": ("left", "top"),
        "top-center": ("center", "top"),
        "top-right": ("right", "top"),
        "bottom-left": ("left", "bottom"),
        "bottom-center": ("center", "bottom"),
        "bottom-right": ("right", "bottom"),
    }
    h_align, v_align = positions.get(position, ("center", "bottom"))
    margin = 30

    try:
        original_b64 = pdf_page_to_base64(data, 0, 150)

        src_doc = fitz.open(stream=data, filetype="pdf")
        total_pages = len(src_doc)
        page = src_doc.load_page(0)
        rect = page.rect

        text = format.replace("{n}", str(start_number)).replace("{total}", str(total_pages))
        text_width = fitz.get_text_length(text, fontsize=font_size)

        if h_align == "left":
            x = margin
        elif h_align == "right":
            x = rect.width - margin - text_width
        else:
            x = (rect.width - text_width) / 2

        if v_align == "top":
            y = margin + font_size
        else:
            y = rect.height - margin

        page.insert_text((x, y), text, fontsize=font_size, color=(0.3, 0.3, 0.3))

        numbered_bytes = src_doc.tobytes()
        src_doc.close()

        processed_b64 = pdf_page_to_base64(numbered_bytes, 0, 150)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64
        })
    except Exception as e:
        logger.exception("Page numbers preview failed")
        raise HTTPException(status_code=500, detail=str(e))
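
# Example for the page-number format string (derived from the code above): with
# format="Page {n} of {total}", start_number=1 and a 12-page PDF, the preview stamps
# "Page 1 of 12" on the first page; {n} and {total} are plain placeholders, not f-strings.
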
@app.post("/api/preview/remove-background")
async def api_preview_remove_background(
    file: UploadFile = File(...),
):
    """Preview background removal - returns original and processed image in HD quality"""
    global REMBG_AVAILABLE, rembg_remove

    if rembg_remove is None and not REMBG_AVAILABLE:
        try:
            from rembg import remove
            rembg_remove = remove
            REMBG_AVAILABLE = True
        except Exception:
            REMBG_AVAILABLE = False

    if not REMBG_AVAILABLE or rembg_remove is None:
        raise HTTPException(status_code=500, detail="rembg not installed")

    data = await file.read()

    try:
        img = Image.open(io.BytesIO(data))
        img = prepare_preview_image(img)
        original_b64 = image_to_base64(img, "PNG")

        # Process with rembg
        img_buffer = io.BytesIO()
        img_rgb = img.convert('RGB') if img.mode != 'RGB' else img
        img_rgb.save(img_buffer, format='PNG')

        result = rembg_remove(img_buffer.getvalue())
        result_img = Image.open(io.BytesIO(result))
        processed_b64 = image_to_base64(result_img, "PNG")

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64
        })
    except Exception as e:
        logger.exception("Remove background preview failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/preview/enhance-image")
async def api_preview_enhance_image(
    file: UploadFile = File(...),
    level: str = Form("medium"),
    upscale: float = Form(1.0),
):
    """Preview image enhancement - returns original and enhanced image in HD quality"""
    data = await file.read()

    try:
        img = Image.open(io.BytesIO(data))
        img = prepare_preview_image(img)
        original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 95)

        work_img = img.copy()
        if upscale > 1.0:
            new_size = (int(work_img.size[0] * min(upscale, 2.0)), int(work_img.size[1] * min(upscale, 2.0)))
            work_img = work_img.resize(new_size, Image.Resampling.LANCZOS)

        enhanced = enhance_image(work_img, level)
        enhanced = prepare_preview_image(enhanced)
        processed_b64 = image_to_base64(enhanced.convert('RGB'), "JPEG", 95)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64
        })
    except Exception as e:
        logger.exception("Enhance preview failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/preview/image-watermark")
async def api_preview_image_watermark(
    file: UploadFile = File(...),
    text: str = Form(""),
    position: str = Form("center"),
    opacity: int = Form(50),
    font_size: int = Form(36),
    color: str = Form("#000000"),
):
    """Preview image watermark - returns original and watermarked image in HD quality"""
    from PIL import ImageDraw, ImageFont

    data = await file.read()

    if not text.strip():
        raise HTTPException(status_code=400, detail="Watermark text required")

    try:
        img = Image.open(io.BytesIO(data))
        img = prepare_preview_image(img)
        original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 95)

        if img.mode != 'RGBA':
            img = img.convert('RGBA')

        watermark = Image.new('RGBA', img.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(watermark)

        # Try to use a TrueType font, fall back to PIL's built-in bitmap font
        try:
            font = ImageFont.truetype("arial.ttf", font_size)
        except Exception:
            font = ImageFont.load_default()

        color_hex = color.lstrip('#')
        r, g, b = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4))
        alpha = int(255 * opacity / 100)

        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]

        positions = {
            "top-left": (20, 20),
            "top-center": ((img.width - text_width) // 2, 20),
            "top-right": (img.width - text_width - 20, 20),
            "center": ((img.width - text_width) // 2, (img.height - text_height) // 2),
            "bottom-left": (20, img.height - text_height - 20),
            "bottom-center": ((img.width - text_width) // 2, img.height - text_height - 20),
            "bottom-right": (img.width - text_width - 20, img.height - text_height - 20),
        }
        pos = positions.get(position, positions["center"])
        draw.text(pos, text, font=font, fill=(r, g, b, alpha))

        result = Image.alpha_composite(img, watermark)
        processed_b64 = image_to_base64(result.convert('RGB'), "JPEG", 95)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64
        })
    except Exception as e:
        logger.exception("Watermark preview failed")
        raise HTTPException(status_code=500, detail=str(e))
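
# Color/opacity math shared by the watermark endpoints (illustrative): color="#FF8800"
# parses to (r, g, b) = (255, 136, 0), and opacity=50 maps to alpha = int(255 * 50 / 100) = 127,
# so the text is drawn at roughly half transparency on the RGBA overlay.
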
@app.post("/api/preview/resize-image")
async def api_preview_resize_image(
    file: UploadFile = File(...),
    preset: str = Form("custom"),
    width: int = Form(0),
    height: int = Form(0),
    maintain_aspect: bool = Form(True),
):
    """Preview image resize - returns original and resized image in HD quality"""
    data = await file.read()

    presets = {
        "thumbnail": (150, 150),
        "small": (320, 240),
        "medium": (640, 480),
        "hd": (1280, 720),
        "fullhd": (1920, 1080),
        "4k": (3840, 2160),
        "square-sm": (500, 500),
        "square-lg": (1000, 1000),
    }

    try:
        img = Image.open(io.BytesIO(data))
        original_width, original_height = img.size

        # Original preview (HD quality)
        original_preview = prepare_preview_image(img.copy())
        original_b64 = image_to_base64(original_preview.convert('RGB'), "JPEG", 95)

        # Determine target size
        if preset != "custom" and preset in presets:
            target_width, target_height = presets[preset]
        else:
            target_width = width if width > 0 else original_width
            target_height = height if height > 0 else original_height

        if maintain_aspect:
            ratio = min(target_width / original_width, target_height / original_height)
            target_width = int(original_width * ratio)
            target_height = int(original_height * ratio)

        # Resized preview (HD quality)
        resized = img.resize((target_width, target_height), Image.Resampling.LANCZOS)
        resized_preview = prepare_preview_image(resized)
        processed_b64 = image_to_base64(resized_preview.convert('RGB'), "JPEG", 95)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64,
            "original_size": f"{original_width}x{original_height}",
            "new_size": f"{target_width}x{target_height}"
        })
    except Exception as e:
        logger.exception("Resize preview failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/preview/compress-image")
async def api_preview_compress_image(
    file: UploadFile = File(...),
    quality: int = Form(70),
):
    """Preview image compression - returns original and compressed image in HD quality"""
    data = await file.read()
    original_size = len(data)

    try:
        img = Image.open(io.BytesIO(data))
        img = prepare_preview_image(img)
        original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 98)

        # Compress with user's quality setting
        if img.mode != 'RGB':
            img = img.convert('RGB')
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality, optimize=True)
        compressed_size = len(buffer.getvalue())

        compressed_img = Image.open(io.BytesIO(buffer.getvalue()))
        processed_b64 = image_to_base64(compressed_img, "JPEG", 98)

        return JSONResponse({
            "original": original_b64,
            "processed": processed_b64,
            "original_size": original_size,
            "compressed_size": compressed_size
        })
    except Exception as e:
        logger.exception("Compress preview failed")
        raise HTTPException(status_code=500, detail=str(e))


# =============== Run Server ===============

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "server:app",
        host="127.0.0.1",
        port=8000,
        reload=settings.debug
    )
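
# Example (illustrative only): starting the server and checking it is reachable.
# Assumes this file is saved as server.py (as implied by the "server:app" import
# string above) and that dependencies are installed; adjust host/port if changed.
#
#   $ python server.py
#   $ curl http://127.0.0.1:8000/api/rembg-status
#   {"available": false}        # true once rembg is installed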