Benny-Tang committed on
Commit
bf49bb2
·
verified ·
1 Parent(s): dc8d23a

Update ocr_agent.py

Browse files
Files changed (1) hide show
  1. ocr_agent.py +69 -277
ocr_agent.py CHANGED
@@ -1,285 +1,77 @@
1
- """
2
- ocr_agent.py
3
- Handles converting scanned/digital SPM PDF papers into structured JSON question lists.
4
-
5
- Capabilities:
6
- - Try pdfplumber (best for digital PDFs with selectable text)
7
- - Fallback to pytesseract + pdf2image for scanned PDFs
8
- - Parse raw extracted text into question objects (MCQ-centric)
9
- - Optional "naturalize" pass using GLM-4.5 to rewrite/clean question text
10
-
11
- Outputs a list of question dicts suitable for merge_questions.py:
12
- [
13
- {
14
- "question_type": "mcq",
15
- "text": "...",
16
- "choices": ["A", "B", "C", "D"],
17
- "topics": [],
18
- "difficulty": 3
19
- },
20
- ...
21
- ]
22
- """
23
-
24
  import os
25
- import re
26
  import json
27
- import logging
28
- from typing import List, Dict
29
-
30
- # try import optional heavy deps; functions will check existence
31
- try:
32
- import pdfplumber
33
- except Exception:
34
- pdfplumber = None
35
-
36
- try:
37
- from pdf2image import convert_from_path
38
- from PIL import Image
39
- import pytesseract
40
- except Exception:
41
- convert_from_path = None
42
- Image = None
43
- pytesseract = None
44
-
45
- # optional GLM cleaning
46
- import requests
47
- GLM_API_URL = "https://api.your-glm-provider.com/v1/chat/completions"
48
- GLM_API_KEY = os.getenv("ZHIPUAI_API_KEY")
49
-
50
- logger = logging.getLogger(__name__)
51
- logging.basicConfig(level=logging.INFO)
52
-
53
-
54
def extract_text_pdfplumber(pdf_path: str) -> str:
    """Return the text pdfplumber reads from every page of ``pdf_path``.

    Pages for which pdfplumber yields no text are skipped; the remaining page
    texts are joined with a blank line between them. Works best for digital
    PDFs that carry a selectable text layer.

    Raises:
        RuntimeError: if the optional ``pdfplumber`` dependency is missing.
    """
    if pdfplumber is None:
        raise RuntimeError("pdfplumber is not installed")
    with pdfplumber.open(pdf_path) as document:
        page_texts = [page.extract_text() for page in document.pages]
    # Drop pages that produced None / an empty string before joining.
    return "\n\n".join(chunk for chunk in page_texts if chunk)
65
-
66
-
67
def extract_text_tesseract(pdf_path: str, dpi: int = 200, fmt="jpeg") -> str:
    """OCR every page of ``pdf_path`` with pytesseract (fallback for scans).

    The PDF is rasterised with ``pdf2image.convert_from_path`` at ``dpi`` and
    each page image is passed to Tesseract using the ``eng+msa`` language
    pair (English + Malay, when those Tesseract language packs are
    installed). Page texts are joined with a blank line between them.

    Args:
        pdf_path: path of the PDF to OCR.
        dpi: rasterisation resolution handed to pdf2image.
        fmt: accepted for interface compatibility; this implementation does
            not forward it to pdf2image.

    Raises:
        RuntimeError: if pdf2image or pytesseract is not importable.
    """
    if convert_from_path is None or pytesseract is None:
        raise RuntimeError("pdf2image/pytesseract not available")
    # One PIL image per page.
    page_images = convert_from_path(pdf_path, dpi=dpi)
    page_texts = [
        pytesseract.image_to_string(page_image, lang='eng+msa')
        for page_image in page_images
    ]
    return "\n\n".join(page_texts)
78
-
79
-
80
def try_extract_text(pdf_path: str) -> str:
    """Extract raw text from a PDF, preferring pdfplumber over OCR.

    pdfplumber is tried first when it is installed. Its result is accepted
    only if it holds at least 200 non-padding characters; shorter output is
    treated as a sign of a scanned document and OCR is attempted instead.

    Raises:
        RuntimeError: if pdfplumber did not produce a usable result and the
            OCR stack (pdf2image + pytesseract) is not installed.
        Exception: whatever the OCR step raises is logged and re-raised.
    """
    logger.info("Attempting pdfplumber extraction...")
    if pdfplumber:
        try:
            plumbed = extract_text_pdfplumber(pdf_path)
            # Heuristic: very little text usually means the pages are images.
            if len(plumbed.strip()) >= 200:
                logger.info("pdfplumber extraction looks OK (length=%d)", len(plumbed))
                return plumbed
            logger.info("pdfplumber produced short text; falling back to OCR")
        except Exception as e:
            logger.warning("pdfplumber extraction failed: %s", e)

    logger.info("Attempting pytesseract extraction...")
    if not (pytesseract and convert_from_path):
        raise RuntimeError("No available PDF/text extraction method (pdfplumber or pytesseract required).")

    try:
        ocr_text = extract_text_tesseract(pdf_path)
        logger.info("pytesseract extraction done (length=%d)", len(ocr_text))
        return ocr_text
    except Exception as e:
        logger.error("pytesseract extraction failed: %s", e)
        raise
