TensorTalk / UM_Handbook /UM_Handbook_Markdown_Preprocess.py
TensorCat's picture
Upload 30 files
052d67e verified
from pathlib import Path
import json
import re
import time
from typing import List
import fitz
import pytesseract
from PIL import Image
from um_handbook_config import (
GENERAL_PDF,
COMPLETE_PDF,
GENERAL_BLOCKS,
COMPLETE_BLOCKS,
DATA_ROOT,
MARKDOWN_DIR,
REPORTS_DIR,
)
# Directory that contains this script.
PROJECT_DIR = Path(__file__).resolve().parent

# Ensure the configured output directories exist before anything is written.
# parents=True also creates missing intermediate directories, so the import
# does not fail with FileNotFoundError when a configured directory is nested
# below a parent that has not been created yet.
DATA_ROOT.mkdir(parents=True, exist_ok=True)
MARKDOWN_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# Output files produced by main().
GENERAL_MD = MARKDOWN_DIR / "general_handbook_structured.md"
COMPLETE_MD = MARKDOWN_DIR / "complete_handbook_structured.md"
REPORT_PATH = REPORTS_DIR / "um_handbook_markdown_report.json"

# Regexes that mark a page as a likely cover / contents / banner page.
# looks_like_noise_page() applies them with re.search and re.IGNORECASE to the
# page text after newlines have been replaced by spaces, so the anchored
# CONTENTS pattern only matches when that word is the entire page text.
BAD_PAGE_PATTERNS = [
    r"\bmore info\b",
    r"fsktm[_\.]?um",
    r"POSTGRADUATE\s+PROGRAMME\s+HANDBOOK",
    r"UNDERGRADUATE\s+PROGRAMME\s+HANDBOOK",
    r"^C\s*O\s*N\s*T\s*E\s*N\s*T\s*S$",
]
# Single-pass character fixes applied before whitespace collapsing. The
# ligatures are written as escapes so the table survives Unicode normalisation
# of this source file (a literal ligature would silently decay into a no-op
# "fi" -> "fi" replacement).
_CHAR_FIXES = str.maketrans(
    {
        "\u00a0": " ",    # no-break space -> regular space
        "\u00ad": None,   # soft hyphen -> removed
        "\ufb00": "ff",   # latin small ligature ff
        "\ufb01": "fi",   # latin small ligature fi
        "\ufb02": "fl",   # latin small ligature fl
        "\ufb03": "ffi",  # latin small ligature ffi
        "\ufb04": "ffl",  # latin small ligature ffl
    }
)


def normalize_whitespace(text: str) -> str:
    """Return *text* with typographic characters and whitespace normalised.

    No-break spaces become plain spaces, soft hyphens are removed, and the
    f-ligatures (ff, fi, fl, ffi, ffl) are expanded to their ASCII letters.
    Runs of spaces/tabs are then collapsed to one space, runs of three or more
    newlines are reduced to a single blank line, and leading/trailing
    whitespace is stripped.

    Args:
        text: Raw text, e.g. as extracted from a PDF page.

    Returns:
        The normalised text; an empty string if nothing but whitespace remains.
    """
    text = text.translate(_CHAR_FIXES)
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
def clean_page_text(text: str) -> str:
    """Strip layout debris from one page of extracted text.

    Every line is stripped; empty lines, lines made only of digits, and lines
    consisting of a single non-alphanumeric character are discarded. Lines
    starting with a bullet glyph, or with a lowercase letter followed by ``)``,
    are rewritten to start with ``- `` (the letter label itself is dropped).
    The result is finally passed through ``normalize_whitespace``.

    Args:
        text: Raw text of one page.

    Returns:
        The cleaned page text.
    """

    def _is_content(candidate: str) -> bool:
        # Blank line.
        if not candidate:
            return False
        # Digit-only line.
        if re.fullmatch(r"\d+", candidate):
            return False
        # Lone punctuation / symbol character.
        return not (len(candidate) == 1 and not candidate.isalnum())

    stripped_lines = (raw.strip() for raw in text.splitlines())
    joined = "\n".join(entry for entry in stripped_lines if _is_content(entry))

    # Unify bullet glyphs and "a)"-style enumerators as Markdown list items.
    joined = re.sub(r"(?m)^\s*[•▪●]\s*", "- ", joined)
    joined = re.sub(r"(?m)^\s*([a-z])\)\s*", r"- ", joined)
    return normalize_whitespace(joined)
