# File size: 5,846 Bytes (commit f7e9181)
from fastapi import UploadFile
from docx import Document
from docx.text.paragraph import Paragraph
from docx.table import Table
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid
from pathlib import Path
import zipfile
import xml.etree.ElementTree as ET
import io
class BlockType(str, Enum):
    """Kinds of content block a parsed document is broken into.

    The ``str`` mix-in makes every member compare equal to its plain string
    value (for example ``BlockType.TEXT == "text"``).
    """

    # Text of a non-empty paragraph.
    TEXT = "text"
    # A table, carried as rendered text.
    TABLE = "table"
    # Placeholder for an embedded picture.
    IMAGE = "image"
    # Heading content; not emitted by the parsing code visible in this file.
    HEADING = "heading"
@dataclass
class ParsedBlock:
    """One unit of content extracted from a document, in reading order.

    A block is a paragraph's text, a table rendered as text, or a placeholder
    for an embedded image. Only the first four fields are required; the rest
    are filled in when they apply to the block's kind.
    """

    # Kind of block. NOTE(review): the parser in this file passes the plain
    # strings "text", "table" and "image" rather than BlockType members.
    BLOCK_TYPE: BlockType
    # Paragraph text, Markdown table, or image placeholder string.
    content: str
    # Index of the source element among the document body's children.
    elem_index: int
    # Page number; the parser in this file always passes 0.
    page_index: int
    # Heading-likeness score from get_level_score(); set on text blocks only.
    heading_level: Optional[int] = None
    # NOTE(review): never populated in this file. Typed as rows of cell strings
    # to match what read_table() returns — confirm intended use.
    table_data: Optional[list[list[str]]] = None
    # Relationship id of the embedded image (image blocks only).
    image_id: Optional[str] = None
    # Lower-case file extension of the image (image blocks only).
    image_format: Optional[str] = None
    # Identifier generated per image block by get_image_from_para().
    storage_id: Optional[str] = None
    # Not set by the parser in this file.
    source_file: str = ""
    # Free-form extras; default_factory gives every instance its own dict.
    metadata: dict = field(default_factory=dict)
# The parsing logic is driven by the tag of each body element: images live inside "p" (paragraph) tags, while a table is its own tag ("tbl"), just like "p".
# The main return value is the list of blocks, so whenever a "p" tag is encountered its block is appended to that list.
# For reusability, a small helper function is defined for each kind of tag.
def para_text(para: Paragraph) -> str:
    """Return the paragraph's text with leading and trailing whitespace removed."""
    return para.text.strip()
def read_table(table: Table) -> list[list[str]]:
    """Extract a table's cell text as a list of rows.

    Args:
        table: The table to read.

    Returns:
        One inner list per table row, holding the whitespace-stripped text of
        each of that row's cells in order. An empty list if the table has no
        rows.
    """
    return [[cell.text.strip() for cell in row.cells] for row in table.rows]
