| |
|
|
| """ |
| Created by Shengbo.Zhang on 2021/08/13 |
| """ |
|
|
|
|
| import io |
| import re |
| import os |
| import csv |
| import logging |
| from docx import Document |
| from pdf2docx import Converter |
| from Pdf2Txt.config import * |
| from pdfminer.layout import LAParams |
| from pdfminer.pdfpage import PDFPage |
| from pdfminer.converter import TextConverter |
| from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter |
| from Pdf2Txt.config import _check_ann_title_processable |
|
|
|
|
| |
# Suppress every log record at WARNING level and below, process-wide.
# logging.disable() only remembers the most recent level, so the former
# logging.disable(logging.INFO) call right before this one had no effect.
logging.disable(logging.WARNING)
|
|
|
|
|
|
def _get_txt_from_pdf(pdf_path, out_path):
    '''
    Read a PDF file and convert it directly to a UTF-8 text file.

    All pages are rendered into one in-memory buffer, runs of newlines are
    collapsed, newlines followed by spaces are removed, and the result is
    written to ``out_path``.

    :param pdf_path: full path of the input PDF announcement file
    :param out_path: full path of the output txt file
    :return: bool, always True when no exception is raised
    '''
    manager = PDFResourceManager()
    output = io.StringIO()
    converter = TextConverter(manager, output, laparams=LAParams())
    try:
        interpreter = PDFPageInterpreter(manager, converter)
        with open(pdf_path, 'rb') as infile:
            for page in PDFPage.get_pages(infile, check_extractable=True):
                interpreter.process_page(page)
        # The buffer accumulates the text of every processed page, so it is
        # read once after the loop; reading it per page would repeat the
        # earlier pages in the output.
        text = output.getvalue()
    finally:
        # Release the converter and the buffer even if a page fails to parse.
        converter.close()
        output.close()

    text = re.sub('\n+', '\n', text)
    text = re.sub('\n +', '', text)
    # NOTE(review): the original line here was val.replace('', ''), which is a
    # no-op as written. It presumably targeted the form feed that pdfminer
    # emits after each page - confirm against the original file.
    text = text.replace('\x0c', '')

    with open(out_path, 'wb') as f:
        f.write(text.encode('utf-8'))
    return True
|
|
|
|
|
|
def _get_cleaned_txt(txtPath, out_path):
    '''
    Clean the content of a txt file (currently for testing only).

    Keeps the text found between ``##`` markers plus every CJK character,
    ``+``, ideographic full stop and full-width comma, then removes runs of
    two or more of the punctuation characters listed in the second pattern.

    :param txtPath: full path of the input txt file (read as UTF-8)
    :param out_path: full path of the output txt file (written as UTF-8)
    :return: bool, always True when no exception is raised
    '''
    with open(txtPath, 'rb') as src:
        content = src.read().decode('utf-8')

    p = re.compile(r'(?<=##)\S.+(?=##)|[\u4e00-\u9fff+\u3002\uFF0C]')
    x = ''.join(re.findall(p, content))
    final_result = re.sub(u"[\uFF0C|\u3002|\u002B]{2,}", "", x)

    # Write with an explicit encoding: the input is decoded as UTF-8, and the
    # platform default encoding may be unable to represent the CJK output.
    with open(out_path, "w", encoding='utf-8') as dst:
        dst.write(final_result)

    return True
|
|
|
|
|
|
def get_docx_from_pdf(pdf_path, out_path):
    '''
    Convert a PDF file to docx format and store it locally.

    :param pdf_path: full path of the input PDF announcement file
    :param out_path: full path of the intermediate docx result file
    :return: bool, True only if the conversion raised no exception and every
             page of the converter reports ``finalized``
    '''
    try:
        cv = Converter(pdf_path)
    except Exception:
        return False

    # The converter is closed on every path, including a failed convert(),
    # which previously returned without closing it.
    try:
        try:
            cv.convert(out_path)
        except Exception:
            return False
        return all(page.finalized for page in cv.pages)
    finally:
        cv.close()
