import streamlit as st import tempfile import os import json import zipfile import io import time import traceback import html as html_lib from pathlib import Path from typing import Optional, List, Dict, Any # ── Page config ────────────────────────────────────────────────────────────── st.set_page_config( page_title="Docling AIO Converter", page_icon="📄", layout="wide", initial_sidebar_state="expanded", ) # ── CSS ─────────────────────────────────────────────────────────────────────── st.markdown(""" """, unsafe_allow_html=True) # ── Helpers ─────────────────────────────────────────────────────────────────── SUPPORTED_EXTENSIONS = { "pdf": "📕", "docx": "📘", "doc": "📘", "pptx": "📙", "ppt": "📙", "xlsx": "📗", "xls": "📗", "csv": "📊", "html": "🌐", "htm": "🌐", "md": "📝", "txt": "📄", "png": "🖼️", "jpg": "🖼️", "jpeg": "🖼️", "tiff": "🖼️", "tif": "🖼️", "bmp": "🖼️", "webp": "🖼️", "asciidoc": "📃", "adoc": "📃", "xml": "📑", "json": "📋", } OUTPUT_FORMATS = { "Markdown (.md)": "md", "HTML (.html)": "html", "JSON (.json)": "json", "Plain Text (.txt)": "txt", "DocTags (.doctags)":"doctags", } ELEMENT_LABELS = { "Paragraphs / Text": "paragraph", "Section Headers": "section_header", "Titles": "title", "Tables": "table", "Figures / Pictures": "picture", "Captions": "caption", "Footnotes": "footnote", "Formulas / Equations":"formula", "List Items": "list_item", "Code Blocks": "code", "Page Headers": "page_header", "Page Footers": "page_footer", "Key-Value Regions": "key_value_region", "Form Elements": "form", "Document Index": "document_index", } def file_icon(filename: str) -> str: ext = Path(filename).suffix.lstrip(".").lower() return SUPPORTED_EXTENSIONS.get(ext, "📄") def fmt_bytes(n: int) -> str: for unit in ("B", "KB", "MB", "GB"): if n < 1024: return f"{n:.1f} {unit}" n /= 1024 return f"{n:.1f} TB" def fmt_time(s: float) -> str: return f"{s:.1f}s" if s < 60 else f"{int(s//60)}m {int(s%60)}s" # ── Lazy-load Docling (heavy) ───────────────────────────────────────────────── @st.cache_resource(show_spinner=False) def _load_docling(): """Import docling once and cache.""" from docling.document_converter import DocumentConverter, PdfFormatOption, WordFormatOption from docling.datamodel.pipeline_options import ( PdfPipelineOptions, TableStructureOptions, EasyOcrOptions, TesseractCliOcrOptions, ) from docling.datamodel.base_models import InputFormat, ConversionStatus try: from docling.datamodel.pipeline_options import TableFormerMode except ImportError: TableFormerMode = None try: from docling_core.types.doc import ImageRefMode, DocItemLabel except ImportError: from docling.datamodel.base_models import ImageRefMode, DocItemLabel # type: ignore return { "DocumentConverter": DocumentConverter, "PdfFormatOption": PdfFormatOption, "WordFormatOption": WordFormatOption, "PdfPipelineOptions": PdfPipelineOptions, "TableStructureOptions": TableStructureOptions, "EasyOcrOptions": EasyOcrOptions, "TesseractCliOcrOptions": TesseractCliOcrOptions, "InputFormat": InputFormat, "ConversionStatus": ConversionStatus, "TableFormerMode": TableFormerMode, "ImageRefMode": ImageRefMode, "DocItemLabel": DocItemLabel, } # ── Sidebar Config ──────────────────────────────────────────────────────────── def sidebar() -> Dict[str, Any]: cfg: Dict[str, Any] = {} with st.sidebar: st.markdown("## ⚙️ Configuration") # ── OCR ────────────────────────────────────────────────────────────── st.markdown('
🔍 OCR Settings
', unsafe_allow_html=True) cfg["do_ocr"] = st.checkbox("Enable OCR", value=True, help="Optical Character Recognition for scanned/image-based content.") cfg["force_full_page_ocr"] = st.checkbox("Force full-page OCR", value=False, help="Run OCR on every page even if text layer exists.") cfg["ocr_engine"] = st.radio("OCR Engine", ["EasyOCR", "Tesseract"], horizontal=True, help="EasyOCR is pure-Python; Tesseract requires system install.") cfg["ocr_languages"] = st.multiselect( "OCR Languages", ["en", "de", "fr", "es", "it", "pt", "nl", "ru", "zh", "ja", "ko", "ar", "hi", "pl", "cs", "ro", "sv", "da", "fi", "no", "hu", "tr"], default=["en"], help="Languages for OCR. EasyOCR supports all; Tesseract needs packs installed.") # ── Table Extraction ────────────────────────────────────────────────── st.markdown('
📊 Table Extraction
', unsafe_allow_html=True) cfg["do_table_structure"] = st.checkbox("Extract table structure", value=True, help="Use TableFormer model to detect rows/columns/cells in tables.") cfg["table_mode"] = st.radio("TableFormer mode", ["Accurate (slower)", "Fast (lighter)"], index=0, horizontal=True, help="Accurate uses the full model; Fast is a smaller/faster variant.") cfg["do_cell_matching"] = st.checkbox("Cell text matching", value=True, help="Match detected cells back to underlying PDF text for accuracy.") # ── Image Handling ──────────────────────────────────────────────────── st.markdown('
🖼️ Image & Page Rendering
', unsafe_allow_html=True) cfg["generate_page_images"] = st.checkbox("Generate page images", value=False, help="Rasterise each page as an image (needed for embedded page images in output).") cfg["generate_picture_images"] = st.checkbox("Generate picture crops", value=True, help="Extract figure/picture regions as cropped images.") cfg["images_scale"] = st.slider("Rendering scale (DPI multiplier)", 1.0, 4.0, 2.0, 0.5, help="Higher = better quality but slower & more memory.") cfg["generate_table_images"] = st.checkbox("Generate table images", value=False, help="Also rasterise table regions as images.") # ── Content Elements ────────────────────────────────────────────────── st.markdown('
📋 Content Elements to Include
', unsafe_allow_html=True) st.caption("Uncheck elements you want to exclude from the output.") selected_labels = [] for label_name, label_val in ELEMENT_LABELS.items(): default = True # default off for things rarely needed if label_val in ("page_header", "page_footer", "document_index", "key_value_region", "form"): default = False if st.checkbox(label_name, value=default, key=f"lbl_{label_val}"): selected_labels.append(label_val) cfg["selected_labels"] = selected_labels # ── Output Format ───────────────────────────────────────────────────── st.markdown('
📤 Output Format
', unsafe_allow_html=True) cfg["output_format"] = st.selectbox("Convert to", list(OUTPUT_FORMATS.keys())) # ── Format-specific options ─────────────────────────────────────────── fmt = OUTPUT_FORMATS[cfg["output_format"]] if fmt in ("md", "html"): cfg["image_mode"] = st.selectbox( "Image handling in output", ["Placeholder comment", "Embedded (base64)", "Referenced path", "Omit images"], help="How images appear in Markdown / HTML output.") if fmt == "md": cfg["strict_text"] = st.checkbox("Strict text mode", value=False, help="Disable Markdown enrichment; output pure text lines.") cfg["indent"] = st.slider("List indent (spaces)", 2, 8, 4, 2) # ── PDF-specific ────────────────────────────────────────────────────── st.markdown('
📕 PDF-Specific Options
', unsafe_allow_html=True) cfg["abort_on_error"] = st.checkbox("Abort batch on first error", value=False) cfg["max_file_mb"] = st.slider("Max file size (MB)", 5, 200, 50, help="Files larger than this will be skipped with a warning.") st.markdown("---") st.caption("Powered by [Docling](https://github.com/DS4SD/docling) · IBM Research") return cfg # ── Converter logic ─────────────────────────────────────────────────────────── def build_converter(cfg: Dict[str, Any], dl) -> Any: """Construct a DocumentConverter from sidebar config.""" PdfPipelineOptions = dl["PdfPipelineOptions"] TableStructureOptions = dl["TableStructureOptions"] EasyOcrOptions = dl["EasyOcrOptions"] TesseractCliOcrOptions = dl["TesseractCliOcrOptions"] PdfFormatOption = dl["PdfFormatOption"] DocumentConverter = dl["DocumentConverter"] TableFormerMode = dl["TableFormerMode"] InputFormat = dl["InputFormat"] # OCR backend ocr_options = None if cfg["do_ocr"]: if cfg["ocr_engine"] == "EasyOCR": ocr_options = EasyOcrOptions(lang=cfg["ocr_languages"]) else: ocr_options = TesseractCliOcrOptions(lang="+".join(cfg["ocr_languages"])) # Table structure tbl_kwargs = {"do_cell_matching": cfg["do_cell_matching"]} if TableFormerMode is not None: tbl_kwargs["mode"] = (TableFormerMode.ACCURATE if "Accurate" in cfg["table_mode"] else TableFormerMode.FAST) tbl_opts = TableStructureOptions(**tbl_kwargs) # PDF pipeline pdf_opts_kwargs = dict( do_ocr=cfg["do_ocr"], do_table_structure=cfg["do_table_structure"], table_structure_options=tbl_opts, generate_page_images=cfg["generate_page_images"], generate_picture_images=cfg["generate_picture_images"], images_scale=cfg["images_scale"], ) if cfg["do_ocr"] and ocr_options is not None: pdf_opts_kwargs["ocr_options"] = ocr_options if cfg["force_full_page_ocr"] and cfg["do_ocr"]: pdf_opts_kwargs["force_full_page_ocr"] = True if hasattr(PdfPipelineOptions, "generate_table_images"): pdf_opts_kwargs["generate_table_images"] = cfg.get("generate_table_images", False) pdf_pipeline_opts = PdfPipelineOptions(**pdf_opts_kwargs) format_options = { InputFormat.PDF: PdfFormatOption(pipeline_options=pdf_pipeline_opts), } converter = DocumentConverter(format_options=format_options) return converter def image_ref_mode(cfg, dl): ImageRefMode = dl["ImageRefMode"] choice = cfg.get("image_mode", "Placeholder comment") mapping = { "Placeholder comment": ImageRefMode.PLACEHOLDER, "Embedded (base64)": ImageRefMode.EMBEDDED, "Referenced path": ImageRefMode.REFERENCED, "Omit images": ImageRefMode.PLACEHOLDER, # handled via labels } return mapping.get(choice, ImageRefMode.PLACEHOLDER) def resolve_labels(cfg, dl): DocItemLabel = dl["DocItemLabel"] # build label objects from selected string values labels = [] label_map = {v: v for v in ELEMENT_LABELS.values()} for lv in cfg["selected_labels"]: try: labels.append(DocItemLabel(lv)) except Exception: pass return labels if labels else None def do_export(doc, cfg, dl) -> str: """Export converted document to the chosen format.""" fmt = OUTPUT_FORMATS[cfg["output_format"]] labels = resolve_labels(cfg, dl) try: if fmt == "md": kwargs = dict( image_mode=image_ref_mode(cfg, dl), strict_text=cfg.get("strict_text", False), indent=cfg.get("indent", 4), ) if labels is not None: kwargs["labels"] = labels return doc.export_to_markdown(**kwargs) elif fmt == "html": kwargs = dict(image_mode=image_ref_mode(cfg, dl)) if labels is not None: kwargs["labels"] = labels return doc.export_to_html(**kwargs) elif fmt == "json": d = doc.export_to_dict() return json.dumps(d, indent=2, ensure_ascii=False) elif fmt == "txt": kwargs = {} if labels is not None: kwargs["labels"] = labels return doc.export_to_text(**kwargs) elif fmt == "doctags": try: return doc.export_to_document_tokens() except AttributeError: return doc.export_to_markdown() except TypeError: # Fallback: export without unsupported kwargs if fmt == "md": return doc.export_to_markdown() elif fmt == "html": return doc.export_to_html() elif fmt == "json": return json.dumps(doc.export_to_dict(), indent=2, ensure_ascii=False) else: return doc.export_to_text() return "" def convert_file(path: str, cfg: Dict[str, Any], converter, dl) -> Dict[str, Any]: """Run docling on a single file. Returns result dict.""" ConversionStatus = dl["ConversionStatus"] t0 = time.time() try: result = converter.convert(path) elapsed = time.time() - t0 if result.status not in (ConversionStatus.SUCCESS, ConversionStatus.PARTIAL_SUCCESS): return {"ok": False, "error": f"Conversion failed: {result.status}", "elapsed": elapsed} doc = result.document exported = do_export(doc, cfg, dl) # metadata meta = {} try: meta["pages"] = len(result.document.pages) if hasattr(result.document, "pages") else "N/A" except Exception: meta["pages"] = "N/A" try: meta["tables"] = len([i for i in doc.iterate_items() if hasattr(i[1], 'label') and str(getattr(i[1], 'label', '')).endswith('table')]) except Exception: meta["tables"] = "N/A" try: meta["figures"] = len(doc.pictures) if hasattr(doc, "pictures") else "N/A" except Exception: meta["figures"] = "N/A" return { "ok": True, "content": exported, "elapsed": elapsed, "status": str(result.status), "meta": meta, } except Exception as e: return { "ok": False, "error": f"{type(e).__name__}: {e}", "traceback": traceback.format_exc(), "elapsed": time.time() - t0, } # ── Main ────────────────────────────────────────────────────────────────────── def main(): cfg = sidebar() # Hero st.markdown("""
⚡ Powered by Docling 🤗 HuggingFace Spaces

