"""Convert configured handbook PDF page ranges into structured Markdown.

For every block listed in ``um_handbook_config`` the script extracts the text
of the block's pages with PyMuPDF, falls back to Tesseract OCR when the native
text layer is short or looks like noise, drops pages that still look like
noise, and writes one Markdown file per PDF plus a JSON report with per-page
statistics.
"""

import json
import re
import time
from pathlib import Path
from typing import List

import fitz
import pytesseract
from PIL import Image

from um_handbook_config import (
    GENERAL_PDF,
    COMPLETE_PDF,
    GENERAL_BLOCKS,
    COMPLETE_BLOCKS,
    DATA_ROOT,
    MARKDOWN_DIR,
    REPORTS_DIR,
)

PROJECT_DIR = Path(__file__).resolve().parent

# Output directories are created at import time. ``parents=True`` keeps this
# from failing when an intermediate directory does not exist yet.
DATA_ROOT.mkdir(parents=True, exist_ok=True)
MARKDOWN_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

GENERAL_MD = MARKDOWN_DIR / "general_handbook_structured.md"
COMPLETE_MD = MARKDOWN_DIR / "complete_handbook_structured.md"
REPORT_PATH = REPORTS_DIR / "um_handbook_markdown_report.json"

# Regexes that mark a page as a likely cover / contents / banner page. They
# are applied case-insensitively by ``looks_like_noise_page``.
BAD_PAGE_PATTERNS = [
    r"\bmore info\b",
    r"fsktm[_\.]?um",
    r"POSTGRADUATE\s+PROGRAMME\s+HANDBOOK",
    r"UNDERGRADUATE\s+PROGRAMME\s+HANDBOOK",
    r"^C\s*O\s*N\s*T\s*E\s*N\s*T\s*S$",
]

# MULTILINE lets the anchored "CONTENTS" pattern match a single heading line
# inside a page instead of only a page consisting of that one word.
_BAD_PAGE_FLAGS = re.IGNORECASE | re.MULTILINE


def normalize_whitespace(text: str) -> str:
    """Return *text* with typographic artefacts and excess whitespace removed.

    Non-breaking spaces become plain spaces, soft hyphens are dropped, the
    "fi"/"fl" ligature code points are expanded to two letters, runs of spaces
    and tabs collapse to one space, three or more consecutive newlines collapse
    to two, and the result is stripped.
    """
    text = text.replace("\u00a0", " ").replace("\xad", "")
    # Written as escapes so the ligature code points survive editors and
    # normalisation steps that would otherwise turn this into a no-op.
    text = text.replace("\ufb01", "fi").replace("\ufb02", "fl")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def clean_page_text(text: str) -> str:
    """Clean the raw text of one page.

    Blank lines, lines consisting only of digits, and single non-alphanumeric
    characters are dropped. Bullet glyphs at the start of a line are rewritten
    to ``- ``; a leading lowercase ``a)``-style label is also replaced by
    ``- `` (the letter itself is discarded). The result is passed through
    ``normalize_whitespace``.
    """
    lines = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if re.fullmatch(r"\d+", line):
            continue
        if len(line) == 1 and not line.isalnum():
            continue
        lines.append(line)

    text = "\n".join(lines)
    text = re.sub(r"(?m)^\s*[•▪●]\s*", "- ", text)
    text = re.sub(r"(?m)^\s*([a-z])\)\s*", r"- ", text)
    return normalize_whitespace(text)


def ocr_page(page: fitz.Page, zoom: float = 1.5) -> str:
    """Render *page* at the given *zoom* factor and return Tesseract's text."""
    pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
    return pytesseract.image_to_string(img)


def looks_like_noise_page(text: str) -> bool:
    """Return True when *text* looks like a page without real content.

    A page counts as noise when, after whitespace normalisation, it is empty,
    has fewer than 8 alphabetic words, matches one of ``BAD_PAGE_PATTERNS``
    while having fewer than 60 alphabetic words, or is a short (< 120 chars)
    all-uppercase page with fewer than 15 alphabetic words.
    """
    t = normalize_whitespace(text)
    if not t:
        return True

    compact = t.replace("\n", " ")
    alpha_words = re.findall(r"[A-Za-z][A-Za-z&'/-]+", compact)

    # real content pages normally have more than a handful of alphabetic words
    if len(alpha_words) < 8:
        return True

    # low-information cover / contents / banner pages
    if len(alpha_words) < 60:
        for pattern in BAD_PAGE_PATTERNS:
            # ``compact`` lets phrase patterns match across line breaks;
            # ``t`` keeps the line structure that anchored patterns need.
            if re.search(pattern, compact, flags=_BAD_PAGE_FLAGS) or re.search(
                pattern, t, flags=_BAD_PAGE_FLAGS
            ):
                return True

    # very short all-caps dividers
    if len(compact) < 120 and compact.upper() == compact and len(alpha_words) < 15:
        return True

    return False


