import streamlit as st
import tempfile
import os
import json
import zipfile
import io
import time
import traceback
import html as html_lib
from pathlib import Path
from typing import Optional, List, Dict, Any
# ── Page config ──────────────────────────────────────────────────────────────
# Must run before any other Streamlit call; sets the browser tab title/icon,
# a wide layout, and opens the sidebar by default.
st.set_page_config(
    page_title="Docling AIO Converter",
    page_icon="📄",
    layout="wide",
    initial_sidebar_state="expanded",
)
# ── CSS ───────────────────────────────────────────────────────────────────────
# NOTE(review): this triple-quoted string is empty — the custom CSS payload
# appears to have been stripped from the file; restore from upstream.
st.markdown("""
""", unsafe_allow_html=True)
# ── Helpers ───────────────────────────────────────────────────────────────────
# Input extensions accepted by the uploader, mapped to the emoji shown next to
# each file. Keys must stay lowercase: file_icon() lower-cases the suffix
# before the lookup.
SUPPORTED_EXTENSIONS = {
    "pdf": "📕", "docx": "📘", "doc": "📘", "pptx": "📙", "ppt": "📙",
    "xlsx": "📗", "xls": "📗", "csv": "📊", "html": "🌐", "htm": "🌐",
    "md": "📝", "txt": "📄", "png": "🖼️", "jpg": "🖼️", "jpeg": "🖼️",
    "tiff": "🖼️", "tif": "🖼️", "bmp": "🖼️", "webp": "🖼️",
    "asciidoc": "📃", "adoc": "📃", "xml": "📑", "json": "📋",
}
# Sidebar display label -> output file extension (also used as format key).
OUTPUT_FORMATS = {
    "Markdown (.md)": "md",
    "HTML (.html)": "html",
    "JSON (.json)": "json",
    "Plain Text (.txt)": "txt",
    "DocTags (.doctags)": "doctags",
}
# Sidebar checkbox label -> docling DocItemLabel value string. resolve_labels()
# converts the selected value strings into DocItemLabel members, skipping any
# the installed docling version does not know.
ELEMENT_LABELS = {
    "Paragraphs / Text": "paragraph",
    "Section Headers": "section_header",
    "Titles": "title",
    "Tables": "table",
    "Figures / Pictures": "picture",
    "Captions": "caption",
    "Footnotes": "footnote",
    "Formulas / Equations": "formula",
    "List Items": "list_item",
    "Code Blocks": "code",
    "Page Headers": "page_header",
    "Page Footers": "page_footer",
    "Key-Value Regions": "key_value_region",
    "Form Elements": "form",
    "Document Index": "document_index",
}
def file_icon(filename: str) -> str:
    """Return the emoji for *filename*'s extension, or '📄' when unknown."""
    suffix = Path(filename).suffix
    key = suffix.lstrip(".").lower()
    return SUPPORTED_EXTENSIONS.get(key, "📄")
def fmt_bytes(n: int) -> str:
    """Format a byte count as a human-readable size string.

    Fix: whole-byte counts are now shown without a fractional digit
    ("512 B", not "512.0 B"); larger sizes keep one decimal place.
    The int parameter is no longer mutated into a float in place.
    """
    if n < 1024:
        return f"{n} B"
    size = float(n)
    for unit in ("KB", "MB", "GB"):
        size /= 1024
        if size < 1024:
            return f"{size:.1f} {unit}"
    # Anything >= 1024 GB falls through to terabytes.
    return f"{size / 1024:.1f} TB"
def fmt_time(s: float) -> str:
    """Render a duration in seconds as '12.3s' or, from one minute up, '2m 5s'."""
    if s < 60:
        return f"{s:.1f}s"
    minutes, seconds = divmod(int(s), 60)
    return f"{minutes}m {seconds}s"
