File size: 3,800 Bytes
f5754e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import asyncio
import logging
from typing import List, Dict, Any, Set
from urllib.parse import urljoin, urlparse

logger = logging.getLogger(__name__)

class WebCrawler:
    """
    Advanced Web Crawler utilizing Crawl4AI to orchestrate Headless Playwright browsers.
    Extracts high quality markdown directly from dynamic, JS-heavy web properties.
    """
    def __init__(self, max_depth: int = 1, max_pages: int = 10, max_concurrent: int = 2):
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.max_concurrent = max_concurrent
        self.visited_urls: Set[str] = set()
        self.results: List[Dict[str, Any]] = []
        self._semaphore = asyncio.Semaphore(max_concurrent)

    def _is_same_domain(self, base_url: str, target_url: str) -> bool:
        base_domain = urlparse(base_url).netloc
        target_domain = urlparse(target_url).netloc
        return base_domain == target_domain
        
    async def _crawl_recursive(self, crawler: Any, url: str, base_url: str, current_depth: int):
        if current_depth > self.max_depth or len(self.visited_urls) >= self.max_pages:
            return
            
        parsed = urlparse(url)
        normalized_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        
        if normalized_url in self.visited_urls or not self._is_same_domain(base_url, normalized_url):
            return
            
        self.visited_urls.add(normalized_url)
        logger.info(f"Crawling {normalized_url} (Depth {current_depth}/{self.max_depth})")
        
        try:
            async with self._semaphore:
                result = await crawler.arun(url=normalized_url)
            
            if hasattr(result, 'success') and result.success:
                title = ""
                if hasattr(result, 'metadata') and isinstance(result.metadata, dict):
                    title = result.metadata.get("title", "")
                    
                self.results.append({
                    "url": normalized_url,
                    "title": title,
                    "markdown": result.markdown
                })
                
                if current_depth < self.max_depth and hasattr(result, "links") and result.links:
                    tasks = []
                    internal_links = result.links.get("internal", []) if isinstance(result.links, dict) else []
                    
                    for link_item in internal_links:
                        href = link_item.get("href")
                        if href and isinstance(href, str) and not href.startswith("mailto:") and not href.startswith("tel:"):
                            absolute_url = urljoin(base_url, href)
                            tasks.append(self._crawl_recursive(crawler, absolute_url, base_url, current_depth + 1))
                            
                    if tasks:
                        await asyncio.gather(*tasks, return_exceptions=True)

        except Exception as e:
            logger.error(f"Error crawling {normalized_url}: {e}")

    async def crawl(self, start_url: str) -> List[Dict[str, Any]]:
        self.visited_urls.clear()
        self.results.clear()
        
        try:
            # We import here to fail gracefully if the dependency is not yet installed
            from crawl4ai import AsyncWebCrawler
        except ImportError:
            logger.error("crawl4ai is not installed. Please run 'npm install' or 'uv sync'")
            raise ImportError("crawl4ai package is missing.")
            
        logger.info(f"Starting web crawl for {start_url} (Max Depth: {self.max_depth})")
        async with AsyncWebCrawler(verbose=False) as crawler:
            await self._crawl_recursive(crawler, start_url, start_url, 0)
            
        return self.results