107
-
108
-
109
- # --- parsing heuristics --- #
110
# Module-level patterns retained unchanged; parse_mcq_blocks itself works with
# the private patterns defined right below them.
_RE_Q_SPLIT = re.compile(r'\n\s*\d+\.\s+', flags=re.MULTILINE)  # split on numbered questions like "1. "
_RE_OPTION_LINE = re.compile(r'^[A-D][\).\s]+', flags=re.MULTILINE)
_RE_FIND_OPTIONS = re.compile(r'(?:A[\).\s].*?)(?:B[\).\s].*?)(?:C[\).\s].*?)(?:D[\).\s].*?)', re.S)

# A newline that is immediately followed by "<digits>. " starts a new block.
_RE_BLOCK_BOUNDARY = re.compile(r'\n(?=\d+\.\s)')
# Leading "<digits>." of a block; group 1 is everything after it.
_RE_LEADING_NUMBER = re.compile(r'^\d+\.\s*(.*)', flags=re.S)
# First option marker. It must be "A." or "A)": a bare "A " would also match
# the ordinary word "A" at the start of a question stem.
_RE_FIRST_OPTION = re.compile(r'\bA[\)\.]')
# Zero-width boundary in front of every "A."/"B)"/... marker.
_RE_OPTION_BOUNDARY = re.compile(r'(?=\b[A-D][\)\.]\s*)')
_RE_OPTION_PREFIX = re.compile(r'^[A-D][\)\.]\s*')
# A line that starts with an option marker; group 1 is the option text.
_RE_CHOICE_LINE = re.compile(r'^\s*[A-D][\)\.]\s*(.*)')


def _build_question(question_type: str, text: str, choices: List[str]) -> Dict:
    """Assemble one question dict with the default topics/difficulty."""
    return {
        "question_type": question_type,
        "text": text,
        "choices": choices,
        "topics": [],
        "difficulty": 3
    }


def _options_by_marker(body: str):
    """Split ``body`` at A./B./C./D. markers; return (stem, choices) or None."""
    first = _RE_FIRST_OPTION.search(body)
    if first is None:
        return None
    stem = body[:first.start()].strip()
    choices = []
    for piece in _RE_OPTION_BOUNDARY.split(body[first.start():].strip()):
        piece = piece.strip()
        if not piece:
            continue
        # Drop the leading "A) " / "A. " and flatten wrapped option text.
        option = _RE_OPTION_PREFIX.sub('', piece, count=1)
        choices.append(option.strip().replace('\n', ' '))
    if len(choices) < 2:
        return None
    return stem, choices


def _options_by_line(body: str):
    """Collect options from lines starting with a marker; (stem, choices) or None."""
    lines = body.split('\n')
    marked = [i for i, ln in enumerate(lines) if _RE_CHOICE_LINE.match(ln)]
    if len(marked) < 2:
        return None
    first_idx = marked[0]
    stem = ' '.join(ln.strip() for ln in lines[:first_idx])
    choices = []
    for ln in lines[first_idx:]:
        hit = _RE_CHOICE_LINE.match(ln)
        if hit:
            choices.append(hit.group(1).strip())
        elif ln.strip():
            # Wrapped continuation of the previous option: keep it instead of
            # silently dropping the text.
            choices[-1] = (choices[-1] + ' ' + ln.strip()).strip()
    return stem, choices