# ── Lazy-load Docling (heavy) ─────────────────────────────────────────────────
@st.cache_resource(show_spinner=False)
def _load_docling():
    """Import docling once per server process and cache the resolved symbols.

    Docling imports are heavy, so they are deferred until the user actually
    converts something. Returns a plain dict keyed by class name so callers
    can stay decoupled from docling's (version-dependent) module layout.
    """
    from docling.document_converter import DocumentConverter, PdfFormatOption, WordFormatOption
    from docling.datamodel.pipeline_options import (
        PdfPipelineOptions,
        TableStructureOptions,
        EasyOcrOptions,
        TesseractCliOcrOptions,
    )
    from docling.datamodel.base_models import InputFormat, ConversionStatus
    # TableFormerMode only exists in newer docling releases; callers check
    # for None before using it.
    try:
        from docling.datamodel.pipeline_options import TableFormerMode
    except ImportError:
        TableFormerMode = None
    # ImageRefMode / DocItemLabel moved between docling_core and docling
    # across versions — try the new location first.
    try:
        from docling_core.types.doc import ImageRefMode, DocItemLabel
    except ImportError:
        from docling.datamodel.base_models import ImageRefMode, DocItemLabel  # type: ignore
    return {
        "DocumentConverter": DocumentConverter,
        "PdfFormatOption": PdfFormatOption,
        "WordFormatOption": WordFormatOption,
        "PdfPipelineOptions": PdfPipelineOptions,
        "TableStructureOptions": TableStructureOptions,
        "EasyOcrOptions": EasyOcrOptions,
        "TesseractCliOcrOptions": TesseractCliOcrOptions,
        "InputFormat": InputFormat,
        "ConversionStatus": ConversionStatus,
        "TableFormerMode": TableFormerMode,
        "ImageRefMode": ImageRefMode,
        "DocItemLabel": DocItemLabel,
    }
