Spaces:
Runtime error
Runtime error
| # ocr_agent.py | |
| import os | |
| import re | |
| import json | |
| import io | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| import pytesseract | |
| from datetime import datetime | |
# Directory layout: raw inputs under data/, parsed question JSON under
# data/processed/, extracted diagram images under media/.
DATA_DIR = "data"
PROCESSED_DIR = os.path.join(DATA_DIR, "processed")
MEDIA_DIR = "media"

# Create the output directories up front so writers never have to check.
for _outdir in (PROCESSED_DIR, MEDIA_DIR):
    os.makedirs(_outdir, exist_ok=True)
def sanitize_text(s):
    """Collapse runs of whitespace in *s* into single spaces and trim the ends."""
    return re.sub(r'\s+', ' ', s).strip()


def parse_mcq_from_text(block_text):
    """
    Parse one block of text as a single question with optional A-D choices.

    A line counts as a choice when it starts with "A." / "A)" (any case,
    letters A-D) or with the "A - " form (letter, then dash separated by
    whitespace).

    Returns a dict {"text": str, "choices": list[str], "answer": None}.
    "answer" is always None here; it is expected to be filled in later.
    """
    lines = [l.strip() for l in block_text.splitlines() if l.strip()]

    # Indices of lines that look like answer choices.
    choice_indices = [
        i for i, ln in enumerate(lines)
        if re.match(r'^[A-D][\.\)]\s+', ln, re.IGNORECASE)
        or re.match(r'^[A-D]\s+-\s+', ln, re.IGNORECASE)
    ]
    if not choice_indices:
        # No recognizable choices -- treat the whole block as question text.
        return {"text": sanitize_text(block_text), "choices": [], "answer": None}

    # Question text = everything before the first choice line.
    qtext = " ".join(lines[:choice_indices[0]])

    # Strip the leading marker from each detected choice line.
    # BUG FIX: the pattern now allows optional whitespace between the letter
    # and the delimiter so the "A - option" form (accepted by the detection
    # regex above) is cleaned too; the old pattern r'^[A-D][\.\)\-]\s*'
    # required the delimiter to follow the letter immediately and left
    # "A - " glued onto the choice text.
    choices = []
    for idx in choice_indices:
        cleaned = re.sub(r'^[A-D]\s*[\.\)\-]\s*', '', lines[idx], flags=re.IGNORECASE)
        choices.append(cleaned.strip())

    return {"text": sanitize_text(qtext), "choices": choices, "answer": None}
def _page_text(page):
    """Return the text of a fitz page: the embedded text layer when present,
    otherwise a best-effort OCR of the rendered page (empty string on failure)."""
    text = page.get_text("text") or ""
    if text.strip():
        return text
    # Scanned page: render at 200 dpi and OCR it (English + Malay).
    pix = page.get_pixmap(dpi=200)
    try:
        img = Image.open(io.BytesIO(pix.tobytes()))
        return pytesseract.image_to_string(img, lang="eng+msa")
    except Exception:
        # Best-effort: OCR failure degrades to "no text", never a crash.
        return ""


def _export_page_images(doc, page, year, subject, paper, page_num):
    """Save every embedded image (diagram) on *page* into MEDIA_DIR.

    Returns the list of written file paths, in page order.
    """
    paths = []
    for img_index, imginfo in enumerate(page.get_images(full=True)):
        base_image = doc.extract_image(imginfo[0])  # imginfo[0] is the xref
        ext = base_image.get("ext", "png")
        img_name = f"{year}_{subject}_p{paper}_pg{page_num+1}_{img_index}.{ext}"
        img_path = os.path.join(MEDIA_DIR, img_name)
        with open(img_path, "wb") as f:
            f.write(base_image["image"])
        paths.append(img_path)
    return paths


def extract_questions(pdf_path, subject, year, paper=2):
    """
    Extracts questions and images from given PDF.
    Saves output JSON to data/processed/spm_{year}_{subject}_paper{paper}.json
    Returns list of question dicts.
    """
    questions = []
    qid_base = int(datetime.now().timestamp())  # base for ids if missing
    qcounter = 0
    source_name = os.path.basename(pdf_path)

    doc = fitz.open(pdf_path)
    try:  # BUG FIX: the document was never closed (leaked file handle per call)
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            text = _page_text(page)
            image_paths = _export_page_images(doc, page, year, subject, paper, page_num)
            # Best-effort: attach the first diagram of the page to every
            # question parsed from it (we cannot tell which question owns it).
            first_image = image_paths[0] if image_paths else None

            # Split page text on lines starting with "1." / "1)" etc.
            # With the capture group, re.split yields:
            #   [prefix, num1, block1, num2, block2, ...]
            splits = re.split(r'\n\s*(\d+)[\.\)]\s*', text)
            if len(splits) <= 1:
                # No question numbers found -- treat whole page as one block.
                blocks = [(None, text)]
            else:
                # Pair each captured number with its following block; any text
                # before the first number (splits[0]) is header noise, dropped.
                blocks = list(zip(splits[1::2], splits[2::2]))

            for num, block in blocks:
                parsed = parse_mcq_from_text(block)
                parsed["id"] = qid_base + qcounter
                # NEW (backward-compatible): keep the printed question number,
                # previously captured by the regex but discarded.
                parsed["number"] = int(num) if num is not None else None
                parsed["subject"] = subject
                parsed["paper"] = paper
                parsed["year"] = year
                parsed["image"] = first_image
                parsed["source"] = source_name
                questions.append(parsed)
                qcounter += 1
    finally:
        doc.close()

    # Save processed JSON
    out_fname = os.path.join(PROCESSED_DIR, f"spm_{year}_{subject.lower()}_paper{paper}.json")
    with open(out_fname, "w", encoding="utf-8") as f:
        json.dump(questions, f, ensure_ascii=False, indent=2)
    return questions
def _main():
    """CLI entry point: extract one exam PDF into processed JSON and report."""
    import argparse

    parser = argparse.ArgumentParser(description="Extract questions & images from a PDF into JSON")
    parser.add_argument("--pdf", required=True, help="Path to PDF")
    parser.add_argument("--subject", required=True, help="Subject short name e.g. BM, English")
    parser.add_argument("--year", required=True, type=int, help="Year e.g. 2019")
    parser.add_argument("--paper", default=2, type=int, help="Paper number (1 or 2)")
    args = parser.parse_args()

    res = extract_questions(args.pdf, args.subject, args.year, args.paper)
    out_path = os.path.join(PROCESSED_DIR, f'spm_{args.year}_{args.subject.lower()}_paper{args.paper}.json')
    print(f"Extracted {len(res)} questions -> {out_path}")


# CLI convenience
if __name__ == "__main__":
    _main()