| """ |
| Batch and Parallel Document Parsing |
| |
| This module provides functionality for processing multiple documents in parallel, |
| with progress reporting and error handling. |
| """ |
|
|
| import asyncio |
| import logging |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
| from dataclasses import dataclass |
| import time |
|
|
| from tqdm import tqdm |
|
|
| from .parser import MineruParser, DoclingParser |
|
|
|
|
@dataclass
class BatchProcessingResult:
    """Aggregate outcome of one batch-processing run.

    Attributes:
        successful_files: Paths that were parsed successfully.
        failed_files: Paths whose parsing failed or was interrupted.
        total_files: Number of files the batch attempted.
        processing_time: Elapsed duration of the batch, in seconds.
        errors: Error message for each path in ``failed_files``.
        output_dir: Base directory that received the parser output.
    """

    successful_files: List[str]
    failed_files: List[str]
    total_files: int
    processing_time: float
    errors: Dict[str, str]
    output_dir: str

    @property
    def success_rate(self) -> float:
        """Share of ``total_files`` that succeeded, as a percentage (0.0 when empty)."""
        total = self.total_files
        if total == 0:
            return 0.0
        fraction = len(self.successful_files) / total
        return fraction * 100

    def summary(self) -> str:
        """Return a multi-line, human-readable report of this result."""
        report_lines = [
            "Batch Processing Summary:",
            f" Total files: {self.total_files}",
            f" Successful: {len(self.successful_files)} ({self.success_rate:.1f}%)",
            f" Failed: {len(self.failed_files)}",
            f" Processing time: {self.processing_time:.2f} seconds",
            f" Output directory: {self.output_dir}",
        ]
        return "\n".join(report_lines)
