Spaces:
Running
Running
| """Utilities for parsing PDF, EPUB, MOBI files and extracting/generating covers. | |
| Content is returned as a list of segments. Each segment is either: | |
| {"type": "text", "content": "..."} or | |
| {"type": "image", "src": "/book_images/abc123_0.png"} | |
| This allows the frontend to render images inline between text blocks. | |
| """ | |
| import os | |
| import base64 | |
| import fitz # PyMuPDF | |
| import ebooklib | |
| from ebooklib import epub | |
| from PIL import Image, ImageDraw, ImageFont | |
| from bs4 import BeautifulSoup | |
| import mobi | |
| import shutil | |
| import settings | |
# Module-level counter giving each extracted image a unique filename suffix
# (incremented in _save_image_bytes; not thread-safe).
IMG_COUNTER = 0
def ensure_dirs() -> None:
    """Create the upload, covers, and images directories if missing."""
    for directory in (settings.UPLOAD_DIR, settings.COVERS_DIR, settings.IMAGES_DIR):
        os.makedirs(directory, exist_ok=True)
def _save_image_bytes(img_bytes: bytes, book_id: str, ext: str = "png") -> str:
    """Persist image bytes under IMAGES_DIR and return the public URL path."""
    global IMG_COUNTER
    IMG_COUNTER += 1
    filename = f"{book_id}_{IMG_COUNTER}.{ext}"
    destination = os.path.join(settings.IMAGES_DIR, filename)
    with open(destination, "wb") as fh:
        fh.write(img_bytes)
    return f"/book_images/{filename}"
| def _clean_pdf_text(raw: str) -> str: | |
| """Clean PDF text: join mid-sentence line breaks, keep paragraph breaks.""" | |
| import re | |
| lines = raw.split("\n") | |
| result = [] | |
| for line in lines: | |
| line = line.rstrip() | |
| if not line: | |
| # Empty line = paragraph break | |
| if result and result[-1] != "\n": | |
| result.append("\n") | |
| continue | |
| if result and result[-1] != "\n": | |
| prev = result[-1] | |
| # If previous line ends with sentence-ending punctuation, start new paragraph | |
| if prev and prev[-1] in '.!?:;»""\u201d\u2019': | |
| result.append("\n") | |
| result.append(line) | |
| elif prev and prev[-1] == '-': | |
| # Hyphenated word across lines: join without space, remove hyphen | |
| result[-1] = prev[:-1] + line | |
| else: | |
| # Mid-sentence break: join with space | |
| result[-1] = prev + " " + line | |
| else: | |
| result.append(line) | |
| # Collapse multiple newlines | |
| text = "\n".join(result) | |
| text = re.sub(r'\n{3,}', '\n\n', text) | |
| return text.strip() | |
def _flush_text_parts(parts: list[str], segments: list[dict]) -> None:
    """Clean accumulated raw page text and append one text segment per paragraph."""
    merged = _clean_pdf_text("\n".join(parts))
    if not merged:
        return
    for para in merged.split("\n"):
        para = para.strip()
        if para:
            segments.append({"type": "text", "content": para})

def extract_pdf_content(filepath: str, book_id: str) -> list[dict]:
    """Extract text and images from a PDF, returning ordered segments.

    Segments are dicts of the form {"type": "text", "content": ...} or
    {"type": "image", "src": ...}, emitted in reading order (blocks sorted
    top-to-bottom, left-to-right by bounding box).
    """
    doc = fitz.open(filepath)
    segments: list[dict] = []
    try:
        for page in doc:
            blocks = page.get_text("dict", flags=fitz.TEXT_PRESERVE_WHITESPACE)["blocks"]
            page_text_parts: list[str] = []
            # Sort by (top, left) so text and images interleave in reading order.
            for block in sorted(blocks, key=lambda b: (b["bbox"][1], b["bbox"][0])):
                if block["type"] == 0:  # text block
                    text = ""
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            text += span.get("text", "")
                        text += "\n"
                    text = text.strip()
                    if text:
                        page_text_parts.append(text)
                elif block["type"] == 1:  # image block
                    # Flush accumulated text so the image lands between paragraphs.
                    if page_text_parts:
                        _flush_text_parts(page_text_parts, segments)
                        page_text_parts = []
                    try:
                        img_bytes = block.get("image")
                        # Skip tiny images (likely decorations or artifacts).
                        if img_bytes and len(img_bytes) > 500:
                            ext = block.get("ext", "png") or "png"
                            src = _save_image_bytes(img_bytes, book_id, ext)
                            segments.append({"type": "image", "src": src})
                    except Exception:
                        pass  # best-effort: a bad image must not abort parsing
            # Flush any text remaining at the end of the page.
            if page_text_parts:
                _flush_text_parts(page_text_parts, segments)
    finally:
        # Close the document even if parsing raises (original leaked it on error).
        doc.close()
    return segments
