import asyncio
import time
import json
import hashlib
import re
from typing import List, Dict, Optional, Any
from datetime import datetime
from collections import OrderedDict
from contextlib import asynccontextmanager
from urllib.parse import urlparse, parse_qs, unquote

import psutil
import trafilatura
from bs4 import BeautifulSoup
import httpx
from fake_useragent import UserAgent

# Crawl4AI 0.8.0 imports
from crawl4ai import (
    AsyncWebCrawler,
    BrowserConfig,
    CacheMode,
    CrawlerRunConfig,
    MemoryAdaptiveDispatcher,
)

from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.responses import JSONResponse
from starlette.responses import StreamingResponse as StarletteStreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel, Field, field_validator
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST, CollectorRegistry
from starlette.responses import Response
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# ─── Structured Logging ──────────────────────────────────────────────────────
logger = structlog.get_logger()

# ─── Prometheus Metrics ──────────────────────────────────────────────────────
METRICS_REGISTRY = CollectorRegistry()
REQUEST_COUNT = Counter('scraper_requests_total', 'Total scraping requests', ['method', 'endpoint'], registry=METRICS_REGISTRY)
REQUEST_LATENCY = Histogram('scraper_request_duration_seconds', 'Request latency in seconds', ['endpoint'], registry=METRICS_REGISTRY)
ACTIVE_SCRAPERS = Gauge('scraper_active_tasks', 'Number of active scraping tasks', registry=METRICS_REGISTRY)
CACHE_HITS = Counter('scraper_cache_hits_total', 'Total cache hits', registry=METRICS_REGISTRY)
CACHE_MISSES = Counter('scraper_cache_misses_total', 'Total cache misses', registry=METRICS_REGISTRY)
SCRAPE_ERRORS = Counter('scraper_errors_total', 'Total scraping errors', ['error_type'], registry=METRICS_REGISTRY)

# ─── Domains where httpx works better than a browser ─────────────────────────
HTTPX_PREFERRED_DOMAINS = {
    'github.com', 'raw.githubusercontent.com', 'gist.github.com',
    'huggingface.co', 'hf.co',
    'arxiv.org', 'abs.arxiv.org',
    'pypi.org', 'readthedocs.io',
    'stackoverflow.com', 'stackexchange.com',
    'wikipedia.org', 'en.wikipedia.org',
    'docs.python.org',
    'npmjs.com',
}

# ─── LRU Cache ────────────────────────────────────────────────────────────────
class LRUCache:
    def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
        self.cache: OrderedDict = OrderedDict()
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        async with self._lock:
            if key not in self.cache:
                CACHE_MISSES.inc()
                return None
            item = self.cache[key]
            if time.time() - item['timestamp'] > self.ttl:
                del self.cache[key]
                CACHE_MISSES.inc()
                return None
            self.cache.move_to_end(key)
            CACHE_HITS.inc()
            return item['data']

    async def set(self, key: str, data: Any):
        async with self._lock:
            if key in self.cache:
                self.cache.move_to_end(key)
                self.cache[key] = {'data': data, 'timestamp': time.time()}
            else:
                if len(self.cache) >= self.max_size:
                    self.cache.popitem(last=False)
                self.cache[key] = {'data': data, 'timestamp': time.time()}

    async def clear_expired(self):
        async with self._lock:
            current_time = time.time()
            expired_keys = [
                key for key, item in self.cache.items()
                if current_time - item['timestamp'] > self.ttl
            ]
            for key in expired_keys:
                del self.cache[key]
            if expired_keys:
                logger.info("cleared_expired_cache_items", count=len(expired_keys))


cache = LRUCache(max_size=1000, ttl_seconds=1800)

# ─── Pydantic Models ──────────────────────────────────────────────────────────
class ScraperRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=500, description="Search query")
    max_results: int = Field(5, ge=1, le=20, description="Maximum number of results")
    extract_full_content: bool = Field(True, description="Extract full article content")
    use_stealth_mode: bool = Field(True, description="Use Crawl4AI stealth mode")
    timeout: int = Field(30, ge=10, le=120, description="Request timeout in seconds")

    @field_validator('query')
    @classmethod
    def validate_query(cls, v: str) -> str:
        if not v.strip():
            raise ValueError('Query cannot be empty')
        return v.strip()


class ScrapedResult(BaseModel):
    url: str
    title: str
    snippet: Optional[str] = None
    full_content: Optional[str] = None
    published_date: Optional[str] = None
    author: Optional[str] = None
    domain: str
    scraped_at: datetime
    metadata: Optional[Dict[str, Any]] = None


class ScraperResponse(BaseModel):
    query: str
    results: List[ScrapedResult]
    total_results: int
    processing_time_ms: float
    cached: bool
    timestamp: datetime


class HealthResponse(BaseModel):
    status: str
    version: str
    uptime_seconds: float
    memory_usage_mb: float
    cpu_percent: float
    active_scrapers: int
    cache_size: int

# ─── Rate Limiter ─────────────────────────────────────────────────────────────
class RateLimiter:
    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, List[float]] = {}
        self._lock = asyncio.Lock()

    async def is_allowed(self, client_id: str) -> bool:
        async with self._lock:
            now = time.time()
            if client_id in self.requests:
                self.requests[client_id] = [
                    req_time for req_time in self.requests[client_id]
                    if now - req_time < self.window_seconds
                ]
            else:
                self.requests[client_id] = []
            if len(self.requests[client_id]) >= self.max_requests:
                return False
            self.requests[client_id].append(now)
            return True


rate_limiter = RateLimiter(max_requests=10, window_seconds=60)

# ─── Utility Functions ────────────────────────────────────────────────────────
def _extract_domain(url: str) -> str:
    try:
        return urlparse(url).netloc or ''
    except Exception:
        return ''


def _strip_text_fragment(url: str) -> str:
    """Remove Google #:~:text=... fragments that cause service-worker conflicts."""
    if '#:~:text=' in url:
        return url.split('#:~:text=')[0]
    return url


def _clean_url(url: str) -> str:
    return _strip_text_fragment(url.strip())


def _unwrap_google_redirect(href: str) -> Optional[str]:
    if '/url?' in href:
        parsed = urlparse(href)
        qs = parse_qs(parsed.query)
        real_url = qs.get('q', [None])[0] or qs.get('url', [None])[0]
        if real_url and real_url.startswith('http'):
            return unquote(real_url)
    return None


def _is_valid_result_url(href: str) -> bool:
    if not href or not href.startswith('http'):
        return False
    skip_patterns = [
        'google.com/search', 'google.com/url', 'google.com/preferences',
        'google.com/advanced_search', 'google.com/intl',
        'accounts.google.com', 'support.google.com', 'maps.google.com',
        'policies.google.com', 'play.google.com',
        'youtube.com', 'youtu.be',
        'webcache.googleusercontent.com', 'translate.google.com',
    ]
    href_lower = href.lower()
    return not any(p in href_lower for p in skip_patterns)


def _should_use_httpx(url: str) -> bool:
    domain = _extract_domain(url).lower()
    return any(preferred in domain for preferred in HTTPX_PREFERRED_DOMAINS)


def generate_cache_key(query: str, max_results: int, extract_full_content: bool) -> str:
    content = f"{query}_{max_results}_{extract_full_content}"
    return hashlib.md5(content.encode()).hexdigest()

# ─── Content Extraction Pipeline ──────────────────────────────────────────────
def _extract_content_from_html(html: str, url: str) -> Dict[str, Any]:
    """Multi-method content extraction: Trafilatura → BeautifulSoup."""
    extracted = {}
    domain = _extract_domain(url)

    # Method 1: Trafilatura
    try:
        traf = trafilatura.extract(
            html, include_comments=False, include_tables=True, output_format='json'
        )
        if traf:
            extracted = json.loads(traf)
    except Exception:
        pass

    # Method 2: BeautifulSoup for site-specific + general fallback
    if not extracted.get('text'):
        try:
            soup = BeautifulSoup(html, 'html.parser')
            title_elem = soup.find('title')
            title = title_elem.get_text(strip=True) if title_elem else ''

            # GitHub README
            if 'github.com' in domain:
                readme = soup.find('article', class_='markdown-body')
                if readme:
                    return {'text': readme.get_text(separator=' ', strip=True), 'title': title}

            # HuggingFace model card
            if 'huggingface.co' in domain:
                card = soup.find('div', class_='prose') or soup.find('article')
                if card:
                    return {'text': card.get_text(separator=' ', strip=True), 'title': title}

            # General
            main = (
                soup.find('main') or soup.find('article') or
                soup.find('div', {'role': 'main'}) or soup.find('div', class_='content')
            )
            full_content = main.get_text(separator=' ', strip=True) if main else ' '.join(
                p.get_text(strip=True) for p in soup.find_all('p')
            )
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            desc = meta_desc.get('content', '') if meta_desc else full_content[:300]

            extracted.setdefault('title', title)
            extracted.setdefault('text', full_content)
            extracted.setdefault('excerpt', desc)
        except Exception:
            pass

    return extracted

# ───────────────────────────────────────────────────────────────────────────────
# DUAL CRAWLER ARCHITECTURE
# ───────────────────────────────────────────────────────────────────────────────
#
# ┌─────────────────────────────────────────────────────────────────────┐
# │ CRAWLER A – Google Search (Persistent Context, Serialized)         │
# │   • use_persistent_context=True → cookies saved (Google consent)   │
# │   • headless=False → visible window (Google trusts it more)        │
# │   • Serialized via _search_lock (one search at a time)             │
# └─────────────────────────────────────────────────────────────────────┘
#
# ┌─────────────────────────────────────────────────────────────────────┐
# │ CRAWLER B – URL Scraping (Non-Persistent, arun_many Parallel)      │
# │   • headless=True → fast, no visual rendering overhead             │
# │   • ignore_https_errors=True → doesn't hang on SSL                 │
# │   • viewport 800x600 → less memory per tab                         │
# │   • arun_many() + MemoryAdaptiveDispatcher → native parallel tabs  │
# │   • Each URL gets its own tab, managed by crawl4ai internally      │
# └─────────────────────────────────────────────────────────────────────┘
#
# ┌─────────────────────────────────────────────────────────────────────┐
# │ FALLBACK C – httpx + Trafilatura (for GitHub, HuggingFace, etc.)   │
# │   • Parallel asyncio.gather for pure HTTP requests                 │
# │   • No browser overhead → fastest for static content sites         │
# └─────────────────────────────────────────────────────────────────────┘

_search_lock = asyncio.Lock()


async def _get_search_crawler() -> AsyncWebCrawler:
    """Persistent-context crawler for Google Search."""
    if 'search' not in app.state.crawler_pool:
        config = BrowserConfig(
            headless=False,
            enable_stealth=True,
            user_agent_mode="random",
            use_persistent_context=True,
            user_data_dir="./browser_data",
            extra_args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-gpu",
                "--disable-gpu-compositing",
                "--disable-software-rasterizer",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--no-first-run",
                "--no-default-browser-check",
                "--disable-infobars",
                "--ignore-certificate-errors",
                "--ignore-certificate-errors-spki-list",
                "--disable-ipc-flooding-protection",
                "--force-color-profile=srgb",
                "--mute-audio",
                "--disable-background-timer-throttling",
                "--disable-renderer-backgrounding",
            ],
        )
        crawler = AsyncWebCrawler(config=config)
        await crawler.start()
        logger.info("SEARCH crawler created (persistent context).")
        app.state.crawler_pool['search'] = crawler
    return app.state.crawler_pool['search']


async def _get_scrape_crawler() -> AsyncWebCrawler:
    """High-speed non-persistent crawler for parallel URL scraping via arun_many."""
    if 'scrape' not in app.state.crawler_pool:
        config = BrowserConfig(
            headless=True,  # headless per the architecture notes above: no visual rendering overhead
            enable_stealth=True,
            user_agent_mode="random",
            ignore_https_errors=True,
            viewport_width=800,
            viewport_height=600,
            extra_args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-gpu",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--no-first-run",
                "--disable-infobars",
                "--ignore-certificate-errors",
                "--disable-ipc-flooding-protection",
                "--mute-audio",
                "--disable-background-timer-throttling",
                "--disable-renderer-backgrounding",
                "--disable-features=IsolateOrigins,site-per-process",
            ],
        )
        crawler = AsyncWebCrawler(config=config)
        await crawler.start()
        logger.info("SCRAPE crawler created (headless parallel via arun_many).")
        app.state.crawler_pool['scrape'] = crawler
    return app.state.crawler_pool['scrape']

# ─── Google SERP Extraction ───────────────────────────────────────────────────
async def extract_search_results(query: str, max_results: int, timeout: int) -> List[Dict[str, str]]:
    """Extract URLs from Google SERP using the persistent search crawler."""
    search_url = f"https://www.google.com/search?q={query.replace(' ', '+')}"
    try:
        crawler = await _get_search_crawler()
        run_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            js_code="""
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                window.scrollBy(0, 600);
                await new Promise(r => setTimeout(r, 1000));
                window.scrollBy(0, 600);
                await new Promise(r => setTimeout(r, 500));
            """,
            wait_for="css:div.g a[href], a[data-ved]",
        )

        async with _search_lock:
            try:
                result = await asyncio.wait_for(
                    crawler.arun(url=search_url, config=run_config),
                    timeout=15
                )
            except asyncio.TimeoutError:
                logger.error("search_hard_timeout", query=query)
                return []
            except Exception as e:
                logger.error("search_crawl_exception", query=query, error=str(e))
                return []

        if not result or not result.success:
            err = getattr(result, 'error_message', 'Unknown') if result else 'None'
            logger.warning("search_crawl_failed", url=search_url, error=err)
            return []

        html = result.html or ''
        soup = BeautifulSoup(html, 'html.parser')
        title_text = soup.title.string if soup.title else 'None'

        # ── Debug: understand what Google is actually serving ──
        all_a_tags = soup.find_all('a', href=True)
        has_rso = soup.find('div', id='rso') is not None
        sample_hrefs = [a.get('href', '')[:80] for a in all_a_tags[:8]]
        logger.info(
            "serp_debug",
            html_len=len(html),
            title=str(title_text)[:60],
            total_a_tags=len(all_a_tags),
            has_rso=has_rso,
            sample_hrefs=sample_hrefs,
        )

        urls, seen = [], set()

        def _add(href, title):
            if not href:
                return
            unwrapped = _unwrap_google_redirect(href)
            if unwrapped:
                href = unwrapped
            href = _clean_url(href)
            if not _is_valid_result_url(href) or href in seen:
                return
            seen.add(href)
            urls.append({'url': href, 'title': (title or 'Untitled').strip()[:100]})

        selectors = [
            'div.g a[href]', 'h3 a[href]', 'div#rso div.g a[href]',
            'div[data-ocrs] a[href]', 'g-more-link a[href]',
            'div#search div.g a[href]', 'a[href][data-ved][href^=http]',
            'div.yuRUbf a[href]', 'a[jsname] h3',
            'div[data-snhf] a[href]', 'div[data-sokoban] a[href]',
            'a[href^="/url?q="]',
        ]
        for sel in selectors:
            try:
                matches = soup.select(sel)
                if matches:
                    logger.info(f"Selector '{sel}': {len(matches)} matches")
                for link in matches:
                    if link.name == 'h3':
                        pa = link.find_parent('a')
                        if pa:
                            _add(pa.get('href', ''), link.get_text(strip=True))
                    else:
                        _add(link.get('href', ''), link.get_text(strip=True))
            except Exception:
                pass
            if len(urls) >= max_results * 3:
                break

        if len(urls) < max_results:
            for m in re.finditer(r'href="(https?://[^"]+)"', html, re.I):
                _add(m.group(1), 'Untitled')
                if len(urls) >= max_results * 3:
                    break

        logger.info(f"Extracted {len(urls)} unique URLs from Google SERP")
        return urls[:max_results * 3]
    except Exception as e:
        logger.error("search_extraction_error", query=query, error=str(e))
        SCRAPE_ERRORS.labels(error_type='search_error').inc()
        return []

# ─── httpx-based Scraping (GitHub, HuggingFace, fallback) ────────────────────
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4),
    retry=retry_if_exception_type(httpx.TransportError),
    reraise=True,
)
async def _fetch_with_retry(client: httpx.AsyncClient, url: str, timeout: int, headers: dict):
    # Retried via tenacity on transient transport errors. The original imported
    # tenacity but applied no decorator; this policy (3 attempts, exponential
    # backoff) is an assumption consistent with the function name.
    resp = await client.get(url, headers=headers, timeout=timeout, follow_redirects=True)
    resp.raise_for_status()
    return resp


async def scrape_url_httpx(url: str, timeout: int) -> Dict[str, Any]:
    """Fast HTTP-based scraping. No browser overhead."""
    t0 = time.time()
    domain = _extract_domain(url)
    try:
        async with httpx.AsyncClient(verify=False) as client:
            headers = {
                'User-Agent': app.state.user_agent.random,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            }
            resp = await _fetch_with_retry(client, url, timeout, headers)
            extracted = _extract_content_from_html(resp.text, url)
            if extracted.get('text'):
                return {
                    'url': url, 'title': extracted.get('title', ''),
                    # Trafilatura may return excerpt=None, so guard before slicing.
                    'snippet': (extracted.get('excerpt') or '')[:300],
                    'full_content': extracted.get('text', ''),
                    'published_date': extracted.get('date'),
                    'author': extracted.get('author'),
                    'domain': domain,
                    'extraction_time_ms': (time.time() - t0) * 1000,
                    'extraction_method': 'httpx',
                }
            return {'url': url, 'error': 'No content extracted', 'extraction_method': 'failed'}
    except Exception as e:
        logger.debug("httpx_scraping_error", url=url, error=str(e))
        SCRAPE_ERRORS.labels(error_type='httpx_failed').inc()
        return {'url': url, 'error': str(e), 'extraction_method': 'failed'}

# ─── arun_many Parallel Browser Scraping ──────────────────────────────────────
async def stream_urls_browser_parallel(urls: List[str], timeout: int):
    """
    Scrape multiple URLs in true parallel using crawl4ai's native arun_many()
    with stream=True, yielding each result as soon as it finishes.
    """
    if not urls:
        return
    t0 = time.time()
    crawler = await _get_scrape_crawler()

    # Config for actual content extraction
    scrape_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        page_timeout=min(timeout, 8) * 1000,
        word_count_threshold=0,
        process_iframes=False,
        stream=True,  # Crucial for 0.8.0: yields results as they finish
    )
    dispatcher = MemoryAdaptiveDispatcher(
        max_session_permit=5,
        memory_threshold_percent=80.0,
        check_interval=0.5,
    )
    try:
        # arun_many with stream=True returns an async generator
        streamer = await crawler.arun_many(
            urls=urls,
            config=scrape_config,
            dispatcher=dispatcher,
        )
        async for cr in streamer:
            # Enforce an overall manual timeout bailout
            if time.time() - t0 > min(timeout, 30):
                logger.warning("arun_many_timeout", urls=len(urls), timeout_s=min(timeout, 30))
                SCRAPE_ERRORS.labels(error_type='arun_many_timeout').inc()
                break
            url = cr.url
            domain = _extract_domain(url)
            if cr.success:
                extracted = {}
                if cr.html:
                    extracted = _extract_content_from_html(cr.html, url)
                if not extracted.get('text') and cr.markdown:
                    # Coerce to str in case result.markdown is a markdown result object.
                    extracted['text'] = str(cr.markdown)
                    extracted.setdefault('title', '')
                if extracted.get('text'):
                    yield {
                        'url': url,
                        'title': extracted.get('title', ''),
                        'snippet': (extracted.get('excerpt') or '')[:300],
                        'full_content': extracted.get('text', ''),
                        'published_date': extracted.get('date'),
                        'author': extracted.get('author'),
                        'domain': domain,
                        'extraction_time_ms': (time.time() - t0) * 1000,
                        'extraction_method': 'crawl4ai_parallel',
                    }
                    logger.info("browser_parallel_ok", url=url)
                else:
                    logger.debug("browser_no_content", url=url)
            else:
                err = getattr(cr, 'error_message', 'Unknown')
                logger.warning("browser_parallel_failed", url=url, error=err)
    except Exception as e:
        logger.error("arun_many_error", error=str(e))
        SCRAPE_ERRORS.labels(error_type='arun_many_error').inc()
    logger.info(f"arun_many stream completed in {(time.time()-t0)*1000:.0f}ms")


async def scrape_urls_browser_parallel(urls: List[str], timeout: int) -> List[Dict[str, Any]]:
    """Helper to collect streaming browser results into a list for batch use."""
    results = []
    async for r in stream_urls_browser_parallel(urls, timeout):
        results.append(r)
    return results

# ─── Smart Scraping Orchestrator ──────────────────────────────────────────────
async def scrape_urls_smart(
    urls: List[str],
    timeout: int,
    max_results: int,
) -> List[Dict[str, Any]]:
    """
    Orchestrate scraping across three parallel channels:
      1. httpx for GitHub/HuggingFace/arxiv (instant, no browser)
      2. arun_many for all other URLs (parallel browser tabs)
      3. httpx fallback for any browser failures
    All channels run simultaneously.
    """
    ACTIVE_SCRAPERS.inc()
    try:
        # Split URLs by routing strategy
        httpx_urls = []
        browser_urls = []
        for url in urls:
            url = _clean_url(url)
            if _should_use_httpx(url):
                httpx_urls.append(url)
            else:
                browser_urls.append(url)
        logger.info("routing_urls", httpx=len(httpx_urls), browser=len(browser_urls), total=len(urls))

        async def _empty():
            return []

        # Fire BOTH channels in parallel
        httpx_coro = asyncio.gather(
            *[scrape_url_httpx(u, timeout) for u in httpx_urls],
            return_exceptions=True
        ) if httpx_urls else _empty()

        # Only send max_results+2 URLs to the browser; no need to wait for extras
        browser_coro = scrape_urls_browser_parallel(
            browser_urls[:max_results + 2], timeout
        ) if browser_urls else _empty()

        httpx_results_raw, browser_results = await asyncio.gather(
            httpx_coro, browser_coro, return_exceptions=True
        )

        # Collect valid results
        valid = []

        # Process httpx results
        if isinstance(httpx_results_raw, list):
            for r in httpx_results_raw:
                if isinstance(r, dict) and 'error' not in r:
                    valid.append(r)
                    logger.info("httpx_ok", url=r.get('url', '?'))

        # Process browser results
        if isinstance(browser_results, list):
            valid.extend(browser_results)

        # If we still need more results, try httpx as a fallback for failed browser URLs
        if len(valid) < max_results:
            succeeded_urls = {r['url'] for r in valid}
            failed_browser_urls = [u for u in browser_urls if u not in succeeded_urls]
            if failed_browser_urls:
                logger.info("httpx_fallback", count=len(failed_browser_urls))
                fallback_tasks = [scrape_url_httpx(u, timeout) for u in failed_browser_urls]
                fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True)
                for r in fallback_results:
                    if isinstance(r, dict) and 'error' not in r:
                        valid.append(r)

        return valid[:max_results]
    finally:
        ACTIVE_SCRAPERS.dec()

# ─── Background Tasks ─────────────────────────────────────────────────────────
async def periodic_cache_cleanup():
    while True:
        try:
            await asyncio.sleep(300)
            await cache.clear_expired()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error("cache_cleanup_error", error=str(e))

# ─── App Lifespan ─────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("application_starting")
    app.state.start_time = time.time()
    app.state.user_agent = UserAgent()
    app.state.crawler_pool = {}
    app.state.cache_cleanup_task = asyncio.create_task(periodic_cache_cleanup())
    yield
    logger.info("application_shutting_down")
    if hasattr(app.state, 'crawler_pool'):
        for name, crawler in app.state.crawler_pool.items():
            try:
                await crawler.close()
            except Exception as e:
                logger.warning("crawler_close_error", name=name, error=str(e))
        logger.info(f"All {len(app.state.crawler_pool)} crawlers closed.")
        app.state.crawler_pool.clear()
    if hasattr(app.state, 'cache_cleanup_task'):
        app.state.cache_cleanup_task.cancel()
    logger.info("Shutdown complete.")

# ─── FastAPI App ──────────────────────────────────────────────────────────────
app = FastAPI(
    title="NeuralStream Scraping API",
    description="Production-ready async web scraping API with Crawl4AI 0.8 stealth",
    version="2.1.0",
    lifespan=lifespan,
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


async def check_rate_limit(client_ip: str = "127.0.0.1"):
    if not await rate_limiter.is_allowed(client_ip):
        raise HTTPException(status_code=429, detail="Rate limit exceeded. Try again later.")
    return True

# ─── API Endpoints ────────────────────────────────────────────────────────────
@app.get("/")
async def root():
    return {
        "message": "NeuralStream Scraping API – Production Ready",
        "version": "2.1.0",
        "features": [
            "Dual Crawler (Persistent Search + Parallel arun_many Scrape)",
            "MemoryAdaptiveDispatcher (auto-scales tabs by RAM)",
            "Smart Routing (httpx for GitHub/HF, Browser for JS sites)",
            "SSE Real-Time Streaming (POST /scrape/stream)",
            "Three-Channel Parallel Scraping",
            "Graceful Fallback Chain",
        ],
    }


@app.post("/scrape")
async def scrape_data(
    request: ScraperRequest,
    background_tasks: BackgroundTasks,
    _: bool = Depends(check_rate_limit),
):
    """Main scraping endpoint (batch: returns all results at once)."""
    start_time = time.time()
    REQUEST_COUNT.labels(method='POST', endpoint='/scrape').inc()

    cache_key = generate_cache_key(request.query, request.max_results, request.extract_full_content)
    cached_result = await cache.get(cache_key)
    if cached_result:
        logger.info("serving_from_cache", query=request.query)
        return JSONResponse(content=cached_result)

    logger.info("scraping_request",
                query=request.query,
                max_results=request.max_results,
                use_stealth=request.use_stealth_mode)

    with REQUEST_LATENCY.labels(endpoint='/scrape').time():
        try:
            # Phase 1: Google SERP → candidate URLs
            search_results = await extract_search_results(
                request.query, request.max_results, request.timeout
            )
            urls = [r['url'] for r in search_results if r.get('url')]
            if not urls:
                raise HTTPException(status_code=404, detail="No relevant URLs found")

            # Phase 2: Scrape all URLs in parallel (browser tabs + httpx)
            scraped = await scrape_urls_smart(urls, request.timeout, request.max_results)
            if not scraped:
                raise HTTPException(status_code=404, detail="Failed to scrape any URLs")

            # Phase 3: Format response
            results = []
            for d in scraped:
                results.append(ScrapedResult(
                    url=d['url'],
                    title=d.get('title', 'Untitled'),
                    snippet=d.get('snippet', ''),
                    full_content=d.get('full_content', '') if request.extract_full_content else None,
                    published_date=d.get('published_date'),
                    author=d.get('author'),
                    domain=d.get('domain', _extract_domain(d['url'])),
                    scraped_at=datetime.utcnow(),
                    metadata={
                        'extraction_time_ms': d.get('extraction_time_ms', 0),
                        'content_length': len(d.get('full_content', '')),
                        'extraction_method': d.get('extraction_method', 'unknown'),
                    },
                ))

            proc_time = (time.time() - start_time) * 1000
            response = ScraperResponse(
                query=request.query, results=results,
                total_results=len(results), processing_time_ms=proc_time,
                cached=False, timestamp=datetime.utcnow(),
            )
            response_dict = json.loads(response.model_dump_json())
            background_tasks.add_task(cache.set, cache_key, response_dict)

            logger.info("scraping_completed",
                        query=request.query, results=len(results),
                        processing_time_ms=proc_time,
                        stealth_enabled=request.use_stealth_mode)
            return response
        except HTTPException:
            raise
        except Exception as e:
            logger.error("scraping_failed", query=request.query, error=str(e))
            raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
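
# Example request against the batch endpoint above; a minimal sketch only, where
# the host/port and the example query are assumptions, not part of this module:
#
#   import httpx
#   payload = {"query": "crawl4ai stealth mode", "max_results": 3}
#   r = httpx.post("http://localhost:8000/scrape", json=payload, timeout=120)
#   print(r.json()["total_results"])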

# ─── SSE Streaming Endpoint ───────────────────────────────────────────────────
@app.post("/scrape/stream")
async def scrape_stream(
    request: ScraperRequest,
    _: bool = Depends(check_rate_limit),
):
    """
    Real-time streaming scrape: sends results via Server-Sent Events as each URL
    finishes scraping, so the frontend receives cards immediately.
    """
    async def event_generator():
        start_time = time.time()
        total_results = 0
        try:
            # Phase 1: Google SERP search
            yield f"event: status\ndata: {json.dumps({'message': 'Searching Google...'})}\n\n"
            search_results = await extract_search_results(
                request.query, request.max_results, request.timeout
            )
            urls = [r['url'] for r in search_results if r.get('url')]
            if not urls:
                yield f"event: error\ndata: {json.dumps({'message': 'No relevant URLs found'})}\n\n"
                return
            yield f"event: search_complete\ndata: {json.dumps({'urls_found': len(urls), 'urls': urls[:10]})}\n\n"

            # Phase 2: Route URLs to channels
            httpx_urls = []
            browser_urls = []
            for url in urls:
                url = _clean_url(url)
                if _should_use_httpx(url):
                    httpx_urls.append(url)
                else:
                    browser_urls.append(url)

            # Multiplex streaming via asyncio.Queue
            queue = asyncio.Queue()
            active_tasks = []

            async def _run_httpx(u):
                try:
                    res = await scrape_url_httpx(u, request.timeout)
                    await queue.put(res)
                except Exception as e:
                    await queue.put({'url': u, 'error': str(e)})

            async def _run_browser():
                try:
                    async for res in stream_urls_browser_parallel(browser_urls[:request.max_results + 2], request.timeout):
                        await queue.put(res)
                except Exception as e:
                    logger.error("stream_browser_error", error=str(e))

            for u in httpx_urls:
                active_tasks.append(asyncio.create_task(_run_httpx(u)))
            if browser_urls:
                active_tasks.append(asyncio.create_task(_run_browser()))

            # Stream results as soon as they hit the queue
            def _all_done():
                return all(t.done() for t in active_tasks)

            while not _all_done() or not queue.empty():
                try:
                    res = await asyncio.wait_for(queue.get(), timeout=0.5)
                    if isinstance(res, dict) and 'error' not in res:
                        total_results += 1
                        yield f"event: result\ndata: {json.dumps(res, default=str)}\n\n"
                    else:
                        yield f"event: skipped\ndata: {json.dumps({'url': res.get('url', '?'), 'reason': res.get('error', 'unknown')})}\n\n"
                except asyncio.TimeoutError:
                    continue  # Just check _all_done() again
                except Exception as e:
                    logger.error("queue_consumer_error", error=str(e))

            # Final complete event
            proc_time = (time.time() - start_time) * 1000
            yield f"event: complete\ndata: {json.dumps({'total_results': total_results, 'processing_time_ms': round(proc_time)})}\n\n"
        except Exception as e:
            logger.error("stream_error", error=str(e))
            yield f"event: error\ndata: {json.dumps({'message': str(e)})}\n\n"

    return StarletteStreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
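
# A minimal client-side sketch for consuming the SSE stream above; illustrative
# only (host/port are assumptions, and raw event lines are printed rather than
# parsed into the status/search_complete/result/complete events emitted above):
#
#   import asyncio, httpx
#
#   async def consume():
#       payload = {"query": "fastapi background tasks", "max_results": 3}
#       async with httpx.AsyncClient(timeout=None) as client:
#           async with client.stream("POST", "http://localhost:8000/scrape/stream", json=payload) as r:
#               async for line in r.aiter_lines():
#                   if line:
#                       print(line)
#
#   asyncio.run(consume())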

# ─── Health, Metrics, Cache ───────────────────────────────────────────────────
# Route paths below are conventional choices; the original did not spell them out.
@app.get("/health")
async def health_check():
    process = psutil.Process()
    return HealthResponse(
        status="healthy", version="2.1.0",
        uptime_seconds=time.time() - app.state.start_time,
        memory_usage_mb=process.memory_info().rss / 1024 / 1024,
        cpu_percent=process.cpu_percent(interval=0.1),
        active_scrapers=ACTIVE_SCRAPERS._value.get(),
        cache_size=len(cache.cache),
    )


@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(METRICS_REGISTRY), media_type=CONTENT_TYPE_LATEST)


@app.post("/cache/clear")
async def clear_cache():
    cache.cache.clear()
    logger.info("cache_cleared")
    return {"message": "Cache cleared successfully"}
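
# Quick smoke-test sketch for the operational endpoints above; the host/port and
# route paths follow the assumptions noted in the decorators, so adjust if the
# deployed paths differ:
#
#   import httpx
#   print(httpx.get("http://localhost:8000/health").json()["status"])
#   print(httpx.get("http://localhost:8000/metrics").text.splitlines()[0])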

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app", host="0.0.0.0", port=8000, workers=1,
        loop="asyncio", http="httptools",
        access_log=False, server_header=False, date_header=False,
        forwarded_allow_ips="*",
    )