TensorTalk / UM_Handbook /UM_Handbook_Markdown_Preprocess.py
TensorCat's picture
Upload 30 files
052d67e verified
raw
history blame
8.94 kB
from pathlib import Path
import json
import re
import time
from typing import List
import fitz
import pytesseract
from PIL import Image
from um_handbook_config import (
GENERAL_PDF,
COMPLETE_PDF,
GENERAL_BLOCKS,
COMPLETE_BLOCKS,
DATA_ROOT,
MARKDOWN_DIR,
REPORTS_DIR,
)
# Directory that contains this script.
PROJECT_DIR = Path(__file__).resolve().parent

# Make sure every output directory exists before anything is written.
# parents=True creates missing intermediate directories as well, so a nested
# output location does not raise FileNotFoundError on a fresh checkout.
DATA_ROOT.mkdir(parents=True, exist_ok=True)
MARKDOWN_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# Output files produced by this script.
GENERAL_MD = MARKDOWN_DIR / "general_handbook_structured.md"
COMPLETE_MD = MARKDOWN_DIR / "complete_handbook_structured.md"
REPORT_PATH = REPORTS_DIR / "um_handbook_markdown_report.json"

# Regular expressions that mark low-information cover / contents / banner
# pages. looks_like_noise_page() searches them case-insensitively and only
# rejects a page when a pattern matches AND the page has few alphabetic words.
BAD_PAGE_PATTERNS = [
    r"\bmore info\b",
    r"fsktm[_\.]?um",
    r"POSTGRADUATE\s+PROGRAMME\s+HANDBOOK",
    r"UNDERGRADUATE\s+PROGRAMME\s+HANDBOOK",
    # Letter-spaced or plain "CONTENTS" standing alone.
    r"^C\s*O\s*N\s*T\s*E\s*N\s*T\s*S$",
]
# Single-character fixes applied before whitespace collapsing. Written with
# escapes so the table survives editors and tools that normalise Unicode.
_CHAR_FIXES = str.maketrans(
    {
        "\u00a0": " ",    # no-break space -> ordinary space
        "\u00ad": None,   # soft hyphen -> removed
        "\ufb00": "ff",   # typographic ligatures -> plain letters
        "\ufb01": "fi",
        "\ufb02": "fl",
        "\ufb03": "ffi",
        "\ufb04": "ffl",
    }
)


def normalize_whitespace(text: str) -> str:
    """Return *text* with special characters and spacing normalised.

    Steps, in order:

    1. Replace no-break spaces with spaces, drop soft hyphens and expand
       typographic ligatures (U+FB00 to U+FB04) to their plain letters.
    2. Collapse every run of spaces/tabs into a single space.
    3. Collapse three or more consecutive newlines into exactly two.
    4. Strip leading and trailing whitespace.

    Args:
        text: Raw text to clean.

    Returns:
        The normalised text; an empty string if nothing but whitespace remains.
    """
    text = text.translate(_CHAR_FIXES)
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
def clean_page_text(text: str) -> str:
    """Tidy the text of a single page.

    Each line is stripped, then dropped when it is empty, consists solely of
    digits, or is a single non-alphanumeric character. Lines that start with a
    bullet glyph, or with a lowercase letter followed by ")", have that marker
    replaced by "- ". The result is passed through normalize_whitespace().

    Args:
        text: Page text as extracted or OCR'd.

    Returns:
        The cleaned page text.
    """
    stripped_lines = (raw.strip() for raw in text.splitlines())
    kept = [
        line
        for line in stripped_lines
        if line
        and not re.fullmatch(r"\d+", line)
        and (len(line) != 1 or line.isalnum())
    ]
    joined = "\n".join(kept)
    # Bullet glyphs become markdown list markers.
    joined = re.sub(r"(?m)^\s*[•▪●]\s*", "- ", joined)
    # Lettered markers such as "a)" also become "- "; the letter is discarded.
    joined = re.sub(r"(?m)^\s*([a-z])\)\s*", r"- ", joined)
    return normalize_whitespace(joined)
def ocr_page(page: fitz.Page, zoom: float = 1.5) -> str:
    """Rasterise *page* and return the text Tesseract recognises in it.

    Args:
        page: The PDF page to render.
        zoom: Scale factor applied to both axes when rendering.

    Returns:
        The string produced by pytesseract for the rendered image.
    """
    scale = fitz.Matrix(zoom, zoom)
    pixmap = page.get_pixmap(matrix=scale, alpha=False)
    size = [pixmap.width, pixmap.height]
    image = Image.frombytes("RGB", size, pixmap.samples)
    return pytesseract.image_to_string(image)
