| """ |
| Agent 1: Website Crawler |
| Design System Extractor v2 |
| |
| Persona: Meticulous Design Archaeologist |
| |
| Responsibilities: |
| - Auto-discover pages from base URL |
| - Classify page types (homepage, listing, detail, etc.) |
| - Prepare page list for user confirmation |
| """ |
|
|
import asyncio
import re
from collections import deque
from typing import Callable, Optional
from urllib.parse import urljoin, urlparse
|
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
|
from core.token_schema import DiscoveredPage, PageType
| from config.settings import get_settings |
|
|
| class PageDiscoverer: |
| """ |
| Discovers pages from a website for design system extraction. |
| |
| This is the first part of Agent 1's job — finding pages before |
| the human confirms which ones to crawl. |
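
    Typical usage (illustrative; the URL is a placeholder):

        discoverer = PageDiscoverer()
        pages = await discoverer.discover("https://example.com")
        suggested = discoverer.get_suggested_pages()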
| """ |
| |
    def __init__(self):
        self.settings = get_settings()
        self.playwright: Optional[Playwright] = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.visited_urls: set[str] = set()
        self.discovered_pages: list[DiscoveredPage] = []
| |
| async def __aenter__(self): |
| """Async context manager entry.""" |
| await self._init_browser() |
| return self |
| |
| async def __aexit__(self, exc_type, exc_val, exc_tb): |
| """Async context manager exit.""" |
| await self._close_browser() |
| |
    async def _init_browser(self):
        """Start Playwright and launch a Chromium browser context."""
        # Keep a handle on the driver so _close_browser can stop it;
        # otherwise the Playwright process leaks.
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.settings.browser.headless
        )
        self.context = await self.browser.new_context(
            viewport={
                "width": self.settings.viewport.desktop_width,
                "height": self.settings.viewport.desktop_height,
            },
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )
| |
    async def _close_browser(self):
        """Close the browser and stop the Playwright driver."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
| |
    def _normalize_url(self, url: str, base_url: str) -> Optional[str]:
        """
        Normalize a URL and validate that it belongs to the target site.

        Query strings and fragments are dropped deliberately: for design
        system extraction, parameter variants of a page are duplicates.
        Returns None for external URLs.

        Illustrative examples:
            _normalize_url("/about/", "https://example.com")
            -> "https://example.com/about"
            _normalize_url("https://other.com/x", "https://example.com")
            -> None
        """
        # Resolve relative URLs against the base.
        if not url.startswith(('http://', 'https://')):
            url = urljoin(base_url, url)

        parsed = urlparse(url)
        base_parsed = urlparse(base_url)

        # Skip external links; we only crawl the target site.
        if parsed.netloc != base_parsed.netloc:
            return None

        # Treat an empty path as the root so "https://x.com" and
        # "https://x.com/" normalize to the same URL.
        path = parsed.path or "/"
        normalized = f"{parsed.scheme}://{parsed.netloc}{path}"

        # Strip trailing slashes, except on the root path.
        if normalized.endswith('/') and len(normalized) > len(f"{parsed.scheme}://{parsed.netloc}/"):
            normalized = normalized.rstrip('/')

        return normalized
| |
| def _classify_page_type(self, url: str, title: str = "") -> PageType: |
| """ |
| Classify page type based on URL patterns and title. |
| |
| This is a heuristic — not perfect, but good enough for discovery. |
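
        Illustrative examples (the first matching URL pattern wins):
            "https://example.com/products"   -> PageType.LISTING
            "https://example.com/product/42" -> PageType.DETAIL
            "https://example.com/login"      -> PageType.AUTH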
| """ |
| url_lower = url.lower() |
| title_lower = title.lower() if title else "" |
| |
        # Order matters: the first matching pattern wins, so e.g. '/contact'
        # is claimed by FORM before CONTACT is ever reached.
        patterns = {
| PageType.HOMEPAGE: [r'/$', r'/home$', r'/index'], |
| PageType.LISTING: [r'/products', r'/catalog', r'/list', r'/category', r'/collection', r'/search'], |
| PageType.DETAIL: [r'/product/', r'/item/', r'/detail/', r'/p/', r'/[a-z-]+/\d+'], |
| PageType.FORM: [r'/contact', r'/form', r'/apply', r'/submit', r'/register'], |
| PageType.AUTH: [r'/login', r'/signin', r'/signup', r'/auth', r'/account'], |
| PageType.CHECKOUT: [r'/cart', r'/checkout', r'/basket', r'/payment'], |
| PageType.MARKETING: [r'/landing', r'/promo', r'/campaign', r'/offer'], |
| PageType.ABOUT: [r'/about', r'/team', r'/company', r'/story'], |
| PageType.CONTACT: [r'/contact', r'/support', r'/help'], |
| } |
| |
| for page_type, url_patterns in patterns.items(): |
| for pattern in url_patterns: |
| if re.search(pattern, url_lower): |
| return page_type |
| |
        # Fall back to title keywords when no URL pattern matches.
        title_patterns = {
| PageType.HOMEPAGE: ['home', 'welcome'], |
| PageType.LISTING: ['products', 'catalog', 'collection', 'browse'], |
| PageType.DETAIL: ['product', 'item'], |
| PageType.AUTH: ['login', 'sign in', 'sign up', 'register'], |
| PageType.ABOUT: ['about', 'our story', 'team'], |
| PageType.CONTACT: ['contact', 'get in touch', 'support'], |
| } |
| |
| for page_type, keywords in title_patterns.items(): |
| for keyword in keywords: |
| if keyword in title_lower: |
| return page_type |
| |
| return PageType.OTHER |
| |
    async def _extract_links(self, page: Page, base_url: str) -> list[str]:
        """Extract all internal links from a page."""
        links = await page.evaluate("""
            () => {
                const anchors = Array.from(document.querySelectorAll('a[href]'));
                // Strip fragments rather than discarding the link, so pages
                // reached via #anchors are still discovered.
                return anchors.map(a => a.href.split('#')[0]).filter(href =>
                    href &&
                    !href.startsWith('javascript:') &&
                    !href.startsWith('mailto:') &&
                    !href.startsWith('tel:')
                );
            }
        """)

        # Normalize, keep on-site URLs only, and drop already-visited pages.
        valid_links = []
        for link in links:
            normalized = self._normalize_url(link, base_url)
            if normalized and normalized not in self.visited_urls:
                valid_links.append(normalized)

        # Deduplicate while preserving discovery order.
        return list(dict.fromkeys(valid_links))
| |
| async def _get_page_title(self, page: Page) -> str: |
| """Get page title.""" |
| try: |
| return await page.title() |
| except Exception: |
| return "" |
| |
| async def discover( |
| self, |
| base_url: str, |
        max_pages: Optional[int] = None,
| progress_callback: Optional[Callable[[float], None]] = None |
| ) -> list[DiscoveredPage]: |
| """ |
| Discover pages from a website. |
| |
| Args: |
| base_url: The starting URL |
| max_pages: Maximum pages to discover (default from settings) |
| progress_callback: Optional callback for progress updates |
| |
| Returns: |
| List of discovered pages |
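
        Example (illustrative; the URL is a placeholder):
            pages = await PageDiscoverer().discover(
                "https://example.com", max_pages=10
            )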
| """ |
| max_pages = max_pages or self.settings.crawl.max_pages |
| |
        async with self:
            # Validate and normalize the starting URL.
            normalized_base = self._normalize_url(base_url, base_url)
            if not normalized_base:
                raise ValueError(f"Invalid base URL: {base_url}")

            # Breadth-first crawl starting from the base URL.
            queue = deque([normalized_base])
            self.visited_urls = set()
            self.discovered_pages = []
| |
| while queue and len(self.discovered_pages) < max_pages: |
                current_url = queue.popleft()
| |
| if current_url in self.visited_urls: |
| continue |
| |
| self.visited_urls.add(current_url) |
| |
                page = None
                try:
                    page = await self.context.new_page()

                    # Navigate with a lenient wait first; many sites never
                    # reach "load" due to long-polling or analytics requests.
                    try:
                        await page.goto(
                            current_url,
                            wait_until="domcontentloaded",
                            timeout=60000,
                        )
                        # Give client-side rendering a moment to settle.
                        await page.wait_for_timeout(2000)
                    except Exception:
                        # Retry once, accepting the navigation as soon as it
                        # commits, then wait a bit longer for rendering.
                        try:
                            await page.goto(
                                current_url,
                                wait_until="commit",
                                timeout=60000,
                            )
                            await page.wait_for_timeout(3000)
                        except Exception:
                            # Proceed anyway; a partially loaded page may
                            # still yield a title and links.
                            pass
| |
                    # Classify the page and record its crawl depth
                    # (the number of non-empty path segments).
                    title = await self._get_page_title(page)
                    page_type = self._classify_page_type(current_url, title)
                    depth = len([s for s in urlparse(current_url).path.split('/') if s])
| |
                    # Record the page, selected by default for extraction.
                    discovered = DiscoveredPage(
| url=current_url, |
| title=title, |
| page_type=page_type, |
| depth=depth, |
| selected=True, |
| ) |
| self.discovered_pages.append(discovered) |
| |
                    # Queue newly discovered links, visiting likely template
                    # pages (product, listing, etc.) before generic ones.
                    new_links = await self._extract_links(page, base_url)

                    priority_patterns = ('/product', '/listing', '/category', '/about', '/contact')
                    priority_links = [
                        link for link in new_links
                        if any(p in link.lower() for p in priority_patterns)
                    ]
                    priority_set = set(priority_links)
                    other_links = [link for link in new_links if link not in priority_set]

                    for link in priority_links + other_links:
                        if link not in self.visited_urls and link not in queue:
                            queue.append(link)
| |
| await page.close() |
| |
                    # Report progress as a fraction of the page budget.
                    if progress_callback:
                        progress = len(self.discovered_pages) / max_pages
                        progress_callback(min(progress, 1.0))

                    # Be polite: pause briefly between requests.
                    await asyncio.sleep(self.settings.crawl.crawl_delay_ms / 1000)
| |
                except Exception as e:
                    # Record the failure so the user can see which URLs
                    # could not be visited.
                    discovered = DiscoveredPage(
                        url=current_url,
                        title="",
                        page_type=PageType.OTHER,
                        depth=0,
                        selected=False,
                        error=str(e),
                    )
                    self.discovered_pages.append(discovered)
                finally:
                    # Always release the page, even if navigation or
                    # extraction raised.
                    if page is not None and not page.is_closed():
                        await page.close()
| |
| return self.discovered_pages |
| |
| def get_pages_by_type(self) -> dict[PageType, list[DiscoveredPage]]: |
| """Group discovered pages by type.""" |
| grouped: dict[PageType, list[DiscoveredPage]] = {} |
| for page in self.discovered_pages: |
            grouped.setdefault(page.page_type, []).append(page)
| return grouped |
| |
    def get_suggested_pages(self, min_pages: Optional[int] = None) -> list[DiscoveredPage]:
| """ |
| Get suggested pages for extraction. |
| |
| Ensures diversity of page types and prioritizes key templates. |
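
        Example (illustrative, after discover() has run):
            suggested = discoverer.get_suggested_pages(min_pages=5)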
| """ |
| min_pages = min_pages or self.settings.crawl.min_pages |
| |
        # One representative of each key template type, in priority order.
        priority_types = [
| PageType.HOMEPAGE, |
| PageType.LISTING, |
| PageType.DETAIL, |
| PageType.FORM, |
| PageType.MARKETING, |
| PageType.AUTH, |
| PageType.ABOUT, |
| PageType.CONTACT, |
| PageType.OTHER, |
| ] |
| |
| selected = [] |
| grouped = self.get_pages_by_type() |
| |
        # Pick the shallowest (closest to the root) page of each type
        # as its representative.
        for page_type in priority_types:
            if page_type in grouped and grouped[page_type]:
                page = min(grouped[page_type], key=lambda p: p.depth)
| if page not in selected: |
| selected.append(page) |
| |
        # Top up with the shallowest remaining pages that loaded cleanly.
        remaining = [p for p in self.discovered_pages if p not in selected and not p.error]
| remaining.sort(key=lambda p: p.depth) |
| |
| while len(selected) < min_pages and remaining: |
| selected.append(remaining.pop(0)) |
| |
        # Mark the chosen pages as selected for the confirmation step.
        for page in selected:
| page.selected = True |
| |
| return selected |
|
|
| async def discover_pages(base_url: str, max_pages: int = 20) -> list[DiscoveredPage]: |
| """Convenience function to discover pages.""" |
| discoverer = PageDiscoverer() |
| return await discoverer.discover(base_url, max_pages) |
|
|
| async def quick_discover(base_url: str) -> dict: |
| """Quick discovery returning summary dict.""" |
| pages = await discover_pages(base_url) |
| |
| return { |
| "total_found": len(pages), |
| "by_type": { |
| pt.value: len([p for p in pages if p.page_type == pt]) |
| for pt in PageType |
| }, |
| "pages": [ |
| { |
| "url": p.url, |
| "title": p.title, |
| "type": p.page_type.value, |
| "selected": p.selected, |
| } |
| for p in pages |
| ], |
| } |
|
|