"""
Utility functions for RAGAnything
Contains helper functions for content separation, text insertion, and other utilities
"""
import base64
from typing import Dict, List, Any, Tuple
from pathlib import Path
from lightrag.utils import logger
def separate_content(
    content_list: List[Dict[str, Any]],
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Separate text content and multimodal content

    An item counts as text when its "type" is "text" or when it has no
    "type" key at all. Text items contribute their "text" value; every other
    item is passed through unchanged, in its original order.

    Args:
        content_list: Content list from MinerU parsing

    Returns:
        (text_content, multimodal_items): The non-blank text parts joined by
        blank lines, and the list of non-text items
    """
    text_parts = []
    multimodal_items = []

    for item in content_list:
        if item.get("type", "text") != "text":
            # Multimodal content (image, table, equation, etc.)
            multimodal_items.append(item)
            continue

        # A key that is present but null would make .strip() raise, so a
        # missing or falsy "text" value is treated as empty text.
        text = item.get("text") or ""
        if text.strip():
            text_parts.append(text)

    # Merge all text content
    text_content = "\n\n".join(text_parts)

    logger.info("Content separation complete:")
    logger.info(f" - Text content length: {len(text_content)} characters")
    logger.info(f" - Multimodal items count: {len(multimodal_items)}")

    # Count multimodal items per type (kept as a plain dict so the logged
    # representation stays a dict literal)
    modal_types = {}
    for item in multimodal_items:
        modal_type = item.get("type", "unknown")
        modal_types[modal_type] = modal_types.get(modal_type, 0) + 1

    if modal_types:
        logger.info(f" - Multimodal type distribution: {modal_types}")

    return text_content, multimodal_items
def encode_image_to_base64(image_path: str) -> str:
    """
    Read an image file and return its bytes as base64 text

    Args:
        image_path: Location of the image file to read

    Returns:
        str: Base64 text of the file contents, or an empty string when
        reading or encoding fails (the failure is logged as an error)
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        return base64.b64encode(raw_bytes).decode("utf-8")
    except Exception as e:
        # Best-effort helper: callers get "" instead of an exception.
        logger.error(f"Failed to encode image {image_path}: {e}")
        return ""
def validate_image_file(image_path: str, max_size_mb: int = 50) -> bool:
    """
    Validate if a file is a valid image file

    Only the path is checked: it must point to an existing regular file, end
    with a known image extension (case-insensitive), and be no larger than
    ``max_size_mb``. The file contents are not inspected.

    Args:
        image_path: Path to the image file
        max_size_mb: Maximum file size in MB

    Returns:
        bool: True if valid, False otherwise (also False when any of the
        checks raises; the error is logged)
    """
    image_extensions = (
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".bmp",
        ".webp",
        ".tiff",
        ".tif",
    )

    try:
        path = Path(image_path)

        logger.debug(f"Validating image path: {image_path}")
        logger.debug(f"Resolved path object: {path}")

        # Check if file exists (looked up once and reused for the log line)
        path_exists = path.exists()
        logger.debug(f"Path exists check: {path_exists}")
        if not path_exists:
            logger.warning(f"Image file not found: {image_path}")
            return False

        # exists() is also true for directories; a directory whose name ends
        # in an image extension must not pass as an image file.
        if not path.is_file():
            logger.warning(f"Image path is not a regular file: {image_path}")
            return False

        # Check file extension
        path_lower = str(path).lower()
        has_valid_extension = path_lower.endswith(image_extensions)
        logger.debug(
            f"File extension check - path: {path_lower}, valid: {has_valid_extension}"
        )
        if not has_valid_extension:
            logger.warning(f"File does not appear to be an image: {image_path}")
            return False

        # Check file size
        file_size = path.stat().st_size
        max_size = max_size_mb * 1024 * 1024
        logger.debug(
            f"File size check - size: {file_size} bytes, max: {max_size} bytes"
        )
        if file_size > max_size:
            logger.warning(f"Image file too large ({file_size} bytes): {image_path}")
            return False

        logger.debug(f"Image validation successful: {image_path}")
        return True

    except Exception as e:
        logger.error(f"Error validating image file {image_path}: {e}")
        return False
async def insert_text_content(
lightrag,
input: str | list[str],
split_by_character: str | None = None,
split_by_character_only: bool = False,
ids: str | list[str] | None = None,
file_paths: str | list[str] | None = None,
):
"""
Insert pure text content into LightRAG
Args:
lightrag: LightRAG instance
input: Single document string or list of document strings
split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
chunk_token_size, it will be split again by token size.
split_by_character_only: if split_by_character_only is True, split the string by character only, when
split_by_character is None, this parameter is ignored.
ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: single string of the file path or list of file paths, used for citation
"""
logger.info("Starting text content insertion into LightRAG...")
# Use LightRAG's insert method with all parameters
await lightrag.ainsert(
input=input,
file_paths=file_paths,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=ids,
)
logger.info("Text content insertion complete")
async def insert_text_content_with_multimodal_content(
lightrag,
input: str | list[str],
multimodal_content: list[dict[str, any]] | None = None,
split_by_character: str | None = None,
split_by_character_only: bool = False,
ids: str | list[str] | None = None,
file_paths: str | list[str] | None = None,
scheme_name: str | None = None,
):
"""
Insert pure text content into LightRAG
Args:
lightrag: LightRAG instance
input: Single document string or list of document strings
multimodal_content: Multimodal content list (optional)
split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
chunk_token_size, it will be split again by token size.
split_by_character_only: if split_by_character_only is True, split the string by character only, when
split_by_character is None, this parameter is ignored.
ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: single string of the file path or list of file paths, used for citation
scheme_name: scheme name (optional)
"""
logger.info("Starting text content insertion into LightRAG...")
# Use LightRAG's insert method with all parameters
try:
await lightrag.ainsert(
input=input,
multimodal_content=multimodal_content,
file_paths=file_paths,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=ids,
scheme_name=scheme_name,
)
except Exception as e:
logger.info(f"Error: {e}")
logger.info(
"If the error is caused by the ainsert function not having a multimodal content parameter, please update the raganything branch of lightrag"
)
logger.info("Text content insertion complete")
def get_processor_for_type(modal_processors: Dict[str, Any], content_type: str):
    """
    Pick the processor that handles a given content type

    "image", "table" and "equation" are looked up under their own name;
    every other content type is looked up under "generic".

    Args:
        modal_processors: Dictionary of available processors, keyed by name
        content_type: Content type

    Returns:
        The processor stored under the selected key, or None when that key
        is not in ``modal_processors``
    """
    # Content types with a dedicated processor entry of the same name
    for dedicated_type in ("image", "table", "equation"):
        if content_type == dedicated_type:
            return modal_processors.get(dedicated_type)

    # Anything else goes to the generic processor
    return modal_processors.get("generic")
def get_processor_supports(proc_type: str) -> List[str]:
    """
    List the feature descriptions for a processor type

    Args:
        proc_type: Processor type name ("image", "table", "equation" or
            "generic")

    Returns:
        A freshly built list of feature descriptions for that type;
        ["Basic processing"] for any other name
    """
    # Rebuilt on every call so each caller receives its own list objects.
    features_by_type = dict(
        image=[
            "Image content analysis",
            "Visual understanding",
            "Image description generation",
            "Image entity extraction",
        ],
        table=[
            "Table structure analysis",
            "Data statistics",
            "Trend identification",
            "Table entity extraction",
        ],
        equation=[
            "Mathematical formula parsing",
            "Variable identification",
            "Formula meaning explanation",
            "Formula entity extraction",
        ],
        generic=[
            "General content analysis",
            "Structured processing",
            "Entity extraction",
        ],
    )

    if proc_type in features_by_type:
        return features_by_type[proc_type]
    return ["Basic processing"]
|