| from fastapi import UploadFile
|
| from docx import Document
|
| from docx.text.paragraph import Paragraph
|
| from docx.table import Table
|
| from enum import Enum
|
| from dataclasses import dataclass, field
|
| from typing import Optional
|
| import uuid
|
| from pathlib import Path
|
| import zipfile
|
| import xml.etree.ElementTree as ET
|
| import io
|
|
|
|
|
class BlockType(str, Enum):
    """Closed set of block kinds a parsed document can contain.

    The enum mixes in ``str``, so every member compares equal to its plain
    string value (for example ``BlockType.TABLE == "table"``) and can be
    looked up from that string with ``BlockType("table")``.
    """

    TEXT = "text"
    TABLE = "table"
    IMAGE = "image"
    HEADING = "heading"
|
|
|
@dataclass
class ParsedBlock:
    """One unit of content extracted from a document.

    Only the first four fields are required; the remaining fields are
    optional and are filled in depending on the kind of block.
    """

    # Kind of block. The upper-case name is kept as-is because it is the
    # keyword callers use when constructing a block.
    BLOCK_TYPE: BlockType
    # Textual payload of the block (paragraph text, rendered table, or an
    # image placeholder in the parser defined in this file).
    content: str
    # Position of the source element among the document body's children.
    elem_index: int
    # Page number of the block; the parser in this file always passes 0.
    page_index: int
    # Heading score for text blocks (see get_level_score); None otherwise.
    heading_level: Optional[int] = None
    # NOTE(review): presumably the raw rows of a table (as returned by
    # read_table); nothing in this file assigns it -- confirm with callers.
    table_data: Optional[list[list[str]]] = None
    # Relationship id of the embedded image, for image blocks.
    image_id: Optional[str] = None
    # File extension of the embedded image (lower-case, no dot).
    image_format: Optional[str] = None
    # Identifier generated for the image when the block is created.
    storage_id: Optional[str] = None
    # Name of the originating file; empty when not provided.
    source_file: str = ""
    # Free-form extra data; each instance gets its own dict.
    metadata: dict = field(default_factory=dict)
|
|
|
|
|
|
|
|
|
|
|
def para_text(para: Paragraph) -> str:
    """Return the text of *para* with leading and trailing whitespace removed."""
    return para.text.strip()
|
|
|
def read_table(table: Table) -> list[list[str]]:
    """Collect the text of every cell in *table*, row by row.

    Args:
        table: The table to read.

    Returns:
        One list per table row, in document order; each inner list holds
        that row's cell texts with surrounding whitespace stripped. A table
        without rows yields an empty list.
    """
    return [[cell.text.strip() for cell in row.cells] for row in table.rows]
|
|
|
|
|
|
|
def table_to_markdown(table_data: list[list[str]]) -> str:
    """Render rows of cell values as a Markdown pipe table.

    The first row becomes the header and is followed by a ``---`` separator
    row; every remaining row becomes a body row. All rows are rendered in
    the same ``| a | b |`` shape.

    Args:
        table_data: Rows of cells. Each cell is converted with ``str()``.
            Rows may have different lengths; shorter rows are padded with
            empty cells so every row has the same number of columns.

    Returns:
        The Markdown table with rows joined by newlines, or ``""`` when
        ``table_data`` has no rows or no columns.
    """
    if not table_data:
        return ""

    # Every row must have the same number of columns for the table to
    # render, so size the table by its widest row.
    width = max(len(row) for row in table_data)
    if width == 0:
        return ""

    def clean(cell: object) -> str:
        # A raw "|" would start a new column and a raw line break would end
        # the row, so neutralise both inside cell text.
        text = str(cell).replace("|", "\\|")
        return "<br>".join(text.splitlines())

    def render(cells: list[str]) -> str:
        padded = cells + [""] * (width - len(cells))
        return "| " + " | ".join(padded) + " |"

    header, *body = table_data
    lines = [
        render([clean(cell) for cell in header]),
        render(["---"] * width),
    ]
    lines.extend(render([clean(cell) for cell in row]) for row in body)
    return "\n".join(lines)
|
|
|
def get_level_score(para: Paragraph) -> int:
    """Return a 0-3 score derived from a paragraph's style and font size.

    The score is the sum of:

    * 1 if the paragraph's style is named ``"SectionHeader"``;
    * 2 if the mean font size of its runs is greater than 11 pt.

    Only runs that carry an explicit font size take part in the mean; a run
    whose size is inherited reports ``None`` and is skipped. When no run has
    an explicit size the mean is treated as 0.

    Args:
        para: The paragraph to score.

    Returns:
        An integer between 0 and 3 inclusive.
    """
    sizes = []
    for run in para.runs:
        size = run.font.size
        # ``size`` is None when the run does not set its own font size;
        # dereferencing ``.pt`` on it would raise AttributeError.
        if size is not None:
            sizes.append(size.pt)
    mean_size = sum(sizes) / len(sizes) if sizes else 0

    score = 0
    style = para.style
    if style is not None and style.name == "SectionHeader":
        score += 1
    if mean_size > 11:
        score += 2
    return score