def extract_pdf_cover(filepath: str, book_id: str) -> str | None:
    """Save the first embedded image on the first PDF page as the cover.

    Returns the saved cover file path, or None if the document is empty or
    the first page contains no images.
    """
    doc = fitz.open(filepath)
    try:
        if len(doc) == 0:
            return None
        images = doc[0].get_images(full=True)
        if not images:
            return None
        xref = images[0][0]  # xref of the first image on the first page
        base_image = doc.extract_image(xref)
        ext = base_image["ext"]
        cover_path = os.path.join(settings.COVERS_DIR, f"{book_id}.{ext}")
        with open(cover_path, "wb") as f:
            f.write(base_image["image"])
        return cover_path
    finally:
        # Single close point; original leaked the doc if extraction raised.
        doc.close()
def extract_epub_content(filepath: str, book_id: str) -> list[dict]:
    """Extract text and images from an EPUB, returning ordered segments."""
    book = epub.read_epub(filepath, options={'ignore_ncx': True})

    # Index image items by full name and by basename so relative refs resolve.
    image_map = {}
    for img in book.get_items_of_type(ebooklib.ITEM_IMAGE):
        image_map[img.get_name()] = img
        image_map[os.path.basename(img.get_name())] = img

    text_tags = ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'blockquote')
    heading_tags = ('h1', 'h2', 'h3')

    segments: list[dict] = []
    for doc_item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
        soup = BeautifulSoup(doc_item.get_content(), "html.parser")
        for node in soup.descendants:
            if node.name in ('img', 'image'):
                ref = node.get('src') or node.get('xlink:href') or node.get('href', '')
                base = ref.split('/')[-1] if ref else ''
                target = image_map.get(ref) or image_map.get(base)
                if target:
                    try:
                        data = target.get_content()
                        if len(data) > 500:  # skip tiny decorative images
                            ext = base.rsplit('.', 1)[-1] if '.' in base else 'png'
                            saved = _save_image_bytes(data, book_id, ext)
                            segments.append({"type": "image", "src": saved})
                    except Exception:
                        pass  # best-effort: skip unreadable images
            elif node.name in text_tags:
                content = node.get_text(separator=" ", strip=True)
                if content:
                    entry = {"type": "text", "content": content}
                    if node.name in heading_tags:
                        entry["heading"] = True
                    segments.append(entry)

    # Nested tags (e.g. p inside div) repeat text; drop consecutive duplicates.
    deduped: list[dict] = []
    for entry in segments:
        if (deduped and entry["type"] == "text"
                and deduped[-1]["type"] == "text"
                and deduped[-1]["content"] == entry["content"]):
            continue
        deduped.append(entry)
    return deduped
def extract_epub_cover(filepath: str, book_id: str) -> str | None:
    """Pick an EPUB cover: prefer items named 'cover', else the first image."""
    book = epub.read_epub(filepath, options={'ignore_ncx': True})

    def _write(item) -> str:
        # Save the item's bytes under COVERS_DIR, keeping its extension.
        name = item.get_name()
        ext = name.rsplit('.', 1)[-1] if '.' in name else 'jpg'
        path = os.path.join(settings.COVERS_DIR, f"{book_id}.{ext}")
        with open(path, "wb") as f:
            f.write(item.get_content())
        return path

    # First pass: any image/cover item whose name mentions 'cover'.
    for item in book.get_items():
        if 'cover' in (item.get_name() or '').lower() and item.get_type() in (
            ebooklib.ITEM_IMAGE, ebooklib.ITEM_COVER
        ):
            return _write(item)
    # Fallback: the first image in the book, if any.
    for item in book.get_items_of_type(ebooklib.ITEM_IMAGE):
        return _write(item)
    return None
