| """
|
| layout_aware_parser.py
|
| -----------------------
|
| A layout-aware document parser that handles both PDF and Word (.docx) files.
|
| Detects and tags: TEXT blocks, TABLES, and IMAGES with their positional metadata.
|
|
|
| Output is a structured list of ParsedBlock objects — ready to feed into a chunking pipeline.
|
| """
|
|
|
| import os
|
| import io
|
| import json
|
| import base64
|
| from enum import Enum
|
| from dataclasses import dataclass, field, asdict
|
| from pathlib import Path
|
| from typing import Optional
|
|
|
|
|
|
|
|
|
|
|
|
|
class BlockType(str, Enum):
    """Kinds of blocks the parsers emit.

    Members are also plain strings (the class mixes in ``str``), so they
    compare equal to their lowercase value.
    """

    TEXT = "text"        # ordinary paragraph text
    TABLE = "table"      # tabular data
    IMAGE = "image"      # embedded picture
    HEADING = "heading"  # section title
|
|
|
|
|
|
|
@dataclass
class ParsedBlock:
    """
    A single logical unit extracted from the document.

    Every block carries enough metadata to reconstruct its position and
    origin for downstream retrieval.
    """

    # Kind of block (text, table, image or heading).
    block_type: BlockType
    # Text of the block: paragraph text, a Markdown table, or an image placeholder.
    content: str
    # PDF page number (1-based) or DOCX body element index (1-based).
    page_or_index: int
    # Heading depth; only set on HEADING blocks.
    heading_level: Optional[int] = None
    # Raw rows of cells; only set on TABLE blocks.
    table_data: Optional[list] = None
    # Raw image payload; only set on IMAGE blocks.
    image_bytes: Optional[bytes] = None
    # File extension of the image payload (e.g. "png"); only set on IMAGE blocks.
    image_format: Optional[str] = None
    # Base name of the file the block was extracted from.
    source_file: str = ""
    # Parser-specific extras (bbox, font size, element index, ...).
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the block as a plain dict with image bytes base64-encoded.

        ``image_bytes`` is replaced by its base64 text form whenever it is not
        ``None``. An empty payload (``b""``) is encoded too (to ``""``), because
        raw ``bytes`` of any length cannot be serialised by ``json``.
        """
        d = asdict(self)
        if d["image_bytes"] is not None:
            d["image_bytes"] = base64.b64encode(d["image_bytes"]).decode("ascii")
        return d
|
|
|
|
|
|
|
|
|
|
|
|
|
| def _table_to_markdown(table_data: list[list[str]]) -> str:
|
| """Convert a 2D list of cell strings into a Markdown table."""
|
| if not table_data:
|
| return ""
|
| header = table_data[0]
|
| separator = ["---"] * len(header)
|
| rows = table_data[1:]
|
| lines = []
|
| lines.append("| " + " | ".join(str(c) for c in header) + " |")
|
| lines.append("| " + " | ".join(separator) + " |")
|
| for row in rows:
|
|
|
| padded = list(row) + [""] * (len(header) - len(row))
|
| lines.append("| " + " | ".join(str(c) for c in padded) + " |")
|
| return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
def _pdf_heading_level(font_size: float) -> int:
    """Map a font size to a heading level (1-4) using fixed size thresholds."""
    if font_size >= 22:
        return 1
    if font_size >= 18:
        return 2
    if font_size >= 14:
        return 3
    return 4


def _pdf_text_block(block: dict, page_num: int, source: str) -> Optional[ParsedBlock]:
    """Build a HEADING or TEXT block from one text-dict block; None if it has no text."""
    line_texts = []
    max_font_size = 0
    is_bold = False
    for line in block.get("lines", []):
        spans = line.get("spans", [])
        line_texts.append("".join(span.get("text", "") for span in spans))
        for span in spans:
            max_font_size = max(max_font_size, span.get("size", 0))
            if "bold" in span.get("font", "").lower():
                is_bold = True

    full_text = "\n".join(line_texts).strip()
    if not full_text:
        return None

    bbox = list(block["bbox"])
    # Heuristic: short blocks that are large or contain a bold font are headings.
    if (max_font_size >= 14 or is_bold) and len(full_text) < 200:
        return ParsedBlock(
            block_type=BlockType.HEADING,
            content=full_text,
            page_or_index=page_num,
            heading_level=_pdf_heading_level(max_font_size),
            source_file=source,
            metadata={
                "font_size": max_font_size,
                "bold": is_bold,
                "bbox": bbox,
            },
        )
    return ParsedBlock(
        block_type=BlockType.TEXT,
        content=full_text,
        page_or_index=page_num,
        source_file=source,
        metadata={
            "font_size": max_font_size,
            "bbox": bbox,
        },
    )


def parse_pdf(file_path: str) -> list[ParsedBlock]:
    """
    Parse a PDF file page by page into ParsedBlock objects.

    Strategy per page:
      1. Detect tables with ``page.find_tables()`` and emit each one as a
         TABLE block (Markdown in ``content``, raw rows in ``table_data``).
         Table detection is best-effort: a failure is logged and the page is
         still parsed as text.
      2. Read the page's text dictionary and emit each text block as a
         HEADING or TEXT block. Text blocks lying inside a detected table
         (within a 2-unit tolerance) are skipped so table text is not
         duplicated.
      3. Extract embedded images (at least 50x50 pixels) as IMAGE blocks
         carrying their raw bytes.

    Within a page, TABLE/HEADING/TEXT blocks are ordered by the bottom edge of
    their bounding box, then by the left edge, so tables appear between the
    text that surrounds them. IMAGE blocks follow at the end of each page
    because ``page.get_images()`` does not report a position.

    Args:
        file_path: Path of the PDF to parse.

    Returns:
        All blocks of the document, page after page.
    """
    import logging

    import fitz

    logger = logging.getLogger(__name__)
    source = Path(file_path).name
    blocks: list[ParsedBlock] = []

    doc = fitz.open(file_path)
    try:
        for page_num, page in enumerate(doc, start=1):
            # (bbox, block) pairs for everything on this page that has a position.
            positioned = []
            table_rects = []

            try:
                tables = page.find_tables().tables
            except Exception as exc:
                # Best-effort: e.g. find_tables() is missing in older PyMuPDF.
                logger.warning(
                    "Table detection failed on page %d of %s: %s",
                    page_num, source, exc,
                )
                tables = []

            for tab in tables:
                try:
                    raw_data = tab.extract()
                except Exception as exc:
                    logger.warning(
                        "Table extraction failed on page %d of %s: %s",
                        page_num, source, exc,
                    )
                    continue
                if not raw_data:
                    continue
                # Only mask the table area once its content is safely captured;
                # otherwise the text inside it would be lost altogether.
                bbox = tuple(tab.bbox)
                table_rects.append(bbox)
                positioned.append((bbox, ParsedBlock(
                    block_type=BlockType.TABLE,
                    content=_table_to_markdown(raw_data),
                    page_or_index=page_num,
                    table_data=raw_data,
                    source_file=source,
                    metadata={
                        "bbox": list(bbox),
                        "row_count": len(raw_data),
                        "col_count": len(raw_data[0]),
                    },
                )))

            text_dict = page.get_text("dict", sort=True)
            for block in text_dict.get("blocks", []):
                if block.get("type", -1) != 0:
                    # Not a text block; images are handled via get_images() below.
                    continue
                bx0, by0, bx1, by1 = block["bbox"]
                in_table = any(
                    bx0 >= rx0 - 2 and by0 >= ry0 - 2
                    and bx1 <= rx1 + 2 and by1 <= ry1 + 2
                    for (rx0, ry0, rx1, ry1) in table_rects
                )
                if in_table:
                    continue
                parsed = _pdf_text_block(block, page_num, source)
                if parsed is not None:
                    positioned.append((tuple(block["bbox"]), parsed))

            # Stable sort by (bottom edge, left edge). NOTE(review): this is
            # believed to match the ordering sort=True applies to text blocks,
            # so text keeps its relative order - confirm against PyMuPDF.
            positioned.sort(key=lambda item: (item[0][3], item[0][0]))
            blocks.extend(parsed for _, parsed in positioned)

            for img_index, img_info in enumerate(page.get_images(full=True)):
                xref = img_info[0]
                base_image = doc.extract_image(xref)
                if not base_image:
                    # Nothing extractable for this xref.
                    continue
                width = base_image["width"]
                height = base_image["height"]
                # Skip tiny images (icons, bullets, rules).
                if width < 50 or height < 50:
                    continue
                blocks.append(ParsedBlock(
                    block_type=BlockType.IMAGE,
                    content=(
                        f"[IMAGE on page {page_num}, index {img_index} "
                        "— send to vision model for caption]"
                    ),
                    page_or_index=page_num,
                    image_bytes=base_image["image"],
                    image_format=base_image["ext"],
                    source_file=source,
                    metadata={
                        "width": width,
                        "height": height,
                        "xref": xref,
                        "image_index": img_index,
                    },
                ))
    finally:
        # Release the file handle even if parsing fails part-way through.
        doc.close()

    return blocks
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_docx(file_path: str) -> list[ParsedBlock]:
    """
    Parse a Word (.docx) file by walking the document body in document order
    (paragraphs and tables are siblings under <body>).

    Strategy:
      - Paragraphs whose style name contains 'Heading' → HEADING blocks
      - Other non-empty paragraphs                     → TEXT blocks
      - Table elements                                 → TABLE blocks (cells read as text)
      - Images referenced from paragraph runs          → IMAGE blocks

    Image bytes are read straight from the .docx zip archive, using the
    relationships declared in ``word/_rels/document.xml.rels``. Relationship
    targets are resolved relative to ``word/`` unless they start with ``/``,
    in which case they are taken relative to the archive root. Images that
    cannot be read are skipped with a logged warning.

    ``page_or_index`` on every block is the 1-based position of the enclosing
    element among the children of <body>. Images of a paragraph are emitted
    before the paragraph's own text block.

    Args:
        file_path: Path of the .docx file to parse.

    Returns:
        All blocks of the document in body order.
    """
    import logging
    import posixpath
    import xml.etree.ElementTree as ET
    import zipfile

    from docx import Document
    from docx.table import Table
    from docx.text.paragraph import Paragraph

    logger = logging.getLogger(__name__)
    r_embed_attr = (
        "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
    )

    doc = Document(file_path)
    source = Path(file_path).name
    blocks: list[ParsedBlock] = []

    # relationship id -> (image bytes, lowercase extension without the dot)
    image_map: dict[str, tuple[bytes, str]] = {}
    with zipfile.ZipFile(file_path) as docx_zip:
        try:
            rels_root = ET.fromstring(docx_zip.read("word/_rels/document.xml.rels"))
        except (KeyError, ET.ParseError) as exc:
            # No (or unreadable) relationship part: carry on without images.
            logger.warning("Could not read image relationships of %s: %s", source, exc)
            rels_root = ()

        for rel in rels_root:
            if "image" not in rel.get("Type", "").lower():
                continue
            if rel.get("TargetMode", "").lower() == "external":
                # Linked, not embedded: there are no bytes in the archive.
                continue
            target = rel.get("Target", "")
            if target.startswith("/"):
                member = target.lstrip("/")
            else:
                # normpath collapses "../" segments; zip member names are literal.
                member = posixpath.normpath(posixpath.join("word", target))
            try:
                img_bytes = docx_zip.read(member)
            except KeyError:
                logger.warning("Image part %r not found in %s", member, source)
                continue
            img_ext = Path(target).suffix.lstrip(".").lower()
            image_map[rel.get("Id", "")] = (img_bytes, img_ext)

    def local_name(element) -> Optional[str]:
        """Return the tag without its namespace, or None for comment/PI nodes."""
        tag = element.tag
        if not isinstance(tag, str):
            return None
        return tag.rsplit("}", 1)[-1]

    def heading_level(style_name: str) -> Optional[int]:
        """Return the heading depth for a style name, or None if not a heading."""
        if "Heading" not in style_name:
            return None
        try:
            return int(style_name.split()[-1])
        except ValueError:
            # Heading-like style without a trailing number.
            return 1

    def extract_images_from_para(para: Paragraph, index: int) -> list[ParsedBlock]:
        """Return one IMAGE block per known image referenced from the paragraph's runs."""
        img_blocks = []
        for run in para.runs:
            for elem in run._r.iter():
                if local_name(elem) != "blip":
                    continue
                r_embed = elem.get(r_embed_attr)
                if not r_embed or r_embed not in image_map:
                    continue
                img_bytes, img_ext = image_map[r_embed]
                img_blocks.append(ParsedBlock(
                    block_type=BlockType.IMAGE,
                    content=(
                        f"[IMAGE at element index {index} "
                        "— send to vision model for caption]"
                    ),
                    page_or_index=index,
                    image_bytes=img_bytes,
                    image_format=img_ext,
                    source_file=source,
                    metadata={
                        "r_embed": r_embed,
                        "element_index": index,
                    },
                ))
        return img_blocks

    def read_table(table: Table) -> list[list[str]]:
        """Return the table as rows of stripped cell text."""
        return [[cell.text.strip() for cell in row.cells] for row in table.rows]

    for elem_index, child in enumerate(doc.element.body, start=1):
        tag = local_name(child)

        if tag == "p":
            para = Paragraph(child, doc)
            blocks.extend(extract_images_from_para(para, elem_index))

            text = para.text.strip()
            if not text:
                continue

            style_name = para.style.name if para.style else ""
            h_level = heading_level(style_name)
            if h_level is not None:
                blocks.append(ParsedBlock(
                    block_type=BlockType.HEADING,
                    content=text,
                    page_or_index=elem_index,
                    heading_level=h_level,
                    source_file=source,
                    metadata={
                        "style": style_name,
                        "element_index": elem_index,
                    },
                ))
            else:
                blocks.append(ParsedBlock(
                    block_type=BlockType.TEXT,
                    content=text,
                    page_or_index=elem_index,
                    source_file=source,
                    metadata={
                        "style": style_name,
                        "element_index": elem_index,
                    },
                ))

        elif tag == "tbl":
            raw_data = read_table(Table(child, doc))
            blocks.append(ParsedBlock(
                block_type=BlockType.TABLE,
                content=_table_to_markdown(raw_data),
                page_or_index=elem_index,
                table_data=raw_data,
                source_file=source,
                metadata={
                    "element_index": elem_index,
                    "row_count": len(raw_data),
                    "col_count": len(raw_data[0]) if raw_data else 0,
                },
            ))

    return blocks
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_document(file_path: str) -> list[ParsedBlock]:
    """
    Pick the parser from the file extension (case-insensitive) and run it.

    ``.pdf`` goes to ``parse_pdf`` and ``.docx`` to ``parse_docx``; the result
    is that parser's flat list of ParsedBlock objects.

    Raises:
        ValueError: for legacy ``.doc`` files and for any other extension.
    """
    suffix = Path(file_path).suffix.lower()

    # Legacy Word files get their own, more helpful message.
    if suffix == ".doc":
        raise ValueError(".doc (legacy format) is not supported. Please convert to .docx first.")
    if suffix == ".pdf":
        return parse_pdf(file_path)
    if suffix == ".docx":
        return parse_docx(file_path)
    raise ValueError(f"Unsupported file type: {suffix}. Supported: .pdf, .docx")
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_parse_summary(blocks: list[ParsedBlock], show_content_preview: bool = True) -> None:
    """Print a human-readable summary of what was parsed.

    Prints a per-type block count, then one entry per block with its kind,
    location and source file.

    Args:
        blocks: Blocks to summarise.
        show_content_preview: When true, also print the first 180 characters
            of each block's content (line breaks shown as ``↵``), the table
            dimensions for TABLE blocks and format/size/dimensions for IMAGE
            blocks.
    """
    from collections import Counter

    counts = Counter(b.block_type for b in blocks)
    rule = "=" * 60

    print(rule)
    print(f" PARSE SUMMARY — {len(blocks)} total blocks")
    print(rule)
    for btype, count in counts.items():
        print(f" {btype.value.upper():<10} {count} block(s)")
    print("-" * 60)

    for i, block in enumerate(blocks):
        prefix = {
            BlockType.HEADING: f"H{block.heading_level}",
            BlockType.TEXT: "TXT",
            BlockType.TABLE: "TBL",
            BlockType.IMAGE: "IMG",
        }.get(block.block_type, "???")

        # PDF blocks are located by page, DOCX blocks by body element index.
        # Compare case-insensitively so "REPORT.PDF" is still treated as a PDF.
        if block.source_file.lower().endswith(".pdf"):
            location = f"page={block.page_or_index}"
        else:
            location = f"idx={block.page_or_index}"

        print(f"\n[{i:03d}] {prefix:<4} {location} source={block.source_file}")

        if not show_content_preview:
            continue

        preview = block.content[:180].replace("\n", " ↵ ")
        print(f" {preview}{'...' if len(block.content) > 180 else ''}")

        if block.block_type == BlockType.TABLE and block.table_data:
            print(f" rows={block.metadata.get('row_count')} cols={block.metadata.get('col_count')}")

        if block.block_type == BlockType.IMAGE:
            size_kb = len(block.image_bytes) / 1024 if block.image_bytes else 0
            print(f" format={block.image_format} size={size_kb:.1f}KB "
                  f"dims={block.metadata.get('width')}x{block.metadata.get('height')}")

    print(rule)
