Buckets:
| """ | |
| layout_aware_parser.py | |
| ----------------------- | |
| A layout-aware document parser that handles both PDF and Word (.docx) files. | |
| Detects and tags: TEXT blocks, TABLES, and IMAGES with their positional metadata. | |
| Output is a structured list of ParsedBlock objects — ready to feed into a chunking pipeline. | |
| """ | |
| import os | |
| import io | |
| import json | |
| import base64 | |
| from enum import Enum | |
| from dataclasses import dataclass, field, asdict | |
| from pathlib import Path | |
| from typing import Optional | |
| # ───────────────────────────────────────────── | |
| # Data Models | |
| # ───────────────────────────────────────────── | |
class BlockType(str, Enum):
    """Categories of logical units a document can be split into."""
    TEXT = "text"
    TABLE = "table"
    IMAGE = "image"
    HEADING = "heading"


@dataclass
class ParsedBlock:
    """
    A single logical unit extracted from the document.

    Every block carries enough metadata to reconstruct
    its position and origin for downstream retrieval.

    NOTE: the @dataclass decorator is required here — the class relies on
    field annotations, field(default_factory=...), keyword construction,
    and dataclasses.asdict() in to_dict(); without it, instantiation
    raises TypeError.
    """
    block_type: BlockType
    content: str                           # Text content OR markdown table OR image caption placeholder
    page_or_index: int                     # Page number (PDF) or element index (DOCX)
    heading_level: Optional[int] = None    # 1–9 for HEADING blocks, None otherwise
    table_data: Optional[list] = None      # Raw 2D list of cell strings for TABLE blocks
    image_bytes: Optional[bytes] = None    # Raw image bytes for IMAGE blocks (save or send to vision model)
    image_format: Optional[str] = None     # e.g. "png", "jpeg"
    source_file: str = ""
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this block."""
        d = asdict(self)
        # bytes are not JSON-serialisable — encode as base64 string for inspection
        if d["image_bytes"]:
            d["image_bytes"] = base64.b64encode(d["image_bytes"]).decode()
        return d
| # ───────────────────────────────────────────── | |
| # Helpers | |
| # ───────────────────────────────────────────── | |
| def _table_to_markdown(table_data: list[list[str]]) -> str: | |
| """Convert a 2D list of cell strings into a Markdown table.""" | |
| if not table_data: | |
| return "" | |
| header = table_data[0] | |
| separator = ["---"] * len(header) | |
| rows = table_data[1:] | |
| lines = [] | |
| lines.append("| " + " | ".join(str(c) for c in header) + " |") | |
| lines.append("| " + " | ".join(separator) + " |") | |
| for row in rows: | |
| # Pad short rows to match header width | |
| padded = list(row) + [""] * (len(header) - len(row)) | |
| lines.append("| " + " | ".join(str(c) for c in padded) + " |") | |
| return "\n".join(lines) | |
| # ───────────────────────────────────────────── | |
| # PDF Parser (uses PyMuPDF / fitz) | |
| # ───────────────────────────────────────────── | |
def parse_pdf(file_path: str) -> list[ParsedBlock]:
    """
    Parse a PDF file page-by-page into ParsedBlock objects.

    Strategy per page:
      1. Detect tables with PyMuPDF's built-in find_tables() (available
         since v1.23) and record their bounding boxes so the underlying
         text blocks can be skipped.
      2. Extract the text dictionary with block-level granularity
         (sort=True yields top-to-bottom, left-to-right reading order)
         and classify each block as HEADING or TEXT via a font-size /
         bold heuristic.
      3. Extract embedded images and store their raw bytes.

    :param file_path: path to the PDF file.
    :return: blocks in reading order, page by page.
    """
    import fitz  # PyMuPDF

    blocks: list[ParsedBlock] = []
    source = Path(file_path).name
    # Context manager guarantees the document handle is released even if
    # an extraction step raises part-way through (the bare open()/close()
    # pair leaked the handle on error).
    with fitz.open(file_path) as doc:
        for page_num, page in enumerate(doc, start=1):
            # ── Tables ──────────────────────────────────────────────────
            # Extract tables first so we can record their bounding boxes and
            # skip the underlying text blocks that fall inside them.
            table_rects = []
            try:
                tabs = page.find_tables()
                for tab in tabs.tables:
                    table_rects.append(tab.bbox)  # fitz.Rect
                    raw_data = tab.extract()      # list[list[str]]
                    md_table = _table_to_markdown(raw_data)
                    blocks.append(ParsedBlock(
                        block_type=BlockType.TABLE,
                        content=md_table,
                        page_or_index=page_num,
                        table_data=raw_data,
                        source_file=source,
                        metadata={
                            "bbox": list(tab.bbox),
                            "row_count": len(raw_data),
                            "col_count": len(raw_data[0]) if raw_data else 0,
                        },
                    ))
            except Exception:
                # find_tables() is only in newer PyMuPDF; deliberately
                # degrade to "no tables" rather than failing the parse.
                pass

            # ── Text blocks ─────────────────────────────────────────────
            text_dict = page.get_text("dict", sort=True)  # sort=True → reading order
            for block in text_dict.get("blocks", []):
                if block.get("type", -1) != 0:
                    # type 1 = image block — skipped here; images are
                    # extracted below via get_images(), which also yields
                    # the raw bytes.
                    continue
                # Skip if this block's bbox lies inside a detected table
                # region (2pt tolerance absorbs detector rounding).
                bx0, by0, bx1, by1 = block["bbox"]
                in_table = any(
                    bx0 >= rx0 - 2 and by0 >= ry0 - 2
                    and bx1 <= rx1 + 2 and by1 <= ry1 + 2
                    for (rx0, ry0, rx1, ry1) in table_rects
                )
                if in_table:
                    continue
                # Collect text and track max font size / bold flag for the
                # heading heuristic below.
                full_text = ""
                max_font_size = 0
                is_bold = False
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        full_text += span.get("text", "")
                        size = span.get("size", 0)
                        if size > max_font_size:
                            max_font_size = size
                        if "bold" in span.get("font", "").lower():
                            is_bold = True
                    full_text += "\n"
                full_text = full_text.strip()
                if not full_text:
                    continue
                # Heuristic: large or bold short text = heading
                is_heading = (max_font_size >= 14 or is_bold) and len(full_text) < 200
                if is_heading:
                    # Map font size to heading level (rough heuristic)
                    if max_font_size >= 22:
                        h_level = 1
                    elif max_font_size >= 18:
                        h_level = 2
                    elif max_font_size >= 14:
                        h_level = 3
                    else:
                        h_level = 4
                    blocks.append(ParsedBlock(
                        block_type=BlockType.HEADING,
                        content=full_text,
                        page_or_index=page_num,
                        heading_level=h_level,
                        source_file=source,
                        metadata={
                            "font_size": max_font_size,
                            "bold": is_bold,
                            "bbox": list(block["bbox"]),
                        },
                    ))
                else:
                    blocks.append(ParsedBlock(
                        block_type=BlockType.TEXT,
                        content=full_text,
                        page_or_index=page_num,
                        source_file=source,
                        metadata={
                            "font_size": max_font_size,
                            "bbox": list(block["bbox"]),
                        },
                    ))

            # ── Images ──────────────────────────────────────────────────
            image_list = page.get_images(full=True)
            for img_index, img_info in enumerate(image_list):
                xref = img_info[0]
                base_image = doc.extract_image(xref)
                img_bytes = base_image["image"]
                img_ext = base_image["ext"]  # e.g. "png", "jpeg"
                width = base_image["width"]
                height = base_image["height"]
                # Skip tiny images (likely decorative icons / bullets)
                if width < 50 or height < 50:
                    continue
                blocks.append(ParsedBlock(
                    block_type=BlockType.IMAGE,
                    content=f"[IMAGE on page {page_num}, index {img_index} — send to vision model for caption]",
                    page_or_index=page_num,
                    image_bytes=img_bytes,
                    image_format=img_ext,
                    source_file=source,
                    metadata={
                        "width": width,
                        "height": height,
                        "xref": xref,
                        "image_index": img_index,
                    },
                ))
    return blocks
| # ───────────────────────────────────────────── | |
| # DOCX Parser (uses python-docx) | |
| # ───────────────────────────────────────────── | |
def parse_docx(file_path: str) -> list[ParsedBlock]:
    """
    Parse a Word (.docx) file by iterating over the document body
    in document order (paragraphs and tables are siblings under <body>).

    Strategy:
      - Paragraphs with a 'Heading' style → HEADING blocks
      - Normal paragraphs → TEXT blocks
      - Table elements → TABLE blocks (cells read as text)
      - Inline images (<a:blip r:embed="...">) → IMAGE blocks

    python-docx gives us document order for free via document.element.body,
    which is the raw XML body. We iterate over it to preserve interleaving.

    :param file_path: path to the .docx file.
    :return: list of ParsedBlock objects in document order.
    """
    from docx import Document
    from docx.table import Table
    from docx.text.paragraph import Paragraph
    import xml.etree.ElementTree as ET
    import zipfile

    doc = Document(file_path)
    blocks: list[ParsedBlock] = []
    source = Path(file_path).name
    elem_index = 0  # position counter (DOCX has no page numbers at parse time)

    # Build a map: relationship_id → (image bytes, extension).
    # Images in docx are stored in word/media/ and referenced via rId in
    # document.xml.rels. Bytes are read eagerly so the zip handle can be
    # closed immediately (the previous bare open()/close() leaked the
    # handle if anything raised before the end of the function).
    image_map: dict[str, tuple[bytes, str]] = {}
    try:
        with zipfile.ZipFile(file_path) as docx_zip:
            rels_tree = ET.fromstring(docx_zip.read("word/_rels/document.xml.rels"))
            for rel in rels_tree:
                if "image" not in rel.get("Type", "").lower():
                    continue
                rel_target = rel.get("Target", "")
                img_path = "word/" + rel_target.lstrip("/")
                try:
                    img_bytes = docx_zip.read(img_path)
                    img_ext = Path(rel_target).suffix.lstrip(".").lower()
                    image_map[rel.get("Id", "")] = (img_bytes, img_ext)
                except Exception:
                    # Target missing from the archive (e.g. an external
                    # link) — skip just this image.
                    pass
    except Exception:
        # No relationships part / unreadable entry — parse without images.
        pass

    def para_text(para: Paragraph) -> str:
        """Plain text of a paragraph, stripped."""
        return para.text.strip()

    def heading_level(para: Paragraph) -> Optional[int]:
        """Heading level from the style name ('Heading 2' → 2), else None."""
        style_name = para.style.name if para.style else ""
        if "Heading" in style_name:
            try:
                return int(style_name.split()[-1])
            except ValueError:
                # Styles like plain "Heading" carry no number → level 1.
                return 1
        return None

    def extract_images_from_para(para: Paragraph, index: int) -> list[ParsedBlock]:
        """IMAGE blocks for every blip element found in the paragraph's runs."""
        img_blocks = []
        for run in para.runs:
            # blip elements reference images via the r:embed attribute
            for elem in run._r.iter():
                tag = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag
                if tag != "blip":
                    continue
                r_embed = elem.get("{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed")
                if r_embed and r_embed in image_map:
                    img_bytes, img_ext = image_map[r_embed]
                    img_blocks.append(ParsedBlock(
                        block_type=BlockType.IMAGE,
                        content=f"[IMAGE at element index {index} — send to vision model for caption]",
                        page_or_index=index,
                        image_bytes=img_bytes,
                        image_format=img_ext,
                        source_file=source,
                        metadata={
                            "r_embed": r_embed,
                            "element_index": index,
                        },
                    ))
        return img_blocks

    def read_table(table: Table) -> list[list[str]]:
        """Read a docx Table into a 2D list of stripped cell strings."""
        return [[cell.text.strip() for cell in row.cells] for row in table.rows]

    # ── Iterate document body in order ──────────────────────────────────
    # document.element.body children are either <w:p> (paragraph) or <w:tbl> (table)
    for child in doc.element.body:
        tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
        elem_index += 1
        if tag == "p":  # Paragraph
            # Wrap in Paragraph object for style access
            para = Paragraph(child, doc)
            text = para_text(para)
            h_level = heading_level(para)
            # Emit any inline images of this paragraph first
            blocks.extend(extract_images_from_para(para, elem_index))
            if not text:
                continue  # empty paragraph (spacer)
            if h_level is not None:
                blocks.append(ParsedBlock(
                    block_type=BlockType.HEADING,
                    content=text,
                    page_or_index=elem_index,
                    heading_level=h_level,
                    source_file=source,
                    metadata={
                        "style": para.style.name,
                        "element_index": elem_index,
                    },
                ))
            else:
                blocks.append(ParsedBlock(
                    block_type=BlockType.TEXT,
                    content=text,
                    page_or_index=elem_index,
                    source_file=source,
                    metadata={
                        "style": para.style.name if para.style else "",
                        "element_index": elem_index,
                    },
                ))
        elif tag == "tbl":  # Table
            table = Table(child, doc)
            raw_data = read_table(table)
            blocks.append(ParsedBlock(
                block_type=BlockType.TABLE,
                content=_table_to_markdown(raw_data),
                page_or_index=elem_index,
                table_data=raw_data,
                source_file=source,
                metadata={
                    "element_index": elem_index,
                    "row_count": len(raw_data),
                    "col_count": len(raw_data[0]) if raw_data else 0,
                },
            ))
    return blocks
