"""Utilities for parsing PDF, EPUB, MOBI files and extracting/generating covers.

Content is returned as a list of segments. Each segment is either:

    {"type": "text", "content": "..."}
    {"type": "image", "src": "/book_images/abc123_1.png"}

Text segments produced from h1-h3 elements additionally carry
``"heading": True``.

This allows the frontend to render images inline between text blocks.
"""

import base64
import logging
import os
import re
import shutil
from urllib.parse import unquote

import ebooklib
import fitz  # PyMuPDF
import mobi
from bs4 import BeautifulSoup, NavigableString
from ebooklib import epub
from PIL import Image, ImageDraw, ImageFont

import settings

logger = logging.getLogger(__name__)

# Incremented before every image write, so the first saved image is "_1".
IMG_COUNTER = 0

# Embedded images at or below this size are skipped by the extractors.
_MIN_IMAGE_BYTES = 500

# A PDF line ending in one of these characters closes its paragraph.
_SENTENCE_END_CHARS = '.!?:;»"\u201d\u2019'
_MULTI_NEWLINE_RE = re.compile(r"\n{3,}")

# HTML elements that are turned into text segments.
_BLOCK_TAGS = (
    "p", "div", "h1", "h2", "h3", "h4", "h5", "h6", "li", "blockquote",
)
# Subset of _BLOCK_TAGS whose segments are flagged with "heading": True.
_HEADING_TAGS = ("h1", "h2", "h3")

_CHAPTER_RE = re.compile(
    r'^(chapter|chapitre|capitolo|capítulo|kapitel|hoofdstuk|kapittel|rozdział)\s+\w+',
    re.IGNORECASE,
)


def ensure_dirs():
    """Create the upload, covers and images directories if they are missing."""
    for directory in (
        settings.UPLOAD_DIR,
        settings.COVERS_DIR,
        settings.IMAGES_DIR,
    ):
        os.makedirs(directory, exist_ok=True)


def _sanitize_ext(ext: str | None, default: str) -> str:
    """Return *ext* without a leading dot, or *default* if it is unusable.

    Extensions come from file content (PDF metadata, EPUB item names), so
    anything that is not purely alphanumeric is rejected to keep path
    separators and other surprises out of generated file names.
    """
    candidate = (ext or "").lstrip(".")
    return candidate if candidate.isalnum() else default


def _save_image_bytes(img_bytes: bytes, book_id: str, ext: str = "png") -> str:
    """Save image bytes to disk and return the URL path.

    The file is written to ``settings.IMAGES_DIR`` as
    ``{book_id}_{IMG_COUNTER}.{ext}`` and the returned value is
    ``/book_images/<file name>``.
    """
    global IMG_COUNTER
    IMG_COUNTER += 1
    fname = f"{book_id}_{IMG_COUNTER}.{_sanitize_ext(ext, 'png')}"
    fpath = os.path.join(settings.IMAGES_DIR, fname)
    with open(fpath, "wb") as f:
        f.write(img_bytes)
    return f"/book_images/{fname}"


def _write_cover(data: bytes, book_id: str, ext: str) -> str:
    """Write cover bytes to ``settings.COVERS_DIR`` and return the file path."""
    cover_path = os.path.join(settings.COVERS_DIR, f"{book_id}.{ext}")
    with open(cover_path, "wb") as f:
        f.write(data)
    return cover_path


def _clean_pdf_text(raw: str) -> str:
    """Clean PDF text: join mid-sentence line breaks, keep paragraph breaks.

    Paragraphs in the returned string are separated by blank lines.
    """
    # `result` holds paragraph strings interleaved with "\n" break markers.
    result: list[str] = []
    for line in raw.split("\n"):
        line = line.rstrip()
        at_break = not result or result[-1] == "\n"

        if not line:
            # Empty line = paragraph break (never two markers in a row).
            if not at_break:
                result.append("\n")
            continue

        if at_break:
            result.append(line)
            continue

        prev = result[-1]  # always non-empty here
        last_char = prev[-1]
        if last_char in _SENTENCE_END_CHARS:
            # Previous line ended a sentence: start a new paragraph.
            result.append("\n")
            result.append(line)
        elif last_char == "-":
            # Hyphenated word across lines: join without space, drop hyphen.
            result[-1] = prev[:-1] + line
        else:
            # Mid-sentence break: join with a space.
            result[-1] = f"{prev} {line}"

    # Markers become runs of newlines once joined; collapse them.
    text = _MULTI_NEWLINE_RE.sub("\n\n", "\n".join(result))
    return text.strip()