# TODO: later, add a description for each table alongside the table itself; the same can be done for images.
def table_to_markdown(table_data: list[list[str]]) -> str:
    """Render rows of cell values as a Markdown pipe table.

    The first row is used as the header, followed by a ``---`` separator row
    and then one line per remaining row.

    Args:
        table_data: Rows of cell values; each value is converted with ``str``.

    Returns:
        The Markdown table as a single newline-joined string, or ``""`` when
        there are no rows or the header row has no cells.
    """
    if not table_data or not table_data[0]:
        return ""

    def render_cell(cell) -> str:
        # A line break would end the Markdown row early and a bare pipe would
        # start a new column, so flatten the former and escape the latter.
        flat = " ".join(str(cell).splitlines())
        return flat.replace("|", "\\|")

    def render_row(cells: list[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    header, *body = table_data
    width = len(header)

    lines = [
        render_row([render_cell(cell) for cell in header]),
        render_row(["---"] * width),
    ]
    for row in body:
        cells = [render_cell(cell) for cell in row]
        # Pad short rows so every line has at least as many columns as the header.
        cells.extend([""] * (width - len(cells)))
        lines.append(render_row(cells))
    return "\n".join(lines)
def get_level_score(para: Paragraph) -> int:
    """Score how heading-like a paragraph looks.

    The score is the sum of:
      * 1 if the paragraph's style is named ``"SectionHeader"``;
      * 2 if the mean font size of its runs is above 11 pt.

    Args:
        para: The paragraph to score.

    Returns:
        An integer from 0 to 3.
    """
    # run.font.size is None when a run inherits its size from the style
    # hierarchy; only explicitly sized runs can contribute to the average.
    sizes = [run.font.size.pt for run in para.runs if run.font.size is not None]
    mean_size = sum(sizes) / len(sizes) if sizes else 0

    score = 0
    style = para.style
    if style is not None and style.name == "SectionHeader":
        score += 1
    if mean_size > 11:
        score += 2
    return score
def get_image_from_para(para: Paragraph, i: int, image_map: dict[str, tuple[bytes, str]]) -> list[ParsedBlock]:
    """Build one image block for every known embedded picture in a paragraph.

    Each run's XML is scanned for ``blip`` elements; a block is emitted when
    the blip's ``r:embed`` relationship id is a key of ``image_map``.

    Args:
        para: The paragraph whose runs are scanned.
        i: Index of the paragraph in the document body, stored as ``elem_index``.
        image_map: Relationship id -> ``(image bytes, file extension)``.

    Returns:
        The image blocks in the order their blips are encountered; empty when
        the paragraph references no image present in ``image_map``.
    """
    embed_attr = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
    found: list[ParsedBlock] = []
    for run in para.runs:
        for elem in run._r.iter():
            # Comments and processing instructions have a non-string tag.
            if not isinstance(elem.tag, str):
                continue
            if elem.tag.split("}")[-1] != "blip":
                continue
            r_id = elem.get(embed_attr)
            if not r_id or r_id not in image_map:
                continue
            _, image_ext = image_map[r_id]
            # NOTE(review): BLOCK_TYPE is passed as a plain string although the
            # field is annotated BlockType; kept as-is so its printed form does
            # not change.
            found.append(ParsedBlock(
                BLOCK_TYPE="image",
                content=f"[Image Here, IMG ID - `{r_id}`]",
                elem_index=i,
                page_index=0,
                image_id=r_id,
                image_format=image_ext,
                # The field is declared as str, so store the UUID's text form.
                storage_id=str(uuid.uuid4()),
            ))
    return found
def _load_image_map(docx_bytes: bytes) -> dict[str, tuple[bytes, str]]:
    """Map each image relationship id to ``(image bytes, lower-case extension)``.

    Reads ``word/_rels/document.xml.rels`` from the DOCX archive. Relationships
    that point outside the package, or at a member missing from the archive,
    are skipped. If the relationships part itself is missing the map is empty.
    """
    import posixpath  # local import: zip member names always use "/" separators

    image_map: dict[str, tuple[bytes, str]] = {}
    with zipfile.ZipFile(io.BytesIO(docx_bytes)) as doc_zip:
        try:
            rels_xml = doc_zip.read("word/_rels/document.xml.rels")
        except KeyError:
            return image_map
        # NOTE(review): this XML comes from an uploaded file; confirm the
        # standard-library parser is acceptable for untrusted input here.
        for rel in ET.fromstring(rels_xml):
            if "image" not in rel.get("Type", "").lower():
                continue
            if rel.get("TargetMode", "").lower() == "external":
                # Linked (not embedded) picture: there are no bytes in the archive.
                continue
            target = rel.get("Target", "")
            # Targets may be relative to word/ ("media/x.png", "../media/x.png")
            # or package-absolute ("/word/media/x.png").
            member = posixpath.normpath(posixpath.join("word", target)).lstrip("/")
            try:
                image_bytes = doc_zip.read(member)
            except KeyError:
                continue
            extension = Path(target).suffix.lstrip(".").lower()
            image_map[rel.get("Id", "")] = (image_bytes, extension)
    return image_map


async def parse_doc(file: UploadFile) -> list[ParsedBlock]:
    """Parse an uploaded DOCX file into an ordered list of content blocks.

    The direct children of the document body are visited in order:
      * a paragraph yields a ``"text"`` block when its stripped text is
        non-empty, followed by one ``"image"`` block per embedded picture;
      * a table yields a ``"table"`` block whose content is a Markdown table
        (tables without rows are skipped);
      * any other element is ignored.

    ``elem_index`` is the element's position in the body and ``page_index`` is
    always 0.

    Args:
        file: The uploaded ``.docx`` file; its whole content is read into memory.

    Returns:
        The parsed blocks in document order.
    """
    content = await file.read()
    doc = Document(io.BytesIO(content))
    image_map = _load_image_map(content)

    blocks: list[ParsedBlock] = []
    for i, child in enumerate(doc.element.body):
        tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
        if tag == "p":
            para = Paragraph(child, doc)
            text = para_text(para)
            if text != "":
                blocks.append(ParsedBlock(
                    BLOCK_TYPE="text",
                    content=text,
                    elem_index=i,
                    page_index=0,
                    heading_level=get_level_score(para),
                ))
            blocks.extend(get_image_from_para(para, i, image_map))
        elif tag == "tbl":
            raw_data = read_table(Table(child, doc))
            if not raw_data:
                # Nothing to render for a table without rows.
                continue
            blocks.append(ParsedBlock(
                BLOCK_TYPE="table",
                content=table_to_markdown(raw_data),
                elem_index=i,
                page_index=0,
            ))
    return blocks
if __name__ == "__main__":
    # Manual smoke run: parse a sample document and dump its blocks to a text file.
    import asyncio  # local import: only this script entry point needs an event loop

    sample_path = "./documents/Champion_Petfoods_CSS_SOP_Final 1 1.docx"

    # parse_doc is a coroutine that awaits file.read(), so it must be given an
    # upload-like object and driven by an event loop rather than called directly.
    with open(sample_path, "rb") as source:
        upload = UploadFile(file=source, filename=Path(sample_path).name)
        blocks = asyncio.run(parse_doc(upload))

    # Document text is arbitrary Unicode; do not rely on the platform default encoding.
    with open("output.txt", "w", encoding="utf-8") as file:
        for block in blocks:
            # Write directly to the file in each iteration
            file.write(f"type: {block.BLOCK_TYPE}\n{block.content}\n\n")
    print("Done! Data saved to output.txt")