|
|
|
|
class BatchParser:
    """
    Batch document parser with parallel processing capabilities

    Supports processing multiple documents concurrently with progress tracking
    and comprehensive error handling. Files are parsed on a thread pool and
    each file's output is written to ``<output_dir>/<file stem>``.
    """

    def __init__(
        self,
        parser_type: str = "mineru",
        max_workers: int = 4,
        show_progress: bool = True,
        timeout_per_file: Optional[float] = 300,
        skip_installation_check: bool = False,
    ):
        """
        Initialize batch parser

        Args:
            parser_type: Type of parser to use ("mineru" or "docling")
            max_workers: Maximum number of parallel worker threads (>= 1)
            show_progress: Whether to show a tqdm progress bar
            timeout_per_file: Seconds to wait for the next file to finish
                before abandoning all unfinished files (see process_batch);
                None disables the timeout
            skip_installation_check: Skip parser installation check (useful for testing)

        Raises:
            ValueError: If parser_type is unsupported or max_workers < 1.
        """
        # Fail fast: ThreadPoolExecutor would otherwise reject this only once
        # process_batch runs.
        if max_workers is not None and max_workers < 1:
            raise ValueError(f"max_workers must be at least 1, got {max_workers}")

        self.parser_type = parser_type
        self.max_workers = max_workers
        self.show_progress = show_progress
        self.timeout_per_file = timeout_per_file
        self.logger = logging.getLogger(__name__)

        if parser_type == "mineru":
            self.parser = MineruParser()
        elif parser_type == "docling":
            self.parser = DoclingParser()
        else:
            raise ValueError(f"Unsupported parser type: {parser_type}")

        # A failed check only warns; processing is still attempted.
        if not skip_installation_check and not self.parser.check_installation():
            self.logger.warning(
                f"{parser_type.title()} parser installation check failed. "
                "This may be due to package conflicts. "
                "Use skip_installation_check=True to bypass this check."
            )

    def get_supported_extensions(self) -> List[str]:
        """Get list of supported file extensions (lower-case, with leading dot)"""
        return list(
            self.parser.OFFICE_FORMATS
            | self.parser.IMAGE_FORMATS
            | self.parser.TEXT_FORMATS
            | {".pdf"}
        )

    def filter_supported_files(
        self, file_paths: List[str], recursive: bool = True
    ) -> List[str]:
        """
        Filter file paths to only include supported file types

        Each file is returned at most once, in first-seen order, even if it is
        listed explicitly and also found inside a listed directory. Otherwise
        it would be parsed twice, concurrently, into the same output directory.
        Directory contents are visited in sorted order for reproducible runs.

        Args:
            file_paths: List of file paths or directories
            recursive: Whether to search directories recursively

        Returns:
            List of supported file paths
        """
        supported_extensions = set(self.get_supported_extensions())
        supported_files: List[str] = []
        seen = set()

        def add(candidate: Path) -> None:
            # De-duplicate on the resolved path so "a.pdf" and "./a.pdf" match.
            key = candidate.resolve()
            if key not in seen:
                seen.add(key)
                supported_files.append(str(candidate))

        for path_str in file_paths:
            path = Path(path_str)

            if path.is_file():
                if path.suffix.lower() in supported_extensions:
                    add(path)
                else:
                    self.logger.warning(f"Unsupported file type: {path}")

            elif path.is_dir():
                candidates = path.rglob("*") if recursive else path.glob("*")
                for file_path in sorted(candidates):
                    if (
                        file_path.is_file()
                        and file_path.suffix.lower() in supported_extensions
                    ):
                        add(file_path)

            else:
                self.logger.warning(f"Path does not exist: {path}")

        return supported_files

    def process_single_file(
        self, file_path: str, output_dir: str, parse_method: str = "auto", **kwargs
    ) -> Tuple[bool, str, Optional[str]]:
        """
        Process a single file

        Output is written to ``<output_dir>/<file stem>``. Any exception raised
        while parsing is caught, logged and reported in the return value
        instead of being propagated.

        Args:
            file_path: Path to the file to process
            output_dir: Output directory
            parse_method: Parsing method
            **kwargs: Additional parser arguments

        Returns:
            Tuple of (success, file_path, error_message); error_message is None on success
        """
        try:
            start_time = time.perf_counter()

            # NOTE(review): files that share a stem (e.g. a/report.pdf and
            # b/report.docx) share this directory; confirm that the parsers
            # keep their outputs apart.
            file_output_dir = Path(output_dir) / Path(file_path).stem
            file_output_dir.mkdir(parents=True, exist_ok=True)

            content_list = self.parser.parse_document(
                file_path=file_path,
                output_dir=str(file_output_dir),
                method=parse_method,
                **kwargs,
            )

            processing_time = time.perf_counter() - start_time
            self.logger.info(
                f"Successfully processed {file_path} "
                f"({len(content_list)} content blocks, {processing_time:.2f}s)"
            )
            return True, file_path, None

        except Exception as e:
            error_msg = f"Failed to process {file_path}: {str(e)}"
            self.logger.error(error_msg)
            return False, file_path, error_msg

    def process_batch(
        self,
        file_paths: List[str],
        output_dir: str,
        parse_method: str = "auto",
        recursive: bool = True,
        **kwargs,
    ) -> BatchProcessingResult:
        """
        Process multiple files in parallel

        ``timeout_per_file`` applies per completion, not to the whole batch:
        the timer restarts every time a file finishes. If no file finishes
        within the timeout, each file still running has been running for at
        least about that long. Every unfinished file (running or still queued)
        is then recorded as failed, and the method returns without waiting for
        them. Python threads cannot be killed, so an abandoned file may keep
        running in the background.

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Base output directory
            parse_method: Parsing method for all files
            recursive: Whether to search directories recursively
            **kwargs: Additional parser arguments

        Returns:
            BatchProcessingResult with processing statistics. Every supported
            file appears in exactly one of successful_files / failed_files.
        """
        from concurrent.futures import FIRST_COMPLETED, wait

        start_time = time.monotonic()

        supported_files = self.filter_supported_files(file_paths, recursive)

        if not supported_files:
            self.logger.warning("No supported files found to process")
            return BatchProcessingResult(
                successful_files=[],
                failed_files=[],
                total_files=0,
                processing_time=0.0,
                errors={},
                output_dir=output_dir,
            )

        self.logger.info(f"Found {len(supported_files)} files to process")

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        successful_files: List[str] = []
        failed_files: List[str] = []
        errors: Dict[str, str] = {}
        recorded = set()  # indexes into supported_files with a known outcome

        pbar = None
        if self.show_progress:
            pbar = tqdm(
                total=len(supported_files),
                desc=f"Processing files ({self.parser_type})",
                unit="file",
            )

        def record(index: int, success: bool, error_msg: Optional[str]) -> None:
            # Store each file's outcome exactly once and advance the progress bar.
            if index in recorded:
                return
            recorded.add(index)
            file_path = supported_files[index]
            if success:
                successful_files.append(file_path)
            else:
                failed_files.append(file_path)
                errors[file_path] = error_msg
            if pbar:
                pbar.update(1)

        future_to_index = {}

        def collect(future) -> None:
            # Record the outcome of a future that has finished running.
            index = future_to_index[future]
            try:
                success, _, error_msg = future.result()
            except Exception as e:
                # process_single_file reports its own errors; this only guards
                # against an override that raises instead.
                success = False
                error_msg = f"Failed to process {supported_files[index]}: {str(e)}"
            record(index, success, error_msg)

        executor = None
        pending = set()
        finished_cleanly = False
        try:
            executor = ThreadPoolExecutor(max_workers=self.max_workers)
            for index, file_path in enumerate(supported_files):
                future = executor.submit(
                    self.process_single_file,
                    file_path,
                    output_dir,
                    parse_method,
                    **kwargs,
                )
                future_to_index[future] = index

            pending = set(future_to_index)
            while pending:
                done, pending = wait(
                    pending,
                    timeout=self.timeout_per_file,
                    return_when=FIRST_COMPLETED,
                )
                if not done:
                    break  # nothing finished within a full timeout window
                for future in done:
                    collect(future)

            if pending:
                self.logger.error(
                    f"Batch processing timed out: no file finished within "
                    f"{self.timeout_per_file}s; abandoning {len(pending)} file(s)"
                )
                for future in pending:
                    if future.cancel():
                        record(
                            future_to_index[future],
                            False,
                            "Processing interrupted: batch timed out before this file started",
                        )
                    elif future.done():
                        collect(future)  # finished just after the timeout fired
                    else:
                        record(
                            future_to_index[future],
                            False,
                            f"Processing timed out after {self.timeout_per_file}s",
                        )
            else:
                finished_cleanly = True

        except Exception as e:
            self.logger.error(f"Batch processing failed: {str(e)}")
            # Keep real outcomes of futures that already finished ...
            for future in future_to_index:
                if not future.cancel() and future.done():
                    collect(future)
            # ... and mark everything else (running, cancelled, never submitted).
            for index in range(len(supported_files)):
                record(index, False, f"Processing interrupted: {str(e)}")

        finally:
            if executor is not None:
                if not finished_cleanly:
                    for future in future_to_index:
                        future.cancel()
                # Block on worker threads only when none were abandoned; a hung
                # parser must not hang the caller after a timeout.
                executor.shutdown(wait=finished_cleanly)
            if pbar:
                pbar.close()

        processing_time = time.monotonic() - start_time

        result = BatchProcessingResult(
            successful_files=successful_files,
            failed_files=failed_files,
            total_files=len(supported_files),
            processing_time=processing_time,
            errors=errors,
            output_dir=output_dir,
        )

        self.logger.info(result.summary())

        return result

    async def process_batch_async(
        self,
        file_paths: List[str],
        output_dir: str,
        parse_method: str = "auto",
        recursive: bool = True,
        **kwargs,
    ) -> BatchProcessingResult:
        """
        Async version of batch processing

        Runs process_batch in the running event loop's default executor, so
        the loop is not blocked while the batch runs.

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Base output directory
            parse_method: Parsing method for all files
            recursive: Whether to search directories recursively
            **kwargs: Additional parser arguments

        Returns:
            BatchProcessingResult with processing statistics
        """
        import functools

        loop = asyncio.get_running_loop()
        # run_in_executor() forwards positional arguments only, so keyword
        # arguments must be bound with functools.partial.
        batch_job = functools.partial(
            self.process_batch,
            file_paths,
            output_dir,
            parse_method,
            recursive,
            **kwargs,
        )
        return await loop.run_in_executor(None, batch_job)
