| import os |
| import tempfile |
| import io |
| import json |
| import numpy as np |
| import cv2 |
| from PIL import Image |
| from pdf2image import convert_from_bytes |
| from fastapi import FastAPI, UploadFile, File, HTTPException |
| from fastapi.responses import JSONResponse, StreamingResponse |
| import uvicorn |
|
|
| |
# Fail fast at import time: nothing in this module can work without the key.
GENAI_API_KEY = os.getenv("GENAI_API_KEY")
if GENAI_API_KEY is None or GENAI_API_KEY == "":
    raise Exception("GENAI_API_KEY not set in environment")
|
|
| |
| from google import genai |
| from google.genai import types |
|
|
| |
# Shared Gemini client; every model call in this module goes through it.
client = genai.Client(api_key=GENAI_API_KEY)


# ASGI application object.
# NOTE: `app` is assigned again further down the file, before the routes are
# registered, so the later instance is the one that actually serves requests.
app = FastAPI(title="Student Result Card API")


# The latest batch of results is written to the system temp directory by
# /process and read back from the same path by /download.
TEMP_FOLDER = tempfile.gettempdir()
RESULT_FILE = os.path.join(TEMP_FOLDER, "result_cards.json")
|
|
| |
| |
| |
|
|
def extract_json_from_output(output_str: str):
    """
    Extract the first JSON object embedded in a string containing extra text.

    Each ``{`` in the text is tried in turn as the start of a JSON object, so
    prose, code fences or stray braces before or after the object do not
    prevent it from being decoded.

    Args:
        output_str: Raw text, typically a model response. ``None`` or any
            non-string value is treated as containing no JSON.

    Returns:
        The decoded JSON object (a ``dict``), or ``None`` when no valid JSON
        object is present.
    """
    if not isinstance(output_str, str):
        print("No JSON block found in the output.")
        return None

    start = output_str.find('{')
    if start == -1:
        print("No JSON block found in the output.")
        return None

    decoder = json.JSONDecoder()
    last_error = None
    while start != -1:
        try:
            # raw_decode stops at the end of the first complete JSON value,
            # so anything after the object's closing brace is ignored.
            result, _ = decoder.raw_decode(output_str, start)
            return result
        except json.JSONDecodeError as e:
            last_error = e
            start = output_str.find('{', start + 1)

    print("Error decoding JSON:", last_error)
    return None
|
|
def parse_all_answers(image_input: Image.Image) -> str:
    """
    Extracts answers from an image of a 15-question answer sheet.

    Sends the image together with an extraction prompt to the Gemini model
    and returns the raw response text, which is expected to contain a JSON
    object with a top-level "Answers" key.

    Args:
        image_input: Image of the answer sheet (or answer key page).

    Returns:
        The response text (JSON string). An empty string is returned when the
        response carries no text, so the declared ``str`` return type always
        holds.
    """
    output_format = """
    Answer in the following JSON format. Do not write anything else:
    {
      "Answers": {
        "1": "<option or text>",
        "2": "<option or text>",
        "3": "<option or text>",
        "4": "<option or text>",
        "5": "<option or text>",
        "6": "<option or text>",
        "7": "<option or text>",
        "8": "<option or text>",
        "9": "<option or text>",
        "10": "<option or text>",
        "11": "<free-text answer>",
        "12": "<free-text answer>",
        "13": "<free-text answer>",
        "14": "<free-text answer>",
        "15": "<free-text answer>"
      }
    }
    """
    prompt = f"""
    You are an assistant that extracts answers from an image.
    The image is a screenshot of an answer sheet containing 15 questions.
    For questions 1 to 10, the answers are multiple-choice selections.
    For questions 11 to 15, the answers are free-text responses.
    Extract the answer for each question (1 to 15) and provide the result in JSON using the format below:
    {output_format}
    """
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=[prompt, image_input]
    )
    # `response.text` may be None (e.g. a response without text parts);
    # normalise to "" so downstream string handling does not crash.
    return response.text or ""
|
|
def parse_info(image_input: Image.Image) -> str:
    """
    Extracts candidate information including name, number, country, level and paper from an image.

    Sends the image together with an extraction prompt to the Gemini model
    and returns the raw response text, which is expected to contain a JSON
    object with a top-level "Candidate Info" key.

    Args:
        image_input: Image of the part of the sheet holding the candidate details.

    Returns:
        The response text (JSON string). An empty string is returned when the
        response carries no text, so the declared ``str`` return type always
        holds.
    """
    output_format = """
    Answer in the following JSON format. Do not write anything else:
    {
      "Candidate Info": {
        "Name": "<name>",
        "Number": "<number>",
        "Country": "<country>",
        "Level": "<level>",
        "Paper": "<paper>"
      }
    }
    """
    prompt = f"""
    You are an assistant that extracts candidate information from an image.
    The image contains candidate details including name, candidate number, country, level and paper.
    Extract the information accurately and provide the result in JSON using the following format:
    {output_format}
    """
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=[prompt, image_input]
    )
    # `response.text` may be None (e.g. a response without text parts);
    # normalise to "" so downstream string handling does not crash.
    return response.text or ""