# ── Sidebar Config ────────────────────────────────────────────────────────────
def sidebar() -> Dict[str, Any]:
    """Render the sidebar controls and collect their values into a config dict.

    Fix: the st.markdown call after "## ⚙️ Configuration" was an unterminated
    single-quoted string split across two lines (a syntax error); it is
    collapsed to an empty string, matching the sibling section-header calls
    below. NOTE(review): those section-header HTML payloads all appear to have
    been stripped from this file — restore them from upstream.

    Returns:
        Dict consumed by build_converter()/do_export()/main(); keys include
        OCR settings, table/image options, selected_labels, output_format,
        abort_on_error and max_file_mb.
    """
    cfg: Dict[str, Any] = {}
    with st.sidebar:
        st.markdown("## ⚙️ Configuration")
        # ── OCR ──────────────────────────────────────────────────────────────
        st.markdown('', unsafe_allow_html=True)
        cfg["do_ocr"] = st.checkbox("Enable OCR", value=True,
                                    help="Optical Character Recognition for scanned/image-based content.")
        cfg["force_full_page_ocr"] = st.checkbox("Force full-page OCR", value=False,
                                                 help="Run OCR on every page even if text layer exists.")
        cfg["ocr_engine"] = st.radio("OCR Engine", ["EasyOCR", "Tesseract"],
                                     horizontal=True,
                                     help="EasyOCR is pure-Python; Tesseract requires system install.")
        cfg["ocr_languages"] = st.multiselect(
            "OCR Languages",
            ["en", "de", "fr", "es", "it", "pt", "nl", "ru", "zh", "ja", "ko",
             "ar", "hi", "pl", "cs", "ro", "sv", "da", "fi", "no", "hu", "tr"],
            default=["en"],
            help="Languages for OCR. EasyOCR supports all; Tesseract needs packs installed.")
        # ── Table Extraction ──────────────────────────────────────────────────
        st.markdown('', unsafe_allow_html=True)
        cfg["do_table_structure"] = st.checkbox("Extract table structure", value=True,
                                                help="Use TableFormer model to detect rows/columns/cells in tables.")
        cfg["table_mode"] = st.radio("TableFormer mode",
                                     ["Accurate (slower)", "Fast (lighter)"],
                                     index=0, horizontal=True,
                                     help="Accurate uses the full model; Fast is a smaller/faster variant.")
        cfg["do_cell_matching"] = st.checkbox("Cell text matching", value=True,
                                              help="Match detected cells back to underlying PDF text for accuracy.")
        # ── Image Handling ────────────────────────────────────────────────────
        st.markdown('', unsafe_allow_html=True)
        cfg["generate_page_images"] = st.checkbox("Generate page images", value=False,
                                                  help="Rasterise each page as an image (needed for embedded page images in output).")
        cfg["generate_picture_images"] = st.checkbox("Generate picture crops", value=True,
                                                     help="Extract figure/picture regions as cropped images.")
        cfg["images_scale"] = st.slider("Rendering scale (DPI multiplier)", 1.0, 4.0, 2.0, 0.5,
                                        help="Higher = better quality but slower & more memory.")
        cfg["generate_table_images"] = st.checkbox("Generate table images", value=False,
                                                   help="Also rasterise table regions as images.")
        # ── Content Elements ──────────────────────────────────────────────────
        st.markdown('', unsafe_allow_html=True)
        st.caption("Uncheck elements you want to exclude from the output.")
        selected_labels = []
        for label_name, label_val in ELEMENT_LABELS.items():
            default = True
            # default off for things rarely needed
            if label_val in ("page_header", "page_footer", "document_index",
                             "key_value_region", "form"):
                default = False
            if st.checkbox(label_name, value=default, key=f"lbl_{label_val}"):
                selected_labels.append(label_val)
        cfg["selected_labels"] = selected_labels
        # ── Output Format ─────────────────────────────────────────────────────
        st.markdown('', unsafe_allow_html=True)
        cfg["output_format"] = st.selectbox("Convert to", list(OUTPUT_FORMATS.keys()))
        # ── Format-specific options ───────────────────────────────────────────
        fmt = OUTPUT_FORMATS[cfg["output_format"]]
        if fmt in ("md", "html"):
            cfg["image_mode"] = st.selectbox(
                "Image handling in output",
                ["Placeholder comment", "Embedded (base64)", "Referenced path", "Omit images"],
                help="How images appear in Markdown / HTML output.")
        if fmt == "md":
            cfg["strict_text"] = st.checkbox("Strict text mode", value=False,
                                             help="Disable Markdown enrichment; output pure text lines.")
            cfg["indent"] = st.slider("List indent (spaces)", 2, 8, 4, 2)
        # ── PDF-specific ──────────────────────────────────────────────────────
        st.markdown('', unsafe_allow_html=True)
        cfg["abort_on_error"] = st.checkbox("Abort batch on first error", value=False)
        cfg["max_file_mb"] = st.slider("Max file size (MB)", 5, 200, 50,
                                       help="Files larger than this will be skipped with a warning.")
        st.markdown("---")
        st.caption("Powered by [Docling](https://github.com/DS4SD/docling) · IBM Research")
    return cfg
