"""Parse a ``.docx`` upload into an ordered list of text, table and image blocks."""

from dataclasses import dataclass, field
from enum import Enum
import io
from pathlib import Path
import posixpath
from typing import Optional
import uuid
import xml.etree.ElementTree as ET
import zipfile

from docx import Document
from docx.table import Table
from docx.text.paragraph import Paragraph
from fastapi import UploadFile

# Attribute on <a:blip> that carries the relationship id of an embedded image.
_REL_EMBED_ATTR = (
    "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
)
# Archive member that lists the main document part's relationships.
_DOCUMENT_RELS_PATH = "word/_rels/document.xml.rels"
# Paragraph style name that contributes to the heading score.
_HEADING_STYLE_NAME = "SectionHeader"
# Average run font size (in points) above which a paragraph scores as larger
# than body text.
_BODY_FONT_SIZE_PT = 11


class BlockType(str, Enum):
    """Kinds of block the parser can emit."""

    TEXT = "text"
    TABLE = "table"
    IMAGE = "image"
    HEADING = "heading"


@dataclass
class ParsedBlock:
    """One unit of parsed document content, in document order."""

    BLOCK_TYPE: BlockType
    content: str
    # Position of the source element among the children of the document body.
    elem_index: int
    # This module always stores 0 here; page detection is not implemented.
    page_index: int
    # Score from get_level_score (0-3) for paragraph blocks, otherwise None.
    heading_level: Optional[int] = None
    # NOTE(review): presumably the raw cell grid of a table; nothing in this
    # module populates it yet - confirm intent with the consumers.
    table_data: Optional[list[list[str]]] = None
    # Relationship id (rId...) of the embedded image.
    image_id: Optional[str] = None
    # Lower-cased file extension of the image, without the dot.
    image_format: Optional[str] = None
    # Freshly generated UUID string assigned to each image block.
    storage_id: Optional[str] = None
    source_file: str = ""
    metadata: dict = field(default_factory=dict)


# The parsing logic is driven by the tag of each element in the document body.
# Images live inside <w:p> elements, while tables have their own <w:tbl>
# element at the same level as paragraphs.
# The main return value is the list of blocks: every element we recognise
# appends its block(s) to that list. For reusability there is one helper
# function per kind of element.


def _local_name(tag) -> str:
    """Return *tag* without its ``{namespace}`` prefix.

    Comment and processing-instruction nodes expose a non-string tag; those
    yield an empty string so that callers simply skip them.
    """
    if not isinstance(tag, str):
        return ""
    return tag.rsplit("}", 1)[-1]


def para_text(para: Paragraph) -> str:
    """Return the paragraph's text with surrounding whitespace removed."""
    return para.text.strip()


def read_table(table: Table) -> list[list[str]]:
    """Return the table as a list of rows, each a list of stripped cell texts."""
    return [[cell.text.strip() for cell in row.cells] for row in table.rows]


# TODO: later, add a description alongside each table; the same can be done
# for images.
def _markdown_cell(value) -> str:
    """Render one cell so that it cannot break the surrounding table row."""
    text = str(value).replace("\r\n", " ").replace("\n", " ")
    return text.replace("|", "\\|")


def _markdown_row(cells) -> str:
    """Join already-ordered cells into a single ``| a | b |`` line."""
    return "| " + " | ".join(_markdown_cell(cell) for cell in cells) + " |"


def table_to_markdown(table_data: list[list[str]]) -> str:
    """Render a cell grid as a Markdown table.

    The first row is used as the header. Rows shorter than the header are
    padded with empty cells so every line has at least as many columns as the
    header.

    Args:
        table_data: Rows of cell values; the first row is the header.

    Returns:
        The Markdown table, or an empty string when ``table_data`` is empty.
    """
    if not table_data:
        return ""

    header, *rows = table_data
    width = len(header)
    lines = [_markdown_row(header), _markdown_row(["---"] * width)]
    for row in rows:
        padding = [""] * (width - len(row))
        lines.append(_markdown_row(list(row) + padding))
    return "\n".join(lines)


def get_level_score(para: Paragraph) -> int:
    """Score how heading-like a paragraph is.

    One point is given when the paragraph style is named ``SectionHeader`` and
    two more when the average explicit run font size exceeds 11 pt.

    Args:
        para: Paragraph to score.

    Returns:
        An integer between 0 and 3.
    """
    # A run without an explicit size reports None; skip those instead of
    # failing on ``None.pt``.
    sizes = []
    for run in para.runs:
        size = run.font.size
        if size is not None:
            sizes.append(size.pt)
    average_size = sum(sizes) / len(sizes) if sizes else 0

    score = 0
    style = para.style
    if style is not None and style.name == _HEADING_STYLE_NAME:
        score += 1
    if average_size > _BODY_FONT_SIZE_PT:
        score += 2
    return score


