| """ |
| Utility functions for RAGAnything |
| |
| Contains helper functions for content separation, text insertion, and other utilities |
| """ |
|
|
| import base64 |
| from typing import Dict, List, Any, Tuple |
| from pathlib import Path |
| from lightrag.utils import logger |
|
|
|
|
def separate_content(
    content_list: List[Dict[str, Any]],
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Separate text content and multimodal content

    Items whose "type" is "text" (or that have no "type" key at all) contribute
    their "text" value to the combined text. Every other item is passed through
    unchanged, in its original order, as a multimodal item.

    Args:
        content_list: Content list from MinerU parsing

    Returns:
        (text_content, multimodal_items): Pure text content (the non-blank text
        fragments joined with a blank line between them) and multimodal items list
    """
    text_parts = []
    multimodal_items = []

    for item in content_list:
        # Items without an explicit type are treated as plain text
        if item.get("type", "text") != "text":
            multimodal_items.append(item)
            continue

        # A "text" key that is present but None must not abort the whole
        # separation with an AttributeError; treat it as empty text
        text = item.get("text") or ""
        if text.strip():
            text_parts.append(text)

    text_content = "\n\n".join(text_parts)

    logger.info("Content separation complete:")
    logger.info(f" - Text content length: {len(text_content)} characters")
    logger.info(f" - Multimodal items count: {len(multimodal_items)}")

    # Count multimodal items per type for the summary log line
    modal_types = {}
    for item in multimodal_items:
        modal_type = item.get("type", "unknown")
        modal_types[modal_type] = modal_types.get(modal_type, 0) + 1

    if modal_types:
        logger.info(f" - Multimodal type distribution: {modal_types}")

    return text_content, multimodal_items
|
|
|
|
def encode_image_to_base64(image_path: str) -> str:
    """
    Read an image file and return its bytes as a base64 string

    This is a best-effort helper: any exception raised while opening, reading
    or encoding the file is logged and turned into an empty-string result
    instead of being propagated.

    Args:
        image_path: Path to the image file

    Returns:
        str: Base64 encoded string, empty string if encoding fails
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        # base64 output is pure ASCII, so decoding it as UTF-8 is lossless
        return base64.b64encode(raw_bytes).decode("utf-8")
    except Exception as err:
        logger.error(f"Failed to encode image {image_path}: {err}")
        return ""
|
|
|
|
def validate_image_file(image_path: str, max_size_mb: int = 50) -> bool:
    """
    Validate if a file is a valid image file

    The check is based on the path only; the file contents are never opened
    or decoded. A path is accepted when all of the following hold:

    - it exists and is a regular file (directories are rejected),
    - its name ends with a known image extension (case-insensitive),
    - its size does not exceed max_size_mb.

    Args:
        image_path: Path to the image file
        max_size_mb: Maximum file size in MB

    Returns:
        bool: True if valid, False otherwise (also False when any of the
        checks raises an exception; the error is logged)
    """
    # Kept as a tuple so it can be passed directly to str.endswith
    image_extensions = (
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".bmp",
        ".webp",
        ".tiff",
        ".tif",
    )

    try:
        path = Path(image_path)

        logger.debug(f"Validating image path: {image_path}")
        logger.debug(f"Resolved path object: {path}")
        path_exists = path.exists()
        logger.debug(f"Path exists check: {path_exists}")

        if not path_exists:
            logger.warning(f"Image file not found: {image_path}")
            return False

        # exists() is also true for directories; a directory whose name
        # happens to end in ".png" must not pass as an image file
        if not path.is_file():
            logger.warning(f"Image path is not a regular file: {image_path}")
            return False

        # Compare against the lower-cased path so "PHOTO.JPG" is accepted too
        path_lower = str(path).lower()
        has_valid_extension = path_lower.endswith(image_extensions)
        logger.debug(
            f"File extension check - path: {path_lower}, valid: {has_valid_extension}"
        )

        if not has_valid_extension:
            logger.warning(f"File does not appear to be an image: {image_path}")
            return False

        file_size = path.stat().st_size
        max_size = max_size_mb * 1024 * 1024
        logger.debug(
            f"File size check - size: {file_size} bytes, max: {max_size} bytes"
        )

        if file_size > max_size:
            logger.warning(f"Image file too large ({file_size} bytes): {image_path}")
            return False

        logger.debug(f"Image validation successful: {image_path}")
        return True

    except Exception as e:
        # Validation is a yes/no question for callers, so unexpected
        # filesystem errors are reported as "not valid" rather than raised
        logger.error(f"Error validating image file {image_path}: {e}")
        return False
|
|
|
|
async def insert_text_content(
    lightrag,
    input: str | list[str],
    split_by_character: str | None = None,
    split_by_character_only: bool = False,
    ids: str | list[str] | None = None,
    file_paths: str | list[str] | None = None,
):
    """
    Insert pure text content into LightRAG

    Thin wrapper that forwards every argument by keyword to
    lightrag.ainsert, awaits it, and logs the start and end of the
    insertion. Exceptions raised by ainsert are not caught here.

    Args:
        lightrag: LightRAG instance
        input: Single document string or list of document strings
        split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
            chunk_token_size, it will be split again by token size.
        split_by_character_only: if split_by_character_only is True, split the string by character only, when
            split_by_character is None, this parameter is ignored.
        ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
        file_paths: single string of the file path or list of file paths, used for citation
    """
    logger.info("Starting text content insertion into LightRAG...")

    # Collect the pass-through arguments once, then hand them over by keyword
    insert_kwargs = {
        "input": input,
        "file_paths": file_paths,
        "split_by_character": split_by_character,
        "split_by_character_only": split_by_character_only,
        "ids": ids,
    }
    await lightrag.ainsert(**insert_kwargs)

    logger.info("Text content insertion complete")
|
|
|
|
async def insert_text_content_with_multimodal_content(
    lightrag,
    input: str | list[str],
    multimodal_content: list[dict[str, Any]] | None = None,
    split_by_character: str | None = None,
    split_by_character_only: bool = False,
    ids: str | list[str] | None = None,
    file_paths: str | list[str] | None = None,
    scheme_name: str | None = None,
):
    """
    Insert text content together with its multimodal content into LightRAG

    Forwards every argument by keyword to lightrag.ainsert and awaits it.
    The call is best-effort: an exception raised by ainsert is logged and
    swallowed, so this coroutine does not raise for insertion failures and
    returns None either way. The completion message is only logged when the
    insertion actually succeeded.

    Args:
        lightrag: LightRAG instance
        input: Single document string or list of document strings
        multimodal_content: Multimodal content list (optional)
        split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
            chunk_token_size, it will be split again by token size.
        split_by_character_only: if split_by_character_only is True, split the string by character only, when
            split_by_character is None, this parameter is ignored.
        ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
        file_paths: single string of the file path or list of file paths, used for citation
        scheme_name: scheme name (optional)
    """
    logger.info("Starting text content insertion into LightRAG...")

    try:
        await lightrag.ainsert(
            input=input,
            multimodal_content=multimodal_content,
            file_paths=file_paths,
            split_by_character=split_by_character,
            split_by_character_only=split_by_character_only,
            ids=ids,
            scheme_name=scheme_name,
        )
    except Exception as e:
        # Kept best-effort (no re-raise) so existing callers are unaffected,
        # but a failed insertion is reported as an error, not as INFO noise
        logger.error(f"Error: {e}")
        logger.warning(
            "If the error is caused by the ainsert function not having a multimodal content parameter, please update the raganything branch of lightrag"
        )
        # Do not fall through to the success message below
        return

    logger.info("Text content insertion complete")
|
|
|
|
def get_processor_for_type(modal_processors: Dict[str, Any], content_type: str):
    """
    Pick the processor that should handle a given content type

    "image", "table" and "equation" are looked up under their own key.
    Every other content type is routed to the "generic" entry.

    Args:
        modal_processors: Dictionary of available processors
        content_type: Content type

    Returns:
        The processor stored under the selected key, or None when
        modal_processors has no entry for that key
    """
    # Content types that have a dedicated processor slot
    for dedicated_type in ("image", "table", "equation"):
        if content_type == dedicated_type:
            return modal_processors.get(dedicated_type)

    # Anything else falls back to the generic processor
    return modal_processors.get("generic")
|
|
|
|
def get_processor_supports(proc_type: str) -> List[str]:
    """
    List the feature descriptions advertised for a processor type

    Args:
        proc_type: Processor type name; "image", "table", "equation" and
            "generic" have dedicated feature lists

    Returns:
        List[str]: Feature descriptions for the processor type, or
        ["Basic processing"] for any other type. A fresh list is built on
        every call, so callers may modify the result freely.
    """
    features_by_type = dict(
        image=[
            "Image content analysis",
            "Visual understanding",
            "Image description generation",
            "Image entity extraction",
        ],
        table=[
            "Table structure analysis",
            "Data statistics",
            "Trend identification",
            "Table entity extraction",
        ],
        equation=[
            "Mathematical formula parsing",
            "Variable identification",
            "Formula meaning explanation",
            "Formula entity extraction",
        ],
        generic=[
            "General content analysis",
            "Structured processing",
            "Entity extraction",
        ],
    )

    try:
        return features_by_type[proc_type]
    except KeyError:
        # Unknown processor types only get the minimal capability label
        return ["Basic processing"]
|
|