| # ───────────────────────────────────────────── | |
| # Unified Entry Point | |
| # ───────────────────────────────────────────── | |
def parse_document(file_path: str) -> list[ParsedBlock]:
    """
    Auto-detect the file type from its extension and route to the
    matching parser.

    :param file_path: path to a .pdf or .docx file.
    :return: flat list of ParsedBlock objects in document order.
    :raises ValueError: for .doc (legacy) files or any other extension.
    """
    suffix = Path(file_path).suffix.lower()
    if suffix == ".pdf":
        return parse_pdf(file_path)
    if suffix == ".docx":
        return parse_docx(file_path)
    if suffix == ".doc":
        raise ValueError(".doc (legacy format) is not supported. Please convert to .docx first.")
    raise ValueError(f"Unsupported file type: {suffix}. Supported: .pdf, .docx")
| # ───────────────────────────────────────────── | |
| # Pretty Printer (for development/debugging) | |
| # ───────────────────────────────────────────── | |
def print_parse_summary(blocks: list[ParsedBlock], show_content_preview: bool = True) -> None:
    """Print a human-readable summary of what was parsed."""
    from collections import Counter

    rule = "=" * 60
    type_counts = Counter(b.block_type for b in blocks)

    print(rule)
    print(f" PARSE SUMMARY — {len(blocks)} total blocks")
    print(rule)
    for kind, n in type_counts.items():
        print(f" {kind.value.upper():<10} {n} block(s)")
    print("-" * 60)

    for idx, blk in enumerate(blocks):
        # Short tag shown next to each block in the listing.
        if blk.block_type == BlockType.HEADING:
            tag = f"H{blk.heading_level}"
        elif blk.block_type == BlockType.TEXT:
            tag = "TXT"
        elif blk.block_type == BlockType.TABLE:
            tag = "TBL"
        elif blk.block_type == BlockType.IMAGE:
            tag = "IMG"
        else:
            tag = "???"

        # PDFs carry page numbers; DOCX carries body-element indices.
        if blk.source_file.endswith(".pdf"):
            location = f"page={blk.page_or_index}"
        else:
            location = f"idx={blk.page_or_index}"

        print(f"\n[{idx:03d}] {tag:<4} {location} source={blk.source_file}")
        if show_content_preview:
            snippet = blk.content[:180].replace("\n", " ↵ ")
            ellipsis = "..." if len(blk.content) > 180 else ""
            print(f" {snippet}{ellipsis}")
        if blk.block_type == BlockType.TABLE and blk.table_data:
            print(f" rows={blk.metadata.get('row_count')} cols={blk.metadata.get('col_count')}")
        if blk.block_type == BlockType.IMAGE:
            size_kb = len(blk.image_bytes) / 1024 if blk.image_bytes else 0
            print(f" format={blk.image_format} size={size_kb:.1f}KB "
                  f"dims={blk.metadata.get('width')}x{blk.metadata.get('height')}")
    print(rule)
