"""
LocalTools Server - PDF Editor & Image Scraper
FastAPI backend with async operations, logging, and cleanup
"""
import asyncio
import io
import json
import logging
import re
import uuid
import zipfile
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List
from urllib.parse import urljoin, urlparse
import aiohttp
import fitz # PyMuPDF
from PIL import Image
from pydantic import BaseModel, Field, HttpUrl
from pydantic_settings import BaseSettings
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
# =============== Configuration ===============
class Settings(BaseSettings):
"""Application settings with environment variable support"""
app_name: str = "LocalTools"
debug: bool = False
# Paths
work_dir: str = "work"
output_dir: str = "outputs"
static_dir: str = "static"
# Limits
max_pdf_size_mb: int = 100
max_image_size_mb: int = 50
min_image_dimension: int = 50
request_timeout: int = 60
# Cleanup
cleanup_interval_hours: int = 24
file_retention_hours: int = 48
class Config:
env_prefix = "LOCALTOOLS_"
settings = Settings()
# =============== Logging Setup ===============
logging.basicConfig(
level=logging.DEBUG if settings.debug else logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger("localtools")
# =============== Path Setup ===============
APP_DIR = Path(__file__).parent
STATIC_DIR = APP_DIR / settings.static_dir
WORK_DIR = APP_DIR / settings.work_dir
OUT_DIR = APP_DIR / settings.output_dir
SCRAPE_DIR = WORK_DIR / "scrape_jobs"
for directory in [WORK_DIR, OUT_DIR, SCRAPE_DIR]:
directory.mkdir(exist_ok=True)
# =============== Constants ===============
UA_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/bmp"}
# =============== Response Models ===============
class ScrapeResult(BaseModel):
job_id: str
images: List[dict]
total_found: int
filtered_count: int
class HealthResponse(BaseModel):
status: str
version: str = "1.0.0"
uptime_seconds: float
# =============== Cleanup Task ===============
startup_time = datetime.now()
async def cleanup_old_files():
"""Remove files older than retention period"""
retention = timedelta(hours=settings.file_retention_hours)
cutoff = datetime.now() - retention
removed = 0
for directory in [WORK_DIR, OUT_DIR]:
for file_path in directory.glob("*"):
if file_path.is_file():
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
if mtime < cutoff:
try:
file_path.unlink()
removed += 1
except Exception as e:
logger.warning(f"Failed to remove {file_path}: {e}")
# Clean old scrape jobs
for job_dir in SCRAPE_DIR.glob("*"):
if job_dir.is_dir():
mtime = datetime.fromtimestamp(job_dir.stat().st_mtime)
if mtime < cutoff:
try:
for f in job_dir.glob("*"):
f.unlink()
job_dir.rmdir()
removed += 1
except Exception as e:
logger.warning(f"Failed to remove job {job_dir}: {e}")
if removed > 0:
logger.info(f"Cleanup: removed {removed} old files/directories")
async def periodic_cleanup():
"""Run cleanup periodically"""
while True:
await asyncio.sleep(settings.cleanup_interval_hours * 3600)
await cleanup_old_files()
# =============== Lifespan ===============
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown events"""
logger.info(f"Starting {settings.app_name}")
# Start cleanup task
cleanup_task = asyncio.create_task(periodic_cleanup())
# Run initial cleanup
await cleanup_old_files()
yield
# Shutdown
cleanup_task.cancel()
logger.info("Shutting down")
# =============== App Setup ===============
app = FastAPI(
title=settings.app_name,
description="PDF Editor & Image Scraper API",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Static files
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
# =============== PDF Helpers ===============
def to_points(value: float, unit: str) -> float:
"""Convert measurement to PDF points"""
conversions = {
"pt": 1.0,
"points": 1.0,
"in": 72.0,
"inch": 72.0,
"inches": 72.0,
"cm": 72.0 / 2.54,
"centimeter": 72.0 / 2.54,
"centimeters": 72.0 / 2.54,
"mm": 72.0 / 25.4,
"millimeter": 72.0 / 25.4,
"millimeters": 72.0 / 25.4,
}
unit_lower = unit.lower().strip()
if unit_lower not in conversions:
raise ValueError(f"Invalid unit: {unit}. Use: pt, in, cm, or mm")
return float(value) * conversions[unit_lower]
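# Illustrative conversions (hypothetical values, not part of the API):
#   to_points(1, "in")    -> 72.0
#   to_points(10, "mm")   -> 10 * 72.0 / 25.4  (~28.35 pt)
#   to_points(2.54, "cm") -> 72.0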
def parse_page_spec(spec: str, total_pages: int) -> set[int]:
"""
Parse page specification string to set of 0-based page indices.
Input: "1,3,5" or "2-6" or "1,3-5,9" (1-based)
Output: Set of 0-based indices
"""
spec = (spec or "").strip().replace(" ", "")
if not spec:
return set()
result = set()
for part in spec.split(","):
if not part:
continue
try:
if "-" in part:
start_str, end_str = part.split("-", 1)
start, end = int(start_str), int(end_str)
if start > end:
start, end = end, start
for p in range(start, end + 1):
if 1 <= p <= total_pages:
result.add(p - 1)
else:
p = int(part)
if 1 <= p <= total_pages:
result.add(p - 1)
except ValueError:
logger.warning(f"Invalid page spec part: {part}")
continue
return result
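# Illustrative (assuming a 10-page document):
#   parse_page_spec("1,3-5", 10) -> {0, 2, 3, 4}
#   parse_page_spec("8-20", 10)  -> {7, 8, 9}  (out-of-range pages are silently dropped)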
def extract_drive_file_id(url: str) -> Optional[str]:
"""Extract Google Drive file ID from URL"""
patterns = [
r"/file/d/([^/]+)",
r"[?&]id=([^&]+)",
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
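# Illustrative (hypothetical file ID): both common Drive URL shapes yield the same ID:
#   extract_drive_file_id("https://drive.google.com/file/d/ABC123/view")           -> "ABC123"
#   extract_drive_file_id("https://drive.google.com/uc?id=ABC123&export=download") -> "ABC123"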
async def download_from_url(url: str, out_path: Path, is_drive: bool = False) -> Path:
"""Download file from URL using aiohttp"""
timeout = aiohttp.ClientTimeout(total=settings.request_timeout)
async with aiohttp.ClientSession(timeout=timeout, headers=UA_HEADERS) as session:
if is_drive:
file_id = extract_drive_file_id(url)
if not file_id:
raise ValueError("Invalid Google Drive URL. Use format: /file/d/<ID>/view")
download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
async with session.get(download_url) as resp:
resp.raise_for_status()
# Check for confirmation token (large files)
cookies = resp.cookies
confirm_token = None
for key, cookie in cookies.items():
if key.startswith("download_warning"):
confirm_token = cookie.value
break
if confirm_token:
download_url = f"{download_url}&confirm={confirm_token}"
async with session.get(download_url) as resp2:
resp2.raise_for_status()
content = await resp2.read()
else:
content = await resp.read()
else:
async with session.get(url) as resp:
resp.raise_for_status()
content = await resp.read()
# Validate PDF
if content[:5] != b"%PDF-":
# Check if it's an HTML error page
if b"<html" in content[:1000].lower() or b"sign in" in content[:1000].lower():
raise ValueError("Access denied. For Drive files, set sharing to 'Anyone with link'")
raise ValueError("Downloaded file is not a valid PDF")
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(content)
logger.info(f"Downloaded PDF: {out_path.name} ({len(content)} bytes)")
return out_path
def validate_pdf_upload(data: bytes) -> None:
"""Validate uploaded PDF data"""
if not data:
raise ValueError("Empty file uploaded")
if len(data) > settings.max_pdf_size_mb * 1024 * 1024:
raise ValueError(f"File too large. Maximum size: {settings.max_pdf_size_mb}MB")
if data[:5] != b"%PDF-":
raise ValueError("Invalid PDF file")
def add_text_watermark(page: fitz.Page, text: str, font_size: float, rotate: int) -> None:
"""Add text watermark to PDF page"""
if not text.strip():
return
rect = page.rect
box = fitz.Rect(
rect.x0 + rect.width * 0.05,
rect.y0 + rect.height * 0.05,
rect.x1 - rect.width * 0.05,
rect.y1 - rect.height * 0.05,
)
try:
page.insert_textbox(
box, text,
fontsize=float(font_size),
rotate=int(rotate),
align=fitz.TEXT_ALIGN_CENTER,
color=(0.55, 0.55, 0.55),
overlay=True,
fill_opacity=0.18,
stroke_opacity=0.18,
)
except TypeError:
# Fallback for older PyMuPDF versions
page.insert_textbox(
box, text,
fontsize=float(font_size),
rotate=int(rotate),
align=fitz.TEXT_ALIGN_CENTER,
color=(0.75, 0.75, 0.75),
overlay=True,
)
def process_pdf(
input_path: Path,
output_path: Path,
remove_pages: str,
crop_top: float,
crop_bottom: float,
crop_left: float,
crop_right: float,
unit: str,
watermark_text: str,
watermark_size: float,
watermark_rotate: int,
) -> dict:
"""Process PDF with cropping, page removal, and watermark"""
src = fitz.open(str(input_path))
total = src.page_count
remove_set = parse_page_spec(remove_pages, total)
# Convert crop values to points
T = to_points(crop_top, unit)
B = to_points(crop_bottom, unit)
L = to_points(crop_left, unit)
R = to_points(crop_right, unit)
out = fitz.open()
kept = 0
for i in range(total):
if i in remove_set:
continue
out.insert_pdf(src, from_page=i, to_page=i)
page = out.load_page(out.page_count - 1)
rect = page.rect
crop_rect = fitz.Rect(rect.x0 + L, rect.y0 + T, rect.x1 - R, rect.y1 - B)
if crop_rect.is_empty or crop_rect.width <= 2 or crop_rect.height <= 2:
raise ValueError(f"Invalid crop on page {i+1}. Reduce crop values.")
page.set_cropbox(crop_rect)
if watermark_text.strip():
add_text_watermark(page, watermark_text, watermark_size, watermark_rotate)
kept += 1
if kept == 0:
raise ValueError("All pages removed. Output would be empty.")
output_path.parent.mkdir(parents=True, exist_ok=True)
out.save(str(output_path))
out.close()
src.close()
logger.info(f"Processed PDF: {kept}/{total} pages kept")
return {
"original_pages": total,
"output_pages": kept,
"removed_pages": len(remove_set)
}
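# Illustrative return value for a 10-page input with remove_pages="2,4":
#   {"original_pages": 10, "output_pages": 8, "removed_pages": 2}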
# =============== Image Scraper Helpers ===============
def normalize_url(url: str) -> str:
"""Normalize URL by removing fragment"""
parsed = urlparse(url)
return parsed._replace(fragment="").geturl()
def best_from_srcset(srcset: str, base_url: str) -> Optional[str]:
"""Extract best quality image URL from srcset attribute"""
if not srcset:
return None
candidates = []
for part in srcset.split(","):
part = part.strip()
if not part:
continue
bits = part.split()
img_url = urljoin(base_url, bits[0])
score = 0.0
if len(bits) > 1:
descriptor = bits[1].lower().strip()
try:
if descriptor.endswith("w"):
score = float(descriptor[:-1])
elif descriptor.endswith("x"):
score = float(descriptor[:-1]) * 10000.0
except ValueError:
pass
candidates.append((score, img_url))
if not candidates:
return None
candidates.sort(key=lambda x: x[0], reverse=True)
return candidates[0][1]
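# Illustrative (hypothetical srcset): width descriptors compare directly, while
# density descriptors are scaled by 10000 so they usually outrank width entries:
#   best_from_srcset("img-480.jpg 480w, img-960.jpg 960w", "https://example.com/")
#   -> "https://example.com/img-960.jpg"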
def safe_filename(url: str, fallback: str) -> str:
"""Generate safe filename from URL"""
name = Path(urlparse(url).path).name or fallback
name = re.sub(r"[^a-zA-Z0-9._-]+", "_", name)
if "." not in name:
name += ".jpg"
return name[:120]
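# Illustrative (hypothetical URLs):
#   safe_filename("https://example.com/img/photo-01.png?w=800", "image_1.jpg") -> "photo-01.png"
#   safe_filename("https://example.com", "image_2.jpg")                        -> "image_2.jpg"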
async def fetch_image(session: aiohttp.ClientSession, img_url: str) -> tuple[bytes, Optional[int], Optional[int], Optional[str]]:
"""Fetch image and return bytes with dimensions"""
async with session.get(img_url) as resp:
resp.raise_for_status()
content_type = resp.headers.get("Content-Type", "")
data = await resp.read()
width = height = None
try:
with Image.open(io.BytesIO(data)) as img:
width, height = img.size
except Exception:
pass
return data, width, height, content_type
async def scrape_images(page_url: str) -> dict:
"""Scrape images from webpage"""
page_url = page_url.strip()
if not page_url:
raise ValueError("Web page URL is required")
timeout = aiohttp.ClientTimeout(total=settings.request_timeout)
async with aiohttp.ClientSession(timeout=timeout, headers=UA_HEADERS) as session:
# Fetch page HTML
async with session.get(page_url) as resp:
resp.raise_for_status()
html = await resp.text()
# Parse HTML (using regex for simplicity, avoiding lxml dependency issues)
found_urls: List[str] = []
# Find all img tags
img_pattern = r'<img[^>]+>'
lazy_attrs = ["data-src", "data-original", "data-lazy-src", "data-url", "data-image", "data-srcset", "srcset", "src"]
for img_match in re.finditer(img_pattern, html, re.IGNORECASE):
img_tag = img_match.group()
for attr in lazy_attrs:
attr_pattern = rf'{attr}=["\']([^"\']+)["\']'
attr_match = re.search(attr_pattern, img_tag, re.IGNORECASE)
if attr_match:
value = attr_match.group(1)
if "srcset" in attr.lower():
best = best_from_srcset(value, page_url)
if best:
found_urls.append(best)
else:
found_urls.append(urljoin(page_url, value))
# Deduplicate
seen = set()
deduped = []
for url in found_urls:
normalized = normalize_url(url)
if normalized not in seen and normalized.startswith("http"):
seen.add(normalized)
deduped.append(normalized)
logger.info(f"Found {len(deduped)} unique image URLs on {page_url}")
# Create job directory
job_id = uuid.uuid4().hex[:10]
job_dir = SCRAPE_DIR / job_id
job_dir.mkdir(parents=True, exist_ok=True)
# Fetch images concurrently
images = []
filtered = 0
async def process_image(idx: int, url: str):
nonlocal filtered
try:
data, width, height, content_type = await fetch_image(session, url)
# Filter small images
if width and height:
if width < settings.min_image_dimension or height < settings.min_image_dimension:
filtered += 1
return None
img_id = uuid.uuid4().hex[:10]
filename = safe_filename(url, f"image_{idx}.jpg")
# Save image data
(job_dir / f"{img_id}.bin").write_bytes(data)
return {
"id": img_id,
"url": url,
"filename": filename,
"width": width,
"height": height,
"bytes": len(data),
}
except Exception as e:
logger.debug(f"Failed to fetch {url}: {e}")
return None
# Process images with concurrency limit
semaphore = asyncio.Semaphore(10)
async def limited_process(idx: int, url: str):
async with semaphore:
return await process_image(idx, url)
tasks = [limited_process(idx, url) for idx, url in enumerate(deduped, 1)]
results = await asyncio.gather(*tasks)
images = [r for r in results if r is not None]
# Save metadata
meta = {
"page_url": page_url,
"scraped_at": datetime.now().isoformat(),
"images": images
}
(job_dir / "meta.json").write_text(
json.dumps(meta, ensure_ascii=False, indent=2),
encoding="utf-8"
)
logger.info(f"Scraped {len(images)} images, filtered {filtered} small icons")
return {
"job_id": job_id,
"images": images,
"total_found": len(deduped),
"filtered_count": filtered
}
def load_scrape_job(job_id: str) -> dict:
"""Load scrape job metadata"""
job_dir = SCRAPE_DIR / job_id
meta_path = job_dir / "meta.json"
if not job_dir.exists() or not meta_path.exists():
raise HTTPException(status_code=404, detail="Scrape job not found. Please scrape again.")
try:
return json.loads(meta_path.read_text(encoding="utf-8"))
except Exception as e:
logger.error(f"Failed to load job {job_id}: {e}")
raise HTTPException(status_code=500, detail="Corrupted scrape job metadata")
# =============== Utility Functions ===============
def sanitize_filename(name: str, default: str = "file") -> str:
"""Sanitize filename for safe file system use"""
name = (name or default).strip()
name = re.sub(r"[^a-zA-Z0-9._-]+", "_", name)
return name[:200] or default
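# Illustrative: sanitize_filename("my report (final).pdf") -> "my_report_final_.pdf"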
def generate_job_id() -> str:
"""Generate unique job ID"""
return uuid.uuid4().hex[:10]
# =============== API Routes ===============
@app.get("/", response_class=HTMLResponse)
async def home():
"""Serve main HTML page"""
index_path = STATIC_DIR / "index.html"
if not index_path.exists():
raise HTTPException(status_code=500, detail="Frontend not found")
return HTMLResponse(index_path.read_text(encoding="utf-8"))
@app.get("/api/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint"""
uptime = (datetime.now() - startup_time).total_seconds()
return HealthResponse(status="healthy", uptime_seconds=uptime)
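# Illustrative response shape:
#   {"status": "healthy", "version": "1.0.0", "uptime_seconds": 1234.5}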
@app.post("/api/fetch")
async def api_fetch_pdf(
url: str = Form(""),
file: UploadFile = File(None),
output_name: str = Form("original.pdf"),
):
"""Fetch/upload PDF for preview (no processing)"""
output_name = sanitize_filename(output_name, "original.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
job_id = generate_job_id()
input_path = WORK_DIR / f"original_{job_id}.pdf"
try:
if file is not None and file.filename:
data = await file.read()
validate_pdf_upload(data)
input_path.write_bytes(data)
logger.info(f"Uploaded PDF: {file.filename}")
elif url.strip():
is_drive = "drive.google.com" in url
await download_from_url(url, input_path, is_drive=is_drive)
else:
raise ValueError("Provide a PDF URL or upload a file")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except aiohttp.ClientError as e:
logger.error(f"Download failed: {e}")
raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}")
except Exception as e:
logger.exception("Unexpected error in fetch")
raise HTTPException(status_code=500, detail="Internal server error")
return FileResponse(
path=str(input_path),
media_type="application/pdf",
filename=output_name
)
@app.post("/api/process")
async def api_process_pdf(
url: str = Form(""),
file: UploadFile = File(None),
output_name: str = Form("cropped.pdf"),
remove_pages: str = Form(""),
unit: str = Form("mm"),
top: float = Form(0),
bottom: float = Form(0),
left: float = Form(0),
right: float = Form(0),
watermark_text: str = Form(""),
watermark_size: float = Form(36),
watermark_rotate: int = Form(45),
):
"""Process PDF with cropping, page removal, and watermark"""
output_name = sanitize_filename(output_name, "cropped.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
job_id = generate_job_id()
input_path = WORK_DIR / f"input_{job_id}.pdf"
output_path = OUT_DIR / f"{job_id}_{output_name}"
# Get input PDF
try:
if file is not None and file.filename:
data = await file.read()
validate_pdf_upload(data)
input_path.write_bytes(data)
elif url.strip():
is_drive = "drive.google.com" in url
await download_from_url(url, input_path, is_drive=is_drive)
else:
raise ValueError("Provide a PDF URL or upload a file")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except aiohttp.ClientError as e:
raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}")
# Process PDF
try:
process_pdf(
input_path=input_path,
output_path=output_path,
remove_pages=remove_pages,
crop_top=top,
crop_bottom=bottom,
crop_left=left,
crop_right=right,
unit=unit,
watermark_text=watermark_text,
watermark_size=watermark_size,
watermark_rotate=watermark_rotate,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.exception("PDF processing failed")
raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name
)
@app.post("/api/scrape-images")
async def api_scrape_images(page_url: str = Form(...)):
"""Scrape images from a webpage"""
try:
result = await scrape_images(page_url)
return JSONResponse(result)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except aiohttp.ClientError as e:
raise HTTPException(status_code=400, detail=f"Failed to fetch page: {str(e)}")
except Exception as e:
logger.exception("Scrape failed")
raise HTTPException(status_code=500, detail="Scraping failed")
@app.post("/api/download-zip")
async def api_download_zip(
job_id: str = Form(...),
image_ids: str = Form(...),
zip_name: str = Form("images.zip")
):
"""Download selected images as ZIP"""
meta = load_scrape_job(job_id.strip())
job_dir = SCRAPE_DIR / job_id.strip()
selected = [x.strip() for x in image_ids.split(",") if x.strip()]
if not selected:
raise HTTPException(status_code=400, detail="No images selected")
zip_name = sanitize_filename(zip_name, "images.zip")
if not zip_name.lower().endswith(".zip"):
zip_name += ".zip"
output_path = OUT_DIR / f"{generate_job_id()}_{zip_name}"
id_to_meta = {img["id"]: img for img in meta.get("images", [])}
with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for img_id in selected:
bin_path = job_dir / f"{img_id}.bin"
if bin_path.exists():
filename = id_to_meta.get(img_id, {}).get("filename", f"{img_id}.jpg")
zf.writestr(filename, bin_path.read_bytes())
logger.info(f"Created ZIP with {len(selected)} images")
return FileResponse(
path=str(output_path),
media_type="application/zip",
filename=zip_name
)
@app.post("/api/download-pdf")
async def api_download_pdf(
job_id: str = Form(...),
image_ids: str = Form(...),
pdf_name: str = Form("images.pdf")
):
"""Download selected images as PDF"""
meta = load_scrape_job(job_id.strip())
job_dir = SCRAPE_DIR / job_id.strip()
selected = [x.strip() for x in image_ids.split(",") if x.strip()]
if not selected:
raise HTTPException(status_code=400, detail="No images selected")
pdf_name = sanitize_filename(pdf_name, "images.pdf")
if not pdf_name.lower().endswith(".pdf"):
pdf_name += ".pdf"
# Convert images to PDF pages
pil_pages: List[Image.Image] = []
for img_id in selected:
bin_path = job_dir / f"{img_id}.bin"
if not bin_path.exists():
continue
try:
with Image.open(bin_path) as img:
rgb = img.convert("RGB")
pil_pages.append(rgb.copy())
except Exception as e:
logger.warning(f"Failed to process image {img_id}: {e}")
continue
if not pil_pages:
raise HTTPException(status_code=400, detail="No valid images to convert")
output_path = OUT_DIR / f"{generate_job_id()}_{pdf_name}"
first = pil_pages[0]
rest = pil_pages[1:] if len(pil_pages) > 1 else []
first.save(output_path, "PDF", save_all=True, append_images=rest)
logger.info(f"Created PDF with {len(pil_pages)} images")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=pdf_name
)
@app.post("/api/cleanup")
async def api_trigger_cleanup(background_tasks: BackgroundTasks):
"""Manually trigger cleanup (for admin use)"""
background_tasks.add_task(cleanup_old_files)
return {"message": "Cleanup scheduled"}
@app.post("/api/remove-watermark")
async def api_remove_watermark(
url: str = Form(""),
file: UploadFile = File(None),
output_name: str = Form("cleaned.pdf"),
watermark_text: str = Form("Educated Nepal"),
method: str = Form("inpaint"),
intensity: int = Form(50),
dpi: int = Form(120),
quality: int = Form(70),
):
"""Remove watermark from PDF using image processing"""
try:
from watermark_remover import remove_watermark_from_pdf, CV2_AVAILABLE
if not CV2_AVAILABLE:
raise HTTPException(
status_code=500,
detail="OpenCV not installed. Run: pip install opencv-python-headless"
)
except ImportError as e:
raise HTTPException(status_code=500, detail=str(e))
output_name = sanitize_filename(output_name, "cleaned.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
job_id = generate_job_id()
input_path = WORK_DIR / f"input_{job_id}.pdf"
output_path = OUT_DIR / f"{job_id}_{output_name}"
# Get input PDF
try:
if file is not None and file.filename:
data = await file.read()
validate_pdf_upload(data)
input_path.write_bytes(data)
elif url.strip():
is_drive = "drive.google.com" in url
await download_from_url(url, input_path, is_drive=is_drive)
else:
raise ValueError("Provide a PDF URL or upload a file")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except aiohttp.ClientError as e:
raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}")
# Clamp values
dpi = max(72, min(200, dpi))
quality = max(30, min(95, quality))
# Process watermark removal
try:
pdf_bytes = input_path.read_bytes()
original_size = len(pdf_bytes)
result_bytes = remove_watermark_from_pdf(
pdf_bytes=pdf_bytes,
watermark_text=watermark_text,
method=method,
intensity=intensity,
dpi=dpi,
jpeg_quality=quality
)
output_path.write_bytes(result_bytes)
output_size = len(result_bytes)
logger.info(f"Watermark removed: {output_name}, {original_size/1024:.0f}KB -> {output_size/1024:.0f}KB")
except Exception as e:
logger.exception("Watermark removal failed")
raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name,
headers={
"X-Original-Size": str(original_size),
"X-Output-Size": str(output_size)
}
)
@app.post("/api/watermark-preview")
async def api_watermark_preview(
url: str = Form(""),
file: UploadFile = File(None),
page: int = Form(0),
method: str = Form("inpaint"),
intensity: int = Form(50),
):
"""Preview watermark removal on a single page - returns original and processed images"""
try:
from watermark_remover import preview_single_page, CV2_AVAILABLE
if not CV2_AVAILABLE:
raise HTTPException(status_code=500, detail="OpenCV not installed")
except ImportError as e:
raise HTTPException(status_code=500, detail=str(e))
# Get input PDF
try:
if file is not None and file.filename:
pdf_bytes = await file.read()
validate_pdf_upload(pdf_bytes)
elif url.strip():
job_id = generate_job_id()
input_path = WORK_DIR / f"preview_{job_id}.pdf"
is_drive = "drive.google.com" in url
await download_from_url(url, input_path, is_drive=is_drive)
pdf_bytes = input_path.read_bytes()
else:
raise ValueError("Provide a PDF URL or upload a file")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Generate preview
try:
import base64
original_png, processed_png = preview_single_page(
pdf_bytes=pdf_bytes,
page_num=page,
method=method,
intensity=intensity,
dpi=100
)
return JSONResponse({
"original": base64.b64encode(original_png).decode(),
"processed": base64.b64encode(processed_png).decode(),
"page": page
})
except Exception as e:
logger.exception("Preview generation failed")
raise HTTPException(status_code=500, detail=f"Preview failed: {str(e)}")
# =============== Images to PDF ===============
@app.post("/api/images-to-pdf")
async def api_images_to_pdf(
files: List[UploadFile] = File(...),
order: str = Form(""),
output_name: str = Form("images.pdf"),
page_size: str = Form("a4"),
margin: int = Form(20),
):
"""Convert multiple images to PDF with custom order"""
if not files:
raise HTTPException(status_code=400, detail="No images provided")
output_name = sanitize_filename(output_name, "images.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
# Parse order if provided (comma-separated indices)
if order.strip():
try:
indices = [int(i.strip()) for i in order.split(",")]
ordered_files = [files[i] for i in indices if 0 <= i < len(files)]
except (ValueError, IndexError):
ordered_files = files
else:
ordered_files = files
# Page sizes in points
page_sizes = {
"a4": (595, 842),
"letter": (612, 792),
"a3": (842, 1191),
"fit": None # Fit to image
}
try:
pil_images = []
for f in ordered_files:
data = await f.read()
try:
img = Image.open(io.BytesIO(data))
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
pil_images.append(img)
except Exception as e:
logger.warning(f"Skipping invalid image {f.filename}: {e}")
continue
if not pil_images:
raise HTTPException(status_code=400, detail="No valid images found")
# Create PDF
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
if page_size == "fit":
# Each page fits the image
first = pil_images[0]
rest = pil_images[1:] if len(pil_images) > 1 else []
first.save(output_path, "PDF", save_all=True, append_images=rest)
else:
# Fixed page size with margins
page_w, page_h = page_sizes.get(page_size, page_sizes["a4"])
doc = fitz.open()
for img in pil_images:
# Save image to bytes
img_buffer = io.BytesIO()
img.save(img_buffer, format='JPEG', quality=90)
img_bytes = img_buffer.getvalue()
# Create page
page = doc.new_page(width=page_w, height=page_h)
# Calculate image rect with margins
img_w, img_h = img.size
available_w = page_w - 2 * margin
available_h = page_h - 2 * margin
# Scale to fit
scale = min(available_w / img_w, available_h / img_h)
new_w = img_w * scale
new_h = img_h * scale
# Center on page
x = (page_w - new_w) / 2
y = (page_h - new_h) / 2
rect = fitz.Rect(x, y, x + new_w, y + new_h)
page.insert_image(rect, stream=img_bytes)
doc.save(str(output_path))
doc.close()
logger.info(f"Created PDF from {len(pil_images)} images")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name
)
except Exception as e:
logger.exception("Images to PDF failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Merge PDFs ===============
@app.post("/api/merge-pdf")
async def api_merge_pdf(
files: List[UploadFile] = File(...),
order: str = Form(""),
output_name: str = Form("merged.pdf"),
):
"""Merge multiple PDFs into one"""
if not files or len(files) < 2:
raise HTTPException(status_code=400, detail="At least 2 PDF files required")
output_name = sanitize_filename(output_name, "merged.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
# Parse order if provided
if order.strip():
try:
indices = [int(i.strip()) for i in order.split(",")]
ordered_files = [files[i] for i in indices if 0 <= i < len(files)]
except (ValueError, IndexError):
ordered_files = files
else:
ordered_files = files
try:
output_doc = fitz.open()
total_pages = 0
for f in ordered_files:
data = await f.read()
if data[:5] != b"%PDF-":
logger.warning(f"Skipping non-PDF file: {f.filename}")
continue
src_doc = fitz.open(stream=data, filetype="pdf")
output_doc.insert_pdf(src_doc)
total_pages += len(src_doc)
src_doc.close()
if total_pages == 0:
raise HTTPException(status_code=400, detail="No valid PDF files found")
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
output_doc.save(str(output_path), deflate=True)
output_doc.close()
logger.info(f"Merged {len(ordered_files)} PDFs, {total_pages} total pages")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name
)
except Exception as e:
logger.exception("PDF merge failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Split PDF ===============
@app.post("/api/split-pdf")
async def api_split_pdf(
file: UploadFile = File(...),
mode: str = Form("all"),
pages: str = Form(""),
output_name: str = Form("split"),
):
"""
Split PDF into multiple files.
Modes: 'all' (each page), 'range' (specific pages), 'chunks' (every N pages)
"""
data = await file.read()
validate_pdf_upload(data)
output_name = sanitize_filename(output_name, "split")
try:
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
if mode == "all":
# Split into individual pages
job_id = generate_job_id()
zip_path = OUT_DIR / f"{job_id}_{output_name}.zip"
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for i in range(total_pages):
page_doc = fitz.open()
page_doc.insert_pdf(src_doc, from_page=i, to_page=i)
pdf_bytes = page_doc.tobytes()
zf.writestr(f"{output_name}_page_{i+1}.pdf", pdf_bytes)
page_doc.close()
src_doc.close()
logger.info(f"Split PDF into {total_pages} individual pages")
return FileResponse(
path=str(zip_path),
media_type="application/zip",
filename=f"{output_name}_pages.zip"
)
elif mode == "range":
# Extract specific pages
page_set = parse_page_spec(pages, total_pages)
if not page_set:
raise HTTPException(status_code=400, detail="No valid pages specified")
output_doc = fitz.open()
for i in sorted(page_set):
output_doc.insert_pdf(src_doc, from_page=i, to_page=i)
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.pdf"
output_doc.save(str(output_path))
output_doc.close()
src_doc.close()
logger.info(f"Extracted {len(page_set)} pages from PDF")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=f"{output_name}.pdf"
)
elif mode == "chunks":
# Split into chunks of N pages
try:
chunk_size = int(pages) if pages else 1
chunk_size = max(1, chunk_size)
except ValueError:
chunk_size = 1
job_id = generate_job_id()
zip_path = OUT_DIR / f"{job_id}_{output_name}.zip"
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
chunk_num = 1
for start in range(0, total_pages, chunk_size):
end = min(start + chunk_size - 1, total_pages - 1)
chunk_doc = fitz.open()
chunk_doc.insert_pdf(src_doc, from_page=start, to_page=end)
pdf_bytes = chunk_doc.tobytes()
zf.writestr(f"{output_name}_part_{chunk_num}.pdf", pdf_bytes)
chunk_doc.close()
chunk_num += 1
src_doc.close()
logger.info(f"Split PDF into {chunk_num-1} chunks of {chunk_size} pages")
return FileResponse(
path=str(zip_path),
media_type="application/zip",
filename=f"{output_name}_parts.zip"
)
else:
raise HTTPException(status_code=400, detail="Invalid split mode")
except HTTPException:
raise
except Exception as e:
logger.exception("PDF split failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== PDF to Images ===============
@app.post("/api/pdf-to-images")
async def api_pdf_to_images(
file: UploadFile = File(...),
format: str = Form("png"),
dpi: int = Form(200),
pages: str = Form(""),
output_name: str = Form("pages"),
):
"""Convert PDF pages to high-quality images (PNG or JPG)
DPI Guide:
- 150: Fast, small files (web preview)
- 200: Good quality (default)
- 300: Print quality
- 400: Ultra sharp (presentations)
- 600: Maximum quality (OCR/archive)
"""
data = await file.read()
validate_pdf_upload(data)
output_name = sanitize_filename(output_name, "pages")
format = format.lower() if format.lower() in ["png", "jpg", "jpeg"] else "png"
if format == "jpeg":
format = "jpg"
# Allow higher DPI for ultra-sharp output (up to 600)
dpi = max(72, min(600, dpi))
try:
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
# Parse page selection
if pages.strip():
page_set = parse_page_spec(pages, total_pages)
else:
page_set = set(range(total_pages))
if not page_set:
raise HTTPException(status_code=400, detail="No valid pages specified")
job_id = generate_job_id()
zip_path = OUT_DIR / f"{job_id}_{output_name}.zip"
# Vector render at specified DPI - handles portrait/landscape automatically
zoom = dpi / 72.0
matrix = fitz.Matrix(zoom, zoom)
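# Illustrative: dpi=200 -> zoom ~2.78, so an A4 page (595 x 842 pt) renders at
# roughly 1653 x 2339 px; dpi=600 -> zoom ~8.33 (roughly 4958 x 7017 px).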
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for i in sorted(page_set):
page = src_doc.load_page(i)
# alpha=False removes transparency for sharper text
pix = page.get_pixmap(matrix=matrix, alpha=False)
if format == "png":
img_bytes = pix.tobytes("png")
else:
# Convert to JPG via PIL
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=95, optimize=True)
img_bytes = buffer.getvalue()
zf.writestr(f"{output_name}_page_{i+1}.{format}", img_bytes)
src_doc.close()
logger.info(f"Converted {len(page_set)} pages to {format.upper()} at {dpi} DPI")
return FileResponse(
path=str(zip_path),
media_type="application/zip",
filename=f"{output_name}_images.zip"
)
except HTTPException:
raise
except Exception as e:
logger.exception("PDF to images failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Compress PDF ===============
@app.post("/api/compress-pdf")
async def api_compress_pdf(
file: UploadFile = File(...),
quality: int = Form(60),
output_name: str = Form("compressed.pdf"),
):
"""Compress PDF by reducing image quality and cleaning metadata"""
data = await file.read()
validate_pdf_upload(data)
output_name = sanitize_filename(output_name, "compressed.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
# Use quality directly (10-100 scale, same as image compression)
jpeg_quality = max(10, min(100, quality))
# Calculate max dimension based on quality
# Lower quality = smaller max dimension for more compression
max_dim = int(1000 + (quality / 100) * 2000) # Range: 1000-3000
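# Illustrative: quality=10 -> max_dim=1200, quality=60 -> max_dim=2200, quality=100 -> max_dim=3000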
logger.info(f"Compressing PDF with quality={jpeg_quality}, max_dim={max_dim}")
try:
src_doc = fitz.open(stream=data, filetype="pdf")
images_processed = 0
# Compress images within the PDF
for page_num in range(len(src_doc)):
page = src_doc.load_page(page_num)
image_list = page.get_images(full=True)
for img_index, img_info in enumerate(image_list):
xref = img_info[0]
try:
# Extract image
base_image = src_doc.extract_image(xref)
if not base_image:
continue
image_bytes = base_image["image"]
original_img_size = len(image_bytes)
# Skip very small images
if original_img_size < 5000:
continue
# Open and compress image
img = Image.open(io.BytesIO(image_bytes))
# Convert to RGB if necessary
if img.mode in ('RGBA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
# Resize if larger than max_dim
if max(img.size) > max_dim:
ratio = max_dim / max(img.size)
new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
img = img.resize(new_size, Image.Resampling.LANCZOS)
# Compress to JPEG with specified quality
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True)
compressed_bytes = buffer.getvalue()
# Always replace to apply quality setting (even if slightly larger)
# This ensures quality slider actually affects output
page.replace_image(xref, stream=compressed_bytes)
images_processed += 1
except Exception as e:
logger.debug(f"Could not compress image {xref}: {e}")
continue
logger.info(f"Processed {images_processed} images")
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
# Save with compression options
src_doc.save(
str(output_path),
garbage=4, # Remove unused objects
deflate=True, # Compress streams
clean=True, # Clean content streams
)
original_size = len(data)
compressed_size = output_path.stat().st_size
# If compression didn't help much, try alternative method
if compressed_size >= original_size * 0.95:
src_doc.close()
# Fallback: just clean and deflate without image processing
src_doc = fitz.open(stream=data, filetype="pdf")
src_doc.save(
str(output_path),
garbage=4,
deflate=True,
clean=True,
)
compressed_size = output_path.stat().st_size
reduction = ((original_size - compressed_size) / original_size) * 100
if reduction < 0:
reduction = 0
# If still larger, just copy original
output_path.write_bytes(data)
compressed_size = original_size
src_doc.close()
logger.info(f"Compressed PDF: {original_size} -> {compressed_size} bytes ({reduction:.1f}% reduction)")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name,
headers={
"X-Original-Size": str(original_size),
"X-Compressed-Size": str(compressed_size),
"X-Reduction-Percent": f"{reduction:.1f}"
}
)
except Exception as e:
logger.exception("PDF compression failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Rotate PDF ===============
@app.post("/api/rotate-pdf")
async def api_rotate_pdf(
file: UploadFile = File(...),
rotation: int = Form(90),
pages: str = Form(""),
output_name: str = Form("rotated.pdf"),
):
"""Rotate PDF pages (90, 180, or 270 degrees)"""
data = await file.read()
validate_pdf_upload(data)
output_name = sanitize_filename(output_name, "rotated.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
# Normalize rotation to 90, 180, or 270
rotation = int(rotation) % 360
if rotation not in [90, 180, 270]:
rotation = 90
try:
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
# Parse page selection (empty = all pages)
if pages.strip():
page_set = parse_page_spec(pages, total_pages)
else:
page_set = set(range(total_pages))
# Rotate selected pages
for i in page_set:
page = src_doc.load_page(i)
page.set_rotation((page.rotation + rotation) % 360)
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
src_doc.save(str(output_path))
src_doc.close()
logger.info(f"Rotated {len(page_set)} pages by {rotation}°")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name
)
except Exception as e:
logger.exception("PDF rotation failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Add Page Numbers ===============
@app.post("/api/add-page-numbers")
async def api_add_page_numbers(
file: UploadFile = File(...),
position: str = Form("bottom-center"),
format: str = Form("Page {n} of {total}"),
start_number: int = Form(1),
font_size: int = Form(11),
margin: int = Form(30),
output_name: str = Form("numbered.pdf"),
):
"""Add page numbers to PDF"""
data = await file.read()
validate_pdf_upload(data)
output_name = sanitize_filename(output_name, "numbered.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
# Position mapping
positions = {
"top-left": ("left", "top"),
"top-center": ("center", "top"),
"top-right": ("right", "top"),
"bottom-left": ("left", "bottom"),
"bottom-center": ("center", "bottom"),
"bottom-right": ("right", "bottom"),
}
h_align, v_align = positions.get(position, ("center", "bottom"))
try:
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
for i in range(total_pages):
page = src_doc.load_page(i)
rect = page.rect
# Format page number text
page_num = start_number + i
text = format.replace("{n}", str(page_num)).replace("{total}", str(total_pages))
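# Illustrative: format="Page {n} of {total}" with start_number=1 on a 12-page file
# renders "Page 3 of 12" on the third page.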
# Calculate position
text_width = fitz.get_text_length(text, fontsize=font_size)
if h_align == "left":
x = margin
elif h_align == "right":
x = rect.width - margin - text_width
else: # center
x = (rect.width - text_width) / 2
if v_align == "top":
y = margin + font_size
else: # bottom
y = rect.height - margin
# Insert text
page.insert_text(
(x, y),
text,
fontsize=font_size,
color=(0.3, 0.3, 0.3),
)
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
src_doc.save(str(output_path))
src_doc.close()
logger.info(f"Added page numbers to {total_pages} pages")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name
)
except Exception as e:
logger.exception("Add page numbers failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== PDF OCR (Text Extraction) ===============
# Check for Tesseract availability
TESSERACT_AVAILABLE = False
try:
import pytesseract
# Test if tesseract is installed
pytesseract.get_tesseract_version()
TESSERACT_AVAILABLE = True
except Exception:
logger.warning("Tesseract not available. OCR will use basic text extraction only.")
@app.post("/api/pdf-ocr")
async def api_pdf_ocr(
file: UploadFile = File(...),
language: str = Form("eng"),
pages: str = Form(""),
output_format: str = Form("txt"),
dpi: int = Form(200),
):
"""Extract text from PDF using OCR (Tesseract) or native text extraction"""
data = await file.read()
validate_pdf_upload(data)
dpi = max(100, min(400, dpi))
try:
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
# Parse page selection
if pages.strip():
page_set = parse_page_spec(pages, total_pages)
else:
page_set = set(range(total_pages))
if not page_set:
raise HTTPException(status_code=400, detail="No valid pages specified")
all_text = []
for page_num in sorted(page_set):
page = src_doc.load_page(page_num)
# First try native text extraction
native_text = page.get_text("text").strip()
# If native text is substantial, use it
if len(native_text) > 50:
all_text.append(f"--- Page {page_num + 1} ---\n{native_text}")
elif TESSERACT_AVAILABLE:
# Use OCR for scanned pages
import pytesseract
zoom = dpi / 72.0
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
# Convert to PIL Image
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
# Run OCR
ocr_text = pytesseract.image_to_string(img, lang=language)
all_text.append(f"--- Page {page_num + 1} (OCR) ---\n{ocr_text.strip()}")
else:
# No OCR available, use whatever native text we got
all_text.append(f"--- Page {page_num + 1} ---\n{native_text if native_text else '[No text detected - Tesseract not installed]'}")
src_doc.close()
combined_text = "\n\n".join(all_text)
job_id = generate_job_id()
if output_format == "json":
# Return as JSON
return JSONResponse({
"pages": len(page_set),
"text": combined_text,
"ocr_used": TESSERACT_AVAILABLE,
})
else:
# Return as text file
output_path = OUT_DIR / f"{job_id}_extracted.txt"
output_path.write_text(combined_text, encoding="utf-8")
logger.info(f"Extracted text from {len(page_set)} pages")
return FileResponse(
path=str(output_path),
media_type="text/plain",
filename="extracted_text.txt"
)
except HTTPException:
raise
except Exception as e:
logger.exception("PDF OCR failed")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/ocr-status")
async def api_ocr_status():
"""Check if Tesseract OCR is available"""
return {"available": TESSERACT_AVAILABLE}
# =============== Enhance Quality ===============
def enhance_image(img: Image.Image, level: str = "medium") -> Image.Image:
"""Enhance image quality with sharpening and contrast adjustment"""
from PIL import ImageEnhance, ImageFilter
# Level presets: (sharpness, contrast, color, brightness)
presets = {
"light": (1.2, 1.05, 1.05, 1.02),
"medium": (1.4, 1.1, 1.1, 1.03),
"strong": (1.6, 1.15, 1.15, 1.05),
}
sharpness, contrast, color, brightness = presets.get(level, presets["medium"])
# Convert to RGB if needed
if img.mode in ('RGBA', 'P'):
# Preserve alpha if present
if img.mode == 'RGBA':
alpha = img.split()[3]
img = img.convert('RGB')
else:
img = img.convert('RGB')
alpha = None
else:
img = img.convert('RGB')
alpha = None
# Apply slight unsharp mask for detail enhancement
img = img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=50, threshold=3))
# Enhance sharpness
enhancer = ImageEnhance.Sharpness(img)
img = enhancer.enhance(sharpness)
# Enhance contrast
enhancer = ImageEnhance.Contrast(img)
img = enhancer.enhance(contrast)
# Enhance color saturation slightly
enhancer = ImageEnhance.Color(img)
img = enhancer.enhance(color)
# Slight brightness adjustment
enhancer = ImageEnhance.Brightness(img)
img = enhancer.enhance(brightness)
# Restore alpha channel if it existed
if alpha is not None:
img = img.convert('RGBA')
img.putalpha(alpha)
return img
@app.post("/api/enhance-image")
async def api_enhance_image(
file: UploadFile = File(...),
level: str = Form("medium"),
upscale: float = Form(1.0),
output_name: str = Form("enhanced"),
):
"""Enhance image quality with sharpening and optional upscaling"""
data = await file.read()
original_size = len(data)
if original_size > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
output_name = sanitize_filename(output_name, "enhanced")
level = level if level in ["light", "medium", "strong"] else "medium"
upscale = max(1.0, min(2.0, upscale)) # Limit upscale to 2x max
try:
img = Image.open(io.BytesIO(data))
original_format = file.filename.split('.')[-1].lower() if file.filename else 'jpg'
# Upscale if requested (using LANCZOS for quality)
if upscale > 1.0:
new_size = (int(img.size[0] * upscale), int(img.size[1] * upscale))
img = img.resize(new_size, Image.Resampling.LANCZOS)
# Apply enhancement
img = enhance_image(img, level)
# Determine output format and quality
if original_format in ['jpg', 'jpeg']:
output_format = 'JPEG'
ext = 'jpg'
mime = 'image/jpeg'
if img.mode == 'RGBA':
img = img.convert('RGB')
# Use quality that keeps file size reasonable
quality = 88
elif original_format == 'webp':
output_format = 'WEBP'
ext = 'webp'
mime = 'image/webp'
quality = 88
else:
output_format = 'PNG'
ext = 'png'
mime = 'image/png'
quality = None
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"
save_kwargs = {'optimize': True}
if quality:
save_kwargs['quality'] = quality
img.save(str(output_path), output_format, **save_kwargs)
enhanced_size = output_path.stat().st_size
# If file got too large (more than 3x original), reduce quality
if enhanced_size > original_size * 3 and output_format in ['JPEG', 'WEBP']:
quality = 75
img.save(str(output_path), output_format, quality=quality, optimize=True)
enhanced_size = output_path.stat().st_size
logger.info(f"Enhanced image: {original_size} -> {enhanced_size} bytes")
return FileResponse(
path=str(output_path),
media_type=mime,
filename=f"{output_name}.{ext}",
headers={
"X-Original-Size": str(original_size),
"X-Enhanced-Size": str(enhanced_size),
}
)
except Exception as e:
logger.exception("Image enhancement failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/enhance-pdf")
async def api_enhance_pdf(
file: UploadFile = File(...),
level: str = Form("medium"),
dpi: int = Form(150),
output_name: str = Form("enhanced.pdf"),
):
"""Enhance PDF quality by improving embedded images"""
data = await file.read()
original_size = len(data)
validate_pdf_upload(data)
output_name = sanitize_filename(output_name, "enhanced.pdf")
if not output_name.lower().endswith(".pdf"):
output_name += ".pdf"
level = level if level in ["light", "medium", "strong"] else "medium"
dpi = max(100, min(200, dpi)) # Limit DPI to prevent bloat
# Quality settings based on level
jpeg_quality = {"light": 82, "medium": 85, "strong": 88}.get(level, 85)
try:
src_doc = fitz.open(stream=data, filetype="pdf")
images_enhanced = 0
for page_num in range(len(src_doc)):
page = src_doc.load_page(page_num)
image_list = page.get_images(full=True)
for img_info in image_list:
xref = img_info[0]
try:
base_image = src_doc.extract_image(xref)
if not base_image:
continue
image_bytes = base_image["image"]
# Skip small images (icons, etc.)
if len(image_bytes) < 5000:
continue
# Open and enhance image
img = Image.open(io.BytesIO(image_bytes))
# Skip if already small dimensions
if max(img.size) < 100:
continue
# Convert mode if needed
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
# Apply enhancement
img = enhance_image(img, level)
# Save as optimized JPEG
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True)
enhanced_bytes = buffer.getvalue()
# Only replace if not significantly larger
if len(enhanced_bytes) <= len(image_bytes) * 1.5:
page.replace_image(xref, stream=enhanced_bytes)
images_enhanced += 1
except Exception as e:
logger.debug(f"Could not enhance image {xref}: {e}")
continue
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
# Save with optimization
src_doc.save(
str(output_path),
garbage=3,
deflate=True,
clean=True,
)
src_doc.close()
enhanced_size = output_path.stat().st_size
# Safety check: if file is way too large, just return original
if enhanced_size > original_size * 4:
output_path.write_bytes(data)
enhanced_size = original_size
logger.warning("Enhanced PDF was too large, returning original")
size_change = ((enhanced_size - original_size) / original_size) * 100
logger.info(f"Enhanced PDF: {images_enhanced} images, {original_size} -> {enhanced_size} bytes ({size_change:+.1f}%)")
return FileResponse(
path=str(output_path),
media_type="application/pdf",
filename=output_name,
headers={
"X-Original-Size": str(original_size),
"X-Enhanced-Size": str(enhanced_size),
"X-Images-Enhanced": str(images_enhanced),
}
)
except Exception as e:
logger.exception("PDF enhancement failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Image Tools ===============
# Check for rembg availability
REMBG_AVAILABLE = False
rembg_remove = None
def _check_rembg():
"""Safely check if rembg is available"""
global REMBG_AVAILABLE, rembg_remove
try:
import sys
import io
# Capture stderr to suppress rembg warnings
old_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
from rembg import remove
rembg_remove = remove
REMBG_AVAILABLE = True
finally:
sys.stderr = old_stderr
except Exception as e:
REMBG_AVAILABLE = False
logger.info(f"rembg not available: {e}")
# Don't check at import time - check lazily on first use
# _check_rembg()
@app.post("/api/remove-background")
async def api_remove_background(
file: UploadFile = File(...),
output_name: str = Form("no-bg.png"),
):
"""Remove background from image using AI (rembg)"""
global REMBG_AVAILABLE, rembg_remove
# Lazy load rembg on first use
if rembg_remove is None and not REMBG_AVAILABLE:
try:
from rembg import remove
rembg_remove = remove
REMBG_AVAILABLE = True
except Exception as e:
logger.warning(f"rembg not available: {e}")
REMBG_AVAILABLE = False
if not REMBG_AVAILABLE or rembg_remove is None:
raise HTTPException(
status_code=500,
detail="rembg not installed. Run: pip install rembg[gpu] or pip install rembg"
)
data = await file.read()
if len(data) > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
output_name = sanitize_filename(output_name, "no-bg.png")
if not output_name.lower().endswith(".png"):
output_name += ".png"
try:
# Process with rembg
result = rembg_remove(data)
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
output_path.write_bytes(result)
logger.info(f"Background removed: {output_name}")
return FileResponse(
path=str(output_path),
media_type="image/png",
filename=output_name
)
except Exception as e:
logger.exception("Background removal failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/add-image-watermark")
async def api_add_image_watermark(
file: UploadFile = File(...),
text: str = Form(""),
position: str = Form("center"),
opacity: int = Form(50),
font_size: int = Form(36),
color: str = Form("#000000"),
rotation: int = Form(0),
output_name: str = Form("watermarked"),
):
"""Add text watermark to image"""
data = await file.read()
if len(data) > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
if not text.strip():
raise HTTPException(status_code=400, detail="Watermark text is required")
output_name = sanitize_filename(output_name, "watermarked")
try:
from PIL import ImageDraw, ImageFont
img = Image.open(io.BytesIO(data))
if img.mode != 'RGBA':
img = img.convert('RGBA')
# Create watermark layer
watermark = Image.new('RGBA', img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(watermark)
# Try to use a font, fallback to default
try:
font = ImageFont.truetype("arial.ttf", font_size)
except Exception:
font = ImageFont.load_default()
# Parse color
color_hex = color.lstrip('#')
r, g, b = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4))
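# Illustrative: color="#FF8800", opacity=50 -> (r, g, b, alpha) == (255, 136, 0, 127).
# Note this expects a full 6-digit hex value; 3-digit shorthand like "#f80" is not handled here.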
alpha = int(255 * opacity / 100)
# Get text size
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Calculate position
positions = {
"top-left": (20, 20),
"top-center": ((img.width - text_width) // 2, 20),
"top-right": (img.width - text_width - 20, 20),
"center": ((img.width - text_width) // 2, (img.height - text_height) // 2),
"bottom-left": (20, img.height - text_height - 20),
"bottom-center": ((img.width - text_width) // 2, img.height - text_height - 20),
"bottom-right": (img.width - text_width - 20, img.height - text_height - 20),
"tile": None,
}
if position == "tile":
# Tile watermark across image
for y in range(0, img.height, text_height + 100):
for x in range(0, img.width, text_width + 100):
draw.text((x, y), text, font=font, fill=(r, g, b, alpha))
else:
pos = positions.get(position, positions["center"])
draw.text(pos, text, font=font, fill=(r, g, b, alpha))
# Rotate watermark if needed
if rotation != 0:
watermark = watermark.rotate(rotation, expand=False, center=(img.width//2, img.height//2))
# Composite
result = Image.alpha_composite(img, watermark)
# Determine output format
original_format = file.filename.split('.')[-1].lower() if file.filename else 'png'
if original_format in ['jpg', 'jpeg']:
result = result.convert('RGB')
output_format = 'JPEG'
output_name += '.jpg'
mime = 'image/jpeg'
else:
output_format = 'PNG'
output_name += '.png'
mime = 'image/png'
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
result.save(str(output_path), output_format, quality=95)
logger.info(f"Watermark added: {output_name}")
return FileResponse(path=str(output_path), media_type=mime, filename=output_name)
except Exception as e:
logger.exception("Add watermark failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/resize-image")
async def api_resize_image(
file: UploadFile = File(...),
preset: str = Form("custom"),
width: int = Form(0),
height: int = Form(0),
maintain_aspect: bool = Form(True),
output_name: str = Form("resized"),
):
"""Resize image with presets or custom dimensions"""
data = await file.read()
if len(data) > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
output_name = sanitize_filename(output_name, "resized")
# Presets
presets = {
"thumbnail": (150, 150),
"small": (320, 240),
"medium": (640, 480),
"hd": (1280, 720),
"fullhd": (1920, 1080),
"4k": (3840, 2160),
"square-sm": (500, 500),
"square-lg": (1000, 1000),
}
try:
img = Image.open(io.BytesIO(data))
original_width, original_height = img.size
# Determine target size
if preset != "custom" and preset in presets:
target_width, target_height = presets[preset]
else:
target_width = width if width > 0 else original_width
target_height = height if height > 0 else original_height
# Maintain aspect ratio
if maintain_aspect:
ratio = min(target_width / original_width, target_height / original_height)
target_width = int(original_width * ratio)
target_height = int(original_height * ratio)
# Resize
resized = img.resize((target_width, target_height), Image.Resampling.LANCZOS)
# Determine output format
original_format = file.filename.split('.')[-1].lower() if file.filename else 'png'
format_map = {'jpg': 'JPEG', 'jpeg': 'JPEG', 'png': 'PNG', 'webp': 'WEBP', 'gif': 'GIF'}
output_format = format_map.get(original_format, 'PNG')
ext = original_format if original_format in format_map else 'png'
        if output_format == 'JPEG' and resized.mode in ('RGBA', 'P', 'LA'):
            # JPEG cannot store alpha or palette images
            resized = resized.convert('RGB')
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"
resized.save(str(output_path), output_format, quality=95)
mime_map = {'JPEG': 'image/jpeg', 'PNG': 'image/png', 'WEBP': 'image/webp', 'GIF': 'image/gif'}
logger.info(f"Resized image: {original_width}x{original_height} -> {target_width}x{target_height}")
return FileResponse(
path=str(output_path),
media_type=mime_map.get(output_format, 'image/png'),
filename=f"{output_name}.{ext}",
headers={
"X-Original-Size": f"{original_width}x{original_height}",
"X-New-Size": f"{target_width}x{target_height}"
}
)
except Exception as e:
logger.exception("Resize image failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/convert-image")
async def api_convert_image(
file: UploadFile = File(...),
target_format: str = Form("png"),
quality: int = Form(90),
output_name: str = Form("converted"),
):
"""Convert image between formats (JPG, PNG, WebP, GIF)"""
data = await file.read()
if len(data) > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
output_name = sanitize_filename(output_name, "converted")
target_format = target_format.lower()
format_map = {
'jpg': ('JPEG', 'image/jpeg', 'jpg'),
'jpeg': ('JPEG', 'image/jpeg', 'jpg'),
'png': ('PNG', 'image/png', 'png'),
'webp': ('WEBP', 'image/webp', 'webp'),
'gif': ('GIF', 'image/gif', 'gif'),
}
if target_format not in format_map:
raise HTTPException(status_code=400, detail="Unsupported format. Use: jpg, png, webp, gif")
pil_format, mime, ext = format_map[target_format]
try:
img = Image.open(io.BytesIO(data))
# Handle transparency for JPEG
if pil_format == 'JPEG' and img.mode in ('RGBA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[3] if len(img.split()) == 4 else None)
img = background
elif pil_format == 'JPEG' and img.mode != 'RGB':
img = img.convert('RGB')
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"
save_kwargs = {}
if pil_format in ['JPEG', 'WEBP']:
save_kwargs['quality'] = quality
if pil_format == 'PNG':
save_kwargs['optimize'] = True
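        # Note: without save_all=True, Pillow writes only the current frame, so animated GIF/WebP inputs are flattened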
img.save(str(output_path), pil_format, **save_kwargs)
logger.info(f"Converted image to {target_format.upper()}")
return FileResponse(
path=str(output_path),
media_type=mime,
filename=f"{output_name}.{ext}"
)
except Exception as e:
logger.exception("Convert image failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/compress-image")
async def api_compress_image(
file: UploadFile = File(...),
quality: int = Form(70),
output_name: str = Form("compressed"),
):
"""Compress image to reduce file size"""
data = await file.read()
original_size = len(data)
if original_size > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
output_name = sanitize_filename(output_name, "compressed")
quality = max(10, min(100, quality))
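    # Quality is clamped to 10-100; JPEG values above ~95 add file size with little visible gain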
try:
img = Image.open(io.BytesIO(data))
# Always convert to JPEG for effective compression with quality control
# (PNG optimize doesn't use quality setting and gives minimal compression)
if img.mode in ('RGBA', 'P'):
# Handle transparency by compositing on white background
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
output_format = 'JPEG'
ext = 'jpg'
mime = 'image/jpeg'
save_kwargs = {'quality': quality, 'optimize': True}
output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"
img.save(str(output_path), output_format, **save_kwargs)
compressed_size = output_path.stat().st_size
reduction = ((original_size - compressed_size) / original_size) * 100
logger.info(f"Compressed image: {original_size} -> {compressed_size} bytes ({reduction:.1f}% reduction)")
return FileResponse(
path=str(output_path),
media_type=mime,
filename=f"{output_name}.{ext}",
headers={
"X-Original-Size": str(original_size),
"X-Compressed-Size": str(compressed_size),
"X-Reduction-Percent": f"{reduction:.1f}"
}
)
except Exception as e:
logger.exception("Compress image failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Estimate File Size Endpoints ===============
@app.post("/api/estimate/compress-image")
async def api_estimate_compress_image(
file: UploadFile = File(...),
quality: int = Form(70),
):
"""Estimate compressed image file size - matches actual compression logic"""
data = await file.read()
original_size = len(data)
if original_size > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
quality = max(10, min(100, quality))
try:
img = Image.open(io.BytesIO(data))
# Always convert to JPEG for compression (matches actual compression logic)
if img.mode in ('RGBA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=quality, optimize=True)
estimated_size = len(buffer.getvalue())
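        # In-memory re-encode uses the same parameters as /api/compress-image, so the estimate tracks the real output size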
reduction = ((original_size - estimated_size) / original_size) * 100 if original_size > 0 else 0
return JSONResponse({
"original_size": original_size,
"estimated_size": estimated_size,
"reduction_percent": round(max(0, reduction), 1)
})
except Exception as e:
logger.exception("Estimate compress image failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/estimate/convert-image")
async def api_estimate_convert_image(
file: UploadFile = File(...),
target_format: str = Form("png"),
quality: int = Form(90),
):
"""Estimate converted image file size"""
data = await file.read()
original_size = len(data)
if original_size > settings.max_image_size_mb * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB")
target_format = target_format.lower()
format_map = {
'jpg': ('JPEG', 'jpg'),
'jpeg': ('JPEG', 'jpg'),
'png': ('PNG', 'png'),
'webp': ('WEBP', 'webp'),
'gif': ('GIF', 'gif'),
}
if target_format not in format_map:
raise HTTPException(status_code=400, detail="Unsupported format")
pil_format, ext = format_map[target_format]
try:
img = Image.open(io.BytesIO(data))
# Handle transparency for JPEG
if pil_format == 'JPEG' and img.mode in ('RGBA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[3] if len(img.split()) == 4 else None)
img = background
elif pil_format == 'JPEG' and img.mode != 'RGB':
img = img.convert('RGB')
buffer = io.BytesIO()
save_kwargs = {}
if pil_format in ['JPEG', 'WEBP']:
save_kwargs['quality'] = quality
if pil_format == 'PNG':
save_kwargs['optimize'] = True
img.save(buffer, pil_format, **save_kwargs)
estimated_size = len(buffer.getvalue())
return JSONResponse({
"original_size": original_size,
"estimated_size": estimated_size,
"target_format": target_format.upper()
})
except Exception as e:
logger.exception("Estimate convert image failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/estimate/compress-pdf")
async def api_estimate_compress_pdf(
file: UploadFile = File(...),
quality: int = Form(60),
):
"""Estimate compressed PDF file size by actually compressing it in memory"""
data = await file.read()
original_size = len(data)
validate_pdf_upload(data)
    # Clamp quality to the 10-100 scale and derive the max image dimension from it
    jpeg_quality = max(10, min(100, quality))
    max_dim = int(1000 + (jpeg_quality / 100) * 2000)  # 1200-3000 px for quality 10-100
try:
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
# Process ALL pages for accurate estimation
for page_num in range(total_pages):
page = src_doc.load_page(page_num)
image_list = page.get_images(full=True)
for img_info in image_list:
xref = img_info[0]
try:
base_image = src_doc.extract_image(xref)
if not base_image or len(base_image["image"]) < 5000:
continue
image_bytes = base_image["image"]
img = Image.open(io.BytesIO(image_bytes))
if img.mode in ('RGBA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
# Resize if larger than max_dim
if max(img.size) > max_dim:
ratio = max_dim / max(img.size)
new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
img = img.resize(new_size, Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True)
compressed_bytes = buffer.getvalue()
# Always replace to apply quality setting
page.replace_image(xref, stream=compressed_bytes)
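                    # replace_image swaps the embedded image stream in place (needs a recent PyMuPDF)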
except Exception:
continue
# Get actual compressed size
compressed_bytes = src_doc.tobytes(garbage=4, deflate=True, clean=True)
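        # garbage=4 drops unused/duplicate objects, deflate recompresses streams, clean sanitizes page contents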
estimated_size = len(compressed_bytes)
src_doc.close()
reduction = ((original_size - estimated_size) / original_size) * 100 if original_size > 0 else 0
return JSONResponse({
"original_size": original_size,
"estimated_size": estimated_size,
"reduction_percent": round(max(0, reduction), 1),
"quality": quality
})
except Exception as e:
logger.exception("Estimate compress PDF failed")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/rembg-status")
async def api_rembg_status():
"""Check if rembg is available"""
global REMBG_AVAILABLE, rembg_remove
# Try to load rembg if not checked yet
if rembg_remove is None and not REMBG_AVAILABLE:
try:
from rembg import remove
rembg_remove = remove
REMBG_AVAILABLE = True
except Exception:
REMBG_AVAILABLE = False
return {"available": REMBG_AVAILABLE}
# =============== Preview Endpoints ===============
def image_to_base64(img: Image.Image, format: str = "PNG", quality: int = 95) -> str:
"""Convert PIL Image to base64 string"""
import base64
    buffer = io.BytesIO()
    if format == "JPEG" and img.mode in ('RGBA', 'P'):
        img = img.convert('RGB')
    # Pass quality only for JPEG; PNG ignores it anyway
    save_kwargs = {"quality": quality} if format == "JPEG" else {}
    img.save(buffer, format=format, **save_kwargs)
    return base64.b64encode(buffer.getvalue()).decode()
def prepare_preview_image(img: Image.Image, max_size: int = 1920) -> Image.Image:
"""Prepare image for preview - only resize if larger than max_size (HD)"""
if max(img.size) > max_size:
img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
return img
def pdf_page_to_base64(pdf_bytes: bytes, page_num: int = 0, dpi: int = 200) -> str:
"""Convert PDF page to base64 PNG at high quality"""
import base64
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
page = doc.load_page(page_num)
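    # PDF user space is 72 points per inch, so zoom = dpi / 72 renders at the requested DPI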
zoom = dpi / 72.0
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
png_bytes = pix.tobytes("png")
doc.close()
return base64.b64encode(png_bytes).decode()
@app.post("/api/preview/compress-pdf")
async def api_preview_compress_pdf(
file: UploadFile = File(...),
quality: int = Form(60),
):
"""Preview PDF compression - returns original and compressed first page"""
data = await file.read()
validate_pdf_upload(data)
    jpeg_quality = max(10, min(100, quality))
    max_dim = int(1000 + (jpeg_quality / 100) * 2000)  # 1200-3000 px for quality 10-100
try:
# Get original first page (higher DPI for better quality)
original_b64 = pdf_page_to_base64(data, 0, 150)
# Compress PDF
src_doc = fitz.open(stream=data, filetype="pdf")
for page_num in range(min(1, len(src_doc))): # Only process first page for preview
page = src_doc.load_page(page_num)
image_list = page.get_images(full=True)
for img_info in image_list:
xref = img_info[0]
try:
base_image = src_doc.extract_image(xref)
if not base_image or len(base_image["image"]) < 5000:
continue
img = Image.open(io.BytesIO(base_image["image"]))
if img.mode in ('RGBA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[3])
img = background
else:
img = img.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
# Resize if larger than max_dim
if max(img.size) > max_dim:
ratio = max_dim / max(img.size)
new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
img = img.resize(new_size, Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True)
# Always replace to apply quality setting
page.replace_image(xref, stream=buffer.getvalue())
except Exception:
continue
compressed_bytes = src_doc.tobytes(garbage=4, deflate=True, clean=True)
src_doc.close()
processed_b64 = pdf_page_to_base64(compressed_bytes, 0, 150)
return JSONResponse({
"original": original_b64,
"processed": processed_b64,
"original_size": len(data),
"processed_size": len(compressed_bytes)
})
except Exception as e:
logger.exception("Compress preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/rotate-pdf")
async def api_preview_rotate_pdf(
file: UploadFile = File(...),
rotation: int = Form(90),
):
"""Preview PDF rotation - returns original and rotated first page"""
data = await file.read()
validate_pdf_upload(data)
rotation = int(rotation) % 360
if rotation not in [90, 180, 270]:
rotation = 90
try:
original_b64 = pdf_page_to_base64(data, 0, 150)
src_doc = fitz.open(stream=data, filetype="pdf")
page = src_doc.load_page(0)
        page.set_rotation((page.rotation + rotation) % 360)
rotated_bytes = src_doc.tobytes()
src_doc.close()
processed_b64 = pdf_page_to_base64(rotated_bytes, 0, 150)
return JSONResponse({
"original": original_b64,
"processed": processed_b64
})
except Exception as e:
logger.exception("Rotate preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/page-numbers")
async def api_preview_page_numbers(
file: UploadFile = File(...),
position: str = Form("bottom-center"),
format: str = Form("Page {n} of {total}"),
start_number: int = Form(1),
font_size: int = Form(11),
):
"""Preview page numbers - returns original and numbered first page"""
data = await file.read()
validate_pdf_upload(data)
positions = {
"top-left": ("left", "top"),
"top-center": ("center", "top"),
"top-right": ("right", "top"),
"bottom-left": ("left", "bottom"),
"bottom-center": ("center", "bottom"),
"bottom-right": ("right", "bottom"),
}
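    # (horizontal, vertical) alignment pairs; unknown positions fall back to bottom-center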
h_align, v_align = positions.get(position, ("center", "bottom"))
margin = 30
try:
original_b64 = pdf_page_to_base64(data, 0, 150)
src_doc = fitz.open(stream=data, filetype="pdf")
total_pages = len(src_doc)
page = src_doc.load_page(0)
rect = page.rect
text = format.replace("{n}", str(start_number)).replace("{total}", str(total_pages))
text_width = fitz.get_text_length(text, fontsize=font_size)
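        # Measured with PyMuPDF's default Helvetica, the same base-14 font insert_text uses below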
if h_align == "left":
x = margin
elif h_align == "right":
x = rect.width - margin - text_width
else:
x = (rect.width - text_width) / 2
if v_align == "top":
y = margin + font_size
else:
y = rect.height - margin
page.insert_text((x, y), text, fontsize=font_size, color=(0.3, 0.3, 0.3))
numbered_bytes = src_doc.tobytes()
src_doc.close()
processed_b64 = pdf_page_to_base64(numbered_bytes, 0, 150)
return JSONResponse({
"original": original_b64,
"processed": processed_b64
})
except Exception as e:
logger.exception("Page numbers preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/remove-background")
async def api_preview_remove_background(
file: UploadFile = File(...),
):
"""Preview background removal - returns original and processed image in HD quality"""
global REMBG_AVAILABLE, rembg_remove
if rembg_remove is None and not REMBG_AVAILABLE:
try:
from rembg import remove
rembg_remove = remove
REMBG_AVAILABLE = True
except Exception:
REMBG_AVAILABLE = False
if not REMBG_AVAILABLE or rembg_remove is None:
raise HTTPException(status_code=500, detail="rembg not installed")
data = await file.read()
try:
img = Image.open(io.BytesIO(data))
img = prepare_preview_image(img)
original_b64 = image_to_base64(img, "PNG")
# Process with rembg
img_buffer = io.BytesIO()
img_rgb = img.convert('RGB') if img.mode != 'RGB' else img
img_rgb.save(img_buffer, format='PNG')
result = rembg_remove(img_buffer.getvalue())
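        # rembg accepts encoded image bytes and returns PNG bytes with the background made transparent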
result_img = Image.open(io.BytesIO(result))
processed_b64 = image_to_base64(result_img, "PNG")
return JSONResponse({
"original": original_b64,
"processed": processed_b64
})
except Exception as e:
logger.exception("Remove background preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/enhance-image")
async def api_preview_enhance_image(
file: UploadFile = File(...),
level: str = Form("medium"),
upscale: float = Form(1.0),
):
"""Preview image enhancement - returns original and enhanced image in HD quality"""
data = await file.read()
try:
img = Image.open(io.BytesIO(data))
img = prepare_preview_image(img)
original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 95)
work_img = img.copy()
if upscale > 1.0:
new_size = (int(work_img.size[0] * min(upscale, 2.0)), int(work_img.size[1] * min(upscale, 2.0)))
work_img = work_img.resize(new_size, Image.Resampling.LANCZOS)
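            # Preview upscaling is capped at 2x by the min() above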
enhanced = enhance_image(work_img, level)
enhanced = prepare_preview_image(enhanced)
processed_b64 = image_to_base64(enhanced.convert('RGB'), "JPEG", 95)
return JSONResponse({
"original": original_b64,
"processed": processed_b64
})
except Exception as e:
logger.exception("Enhance preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/image-watermark")
async def api_preview_image_watermark(
file: UploadFile = File(...),
text: str = Form(""),
position: str = Form("center"),
opacity: int = Form(50),
font_size: int = Form(36),
color: str = Form("#000000"),
):
"""Preview image watermark - returns original and watermarked image in HD quality"""
from PIL import ImageDraw, ImageFont
data = await file.read()
if not text.strip():
raise HTTPException(status_code=400, detail="Watermark text required")
try:
img = Image.open(io.BytesIO(data))
img = prepare_preview_image(img)
original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 95)
if img.mode != 'RGBA':
img = img.convert('RGBA')
watermark = Image.new('RGBA', img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(watermark)
try:
font = ImageFont.truetype("arial.ttf", font_size)
        except Exception:
            # Fallback bitmap font is small and ignores font_size
            font = ImageFont.load_default()
        color_hex = color.lstrip('#')
        if len(color_hex) == 3:
            color_hex = ''.join(c * 2 for c in color_hex)
        r, g, b = tuple(int(color_hex[i:i+2], 16) for i in (0, 2, 4))
alpha = int(255 * opacity / 100)
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
positions = {
"top-left": (20, 20),
"top-center": ((img.width - text_width) // 2, 20),
"top-right": (img.width - text_width - 20, 20),
"center": ((img.width - text_width) // 2, (img.height - text_height) // 2),
"bottom-left": (20, img.height - text_height - 20),
"bottom-center": ((img.width - text_width) // 2, img.height - text_height - 20),
"bottom-right": (img.width - text_width - 20, img.height - text_height - 20),
}
pos = positions.get(position, positions["center"])
draw.text(pos, text, font=font, fill=(r, g, b, alpha))
result = Image.alpha_composite(img, watermark)
processed_b64 = image_to_base64(result.convert('RGB'), "JPEG", 95)
return JSONResponse({
"original": original_b64,
"processed": processed_b64
})
except Exception as e:
logger.exception("Watermark preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/resize-image")
async def api_preview_resize_image(
file: UploadFile = File(...),
preset: str = Form("custom"),
width: int = Form(0),
height: int = Form(0),
maintain_aspect: bool = Form(True),
):
"""Preview image resize - returns original and resized image in HD quality"""
data = await file.read()
presets = {
"thumbnail": (150, 150),
"small": (320, 240),
"medium": (640, 480),
"hd": (1280, 720),
"fullhd": (1920, 1080),
"4k": (3840, 2160),
"square-sm": (500, 500),
"square-lg": (1000, 1000),
}
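    # Same presets as /api/resize-image so the preview matches the final output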
try:
img = Image.open(io.BytesIO(data))
original_width, original_height = img.size
# Original preview (HD quality)
original_preview = prepare_preview_image(img.copy())
original_b64 = image_to_base64(original_preview.convert('RGB'), "JPEG", 95)
# Determine target size
if preset != "custom" and preset in presets:
target_width, target_height = presets[preset]
else:
target_width = width if width > 0 else original_width
target_height = height if height > 0 else original_height
if maintain_aspect:
ratio = min(target_width / original_width, target_height / original_height)
target_width = int(original_width * ratio)
target_height = int(original_height * ratio)
# Resized preview (HD quality)
resized = img.resize((target_width, target_height), Image.Resampling.LANCZOS)
resized_preview = prepare_preview_image(resized)
processed_b64 = image_to_base64(resized_preview.convert('RGB'), "JPEG", 95)
return JSONResponse({
"original": original_b64,
"processed": processed_b64,
"original_size": f"{original_width}x{original_height}",
"new_size": f"{target_width}x{target_height}"
})
except Exception as e:
logger.exception("Resize preview failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/preview/compress-image")
async def api_preview_compress_image(
file: UploadFile = File(...),
quality: int = Form(70),
):
"""Preview image compression - returns original and compressed image in HD quality"""
data = await file.read()
original_size = len(data)
try:
img = Image.open(io.BytesIO(data))
img = prepare_preview_image(img)
original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 98)
        # Compress with the user's quality setting (clamped like /api/compress-image)
        quality = max(10, min(100, quality))
        if img.mode != 'RGB':
            img = img.convert('RGB')
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=quality, optimize=True)
compressed_size = len(buffer.getvalue())
compressed_img = Image.open(io.BytesIO(buffer.getvalue()))
processed_b64 = image_to_base64(compressed_img, "JPEG", 98)
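        # The preview image is re-encoded at high quality for display only; compressed_size reflects the user's quality setting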
return JSONResponse({
"original": original_b64,
"processed": processed_b64,
"original_size": original_size,
"compressed_size": compressed_size
})
except Exception as e:
logger.exception("Compress preview failed")
raise HTTPException(status_code=500, detail=str(e))
# =============== Run Server ===============
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"server:app",
host="127.0.0.1",
port=8000,
reload=settings.debug
)