def ocr_page(page: fitz.Page, zoom: float = 1.5) -> str:
    """Rasterise *page* and return the text Tesseract recognises in the image.

    Args:
        page: PDF page to render.
        zoom: Scale factor applied to both axes when rendering.

    Returns:
        The string returned by ``pytesseract.image_to_string``.
    """
    scale = fitz.Matrix(zoom, zoom)
    pixmap = page.get_pixmap(matrix=scale, alpha=False)
    # The samples buffer is interpreted as RGB; rendering is requested without
    # an alpha channel above.
    dimensions = [pixmap.width, pixmap.height]
    image = Image.frombytes("RGB", dimensions, pixmap.samples)
    return pytesseract.image_to_string(image)
def looks_like_noise_page(text: str) -> bool:
    """Heuristically decide whether a page carries no useful content.

    A page counts as noise when, after whitespace normalisation, any of the
    following holds:

    * it is empty;
    * it has fewer than 8 word-like tokens (a letter followed by at least one
      more letter or one of ``& ' / -``);
    * it matches one of ``BAD_PAGE_PATTERNS`` (case-insensitively) and has
      fewer than 60 such tokens;
    * it is shorter than 120 characters, entirely upper-case, and has fewer
      than 15 such tokens.

    Args:
        text: Page text to classify.

    Returns:
        ``True`` if the page should be treated as noise, else ``False``.
    """
    normalized = normalize_whitespace(text)
    if not normalized:
        return True

    flat = normalized.replace("\n", " ")
    word_count = len(re.findall(r"[A-Za-z][A-Za-z&'/-]+", flat))

    # Real content pages normally have more than a handful of alphabetic words.
    if word_count < 8:
        return True

    # Low-information cover / contents / banner pages.
    if word_count < 60 and any(
        re.search(pattern, flat, flags=re.IGNORECASE) for pattern in BAD_PAGE_PATTERNS
    ):
        return True

    # Very short all-caps dividers.
    is_all_caps = flat == flat.upper()
    return len(flat) < 120 and is_all_caps and word_count < 15
def extract_page_text(doc: fitz.Document, page_number_1_based: int) -> tuple[str, str]:
    """Extract cleaned text for one page, falling back to OCR when needed.

    The embedded text layer is tried first. If that text is shorter than 120
    characters or is classified as noise, the page is OCR'd and the OCR result
    replaces the native text only when it is longer. Whatever text wins is
    checked for noise once more before being returned.

    Args:
        doc: Open PDF document.
        page_number_1_based: Page number counted from 1.

    Returns:
        ``(text, source)`` where ``source`` is ``"native"`` or ``"ocr"``. For a
        page judged to be noise, ``text`` is ``""`` and ``source`` carries a
        ``"_filtered"`` suffix.
    """
    page = doc[page_number_1_based - 1]
    best_text = clean_page_text(page.get_text("text"))
    origin = "native"

    if len(best_text) < 120 or looks_like_noise_page(best_text):
        recognised = clean_page_text(ocr_page(page))
        # Only trust OCR when it recovered more text than the native layer.
        if len(recognised) > len(best_text):
            best_text, origin = recognised, "ocr"

    if looks_like_noise_page(best_text):
        return "", f"{origin}_filtered"
    return best_text, origin