def get_image_from_para(
    para: Paragraph, i: int, image_map: dict[str, tuple[bytes, str]]
) -> list[ParsedBlock]:
    """Build one image block per embedded picture found in the paragraph's runs.

    Args:
        para: Paragraph whose runs are searched for ``blip`` elements.
        i: Index of the paragraph within the document body; stored as
            ``elem_index`` on every returned block.
        image_map: Relationship id -> ``(image bytes, extension)``.

    Returns:
        Image blocks in the order the pictures are encountered. Pictures whose
        relationship id is missing from ``image_map`` are skipped.
    """
    image_blocks = []
    for run in para.runs:
        for elem in run._r.iter():
            if _local_name(elem.tag) != "blip":
                continue
            r_id = elem.get(_REL_EMBED_ATTR)
            if not r_id or r_id not in image_map:
                continue
            _, image_ext = image_map[r_id]
            image_blocks.append(
                ParsedBlock(
                    BLOCK_TYPE=BlockType.IMAGE,
                    content=f"[Image Here, IMG ID - `{r_id}`]",
                    elem_index=i,
                    page_index=0,
                    image_id=r_id,
                    image_format=image_ext,
                    # The field is declared as str, so store the string form.
                    storage_id=str(uuid.uuid4()),
                )
            )
    return image_blocks


def _resolve_image_member(target: str) -> str:
    """Map a relationship target to its member name inside the archive."""
    if target.startswith("/"):
        # Absolute targets are relative to the package root.
        return target.lstrip("/")
    # Relative targets are relative to the document part's folder; normalise
    # so that segments such as "../media/x.png" resolve to a real member name.
    return posixpath.normpath(posixpath.join("word", target))


def _build_image_map(doc_zip: zipfile.ZipFile) -> dict[str, tuple[bytes, str]]:
    """Collect the embedded images referenced by the main document part."""
    try:
        rels_xml = doc_zip.read(_DOCUMENT_RELS_PATH)
    except KeyError:
        # No relationships part means there is nothing to resolve.
        return {}

    image_map: dict[str, tuple[bytes, str]] = {}
    # NOTE(review): xml.etree is not hardened against maliciously constructed
    # XML, and this archive comes from an upload - consider a hardened parser.
    for rel in ET.fromstring(rels_xml):
        if "image" not in rel.get("Type", "").lower():
            continue
        if rel.get("TargetMode", "") == "External":
            # Linked images are not stored in the archive.
            continue
        target = rel.get("Target", "")
        try:
            image_bytes = doc_zip.read(_resolve_image_member(target))
        except KeyError:
            # The relationship points at a member that is not in the archive.
            # Leaving it out of the map means no image block is emitted.
            continue
        image_ext = Path(target).suffix.lstrip(".").lower()
        image_map[rel.get("Id", "")] = (image_bytes, image_ext)
    return image_map


def _parse_docx_bytes(content: bytes) -> list[ParsedBlock]:
    """Parse the raw bytes of a ``.docx`` file into blocks."""
    doc = Document(io.BytesIO(content))
    with zipfile.ZipFile(io.BytesIO(content)) as doc_zip:
        image_map = _build_image_map(doc_zip)

    blocks: list[ParsedBlock] = []
    for i, child in enumerate(doc.element.body):
        tag = _local_name(child.tag)
        if tag == "p":
            para = Paragraph(child, doc)
            text = para_text(para)
            if text:
                blocks.append(
                    ParsedBlock(
                        BLOCK_TYPE=BlockType.TEXT,
                        content=text,
                        elem_index=i,
                        page_index=0,
                        heading_level=get_level_score(para),
                    )
                )
            # Images are emitted after the text of the paragraph that holds
            # them, even when that paragraph has no text of its own.
            blocks.extend(get_image_from_para(para, i, image_map))
        elif tag == "tbl":
            table = Table(child, doc)
            blocks.append(
                ParsedBlock(
                    BLOCK_TYPE=BlockType.TABLE,
                    content=table_to_markdown(read_table(table)),
                    elem_index=i,
                    page_index=0,
                )
            )
    return blocks


async def parse_doc(file: UploadFile) -> list[ParsedBlock]:
    """Parse an uploaded ``.docx`` file into an ordered list of blocks.

    Non-empty paragraphs become text blocks, pictures embedded in a paragraph
    become image blocks and tables become table blocks whose content is
    Markdown. Other body elements are ignored.

    Args:
        file: The uploaded document; it is read in full.

    Returns:
        Blocks in document order.
    """
    content = await file.read()
    return _parse_docx_bytes(content)


if __name__ == "__main__":
    # parse_doc is a coroutine that expects an upload object, so the script
    # feeds the file's bytes to the synchronous parser instead.
    sample_path = Path("./documents/Champion_Petfoods_CSS_SOP_Final 1 1.docx")
    blocks = _parse_docx_bytes(sample_path.read_bytes())
    with open("output.txt", "w", encoding="utf-8") as file:
        for block in blocks:
            # Write directly to the file in each iteration.
            file.write(f"type: {block.BLOCK_TYPE.value}\n{block.content}\n\n")
    print("Done! Data saved to output.txt")