""" Utility functions for RAGAnything Contains helper functions for content separation, text insertion, and other utilities """ import base64 from typing import Dict, List, Any, Tuple from pathlib import Path from lightrag.utils import logger def separate_content( content_list: List[Dict[str, Any]], ) -> Tuple[str, List[Dict[str, Any]]]: """ Separate text content and multimodal content Args: content_list: Content list from MinerU parsing Returns: (text_content, multimodal_items): Pure text content and multimodal items list """ text_parts = [] multimodal_items = [] for item in content_list: content_type = item.get("type", "text") if content_type == "text": # Text content text = item.get("text", "") if text.strip(): text_parts.append(text) else: # Multimodal content (image, table, equation, etc.) multimodal_items.append(item) # Merge all text content text_content = "\n\n".join(text_parts) logger.info("Content separation complete:") logger.info(f" - Text content length: {len(text_content)} characters") logger.info(f" - Multimodal items count: {len(multimodal_items)}") # Count multimodal types modal_types = {} for item in multimodal_items: modal_type = item.get("type", "unknown") modal_types[modal_type] = modal_types.get(modal_type, 0) + 1 if modal_types: logger.info(f" - Multimodal type distribution: {modal_types}") return text_content, multimodal_items def encode_image_to_base64(image_path: str) -> str: """ Encode image file to base64 string Args: image_path: Path to the image file Returns: str: Base64 encoded string, empty string if encoding fails """ try: with open(image_path, "rb") as image_file: encoded_string = base64.b64encode(image_file.read()).decode("utf-8") return encoded_string except Exception as e: logger.error(f"Failed to encode image {image_path}: {e}") return "" def validate_image_file(image_path: str, max_size_mb: int = 50) -> bool: """ Validate if a file is a valid image file Args: image_path: Path to the image file max_size_mb: Maximum file size in MB Returns: bool: True if valid, False otherwise """ try: path = Path(image_path) logger.debug(f"Validating image path: {image_path}") logger.debug(f"Resolved path object: {path}") logger.debug(f"Path exists check: {path.exists()}") # Check if file exists if not path.exists(): logger.warning(f"Image file not found: {image_path}") return False # Check file extension image_extensions = [ ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff", ".tif", ] path_lower = str(path).lower() has_valid_extension = any(path_lower.endswith(ext) for ext in image_extensions) logger.debug( f"File extension check - path: {path_lower}, valid: {has_valid_extension}" ) if not has_valid_extension: logger.warning(f"File does not appear to be an image: {image_path}") return False # Check file size file_size = path.stat().st_size max_size = max_size_mb * 1024 * 1024 logger.debug( f"File size check - size: {file_size} bytes, max: {max_size} bytes" ) if file_size > max_size: logger.warning(f"Image file too large ({file_size} bytes): {image_path}") return False logger.debug(f"Image validation successful: {image_path}") return True except Exception as e: logger.error(f"Error validating image file {image_path}: {e}") return False async def insert_text_content( lightrag, input: str | list[str], split_by_character: str | None = None, split_by_character_only: bool = False, ids: str | list[str] | None = None, file_paths: str | list[str] | None = None, ): """ Insert pure text content into LightRAG Args: lightrag: LightRAG instance input: Single document string or list of document strings split_by_character: if split_by_character is not None, split the string by character, if chunk longer than chunk_token_size, it will be split again by token size. split_by_character_only: if split_by_character_only is True, split the string by character only, when split_by_character is None, this parameter is ignored. ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated file_paths: single string of the file path or list of file paths, used for citation """ logger.info("Starting text content insertion into LightRAG...") # Use LightRAG's insert method with all parameters await lightrag.ainsert( input=input, file_paths=file_paths, split_by_character=split_by_character, split_by_character_only=split_by_character_only, ids=ids, ) logger.info("Text content insertion complete") async def insert_text_content_with_multimodal_content( lightrag, input: str | list[str], multimodal_content: list[dict[str, any]] | None = None, split_by_character: str | None = None, split_by_character_only: bool = False, ids: str | list[str] | None = None, file_paths: str | list[str] | None = None, scheme_name: str | None = None, ): """ Insert pure text content into LightRAG Args: lightrag: LightRAG instance input: Single document string or list of document strings multimodal_content: Multimodal content list (optional) split_by_character: if split_by_character is not None, split the string by character, if chunk longer than chunk_token_size, it will be split again by token size. split_by_character_only: if split_by_character_only is True, split the string by character only, when split_by_character is None, this parameter is ignored. ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated file_paths: single string of the file path or list of file paths, used for citation scheme_name: scheme name (optional) """ logger.info("Starting text content insertion into LightRAG...") # Use LightRAG's insert method with all parameters try: await lightrag.ainsert( input=input, multimodal_content=multimodal_content, file_paths=file_paths, split_by_character=split_by_character, split_by_character_only=split_by_character_only, ids=ids, scheme_name=scheme_name, ) except Exception as e: logger.info(f"Error: {e}") logger.info( "If the error is caused by the ainsert function not having a multimodal content parameter, please update the raganything branch of lightrag" ) logger.info("Text content insertion complete") def get_processor_for_type(modal_processors: Dict[str, Any], content_type: str): """ Get appropriate processor based on content type Args: modal_processors: Dictionary of available processors content_type: Content type Returns: Corresponding processor instance """ # Direct mapping to corresponding processor if content_type == "image": return modal_processors.get("image") elif content_type == "table": return modal_processors.get("table") elif content_type == "equation": return modal_processors.get("equation") else: # For other types, use generic processor return modal_processors.get("generic") def get_processor_supports(proc_type: str) -> List[str]: """Get processor supported features""" supports_map = { "image": [ "Image content analysis", "Visual understanding", "Image description generation", "Image entity extraction", ], "table": [ "Table structure analysis", "Data statistics", "Trend identification", "Table entity extraction", ], "equation": [ "Mathematical formula parsing", "Variable identification", "Formula meaning explanation", "Formula entity extraction", ], "generic": [ "General content analysis", "Structured processing", "Entity extraction", ], } return supports_map.get(proc_type, ["Basic processing"])