def extract_page_text(doc: fitz.Document, page_number_1_based: int) -> tuple[str, str]:
    """Extract cleaned text for one page, using OCR when the text layer is poor.

    Returns a ``(text, source)`` pair. ``source`` is ``"native"`` or ``"ocr"``
    depending on which extraction was kept. When the kept text still looks like
    noise, ``text`` is empty and ``source`` carries a ``"_filtered"`` suffix.
    """
    page = doc[page_number_1_based - 1]
    native = clean_page_text(page.get_text("text"))
    source = "native"

    need_ocr = len(native) < 120 or looks_like_noise_page(native)
    if need_ocr:
        ocr_text = clean_page_text(ocr_page(page))
        # OCR only wins when it recovered more text than the native layer.
        if len(ocr_text) > len(native):
            native = ocr_text
            source = "ocr"

    if looks_like_noise_page(native):
        return "", f"{source}_filtered"
    return native, source


def progress_bar(current: int, total: int, width: int = 28) -> str:
    """Return a textual progress bar such as ``[###---] 3/6 ( 50.0%)``."""
    if total <= 0:
        return "[no-progress]"
    filled = int(width * current / total)
    bar = "#" * filled + "-" * (width - filled)
    pct = (current / total) * 100
    return f"[{bar}] {current}/{total} ({pct:5.1f}%)"


def _block_header(block: dict, start: int, end: int) -> str:
    """Return the Markdown heading and metadata bullet list for *block*."""
    return (
        f"## {block['section']} :: {block['subsection']}\n\n"
        f"- scope_label: {block['scope_label']}\n"
        f"- source_doc: {block['source_doc']}\n"
        f"- pages: {start}-{end}\n"
    )


def _block_meta(
    block: dict,
    start: int,
    end: int,
    page_stats: list,
    total_chars: int,
    seconds: float,
) -> dict:
    """Return the report entry for *block* (key order is kept for the JSON)."""
    return {
        "section": block["section"],
        "subsection": block["subsection"],
        "scope_label": block["scope_label"],
        "source_doc": block["source_doc"],
        "pages": [start, end],
        "page_stats": page_stats,
        "total_chars": total_chars,
        "seconds": seconds,
    }


def _render_block(header: str, body: str) -> str:
    """Join *header* and *body*, keeping a trailing newline when body is empty."""
    return header + ("\n" + body + "\n" if body else "\n")


def block_to_markdown(doc: fitz.Document, block: dict, block_index: int, total_blocks: int, pdf_label: str) -> tuple[str, dict]:
    """Render one configured block as Markdown and collect its statistics.

    *block* must provide ``pages`` (a ``(start, end)`` pair of 1-based,
    inclusive page numbers), ``section``, ``subsection``, ``scope_label`` and
    ``source_doc``. A truthy ``manual_text`` entry is used verbatim (after
    whitespace normalisation) instead of reading the PDF.

    Returns ``(markdown, meta)`` where ``meta`` is the report entry for the
    block. Progress is printed to stdout.
    """
    start, end = block["pages"]
    title = f"{block['section']} :: {block['subsection']}"
    banner = f"[BLOCK {block_index}/{total_blocks}] {pdf_label} | {title} | pages {start}-{end}"
    header = _block_header(block, start, end)

    if block.get("manual_text"):
        print()
        print("=" * 90)
        print(f"{banner} | MANUAL OVERRIDE")
        print("=" * 90)

        body = normalize_whitespace(block["manual_text"])
        manual_stats = [
            {
                "page": f"{start}-{end}",
                "source": "manual_visual_override",
                "chars": len(body),
                "seconds": 0.0,
            }
        ]
        meta = _block_meta(block, start, end, manual_stats, len(body), 0.0)
        print(f"[DONE BLOCK] {title} | MANUAL OVERRIDE")
        return (_render_block(header, body), meta)

    pieces: List[str] = []
    page_stats = []
    # perf_counter is monotonic, so durations are immune to clock adjustments.
    block_start_time = time.perf_counter()
    total_pages = end - start + 1

    print()
    print("=" * 90)
    print(banner)
    print("=" * 90)

    for i, p in enumerate(range(start, end + 1), start=1):
        page_start_time = time.perf_counter()
        print(f" {progress_bar(i, total_pages)} -> extracting page {p} ... ", end="", flush=True)
        text, source = extract_page_text(doc, p)
        elapsed = time.perf_counter() - page_start_time

        # Filtered pages contribute no Markdown but are still reported.
        if text:
            pieces.append(f"### Page {p}\n{text}")
        page_stats.append(
            {
                "page": p,
                "source": source,
                "chars": len(text),
                "seconds": round(elapsed, 2),
            }
        )
        print(f"{source.upper():12s} | chars={len(text):5d} | {elapsed:6.2f}s", flush=True)

    block_elapsed = time.perf_counter() - block_start_time
    body = "\n\n".join(pieces).strip()
    meta = _block_meta(block, start, end, page_stats, len(body), round(block_elapsed, 2))
    print(f"[DONE BLOCK] {title} | {block_elapsed:.2f}s")
    return (_render_block(header, body), meta)