def _flush_pdf_text(parts: list[str], segments: list[dict]) -> None:
    """Append the cleaned paragraphs of *parts* to *segments*, then empty *parts*."""
    if not parts:
        return
    merged = _clean_pdf_text("\n".join(parts))
    for para in merged.split("\n"):
        para = para.strip()
        if para:
            segments.append({"type": "text", "content": para})
    parts.clear()


def _save_image_segment(img_bytes, book_id: str, ext: str, origin: str) -> dict | None:
    """Save an embedded image and return its segment, or None if skipped.

    Images of ``_MIN_IMAGE_BYTES`` bytes or fewer are ignored. Failures are
    logged and skipped so one bad image does not abort the whole book.
    """
    try:
        if not img_bytes or len(img_bytes) <= _MIN_IMAGE_BYTES:
            return None
        src = _save_image_bytes(img_bytes, book_id, ext)
    except (OSError, TypeError, ValueError):
        logger.warning(
            "Skipping %s image for book %s", origin, book_id, exc_info=True
        )
        return None
    return {"type": "image", "src": src}


def extract_pdf_content(filepath: str, book_id: str) -> list[dict]:
    """Extract text and images from PDF, returning ordered segments.

    Blocks on each page are processed top-to-bottom, then left-to-right.
    Text is accumulated and flushed as paragraph segments before each image
    and at the end of every page.
    """
    # TEXT_PRESERVE_IMAGES is required for image blocks to show up in the
    # "dict" output at all; passing only TEXT_PRESERVE_WHITESPACE drops them.
    flags = fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_PRESERVE_IMAGES

    segments: list[dict] = []
    doc = fitz.open(filepath)
    try:
        for page in doc:
            blocks = page.get_text("dict", flags=flags)["blocks"]
            pending_text: list[str] = []

            for block in sorted(blocks, key=lambda b: (b["bbox"][1], b["bbox"][0])):
                if block["type"] == 0:  # text block
                    text = "\n".join(
                        "".join(span.get("text", "") for span in line.get("spans", []))
                        for line in block.get("lines", [])
                    ).strip()
                    if text:
                        pending_text.append(text)
                elif block["type"] == 1:  # image block
                    # Flush accumulated text so the image keeps its position.
                    _flush_pdf_text(pending_text, segments)
                    segment = _save_image_segment(
                        block.get("image"),
                        book_id,
                        block.get("ext", "png") or "png",
                        "PDF",
                    )
                    if segment:
                        segments.append(segment)

            # Flush remaining text from this page.
            _flush_pdf_text(pending_text, segments)
    finally:
        doc.close()
    return segments


def extract_pdf_cover(filepath: str, book_id: str) -> str | None:
    """Save the first image of the first PDF page as the cover.

    Returns the path of the written cover file, or None when the document
    has no pages, the first page has no images, or the image is empty.
    """
    doc = fitz.open(filepath)
    try:
        if len(doc) == 0:
            return None
        images = doc[0].get_images(full=True)
        if not images:
            return None
        xref = images[0][0]
        base_image = doc.extract_image(xref)
        img_bytes = base_image.get("image") if base_image else None
        if not img_bytes:
            return None
        ext = _sanitize_ext(base_image.get("ext"), "png")
        return _write_cover(img_bytes, book_id, ext)
    finally:
        doc.close()


def _html_text_segment(elem) -> dict | None:
    """Build a text segment for a block-level HTML element.

    A block without nested block elements contributes its full text. A
    block that contains other blocks contributes only its own direct text,
    because the nested blocks are emitted separately; taking the full text
    of both would duplicate the content.
    """
    if elem.find(list(_BLOCK_TAGS)) is None:
        text = elem.get_text(separator=" ", strip=True)
    else:
        # Exact type check: skip comments and other special string nodes.
        own_text = (
            str(child).strip()
            for child in elem.children
            if type(child) is NavigableString
        )
        text = " ".join(piece for piece in own_text if piece)

    if not text:
        return None
    segment = {"type": "text", "content": text}
    if elem.name in _HEADING_TAGS:
        segment["heading"] = True
    return segment


