# exam-simulator / ocr_agent.py
# Benny-Tang's picture
# Update ocr_agent.py
# 467a83a verified
# ocr_agent.py
import io
import json
import logging
import os
import re
from datetime import datetime

import fitz  # PyMuPDF
import pytesseract
from PIL import Image
# Output locations, resolved relative to the current working directory.
DATA_DIR = "data"
MEDIA_DIR = "media"  # image files are written here
PROCESSED_DIR = os.path.join(DATA_DIR, "processed")  # generated JSON goes here

# Create the output folders at import time; folders that already exist are
# left untouched.
os.makedirs(MEDIA_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
def sanitize_text(s):
    """Return *s* with every whitespace run collapsed to one space and both ends trimmed."""
    whitespace_run = re.compile(r'\s+')
    collapsed = whitespace_run.sub(' ', s)
    return collapsed.strip()
# Label that opens a choice line: "A.", "A)" or "A -" (letters A-D, any case),
# followed by at least one whitespace character. The same pattern is used both
# to detect a choice line and to strip its label, so the two can never disagree.
_CHOICE_LABEL_RE = re.compile(r'^[A-D](?:[.)]|\s+-)\s+', re.IGNORECASE)


def parse_mcq_from_text(block_text):
    """Parse one block of text as a single question with optional choices.

    Blank lines are ignored. Every remaining line that starts with a choice
    label ("A.", "A)" or "A -", letters A-D, case-insensitive) becomes one
    entry in ``choices`` with the label removed. The lines before the first
    choice line form the question text.

    Lines that follow the first choice line but carry no label of their own
    are not included in the result.

    Args:
        block_text: Raw text of one question block.

    Returns:
        dict with keys ``"text"`` (whitespace-normalised question text),
        ``"choices"`` (list of choice strings, possibly empty) and
        ``"answer"`` (always ``None``; this function does not detect answers).
    """
    lines = [ln.strip() for ln in block_text.splitlines() if ln.strip()]

    choice_indices = [
        i for i, ln in enumerate(lines) if _CHOICE_LABEL_RE.match(ln)
    ]
    if not choice_indices:
        # No recognisable choices: the whole block is the question text.
        return {"text": sanitize_text(block_text), "choices": [], "answer": None}

    qtext = " ".join(lines[:choice_indices[0]])
    # Strip the label with the detection pattern itself; the earlier separate
    # stripping pattern left "A - text" choices with their label attached.
    choices = [
        _CHOICE_LABEL_RE.sub('', lines[i], count=1).strip()
        for i in choice_indices
    ]
    return {"text": sanitize_text(qtext), "choices": choices, "answer": None}
# A question number ("12." or "12)") at the start of the text or of a line.
# The "^" alternative matters: without it a number on the very first line of a
# page is not recognised and that question ends up in the discarded prefix.
_QUESTION_NUMBER_RE = re.compile(r'(?:^|\n)\s*(\d+)[.)]\s*')


def _read_page_text(page, page_number):
    """Return the page's text layer, or OCR output when the layer is blank."""
    text = page.get_text("text") or ""
    if text.strip():
        return text

    # No usable text layer: render the page and run OCR on the image.
    pix = page.get_pixmap(dpi=200)
    img_bytes = pix.tobytes()
    try:
        img = Image.open(io.BytesIO(img_bytes))
        return pytesseract.image_to_string(img, lang="eng+msa")
    except Exception:
        # OCR stays best-effort (the page is treated as empty), but the
        # failure is reported instead of being swallowed silently.
        logging.getLogger(__name__).warning(
            "OCR failed for page %d; treating it as empty",
            page_number,
            exc_info=True,
        )
        return ""


def _save_page_images(doc, page, name_prefix):
    """Write the page's embedded images into MEDIA_DIR and return their paths."""
    saved = []
    for img_index, imginfo in enumerate(page.get_images(full=True)):
        base_image = doc.extract_image(imginfo[0])  # imginfo[0] is the xref
        # NOTE(review): guards against an xref that yields no image data;
        # whether extract_image can return that depends on the PyMuPDF
        # version in use -- confirm.
        image_bytes = base_image.get("image") if base_image else None
        if not image_bytes:
            continue
        ext = base_image.get("ext", "png")
        img_path = os.path.join(MEDIA_DIR, f"{name_prefix}_{img_index}.{ext}")
        with open(img_path, "wb") as f:
            f.write(image_bytes)
        saved.append(img_path)
    return saved


def _split_question_blocks(text):
    """Split page text into per-question blocks at leading question numbers."""
    # With one capture group the result is [prefix, num1, block1, num2, ...].
    parts = _QUESTION_NUMBER_RE.split(text)
    if len(parts) <= 1:
        # No question numbers found: treat the whole page as a single block.
        return [text]
    # Keep only the blocks; the prefix and the captured numbers are not used.
    return parts[2::2]


def extract_questions(pdf_path, subject, year, paper=2):
    """Extract questions and embedded images from a PDF.

    For every page the text layer is read (falling back to OCR when it is
    blank), embedded images are written to ``MEDIA_DIR``, and the text is
    split into question blocks at line-leading numbers such as ``1.`` or
    ``2)``. Each block is parsed with ``parse_mcq_from_text`` and tagged with
    metadata. A page without any question number yields one question built
    from the whole page text.

    The result is also saved as JSON to
    ``data/processed/spm_{year}_{subject}_paper{paper}.json`` (subject
    lower-cased).

    Args:
        pdf_path: Path of the PDF to read.
        subject: Subject name, used in output file names and question records.
        year: Year, used in output file names and question records.
        paper: Paper number, used in output file names and question records.

    Returns:
        List of question dicts with keys ``text``, ``choices``, ``answer``,
        ``id``, ``subject``, ``paper``, ``year``, ``image`` and ``source``.
    """
    questions = []
    # Ids are consecutive integers starting at the current Unix timestamp.
    qid_base = int(datetime.now().timestamp())
    source_name = os.path.basename(pdf_path)

    # The context manager closes the document even if a page fails.
    with fitz.open(pdf_path) as doc:
        for page_num, page in enumerate(doc):
            text = _read_page_text(page, page_num + 1)
            image_paths = _save_page_images(
                doc, page, f"{year}_{subject}_p{paper}_pg{page_num+1}"
            )
            # Best-effort: every question on a page gets that page's first image.
            first_image = image_paths[0] if image_paths else None

            for block in _split_question_blocks(text):
                parsed = parse_mcq_from_text(block)
                parsed["id"] = qid_base + len(questions)
                parsed["subject"] = subject
                parsed["paper"] = paper
                parsed["year"] = year
                parsed["image"] = first_image
                parsed["source"] = source_name
                questions.append(parsed)

    # Save processed JSON
    out_fname = os.path.join(
        PROCESSED_DIR, f"spm_{year}_{subject.lower()}_paper{paper}.json"
    )
    with open(out_fname, "w", encoding="utf-8") as f:
        json.dump(questions, f, ensure_ascii=False, indent=2)
    return questions
# Command-line entry point: extract one PDF and report where the JSON went.
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser(description="Extract questions & images from a PDF into JSON")
    cli.add_argument("--pdf", help="Path to PDF", required=True)
    cli.add_argument("--subject", help="Subject short name e.g. BM, English", required=True)
    cli.add_argument("--year", type=int, help="Year e.g. 2019", required=True)
    cli.add_argument("--paper", type=int, help="Paper number (1 or 2)", default=2)
    opts = cli.parse_args()

    extracted = extract_questions(opts.pdf, opts.subject, opts.year, opts.paper)

    # Rebuild the output path the same way the extractor names its JSON file.
    json_name = f'spm_{opts.year}_{opts.subject.lower()}_paper{opts.paper}.json'
    json_path = os.path.join(PROCESSED_DIR, json_name)
    print(f"Extracted {len(extracted)} questions -> {json_path}")