def looks_like_noise_page(text: str) -> bool:
    """Decide whether a page carries too little content to keep.

    A page is treated as noise when any of the following holds:

    * it is empty after normalisation;
    * it has fewer than 8 alphabetic words;
    * it has fewer than 60 alphabetic words and matches one of
      BAD_PAGE_PATTERNS (case-insensitive);
    * it is shorter than 120 characters, entirely upper-case and has fewer
      than 15 alphabetic words.

    Args:
        text: Page text to classify.

    Returns:
        True if the page should be discarded, False otherwise.
    """
    normalized = normalize_whitespace(text)
    if not normalized:
        return True

    compact = normalized.replace("\n", " ")
    word_count = len(re.findall(r"[A-Za-z][A-Za-z&'/-]+", compact))

    # real content pages normally have more than a handful of alphabetic words
    if word_count < 8:
        return True

    # low-information cover / contents / banner pages
    if word_count < 60:
        for pattern in BAD_PAGE_PATTERNS:
            # The flattened text lets multi-word phrases match across line
            # breaks. The line-preserving text with MULTILINE lets patterns
            # anchored with ^...$ match a single line; against the flattened
            # text such anchors could only match a page consisting of nothing
            # else, which the word-count check above has already rejected.
            if re.search(pattern, compact, flags=re.IGNORECASE) or re.search(
                pattern, normalized, flags=re.IGNORECASE | re.MULTILINE
            ):
                return True

    # very short all-caps dividers
    if len(compact) < 120 and compact.upper() == compact and word_count < 15:
        return True

    return False
def extract_page_text(doc: fitz.Document, page_number_1_based: int) -> tuple[str, str]:
    """Extract cleaned text for one page, falling back to OCR when needed.

    The embedded text layer is tried first. If it is shorter than 120
    characters or is classified as noise, the page is OCR'd and the OCR text
    is used instead when it is longer. The chosen text is then checked for
    noise once more.

    Args:
        doc: The open PDF document.
        page_number_1_based: Page number, counting from 1.

    Returns:
        A ``(text, source)`` pair. ``source`` is "native" or "ocr"; when the
        page is rejected as noise, ``text`` is empty and ``source`` carries a
        "_filtered" suffix.
    """
    page = doc[page_number_1_based - 1]
    best_text, origin = clean_page_text(page.get_text("text")), "native"

    if len(best_text) < 120 or looks_like_noise_page(best_text):
        candidate = clean_page_text(ocr_page(page))
        # Only prefer OCR when it actually recovered more text.
        if len(candidate) > len(best_text):
            best_text, origin = candidate, "ocr"

    if not looks_like_noise_page(best_text):
        return best_text, origin
    return "", f"{origin}_filtered"
def progress_bar(current: int, total: int, width: int = 28) -> str:
    """Render a textual progress bar such as ``[###---] 3/6 ( 50.0%)``.

    Args:
        current: Number of items completed so far.
        total: Total number of items.
        width: Number of cells inside the brackets.

    Returns:
        The formatted bar, or "[no-progress]" when *total* is not positive.
    """
    if total > 0:
        done_cells = int(width * current / total)
        cells = "".join(("#" * done_cells, "-" * (width - done_cells)))
        percent = (current / total) * 100
        return f"[{cells}] {current}/{total} ({percent:5.1f}%)"
    return "[no-progress]"
def _block_header(block: dict, start: int, end: int) -> str:
    """Return the markdown heading and metadata bullet list for *block*."""
    return (
        f"## {block['section']} :: {block['subsection']}\n\n"
        f"- scope_label: {block['scope_label']}\n"
        f"- source_doc: {block['source_doc']}\n"
        f"- pages: {start}-{end}\n"
    )


def _block_meta(
    block: dict,
    start: int,
    end: int,
    page_stats: list,
    total_chars: int,
    seconds: float,
) -> dict:
    """Return the report entry describing how *block* was produced."""
    return {
        "section": block["section"],
        "subsection": block["subsection"],
        "scope_label": block["scope_label"],
        "source_doc": block["source_doc"],
        "pages": [start, end],
        "page_stats": page_stats,
        "total_chars": total_chars,
        "seconds": seconds,
    }


def _render_block(header: str, body: str) -> str:
    """Join header and body; an empty body leaves just the header."""
    return header + ("\n" + body + "\n" if body else "\n")