|
|
|
|
|
|
def _find_key_indexs(str, key):
    '''
    Locate the occurrences of a substring inside a parent string.

    The number of reported positions equals ``str.count(key)``; each search
    restarts one character after the previous hit. Positions are returned
    from the highest index to the lowest.

    :param str: parent string
    :param key: substring to look for
    :return: list of int indexes in descending order (empty if no match)
    '''
    occurrences = str.count(key)
    if occurrences < 1:
        return []

    positions = [str.find(key)]
    for _ in range(occurrences - 1):
        # Resume the search right after the start of the previous hit.
        positions.append(str.find(key, positions[-1] + 1))

    # Hits were collected in ascending order; callers expect descending.
    positions.reverse()
    return positions
|
|
|
|
|
|
def _insert_char_into_str(str, idx, char):
    '''
    Return a copy of a string with a substring inserted at a given position.

    :param str: parent string
    :param idx: index in the parent string before which to insert
    :param char: substring to insert
    :return: str, the new string
    '''
    head, tail = str[:idx], str[idx:]
    return head + char + tail
|
|
|
|
|
|
def _is_chinese(str):
    '''
    Tell whether a string contains at least one character in the range
    U+4E00..U+9FFF.

    :param str: string to inspect
    :return: bool, True as soon as one such character is found; False for an
             empty string or when no character falls in the range
    '''
    return any('\u4e00' <= ch <= '\u9fff' for ch in str)
|
|
|
|
|
|
def _get_table_row_feat(str):
    '''
    Compute the numeric/non-numeric signature of a whitespace-separated row.

    Each token becomes '1' when ``float()`` accepts it and '0' otherwise.

    :param str: table row as a whitespace-separated string
    :return: str made of '0' and '1', one character per token
    '''
    def _token_flag(token):
        try:
            float(token)
        except Exception:
            return '0'
        return '1'

    return ''.join(_token_flag(token) for token in str.split())
|
|
|
|
|
|
def _check_if_include_first_proper(s, corpus):
    '''
    Find the first corpus entry that occurs inside a string.

    :param s: string to search in
    :param corpus: list of strings, tested in order
    :return: [True, entry] for the first entry contained in ``s``,
             otherwise [False, '']
    '''
    hit = next((entry for entry in corpus if entry in s), None)
    if hit is None:
        return [False, '']
    return [True, hit]
|
|
|
|
|
|
def _check_if_include_second_proper(s, corpus):
    '''
    Report, for every corpus entry, whether it occurs inside a string.

    :param s: string to search in
    :param corpus: list of strings
    :return: list of [bool, entry] pairs, one per corpus entry and in corpus
             order; the bool is True when the entry is contained in ``s``
    '''
    return [[entry in s, entry] for entry in corpus]
|
|
|
|
|
|
def _match_and_insert(string, pattern, substring):
    '''
    Insert a substring in front of every match of a pattern.

    Match start positions are processed from the end of the string towards
    the beginning, so an insertion never shifts a position that is still to
    be handled. A match at index 0, or one directly preceded by an opening
    curly double quote, is left untouched.

    :param string: string to search in
    :param pattern: regular expression to match
    :param substring: text to insert before each qualifying match
    :return: str, the string after all insertions
    '''
    starts = sorted((m.start() for m in re.finditer(pattern, string)), reverse=True)
    for pos in starts:
        if pos <= 0 or string[pos - 1] == '“':
            continue
        string = string[:pos] + substring + string[pos:]
    return string
|
|
|
|
|
|
def _match_and_delete(string, pattern):
    '''
    Remove the two newline characters that directly precede the first match
    of a pattern.

    :param string: string to search in
    :param pattern: regular expression to match
    :return: str, the string without the '\\n\\n' in front of the first match;
             the unchanged string when there is no match or the match is not
             preceded by two newlines
    '''
    found = re.search(pattern, string)
    if not found:
        return string

    start = found.start()
    if start >= 2 and string[start - 2:start] == '\n\n':
        return string[:start - 2] + string[start:]
    return string
