| import json |
| import pandas as pd |
| import asyncio |
| import base64 |
| from PIL import Image, ImageDraw |
| from aiohttp import ClientSession |
| from io import BytesIO |
| from data.model import OCR_model |
| from data.data import name_distribute |
|
|
| async def getImage(img_url): |
| async with ClientSession() as session: |
| try: |
| async with session.get(img_url) as response: |
| img_data = await response.read() |
| return BytesIO(img_data) |
| except Exception as e: |
| print({"Error in getImage":str(e)}) |
|
|
|
|
| async def detection(model,img_content): |
| try: |
| img = Image.open(img_content) |
| |
| result = model(img,device=0,conf=0.6) |
| |
| detection = {} |
| data = json.loads(result[0].tojson()) |
| rect_data = [] |
| for items in data: |
| rect_data.append(items["box"]) |
| print(items["box"]) |
| img = await create_rectangle(Image.open(img_content),rect_data) |
| if len(data) == 0: |
| res = {"AI": "No Detection"} |
| detection.update(res) |
| else: |
| df = pd.DataFrame(data) |
| name_counts = df['name'].value_counts().sort_index() |
| |
| for name, count in name_counts.items(): |
| res = {name: count} |
| detection.update(res) |
| return detection, img |
| except Exception as e: |
| print({"Error in detection":str(e)}) |
|
|
|
|
| async def share_iamge(img): |
| try: |
| |
| buffer = BytesIO() |
| img.save(buffer, format="JPEG") |
| base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8") |
| data = {"Arabic Detected": base64_image} |
| return data |
| except Exception as e: |
| print({"Error in share_iamge":str(e)}) |
|
|
|
|
|
|
| async def create_rectangle(image_data,cords): |
| try: |
| drawing = ImageDraw.Draw(image_data) |
|
|
| for item in cords: |
| x1 = item["x1"] |
| x2 = item["x2"] |
| y1 = item["y1"] |
| y2 = item["y2"] |
| drawing.rectangle([x1,y1,x2,y2],outline="red",width=1) |
| return image_data |
| except Exception as e: |
| print({"Error in create_rectangle":str(e)}) |
|
|
|
|
|
|
|
|
|
|
|
|
| async def format_result(res,conv): |
| try: |
| result = {} |
| for key,value in conv.items(): |
| if key in res: |
| data = {value:res[key]} |
| result.update(data) |
| return result |
| except Exception as e: |
| print({"Error in format_result":str(e)}) |
|
|
|
|
|
|
|
|
|
|
|
|
| async def mainDet(url): |
| try: |
| image = await asyncio.create_task(getImage(url)) |
| detect_data,img = await asyncio.create_task(detection(OCR_model, image)) |
| tab_data = await asyncio.create_task(format_result(detect_data,name_distribute)) |
| img_data = await asyncio.create_task(share_iamge(img)) |
| result = { |
| "tabularData":{} if len(tab_data)==0 else tab_data, |
| "imageData":{} |
| } |
| return json.dumps(result) |
|
|
| except Exception as e: |
| print({"Error in mainDet":str(e)}) |
|
|
|
|