# task-parser/src/parser.py
# Artem Nikolaev
# PDF Task Parser with FastAPI and Google integration
# ca8ebf7
import pdfplumber
import re
from datetime import datetime
from typing import List, Dict, Optional
import os
import subprocess
import shutil
class TaskParser:
    """Extract numbered tasks (description, responsible person, due date)
    from meeting protocols stored as PDF, DOCX or DOC files.

    Typical use: ``text = parser.extract_text()`` followed by
    ``parser.parse_tasks(text)``; the parsed tasks are kept in ``self.tasks``
    as a list of dicts with the keys ``number``, ``full_description``,
    ``responsible``, ``due_date`` and ``due_date_str``.
    """

    def __init__(self, file_path: str):
        """Remember the file path and set up the keyword configuration.

        Args:
            file_path: path to the document; its lower-cased extension
                selects the extraction and parsing strategy.
        """
        self.file_path = file_path
        self.tasks = []
        self.file_extension = os.path.splitext(file_path)[1].lower()
        # ========== FIELD SETTINGS (MAY BE CHANGED) ==========
        # Keywords that introduce a due date
        self.date_keywords = ['Срок', 'Дата', 'Дедлайн', 'Due', 'Выполнить до']
        # Keywords that introduce the responsible person
        self.resp_keywords = ['Отв.', 'Исполнитель', 'Ответственный', 'Исп.']
        # Textual "done" statuses
        self.status_keywords = ['выполнено', 'выполнен', 'сделано', 'готово']
        # Separator allowed between a keyword and its value
        self.separators = r'\s*(?:—|–|-|:)?\s*'
        # Words that mark the end of the task section
        self.after_keywords = [
            'Протокол вёл',
            'Лист согласования',
            'Стр.',
            'Page',
            'Ознакомлены',
            'Подписи',
            'УТВЕРЖДАЮ',
            'СОГЛАСОВАНО',
            'От АО «ТАНЕКО»:',
            'От ООО «НТЦ Татнефть»:',
            'От ООО «ЭПИК»:'
        ]
        # Section headers that are skipped when collecting task lines
        self.before_keywords = [
            'СЛУШАЛИ:',
            'ВЫСТУПИЛИ:',
            'ДОКЛАДЫВАЛИ:',
            'ОБСУЖДАЛИ:',
            'ПОВЕСТКА ДНЯ:',
            'ПРИСУТСТВОВАЛИ:'
        ]
        # =====================================================

    # ------------------------------------------------------------------
    # Text extraction
    # ------------------------------------------------------------------
    def extract_text(self) -> str:
        """Extract the text of the file (supports PDF, DOCX and DOC).

        Returns:
            The extracted text, or an empty string when the format is not
            supported or extraction failed.
        """
        if self.file_extension == '.pdf':
            return self._extract_from_pdf()
        if self.file_extension == '.docx':
            return self._extract_from_docx()
        if self.file_extension == '.doc':
            return self._extract_from_doc()
        print(f"❌ Неподдерживаемый формат файла: {self.file_extension}")
        print(" Поддерживаются: .pdf, .docx, .doc")
        return ""

    def _extract_from_pdf(self) -> str:
        """Concatenate the text of every PDF page, one page per line block.

        Returns an empty string if pdfplumber fails for any reason.
        """
        full_text = ""
        try:
            with pdfplumber.open(self.file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text:
                        full_text += text + "\n"
            print(f"✅ Извлечено {len(full_text)} символов из PDF")
            return full_text
        except Exception as e:
            print(f"❌ Ошибка при чтении PDF: {e}")
            return ""

    def _extract_from_docx(self) -> str:
        """Read paragraphs and tables of a .docx file via python-docx.

        Paragraphs that carry Word list numbering (``w:numPr``) are prefixed
        with ``¶`` so the protocol parser can recognise them as task starts.
        Table rows are emitted as ``cell | cell | ...``.
        """
        try:
            from docx import Document
            doc = Document(self.file_path)
            full_text = []
            for para in doc.paragraphs:
                text = para.text.strip()
                if not text:
                    continue
                try:
                    is_numbered = bool(para._element.xpath('.//w:numPr'))
                except Exception:
                    # Best effort: if the numbering lookup fails, keep the
                    # paragraph as plain text instead of dropping it.
                    is_numbered = False
                full_text.append(f"¶ {text}" if is_numbered else text)
            for table in doc.tables:
                for row in table.rows:
                    row_text = [cell.text for cell in row.cells if cell.text.strip()]
                    if row_text:
                        full_text.append(' | '.join(row_text))
            result = '\n'.join(full_text)
            print(f"✅ Извлечено {len(result)} символов из Word документа (.docx)")
            return result
        except ImportError:
            print("❌ Библиотека python-docx не установлена")
            print(" Установите: pip install python-docx")
            return ""
        except Exception as e:
            print(f"❌ Ошибка при чтении Word документа: {e}")
            return ""

    def _extract_from_doc(self) -> str:
        """Extract text from a legacy .doc file.

        Tries ``antiword`` first and falls back to a headless LibreOffice
        (``soffice``) conversion to .txt. Returns an empty string when
        neither tool is available or both fail.
        """
        if shutil.which('antiword'):
            try:
                result = subprocess.run(['antiword', self.file_path],
                                        capture_output=True, text=True)
                if result.returncode == 0:
                    print(f"✅ Извлечено {len(result.stdout)} символов из Word .doc файла")
                    return result.stdout
            except Exception as e:
                print(f"⚠️ Ошибка antiword: {e}")
        if shutil.which('soffice'):
            try:
                import tempfile
                # TemporaryDirectory removes the directory on every exit
                # path, including failed or partial conversions.
                with tempfile.TemporaryDirectory() as temp_dir:
                    result = subprocess.run([
                        'soffice', '--headless', '--convert-to', 'txt',
                        '--outdir', temp_dir, self.file_path
                    ], capture_output=True, text=True)
                    if result.returncode == 0:
                        # Swap only the real extension; a plain
                        # str.replace('.doc', ...) breaks on "FILE.DOC"
                        # and on names that contain ".doc" elsewhere.
                        stem = os.path.splitext(os.path.basename(self.file_path))[0]
                        txt_file = os.path.join(temp_dir, stem + '.txt')
                        if os.path.exists(txt_file):
                            with open(txt_file, 'r', encoding='utf-8', errors='ignore') as f:
                                content = f.read()
                            print(f"✅ Извлечено {len(content)} символов из Word .doc файла (через LibreOffice)")
                            return content
            except Exception as e:
                print(f"⚠️ Ошибка при конвертации через LibreOffice: {e}")
        print("❌ Не удалось извлечь текст из .doc файла.")
        print(" Установите: brew install antiword")
        return ""

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_task(number: int) -> Dict:
        """Return an empty task record with the given number."""
        return {
            'number': number,
            'full_description': '',
            'responsible': '',
            'due_date': None,
            'due_date_str': ''
        }

    @staticmethod
    def _parse_date(date_str: str):
        """Parse ``DD.MM.YYYY`` into a ``date``; return None if invalid."""
        try:
            return datetime.strptime(date_str, '%d.%m.%Y').date()
        except ValueError:
            return None

    def _strip_separator(self, text: str) -> str:
        """Drop one leading separator (dash or colon) with its whitespace."""
        return re.sub(r'^' + self.separators, '', text)

    def _clean_responsible(self, raw: str, with_protocol: bool = True) -> str:
        """Cut trailing non-name text (dates, statuses, ...) from a name.

        The text is cut at the first configured stop word that occurs in it
        (checked in list order, case-sensitively), then any remaining
        "Срок"/"Выполнено"/"до конца года" tail is removed regardless of
        case and whitespace is collapsed.

        Args:
            raw: text that follows the responsible-person keyword.
            with_protocol: also treat 'Протокол' as a stop word.
        """
        stop_words = self.date_keywords + ['Выполнено', 'Приложение', 'приложение']
        if with_protocol:
            stop_words.append('Протокол')
        stop_words += self.status_keywords
        text = raw.strip()
        for stop_word in stop_words:
            if stop_word in text:
                text = text.split(stop_word)[0].strip()
                break
        text = re.sub(r'\s+Срок.*$', '', text, flags=re.IGNORECASE)
        text = re.sub(r'\s+Выполнено.*$', '', text, flags=re.IGNORECASE)
        text = re.sub(r'\s+до\s+конца\s+года.*$', '', text, flags=re.IGNORECASE)
        return re.sub(r'\s+', ' ', text)

    def _apply_due_info(self, task: Dict, line: str) -> None:
        """Update the due-date fields of ``task`` from one continuation line.

        A status word wins over "до конца года", which wins over an explicit
        ``Срок <sep> DD.MM.YYYY`` date. Lines with none of these leave the
        task untouched.
        """
        lowered = line.lower()
        if any(word in lowered for word in self.status_keywords):
            task['due_date_str'] = 'Выполнено'
        elif 'до конца года' in lowered:
            task['due_date_str'] = 'до конца года'
        elif 'Срок' in line:
            date_match = re.search(r'Срок' + self.separators + r'(\d{2}\.\d{2}\.\d{4})', line)
            if date_match:
                date_str = date_match.group(1).strip()
                task['due_date_str'] = date_str
                parsed = self._parse_date(date_str)
                if parsed is not None:
                    task['due_date'] = parsed

    def _clean_pdf_description(self, parts: List[str]) -> str:
        """Join description lines and strip responsible/due-date metadata."""
        text = re.sub(r'\s+', ' ', ' '.join(parts))
        # Remove the whole "Отв.: ..." clause up to the next "Срок" (or the
        # end). Stopping at the first period would leave initials behind.
        text = re.sub(r'[;,\s]*Отв\.:.*?(?=\s+Срок|$)', '', text)
        text = re.sub(r'[;,\s]*Срок' + self.separators + r'\d{2}\.\d{2}\.\d{4}', '', text)
        text = re.sub(r'[;,\s]*Срок' + self.separators + r'до\s+конца\s+года', '', text)
        # Drop a stray standalone "С"; the look-behind keeps words that
        # merely end in "С" (e.g. "АЗС") intact.
        text = re.sub(r'[;,\s]*(?<!\w)С\b', '', text)
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\s*[;,]?\s*$', '', text)
        return text.strip()

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    def parse_tasks(self, text: str) -> List[Dict]:
        """Parse tasks from extracted text and store them in ``self.tasks``.

        PDF files always use the PDF parser. Other files use the protocol
        parser when a 'РЕШИЛИ:' marker is found within the first 100 lines,
        and the plain numbered-list parser otherwise.
        """
        lines = text.split('\n')
        has_resheno = False
        resheno_index = -1
        for i, line in enumerate(lines[:100]):
            if 'РЕШИЛИ:' in line:
                has_resheno = True
                resheno_index = i
                print(f"✅ Найден маркер 'РЕШИЛИ:' в строке {i}")
                break
        if self.file_extension == '.pdf':
            print("📄 PDF файл: использую простой парсинг")
            self.tasks = self._parse_pdf_simple(lines)
        elif has_resheno:
            print("📝 Word файл с РЕШИЛИ: использую парсинг протокола")
            self.tasks = self._parse_word_protocol(lines, resheno_index)
        else:
            print("📄 Простой список: использую базовый парсинг")
            self.tasks = self._parse_simple_list(lines)
        return self.tasks

    def _parse_pdf_simple(self, lines: List[str]) -> List[Dict]:
        """Parse ``N. text`` tasks from PDF text lines.

        Parsing starts after the first 'РЕШИЛИ:' line (or at the top when
        there is none) and stops at the first line whose first 30 characters
        contain an end-of-section keyword. Lines following a task start are
        appended to its description and scanned for 'Отв.:' and due info.
        """
        tasks = []
        current_task = None
        current_description = []
        marker_idx = next((idx for idx, line in enumerate(lines) if 'РЕШИЛИ:' in line), -1)
        for raw_line in lines[marker_idx + 1:]:
            line = raw_line.strip()
            if not line:
                continue
            if any(keyword in line[:30] for keyword in self.after_keywords):
                break
            task_match = re.match(r'^(\d+)\.\s+(.*)', line)
            if task_match:
                if current_task:
                    current_task['full_description'] = self._clean_pdf_description(current_description)
                    tasks.append(current_task)
                current_task = self._new_task(int(task_match.group(1)))
                current_description = [task_match.group(2)]
            elif current_task:
                current_description.append(line)
                if 'Отв.:' in line:
                    resp_match = (re.search(r'Отв\.:\s*([^С]+?)(?:\s+Срок|$)', line)
                                  or re.search(r'Отв\.:\s*([^\n]+)', line))
                    if resp_match:
                        current_task['responsible'] = self._clean_responsible(resp_match.group(1))
                self._apply_due_info(current_task, line)
        if current_task:
            current_task['full_description'] = self._clean_pdf_description(current_description)
            tasks.append(current_task)
        return tasks

    def _looks_like_task_start(self, line: str) -> bool:
        """Tell whether a protocol line starts a new task.

        A task starts with ``N.``/``N<TAB>``, with the ``¶`` list marker, or
        is a line longer than 20 characters that contains no responsible,
        date or section-header keyword.
        """
        if re.match(r'^\d+[.\t]', line) or line.startswith('¶'):
            return True
        has_meta = (any(k in line for k in self.resp_keywords)
                    or any(k in line for k in self.date_keywords))
        is_service = any(k in line for k in self.before_keywords)
        return not has_meta and not is_service and len(line) > 20

    def _date_from_keywords(self, line: str) -> Optional[tuple]:
        """Read the due text that follows the first date keyword in ``line``.

        Returns ``(due_date_str, due_date)`` where ``due_date`` is None when
        the text holds no valid ``DD.MM.YYYY`` date, or None when the line
        contains no date keyword.
        """
        for keyword in self.date_keywords:
            if keyword in line:
                date_text = self._strip_separator(line.split(keyword)[1].strip())
                date_match = re.search(r'(\d{2}\.\d{2}\.\d{4})', date_text)
                due_date = self._parse_date(date_match.group(1)) if date_match else None
                return re.sub(r'\s+', ' ', date_text), due_date
        return None

    def _due_from_line(self, line: str) -> Optional[tuple]:
        """Like ``_date_from_keywords`` but status words take precedence."""
        lowered = line.lower()
        if any(word in lowered for word in self.status_keywords):
            return 'Выполнено', None
        if 'до конца года' in lowered:
            return 'до конца года', None
        return self._date_from_keywords(line)

    def _parse_word_protocol(self, all_lines: List[str], start_idx: int) -> List[Dict]:
        """Parse tasks from a protocol that has a 'РЕШИЛИ:' section.

        Args:
            all_lines: every line of the document text.
            start_idx: index of the 'РЕШИЛИ:' line if already known; when it
                does not point at such a line the marker is searched for.

        Returns:
            The list of task dicts, or an empty list when there is no marker.
        """
        if 0 <= start_idx < len(all_lines) and 'РЕШИЛИ:' in all_lines[start_idx]:
            marker_idx = start_idx
        else:
            marker_idx = next((idx for idx, line in enumerate(all_lines) if 'РЕШИЛИ:' in line), -1)
        if marker_idx == -1:
            return []

        # Phase 1: collect the non-empty lines of the task section.
        task_lines = []
        started = False
        for raw_line in all_lines[marker_idx + 1:]:
            line = raw_line.strip()
            if any(keyword in line[:30] for keyword in self.after_keywords):
                break
            if re.match(r'^\d+$', line):
                continue  # a bare number, e.g. a page number
            if any(keyword in line for keyword in self.before_keywords):
                continue
            if not started and (re.match(r'^\d+[.\t]', line) or line.startswith('¶')):
                started = True
            if started and line:
                task_lines.append(line)
        if task_lines:
            # An end-of-section keyword may sit further right than the first
            # 30 characters of the last line; cut the line there.
            for keyword in self.after_keywords:
                if keyword in task_lines[-1]:
                    task_lines[-1] = task_lines[-1].split(keyword)[0].strip()
                    break
            if not task_lines[-1]:
                task_lines.pop()

        # Phase 2: split the collected lines into tasks plus metadata.
        tasks = []
        i = 0
        while i < len(task_lines):
            line = task_lines[i]
            i += 1
            numbered = re.match(r'^(\d+)[.\t]\s*(.*)', line)
            if numbered:
                task_number = int(numbered.group(1))
                description = numbered.group(2)
            elif line.startswith('¶'):
                task_number = len(tasks) + 1
                description = re.sub(r'^¶\s*', '', line)
            elif self._looks_like_task_start(line):
                task_number = len(tasks) + 1
                description = line
            else:
                continue
            if not description:
                continue

            responsible = ""
            due_date_str = ""
            due_date = None
            have_resp = False
            have_date = False
            # Every iteration either breaks or advances ``i``, so lines that
            # carry no metadata can never stall the loop.
            while i < len(task_lines) and not (have_resp and have_date):
                current = task_lines[i].strip()
                if not current:
                    i += 1
                    continue
                # Check for the next task before looking for metadata, so
                # a task line is never consumed as the previous task's status.
                if self._looks_like_task_start(current):
                    break
                resp_keyword = None
                if not have_resp:
                    resp_keyword = next((k for k in self.resp_keywords if k in current), None)
                if resp_keyword is not None:
                    raw = current.split(resp_keyword)[1].strip()
                    responsible = self._clean_responsible(self._strip_separator(raw))
                    have_resp = True
                    # The due date may share the line with the responsible.
                    found = self._date_from_keywords(current)
                    if found:
                        due_date_str, due_date = found
                        have_date = True
                elif not have_date:
                    found = self._due_from_line(current)
                    if found:
                        due_date_str, due_date = found
                        have_date = True
                i += 1

            tasks.append({
                'number': task_number,
                'full_description': description,
                'responsible': responsible,
                'due_date': due_date,
                'due_date_str': due_date_str
            })
        return tasks

    def _parse_simple_list(self, lines: List[str]) -> List[Dict]:
        """Parse a plain ``N. text`` list without protocol markers.

        Continuation lines are appended to the description unchanged and are
        scanned for due info and for the first responsible-person keyword.
        """
        tasks = []
        current_task = None
        current_description = []
        for raw_line in lines:
            line = raw_line.strip()
            if not line:
                continue
            task_match = re.match(r'^(\d+)\.\s+(.*)', line)
            if task_match:
                if current_task:
                    current_task['full_description'] = re.sub(r'\s+', ' ', ' '.join(current_description))
                    tasks.append(current_task)
                current_task = self._new_task(int(task_match.group(1)))
                current_description = [task_match.group(2)]
            elif current_task:
                current_description.append(line)
                self._apply_due_info(current_task, line)
                for keyword in self.resp_keywords:
                    if keyword not in line:
                        continue
                    prefix = re.escape(keyword) + self.separators
                    resp_match = (re.search(prefix + r'([^С]+?)(?:\s+Срок|$)', line)
                                  or re.search(prefix + r'([^\n]+)', line))
                    if resp_match:
                        current_task['responsible'] = self._clean_responsible(
                            resp_match.group(1), with_protocol=False)
                        break
        if current_task:
            current_task['full_description'] = re.sub(r'\s+', ' ', ' '.join(current_description))
            tasks.append(current_task)
        return tasks

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------
    def print_tasks(self):
        """Print a short human-readable summary of ``self.tasks``."""
        if not self.tasks:
            print("❌ Задачи не найдены")
            return
        print(f"\n📋 Найдено задач: {len(self.tasks)}\n")
        print("=" * 80)
        for task in self.tasks:
            description = task['full_description']
            # Add the ellipsis only when the description is really cut.
            preview = description if len(description) <= 100 else description[:100] + '...'
            print(f"Задача #{task['number']}")
            print(f"📝 Описание: {preview}")
            print(f"👤 Ответственный: {task['responsible'] or '❌ НЕТ'}")
            print(f"📅 Срок: {task['due_date_str'] or '❌ НЕТ'}")
            print("-" * 40)

    def to_dataframe(self):
        """Return ``self.tasks`` as a pandas DataFrame (one row per task).

        Empty responsible/due fields are shown as 'Не указан'.
        """
        import pandas as pd
        data = []
        for task in self.tasks:
            data.append({
                '№': task['number'],
                'Описание': task['full_description'],
                # The parsers always set these keys (possibly to ''), so a
                # .get() default alone would never produce the placeholder.
                'Ответственный': task.get('responsible') or 'Не указан',
                'Срок': task.get('due_date_str') or 'Не указан',
                'Дата (для сортировки)': task.get('due_date')
            })
        df = pd.DataFrame(data)
        return df