# ── Converter logic ───────────────────────────────────────────────────────────
def build_converter(cfg: Dict[str, Any], dl) -> Any:
    """Construct a DocumentConverter wired up from the sidebar config.

    *dl* is the class registry returned by _load_docling(); using it keeps
    this function independent of docling's module layout.
    """
    # OCR backend selection (left as None when OCR is disabled).
    ocr_opts = None
    if cfg["do_ocr"]:
        langs = cfg["ocr_languages"]
        if cfg["ocr_engine"] == "EasyOCR":
            ocr_opts = dl["EasyOcrOptions"](lang=langs)
        else:
            # Tesseract CLI expects a single '+'-joined language string.
            ocr_opts = dl["TesseractCliOcrOptions"](lang="+".join(langs))

    # Table-structure (TableFormer) options; `mode` only exists on newer docling.
    table_kwargs: Dict[str, Any] = {"do_cell_matching": cfg["do_cell_matching"]}
    TableFormerMode = dl["TableFormerMode"]
    if TableFormerMode is not None:
        accurate = "Accurate" in cfg["table_mode"]
        table_kwargs["mode"] = TableFormerMode.ACCURATE if accurate else TableFormerMode.FAST
    table_opts = dl["TableStructureOptions"](**table_kwargs)

    # PDF pipeline options, assembled as kwargs so version-dependent fields
    # can be added conditionally.
    pipeline_kwargs: Dict[str, Any] = {
        "do_ocr": cfg["do_ocr"],
        "do_table_structure": cfg["do_table_structure"],
        "table_structure_options": table_opts,
        "generate_page_images": cfg["generate_page_images"],
        "generate_picture_images": cfg["generate_picture_images"],
        "images_scale": cfg["images_scale"],
    }
    if cfg["do_ocr"] and ocr_opts is not None:
        pipeline_kwargs["ocr_options"] = ocr_opts
    if cfg["do_ocr"] and cfg["force_full_page_ocr"]:
        pipeline_kwargs["force_full_page_ocr"] = True
    PdfPipelineOptions = dl["PdfPipelineOptions"]
    if hasattr(PdfPipelineOptions, "generate_table_images"):
        pipeline_kwargs["generate_table_images"] = cfg.get("generate_table_images", False)
    pdf_opts = PdfPipelineOptions(**pipeline_kwargs)

    InputFormat = dl["InputFormat"]
    return dl["DocumentConverter"](
        format_options={InputFormat.PDF: dl["PdfFormatOption"](pipeline_options=pdf_opts)}
    )
def image_ref_mode(cfg, dl):
    """Map the sidebar's image-handling choice onto docling's ImageRefMode."""
    IRM = dl["ImageRefMode"]
    choice = cfg.get("image_mode", "Placeholder comment")
    if choice == "Embedded (base64)":
        return IRM.EMBEDDED
    if choice == "Referenced path":
        return IRM.REFERENCED
    # "Placeholder comment", "Omit images" (omission is handled via labels),
    # and any unrecognised value all fall back to placeholders.
    return IRM.PLACEHOLDER
def resolve_labels(cfg, dl):
    """Translate the selected label value strings into DocItemLabel members.

    Fix: removed the dead local `label_map`, which was built and never read.

    Label values unknown to the installed docling version are skipped
    silently (best-effort by design). Returns None when nothing resolves,
    which callers treat as "use docling's default label set".
    """
    DocItemLabel = dl["DocItemLabel"]
    labels = []
    for value in cfg["selected_labels"]:
        try:
            labels.append(DocItemLabel(value))
        except Exception:
            # Tolerate label names this docling version lacks.
            pass
    return labels or None
def do_export(doc, cfg, dl) -> str:
    """Export a converted docling document to the configured output format.

    If the installed docling version rejects one of the keyword arguments
    (TypeError), the exporter is retried without any kwargs.
    """
    fmt = OUTPUT_FORMATS[cfg["output_format"]]
    labels = resolve_labels(cfg, dl)
    try:
        if fmt == "md":
            md_kwargs = {
                "image_mode": image_ref_mode(cfg, dl),
                "strict_text": cfg.get("strict_text", False),
                "indent": cfg.get("indent", 4),
            }
            if labels is not None:
                md_kwargs["labels"] = labels
            return doc.export_to_markdown(**md_kwargs)
        if fmt == "html":
            html_kwargs = {"image_mode": image_ref_mode(cfg, dl)}
            if labels is not None:
                html_kwargs["labels"] = labels
            return doc.export_to_html(**html_kwargs)
        if fmt == "json":
            return json.dumps(doc.export_to_dict(), indent=2, ensure_ascii=False)
        if fmt == "txt":
            txt_kwargs = {"labels": labels} if labels is not None else {}
            return doc.export_to_text(**txt_kwargs)
        if fmt == "doctags":
            try:
                return doc.export_to_document_tokens()
            except AttributeError:
                # Older docling has no doctags exporter; markdown is closest.
                return doc.export_to_markdown()
    except TypeError:
        # Fallback: retry the exporter without any unsupported kwargs.
        if fmt == "md":
            return doc.export_to_markdown()
        if fmt == "html":
            return doc.export_to_html()
        if fmt == "json":
            return json.dumps(doc.export_to_dict(), indent=2, ensure_ascii=False)
        return doc.export_to_text()
    return ""