def progress_bar(current: int, total: int, width: int = 28) -> str:
    """Format a fixed-width textual progress bar.

    The number of filled cells is clamped to ``[0, width]`` so the bar keeps
    its width even when ``current`` is negative or exceeds ``total``; the
    numeric ``current/total`` and percentage parts always show the real values.

    Args:
        current: Number of completed steps.
        total: Total number of steps.
        width: Number of cells in the bar.

    Returns:
        A string such as ``"[##------] 1/4 ( 25.0%)"``, or ``"[no-progress]"``
        when ``total`` is zero or negative.
    """
    if total <= 0:
        return "[no-progress]"
    # Clamp so an out-of-range `current` cannot overflow or underflow the bar.
    filled = max(0, min(width, int(width * current / total)))
    bar = "#" * filled + "-" * (width - filled)
    pct = (current / total) * 100
    return f"[{bar}] {current}/{total} ({pct:5.1f}%)"
def block_to_markdown(doc: fitz.Document, block: dict, block_index: int, total_blocks: int, pdf_label: str) -> tuple[str, dict]:
    """Render one configured block as a Markdown section plus its report row.

    A block with a truthy ``manual_text`` entry is emitted from that text and
    ``doc`` is not read. Otherwise every page of the inclusive
    ``block["pages"]`` range is run through ``extract_page_text``; pages that
    come back empty are omitted from the body but still recorded in the
    per-page statistics. Progress is printed to stdout as the block is built.

    Args:
        doc: Open PDF the page numbers refer to.
        block: Block description. Reads ``pages`` (a ``start, end`` pair of
            1-based page numbers), ``section``, ``subsection``,
            ``scope_label``, ``source_doc`` and, optionally, ``manual_text``.
        block_index: 1-based position of the block; used for console output.
        total_blocks: Number of blocks in this run; used for console output.
        pdf_label: Label of the PDF shown in console output.

    Returns:
        ``(markdown, meta)``: the Markdown section text and a dict with the
        block identity, page range, per-page stats, body length and timing.
    """
    start, end = block["pages"]
    rule = "=" * 90

    def _assemble(body: str, page_stats: list, seconds: float) -> tuple[str, dict]:
        # Both code paths share the same header layout and report-row schema.
        header = (
            f"## {block['section']} :: {block['subsection']}\n\n"
            f"- scope_label: {block['scope_label']}\n"
            f"- source_doc: {block['source_doc']}\n"
            f"- pages: {start}-{end}\n"
        )
        meta = {
            "section": block["section"],
            "subsection": block["subsection"],
            "scope_label": block["scope_label"],
            "source_doc": block["source_doc"],
            "pages": [start, end],
            "page_stats": page_stats,
            "total_chars": len(body),
            "seconds": seconds,
        }
        return header + ("\n" + body + "\n" if body else "\n"), meta

    if block.get("manual_text"):
        # Hand-written replacement text: skip extraction entirely.
        print()
        print(rule)
        print(f"[BLOCK {block_index}/{total_blocks}] {pdf_label} | {block['section']} :: {block['subsection']} | pages {start}-{end} | MANUAL OVERRIDE")
        print(rule)
        body = normalize_whitespace(block["manual_text"])
        manual_stats = [{"page": f"{start}-{end}", "source": "manual_visual_override", "chars": len(body), "seconds": 0.0}]
        result = _assemble(body, manual_stats, 0.0)
        print(f"[DONE BLOCK] {block['section']} :: {block['subsection']} | MANUAL OVERRIDE")
        return result

    pieces: list[str] = []
    page_stats = []
    block_start_time = time.time()
    total_pages = end - start + 1

    print()
    print(rule)
    print(f"[BLOCK {block_index}/{total_blocks}] {pdf_label} | {block['section']} :: {block['subsection']} | pages {start}-{end}")
    print(rule)

    for i, p in enumerate(range(start, end + 1), start=1):
        page_start_time = time.time()
        # The progress line is left open and completed once the page is done.
        print(f" {progress_bar(i, total_pages)} -> extracting page {p} ... ", end="", flush=True)
        text, source = extract_page_text(doc, p)
        elapsed = time.time() - page_start_time

        if text:
            pieces.append(f"### Page {p}\n{text}")
        # Filtered (empty) pages are still logged so the report shows them.
        page_stats.append(
            {
                "page": p,
                "source": source,
                "chars": len(text),
                "seconds": round(elapsed, 2),
            }
        )
        print(f"{source.upper():12s} | chars={len(text):5d} | {elapsed:6.2f}s", flush=True)

    block_elapsed = time.time() - block_start_time
    body = "\n\n".join(pieces).strip()
    result = _assemble(body, page_stats, round(block_elapsed, 2))
    print(f"[DONE BLOCK] {block['section']} :: {block['subsection']} | {block_elapsed:.2f}s")
    return result