|
|
|
|
|
def save_images(blocks: list[ParsedBlock], output_dir: str = "./parsed_images") -> None:
    """
    Write the payload of every IMAGE block that has bytes into ``output_dir``.

    The directory is created if needed. Each file is named
    ``<source stem>_p<page_or_index>_i<id>.<format>``, where ``<id>`` is the
    block's ``image_index`` metadata, else its ``r_embed`` metadata, else the
    number of images written so far; ``<format>`` falls back to ``png``.
    Prints one line per saved file and a final total.
    """
    os.makedirs(output_dir, exist_ok=True)

    written = 0
    for blk in blocks:
        # Guard clause: only image blocks that actually carry data.
        if blk.block_type != BlockType.IMAGE or not blk.image_bytes:
            continue

        meta = blk.metadata
        image_id = meta.get('image_index', meta.get('r_embed', written))
        extension = blk.image_format or 'png'
        stem = Path(blk.source_file).stem
        fname = f"{stem}_p{blk.page_or_index}_i{image_id}.{extension}"

        out_path = os.path.join(output_dir, fname)
        with open(out_path, "wb") as handle:
            handle.write(blk.image_bytes)
        print(f" Saved: {out_path}")
        written += 1

    print(f" Total images saved: {written}")
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run_self_test() -> None:
    """Build a small synthetic DOCX in a temp file, parse it and print the summary."""
    import tempfile

    from docx import Document as DocxDoc

    # delete=False + close(): only the path is needed; python-docx reopens it.
    tmp = tempfile.NamedTemporaryFile(suffix=".docx", delete=False)
    tmp.close()
    try:
        d = DocxDoc()
        d.add_heading("Standard Operating Procedure: Onboarding", level=1)
        d.add_heading("1. Introduction", level=2)
        d.add_paragraph(
            "This SOP outlines the steps required to onboard a new employee "
            "into the organisation. All steps must be followed in order."
        )
        d.add_heading("2. Approval Matrix", level=2)
        rows = [
            ("Step", "Responsible", "Deadline"),
            ("Send welcome email", "HR", "Day 1"),
            ("Assign laptop", "IT", "Day 1"),
        ]
        t = d.add_table(rows=3, cols=3)
        for r, row in enumerate(rows):
            for c, value in enumerate(row):
                t.cell(r, c).text = value
        d.add_heading("3. Process Notes", level=2)
        d.add_paragraph(
            "If the employee requires special equipment, raise a ticket with IT "
            "at least 5 working days before the start date."
        )
        d.save(tmp.name)

        print_parse_summary(parse_document(tmp.name))
    finally:
        # Remove the temp file even when building or parsing fails.
        os.unlink(tmp.name)


def _main(argv: list[str]) -> None:
    """Command-line entry point.

    Without a file argument: print the usage line and run the synthetic DOCX
    self-test. With one: parse the file, print the summary, save any images to
    ./parsed_images/ and write all blocks to ``<stem>_parsed.json`` in the
    current directory.
    """
    if len(argv) < 2:
        print(f"Usage: python {Path(argv[0]).name} <path_to_file.pdf_or_docx>")
        print("\nRunning self-test with a synthetic DOCX...")
        _run_self_test()
        return

    file_path = argv[1]
    print(f"Parsing: {file_path}")
    blocks = parse_document(file_path)
    print_parse_summary(blocks)

    img_blocks = [b for b in blocks if b.block_type == BlockType.IMAGE]
    if img_blocks:
        print(f"\nFound {len(img_blocks)} image(s). Saving to ./parsed_images/")
        save_images(blocks)

    out_json = Path(file_path).stem + "_parsed.json"
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump([b.to_dict() for b in blocks], f, indent=2)
    print(f"\nFull output saved to: {out_json}")


if __name__ == "__main__":
    import sys

    _main(sys.argv)