def block_to_markdown(doc: fitz.Document, block: dict, block_index: int, total_blocks: int, pdf_label: str) -> tuple[str, dict]:
    """Convert one configured block of pages into markdown plus report data.

    If the block provides a non-empty "manual_text", that text is used as the
    body and no pages are read. Otherwise every page in the block's inclusive
    page range is extracted with extract_page_text() and the kept pages are
    emitted under "### Page N" headings. Progress is printed to stdout.

    Args:
        doc: The open PDF document the block's pages are read from.
        block: Block description with keys "section", "subsection",
            "scope_label", "source_doc", "pages" (a start/end pair of 1-based
            page numbers) and optionally "manual_text".
        block_index: 1-based position of this block, used for logging only.
        total_blocks: Number of blocks being processed, used for logging only.
        pdf_label: Name of the PDF shown in log output.

    Returns:
        A ``(markdown, meta)`` pair: the rendered markdown section and a dict
        of statistics for the report.
    """
    start, end = block["pages"]
    title = f"{block['section']} :: {block['subsection']}"
    banner = f"[BLOCK {block_index}/{total_blocks}] {pdf_label} | {title} | pages {start}-{end}"
    header = _block_header(block, start, end)

    if block.get("manual_text"):
        print()
        print("=" * 90)
        print(f"{banner} | MANUAL OVERRIDE")
        print("=" * 90)

        body = normalize_whitespace(block["manual_text"])
        # A manual block is reported as a single pseudo-page covering the range.
        manual_stats = [
            {
                "page": f"{start}-{end}",
                "source": "manual_visual_override",
                "chars": len(body),
                "seconds": 0.0,
            }
        ]
        meta = _block_meta(block, start, end, manual_stats, len(body), 0.0)
        print(f"[DONE BLOCK] {title} | MANUAL OVERRIDE")
        return (_render_block(header, body), meta)

    pieces: list[str] = []
    page_stats: list[dict] = []
    block_start_time = time.time()
    total_pages = end - start + 1

    print()
    print("=" * 90)
    print(banner)
    print("=" * 90)

    for i, p in enumerate(range(start, end + 1), start=1):
        page_start_time = time.time()
        print(f" {progress_bar(i, total_pages)} -> extracting page {p} ... ", end="", flush=True)
        text, source = extract_page_text(doc, p)
        elapsed = time.time() - page_start_time

        # Filtered pages come back empty and are left out of the markdown.
        if text:
            pieces.append(f"### Page {p}\n{text}")

        # Stats are recorded for every page, including filtered ones.
        page_stats.append(
            {
                "page": p,
                "source": source,
                "chars": len(text),
                "seconds": round(elapsed, 2),
            }
        )
        print(f"{source.upper():12s} | chars={len(text):5d} | {elapsed:6.2f}s", flush=True)

    block_elapsed = time.time() - block_start_time
    body = "\n\n".join(pieces).strip()
    meta = _block_meta(block, start, end, page_stats, len(body), round(block_elapsed, 2))
    print(f"[DONE BLOCK] {title} | {block_elapsed:.2f}s")
    return (_render_block(header, body), meta)
def write_markdown(pdf_path: Path, blocks: list[dict], out_path: Path, title: str) -> list[dict]:
    """Render every block of one PDF into a single markdown file.

    Opens *pdf_path*, converts each block with block_to_markdown(), writes the
    joined result to *out_path* as UTF-8 and prints progress to stdout.

    Args:
        pdf_path: PDF to read.
        blocks: Block descriptions, processed in order.
        out_path: Destination markdown file.
        title: Text of the top-level heading.

    Returns:
        One report dict per block, in the same order as *blocks*.
    """
    rule = "#" * 100
    block_total = len(blocks)

    print()
    print(rule)
    print(f"[START] Building markdown for: {title}")
    print(f"[PDF] {pdf_path}")
    print(f"[OUT] {out_path}")
    print(f"[BLOCKS] {block_total}")
    print(rule)

    # Document heading followed by an empty entry, then one entry per block.
    rendered = [f"# {title}", ""]
    metas = []
    started = time.time()

    with fitz.open(pdf_path) as doc:
        for position, block in enumerate(blocks, start=1):
            markdown, meta = block_to_markdown(
                doc=doc,
                block=block,
                block_index=position,
                total_blocks=block_total,
                pdf_label=pdf_path.name,
            )
            rendered.append(markdown)
            metas.append(meta)

    document = "\n\n".join(rendered).strip() + "\n"
    out_path.write_text(document, encoding="utf-8")

    elapsed = time.time() - started
    print(f"[DONE FILE] {title} -> {out_path} | {elapsed:.2f}s")
    return metas
def main() -> None:
    """Build both handbook markdown files and write the JSON run report."""
    began = time.time()

    print("[INFO] Markdown preprocess started")
    print(f"[INFO] General PDF : {GENERAL_PDF}")
    print(f"[INFO] Complete PDF: {COMPLETE_PDF}")
    print(f"[INFO] General MD : {GENERAL_MD}")
    print(f"[INFO] Complete MD : {COMPLETE_MD}")
    print(f"[INFO] Report Path : {REPORT_PATH}")

    general_report = write_markdown(
        pdf_path=GENERAL_PDF,
        blocks=GENERAL_BLOCKS,
        out_path=GENERAL_MD,
        title="General Handbook (Structured Markdown)",
    )
    complete_report = write_markdown(
        pdf_path=COMPLETE_PDF,
        blocks=COMPLETE_BLOCKS,
        out_path=COMPLETE_MD,
        title="Complete Handbook (Structured Markdown)",
    )

    generated_files = {
        "general_markdown": str(GENERAL_MD),
        "complete_markdown": str(COMPLETE_MD),
    }
    report = {
        "generated_files": generated_files,
        "general_block_count": len(general_report),
        "complete_block_count": len(complete_report),
        "general_blocks": general_report,
        "complete_blocks": complete_report,
        "total_seconds": round(time.time() - began, 2),
    }
    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    REPORT_PATH.write_text(serialized, encoding="utf-8")

    divider = "=" * 100
    print()
    print(divider)
    for written in (GENERAL_MD, COMPLETE_MD, REPORT_PATH):
        print(f"Wrote: {written}")
    print(f"[ALL DONE] Total time: {time.time() - began:.2f}s")
    print(divider)


if __name__ == "__main__":
    main()