| |
|
|
| """ |
| Created by Shengbo.Zhang on 2021/09/20 |
| """ |
|
|
| import os |
| import re |
| import logging |
| import pdfplumber |
| from docx import Document |
| from Pdf2Txt.config import * |
| from Pdf2Txt.config import _check_ann_title_processable |
| from pdf2docx import Converter |
| from collections import Counter |
| from pdfminer.pdfpage import PDFPage |
| from pdfminer.layout import LAParams, LTTextBox |
| from pdfminer.converter import PDFPageAggregator |
| from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter |
|
|
|
|
| |
# Suppress every log record at WARNING level and below for the whole process
# (presumably to quiet the PDF libraries imported above).  One call is enough:
# each logging.disable() call replaces the previously configured threshold, so
# an earlier call with logging.INFO would have no lasting effect.
logging.disable(logging.WARNING)
|
|
|
|
def get_string_list_from_pdf(pdf_path):
    """Read the non-table body text of a PDF file line by line.

    For every page, the characters whose centre point lies inside the bounding
    box of a table found by ``page.find_tables()`` are filtered out before the
    text is extracted.

    :param pdf_path: string, path of the PDF file
    :return: two lists ``(string_list, ann_info_list)``. ``string_list`` holds
             every non-empty text line with spaces and tabs removed and a
             trailing ``'\\n'`` appended; ``ann_info_list`` holds the first 10
             raw lines of the first page (announcement header candidates such
             as security code, short name, announcement number).
    """
    string_list = []
    ann_info_list = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_no, page in enumerate(pdf.pages):
            bboxes = tuple(table.bbox for table in page.find_tables())

            # Bind this page's boxes as a default argument so the predicate
            # does not depend on the loop variable at call time.
            def _not_within_bboxes(obj, _bboxes=bboxes):
                v_mid = (obj["top"] + obj["bottom"]) / 2
                h_mid = (obj["x0"] + obj["x1"]) / 2
                return not any(
                    x0 <= h_mid < x1 and top <= v_mid < bottom
                    for x0, top, x1, bottom in _bboxes
                )

            # extract_text() may yield None for a page without any text;
            # treat that the same way as an empty string.
            text = page.filter(_not_within_bboxes).extract_text() or ''
            lines = text.split('\n')
            if page_no == 0:
                ann_info_list = lines[:10]
            for line in lines:
                compact = line.replace(' ', '').replace('\t', '')
                if compact:
                    string_list.append(compact + '\n')
    return string_list, ann_info_list