📄 Docling AIO Converter

Parse & convert any document — PDF, DOCX, PPTX, XLSX, images, HTML and more — with full control over OCR, tables, figures, and output formatting.

""", unsafe_allow_html=True) # ── Upload area ─────────────────────────────────────────────────────────── st.markdown("### 📁 Upload Documents") max_mb = cfg.get("max_file_mb", 50) uploaded = st.file_uploader( f"Drag & drop files here · Max {max_mb} MB per file", accept_multiple_files=True, type=list(SUPPORTED_EXTENSIONS.keys()), help="You can upload multiple files at once for batch conversion.", ) if not uploaded: st.info("👆 Upload one or more files to get started. " "Adjust all settings in the **sidebar** before converting.", icon="ℹ️") # Supported formats table with st.expander("📋 Supported Input Formats"): cols = st.columns(4) items = list(SUPPORTED_EXTENSIONS.items()) for i, (ext, icon) in enumerate(items): cols[i % 4].markdown(f"{icon} `.{ext}`") return # ── File list ───────────────────────────────────────────────────────────── oversized = [f for f in uploaded if f.size > max_mb * 1024 * 1024] valid = [f for f in uploaded if f.size <= max_mb * 1024 * 1024] st.markdown(f"**{len(uploaded)} file(s) selected** · " f"{len(valid)} ready" + (f" {len(oversized)} oversized" if oversized else ""), unsafe_allow_html=True) for f in valid[:8]: # show preview of first 8 st.markdown( f"
" f"{file_icon(f.name)}" f"{f.name}" f"{fmt_bytes(f.size)}" f"
", unsafe_allow_html=True, ) if len(valid) > 8: st.caption(f"…and {len(valid)-8} more files") for f in oversized: st.warning(f"⚠️ **{f.name}** ({fmt_bytes(f.size)}) exceeds the {max_mb} MB limit and will be skipped.") if not valid: return # ── Convert button ──────────────────────────────────────────────────────── col_btn, col_fmt, _ = st.columns([2, 2, 4]) with col_btn: run = st.button("🚀 Convert All", use_container_width=True) with col_fmt: st.markdown(f"
{cfg['output_format']}", unsafe_allow_html=True) if not run: return # ── Load Docling ────────────────────────────────────────────────────────── with st.spinner("Loading Docling models (first run downloads ~1 GB of models)…"): try: dl = _load_docling() except Exception as e: st.error(f"Failed to import Docling: {e}\n\n" "Make sure `docling` is installed (`pip install docling`).") return with st.spinner("Building converter pipeline…"): try: converter = build_converter(cfg, dl) except Exception as e: st.error(f"Could not build converter: {e}\n```\n{traceback.format_exc()}\n```") return # ── Process files ───────────────────────────────────────────────────────── st.markdown("---") st.markdown("### ⚙️ Processing") results: Dict[str, Dict] = {} overall_bar = st.progress(0) status_area = st.empty() with tempfile.TemporaryDirectory() as tmpdir: for idx, uf in enumerate(valid): fname = uf.name status_area.markdown( f"
" f"{file_icon(fname)}" f"{fname}" f"⏳ converting…" f"
", unsafe_allow_html=True, ) # Fix 4: each file gets its own subdirectory so two uploaded files # with the same basename (e.g. "report.pdf" from different folders) # never silently overwrite each other in the shared tmpdir. file_subdir = os.path.join(tmpdir, str(idx)) os.makedirs(file_subdir, exist_ok=True) tmp_path = os.path.join(file_subdir, fname) # Fix 3: always seek(0) before reading — on Streamlit re-renders # the BytesIO cursor is already at EOF and uf.read() returns b"", # writing a zero-byte file that Docling then silently fails on. uf.seek(0) with open(tmp_path, "wb") as fh: fh.write(uf.read()) result = convert_file(tmp_path, cfg, converter, dl) results[fname] = result overall_bar.progress((idx + 1) / len(valid)) if not result["ok"] and cfg.get("abort_on_error"): st.error(f"❌ Aborted after error on **{fname}**:\n```\n{result['error']}\n```") break status_area.empty() overall_bar.empty() # ── Summary metrics ─────────────────────────────────────────────────────── ok_count = sum(1 for r in results.values() if r["ok"]) err_count = len(results) - ok_count total_time = sum(r["elapsed"] for r in results.values()) st.markdown( f"
" f"
{len(results)}
Files processed
" f"
{ok_count}
Succeeded
" f"
{err_count}
Failed
" f"
{fmt_time(total_time)}
Total time
" f"
", unsafe_allow_html=True, ) # ── Per-file results ────────────────────────────────────────────────────── st.markdown("### 📂 Results") ext = OUTPUT_FORMATS[cfg["output_format"]] # Build ZIP in memory zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w", zipfile.ZIP_DEFLATED) as zf: for fname, res in results.items(): if res["ok"]: out_name = Path(fname).stem + f".{ext}" zf.writestr(out_name, res["content"]) zip_buf.seek(0) dl_col1, dl_col2 = st.columns([2, 4]) with dl_col1: st.download_button( "⬇️ Download All as ZIP", data=zip_buf, file_name="docling_output.zip", mime="application/zip", use_container_width=True, ) st.markdown("---") for fname, res in results.items(): icon = file_icon(fname) with st.expander( f"{icon} **{fname}** " + ("✅" if res["ok"] else "❌") + f" · {fmt_time(res['elapsed'])}", expanded=ok_count == 1, ): if not res["ok"]: st.error(f"**Error:** {res['error']}") if "traceback" in res: with st.expander("📋 Full traceback"): st.code(res["traceback"], language="python") else: # Metadata strip meta = res.get("meta", {}) m_cols = st.columns(4) m_cols[0].metric("Pages", meta.get("pages", "—")) m_cols[1].metric("Tables", meta.get("tables", "—")) m_cols[2].metric("Figures", meta.get("figures", "—")) m_cols[3].metric("Time", fmt_time(res["elapsed"])) content = res["content"] out_name = Path(fname).stem + f".{ext}" # Download individual file mime_map = { "md": "text/markdown", "html": "text/html", "json": "application/json", "txt": "text/plain", "doctags": "text/plain", } st.download_button( f"⬇️ Download {out_name}", data=content.encode("utf-8"), file_name=out_name, mime=mime_map.get(ext, "text/plain"), key=f"dl_{fname}", ) # Preview char_count = len(content) word_count = len(content.split()) st.caption(f"Output: **{char_count:,} chars** · **{word_count:,} words**") if ext == "md": tab1, tab2 = st.tabs(["📖 Rendered", "📝 Raw Markdown"]) with tab1: st.markdown(content[:15000] + ("\n\n*[truncated for preview…]*" if len(content) > 15000 else ""), unsafe_allow_html=True) with tab2: # Fix 5: escape before injecting into HTML — raw doc content # can contain <, >, & which would break the div or be executed. preview_raw = html_lib.escape(content[:12000]) suffix = "…[truncated]" if len(content) > 12000 else "" st.markdown(f"
{preview_raw}{suffix}
", unsafe_allow_html=True) elif ext == "html": tab1, tab2 = st.tabs(["🌐 Rendered", "📝 HTML Source"]) with tab1: st.components.v1.html(content, height=600, scrolling=True) with tab2: st.code(content[:10000], language="html") elif ext == "json": try: parsed = json.loads(content) st.json(parsed, expanded=False) except Exception: st.code(content[:10000], language="json") else: # Fix 5: escape before injecting into HTML — same reason as above. preview_plain = html_lib.escape(content[:12000]) suffix = "…[truncated]" if len(content) > 12000 else "" st.markdown(f"
{preview_plain}{suffix}
", unsafe_allow_html=True) if __name__ == "__main__": main()