def save_images(blocks: list[ParsedBlock], output_dir: str = "./parsed_images") -> None:
    """
    Write every IMAGE block's raw bytes to *output_dir* as a file.

    Useful for visual inspection or before sending to a vision model.

    :param blocks: parsed blocks; non-image blocks are ignored.
    :param output_dir: directory to create (if needed) and write into.
    """
    os.makedirs(output_dir, exist_ok=True)
    written = 0
    for blk in blocks:
        if blk.block_type != BlockType.IMAGE or not blk.image_bytes:
            continue
        # Prefer the PDF image index, then the DOCX relationship id, and
        # finally the running counter, to keep file names unique.
        identifier = blk.metadata.get('image_index', blk.metadata.get('r_embed', written))
        fname = (
            f"{Path(blk.source_file).stem}"
            f"_p{blk.page_or_index}"
            f"_i{identifier}"
            f".{blk.image_format or 'png'}"
        )
        out_path = os.path.join(output_dir, fname)
        with open(out_path, "wb") as fh:
            fh.write(blk.image_bytes)
        print(f" Saved: {out_path}")
        written += 1
    print(f" Total images saved: {written}")
| # ───────────────────────────────────────────── | |
| # Usage Example | |
| # ───────────────────────────────────────────── | |
if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        # No file argument: run a built-in smoke test so the parser can be
        # exercised without any sample documents on disk.
        print("Usage: python document_parser.py <path_to_file.pdf_or_docx>")
        print("\nRunning self-test with a synthetic DOCX...")
        # ── Self-test: create a tiny DOCX and parse it ──────────────────
        from docx import Document as DocxDoc
        from docx.oxml.ns import qn as docx_qn  # NOTE(review): imported but never used — candidate for removal
        import tempfile
        # delete=False: close the handle immediately so python-docx /
        # parse_document can reopen the path (required on Windows);
        # the file is removed manually below.
        tmp = tempfile.NamedTemporaryFile(suffix=".docx", delete=False)
        tmp.close()
        d = DocxDoc()
        # Synthetic SOP exercising headings, paragraphs, and a table.
        d.add_heading("Standard Operating Procedure: Onboarding", level=1)
        d.add_heading("1. Introduction", level=2)
        d.add_paragraph(
            "This SOP outlines the steps required to onboard a new employee "
            "into the organisation. All steps must be followed in order."
        )
        d.add_heading("2. Approval Matrix", level=2)
        t = d.add_table(rows=3, cols=3)
        t.cell(0, 0).text = "Step"
        t.cell(0, 1).text = "Responsible"
        t.cell(0, 2).text = "Deadline"
        t.cell(1, 0).text = "Send welcome email"
        t.cell(1, 1).text = "HR"
        t.cell(1, 2).text = "Day 1"
        t.cell(2, 0).text = "Assign laptop"
        t.cell(2, 1).text = "IT"
        t.cell(2, 2).text = "Day 1"
        d.add_heading("3. Process Notes", level=2)
        d.add_paragraph(
            "If the employee requires special equipment, raise a ticket with IT "
            "at least 5 working days before the start date."
        )
        d.save(tmp.name)
        blocks = parse_document(tmp.name)
        print_parse_summary(blocks)
        os.unlink(tmp.name)  # clean up the temporary DOCX
    else:
        file_path = sys.argv[1]
        print(f"Parsing: {file_path}")
        blocks = parse_document(file_path)
        print_parse_summary(blocks)
        # Optionally save images
        img_blocks = [b for b in blocks if b.block_type == BlockType.IMAGE]
        if img_blocks:
            print(f"\nFound {len(img_blocks)} image(s). Saving to ./parsed_images/")
            save_images(blocks)
        # Optionally dump JSON (image bytes are base64-encoded by to_dict)
        out_json = Path(file_path).stem + "_parsed.json"
        with open(out_json, "w") as f:
            json.dump([b.to_dict() for b in blocks], f, indent=2)
        print(f"\nFull output saved to: {out_json}")
Xet Storage Details
- Size:
- 24.1 kB
- Xet hash:
- 6fa746fa6971f1b62f5fd34ef4aac4afe847e764d186437a6e758638860a4c73
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.