|
|
|
def get_image_from_para(para: Paragraph, i: int, image_map: dict[str, tuple[bytes, str]]) -> list[ParsedBlock]:
    """Build one image block for every known image embedded in *para*.

    Each run of the paragraph is searched for ``blip`` elements. A blip whose
    ``r:embed`` relationship id is a key of *image_map* produces a block;
    blips with a missing or unknown id are ignored.

    Args:
        para: The paragraph to scan.
        i: Value stored as ``elem_index`` on every block created.
        image_map: Relationship id -> ``(image bytes, extension)``. Only the
            extension is used here.

    Returns:
        The image blocks in the order the blips were encountered; an empty
        list when the paragraph holds no known image.
    """
    embed_attr = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
    found: list[ParsedBlock] = []
    for run in para.runs:
        for elem in run._r.iter():
            tag = elem.tag
            # lxml comment / processing-instruction nodes expose a
            # non-string tag, so they can never be a blip.
            if not isinstance(tag, str) or tag.split("}")[-1] != "blip":
                continue
            r_id = elem.get(embed_attr)
            if not r_id or r_id not in image_map:
                continue
            _, image_ext = image_map[r_id]
            found.append(ParsedBlock(
                # ``.value`` keeps the stored value a plain "image" string
                # while tying it to the enum definition.
                BLOCK_TYPE=BlockType.IMAGE.value,
                content=f"[Image Here, IMG ID - `{r_id}`]",
                elem_index=i,
                page_index=0,
                image_id=r_id,
                image_format=image_ext,
                # storage_id is a string field, so store the UUID's text
                # form rather than the UUID object.
                storage_id=str(uuid.uuid4()),
            ))
    return found
|
|
|
|
|
|
|
def _load_image_map(content: bytes) -> dict[str, tuple[bytes, str]]:
    """Map relationship id -> (image bytes, extension) for embedded images."""
    import posixpath

    image_map: dict[str, tuple[bytes, str]] = {}
    with zipfile.ZipFile(io.BytesIO(content)) as doc_zip:
        try:
            rels_xml = doc_zip.read("word/_rels/document.xml.rels")
        except KeyError:
            # No relationships part means there is nothing to map.
            return image_map

        # NOTE(review): this XML comes from an uploaded file and the
        # standard-library parser is not hardened against malicious input;
        # consider a hardened parser if uploads are untrusted.
        for rel in ET.fromstring(rels_xml):
            if "image" not in rel.get("Type", "").lower():
                continue
            # Linked images point at a URL or outside file, not at a member
            # of the archive, so there are no bytes to read.
            if rel.get("TargetMode", "").lower() == "external":
                continue

            target = rel.get("Target", "")
            if target.startswith("/"):
                # Absolute targets are relative to the package root.
                image_path = target.lstrip("/")
            else:
                # Relative targets are resolved against the "word" folder.
                image_path = posixpath.normpath(posixpath.join("word", target))

            try:
                image_bytes = doc_zip.read(image_path)
            except KeyError:
                # A dangling relationship should not abort the whole parse.
                continue

            image_ext = Path(target).suffix.lstrip(".").lower()
            image_map[rel.get("Id", "")] = (image_bytes, image_ext)
    return image_map


async def parse_doc(file: UploadFile) -> list[ParsedBlock]:
    """Parse an uploaded .docx file into a flat list of blocks.

    The document body is walked in order. A paragraph with non-empty text
    yields a text block (with its heading score), followed by one image
    block per known embedded image in that paragraph. A table with at least
    one row yields a table block whose content is the table rendered as
    Markdown. Other body elements are ignored. Every block produced here has
    ``page_index`` 0, and ``elem_index`` is the element's position among the
    body's children.

    Only ``file.read()`` is awaited; the parsing itself is synchronous.

    Args:
        file: The uploaded document.

    Returns:
        The blocks in document order.
    """
    content = await file.read()
    doc = Document(io.BytesIO(content))
    image_map = _load_image_map(content)

    blocks: list[ParsedBlock] = []
    for i, child in enumerate(doc.element.body):
        # Comment nodes have a non-string tag; they are never content.
        if not isinstance(child.tag, str):
            continue
        tag = child.tag.split("}")[-1]

        if tag == "p":
            para = Paragraph(child, doc)
            text = para_text(para)
            if text:
                blocks.append(ParsedBlock(
                    BLOCK_TYPE=BlockType.TEXT.value,
                    content=text,
                    elem_index=i,
                    page_index=0,
                    heading_level=get_level_score(para),
                ))
            blocks.extend(get_image_from_para(para, i, image_map))

        elif tag == "tbl":
            raw_data = read_table(Table(child, doc))
            # A table without rows has nothing to render.
            if not raw_data:
                continue
            blocks.append(ParsedBlock(
                BLOCK_TYPE=BlockType.TABLE.value,
                content=table_to_markdown(raw_data),
                elem_index=i,
                page_index=0,
            ))
    return blocks
|
|
|
|
|
|
|
if __name__ == "__main__":
    import asyncio
    import sys

    # Manual smoke run: parse one document and dump its blocks to output.txt.
    # An optional first command-line argument overrides the default path.
    source_path = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "./documents/Champion_Petfoods_CSS_SOP_Final 1 1.docx"
    )

    # parse_doc is a coroutine that awaits ``file.read()``, so it needs an
    # UploadFile wrapping the raw file and an event loop to run on.
    with open(source_path, "rb") as source:
        upload = UploadFile(file=source, filename=Path(source_path).name)
        blocks = asyncio.run(parse_doc(upload))

    # Document text is not limited to ASCII; pin the encoding so the dump
    # does not depend on the platform default.
    with open("output.txt", "w", encoding="utf-8") as file:
        file.writelines(
            f"type: {block.BLOCK_TYPE}\n{block.content}\n\n" for block in blocks
        )

    print("Done! Data saved to output.txt")
|
|
|
|
|
|
|