Spaces:
Running
Running
| import pdfplumber | |
| import re | |
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| import os | |
| import subprocess | |
| import shutil | |
| class TaskParser: | |
| def __init__(self, file_path: str): | |
| self.file_path = file_path | |
| self.tasks = [] | |
| self.file_extension = os.path.splitext(file_path)[1].lower() | |
| # ========== НАСТРОЙКИ ПОЛЕЙ (МОЖНО МЕНЯТЬ) ========== | |
| # Ключевые слова для поиска даты | |
| self.date_keywords = ['Срок', 'Дата', 'Дедлайн', 'Due', 'Выполнить до'] | |
| # Ключевые слова для поиска ответственного | |
| self.resp_keywords = ['Отв.', 'Исполнитель', 'Ответственный', 'Исп.'] | |
| # Текстовые статусы выполнения | |
| self.status_keywords = ['выполнено', 'выполнен', 'сделано', 'готово'] | |
| # Разделители между словом и значением | |
| self.separators = r'\s*(?:—|–|-|:)?\s*' | |
| # Слова, которые означают конец раздела с задачами | |
| self.after_keywords = [ | |
| 'Протокол вёл', | |
| 'Лист согласования', | |
| 'Стр.', | |
| 'Page', | |
| 'Ознакомлены', | |
| 'Подписи', | |
| 'УТВЕРЖДАЮ', | |
| 'СОГЛАСОВАНО', | |
| 'От АО «ТАНЕКО»:', | |
| 'От ООО «НТЦ Татнефть»:', | |
| 'От ООО «ЭПИК»:' | |
| ] | |
| # Слова, которые игнорируются до РЕШИЛИ: | |
| self.before_keywords = [ | |
| 'СЛУШАЛИ:', | |
| 'ВЫСТУПИЛИ:', | |
| 'ДОКЛАДЫВАЛИ:', | |
| 'ОБСУЖДАЛИ:', | |
| 'ПОВЕСТКА ДНЯ:', | |
| 'ПРИСУТСТВОВАЛИ:' | |
| ] | |
| # ==================================================== | |
| def extract_text(self) -> str: | |
| """Извлекает текст из файла (поддерживает PDF, DOCX и DOC)""" | |
| if self.file_extension == '.pdf': | |
| return self._extract_from_pdf() | |
| elif self.file_extension == '.docx': | |
| return self._extract_from_docx() | |
| elif self.file_extension == '.doc': | |
| return self._extract_from_doc() | |
| else: | |
| print(f"❌ Неподдерживаемый формат файла: {self.file_extension}") | |
| print(" Поддерживаются: .pdf, .docx, .doc") | |
| return "" | |
| def _extract_from_pdf(self) -> str: | |
| full_text = "" | |
| try: | |
| with pdfplumber.open(self.file_path) as pdf: | |
| for page in pdf.pages: | |
| text = page.extract_text() | |
| if text: | |
| full_text += text + "\n" | |
| print(f"✅ Извлечено {len(full_text)} символов из PDF") | |
| return full_text | |
| except Exception as e: | |
| print(f"❌ Ошибка при чтении PDF: {e}") | |
| return "" | |
| def _extract_from_docx(self) -> str: | |
| try: | |
| from docx import Document | |
| doc = Document(self.file_path) | |
| full_text = [] | |
| for para in doc.paragraphs: | |
| text = para.text.strip() | |
| if text: | |
| try: | |
| import xml.etree.ElementTree as ET | |
| if para._element.xpath('.//w:numPr'): | |
| full_text.append(f"¶ {text}") | |
| else: | |
| full_text.append(text) | |
| except: | |
| full_text.append(text) | |
| for table in doc.tables: | |
| for row in table.rows: | |
| row_text = [] | |
| for cell in row.cells: | |
| if cell.text.strip(): | |
| row_text.append(cell.text) | |
| if row_text: | |
| full_text.append(' | '.join(row_text)) | |
| result = '\n'.join(full_text) | |
| print(f"✅ Извлечено {len(result)} символов из Word документа (.docx)") | |
| return result | |
| except ImportError: | |
| print("❌ Библиотека python-docx не установлена") | |
| print(" Установите: pip install python-docx") | |
| return "" | |
| except Exception as e: | |
| print(f"❌ Ошибка при чтении Word документа: {e}") | |
| return "" | |
| def _extract_from_doc(self) -> str: | |
| if shutil.which('antiword'): | |
| try: | |
| result = subprocess.run(['antiword', self.file_path], | |
| capture_output=True, text=True) | |
| if result.returncode == 0: | |
| print(f"✅ Извлечено {len(result.stdout)} символов из Word .doc файла") | |
| return result.stdout | |
| except Exception as e: | |
| print(f"⚠️ Ошибка antiword: {e}") | |
| if shutil.which('soffice'): | |
| try: | |
| import tempfile | |
| temp_dir = tempfile.mkdtemp() | |
| result = subprocess.run([ | |
| 'soffice', '--headless', '--convert-to', 'txt', | |
| '--outdir', temp_dir, self.file_path | |
| ], capture_output=True, text=True) | |
| if result.returncode == 0: | |
| base_name = os.path.basename(self.file_path).replace('.doc', '.txt') | |
| txt_file = os.path.join(temp_dir, base_name) | |
| if os.path.exists(txt_file): | |
| with open(txt_file, 'r', encoding='utf-8', errors='ignore') as f: | |
| content = f.read() | |
| os.remove(txt_file) | |
| os.rmdir(temp_dir) | |
| print(f"✅ Извлечено {len(content)} символов из Word .doc файла (через LibreOffice)") | |
| return content | |
| except Exception as e: | |
| print(f"⚠️ Ошибка при конвертации через LibreOffice: {e}") | |
| print("❌ Не удалось извлечь текст из .doc файла.") | |
| print(" Установите: brew install antiword") | |
| return "" | |
| def parse_tasks(self, text: str) -> List[Dict]: | |
| lines = text.split('\n') | |
| has_resheno = False | |
| resheno_index = -1 | |
| for i, line in enumerate(lines[:100]): | |
| if 'РЕШИЛИ:' in line: | |
| has_resheno = True | |
| resheno_index = i | |
| print(f"✅ Найден маркер 'РЕШИЛИ:' в строке {i}") | |
| break | |
| if self.file_extension == '.pdf': | |
| print("📄 PDF файл: использую простой парсинг") | |
| self.tasks = self._parse_pdf_simple(lines) | |
| elif has_resheno: | |
| print("📝 Word файл с РЕШИЛИ: использую парсинг протокола") | |
| self.tasks = self._parse_word_protocol(lines, resheno_index) | |
| else: | |
| print("📄 Простой список: использую базовый парсинг") | |
| self.tasks = self._parse_simple_list(lines) | |
| return self.tasks | |
| def _parse_pdf_simple(self, lines: List[str]) -> List[Dict]: | |
| tasks = [] | |
| current_task = None | |
| current_description = [] | |
| решили_index = -1 | |
| for i, line in enumerate(lines): | |
| if 'РЕШИЛИ:' in line: | |
| решили_index = i | |
| break | |
| start_index = решили_index + 1 if решили_index != -1 else 0 | |
| i = start_index | |
| while i < len(lines): | |
| line = lines[i].strip() | |
| if not line: | |
| i += 1 | |
| continue | |
| stop_parsing = False | |
| for keyword in self.after_keywords: | |
| if keyword in line[:30]: | |
| stop_parsing = True | |
| break | |
| if stop_parsing: | |
| break | |
| task_match = re.match(r'^(\d+)\.\s+(.*)', line) | |
| if task_match: | |
| if current_task: | |
| full_desc = ' '.join(current_description) | |
| full_desc = re.sub(r'\s+', ' ', full_desc) | |
| # Очищаем описание от метаданных | |
| full_desc = re.sub(r'[;,\s]*Отв\.:\s*[^\.]+?(?:\.|$)', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Отв\.:\s*[^С]+?(?:\s+Срок|$)', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Отв\.:\s*[^\n]+', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Срок\s*[—–-]?\s*\d{2}\.\d{2}\.\d{4}', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Срок\s*[—–-]?\s*до\s+конца\s+года', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*С\b', '', full_desc) | |
| full_desc = re.sub(r'\s+', ' ', full_desc) | |
| full_desc = re.sub(r'\s*[;,]?\s*$', '', full_desc) | |
| full_desc = full_desc.strip() | |
| current_task['full_description'] = full_desc | |
| tasks.append(current_task) | |
| task_num = task_match.group(1) | |
| task_text = task_match.group(2) | |
| current_task = { | |
| 'number': int(task_num), | |
| 'full_description': '', | |
| 'responsible': '', | |
| 'due_date': None, | |
| 'due_date_str': '' | |
| } | |
| current_description = [task_text] | |
| i += 1 | |
| elif current_task: | |
| current_description.append(line) | |
| if 'Отв.:' in line: | |
| resp_match = re.search(r'Отв\.:\s*([^С]+?)(?:\s+Срок|$)', line) | |
| if not resp_match: | |
| resp_match = re.search(r'Отв\.:\s*([^\n]+)', line) | |
| if resp_match: | |
| responsible = resp_match.group(1).strip() | |
| # Обрезаем до ключевых слов | |
| stop_words = self.date_keywords + ['Выполнено', 'Приложение', 'приложение', 'Протокол'] + self.status_keywords | |
| for stop_word in stop_words: | |
| if stop_word in responsible: | |
| responsible = responsible.split(stop_word)[0].strip() | |
| break | |
| # Дополнительная очистка от "Срок" и "Выполнено" в любом регистре | |
| responsible = re.sub(r'\s+Срок.*$', '', responsible, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+Выполнено.*$', '', responsible, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+до\s+конца\s+года.*$', '', responsible, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+', ' ', responsible) | |
| current_task['responsible'] = responsible | |
| if 'Срок' in line or any(word in line.lower() for word in self.status_keywords + ['до конца года']): | |
| line_lower = line.lower() | |
| # Проверяем на статусы выполнения | |
| if any(word in line_lower for word in self.status_keywords): | |
| current_task['due_date_str'] = 'Выполнено' | |
| elif 'до конца года' in line_lower: | |
| current_task['due_date_str'] = 'до конца года' | |
| else: | |
| date_match = re.search(r'Срок\s*[—–-]?\s*(\d{2}\.\d{2}\.\d{4})', line) | |
| if date_match: | |
| date_str = date_match.group(1).strip() | |
| current_task['due_date_str'] = date_str | |
| try: | |
| current_task['due_date'] = datetime.strptime(date_str, '%d.%m.%Y').date() | |
| except ValueError: | |
| pass | |
| i += 1 | |
| else: | |
| i += 1 | |
| if current_task: | |
| full_desc = ' '.join(current_description) | |
| full_desc = re.sub(r'\s+', ' ', full_desc) | |
| # Очищаем описание от метаданных | |
| full_desc = re.sub(r'[;,\s]*Отв\.:\s*[^\.]+?(?:\.|$)', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Отв\.:\s*[^С]+?(?:\s+Срок|$)', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Отв\.:\s*[^\n]+', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Срок\s*[—–-]?\s*\d{2}\.\d{2}\.\d{4}', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*Срок\s*[—–-]?\s*до\s+конца\s+года', '', full_desc) | |
| full_desc = re.sub(r'[;,\s]*С\b', '', full_desc) | |
| full_desc = re.sub(r'\s+', ' ', full_desc) | |
| full_desc = re.sub(r'\s*[;,]?\s*$', '', full_desc) | |
| full_desc = full_desc.strip() | |
| current_task['full_description'] = full_desc | |
| tasks.append(current_task) | |
| return tasks | |
| def _parse_word_protocol(self, all_lines: List[str], start_idx: int) -> List[Dict]: | |
| tasks = [] | |
| решили_pos = -1 | |
| for i, line in enumerate(all_lines): | |
| if 'РЕШИЛИ:' in line: | |
| решили_pos = i | |
| break | |
| if решили_pos == -1: | |
| return [] | |
| task_lines = [] | |
| i = решили_pos + 1 | |
| while i < len(all_lines) and not all_lines[i].strip(): | |
| i += 1 | |
| started = False | |
| while i < len(all_lines): | |
| line = all_lines[i].strip() | |
| stop_found = False | |
| for keyword in self.after_keywords: | |
| if keyword in line[:30]: | |
| stop_found = True | |
| break | |
| if stop_found: | |
| break | |
| if re.match(r'^\d+$', line): | |
| i += 1 | |
| continue | |
| is_service = False | |
| for keyword in self.before_keywords: | |
| if keyword in line: | |
| is_service = True | |
| break | |
| if is_service: | |
| i += 1 | |
| continue | |
| if not started and (re.match(r'^\d+[.\t]', line) or line.startswith('¶')): | |
| started = True | |
| if started and line: | |
| task_lines.append(line) | |
| i += 1 | |
| if task_lines: | |
| last_line = task_lines[-1] | |
| for keyword in self.after_keywords: | |
| if keyword in last_line: | |
| task_lines[-1] = last_line.split(keyword)[0].strip() | |
| break | |
| if task_lines and not task_lines[-1]: | |
| task_lines.pop() | |
| i = 0 | |
| task_counter = 1 | |
| while i < len(task_lines): | |
| line = task_lines[i] | |
| is_task_start = False | |
| task_number = None | |
| description = None | |
| match = re.match(r'^(\d+)[.\t]\s*(.*)', line) | |
| if match: | |
| is_task_start = True | |
| task_number = int(match.group(1)) | |
| description = match.group(2) | |
| if not is_task_start and line.startswith('¶'): | |
| is_task_start = True | |
| task_number = task_counter | |
| description = re.sub(r'^¶\s*', '', line) | |
| if not is_task_start: | |
| has_resp = any(k in line for k in self.resp_keywords) | |
| has_date = any(k in line for k in self.date_keywords) | |
| is_service = False | |
| for keyword in self.before_keywords: | |
| if keyword in line: | |
| is_service = True | |
| break | |
| if not has_resp and not has_date and not is_service and len(line) > 20: | |
| is_task_start = True | |
| task_number = task_counter | |
| description = line | |
| if is_task_start and description: | |
| i += 1 | |
| responsible = "" | |
| due_date_str = "" | |
| due_date = None | |
| while i < len(task_lines) and not task_lines[i].strip(): | |
| i += 1 | |
| collected_resp = False | |
| collected_date = False | |
| while i < len(task_lines) and not (collected_resp and collected_date): | |
| current = task_lines[i].strip() | |
| if not current: | |
| i += 1 | |
| continue | |
| next_is_task = False | |
| if re.match(r'^\d+[.\t]', current): | |
| next_is_task = True | |
| elif current.startswith('¶'): | |
| next_is_task = True | |
| else: | |
| has_resp_next = any(k in current for k in self.resp_keywords) | |
| has_date_next = any(k in current for k in self.date_keywords) | |
| is_service_next = any(k in current for k in self.before_keywords) | |
| if not has_resp_next and not has_date_next and not is_service_next and len(current) > 20: | |
| next_is_task = True | |
| if next_is_task: | |
| break | |
| if not collected_resp: | |
| for keyword in self.resp_keywords: | |
| if keyword in current: | |
| resp_parts = current.split(keyword) | |
| if len(resp_parts) > 1: | |
| resp_text = resp_parts[1].strip() | |
| stop_words = self.date_keywords + ['Выполнено', 'Приложение', 'приложение', 'Протокол'] + self.status_keywords | |
| for stop_word in stop_words: | |
| if stop_word in resp_text.lower(): | |
| resp_text = resp_text.split(stop_word)[0].strip() | |
| break | |
| # Дополнительная очистка | |
| resp_text = re.sub(r'\s+Срок.*$', '', resp_text, flags=re.IGNORECASE) | |
| resp_text = re.sub(r'\s+Выполнено.*$', '', resp_text, flags=re.IGNORECASE) | |
| resp_text = re.sub(r'\s+до\s+конца\s+года.*$', '', resp_text, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+', ' ', resp_text) | |
| responsible = re.sub(r'^:\s*', '', responsible) | |
| collected_resp = True | |
| for d_keyword in self.date_keywords: | |
| if d_keyword in current: | |
| date_parts = current.split(d_keyword) | |
| if len(date_parts) > 1: | |
| date_text = date_parts[1].strip() | |
| date_text = re.sub(r'^\s*[—–-]?\s*', '', date_text) | |
| due_date_str = re.sub(r'\s+', ' ', date_text) | |
| date_match = re.search(r'(\d{2}\.\d{2}\.\d{4})', date_text) | |
| if date_match: | |
| try: | |
| due_date = datetime.strptime(date_match.group(1), '%d.%m.%Y').date() | |
| except ValueError: | |
| pass | |
| collected_date = True | |
| break | |
| i += 1 | |
| break | |
| if not collected_date and i < len(task_lines): | |
| current = task_lines[i].strip() | |
| current_lower = current.lower() | |
| # Проверяем на статусы выполнения | |
| if any(word in current_lower for word in self.status_keywords): | |
| due_date_str = 'Выполнено' | |
| collected_date = True | |
| i += 1 | |
| elif 'до конца года' in current_lower: | |
| due_date_str = 'до конца года' | |
| collected_date = True | |
| i += 1 | |
| else: | |
| for keyword in self.date_keywords: | |
| if keyword in current: | |
| date_parts = current.split(keyword) | |
| if len(date_parts) > 1: | |
| date_text = date_parts[1].strip() | |
| date_text = re.sub(r'^\s*[—–-]?\s*', '', date_text) | |
| due_date_str = re.sub(r'\s+', ' ', date_text) | |
| date_match = re.search(r'(\d{2}\.\d{2}\.\d{4})', date_text) | |
| if date_match: | |
| try: | |
| due_date = datetime.strptime(date_match.group(1), '%d.%m.%Y').date() | |
| except ValueError: | |
| pass | |
| collected_date = True | |
| i += 1 | |
| break | |
| if not (collected_resp or collected_date): | |
| i += 1 | |
| task = { | |
| 'number': task_number, | |
| 'full_description': description, | |
| 'responsible': responsible, | |
| 'due_date': due_date, | |
| 'due_date_str': due_date_str | |
| } | |
| tasks.append(task) | |
| task_counter += 1 | |
| else: | |
| i += 1 | |
| return tasks | |
| def _parse_simple_list(self, lines: List[str]) -> List[Dict]: | |
| tasks = [] | |
| current_task = None | |
| current_description = [] | |
| for line in lines: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| task_match = re.match(r'^(\d+)\.\s+(.*)', line) | |
| if task_match: | |
| if current_task: | |
| full_desc = ' '.join(current_description) | |
| full_desc = re.sub(r'\s+', ' ', full_desc) | |
| current_task['full_description'] = full_desc | |
| tasks.append(current_task) | |
| task_num = task_match.group(1) | |
| task_text = task_match.group(2) | |
| current_task = { | |
| 'number': int(task_num), | |
| 'full_description': '', | |
| 'responsible': '', | |
| 'due_date': None, | |
| 'due_date_str': '' | |
| } | |
| current_description = [task_text] | |
| elif current_task: | |
| current_description.append(line) | |
| if 'Срок' in line or any(word in line.lower() for word in self.status_keywords + ['до конца года']): | |
| line_lower = line.lower() | |
| if any(word in line_lower for word in self.status_keywords): | |
| current_task['due_date_str'] = 'Выполнено' | |
| elif 'до конца года' in line_lower: | |
| current_task['due_date_str'] = 'до конца года' | |
| else: | |
| date_match = re.search(rf'Срок\s*[—–-]?\s*(\d{{2}}\.\d{{2}}\.\d{{4}})', line) | |
| if date_match: | |
| date_str = date_match.group(1).strip() | |
| current_task['due_date_str'] = date_str | |
| try: | |
| current_task['due_date'] = datetime.strptime(date_str, '%d.%m.%Y').date() | |
| except ValueError: | |
| pass | |
| for keyword in self.resp_keywords: | |
| if keyword in line: | |
| resp_match = re.search(rf'{re.escape(keyword)}\s*[—–-]?\s*([^С]+?)(?:\s+Срок|$)', line) | |
| if not resp_match: | |
| resp_match = re.search(rf'{re.escape(keyword)}\s*[—–-]?\s*([^\n]+)', line) | |
| if resp_match: | |
| responsible = resp_match.group(1).strip() | |
| stop_words = self.date_keywords + ['Выполнено', 'Приложение', 'приложение'] + self.status_keywords | |
| for stop_word in stop_words: | |
| if stop_word in responsible.lower(): | |
| responsible = responsible.split(stop_word)[0].strip() | |
| break | |
| # Дополнительная очистка | |
| responsible = re.sub(r'\s+Срок.*$', '', responsible, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+Выполнено.*$', '', responsible, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+до\s+конца\s+года.*$', '', responsible, flags=re.IGNORECASE) | |
| responsible = re.sub(r'\s+', ' ', responsible) | |
| current_task['responsible'] = responsible | |
| break | |
| if current_task: | |
| full_desc = ' '.join(current_description) | |
| full_desc = re.sub(r'\s+', ' ', full_desc) | |
| current_task['full_description'] = full_desc | |
| tasks.append(current_task) | |
| return tasks | |
| def print_tasks(self): | |
| if not self.tasks: | |
| print("❌ Задачи не найдены") | |
| return | |
| print(f"\n📋 Найдено задач: {len(self.tasks)}\n") | |
| print("=" * 80) | |
| for task in self.tasks: | |
| print(f"Задача #{task['number']}") | |
| print(f"📝 Описание: {task['full_description'][:100]}...") | |
| print(f"👤 Ответственный: {task['responsible'] or '❌ НЕТ'}") | |
| print(f"📅 Срок: {task['due_date_str'] or '❌ НЕТ'}") | |
| print("-" * 40) | |
| def to_dataframe(self): | |
| import pandas as pd | |
| data = [] | |
| for task in self.tasks: | |
| data.append({ | |
| '№': task['number'], | |
| 'Описание': task['full_description'], | |
| 'Ответственный': task.get('responsible', 'Не указан'), | |
| 'Срок': task.get('due_date_str', 'Не указан'), | |
| 'Дата (для сортировки)': task.get('due_date') | |
| }) | |
| df = pd.DataFrame(data) | |
| return df |