# ocr_agent.py
import os
import re
import json
import io
import fitz # PyMuPDF
from PIL import Image
import pytesseract
from datetime import datetime
# Directory layout: raw inputs under data/, parsed JSON under data/processed/,
# extracted diagram images under media/.
DATA_DIR = "data"
PROCESSED_DIR = os.path.join(DATA_DIR, "processed")
MEDIA_DIR = "media"
# Create output directories at import time (no-op if they already exist).
os.makedirs(PROCESSED_DIR, exist_ok=True)
os.makedirs(MEDIA_DIR, exist_ok=True)
def sanitize_text(s):
    """Collapse every whitespace run in *s* to a single space and trim the ends."""
    collapsed = re.sub(r'\s+', ' ', s)
    return collapsed.strip()
def parse_mcq_from_text(block_text):
    """
    Parse a block of text as a single multiple-choice question.

    Choice lines are recognised in three forms (letters A-D,
    case-insensitive): "A. text", "A) text" and "A - text". Every line
    before the first choice line becomes the question text. If no choice
    lines are found, the whole block is returned as question text.

    Returns a dict: {"text": str, "choices": list[str], "answer": None}.
    "answer" is always None here; it is filled in by a later stage.
    """
    def _squash(s):
        # Collapse whitespace runs to single spaces and trim the ends.
        return re.sub(r'\s+', ' ', s).strip()

    lines = [ln.strip() for ln in block_text.splitlines() if ln.strip()]

    # Indices of lines that look like answer choices.
    choice_indices = [
        i for i, ln in enumerate(lines)
        if re.match(r'^[A-D][\.\)]\s+', ln, re.IGNORECASE)
        or re.match(r'^[A-D]\s+-\s+', ln, re.IGNORECASE)
    ]

    if not choice_indices:
        # No obvious choices -- return the whole block as question text.
        return {"text": _squash(block_text), "choices": [], "answer": None}

    # Question text = everything before the first choice line.
    qtext = " ".join(lines[:choice_indices[0]])

    choices = []
    for idx in choice_indices:
        # Strip the leading marker: "A.", "A)" or "A -". The optional \s*
        # before the punctuation class is the bug fix: the old pattern
        # required the dash to follow the letter directly, so "A - text"
        # choices kept their marker.
        cleaned = re.sub(r'^[A-D]\s*[\.\)\-]\s*', '', lines[idx],
                         flags=re.IGNORECASE)
        choices.append(cleaned.strip())

    return {"text": _squash(qtext), "choices": choices, "answer": None}
def _ocr_page(page):
    """Rasterize *page* at 200 dpi and OCR it; return '' on any failure."""
    pix = page.get_pixmap(dpi=200)
    try:
        # Request PNG explicitly so PIL always receives a decodable stream.
        img = Image.open(io.BytesIO(pix.tobytes("png")))
        return pytesseract.image_to_string(img, lang="eng+msa")
    except Exception:
        # OCR is best-effort; a failed page just yields empty text.
        return ""


def _save_page_images(doc, page, subject, year, paper, page_num):
    """Save every image embedded on *page* into MEDIA_DIR; return their paths."""
    image_paths = []
    for img_index, imginfo in enumerate(page.get_images(full=True)):
        base_image = doc.extract_image(imginfo[0])  # imginfo[0] is the xref
        ext = base_image.get("ext", "png")
        img_name = f"{year}_{subject}_p{paper}_pg{page_num+1}_{img_index}.{ext}"
        img_path = os.path.join(MEDIA_DIR, img_name)
        with open(img_path, "wb") as f:
            f.write(base_image["image"])
        image_paths.append(img_path)
    return image_paths


def extract_questions(pdf_path, subject, year, paper=2):
    """
    Extract questions and images from the given PDF.

    Each page's text layer is read with PyMuPDF; pages without text are
    OCR'd (best effort). The text is split on question numbers at line
    starts ("1.", "2)", ...) and each chunk parsed into a question dict.
    Embedded page images are saved to MEDIA_DIR and the first image of the
    page is attached to every question on that page (best effort).

    Saves output JSON to data/processed/spm_{year}_{subject}_paper{paper}.json
    and returns the list of question dicts.
    """
    doc = fitz.open(pdf_path)
    questions = []
    qid_base = int(datetime.now().timestamp())  # base for ids if missing
    qcounter = 0
    source_name = os.path.basename(pdf_path)
    try:
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            text = page.get_text("text") or ""
            # Scanned page (no text layer): fall back to OCR of the rendering.
            if not text.strip():
                text = _ocr_page(page)

            image_paths = _save_page_images(doc, page, subject, year, paper,
                                            page_num)

            # Split on question numbers at line starts. (?m)^ also matches a
            # number at the very top of the page text, which the previous
            # newline-anchored pattern silently dropped.
            splits = re.split(r'(?m)^\s*(\d+)[\.\)]\s*', text)
            # splits layout: [prefix, num1, block1, num2, block2, ...];
            # the prefix (page header text) is discarded, blocks are the
            # even slots from index 2 on. With no match, use the whole page.
            blocks = splits[2::2] if len(splits) > 1 else [text]

            for block in blocks:
                parsed = parse_mcq_from_text(block)
                parsed["id"] = qid_base + qcounter
                parsed["subject"] = subject
                parsed["paper"] = paper
                parsed["year"] = year
                # Attach the first diagram of the page (best-effort only;
                # per-question image association is not attempted).
                parsed["image"] = image_paths[0] if image_paths else None
                parsed["source"] = source_name
                questions.append(parsed)
                qcounter += 1
    finally:
        # Release the PDF file handle even if a page raises.
        doc.close()

    # Persist the processed questions as JSON.
    out_fname = os.path.join(
        PROCESSED_DIR, f"spm_{year}_{subject.lower()}_paper{paper}.json")
    with open(out_fname, "w", encoding="utf-8") as f:
        json.dump(questions, f, ensure_ascii=False, indent=2)
    return questions
# CLI convenience
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser(
        description="Extract questions & images from a PDF into JSON")
    cli.add_argument("--pdf", required=True, help="Path to PDF")
    cli.add_argument("--subject", required=True,
                     help="Subject short name e.g. BM, English")
    cli.add_argument("--year", required=True, type=int, help="Year e.g. 2019")
    cli.add_argument("--paper", default=2, type=int,
                     help="Paper number (1 or 2)")
    ns = cli.parse_args()

    extracted = extract_questions(ns.pdf, ns.subject, ns.year, ns.paper)
    out_path = os.path.join(
        PROCESSED_DIR, f"spm_{ns.year}_{ns.subject.lower()}_paper{ns.paper}.json")
    print(f"Extracted {len(extracted)} questions -> {out_path}")