def write_markdown(pdf_path: Path, blocks: list[dict], out_path: Path, title: str) -> list[dict]:
    """Convert every block of one PDF and write the combined Markdown file.

    Opens *pdf_path*, renders each entry of *blocks* with
    ``block_to_markdown``, writes the result under the top-level heading
    *title* to *out_path* (UTF-8), and returns the per-block report entries.
    """
    print()
    print("#" * 100)
    print(f"[START] Building markdown for: {title}")
    print(f"[PDF] {pdf_path}")
    print(f"[OUT] {out_path}")
    print(f"[BLOCKS] {len(blocks)}")
    print("#" * 100)

    sections = [f"# {title}", ""]
    report_rows = []
    start_time = time.perf_counter()

    with fitz.open(pdf_path) as doc:
        for idx, block in enumerate(blocks, start=1):
            md, meta = block_to_markdown(
                doc=doc,
                block=block,
                block_index=idx,
                total_blocks=len(blocks),
                pdf_label=pdf_path.name,
            )
            sections.append(md)
            report_rows.append(meta)

    out_path.write_text("\n\n".join(sections).strip() + "\n", encoding="utf-8")
    elapsed = time.perf_counter() - start_time
    print(f"[DONE FILE] {title} -> {out_path} | {elapsed:.2f}s")
    return report_rows


def main() -> None:
    """Build both Markdown files and the JSON report, logging to stdout."""
    total_start = time.perf_counter()

    print("[INFO] Markdown preprocess started")
    print(f"[INFO] General PDF : {GENERAL_PDF}")
    print(f"[INFO] Complete PDF: {COMPLETE_PDF}")
    print(f"[INFO] General MD : {GENERAL_MD}")
    print(f"[INFO] Complete MD : {COMPLETE_MD}")
    print(f"[INFO] Report Path : {REPORT_PATH}")

    general_report = write_markdown(
        GENERAL_PDF,
        GENERAL_BLOCKS,
        GENERAL_MD,
        "General Handbook (Structured Markdown)",
    )
    complete_report = write_markdown(
        COMPLETE_PDF,
        COMPLETE_BLOCKS,
        COMPLETE_MD,
        "Complete Handbook (Structured Markdown)",
    )

    report = {
        "generated_files": {
            "general_markdown": str(GENERAL_MD),
            "complete_markdown": str(COMPLETE_MD),
        },
        "general_block_count": len(general_report),
        "complete_block_count": len(complete_report),
        "general_blocks": general_report,
        "complete_blocks": complete_report,
        "total_seconds": round(time.perf_counter() - total_start, 2),
    }
    REPORT_PATH.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")

    print()
    print("=" * 100)
    print(f"Wrote: {GENERAL_MD}")
    print(f"Wrote: {COMPLETE_MD}")
    print(f"Wrote: {REPORT_PATH}")
    print(f"[ALL DONE] Total time: {time.perf_counter() - total_start:.2f}s")
    print("=" * 100)


if __name__ == "__main__":
    main()