|
|
def parse_paper(student_info_text: str) -> str:
    """
    Extracts the Paper field from candidate information.

    Asks the Gemini model to pick the paper designation out of free-form
    candidate information text.

    Args:
        student_info_text: Text describing the candidate (for example the raw
            output of the candidate-info extraction).

    Returns:
        The model's answer with surrounding whitespace removed, expected to
        be the paper letter (e.g. "A", "B", or "K"). An empty string is
        returned when the response carries no text.
    """
    prompt = f"""
    You are an assistant that extracts the Paper from candidate information.
    The candidate information contains details including their paper designation.
    Extract the Paper value (one alphabet only) from the following:
    {student_info_text}
    """
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        # The candidate text is already embedded in the prompt; sending it a
        # second time as a separate part only duplicates the input.
        contents=[prompt]
    )
    # `response.text` may be None; guard before calling .strip().
    return (response.text or "").strip()
|
|
def _answers_section(payload) -> dict:
    """Return the "Answers" mapping of a parsed payload, or {} if it is absent or malformed."""
    if not isinstance(payload, dict):
        return {}
    answers = payload.get("Answers")
    return answers if isinstance(answers, dict) else {}


def _normalise_answer(value) -> str:
    """Turn a raw answer value (possibly None or a number) into a stripped string."""
    return "" if value is None else str(value).strip()


def calculate_result(student_answers: dict, correct_answers: dict, total_questions: int = 15) -> dict:
    """
    Compares student's answers with the correct answers and calculates the score.

    Both inputs are expected to be JSON-like dicts with a top-level "Answers"
    key mapping question numbers (as strings, "1".."N") to answers. A missing
    or malformed payload is treated as having no answers instead of raising.

    Args:
        student_answers: Parsed student answers, or ``None``.
        correct_answers: Parsed answer key, or ``None``.
        total_questions: Number of questions to mark (questions 1..N).
            Defaults to 15.

    Returns:
        A dict with "Total Marks", "Total Questions", "Percentage" and
        "Detailed Results" (per-question student answer, correct answer and
        "Correct"/"Incorrect").
    """
    student_all = _answers_section(student_answers)
    correct_all = _answers_section(correct_answers)
    marks = 0
    detailed = {}

    for q in map(str, range(1, total_questions + 1)):
        # Parsed JSON may hold numbers or null instead of strings.
        stud_ans = _normalise_answer(student_all.get(q))
        corr_ans = _normalise_answer(correct_all.get(q))
        # NOTE(review): comparison is exact and case-sensitive, and a question
        # missing from both sides compares equal ("" == "") and scores a mark.
        # This matches the previous behaviour - confirm it is intended.
        is_correct = stud_ans == corr_ans
        if is_correct:
            marks += 1
        detailed[q] = {
            "Student": stud_ans,
            "Correct": corr_ans,
            "Result": "Correct" if is_correct else "Incorrect",
        }

    # Guard against division by zero when no questions are marked.
    percentage = (marks / total_questions) * 100 if total_questions > 0 else 0.0
    result_card = {
        "Total Marks": marks,
        "Total Questions": total_questions,
        "Percentage": percentage,
        "Detailed Results": detailed
    }
    return result_card
|
|
| |
| |
| |
def load_answer_key(pdf_bytes: bytes) -> dict:
    """
    Build an answer key from an answer-key PDF supplied as bytes.

    The PDF is rendered to page images, the final page is sent through the
    answer extraction step, and the JSON found in that response is returned
    (``None`` if no JSON could be decoded).
    """
    rendered_pages = convert_from_bytes(pdf_bytes)
    # The answers are read from the last page of the document.
    raw_response = parse_all_answers(rendered_pages[-1])
    return extract_json_from_output(raw_response)
|
|
| |
| |
| |
|
|
| from typing import Optional |
| from fastapi import FastAPI, UploadFile, File, HTTPException |
| from fastapi.responses import JSONResponse |
| import numpy as np |
| import cv2 |
| import json |
| from PIL import Image |
|
|
# This assignment replaces the `app` created near the top of the file, and the
# routes below are registered on this instance, so the API title must be set
# here or it is lost from the generated docs.
app = FastAPI(title="Student Result Card API")
|
|
def _crop_candidate_region(page: Image.Image) -> Optional[Image.Image]:
    """Return the band of the page between 10% and 25% of its height, or None if that band is empty."""
    page_cv = cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR)
    h, w = page_cv.shape[:2]
    mask = np.zeros((h, w), dtype="uint8")
    top = int(h * 0.10)
    bottom = int(h * 0.75)
    # Filled rectangle from y = 10% of the height down to y = h - 75% of the height.
    cv2.rectangle(mask, (0, top), (w, h - bottom), 255, -1)
    masked = cv2.bitwise_and(page_cv, page_cv, mask=mask)
    coords = cv2.findNonZero(mask)
    if coords is None:
        return None
    x, y, mw, mh = cv2.boundingRect(coords)
    return Image.fromarray(
        cv2.cvtColor(masked[y:y+mh, x:x+mw], cv2.COLOR_BGR2RGB)
    )


