| """ |
| URL Document Fetcher for RAG-Anything |
| |
| Fetches and processes documents from URLs for ingestion into the RAG system. |
| |
| Features: |
| - Web page scraping and parsing |
| - PDF download from URLs |
| - Markdown conversion |
| - Content cleaning and preprocessing |
| - Advanced parsing with text and image extraction |
| - Integration with RAG pipeline |
| |
| Author: RAG-Anything Team |
| Version: 2.0.0 |
| """ |

import asyncio
import hashlib
import json
import logging
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse, urljoin

logger = logging.getLogger(__name__)

try:
    import requests
    from bs4 import BeautifulSoup
    import markdownify

    DEPS_AVAILABLE = True
except ImportError:
    DEPS_AVAILABLE = False
    logger.warning("URL fetcher dependencies not installed. Install with: pip install requests beautifulsoup4 markdownify")


class URLFetcher:
    """Fetch and process documents from URLs"""

    def __init__(
        self,
        download_dir: Optional[str] = None,
        timeout: int = 30,
        user_agent: str = "RAG-Anything/1.0",
    ):
        """
        Initialize URL fetcher

        Args:
            download_dir: Directory to save downloaded files
            timeout: Request timeout in seconds
            user_agent: User agent string for requests
        """
        if not DEPS_AVAILABLE:
            raise ImportError("Required dependencies not installed. Run: pip install requests beautifulsoup4 markdownify")

        self.download_dir = download_dir or tempfile.gettempdir()
        self.timeout = timeout
        self.headers = {"User-Agent": user_agent}

        Path(self.download_dir).mkdir(parents=True, exist_ok=True)
        logger.info(f"URLFetcher initialized (download_dir={self.download_dir})")

    def _create_content_list(
        self, title: str, text_content: str, images: List[Dict]
    ) -> List[Dict[str, Any]]:
        """
        Create a structured content list compatible with RAG pipeline

        Args:
            title: Document title
            text_content: Extracted text content
            images: List of extracted images with metadata

        Returns:
            List of content blocks for RAG processing
        """
        content_list = []

        # Title becomes the leading text block
        if title:
            content_list.append({
                "type": "text",
                "text": f"# {title}",
                "page_idx": 0,
            })

        # Split the body into paragraphs and keep the first 50 as text blocks,
        # grouping roughly ten paragraphs per synthetic page
        paragraphs = [p.strip() for p in text_content.split("\n\n") if p.strip()]
        for idx, paragraph in enumerate(paragraphs[:50]):
            content_list.append({
                "type": "text",
                "text": paragraph,
                "page_idx": idx // 10,
            })

        # One image block per downloaded image
        for idx, img_info in enumerate(images):
            content_list.append({
                "type": "image",
                "img_path": img_info["path"],
                "image_caption": img_info.get("alt", "") or img_info.get("title", ""),
                "page_idx": (len(paragraphs) + idx) // 10,
            })

        return content_list

    async def fetch_url(
        self,
        url: str,
        save_as_pdf: bool = False,
        convert_to_markdown: bool = True,
    ) -> Dict[str, Any]:
        """
        Fetch and process content from URL

        Args:
            url: URL to fetch
            save_as_pdf: Whether to save as PDF (for PDF URLs; currently unused,
                as PDF URLs are always downloaded to disk)
            convert_to_markdown: Convert HTML to markdown

        Returns:
            Dictionary with file_path and metadata on success, or a dictionary
            with "success": False and an "error" message on failure
        """
        try:
            logger.info(f"Fetching URL: {url}")

            # Validate URL structure
            parsed = urlparse(url)
            if not parsed.scheme or not parsed.netloc:
                raise ValueError(f"Invalid URL: {url}")

            # Probe the content type without downloading the body
            response = await asyncio.to_thread(
                requests.head, url, headers=self.headers, timeout=self.timeout, allow_redirects=True
            )
            content_type = response.headers.get("Content-Type", "").lower()

            if "pdf" in content_type or url.lower().endswith(".pdf"):
                return await self._fetch_pdf(url)
            elif "html" in content_type or not content_type:
                return await self._fetch_html(url, convert_to_markdown)
            else:
                return await self._fetch_generic(url, content_type)

        except Exception as e:
            logger.error(f"Error fetching URL {url}: {e}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "url": url,
            }

    async def _fetch_pdf(self, url: str) -> Dict[str, Any]:
        """Fetch PDF from URL"""
        try:
            response = await asyncio.to_thread(
                requests.get, url, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()

            # Name the file after a short hash of the URL
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            filename = f"url_{url_hash}.pdf"
            file_path = Path(self.download_dir) / filename

            with open(file_path, "wb") as f:
                f.write(response.content)

            logger.info(f"PDF downloaded: {file_path}")

            return {
                "success": True,
                "file_path": str(file_path),
                "url": url,
                "content_type": "pdf",
                "size_bytes": len(response.content),
            }

        except Exception as e:
            logger.error(f"Error fetching PDF: {e}")
            raise

    async def _fetch_html(self, url: str, convert_to_markdown: bool = True) -> Dict[str, Any]:
        """Fetch and parse HTML page with advanced content extraction"""
        try:
            response = await asyncio.to_thread(
                requests.get, url, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()

            soup = BeautifulSoup(response.content, "html.parser")

            # Remove non-content elements before extraction
            for tag in soup(["script", "style", "nav", "footer", "header", "aside", "iframe", "noscript"]):
                tag.decompose()

            # Page title
            title = soup.find("title")
            title_text = title.get_text().strip() if title else "Untitled"

            # Prefer semantic containers; fall back to the whole document
            main_content = soup.find("main") or soup.find("article") or soup.find("body") or soup

            # Download up to 10 images referenced by the page
            images = []
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            images_dir = Path(self.download_dir) / f"url_{url_hash}_images"
            images_dir.mkdir(parents=True, exist_ok=True)

            all_images = main_content.find_all("img")
            max_images = min(10, len(all_images))
            logger.info(f"Found {len(all_images)} images, downloading first {max_images}")
            for idx, img in enumerate(all_images[:max_images]):
                try:
                    img_url = img.get("src")
                    if not img_url:
                        continue

                    # Skip inline data URIs
                    if img_url.startswith("data:"):
                        continue

                    # Resolve protocol-relative, root-relative and relative URLs
                    if img_url.startswith("//"):
                        img_url = "https:" + img_url
                    elif img_url.startswith("/"):
                        parsed_base = urlparse(url)
                        img_url = f"{parsed_base.scheme}://{parsed_base.netloc}{img_url}"
                    elif not img_url.startswith("http"):
                        img_url = urljoin(url, img_url)

                    img_response = await asyncio.to_thread(
                        requests.get, img_url, headers=self.headers, timeout=5, stream=True
                    )

                    if img_response.status_code == 200:
                        # Skip images larger than 10 MB (based on the Content-Length header)
                        content_length = img_response.headers.get("content-length")
                        if content_length and int(content_length) > 10 * 1024 * 1024:
                            logger.debug(f"Skipping large image {idx}: {content_length} bytes")
                            img_response.close()
                            continue

                        # Choose a file extension from the reported content type
                        content_type = img_response.headers.get("Content-Type", "")
                        ext = ".jpg"
                        if "png" in content_type:
                            ext = ".png"
                        elif "gif" in content_type:
                            ext = ".gif"
                        elif "webp" in content_type:
                            ext = ".webp"

                        img_path = images_dir / f"image_{idx}{ext}"
                        with open(img_path, "wb") as f:
                            f.write(img_response.content)

                        images.append({
                            "path": str(img_path),
                            "alt": img.get("alt", ""),
                            "title": img.get("title", ""),
                            "url": img_url,
                        })
                        logger.debug(f"Downloaded image {idx+1}/{max_images}: {img_path.name}")
                except Exception as img_error:
                    logger.debug(f"Failed to download image {idx}: {img_error}")
                    continue

            if convert_to_markdown:
                content = markdownify.markdownify(
                    str(main_content),
                    heading_style="ATX",
                    bullets="-",
                )
            else:
                content = main_content.get_text(separator="\n", strip=True)

            # Structured content list for the RAG pipeline
            content_list = self._create_content_list(title_text, content, images)

            # Save the extracted text alongside the source URL
            ext = ".md" if convert_to_markdown else ".txt"
            filename = f"url_{url_hash}{ext}"
            file_path = Path(self.download_dir) / filename

            with open(file_path, "w", encoding="utf-8") as f:
                f.write(f"# {title_text}\n\n")
                f.write(f"Source: {url}\n\n")
                f.write(content)

            # Save the content list as JSON next to the text file
            json_path = Path(self.download_dir) / f"url_{url_hash}_content_list.json"
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(content_list, f, indent=2, ensure_ascii=False)

            logger.info(f"HTML content saved: {file_path}")
            logger.info(f"Extracted {len(images)} images from web page")

            return {
                "success": True,
                "file_path": str(file_path),
                "content_list_path": str(json_path),
                "url": url,
                "content_type": "html",
                "title": title_text,
                "content_preview": content[:500],
                "images_count": len(images),
                "content_list": content_list,
            }

        except Exception as e:
            logger.error(f"Error fetching HTML: {e}")
            raise

    async def _fetch_generic(self, url: str, content_type: str) -> Dict[str, Any]:
        """Fetch generic file"""
        try:
            response = await asyncio.to_thread(
                requests.get, url, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()

            # Map known content types to file extensions (ignoring any
            # "; charset=..." parameter); anything unknown is saved as .bin
            ext_map = {
                "text/plain": ".txt",
                "text/markdown": ".md",
                "application/msword": ".doc",
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
            }
            ext = ext_map.get(content_type.split(";")[0].strip(), ".bin")

            # Name the file after a short hash of the URL
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            filename = f"url_{url_hash}{ext}"
            file_path = Path(self.download_dir) / filename

            with open(file_path, "wb") as f:
                f.write(response.content)

            logger.info(f"File downloaded: {file_path}")

            return {
                "success": True,
                "file_path": str(file_path),
                "url": url,
                "content_type": content_type,
                "size_bytes": len(response.content),
            }

        except Exception as e:
            logger.error(f"Error fetching file: {e}")
            raise


def create_url_fetcher(download_dir: Optional[str] = None, **kwargs) -> URLFetcher:
    """
    Factory function to create a URL fetcher

    Args:
        download_dir: Directory to save downloaded files
        **kwargs: Additional URLFetcher parameters

    Returns:
        Configured URLFetcher instance
    """
    return URLFetcher(download_dir=download_dir, **kwargs)
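

# Minimal smoke-test sketch (illustrative only, not part of the library API);
# the URL is a placeholder and running this requires network access.
if __name__ == "__main__":
    fetcher = create_url_fetcher(download_dir="./downloads", timeout=30)
    result = asyncio.run(fetcher.fetch_url("https://example.com"))
    if result.get("success"):
        print(f"Saved to: {result['file_path']}")
    else:
        print(f"Fetch failed: {result.get('error')}")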