| from PIL import Image, ImageFilter |
| import cv2 |
| import pytesseract |
| from pytesseract import Output |
| from os import listdir, getcwd |
| from os.path import isfile, join |
| import numpy as np |
| import json |
| import matplotlib.pyplot as plt |
| from pdf2image import convert_from_path |
| from matplotlib import pyplot as plt |
| import re |
| import requests |
| import json |
|
|
def getResponse(prompt, timeout=300):
    """Send *prompt* to the remote completion endpoint and return the raw reply.

    The prompt is wrapped in ``[INST]``/``[/INST]`` markers and POSTed as a
    JSON body of the form ``{"prompt": ...}``.

    Args:
        prompt: Text to send to the model.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits forever, so one stalled request would
            hang the whole pipeline.

    Returns:
        The response body decoded as UTF-8. The HTTP status code is not
        inspected, so an error page is returned as-is.

    Raises:
        requests.exceptions.RequestException: On connection failure or when
            the timeout expires.
    """
    url = "https://muryshev-mixtral-api.hf.space/completion"

    payload = json.dumps({
        "prompt": '[INST]' + prompt + '[/INST]',
    })
    headers = {
        'Content-Type': 'application/json',
    }

    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=timeout
    )
    return response.content.decode('utf-8')
|
|
def getOrgAddr(application) :
    """Ask the completion service for the organization and address in a text.

    A fixed Russian instruction prefix is prepended to *application* and the
    result is passed to ``getResponse``. From the reply, only lines that
    contain 'Организация:' or 'Адрес:' are kept; each is stripped, duplicates
    are dropped (first occurrence wins) and the survivors are joined with
    newlines.

    Args:
        application: Text to analyse (the caller passes OCR output).

    Returns:
        The filtered reply lines joined by '\\n'; an empty string if no line
        matched.
    """
    # NOTE: the literal opens with four quotes, so the prompt text itself
    # starts with a single apostrophe. It is reproduced unchanged.
    instructions = ''''Отвечайте всегда ТОЛЬКО НА РУССКОМ языке. Я предоставляю тебе "материал". Идентифицируй организацию и адрес организации в зависимости от указанного филиала.
Используй такой формат: "Организация: *название*; Адрес: *адрес*;".
Ты не комментируешь, не объясняешь, не выражаешь мысли, вообще ничего больше не говоришь.
Материал: '''

    reply = getResponse(instructions + application)

    kept = []
    for raw_line in reply.split('\n') :
        line = raw_line.strip()
        if line in kept :
            # Already recorded: keep only the first occurrence.
            continue
        if 'Адрес:' in line or 'Организация:' in line :
            kept.append(line)

    return '\n'.join(kept)
|
|
# Crop box (left, upper, right, lower), in pixels, of the page region that is
# OCR'ed separately and inspected for an address block.
_ADDRESS_CROP_BOX = (750, 150, 1654, 850)

# Lower-case spellings of the Central Bank looked for in the cropped text.
_CB_NAMES = (
    'цб рф',
    'центральный банк',
    'центрального банка',
    'банк россии',
    'банка россии',
)