|
|
|
|
def get_ann_info_from_pdf(pdf_path):
    """Return the header candidates of a PDF announcement.

    The first 10 text lines of the first page are returned unfiltered; they may
    still contain non-header lines, which refine_txt_list() deals with later.

    :param pdf_path: string, path of the PDF file
    :return: list of strings (security code, short name, announcement number,
             ...); an empty list when the file cannot be read or the first
             page yields no text object at all
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            text = pdf.pages[0].extract_text()
    except Exception:
        # Deliberate best effort: any failure to open or parse the file means
        # "no header information" for the caller.  Unlike a bare ``except``
        # this no longer swallows KeyboardInterrupt / SystemExit.
        return []
    if text is None:
        return []
    return text.split('\n')[:10]
|
|
|
|
def get_string_list_from_pdf_converted_docx(pdf_path, docx_path):
    """Convert a PDF to Docx and read the body text (tables excluded) line by line.

    The intermediate Docx file is deleted again before returning, also when
    reading it fails.

    :param pdf_path: string, path of the PDF file
    :param docx_path: string, path for the intermediate Docx file; when it is
                      ``''`` a path next to the PDF is derived from the PDF
                      file name and TEMP_DOCX_SUFFIX
    :return: ``(string_list, document)``. ``string_list`` holds the text lines
             (spaces/tabs removed, ``'\\n'`` appended; lines equal to one of the
             detected header lines are replaced by ``''``); ``document`` is the
             loaded Document instance, or ``None`` when the conversion failed
    """
    if docx_path == '':
        # splitext/join instead of "[:-4]" and "//": works for any extension
        # and does not turn a bare file name into a path under the root.
        stem = os.path.splitext(os.path.basename(pdf_path))[0]
        output_docx_file_path = os.path.join(
            os.path.dirname(pdf_path), f"{stem}_{TEMP_DOCX_SUFFIX}.docx"
        )
    else:
        output_docx_file_path = docx_path

    document = None
    string_list = []
    try:
        if get_docx_from_pdf(pdf_path=pdf_path, out_path=output_docx_file_path):
            document = Document(output_docx_file_path)
            for paragraph in document.paragraphs:
                for piece in paragraph.text.split('\n'):
                    piece = piece.strip()
                    if piece:
                        string_list.append(
                            piece.replace(' ', '').replace('\t', '') + '\n'
                        )
    finally:
        # The Docx file is only an intermediate artefact.
        if os.path.exists(output_docx_file_path):
            os.remove(output_docx_file_path)

    # Header lines: at most the first 11 lines, up to (not including) the first
    # line that ends with '有限公司'.
    ann_headers = set()
    for line in string_list[:11]:
        if line.strip().endswith('有限公司'):
            break
        ann_headers.add(line)
    # Blank out every line that equals one of the header lines.
    string_list = ['' if line in ann_headers else line for line in string_list]
    return string_list, document
|
|
|
|
def get_abscissa_dict_from_pdf(pdf_path):
    """Map the text of every pdfminer text box to its starting x coordinate.

    The key is the text of an ``LTTextBox`` with spaces, newlines and tabs
    removed and a single ``'\\n'`` appended; the value is ``int(bbox[0])`` of
    that box.  Boxes without visible text are skipped, and a later box with the
    same text overwrites an earlier one.

    :param pdf_path: string, path of the PDF file
    :return: dict ``abscissa_dict`` from normalised box text to x coordinate
    """
    abscissa_dict = {}
    # ``with`` guarantees the file is closed even if pdfminer raises midway.
    with open(pdf_path, 'rb') as fp:
        rsrcmgr = PDFResourceManager()
        device = PDFPageAggregator(rsrcmgr=rsrcmgr, laparams=LAParams())
        interpreter = PDFPageInterpreter(rsrcmgr=rsrcmgr, device=device)
        for page in PDFPage.get_pages(fp):
            interpreter.process_page(page)
            for lobj in device.get_result():
                if not isinstance(lobj, LTTextBox):
                    continue
                key = lobj.get_text().replace(' ', '').replace('\n', '').replace('\t', '') + '\n'
                if key != '\n':
                    abscissa_dict[key] = int(lobj.bbox[0])
    return abscissa_dict
|
|
|
|
def get_min_abscissa_value(abscissa_dict, string_list_length):
    """Estimate the x coordinate at which ordinary body text starts.

    The result is the smallest x coordinate that occurs at least
    ``string_list_length // 4`` times among the values of ``abscissa_dict``.
    When no coordinate is that frequent, the overall minimum is returned.

    :param abscissa_dict: dict mapping text-block text to its starting x coordinate
    :param string_list_length: int, number of text lines of the PDF (used to
                               derive the frequency threshold)
    :return: int, the chosen x coordinate; ``0`` when ``abscissa_dict`` is empty
    """
    if not abscissa_dict:
        # No text boxes at all: min() would raise ValueError on an empty
        # sequence, so fall back to the left edge.
        return 0
    x_counts = Counter(abscissa_dict.values())
    x_threshold = string_list_length // 4
    for x in sorted(x_counts):
        if x_counts[x] >= x_threshold:
            return x
    return min(x_counts)
|
|
|
|
| def refine_txt_list(txt, ann_info): |
| ''' |
| Final formatting pass over the body-text list of a PDF announcement (the list has already been through a first round of processing). |
| The list is modified in place and is also returned. |
| :param txt: list of strings with the announcement body text |
| :param ann_info: list of strings with the announcement header lines; may be empty |
| :return: the refined text list (the same list object as txt) |
| ''' |
| # Header handling: the non-blank header lines that come before the first line ending in '有限公司' are whitespace-normalised and suffixed with SEGMENT_SYMBOL; SEGMENT_SYMBOL is then removed again from the last collected line. |
| if ann_info != []: |
| new_ann_info_list = [] |
| for i, val in enumerate(ann_info): |
| if val.strip() == '': continue |
| if val.strip()[-4:] == '有限公司': break |
| else: new_ann_info_list.append(' '.join(val.split()) + SEGMENT_SYMBOL) |
| if new_ann_info_list != []: |
| new_ann_info_list[-1] = new_ann_info_list[-1].replace(SEGMENT_SYMBOL, '') |
| # When the text starts with the company-name line, the collected header lines are placed in front of it. |
| # NOTE(review): txt[0] is read without checking that txt is non-empty. |
| if txt[0].strip()[-4:] == '有限公司': |
| for i in range(len(new_ann_info_list)): |
| txt.insert(0, '') |
| for i, val in enumerate(new_ann_info_list): |
| txt[i] = val |
| # Title handling: only the first 11 entries of txt are inspected. |
| # NOTE(review): txt[i+1] .. txt[i+4] are accessed without bounds checks, and the nesting of the branches below (including where each break belongs) should be confirmed. |
| for i, val in enumerate(txt): |
| if i > 10: break |
| else: |
| val = val.strip() |
| if _check_ann_title_processable(val): |
| if SEGMENT_SYMBOL not in val: |
| txt[i] = (SEGMENT_SYMBOL + val) |
| if val[-4:] == '有限公司': |
| if SEGMENT_SYMBOL not in txt[i]: |
| txt[i] = (SEGMENT_SYMBOL + val) |
| if _check_ann_title_processable(txt[i+1]): |
| txt[i+1] = txt[i+1].replace(SEGMENT_SYMBOL, '') |
| if txt[i+2].replace(SEGMENT_SYMBOL, '')[:3] == '本公司': |
| if SEGMENT_SYMBOL not in txt[i+2]: |
| txt[i+2] = (SEGMENT_SYMBOL + txt[i+2]) |
| txt[i+3] = txt[i+3].replace(SEGMENT_SYMBOL, '') |
| break |
| if _check_ann_title_processable(txt[i+2]): |
| txt[i+1] = txt[i+1].replace(SEGMENT_SYMBOL, '') |
| txt[i+2] = txt[i+2].replace(SEGMENT_SYMBOL, '') |
| if txt[i+3].replace(SEGMENT_SYMBOL, '')[:3] == '本公司': |
| if SEGMENT_SYMBOL not in txt[i+3]: |
| txt[i+3] = (SEGMENT_SYMBOL + txt[i+3]) |
| txt[i+4] = txt[i+4].replace(SEGMENT_SYMBOL, '') |
| break |
| # Per-line pass over the whole list: numbered headings and fixed keywords get SEGMENT_SYMBOL prepended. |
| for i, _ in enumerate(txt): |
| # Lines that do not contain SEGMENT_SYMBOL yet are tested against five numbering patterns. |
| if (SEGMENT_SYMBOL not in txt[i]): |
| match_check = [1, 1, 1, 1, 1] |
| # Pattern 1: one or two Chinese numerals followed by '、' |
| match_1 = re.match('[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', txt[i]) |
| # Pattern 2: one or two digits followed by '、' |
| match_2 = re.match('[0-9]{1,2}、', txt[i]) |
| # Pattern 3: one or two digits followed by '.' |
| match_3 = re.match('[0-9]{1,2}\.', txt[i]) |
| # Pattern 4: one or two Chinese numerals enclosed in parentheses |
| match_4 = re.match('[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', txt[i]) |
| # Pattern 5: one or two digits enclosed in parentheses |
| match_5 = re.match('[\(\(]+[0-9]{1,2}[\)\)]+', txt[i]) |
| # re.match only matches at the beginning of the string, so start() is 0 for every successful match; a 0 in match_check therefore means "some pattern matched". |
| if match_1: match_check[0] = match_1.start() |
| if match_2: match_check[1] = match_2.start() |
| if match_3: match_check[2] = match_3.start() |
| if match_4: match_check[3] = match_4.start() |
| if match_5: match_check[4] = match_5.start() |
| if 0 in match_check: |
| txt[i] = SEGMENT_SYMBOL + txt[i] |
| # A line starting with '重要内容提示' opens a new segment. |
| if ('重要内容提示' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('重要内容提示') == 0): |
| txt[i] = SEGMENT_SYMBOL + txt[i] |
| # A line consisting only of '单位:元' (with or without SEGMENT_SYMBOL in front) is blanked out. |
| if (txt[i] == '单位:元') or (txt[i] == SEGMENT_SYMBOL + '单位:元'): |
| txt[i] = '' |
| # A line starting with '特别提示' opens a new segment. |
| if ('特别提示' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('特别提示') == 0): |
| txt[i] = SEGMENT_SYMBOL + txt[i] |
| # A line starting with '特此公告' opens a new segment. |
| if ('特此公告' in txt[i]) and (SEGMENT_SYMBOL not in txt[i]) and (txt[i].index('特此公告') == 0): |
| txt[i] = SEGMENT_SYMBOL + txt[i] |
| # A line that is identical to the line after it is blanked out. |
| if (i+1) < len(txt) and (txt[i] == txt[i+1]): |
| txt[i] = '' |
| return txt |
|
|
|
|
def get_docx_from_pdf(pdf_path, out_path):
    """Convert a PDF file to Docx and store the result on disk.

    :param pdf_path: full path of the input PDF announcement file
    :param out_path: full path of the intermediate Docx result file
    :return: bool, ``True`` only when the conversion ran without raising and
             every page of the converter reports ``finalized``
    """
    # Constructing the converter stays outside the try block on purpose: a
    # failure to open the PDF keeps propagating to the caller as before.
    cv = Converter(pdf_path)
    try:
        try:
            cv.convert(out_path)
        except Exception:
            # Conversion errors are reported through the return value.
            return False
        return all(p.finalized for p in cv.pages)
    finally:
        # Single exit point for releasing the converter, also when inspecting
        # the pages raises.
        cv.close()
|
|
|
|
| def _get_table_row_feat(str): |
| ''' |
| 给定一个空格分割的表格行字符串,计算它的特征(01组成的字符串) |
| :param str: 字符串 |
| :return: 字符串 |
| ''' |
| s = str.split() |
| r = '' |
| for c in s: |
| try: |
| _ = float(c) |
| r += '1' |
| except Exception: |
| r += '0' |
| return r |
|
|
|
|
| def append_table_from_docx(doc, txt): |
| ''' |
| Read the content of every table in a Docx document, format it and append it to the PDF text list. |
| The list txt is modified in place (rows may be inserted, table lines are appended) and is also returned. |
| :param doc: a Document instance |
| :param txt: list of strings holding the PDF body text |
| :return: the text list with the table lines appended |
| ''' |
| # NOTE(review): 'data' is filled below but never read inside this function. |
| data = [] |
| table_txt = [] |
| # Marker line written before the rows of each table and again after them. |
| table_tag = '-' + TABLE_SYMBOL + '-' |
| for table in doc.tables[:]: |
| table_txt.append(f'{table_tag}\n') |
| for i, row in enumerate(table.rows[:]): |
| row_content = [] |
| for cell in row.cells[:]: |
| c = cell.text |
| # Cell text is stripped of newlines, spaces, tabs and commas. |
| new_c = c.replace('\n', '').replace(' ','').replace('\t','').replace(',','') |
| row_content.append(new_c) |
| if row_content == []: continue |
| # A row whose first cell contains '本公司' is joined into one string prefixed with SEGMENT_SYMBOL (with a further SEGMENT_SYMBOL right after '特别提示' if present) and inserted into txt after the first processable title found among the first 11 entries; it is then skipped as a table row. |
| if '本公司' in row_content[0]: |
| tmp = SEGMENT_SYMBOL |
| for line in row_content: |
| tmp += line.strip() |
| if '特别提示' in tmp: |
| tmp = tmp[:tmp.index('特别提示')+4]+SEGMENT_SYMBOL+tmp[tmp.index('特别提示')+4:] |
| for id, val in enumerate(txt): |
| if id > 10: break |
| else: |
| if _check_ann_title_processable(val): |
| txt.insert(id+1, tmp) |
| break |
| continue |
| # Rows whose first cell contains '证券代码' are skipped. |
| if '证券代码' in row_content[0]: continue |
| data.append(row_content) |
| # Rows are temporarily wrapped as '^...$\n' so that a row with only empty cells can be recognised as '^$\n' and left out. |
| new_row = '^' + TABLE_CELL_SYMBOL.join(row_content) + '$\n' |
| if new_row.replace(TABLE_CELL_SYMBOL,'') != '^$\n': |
| table_txt.append(new_row) |
| data.append(f'{table_tag}\n') |
| table_txt.append(f'{table_tag}\n') |
| # Collapse runs of consecutive tag lines: every tag line that directly follows another tag line is marked '^$\n' and filtered out. |
| flag = False |
| for i, val in enumerate(table_txt): |
| if val == f'{table_tag}\n': |
| if not flag: |
| flag = True |
| else: |
| table_txt[i] = '^$\n' |
| else: |
| flag = False |
| table_txt = list(filter(lambda x: x != '^$\n', table_txt)) |
| # An inner tag line is marked for removal when the lines directly before and after it produce the same _get_table_row_feat() signature. |
| for i, val in enumerate(table_txt): |
| if val == f'{table_tag}\n' and (i > 0) and (i < len(table_txt)-1): |
| feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', '')) |
| feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', '')) |
| if feat1 == feat2: |
| table_txt[i] = '^$\n' |
| # If nothing but a single tag line is left, mark it for removal as well. |
| if len(table_txt) == 1 and table_txt[0] == f'{table_tag}\n': |
| table_txt[0] = '^$\n' |
| # Final clean-up: marked lines become '', all other lines lose their first character and their last two characters and get a fresh '\n'. |
| # NOTE(review): tag lines are stored with a trailing '\n', so the comparison with table_tag (no newline) does not match them; they go through the slicing below, which turns '-' + TABLE_SYMBOL + '-\n' into TABLE_SYMBOL + '\n'. Confirm this is intended. |
| for i, val in enumerate(table_txt): |
| if val == table_tag: |
| continue |
| if val == '^$\n': |
| table_txt[i] = '' |
| continue |
| table_txt[i] = val[1:][:-2] + '\n' |
| txt.extend(table_txt) |
| return txt |
|
|
|
|
def output_txt_string(txt_path, txt_string):
    """Write the formatted announcement text to a UTF-8 plain-text file.

    :param txt_path: path of the ``.txt`` file to create or overwrite
    :param txt_string: the formatted plain text of the PDF announcement
    :return: bool, ``True`` when the file was written, ``False`` when writing
             failed
    """
    try:
        with open(txt_path, "w", encoding='utf-8') as f:
            f.write(txt_string)
    except (OSError, ValueError, TypeError):
        # OSError: path/permission problems; ValueError: text that cannot be
        # encoded; TypeError: txt_string is not a str.  Anything else (for
        # example KeyboardInterrupt) is no longer swallowed.
        return False
    return True
|
|
|
|
def get_txt_from_pdf(pdf_path, docx_path=''):
    """Convert a PDF announcement file into a formatted plain-text string.

    :param pdf_path: string, path of the PDF file
    :param docx_path: string, optional path for the intermediate Docx file;
                      ``''`` lets get_string_list_from_pdf_converted_docx()
                      derive one
    :return: string, the formatted text (body first, tables after it); ``''``
             when the header lines, the body lines or the Docx document could
             not be obtained
    """
    ann_info_list = get_ann_info_from_pdf(pdf_path)
    string_list, document = get_string_list_from_pdf_converted_docx(pdf_path, docx_path)
    if not ann_info_list or not string_list or document is None:
        return ''

    abscissa_dict = get_abscissa_dict_from_pdf(pdf_path)
    min_abscissa_value = get_min_abscissa_value(abscissa_dict, len(string_list))

    # The leading lines before the company-name line (at most 11 of them) are
    # pinned to the body-text x coordinate so they are not treated as indented.
    for line in string_list[:11]:
        if line.replace('\n', '')[-4:] == '有限公司':
            break
        abscissa_dict[line] = min_abscissa_value

    txt_list = []
    for string in string_list:
        if string == '':
            continue
        # NOTE(review): the 4th and 5th patterns read as a plain space and an
        # empty string here, which makes those two calls no-ops; if the
        # original used special whitespace characters, restore them.
        new_string = (
            string.replace('\n', '').replace('\t', '').replace(' ', '')
            .replace(' ', '').replace('', '').replace(',', '')
        )
        # Skip lines of up to three digits (presumably page numbers).
        if len(new_string) <= 3 and new_string.isdigit():
            continue
        # A line whose text box starts to the right of the body-text x
        # coordinate begins a new segment; lines with unknown position are
        # treated as ordinary body lines.
        if abscissa_dict.get(string, min_abscissa_value) > min_abscissa_value:
            txt_list.append(SEGMENT_SYMBOL + new_string)
        else:
            txt_list.append(new_string)

    txt_list = refine_txt_list(txt_list, ann_info_list)
    txt_list.append(SEGMENT_SYMBOL)
    txt_list = append_table_from_docx(doc=document, txt=txt_list)
    return ''.join(txt_list)