def convert_file(path: str, cfg: Dict[str, Any], converter, dl) -> Dict[str, Any]:
    """Convert one file with docling; never raises — returns a result dict.

    On success: {"ok": True, "content", "elapsed", "status", "meta"}.
    On failure: {"ok": False, "error", "elapsed"} plus "traceback" when an
    exception was caught (rather than a bad conversion status).
    """
    ConversionStatus = dl["ConversionStatus"]
    started = time.time()
    try:
        result = converter.convert(path)
        elapsed = time.time() - started
        acceptable = (ConversionStatus.SUCCESS, ConversionStatus.PARTIAL_SUCCESS)
        if result.status not in acceptable:
            return {"ok": False, "error": f"Conversion failed: {result.status}", "elapsed": elapsed}
        doc = result.document
        exported = do_export(doc, cfg, dl)
        # Best-effort metadata: each probe falls back to "N/A" independently.
        meta: Dict[str, Any] = {"pages": "N/A", "tables": "N/A", "figures": "N/A"}
        try:
            meta["pages"] = len(doc.pages) if hasattr(doc, "pages") else "N/A"
        except Exception:
            pass
        try:
            def _is_table(item):
                lbl = getattr(item[1], "label", None)
                return lbl is not None and str(lbl).endswith("table")
            meta["tables"] = sum(1 for item in doc.iterate_items() if _is_table(item))
        except Exception:
            pass
        try:
            meta["figures"] = len(doc.pictures) if hasattr(doc, "pictures") else "N/A"
        except Exception:
            pass
        return {
            "ok": True,
            "content": exported,
            "elapsed": elapsed,
            "status": str(result.status),
            "meta": meta,
        }
    except Exception as exc:
        return {
            "ok": False,
            "error": f"{type(exc).__name__}: {exc}",
            "traceback": traceback.format_exc(),
            "elapsed": time.time() - started,
        }
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Top-level Streamlit page flow: upload → configure → convert → download.

    NOTE(review): several st.markdown HTML payloads in this function appear to
    have been stripped from the file, leaving unterminated f-string literals
    (the lone `f"` / `",` fragments below). The code is kept token-identical
    here; restore the markup from upstream before running.
    """
    cfg = sidebar()
    # Hero banner (its surrounding HTML wrapper appears stripped).
    st.markdown("""
⚡ Powered by Docling
🤗 HuggingFace Spaces
📄 Docling AIO Converter
Parse & convert any document — PDF, DOCX, PPTX, XLSX, images, HTML and more —
with full control over OCR, tables, figures, and output formatting.
""", unsafe_allow_html=True)
    # ── Upload area ───────────────────────────────────────────────────────────
    st.markdown("### 📁 Upload Documents")
    max_mb = cfg.get("max_file_mb", 50)
    uploaded = st.file_uploader(
        f"Drag & drop files here · Max {max_mb} MB per file",
        accept_multiple_files=True,
        type=list(SUPPORTED_EXTENSIONS.keys()),
        help="You can upload multiple files at once for batch conversion.",
    )
    if not uploaded:
        st.info("👆 Upload one or more files to get started. "
                "Adjust all settings in the **sidebar** before converting.", icon="ℹ️")
        # Supported formats table
        with st.expander("📋 Supported Input Formats"):
            cols = st.columns(4)
            items = list(SUPPORTED_EXTENSIONS.items())
            for i, (ext, icon) in enumerate(items):
                cols[i % 4].markdown(f"{icon} `.{ext}`")
        return
    # ── File list ─────────────────────────────────────────────────────────────
    # Split uploads by the size limit; oversized ones are reported and skipped.
    oversized = [f for f in uploaded if f.size > max_mb * 1024 * 1024]
    valid = [f for f in uploaded if f.size <= max_mb * 1024 * 1024]
    st.markdown(f"**{len(uploaded)} file(s) selected** · "
                f"{len(valid)} ready"
                + (f" {len(oversized)} oversized"
                   if oversized else ""),
                unsafe_allow_html=True)
    for f in valid[:8]:  # show preview of first 8
        # NOTE(review): unterminated f-string below — stripped HTML row markup.
        st.markdown(
            f""
            f"{file_icon(f.name)}"
            f"{f.name}"
            f"{fmt_bytes(f.size)}"
            f"
",
            unsafe_allow_html=True,
        )
    if len(valid) > 8:
        st.caption(f"…and {len(valid)-8} more files")
    for f in oversized:
        st.warning(f"⚠️ **{f.name}** ({fmt_bytes(f.size)}) exceeds the {max_mb} MB limit and will be skipped.")
    if not valid:
        return
    # ── Convert button ────────────────────────────────────────────────────────
    col_btn, col_fmt, _ = st.columns([2, 2, 4])
    with col_btn:
        run = st.button("🚀 Convert All", use_container_width=True)
    with col_fmt:
        # NOTE(review): unterminated f-string below — stripped HTML badge markup.
        st.markdown(f"
{cfg['output_format']}", unsafe_allow_html=True)
    if not run:
        return
    # ── Load Docling ──────────────────────────────────────────────────────────
    with st.spinner("Loading Docling models (first run downloads ~1 GB of models)…"):
        try:
            dl = _load_docling()
        except Exception as e:
            st.error(f"Failed to import Docling: {e}\n\n"
                     "Make sure `docling` is installed (`pip install docling`).")
            return
    with st.spinner("Building converter pipeline…"):
        try:
            converter = build_converter(cfg, dl)
        except Exception as e:
            st.error(f"Could not build converter: {e}\n```\n{traceback.format_exc()}\n```")
            return
    # ── Process files ─────────────────────────────────────────────────────────
    st.markdown("---")
    st.markdown("### ⚙️ Processing")
    results: Dict[str, Dict] = {}
    overall_bar = st.progress(0)
    status_area = st.empty()
    with tempfile.TemporaryDirectory() as tmpdir:
        for idx, uf in enumerate(valid):
            fname = uf.name
            # NOTE(review): unterminated f-string below — stripped status markup.
            status_area.markdown(
                f""
                f"{file_icon(fname)}"
                f"{fname}"
                f"⏳ converting…"
                f"
",
                unsafe_allow_html=True,
            )
            # Fix 4: each file gets its own subdirectory so two uploaded files
            # with the same basename (e.g. "report.pdf" from different folders)
            # never silently overwrite each other in the shared tmpdir.
            file_subdir = os.path.join(tmpdir, str(idx))
            os.makedirs(file_subdir, exist_ok=True)
            tmp_path = os.path.join(file_subdir, fname)
            # Fix 3: always seek(0) before reading — on Streamlit re-renders
            # the BytesIO cursor is already at EOF and uf.read() returns b"",
            # writing a zero-byte file that Docling then silently fails on.
            uf.seek(0)
            with open(tmp_path, "wb") as fh:
                fh.write(uf.read())
            result = convert_file(tmp_path, cfg, converter, dl)
            results[fname] = result
            overall_bar.progress((idx + 1) / len(valid))
            if not result["ok"] and cfg.get("abort_on_error"):
                st.error(f"❌ Aborted after error on **{fname}**:\n```\n{result['error']}\n```")
                break
    status_area.empty()
    overall_bar.empty()
    # ── Summary metrics ───────────────────────────────────────────────────────
    # ok_count is reused below for auto-expanding a single result; err_count
    # was presumably shown in the (stripped) metric-card markup.
    ok_count = sum(1 for r in results.values() if r["ok"])
    err_count = len(results) - ok_count
    total_time = sum(r["elapsed"] for r in results.values())
    # NOTE(review): heavily garbled span below — stripped metric-card HTML.
    st.markdown(
        f""
        f"
{len(results)}
Files processed
"
        f"
"
        f"
"
        f"
{fmt_time(total_time)}
Total time
"
        f"
",
        unsafe_allow_html=True,
    )
    # ── Per-file results ──────────────────────────────────────────────────────
    st.markdown("### 📂 Results")
    ext = OUTPUT_FORMATS[cfg["output_format"]]
    # Build ZIP in memory
    zip_buf = io.BytesIO()
    with zipfile.ZipFile(zip_buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for fname, res in results.items():
            if res["ok"]:
                out_name = Path(fname).stem + f".{ext}"
                zf.writestr(out_name, res["content"])
    zip_buf.seek(0)
    dl_col1, dl_col2 = st.columns([2, 4])
    with dl_col1:
        st.download_button(
            "⬇️ Download All as ZIP",
            data=zip_buf,
            file_name="docling_output.zip",
            mime="application/zip",
            use_container_width=True,
        )
    st.markdown("---")
    for fname, res in results.items():
        icon = file_icon(fname)
        with st.expander(
            f"{icon} **{fname}** "
            + ("✅" if res["ok"] else "❌")
            + f" · {fmt_time(res['elapsed'])}",
            expanded=ok_count == 1,
        ):
            if not res["ok"]:
                st.error(f"**Error:** {res['error']}")
                if "traceback" in res:
                    with st.expander("📋 Full traceback"):
                        st.code(res["traceback"], language="python")
            else:
                # Metadata strip
                meta = res.get("meta", {})
                m_cols = st.columns(4)
                m_cols[0].metric("Pages", meta.get("pages", "—"))
                m_cols[1].metric("Tables", meta.get("tables", "—"))
                m_cols[2].metric("Figures", meta.get("figures", "—"))
                m_cols[3].metric("Time", fmt_time(res["elapsed"]))
                content = res["content"]
                out_name = Path(fname).stem + f".{ext}"
                # Download individual file
                mime_map = {
                    "md": "text/markdown",
                    "html": "text/html",
                    "json": "application/json",
                    "txt": "text/plain",
                    "doctags": "text/plain",
                }
                st.download_button(
                    f"⬇️ Download {out_name}",
                    data=content.encode("utf-8"),
                    file_name=out_name,
                    mime=mime_map.get(ext, "text/plain"),
                    key=f"dl_{fname}",
                )
                # Preview
                char_count = len(content)
                word_count = len(content.split())
                st.caption(f"Output: **{char_count:,} chars** · **{word_count:,} words**")
                if ext == "md":
                    tab1, tab2 = st.tabs(["📖 Rendered", "📝 Raw Markdown"])
                    with tab1:
                        st.markdown(content[:15000] + ("\n\n*[truncated for preview…]*"
                                                       if len(content) > 15000 else ""),
                                    unsafe_allow_html=True)
                    with tab2:
                        # Fix 5: escape before injecting into HTML — raw doc content
                        # can contain <, >, & which would break the div or be executed.
                        preview_raw = html_lib.escape(content[:12000])
                        suffix = "…[truncated]" if len(content) > 12000 else ""
                        # NOTE(review): unterminated f-string — stripped wrapper markup.
                        st.markdown(f"{preview_raw}{suffix}
",
                                    unsafe_allow_html=True)
                elif ext == "html":
                    tab1, tab2 = st.tabs(["🌐 Rendered", "📝 HTML Source"])
                    with tab1:
                        st.components.v1.html(content, height=600, scrolling=True)
                    with tab2:
                        st.code(content[:10000], language="html")
                elif ext == "json":
                    try:
                        parsed = json.loads(content)
                        st.json(parsed, expanded=False)
                    except Exception:
                        st.code(content[:10000], language="json")
                else:
                    # Fix 5: escape before injecting into HTML — same reason as above.
                    preview_plain = html_lib.escape(content[:12000])
                    suffix = "…[truncated]" if len(content) > 12000 else ""
                    # NOTE(review): unterminated f-string — stripped wrapper markup.
                    st.markdown(f"{preview_plain}{suffix}
",
                                unsafe_allow_html=True)
# Standard script entry-point guard (Streamlit executes the module top-down).
if __name__ == "__main__":
    main()