def _epub_image_segment(elem, image_map: dict, book_id: str) -> dict | None:
    """Resolve an <img>/<image> element against *image_map* and save it."""
    src_attr = elem.get("src") or elem.get("xlink:href") or elem.get("href") or ""
    if not src_attr:
        return None

    # Try the reference as written, then percent-decoded; for each, the
    # full path first and the bare file name second.
    img_item = None
    for ref in (src_attr, unquote(src_attr)):
        for key in (ref, ref.split("/")[-1]):
            img_item = image_map.get(key)
            if img_item is not None:
                break
        if img_item is not None:
            break
    if img_item is None:
        return None

    ext = _sanitize_ext(os.path.splitext(img_item.get_name() or "")[1], "png")
    try:
        img_bytes = img_item.get_content()
    except (OSError, TypeError, ValueError):
        logger.warning(
            "Could not read EPUB image %r for book %s",
            src_attr, book_id, exc_info=True,
        )
        return None
    return _save_image_segment(img_bytes, book_id, ext, "EPUB")


def extract_epub_content(filepath: str, book_id: str) -> list[dict]:
    """Extract text and images from EPUB, returning ordered segments.

    Documents are walked in the order ebooklib yields them. Consecutive
    identical text segments are collapsed into one.
    """
    book = epub.read_epub(filepath, options={'ignore_ncx': True})

    # Map image items by full name and by basename (for relative references).
    # setdefault keeps a basename from shadowing another item's exact name.
    image_map = {}
    for item in book.get_items_of_type(ebooklib.ITEM_IMAGE):
        name = item.get_name()
        image_map[name] = item
        image_map.setdefault(os.path.basename(name), item)

    segments: list[dict] = []
    for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
        soup = BeautifulSoup(item.get_content(), "html.parser")
        for elem in soup.descendants:
            tag_name = getattr(elem, "name", None)
            if tag_name in ("img", "image"):
                segment = _epub_image_segment(elem, image_map, book_id)
            elif tag_name in _BLOCK_TAGS:
                segment = _html_text_segment(elem)
            else:
                continue
            if segment:
                segments.append(segment)

    # Deduplicate consecutive identical text segments.
    deduped: list[dict] = []
    for seg in segments:
        if (
            deduped
            and seg["type"] == "text"
            and deduped[-1]["type"] == "text"
            and deduped[-1]["content"] == seg["content"]
        ):
            continue
        deduped.append(seg)
    return deduped


def extract_epub_cover(filepath: str, book_id: str) -> str | None:
    """Save the EPUB cover image and return its path, or None if no image.

    Prefers an image/cover item whose name contains "cover"; otherwise the
    first image item in the book is used.
    """
    book = epub.read_epub(filepath, options={'ignore_ncx': True})

    def save(item) -> str:
        ext = _sanitize_ext(os.path.splitext(item.get_name() or "")[1], "jpg")
        return _write_cover(item.get_content(), book_id, ext)

    for item in book.get_items():
        if 'cover' in (item.get_name() or '').lower() and item.get_type() in (
            ebooklib.ITEM_IMAGE,
            ebooklib.ITEM_COVER,
        ):
            return save(item)

    for item in book.get_items_of_type(ebooklib.ITEM_IMAGE):
        return save(item)
    return None


def extract_mobi_content(filepath: str, book_id: str) -> list[dict]:
    """Extract content from MOBI as segments.

    ``mobi.extract`` unpacks the book into a temporary directory and returns
    the path of the main extracted file. An ``.epub`` or ``.pdf`` result is
    handed to the matching extractor; anything else is parsed as HTML. The
    temporary directory is always removed. Placeholder segments are returned
    when nothing could be extracted.
    """
    tempdir, extracted_path = mobi.extract(filepath)
    try:
        if not (extracted_path and os.path.isfile(extracted_path)):
            return [{"type": "text", "content": "[Could not extract MOBI content]"}]

        extracted_ext = os.path.splitext(extracted_path)[1].lower()
        if extracted_ext == ".epub":
            segments = extract_epub_content(extracted_path, book_id)
        elif extracted_ext == ".pdf":
            segments = extract_pdf_content(extracted_path, book_id)
        else:
            with open(extracted_path, "r", encoding="utf-8", errors="ignore") as f:
                soup = BeautifulSoup(f.read(), "html.parser")
            segments = [
                segment
                for segment in map(_html_text_segment, soup.find_all(list(_BLOCK_TAGS)))
                if segment
            ]
        return segments if segments else [{"type": "text", "content": "[Empty book]"}]
    finally:
        if tempdir and os.path.isdir(tempdir):
            shutil.rmtree(tempdir, ignore_errors=True)