def parse_mcq_blocks(raw_text: str) -> List[Dict]:
    """Parse raw extracted text into a list of question dicts (best effort).

    Strategy:
      - Normalize line breaks to ``\\n``.
      - Split into blocks at lines that begin with a question number ("1. ").
      - In each block look for A/B/C/D option markers written as "A." or
        "A)" and separate the stem from the choices.
      - A block without at least two recognisable options is returned as a
        ``short_answer`` question carrying the whole block text.

    Args:
        raw_text: text as produced by the extraction step; ``None`` or an
            empty string yields an empty list.

    Returns:
        One dict per block with the keys ``question_type``, ``text``,
        ``choices``, ``topics`` and ``difficulty``. Tricky layouts may still
        need human review.
    """
    if not raw_text:
        return []

    text = raw_text.replace('\r\n', '\n').replace('\r', '\n')
    # The prepended newline lets a question number on the very first line
    # act as a block boundary too; the numbers stay part of each block.
    parts = _RE_BLOCK_BOUNDARY.split("\n" + text)

    questions = []
    for part in parts:
        part = part.strip()
        if not part:
            continue

        numbered = _RE_LEADING_NUMBER.match(part)
        body = numbered.group(1).strip() if numbered else part

        # Heuristic 1: markers anywhere in the block; heuristic 2: markers at
        # the start of lines (covers blocks whose first option marker is missing).
        parsed = _options_by_marker(body) or _options_by_line(body)
        if parsed:
            stem, choices = parsed
            questions.append(_build_question("mcq", stem, choices))
            continue

        # Fallback: treat the entire block as a short-answer/descriptive question.
        questions.append(_build_question("short_answer", body.strip(), []))

    return questions
208
-
209
-
210
- # --- GLM-based naturalizer (optional) --- #
211
def _parse_glm_object(snippet: str):
    """Decode the ``{...}`` snippet of a GLM reply into a Python object.

    Strict JSON is tried first so that apostrophes inside valid JSON strings
    are left alone. A single-quoted, Python-literal style reply is handled
    next, and the blanket quote replacement is kept only as a last resort.
    """
    import ast  # local import: only needed on the non-JSON fallback path

    try:
        return json.loads(snippet)
    except ValueError:
        pass
    try:
        return ast.literal_eval(snippet)
    except (ValueError, SyntaxError, TypeError):
        pass
    return json.loads(snippet.replace("'", '"'))


def glm_naturalize_question(q_text: str, choices: List[str]=None) -> Dict:
    """Ask GLM-4.5 to clean up and naturalize a single question.

    Args:
        q_text: the extracted question text.
        choices: the extracted answer choices, if any.

    Returns:
        A dict with the keys ``text`` and ``choices``. The original values
        are returned unchanged when no API key is configured, when the
        request fails, or when the reply does not contain a usable object.

    NOTE: this uses your GLM API key and incurs cost.
    """
    unchanged = {"text": q_text, "choices": choices or []}
    if not GLM_API_KEY:
        # no API key -- return original
        return unchanged

    system_prompt = "You are a helpful editor who rewrites exam questions to be clear, natural, concise, and exam-appropriate. Do not change the meaning."
    user_prompt = f"Question: {q_text}\n\nChoices: {json.dumps(choices or [])}\n\nReturn JSON: {{'text': '...', 'choices': [...]}}, no extra commentary."
    headers = {"Authorization": f"Bearer {GLM_API_KEY}", "Content-Type": "application/json"}
    payload = {
        "model": "glm-4.5",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "temperature": 0.2,
        "max_tokens": 300
    }
    try:
        r = requests.post(GLM_API_URL, headers=headers, json=payload, timeout=30)
        r.raise_for_status()
        reply = r.json()["choices"][0]["message"]["content"]
        # The model may wrap the object in commentary; grab the outermost {...}.
        m = re.search(r"(\{[\s\S]*\})", reply)
        if m:
            cleaned = _parse_glm_object(m.group(1))
            if not isinstance(cleaned, dict):
                raise ValueError("GLM reply is not a JSON object")
            # Missing keys fall back to the original values.
            return {"text": cleaned.get("text", q_text), "choices": cleaned.get("choices", choices or [])}
    except Exception as e:
        # Best effort by design: any failure keeps the original question.
        logger.warning("GLM naturalize failed: %s", e)
    return unchanged
247
-
248
-
249
- # --- top-level conversion function --- #
250
def pdf_to_questions(pdf_path: str, year: int = None, subject: str = None, naturalize: bool = False) -> List[Dict]:
    """Turn the PDF at ``pdf_path`` into a list of question dicts.

    The text is extracted with ``try_extract_text`` and parsed with
    ``parse_mcq_blocks``. With ``naturalize=True`` every question is also
    passed through ``glm_naturalize_question``; a failure there is logged and
    the question is kept as it is. A truthy ``year`` / ``subject`` is stored
    on each question unless that key is already present.
    """
    questions = parse_mcq_blocks(try_extract_text(pdf_path))

    if naturalize:
        for question in questions:
            try:
                rewritten = glm_naturalize_question(question["text"], question.get("choices", []))
                question["text"] = rewritten["text"]
                question["choices"] = rewritten["choices"]
            except Exception as e:
                logger.warning("naturalize failed for question: %s", e)

    # Placeholders only: the merge script assigns the final subject key.
    extras = {}
    if year:
        extras["year"] = year
    if subject:
        extras["subject"] = subject
    for question in questions:
        for key, value in extras.items():
            question.setdefault(key, value)
    return questions
