"""
URL Document Fetcher for RAG-Anything
Fetches and processes documents from URLs for ingestion into the RAG system.
Features:
- Web page scraping and parsing
- PDF download from URLs
- Markdown conversion
- Content cleaning and preprocessing
- Advanced parsing with text and image extraction
- Integration with RAG pipeline
Author: RAG-Anything Team
Version: 2.0.0
"""
import asyncio
import hashlib
import json
import logging
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
try:
    import requests
    from bs4 import BeautifulSoup
    import markdownify
    from urllib.parse import urljoin

    DEPS_AVAILABLE = True
except ImportError:
    DEPS_AVAILABLE = False
    logger.warning(
        "URL fetcher dependencies not installed. "
        "Install with: pip install requests beautifulsoup4 markdownify"
    )
class URLFetcher:
    """Fetch and process documents from URLs"""

    def __init__(
        self,
        download_dir: Optional[str] = None,
        timeout: int = 30,
        user_agent: str = "RAG-Anything/1.0",
    ):
        """
        Initialize URL fetcher

        Args:
            download_dir: Directory to save downloaded files
            timeout: Request timeout in seconds
            user_agent: User agent string for requests
        """
        if not DEPS_AVAILABLE:
            raise ImportError(
                "Required dependencies not installed. "
                "Run: pip install requests beautifulsoup4 markdownify"
            )
        self.download_dir = download_dir or tempfile.gettempdir()
        self.timeout = timeout
        self.headers = {"User-Agent": user_agent}
        Path(self.download_dir).mkdir(parents=True, exist_ok=True)
        logger.info(f"URLFetcher initialized (download_dir={self.download_dir})")
    def _create_content_list(
        self, title: str, text_content: str, images: List[Dict]
    ) -> List[Dict[str, Any]]:
        """
        Create a structured content list compatible with RAG pipeline

        Args:
            title: Document title
            text_content: Extracted text content
            images: List of extracted images with metadata

        Returns:
            List of content blocks for RAG processing
        """
        content_list = []

        # Add title as first text block
        if title:
            content_list.append({
                "type": "text",
                "text": f"# {title}",
                "page_idx": 0,
            })

        # Split text into paragraphs and add as text blocks
        paragraphs = [p.strip() for p in text_content.split("\n\n") if p.strip()]
        for idx, paragraph in enumerate(paragraphs[:50]):  # Limit to first 50 paragraphs
            if paragraph:
                content_list.append({
                    "type": "text",
                    "text": paragraph,
                    "page_idx": idx // 10,  # Group every 10 paragraphs as a "page"
                })

        # Add images as image blocks
        for idx, img_info in enumerate(images):
            content_list.append({
                "type": "image",
                "img_path": img_info["path"],
                "image_caption": img_info.get("alt", "") or img_info.get("title", ""),
                "page_idx": (len(paragraphs) + idx) // 10,
            })

        return content_list
    async def fetch_url(
        self,
        url: str,
        save_as_pdf: bool = False,
        convert_to_markdown: bool = True,
    ) -> Dict[str, Any]:
        """
        Fetch and process content from URL

        Args:
            url: URL to fetch
            save_as_pdf: Whether to save as PDF (currently unused; PDF URLs are
                always downloaded as files)
            convert_to_markdown: Convert HTML to markdown

        Returns:
            Dictionary with file_path, content, metadata
        """
        try:
            logger.info(f"Fetching URL: {url}")

            # Validate URL
            parsed = urlparse(url)
            if not parsed.scheme or not parsed.netloc:
                raise ValueError(f"Invalid URL: {url}")

            # Determine content type
            response = await asyncio.to_thread(
                requests.head, url, headers=self.headers, timeout=self.timeout, allow_redirects=True
            )
            content_type = response.headers.get("Content-Type", "").lower()

            # Handle PDF files
            if "pdf" in content_type or url.lower().endswith(".pdf"):
                return await self._fetch_pdf(url)
            # Handle HTML/web pages
            elif "html" in content_type or not content_type:
                return await self._fetch_html(url, convert_to_markdown)
            # Handle other file types
            else:
                return await self._fetch_generic(url, content_type)
        except Exception as e:
            logger.error(f"Error fetching URL {url}: {e}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "url": url,
            }
    async def _fetch_pdf(self, url: str) -> Dict[str, Any]:
        """Fetch PDF from URL"""
        try:
            response = await asyncio.to_thread(
                requests.get, url, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()

            # Generate filename from URL
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            filename = f"url_{url_hash}.pdf"
            file_path = Path(self.download_dir) / filename

            # Save PDF
            with open(file_path, "wb") as f:
                f.write(response.content)

            logger.info(f"PDF downloaded: {file_path}")
            return {
                "success": True,
                "file_path": str(file_path),
                "url": url,
                "content_type": "pdf",
                "size_bytes": len(response.content),
            }
        except Exception as e:
            logger.error(f"Error fetching PDF: {e}")
            raise
    async def _fetch_html(self, url: str, convert_to_markdown: bool = True) -> Dict[str, Any]:
        """Fetch and parse HTML page with advanced content extraction"""
        try:
            response = await asyncio.to_thread(
                requests.get, url, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()

            # Parse HTML
            soup = BeautifulSoup(response.content, "html.parser")

            # Remove unwanted elements
            for tag in soup(["script", "style", "nav", "footer", "header", "aside", "iframe", "noscript"]):
                tag.decompose()

            # Extract title
            title = soup.find("title")
            title_text = title.get_text().strip() if title else "Untitled"

            # Extract main content
            # Fall back to the whole document if the page has no main/article/body
            main_content = soup.find("main") or soup.find("article") or soup.find("body") or soup
            # Extract images before converting to markdown (limit to first 10 images)
            images = []
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            images_dir = Path(self.download_dir) / f"url_{url_hash}_images"
            images_dir.mkdir(parents=True, exist_ok=True)

            all_images = main_content.find_all("img")
            max_images = min(10, len(all_images))  # Limit to 10 images
            logger.info(f"Found {len(all_images)} images, downloading first {max_images}")
            for idx, img in enumerate(all_images[:max_images]):
                try:
                    img_url = img.get("src")
                    if not img_url:
                        continue

                    # Skip data URIs
                    if img_url.startswith("data:"):
                        continue

                    # Handle relative URLs
                    if img_url.startswith("//"):
                        img_url = "https:" + img_url
                    elif img_url.startswith("/"):
                        parsed_base = urlparse(url)
                        img_url = f"{parsed_base.scheme}://{parsed_base.netloc}{img_url}"
                    elif not img_url.startswith("http"):
                        img_url = urljoin(url, img_url)

                    # Download image with timeout
                    img_response = await asyncio.to_thread(
                        requests.get, img_url, headers=self.headers, timeout=5, stream=True
                    )
                    if img_response.status_code == 200:
                        # Check content size (skip if too large > 10MB)
                        content_length = img_response.headers.get("content-length")
                        if content_length and int(content_length) > 10 * 1024 * 1024:
                            logger.debug(f"Skipping large image {idx}: {content_length} bytes")
                            continue

                        # Determine file extension
                        content_type = img_response.headers.get("Content-Type", "")
                        ext = ".jpg"
                        if "png" in content_type:
                            ext = ".png"
                        elif "gif" in content_type:
                            ext = ".gif"
                        elif "webp" in content_type:
                            ext = ".webp"

                        img_path = images_dir / f"image_{idx}{ext}"
                        with open(img_path, "wb") as f:
                            f.write(img_response.content)

                        images.append({
                            "path": str(img_path),
                            "alt": img.get("alt", ""),
                            "title": img.get("title", ""),
                            "url": img_url,
                        })
                        logger.debug(f"Downloaded image {idx+1}/{max_images}: {img_path.name}")
                except Exception as img_error:
                    logger.debug(f"Failed to download image {idx}: {img_error}")
                    continue
            if convert_to_markdown:
                # Convert to markdown
                content = markdownify.markdownify(
                    str(main_content),
                    heading_style="ATX",
                    bullets="-",
                )
            else:
                # Extract plain text
                content = main_content.get_text(separator="\n", strip=True)

            # Create content list with structured data
            content_list = self._create_content_list(title_text, content, images)

            # Save to file
            ext = ".md" if convert_to_markdown else ".txt"
            filename = f"url_{url_hash}{ext}"
            file_path = Path(self.download_dir) / filename
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(f"# {title_text}\n\n")
                f.write(f"Source: {url}\n\n")
                f.write(content)

            # Save content list as JSON for RAG processing
            json_path = Path(self.download_dir) / f"url_{url_hash}_content_list.json"
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(content_list, f, indent=2, ensure_ascii=False)
logger.info(f"HTML content saved: {file_path}")
logger.info(f"Extracted {len(images)} images from web page")
return {
"success": True,
"file_path": str(file_path),
"content_list_path": str(json_path),
"url": url,
"content_type": "html",
"title": title_text,
"content_preview": content[:500],
"images_count": len(images),
"content_list": content_list
}
except Exception as e:
logger.error(f"Error fetching HTML: {e}")
raise
    async def _fetch_generic(self, url: str, content_type: str) -> Dict[str, Any]:
        """Fetch generic file"""
        try:
            response = await asyncio.to_thread(
                requests.get, url, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()

            # Determine extension from content type
            ext_map = {
                "text/plain": ".txt",
                "text/markdown": ".md",
                "application/msword": ".doc",
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
            }
            # Strip any charset suffix (e.g. "text/plain; charset=utf-8") before lookup
            ext = ext_map.get(content_type.split(";")[0].strip(), ".bin")
            # Save file
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            filename = f"url_{url_hash}{ext}"
            file_path = Path(self.download_dir) / filename
            with open(file_path, "wb") as f:
                f.write(response.content)

            logger.info(f"File downloaded: {file_path}")
            return {
                "success": True,
                "file_path": str(file_path),
                "url": url,
                "content_type": content_type,
                "size_bytes": len(response.content),
            }
        except Exception as e:
            logger.error(f"Error fetching file: {e}")
            raise
def create_url_fetcher(download_dir: Optional[str] = None, **kwargs) -> URLFetcher:
    """
    Factory function to create a URL fetcher

    Args:
        download_dir: Directory to save downloaded files
        **kwargs: Additional URLFetcher parameters

    Returns:
        Configured URLFetcher instance
    """
    return URLFetcher(download_dir=download_dir, **kwargs)
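

if __name__ == "__main__":
    # Minimal usage sketch, assuming network access and the optional dependencies
    # (requests, beautifulsoup4, markdownify) are installed. The URL and download
    # directory below are examples only.
    async def _demo() -> None:
        fetcher = create_url_fetcher(download_dir="./downloads", timeout=15)
        result = await fetcher.fetch_url("https://example.com")
        if result.get("success"):
            print(f"Saved {result['content_type']} content to {result['file_path']}")
        else:
            print(f"Fetch failed: {result.get('error')}")

    asyncio.run(_demo())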