def _paper_from_info(cand_info: dict) -> str:
    """Return the upper-cased "Paper" value from parsed candidate info, or "" if unavailable."""
    details = cand_info.get("Candidate Info")
    if not isinstance(details, dict):
        return ""
    paper = details.get("Paper")
    # The model may return null or a non-string value for the field.
    return str(paper).strip().upper() if paper else ""


@app.post("/process")
async def process_pdfs(
    student_pdf: UploadFile = File(
        ...,
        description="PDF with all student answer sheets (one page per student)"
    ),
    paper_k_pdf: UploadFile = File(
        ...,
        description="Answer key PDF for Paper K"
    ),
    paper_a_pdf: Optional[UploadFile] = File(
        None,
        description="(Optional) Answer key PDF for Paper A"
    ),
    paper_b_pdf: Optional[UploadFile] = File(
        None,
        description="(Optional) Answer key PDF for Paper B"
    ),
):
    """
    Grade every page of the student PDF against the matching answer key.

    For each student page the candidate-details band is cropped and parsed,
    the paper designation is determined, the answers are extracted and scored
    against the key for that paper. Pages whose paper has no usable answer
    key, or whose answers cannot be parsed, are skipped with a log message
    rather than failing the whole batch. The collected results are written to
    RESULT_FILE and returned as JSON.

    Raises:
        HTTPException: status 500 with the error message for any unexpected
            failure during processing.
    """
    try:
        student_pdf_bytes = await student_pdf.read()
        paper_k_bytes = await paper_k_pdf.read()

        # Optional keys: only read the uploads that were actually provided.
        paper_a_bytes = await paper_a_pdf.read() if paper_a_pdf else None
        paper_b_bytes = await paper_b_pdf.read() if paper_b_pdf else None

        answer_keys = {
            "K": load_answer_key(paper_k_bytes)
        }
        if paper_a_bytes is not None:
            answer_keys["A"] = load_answer_key(paper_a_bytes)
        if paper_b_bytes is not None:
            answer_keys["B"] = load_answer_key(paper_b_bytes)

        student_images = convert_from_bytes(student_pdf_bytes)
        all_results = []

        for idx, page in enumerate(student_images):
            print(f"Processing student page {idx+1}...")

            cand_pil = _crop_candidate_region(page)
            if cand_pil is None:
                continue
            info_resp = parse_info(cand_pil)
            cand_info = extract_json_from_output(info_resp) if info_resp else None
            # An unparseable reply must not abort the batch; fall back to
            # empty info and let parse_paper try the raw text below.
            if not isinstance(cand_info, dict):
                cand_info = {}

            paper = _paper_from_info(cand_info)
            if not paper:
                paper = parse_paper(info_resp).upper()
            print(f"Student {idx+1} Paper: {paper}")

            if paper not in answer_keys or answer_keys[paper] is None:
                print(f"Skipping: no answer key for paper '{paper}'")
                continue
            correct_key = answer_keys[paper]

            ans_resp = parse_all_answers(page)
            stud_answers = extract_json_from_output(ans_resp) if ans_resp else None
            if not isinstance(stud_answers, dict):
                print(f"Skipping: could not parse answers for student {idx+1}")
                continue

            result = calculate_result(stud_answers, correct_key)
            all_results.append({
                "Student Index": idx + 1,
                "Candidate Info": cand_info.get("Candidate Info", {}),
                "Student Answers": stud_answers,
                "Correct Answer Key": correct_key,
                "Result": result
            })

        # Persist the batch so it can be fetched again via /download.
        with open(RESULT_FILE, "w", encoding="utf-8") as f:
            json.dump({"results": all_results}, f, indent=2)

        return JSONResponse(content={"results": all_results})

    except HTTPException:
        # Already a proper HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
@app.get("/download")
async def download_results():
    """
    Returns the result JSON file stored in the temporary folder.

    The file is served as an attachment named ``result_cards.json``.

    Raises:
        HTTPException: status 404 when no result file exists yet.
    """
    # Local import keeps this block self-contained; FileResponse opens and
    # closes the file itself, so no file handle is left open after the response.
    from fastapi.responses import FileResponse

    if not os.path.exists(RESULT_FILE):
        raise HTTPException(status_code=404, detail="Result file not found. Please run /process first.")
    return FileResponse(
        RESULT_FILE,
        media_type="application/json",
        filename="result_cards.json"
    )
|
|
@app.get("/")
async def root():
    """Return a short welcome message and usage hint for the API."""
    welcome_text = "Welcome to the Student Result Card API."
    usage_text = "POST PDFs to /process (student answer sheet, paper A, paper B, paper K). Then use /download to retrieve the results."
    return {"message": welcome_text, "usage": usage_text}
|
|
if __name__ == "__main__":
    # Start the development server only when the file is run as a script.
    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)
|
|