|
|
|
|
|
|
def get_txt_from_docx(doc):
    '''
    Read the paragraphs of a docx document and re-segment them into formatted
    text blocks.

    NOTE(review): the copy of this file that was reviewed had lost all of its
    indentation, so the nesting below is reconstructed from the control flow.
    Places where more than one nesting was plausible carry a NOTE(review).

    :param doc: a Document instance; only ``doc.paragraphs`` and the ``text``
                attribute of each paragraph are read
    :return: tuple ``(ok, paragraphs)``: ``(True, list of str)`` on success,
             ``(False, [])`` when any exception is raised during processing
    '''
    # Characters tested against the first character of a paragraph to decide
    # whether it starts with a number (digits 1-9 and Chinese numerals).
    NUMBER_1 = '123456789一二三四五六七八九十'
    # Digits and ASCII letters, tested against the first character of the
    # following paragraph when deciding whether to join two paragraphs.
    NUMBER_2 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
    # Raw paragraph texts, each terminated by a newline.
    paras = [para.text+'\n' for i, para in enumerate(doc.paragraphs)]
    # Paragraphs kept after filtering, with leading whitespace removed.
    new_paras = []
    # Length of every kept paragraph, measured before the leading strip.
    new_paras_len_cnt = []


    try:
        # Step 1: drop paragraphs that carry no body text.
        for val in paras:
            # Skip empty paragraphs, paragraphs made of digits followed by a
            # single space (presumably page numbers) and paragraphs starting
            # with '单位'.
            if val == '\n' or re.search('^[0-9]+ \n$', val) or val[:2] == '单位':
                continue
            new_paras.append(val.lstrip())
            new_paras_len_cnt.append(len(val))


        # Step 2: scan at most the first 10 paragraphs for the header part.
        # line_mark ends up as the index of the first paragraph after the
        # title; it stays 0 when no title is recognised.
        line_mark = 0
        for i, val in enumerate(new_paras[:10]):
            # Paragraphs containing a tab or more than two spaces get their
            # whitespace runs collapsed to single spaces.
            if '\t' in val or val.count(' ') > 2:
                new_paras[i] = ' '.join(val.split()) + '\n'
                # NOTE(review): nesting of this check under the previous `if`
                # is a reconstruction - confirm against the original file.
                if '证券代码:' in new_paras[i]:
                    continue
            # A paragraph ending with '有限公司' keeps its newline and loses its
            # spaces.
            if val.replace(' ', '')[-5:] == '有限公司\n':
                new_paras[i] = val.replace(' ', '')
                continue
            # _check_ann_title_processable is imported from Pdf2Txt.config;
            # when it accepts the space-free paragraph, the header scan stops
            # and the body starts at the next paragraph.
            if _check_ann_title_processable(val.replace(' ', ''), exp=1):
                new_paras[i] = val.replace(' ', '')
                line_mark = i + 1
                break
            else:
                # Any other header paragraph loses its newline and its spaces,
                # so it is concatenated with the paragraph that follows.
                new_paras[i] = val.replace('\n', '').replace(' ', '')


        # Integer mean of the kept paragraph lengths. With no kept paragraph
        # this raises ZeroDivisionError, which the handler below turns into
        # (False, []).
        mean_len = sum(new_paras_len_cnt)//len(new_paras_len_cnt)


        # Step 3: clean and re-segment every body paragraph.
        for i, _ in enumerate(new_paras):
            # NOTE(review): everything down to the end of this loop is placed
            # under this condition as a reconstruction - confirm.
            if i >= line_mark:
                # NOTE(review): the two identical space replacements and the
                # empty-string replacement look like they originally targeted
                # other (possibly non-ASCII or invisible) characters - confirm.
                new_paras[i] = new_paras[i]\
                    .replace(' ', '')\
                    .replace(' ', '')\
                    .replace('', '')\
                    .replace(',', '')


                # Drop the newline of a paragraph that is at least as long as
                # the mean length (presumably a soft-wrapped line) when the
                # next paragraph does not start with a NUMBER_1 character or
                # still ends with a newline.
                if i < len(new_paras)-1 and \
                        len(new_paras[i]) >= mean_len and \
                        ((new_paras[i + 1].replace('(','').replace('(','')[0] not in NUMBER_1) or
                         new_paras[i + 1][-1] == '\n'):
                    new_paras[i] = new_paras[i].replace('\n', '')


                # Also drop the newline when the next paragraph starts with a
                # digit or ASCII letter, has no '.' or '、' within its first
                # three characters and contains no '年'.
                if i < len(new_paras)-2 and \
                        len(new_paras[i + 1]) >= 3 and \
                        new_paras[i + 1].replace('(','').replace('(','')[0] in NUMBER_2 and \
                        (not '.' in new_paras[i+1][:3]) and \
                        (not '、' in new_paras[i+1][:3]) and \
                        (not '年' in new_paras[i+1]):
                    new_paras[i] = new_paras[i].replace('\n', '')


                # _find_key_indexs yields positions from last to first, so an
                # insertion never shifts a position that is still pending.
                for j in _find_key_indexs(new_paras[i], ':'):
                    # Break the line after a colon that is not already
                    # followed by a newline, unless the paragraph contains
                    # parentheses, book-title marks or an entry of
                    # FIRST_PROPER_CORPUS (not defined in this file;
                    # presumably provided by the Pdf2Txt.config star import).
                    if j < len(new_paras[i])-1 and new_paras[i][j+1] != '\n' and \
                            ('(' not in new_paras[i]) and ('《' not in new_paras[i]) and \
                            (')' not in new_paras[i]) and ('》' not in new_paras[i]) and \
                            (not _check_if_include_first_proper(new_paras[i], FIRST_PROPER_CORPUS)[0]):
                        new_paras[i] = _insert_char_into_str(new_paras[i], j+1, '\n')

                for j in _find_key_indexs(new_paras[i], '('):
                    # Break the line before '(' followed by a NUMBER_1
                    # character when the previous paragraph has no trailing
                    # newline and the character before '(' is neither in the
                    # U+4E00..U+9FFF range nor '》'.
                    if new_paras[i][j+1] in NUMBER_1 and new_paras[i-1][-1] != '\n' and \
                            (not _is_chinese(new_paras[i][j-1])) and new_paras[i][j-1] != '》':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n')

                # NOTE(review): this loop is textually identical to the
                # previous one in the reviewed copy; presumably one of the two
                # originally targeted the full-width parenthesis - confirm.
                for j in _find_key_indexs(new_paras[i], '('):
                    if new_paras[i][j + 1] in NUMBER_1 and new_paras[i - 1][-1] != '\n' and \
                            (not _is_chinese(new_paras[i][j - 1])) and new_paras[i][j - 1] != '》':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n')

                for j in _find_key_indexs(new_paras[i], '、'):
                    # One-character number before '、' that follows sentence
                    # punctuation: break the line before the number.
                    if (j-2) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] not in NUMBER_1 \
                            and new_paras[i][j-2] in '。;.;' and new_paras[i-1][-1] != '\n':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j-1, '\n')
                        continue
                    # Two-character number before '、' that follows sentence
                    # punctuation: break the line before the number.
                    if (j-3) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] in NUMBER_1 \
                            and new_paras[i][j-3] in '。;.;' and new_paras[i-1][-1] != '\n':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j-2, '\n')


                # Put the closing phrase '特此公告。' on its own line and make sure
                # the paragraph after it ends with a newline. When it is the
                # last paragraph, new_paras[i+1] raises IndexError, which the
                # handler below turns into (False, []).
                if new_paras[i] == '特此公告。\n':
                    if new_paras[i-1][-1] != '\n':
                        new_paras[i] = '\n特此公告。\n'
                    if new_paras[i+1][-1] != '\n':
                        new_paras[i+1] += '\n'


                # When the next paragraph contains a FIRST_PROPER_CORPUS entry
                # that is not at its very start and is not directly preceded
                # by '(', make sure the current paragraph ends with a newline.
                if (i+1) < len(new_paras):
                    tmp_flag, tmp_str = _check_if_include_first_proper(new_paras[i+1], FIRST_PROPER_CORPUS)
                    if tmp_flag:
                        tmp_idx = new_paras[i+1].index(tmp_str) - 1
                        if tmp_idx >= 0 and new_paras[i+1][tmp_idx] != '(':
                            if new_paras[i][-1] != '\n':
                                new_paras[i] += '\n'


        # Step 4: re-split the joined text on single newlines and terminate
        # every piece with a blank line.
        str_sum = ''.join(new_paras)
        final_paras = str_sum.split('\n')
        for i, val in enumerate(final_paras):
            end_flag = '\n\n'
            final_paras[i] += end_flag


            # Start a new block before every parenthesised 1-2 digit number.
            if '(' in final_paras[i]:
                final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag)


            # Unbalanced parentheses: drop the last two characters of the
            # piece so that it runs into the following piece.
            if len(_find_key_indexs(final_paras[i], '(')) != len(_find_key_indexs(final_paras[i], ')')):
                final_paras[i] = final_paras[i][:-2]


        # Step 5: re-split on blank lines and apply phrase-specific rules.
        str_sum = ''.join(final_paras)
        final_paras = str_sum.split('\n\n')
        for i, val in enumerate(final_paras):
            end_flag = '\n\n'
            final_paras[i] += end_flag


            # Isolate the '重要内容提示:' heading with blank lines. The offset 7
            # equals the length of that phrase as written here.
            if '重要内容提示:' in final_paras[i]:
                idx = final_paras[i].index('重要内容提示:')
                if final_paras[i][idx+7] != '\n':
                    final_paras[i] = _insert_char_into_str(final_paras[i], idx+7, '\n\n')
                if idx > 0:
                    if final_paras[i][idx-1] != '\n':
                        final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')


            # '表决结果:' starts its own block; when the block begins or ends
            # with that phrase, the trailing blank line is removed so the
            # following piece is appended to it.
            if '表决结果:' in final_paras[i]:
                if final_paras[i][:5] == '表决结果:':
                    final_paras[i] = final_paras[i][:-2]
                elif final_paras[i][-7:] == '表决结果:\n\n':
                    idx = final_paras[i].find('表决结果:')
                    final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')
                    final_paras[i] = final_paras[i][:-2]
                else:
                    idx = final_paras[i].find('表决结果:')
                    final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')


            # When a SECOND_PROPER_CORPUS entry (not defined in this file;
            # presumably provided by the Pdf2Txt.config star import) is
            # directly followed by a newline, remove every newline of the
            # block.
            for is_include, s_include in _check_if_include_second_proper(final_paras[i], SECOND_PROPER_CORPUS):
                if is_include:
                    if final_paras[i][final_paras[i].index(s_include)+len(s_include)] == '\n':
                        final_paras[i] = final_paras[i].replace('\n', '')


            # Start a new block before parenthesised Chinese or Arabic
            # numbers of one or two characters.
            if '(' in final_paras[i]:
                final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', end_flag)
                final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag)


            # Start a new block before 'number、' items, then undo the break
            # in front of the first match of each enumeration pattern below.
            if '、' in final_paras[i]:
                final_paras[i] = _match_and_insert(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', end_flag)
                final_paras[i] = _match_and_insert(final_paras[i], '[0-9]{1,2}、', end_flag)

                final_paras[i] = _match_and_delete(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、[\S]+、[\S]+')
                final_paras[i] = _match_and_delete(final_paras[i], '[0-9]+、[0-9]+')


            # Start a new block before every bullet that is not at index 0.
            for j in _find_key_indexs(final_paras[i], '●'):
                if j > 0:
                    final_paras[i] = _insert_char_into_str(final_paras[i], j, end_flag)


    # Any failure above (index errors on short paragraphs, division by zero
    # on an empty document, ...) is reported as an unsuccessful conversion.
    except Exception:
        return False, []

    return True, final_paras
|
|
|
|
|
|
def get_table_from_docx(doc, txt, out_path="", is_out_flag=False):
    '''
    Read every table of a docx document and append its rows to the body text.

    Rows whose first cell contains '本公司' are treated as misdetected body
    text and returned separately under the key '000'. Rows whose first cell
    contains '证券代码' are put at the front of ``txt``. All other rows are
    appended to ``txt`` as tab-separated lines, with one marker line around
    each table.

    :param doc: a Document instance; only ``doc.tables``, the ``rows`` of
                each table, the ``cells`` of each row and the ``text`` of
                each cell are read
    :param txt: list of str holding the body text of the PDF; modified in
                place and also returned
    :param out_path: full path of the csv result file
    :param is_out_flag: whether to write the csv result file (default: no)
    :return: tuple ``(txt, attach_txt)``: the extended text list and a dict
             that holds the misdetected body text under '000', if any
    '''
    table_mark = '-----表格-----\n'
    empty_row = '^$\n'

    data = []
    table_txt = []
    attach_txt = {}
    for table in doc.tables:
        table_txt.append(table_mark)
        for row in table.rows:
            row_content = [
                cell.text.replace('\n', '').replace(' ', '').replace('\t', '').replace(',', '')
                for cell in row.cells
            ]
            if not row_content:
                continue
            if '本公司' in row_content[0]:
                attach_txt['000'] = ''.join(c.strip() for c in row_content) + '\n\n'
                continue
            if '证券代码' in row_content[0]:
                header = '^' + ''.join(c.strip() + ' ' for c in row_content) + '$\n'
                # list.insert takes (index, object); the arguments used to be
                # swapped, which raised TypeError for every such row.
                txt.insert(0, header)
                continue
            data.append(row_content)
            new_row = '^' + '\t'.join(row_content) + '$\n'
            if new_row.replace('\t', '') != empty_row:
                table_txt.append(new_row)
        data.append(table_mark)
        table_txt.append(table_mark)

    # Collapse runs of consecutive markers (end of one table directly
    # followed by the start of the next) into a single marker.
    previous_was_mark = False
    for i, val in enumerate(table_txt):
        if val != table_mark:
            previous_was_mark = False
        elif previous_was_mark:
            table_txt[i] = empty_row
        else:
            previous_was_mark = True

    # Remove a marker between two rows that share the same numeric signature:
    # such rows are considered to belong to one table split in two.
    table_txt = [line for line in table_txt if line != empty_row]
    for i, val in enumerate(table_txt):
        if val == table_mark and 0 < i < len(table_txt) - 1:
            feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', ''))
            feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', ''))
            if feat1 == feat2:
                table_txt[i] = empty_row

    # A lone marker means no usable table content was found.
    if len(table_txt) == 1 and table_txt[0] == table_mark:
        table_txt[0] = empty_row

    # Strip the first character and the last two characters of every line and
    # re-terminate it with a newline. For rows this removes '^' and '$\n'.
    # NOTE(review): marker lines go through the same trimming and therefore
    # come out as '----表格----\n'. The original code compared against the
    # marker without its trailing newline, which never matched; the resulting
    # output is kept unchanged here - confirm whether it is intended.
    for i, val in enumerate(table_txt):
        if val == empty_row:
            table_txt[i] = ''
            continue
        table_txt[i] = val[1:-2] + '\n'

    txt.extend(table_txt)

    if is_out_flag:
        # The context manager closes the file even if a row fails to write.
        with open(out_path, 'w+', newline='') as f:
            writer = csv.writer(f)
            for i, val in enumerate(data):
                if i == 0 and val == '\n':
                    continue
                writer.writerow(val)

    return txt, attach_txt
|
|
|
|
|
|
def refine_pdf2txt_list_result(txt, att_txt):
    '''
    Final pass over the text list: restore or attach body text that was
    misdetected as a table.

    Only the entries at index 0 to 10 are inspected. An entry whose text
    ends with '有限公司' followed by two more characters loses those two
    trailing characters. Otherwise, when ``att_txt`` holds a '000' entry and
    ``_check_ann_title_processable`` accepts the line, the '000' text is
    inserted right after it and the scan stops.

    :param txt: list of str holding the body text of the PDF; modified in
                place and also returned
    :param att_txt: dict with body text misdetected as a table
    :return: list, the same ``txt`` object
    '''
    for pos, line in enumerate(txt):
        if pos > 10:
            break
        if line[-6:-2] == '有限公司':
            txt[pos] = line[:-2]
        elif '000' in att_txt and _check_ann_title_processable(line, exp=2):
            txt.insert(pos + 1, att_txt['000'])
            break
    return txt
|
|
|
|
|
|
def write_pdf2txt_list_result(out_path, txt, out_mode_flag=True):
    '''
    Write the text list to a UTF-8 txt file.

    With ``out_mode_flag`` set, the list is joined, split on newlines, and
    every non-empty line is written wrapped in '^' and '$'. Otherwise the
    entries are written unchanged, except that '^$\\n' entries are skipped.

    :param out_path: path of the txt file to create
    :param txt: list of str holding the body text and tables of the PDF
    :param out_mode_flag: whether to add the '^' head and '$' tail markers
    :return: bool, always True when no exception is raised
    '''
    with open(out_path, "w", encoding='utf-8') as f:
        if out_mode_flag:
            f.writelines(
                '^' + para + '$\n'
                for para in ''.join(txt).split('\n')
                if para != ''
            )
        else:
            f.writelines(item for item in txt if item != '^$\n')
    return True
|
|
|
|
|
|
def get_pdf2txt_str_result(txt, out_mode_flag=True):
    '''
    Concatenate the entries of the text list into the complete txt content.

    With ``out_mode_flag`` set, the list is joined, split on newlines, and
    every non-empty line is wrapped in '^' and '$'. Otherwise the entries
    are concatenated unchanged, except that '^$\\n' entries are skipped.
    This mirrors what ``write_pdf2txt_list_result`` writes to disk.

    :param txt: list of str holding the body text and tables of the PDF
    :param out_mode_flag: whether to add the '^' head and '$' tail markers
    :return: str, empty for an empty list
    '''
    # The content is built exactly once. The former implementation wrapped
    # this logic in an extra `for line in txt:` loop, which repeated the
    # whole output once per list entry.
    if not out_mode_flag:
        return ''.join(line for line in txt if line != '^$\n')
    return ''.join(
        '^' + para + '$\n'
        for para in ''.join(txt).split('\n')
        if para != ''
    )
|
|
|
|
def find_all_local_file(base, extension):
    '''
    Yield the path of every file under a directory that has the given suffix.

    The suffix comparison ignores case, so '.pdf' also matches 'x.PDF' and
    'x.Pdf'. In every yielded path each '/' and each '\\' is replaced by '//'.

    :param base: directory to walk recursively
    :param extension: file suffix, for example '.pdf'
    :return: generator of str paths
    '''
    suffix = extension.lower()
    for root, _dirs, files in os.walk(base):
        for name in files:
            # Compare in lower case: checking only the all-lower and all-upper
            # spellings missed mixed-case suffixes such as '.Pdf'.
            if name.lower().endswith(suffix):
                yield os.path.join(root, name).replace('/', '//').replace('\\', '//')