def extract_mobi_cover(filepath: str, book_id: str) -> str | None:
    """Return None: cover extraction is not implemented for MOBI files."""
    return None


def generate_cover(title: str, book_id: str) -> str:
    """Render *title* on a plain 400x600 cover and return the PNG path.

    The title is word-wrapped to 360 px and centred horizontally. Text starts
    at y=200; when there are more lines than fit below that point the block
    is shifted up, and lines beyond the canvas height are dropped.
    """
    width, height = 400, 600
    max_text_width = 360
    line_height = 40
    margin = 20

    img = Image.new("RGB", (width, height), color=(45, 55, 72))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("arial.ttf", 28)
    except OSError:
        font = ImageFont.load_default()

    def text_width(text: str) -> int:
        bbox = draw.textbbox((0, 0), text, font=font)
        return bbox[2] - bbox[0]

    # Greedy word wrap; a single over-long word still gets its own line.
    lines, current = [], ""
    for word in title.split():
        candidate = f"{current} {word}".strip()
        if text_width(candidate) > max_text_width:
            if current:
                lines.append(current)
            current = word
        else:
            current = candidate
    if current:
        lines.append(current)

    # Keep the text on the canvas for very long titles.
    lines = lines[: (height - 2 * margin) // line_height]
    y = min(200, height - margin - len(lines) * line_height)
    for line in lines:
        draw.text(((width - text_width(line)) / 2, y), line, fill="white", font=font)
        y += line_height

    cover_path = os.path.join(settings.COVERS_DIR, f"{book_id}.png")
    img.save(cover_path)
    return cover_path


def parse_book(filepath: str, book_id: str, title: str) -> dict:
    """Parse a book file and return content segments + cover path + chapters.

    Args:
        filepath: Path to a ``.pdf``, ``.epub`` or ``.mobi`` file.
        book_id: Identifier used in the names of saved images and the cover.
        title: Title drawn on the generated cover when none can be extracted.

    Returns:
        A dict with keys ``pages`` (a one-element list holding all text joined
        by blank lines, or ``["[Empty]"]``), ``segments``, ``cover`` (path of
        the cover file) and ``chapters`` (``{"title", "segment"}`` entries,
        where ``segment`` is an index into ``segments``).

    Raises:
        ValueError: If the file extension is not supported.
    """
    ext = os.path.splitext(filepath)[1].lower()

    if ext == ".pdf":
        extract_content, extract_cover = extract_pdf_content, extract_pdf_cover
    elif ext == ".epub":
        extract_content, extract_cover = extract_epub_content, extract_epub_cover
    elif ext == ".mobi":
        extract_content, extract_cover = extract_mobi_content, extract_mobi_cover
    else:
        raise ValueError(f"Unsupported format: {ext}")

    # The extractors write images and covers to disk; make sure the target
    # directories exist first.
    ensure_dirs()

    segments = extract_content(filepath, book_id)
    cover = extract_cover(filepath, book_id)
    if not cover:
        cover = generate_cover(title, book_id)

    # Build chapter index from headings or chapter-like patterns.
    chapters = []
    for i, seg in enumerate(segments):
        if seg.get("heading"):
            chapters.append({"title": seg["content"][:80], "segment": i})
        elif seg["type"] == "text" and _CHAPTER_RE.match(seg["content"].strip()):
            chapters.append({"title": seg["content"].strip()[:80], "segment": i})

    # Flat page text for DB.
    current_text = [seg["content"] for seg in segments if seg["type"] == "text"]
    pages = ["\n\n".join(current_text)] if current_text else ["[Empty]"]

    return {"pages": pages, "segments": segments, "cover": cover, "chapters": chapters}