277
 
 
278
 
279
def pdf_to_json_file(pdf_path: str, out_json_path: str, year: int = None, subject: str = None, naturalize: bool = False):
    """Convert a PDF to questions and write them to ``out_json_path``.

    The questions come from ``pdf_to_questions`` and are written as a plain,
    indented UTF-8 JSON array (no ids are added). Returns ``out_json_path``.
    """
    questions = pdf_to_questions(
        pdf_path,
        year=year,
        subject=subject,
        naturalize=naturalize,
    )
    with open(out_json_path, "w", encoding="utf-8") as handle:
        json.dump(questions, handle, indent=2, ensure_ascii=False)
    logger.info("Wrote %d questions to %s", len(questions), out_json_path)
    return out_json_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
 
2
  import json
3
+ import pytesseract
4
+ from PIL import Image
5
+ import pdfplumber
6
+
7
class OcrAgent:
    """Extract text from images/PDFs and turn it into a simple question JSON file."""

    def __init__(self, language="eng"):
        # Language code handed to Tesseract on every OCR call.
        self.language = language

    def extract_from_image(self, image_path) -> str:
        """Return the text Tesseract recognises in the image at ``image_path``."""
        # The context manager closes the underlying file once OCR is done.
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img, lang=self.language)

    def extract_from_pdf(self, pdf_path) -> str:
        """Extract text from each page. Uses native text when available, OCR fallback otherwise.

        Page texts are joined with a newline.
        """
        text_blocks = []
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                text = page.extract_text()
                # No text, or whitespace only: treat as a scanned page and OCR
                # a 300 dpi rendering of it.
                if not text or not text.strip():
                    pil_img = page.to_image(resolution=300).original
                    text = pytesseract.image_to_string(pil_img, lang=self.language)
                text_blocks.append(text)
        return "\n".join(text_blocks)

    def clean_text(self, raw_text, separator=" ") -> str:
        """Basic cleanup of OCR noise: strip every line and drop blank ones.

        Args:
            raw_text: text to clean; ``None`` or empty gives ``""``.
            separator: string placed between the kept lines. The default
                (a space) flattens the text to one line; pass ``"\\n"`` to
                keep the line structure that ``text_to_json`` relies on.
        """
        if not raw_text:
            return ""
        kept = [line.strip() for line in raw_text.splitlines() if line.strip()]
        return separator.join(kept)

    def text_to_json(self, raw_text, subject="BM", year="2018", output_dir="data"):
        """
        Convert line-structured text into a simple JSON file.

        Assumes format like:
        1. Question text
        A. option
        B. option
        ...

        A line starting with "<number>." opens a new question, a line
        starting with "A."-"D." adds a choice, and any other line continues
        the most recent choice (or the question text while the question has
        no choices yet). Lines before the first question are ignored.

        Returns the path of the written file,
        ``{output_dir}/spm_{year}_{subject}.json``.
        """
        questions = []
        current_q = None

        for line in (raw_text or "").splitlines():
            line = line.strip()
            if not line:
                continue

            number, dot, remainder = line.partition(".")
            # Any number of digits counts as a question number, but a digit
            # right after the dot means a decimal such as "2.5 cm".
            if dot and number.strip().isdigit() and not remainder[:1].isdigit():
                # New question
                if current_q is not None:
                    questions.append(current_q)
                current_q = {"text": remainder.strip(), "choices": [], "topics": ["general"]}
            elif line[0] in ("A", "B", "C", "D") and line[1:2] == ".":
                # Answer choice (slicing keeps a one-character line from raising)
                if current_q is not None:
                    current_q["choices"].append(line[2:].strip())
            elif current_q is not None:
                if current_q["choices"]:
                    # Wrapped text belongs to the choice it follows
                    current_q["choices"][-1] = (current_q["choices"][-1] + " " + line).strip()
                else:
                    # Continuation of question text
                    current_q["text"] += " " + line

        if current_q is not None:
            questions.append(current_q)

        # Save JSON
        os.makedirs(output_dir, exist_ok=True)
        filename = f"{output_dir}/spm_{year}_{subject}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(questions, f, indent=2, ensure_ascii=False)

        return filename
77