def _threshold_mask(crop):
    """Shrink a grayscale crop to 1/4 size and binarize it at level 220.

    The binarized image is then median-filtered (5) and min-filtered (15).
    """
    width, height = crop.size
    return (
        crop.resize((width // 4, height // 4))
        .point(lambda x: 0 if x < 220 else 255)
        .filter(ImageFilter.MedianFilter(5))
        .filter(ImageFilter.MinFilter(15))
    )


def _clean_mask(mask, str_size=7, vertical_threshold=200):
    """Blank out bands of a 2-D uint8 mask by mean value and re-filter it.

    Horizontal bands of ``str_size`` rows whose truncated mean is below 49 or
    above 160 are set to 255; then vertical strips of ``str_size`` columns
    whose truncated mean exceeds ``vertical_threshold`` are set to 255.

    Returns:
        A PIL image built from the cleaned array, median-filtered (13) and
        min-filtered (25).
    """
    cleaned = mask.copy()
    height, width = cleaned.shape

    # Only complete horizontal bands are examined; leftover rows at the
    # bottom are left untouched.
    for top in range(0, height - height % str_size, str_size):
        band_mean = int(cleaned[top:top + str_size, :].mean())
        if band_mean < 49 or band_mean > 160:
            cleaned[top:top + str_size, :] = 255

    # Stepping by start offset never yields an empty strip (whose mean would
    # be NaN); NumPy clips the last, possibly narrower, strip on its own.
    for left in range(0, width, str_size):
        strip_mean = int(cleaned[:, left:left + str_size].mean())
        if strip_mean > vertical_threshold:
            cleaned[:, left:left + str_size] = 255

    return Image.fromarray(cleaned).filter(
        ImageFilter.MedianFilter(13)
    ).filter(
        ImageFilter.MinFilter(25)
    )


def _group_pages(fileindices, addressexists, toCB):
    """Split each file's pages into documents at pages that carry an address.

    Returns:
        ``(docindices, doctypes)``: per file, a list of page-index lists and a
        parallel list of booleans (the ``toCB`` flag of the page that opened
        the document; ``False`` if the document started without an address).
    """
    docindices = []
    doctypes = []
    for pageindices in fileindices:
        docs = []
        types = []
        pages = []
        doctype = False
        for index in pageindices:
            if addressexists[index]:
                # An address page starts a new document; flush the previous.
                if pages:
                    docs.append(pages)
                    types.append(doctype)
                pages = []
                doctype = toCB[index]
            pages.append(index)

        if pages:  # a file without pages yields no documents
            docs.append(pages)
            types.append(doctype)
        docindices.append(docs)
        doctypes.append(types)
    return docindices, doctypes


def _show_page_debug(pagename, crop, mask, has_address, to_cb):
    """Print the page flags and plot the crop with the mask contours drawn."""
    img = np.array(mask.convert('L').resize(crop.size))
    out = np.array(crop)

    bw = cv2.inRange(img, 0, 12)
    contours, hierarchy = cv2.findContours(
        bw, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
    )
    outlined = cv2.drawContours(
        out, contours, -1, (0, 255, 0), 5, cv2.LINE_AA, hierarchy, 1
    )

    print()
    print(pagename)
    print('Address exists :', has_address)
    print('To CB :', to_cb)

    plt.imshow(Image.fromarray(outlined))
    plt.show()


def processFiles(pdfs, verbose=False):
    """OCR the given PDFs and split each one into typed documents.

    Every page is rendered to an image and OCR'ed twice with Tesseract
    (``lang='rus'``): once in full and once on a fixed crop
    (``_ADDRESS_CROP_BOX``). A mask computed from the crop decides whether the
    page carries an address block; such a page starts a new document. A
    document whose opening page has an address and whose cropped text mentions
    one of ``_CB_NAMES`` gets the type 'Обращение' and its organization and
    address are requested once via ``getOrgAddr``; every other document gets
    the type 'Сопроводительное письмо' and empty attributes.

    Args:
        pdfs: Sequence of PDF file paths.
        verbose: When true, print per-page diagnostics and OCR text and show
            the mask contours with matplotlib.

    Returns:
        One list per input file, each holding one dict per detected document
        with the keys 'Тип документа', 'Атрибуты' and 'Текст документа'.
        Files without pages yield an empty list; an empty *pdfs* yields ``[]``.
    """
    from os.path import splitext

    pages_per_pdf = [convert_from_path(file) for file in pdfs]

    # Flatten all pages into one list and remember which flat indices belong
    # to which file.
    page_images = []
    docfilenames = []
    pagenames = []
    fileindices = []
    for path, pdf_pages in zip(pdfs, pages_per_pdf):
        stem = splitext(path)[0]
        docfilenames.append(stem)
        first = len(page_images)
        for page_no, page in enumerate(pdf_pages):
            page_images.append(page)
            pagenames.append(stem + '_page_' + str(page_no))
        fileindices.append(list(range(first, len(page_images))))

    if not page_images:
        # Nothing to OCR; previously this path failed with IndexError.
        return [[] for _ in pdfs]

    crops = [
        page.convert("L").crop(_ADDRESS_CROP_BOX) for page in page_images
    ]

    texts = [pytesseract.image_to_string(crop, lang='rus') for crop in crops]
    fulltexts = [
        pytesseract.image_to_string(page, lang='rus') for page in page_images
    ]

    masks = [_clean_mask(np.array(_threshold_mask(crop))) for crop in crops]

    # A page has an address block when any pixel of the 1-bit mask is dark.
    addressexists = [
        bool((~np.array(mask.convert('1'))).any()) for mask in masks
    ]

    toCB = []
    for has_address, text in zip(addressexists, texts):
        lowered = text.lower()
        toCB.append(
            has_address and any(name in lowered for name in _CB_NAMES)
        )

    docindices, doctypes = _group_pages(fileindices, addressexists, toCB)

    if verbose:
        for i, mask in enumerate(masks):
            _show_page_debug(
                pagenames[i], crops[i], mask, addressexists[i], toCB[i]
            )

    docs_info = []
    for filename, file_docs, file_types in zip(
        docfilenames, docindices, doctypes
    ):
        if verbose:
            print('File =', filename)

        docs = []
        for j, (page_ids, is_appeal) in enumerate(zip(file_docs, file_types)):
            doctype = 'Обращение' if is_appeal else 'Сопроводительное письмо'
            if verbose:
                print('Doc =', j, 'Type =', doctype)

            # The organization is requested exactly once per document, from
            # the cropped text of its opening page.
            first_page = page_ids[0]
            orginfo = ''
            if toCB[first_page]:
                orginfo = getOrgAddr(texts[first_page])
                if verbose:
                    print()
                    print(orginfo)
                    print()

            if verbose:
                for index in page_ids:
                    print('Page =', pagenames[index])
                    print(fulltexts[index])
                    print('--- end of page ---')
                    print()

            text = ''.join(fulltexts[index] for index in page_ids)
            # Drop indentation after line breaks, then collapse blank lines.
            text = re.sub(r'\n +', r'\n', text)
            text = re.sub(r'\n+', r'\n', text)

            docs.append({
                'Тип документа': doctype,
                'Атрибуты': orginfo,
                'Текст документа': text,
            })

        docs_info.append(docs)

    return docs_info
|
|
def processSingleFile(file) :
    """Run ``processFiles`` on a single PDF and return that file's documents.

    Args:
        file: Path of the PDF to process.

    Returns:
        The first (and only) per-file entry of the ``processFiles`` result.
    """
    per_file_results = processFiles([file])
    return per_file_results[0]
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |