Buckets:
| from fastapi import UploadFile | |
| from docx import Document | |
| from docx.text.paragraph import Paragraph | |
| from docx.table import Table | |
| from enum import Enum | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| import uuid | |
| from pathlib import Path | |
| import zipfile | |
| import xml.etree.ElementTree as ET | |
| import io | |
class BlockType(str, Enum):
    """Kinds of content block the parser can emit.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (for example ``BlockType.TEXT == "text"``).
    """

    # Running paragraph text.
    TEXT = "text"
    # A table, rendered to text by the parser.
    TABLE = "table"
    # An embedded picture referenced from a paragraph.
    IMAGE = "image"
    # A heading paragraph.
    HEADING = "heading"
@dataclass
class ParsedBlock:
    """One unit of content extracted from a document body.

    The class must be a dataclass: callers construct it with keyword
    arguments, and ``metadata`` relies on ``field(default_factory=dict)`` so
    that every instance gets its own dictionary.
    """

    # Kind of block. Callers in this file pass the plain string values
    # ("text", "table", "image"), which compare equal to the BlockType members.
    BLOCK_TYPE: BlockType
    # Text payload: paragraph text, a Markdown table, or an image placeholder.
    content: str
    # Position of the originating element among the document body's children.
    elem_index: int
    # Page number; the parser in this file always passes 0.
    page_index: int
    # Heading score for text blocks (see get_level_score); None otherwise.
    heading_level: Optional[int] = None
    # NOTE(review): never populated in this file; presumably the raw rows
    # produced by read_table -- confirm against other callers.
    table_data: Optional[list[list[str]]] = None
    # Relationship id of the embedded image (image blocks only).
    image_id: Optional[str] = None
    # Lower-cased file extension of the image, without the dot.
    image_format: Optional[str] = None
    # Generated identifier for the stored image.
    storage_id: Optional[str] = None
    # Not populated in this file; defaults to an empty string.
    source_file: str = ""
    # Free-form extra data; a fresh dict per instance.
    metadata: dict = field(default_factory=dict)
# The parsing logic depends on the tag currently being visited: images live inside p (paragraph) tags, while tables have their own tag, just like p.
# The main return value is the list of blocks, so whenever a p tag is encountered its block is appended to that list.
# For reusability, a helper function is defined for each tag.
def para_text(para: Paragraph) -> str:
    """Return the paragraph's text with leading/trailing whitespace removed."""
    return para.text.strip()
def read_table(table: Table) -> list[list[str]]:
    """Collect the stripped text of every cell in *table*, row by row.

    Args:
        table: Table whose ``rows`` are iterated; each row's ``cells`` are
            read in order.

    Returns:
        One inner list per row, holding that row's cell texts with
        surrounding whitespace removed. An empty table yields ``[]``.
    """
    return [[cell.text.strip() for cell in row.cells] for row in table.rows]
# TODO: later, emit a description alongside each table; the same can be done for images.
def table_to_markdown(table_data: list[list[str]]) -> str:
    """Render rows of cell text as a Markdown pipe table.

    The first row becomes the header, followed by a ``---`` separator row and
    then the remaining rows. Every line uses the same ``| a | b |`` layout.

    Args:
        table_data: Rows of cell values; the first row is treated as the
            header. Rows may have differing lengths.

    Returns:
        The Markdown table as a single string with lines joined by ``\\n``,
        or an empty string when there are no rows or no cells at all.
    """
    if not table_data:
        return ""

    # Pad every row to the widest one so ragged rows (for example from merged
    # cells) still produce a rectangular table.
    width = max(len(row) for row in table_data)
    if width == 0:
        return ""

    def render_row(cells: list) -> str:
        padded = list(cells) + [""] * (width - len(cells))
        # A literal pipe or a line break inside a cell would otherwise be
        # read as a column or row boundary.
        safe = [
            str(cell).replace("|", "\\|").replace("\r\n", "<br>").replace("\n", "<br>")
            for cell in padded
        ]
        return "| " + " | ".join(safe) + " |"

    header, *body = table_data
    lines = [render_row(header), render_row(["---"] * width)]
    lines.extend(render_row(row) for row in body)
    return "\n".join(lines)
def get_level_score(para: Paragraph) -> int:
    """Score how heading-like a paragraph looks.

    The score is the sum of:
      * 1 if the paragraph style is named ``"SectionHeader"``;
      * 2 if the mean explicit font size of its runs is above 11 pt.

    Args:
        para: Paragraph whose ``runs`` and ``style`` are inspected.

    Returns:
        An integer from 0 to 3.
    """
    # python-docx reports ``font.size`` as None when a run inherits its size
    # from the style hierarchy; such runs carry no size to average, so they
    # are skipped rather than dereferenced.
    sizes = [run.font.size.pt for run in para.runs if run.font.size is not None]
    mean_size = sum(sizes) / len(sizes) if sizes else 0

    score = 0
    if para.style.name == "SectionHeader":
        score += 1
    if mean_size > 11:
        score += 2
    return score
def get_image_from_para(para: Paragraph, i: int, image_map: dict[str, tuple[bytes, str]]) -> list[ParsedBlock]:
    """Build one image block for every known picture embedded in a paragraph.

    Each run's XML is walked for ``blip`` elements; the relationship id in
    the element's ``r:embed`` attribute is looked up in *image_map*.

    Args:
        para: Paragraph whose runs are scanned.
        i: Index of the paragraph within the document body; stored as
            ``elem_index`` on each block.
        image_map: Relationship id -> ``(image bytes, extension)``.

    Returns:
        Image blocks in document order; empty when the paragraph embeds no
        image present in *image_map*.
    """
    embed_attr = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"

    image_blocks = []
    for run in para.runs:
        for elem in run._r.iter():
            # Non-element nodes (comments, processing instructions) do not
            # carry a string tag, so there is nothing to split.
            if not isinstance(elem.tag, str):
                continue
            if elem.tag.split("}")[-1] != "blip":
                continue
            r_id = elem.get(embed_attr)
            if not r_id or r_id not in image_map:
                continue
            _, image_ext = image_map[r_id]
            image_blocks.append(ParsedBlock(
                BLOCK_TYPE="image",
                content=f"[Image Here, IMG ID - `{r_id}`]",
                elem_index=i,
                page_index=0,
                image_id=r_id,
                image_format=image_ext,
                # storage_id is declared as a string field, so store the
                # textual form rather than the UUID object.
                storage_id=str(uuid.uuid4()),
            ))
    return image_blocks
async def parse_doc(file: UploadFile) -> list[ParsedBlock]:
    """Parse an uploaded .docx file into an ordered list of content blocks.

    The upload is read fully into memory and opened twice: once as a
    ``Document`` for the body structure and once as a zip archive to pull the
    raw image bytes referenced by ``word/_rels/document.xml.rels``.

    Body children are visited in order:
      * ``p``   -> a text block (when the paragraph has text) followed by one
                   image block per embedded picture;
      * ``tbl`` -> a table block whose content is the table as Markdown.
    Other elements are ignored. ``page_index`` is always 0.

    Args:
        file: Uploaded document; only ``await file.read()`` is used.

    Returns:
        Parsed blocks in document order.
    """
    import posixpath  # local import: only needed to resolve relationship targets

    content = await file.read()
    file_stream = io.BytesIO(content)
    doc = Document(file_stream)

    # Map relationship id -> (image bytes, extension) for embedded images.
    image_map: dict[str, tuple[bytes, str]] = {}
    rels_name = "word/_rels/document.xml.rels"
    with zipfile.ZipFile(file_stream) as doc_zip:
        members = set(doc_zip.namelist())
        rel_tree = ET.fromstring(doc_zip.read(rels_name)) if rels_name in members else []
        for rel in rel_tree:
            if "image" not in rel.get("Type", "").lower():
                continue
            # Linked (external) images have no part inside the archive.
            if rel.get("TargetMode", "").lower() == "external":
                continue
            rel_target = rel.get("Target", "")
            if rel_target.startswith("/"):
                # Absolute targets are relative to the package root.
                image_path = rel_target.lstrip("/")
            else:
                # Relative targets are resolved against the "word/" folder;
                # normpath collapses any "../" segments.
                image_path = posixpath.normpath(posixpath.join("word", rel_target))
            if image_path not in members:
                continue
            image_ext = Path(rel_target).suffix.lstrip(".").lower()
            image_map[rel.get("Id", "")] = (doc_zip.read(image_path), image_ext)

    blocks: list[ParsedBlock] = []
    for i, child in enumerate(doc.element.body):
        if not isinstance(child.tag, str):
            continue  # comments / processing instructions have no tag name
        tag = child.tag.split("}")[-1]
        if tag == "p":
            para = Paragraph(child, doc)
            text = para_text(para)
            if text:
                blocks.append(ParsedBlock(
                    BLOCK_TYPE="text",
                    content=text,
                    elem_index=i,
                    page_index=0,
                    heading_level=get_level_score(para),
                ))
            blocks.extend(get_image_from_para(para, i, image_map))
        elif tag == "tbl":
            raw_data = read_table(Table(child, doc))
            if not raw_data:
                continue  # a table without rows has nothing to render
            blocks.append(ParsedBlock(
                BLOCK_TYPE="table",
                content=table_to_markdown(raw_data),
                elem_index=i,
                page_index=0,
            ))
    return blocks
if __name__ == "__main__":
    import asyncio  # local import: only the script entry point needs an event loop

    source_path = Path("./documents/Champion_Petfoods_CSS_SOP_Final 1 1.docx")

    # parse_doc is a coroutine that awaits ``file.read()``, so it needs an
    # upload-like object wrapping the raw file (not a Document) and must be
    # driven by an event loop.
    with source_path.open("rb") as docx_file:
        upload = UploadFile(file=docx_file, filename=source_path.name)
        blocks = asyncio.run(parse_doc(upload))

    # Explicit encoding: document text may contain characters outside the
    # platform's default code page.
    with open("output.txt", "w", encoding="utf-8") as file:
        for block in blocks:
            # Write directly to the file in each iteration
            file.write(f"type: {block.BLOCK_TYPE}\n{block.content}\n\n")
    print("Done! Data saved to output.txt")
Xet Storage Details
- Size:
- 5.85 kB
- Xet hash:
- d39bbe42f620f8e97280b8bde6a0f9a0449d7e0ca10341fb30a3218a1f027b2a
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.