|
|
|
|
def main():
    """
    Command-line interface for batch parsing

    Returns:
        Process exit code: 0 when every file parsed, 1 when any file failed or
        the batch could not run. Invalid arguments make argparse exit with 2.
    """
    import argparse
    import sys

    def positive_int(value: str) -> int:
        # argparse type: reject 0 and negatives, which would break the pool
        # (workers) or abandon every file immediately (timeout).
        number = int(value)
        if number < 1:
            raise argparse.ArgumentTypeError(
                f"must be a positive integer, got {value}"
            )
        return number

    parser = argparse.ArgumentParser(description="Batch document parsing")
    parser.add_argument("paths", nargs="+", help="File paths or directories to process")
    parser.add_argument("--output", "-o", required=True, help="Output directory")
    parser.add_argument(
        "--parser",
        choices=["mineru", "docling"],
        default="mineru",
        help="Parser to use",
    )
    parser.add_argument(
        "--method",
        choices=["auto", "txt", "ocr"],
        default="auto",
        help="Parsing method",
    )
    parser.add_argument(
        "--workers", type=positive_int, default=4, help="Number of parallel workers"
    )
    parser.add_argument(
        "--no-progress", action="store_true", help="Disable progress bar"
    )
    # --recursive is kept for backward compatibility. It was always on, so
    # --no-recursive is the only way to turn directory recursion off.
    parser.add_argument(
        "--recursive",
        dest="recursive",
        action="store_true",
        default=True,
        help="Search directories recursively (default)",
    )
    parser.add_argument(
        "--no-recursive",
        dest="recursive",
        action="store_false",
        help="Only process files directly inside the given directories",
    )
    parser.add_argument(
        "--timeout", type=positive_int, default=300, help="Timeout per file (seconds)"
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    try:
        batch_parser = BatchParser(
            parser_type=args.parser,
            max_workers=args.workers,
            show_progress=not args.no_progress,
            timeout_per_file=args.timeout,
        )

        result = batch_parser.process_batch(
            file_paths=args.paths,
            output_dir=args.output,
            parse_method=args.method,
            recursive=args.recursive,
        )

        print("\n" + result.summary())

        return 1 if result.failed_files else 0

    except Exception as e:
        # Top-level CLI boundary: report on stderr and signal failure.
        print(f"Error: {str(e)}", file=sys.stderr)
        return 1
|
|
|
|
if __name__ == "__main__":
    # SystemExit does not depend on the site module's interactive exit()
    # helper, which is absent under e.g. `python -S`.
    raise SystemExit(main())
|
|