def extract_mobi_content(filepath: str, book_id: str) -> list[dict]:
    """Extract text from a MOBI file as a list of text segments."""
    tempdir, extracted_path = mobi.extract(filepath)
    try:
        if not extracted_path or not os.path.isfile(extracted_path):
            return [{"type": "text", "content": "[Could not extract MOBI content]"}]
        with open(extracted_path, "r", encoding="utf-8", errors="ignore") as fh:
            soup = BeautifulSoup(fh.read(), "html.parser")
        parts = [
            {"type": "text", "content": txt}
            for elem in soup.find_all(['p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
            if (txt := elem.get_text(separator=" ", strip=True))
        ]
        return parts or [{"type": "text", "content": "[Empty book]"}]
    finally:
        # Always remove the extraction scratch directory.
        if tempdir and os.path.isdir(tempdir):
            shutil.rmtree(tempdir, ignore_errors=True)
def extract_mobi_cover(filepath: str, book_id: str) -> str | None:
    """Cover extraction for MOBI is not implemented; always returns None.

    Callers (parse_book) treat a None cover as "generate a placeholder".
    """
    return None
def generate_cover(title: str, book_id: str) -> str:
    """Render a placeholder cover (400x600 dark canvas, word-wrapped title)."""
    canvas = Image.new("RGB", (400, 600), color=(45, 55, 72))
    draw = ImageDraw.Draw(canvas)
    try:
        font = ImageFont.truetype("arial.ttf", 28)
    except (OSError, IOError):
        # Fall back to PIL's built-in bitmap font when Arial is unavailable.
        font = ImageFont.load_default()

    def _width(s: str) -> int:
        # Rendered pixel width of s in the chosen font.
        box = draw.textbbox((0, 0), s, font=font)
        return box[2] - box[0]

    # Greedy word wrap: add words until a line would exceed 360 px.
    wrapped: list[str] = []
    line = ""
    for word in title.split():
        candidate = f"{line} {word}".strip()
        if _width(candidate) > 360:
            if line:
                wrapped.append(line)
            line = word
        else:
            line = candidate
    if line:
        wrapped.append(line)

    # Draw each line horizontally centered, starting at y=200, 40 px apart.
    y = 200
    for text_line in wrapped:
        draw.text(((400 - _width(text_line)) / 2, y), text_line, fill="white", font=font)
        y += 40

    cover_path = os.path.join(settings.COVERS_DIR, f"{book_id}.png")
    canvas.save(cover_path)
    return cover_path
def parse_book(filepath: str, book_id: str, title: str) -> dict:
    """Parse a book file and return content segments + cover path + chapters."""
    import re

    # Format dispatch: extension -> (content extractor, cover extractor).
    handlers = {
        ".pdf": (extract_pdf_content, extract_pdf_cover),
        ".epub": (extract_epub_content, extract_epub_cover),
        ".mobi": (extract_mobi_content, extract_mobi_cover),
    }
    ext = os.path.splitext(filepath)[1].lower()
    if ext not in handlers:
        raise ValueError(f"Unsupported format: {ext}")
    content_fn, cover_fn = handlers[ext]
    segments = content_fn(filepath, book_id)
    # Fall back to a generated placeholder cover when extraction found none.
    cover = cover_fn(filepath, book_id) or generate_cover(title, book_id)

    # Chapter index: explicit headings, or paragraphs that look like chapter titles.
    chapter_pattern = re.compile(
        r'^(chapter|chapitre|capitolo|capítulo|kapitel|hoofdstuk|kapittel|rozdział)\s+\w+',
        re.IGNORECASE
    )
    chapters = []
    for idx, seg in enumerate(segments):
        if seg.get("heading"):
            chapters.append({"title": seg["content"][:80], "segment": idx})
        elif seg["type"] == "text" and chapter_pattern.match(seg["content"].strip()):
            chapters.append({"title": seg["content"].strip()[:80], "segment": idx})

    # Flatten all text into a single "page" string for DB storage.
    flat_text = [seg["content"] for seg in segments if seg["type"] == "text"]
    pages = ["\n\n".join(flat_text)] if flat_text else ["[Empty]"]
    return {"pages": pages, "segments": segments, "cover": cover, "chapters": chapters}