def write_markdown(pdf_path: Path, blocks: list[dict], out_path: Path, title: str) -> list[dict]:
    """Convert all *blocks* of one PDF to a single Markdown file.

    Opens the PDF, renders each block with ``block_to_markdown``, and writes
    the sections under a ``# title`` heading to *out_path* as UTF-8. Progress
    is printed to stdout.

    Args:
        pdf_path: PDF file to read.
        blocks: Block descriptions, processed in the given order.
        out_path: Markdown file to write (overwritten if present).
        title: Top-level heading of the generated document.

    Returns:
        One report dict per block, in the same order as *blocks*.
    """
    banner = "#" * 100
    print()
    print(banner)
    print(f"[START] Building markdown for: {title}")
    print(f"[PDF] {pdf_path}")
    print(f"[OUT] {out_path}")
    print(f"[BLOCKS] {len(blocks)}")
    print(banner)

    rendered = [f"# {title}", ""]
    report_rows = []
    started_at = time.time()

    with fitz.open(pdf_path) as doc:
        for position, block in enumerate(blocks, start=1):
            markdown, row = block_to_markdown(
                doc=doc,
                block=block,
                block_index=position,
                total_blocks=len(blocks),
                pdf_label=pdf_path.name,
            )
            rendered.append(markdown)
            report_rows.append(row)

    document = "\n\n".join(rendered).strip() + "\n"
    out_path.write_text(document, encoding="utf-8")

    elapsed = time.time() - started_at
    print(f"[DONE FILE] {title} -> {out_path} | {elapsed:.2f}s")
    return report_rows
def main() -> None:
    """Build both structured Markdown handbooks and write the JSON report.

    Processes the general and the complete handbook PDFs in that order, then
    stores a combined report (output paths, block counts, per-block statistics
    and total run time) at ``REPORT_PATH``. Progress is printed to stdout.
    """
    total_start = time.time()

    for line in (
        "[INFO] Markdown preprocess started",
        f"[INFO] General PDF : {GENERAL_PDF}",
        f"[INFO] Complete PDF: {COMPLETE_PDF}",
        f"[INFO] General MD : {GENERAL_MD}",
        f"[INFO] Complete MD : {COMPLETE_MD}",
        f"[INFO] Report Path : {REPORT_PATH}",
    ):
        print(line)

    general_report = write_markdown(
        GENERAL_PDF,
        GENERAL_BLOCKS,
        GENERAL_MD,
        "General Handbook (Structured Markdown)",
    )
    complete_report = write_markdown(
        COMPLETE_PDF,
        COMPLETE_BLOCKS,
        COMPLETE_MD,
        "Complete Handbook (Structured Markdown)",
    )

    report = {
        "generated_files": {
            "general_markdown": str(GENERAL_MD),
            "complete_markdown": str(COMPLETE_MD),
        },
        "general_block_count": len(general_report),
        "complete_block_count": len(complete_report),
        "general_blocks": general_report,
        "complete_blocks": complete_report,
        "total_seconds": round(time.time() - total_start, 2),
    }
    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    REPORT_PATH.write_text(serialized, encoding="utf-8")

    separator = "=" * 100
    print()
    print(separator)
    for written in (GENERAL_MD, COMPLETE_MD, REPORT_PATH):
        print(f"Wrote: {written}")
    print(f"[ALL DONE] Total time: {time.time() - total_start:.2f}s")
    print(separator)
# Run the preprocessing pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()