""" Professional Finance News Monitor using Playwright Real-time Twitter/X scraping without authentication Optimized for low-latency trading decisions """ import pandas as pd from datetime import datetime, timedelta from typing import List, Dict, Optional import streamlit as st import re import logging from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) try: from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError PLAYWRIGHT_AVAILABLE = True except ImportError: PLAYWRIGHT_AVAILABLE = False logger.warning("playwright not available. Install with: pip install playwright && playwright install chromium") class TwitterFinanceMonitor: """ Professional-grade financial news aggregator using Playwright No authentication required - public Twitter/X profiles only """ # Premium financial Twitter accounts SOURCES = { # ===== TIER 1: Breaking News Aggregators ===== 'walter_bloomberg': { 'handle': 'WalterBloomberg', 'url': 'https://x.com/WalterBloomberg', 'weight': 1.9, 'specialization': ['macro', 'markets', 'geopolitical'] }, 'fxhedge': { 'handle': 'Fxhedgers', 'url': 'https://x.com/Fxhedgers', 'weight': 1.7, 'specialization': ['macro', 'markets'] }, 'deitaone': { 'handle': 'DeItaone', 'url': 'https://x.com/DeItaone', 'weight': 1.8, 'specialization': ['markets', 'macro'] }, 'firstsquawk': { 'handle': 'FirstSquawk', 'url': 'https://x.com/FirstSquawk', 'weight': 1.7, 'specialization': ['markets', 'macro'] }, 'livesquawk': { 'handle': 'LiveSquawk', 'url': 'https://x.com/LiveSquawk', 'weight': 1.7, 'specialization': ['markets', 'macro'] }, # ===== TIER 2: Major News Agencies ===== 'reuters': { 'handle': 'Reuters', 'url': 'https://x.com/Reuters', 'weight': 1.9, 'specialization': ['geopolitical', 'macro', 'markets'] }, 'bloomberg': { 'handle': 'business', 'url': 'https://x.com/business', 'weight': 1.9, 'specialization': ['markets', 'macro'] }, 'ft': { 'handle': 'FT', 'url': 'https://x.com/FT', 'weight': 1.8, 'specialization': ['markets', 'macro', 'geopolitical'] }, 'wsj': { 'handle': 'WSJ', 'url': 'https://x.com/WSJ', 'weight': 1.8, 'specialization': ['markets', 'macro', 'geopolitical'] }, 'cnbc': { 'handle': 'CNBC', 'url': 'https://x.com/CNBC', 'weight': 1.6, 'specialization': ['markets', 'macro'] }, 'bbcbusiness': { 'handle': 'BBCBusiness', 'url': 'https://x.com/BBCBusiness', 'weight': 1.7, 'specialization': ['geopolitical', 'macro', 'markets'] }, # ===== TIER 3: Specialized Financial Media ===== 'zerohedge': { 'handle': 'zerohedge', 'url': 'https://x.com/zerohedge', 'weight': 1.5, 'specialization': ['macro', 'geopolitical', 'markets'] }, 'marketwatch': { 'handle': 'MarketWatch', 'url': 'https://x.com/MarketWatch', 'weight': 1.6, 'specialization': ['markets', 'macro'] }, 'unusual_whales': { 'handle': 'unusual_whales', 'url': 'https://x.com/unusual_whales', 'weight': 1.5, 'specialization': ['markets'] }, 'financialtimes': { 'handle': 'FinancialTimes', 'url': 'https://x.com/FinancialTimes', 'weight': 1.8, 'specialization': ['markets', 'macro', 'geopolitical'] }, # ===== TIER 4: Economists & Analysis ===== 'economics': { 'handle': 'economics', 'url': 'https://x.com/economics', 'weight': 1.7, 'specialization': ['macro', 'geopolitical'] }, 'ap': { 'handle': 'AP', 'url': 'https://x.com/AP', 'weight': 1.7, 'specialization': ['geopolitical', 'macro'] }, 'afp': { 'handle': 'AFP', 'url': 'https://x.com/AFP', 'weight': 1.7, 'specialization': ['geopolitical', 'macro'] }, 'ajenglish': { 'handle': 'AJEnglish', 'url': 'https://x.com/AJEnglish', 'weight': 1.6, 'specialization': ['geopolitical', 'macro'] } } # Keyword detection for categorization MACRO_KEYWORDS = [ 'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde', 'interest rate', 'inflation', 'CPI', 'PPI', 'GDP', 'unemployment', 'jobs report', 'NFP', 'central bank', 'monetary policy', 'quantitative', 'recession' ] MARKET_KEYWORDS = [ 'S&P', 'Dow', 'Nasdaq', 'Russell', 'stocks', 'equities', 'earnings', 'revenue', 'profit', 'shares', 'IPO', 'merger', 'acquisition', 'crypto', 'Bitcoin', 'Ethereum', 'oil', 'gold', 'commodities', 'futures', 'options' ] GEOPOLITICAL_KEYWORDS = [ 'war', 'conflict', 'sanctions', 'trade', 'tariff', 'China', 'Russia', 'Ukraine', 'Taiwan', 'Middle East', 'election', 'government', 'military', 'diplomatic', 'treaty', 'EU', 'Brexit', 'OPEC' ] def __init__(self): """Initialize monitor""" # Find Chromium executable self.chromium_path = self._find_chromium() def _find_chromium(self) -> str: """Find Chromium installation path""" import os import shutil # Try common paths paths = [ '/usr/bin/chromium', '/usr/bin/chromium-browser', '/usr/lib/chromium/chromium', shutil.which('chromium'), shutil.which('chromium-browser'), ] for path in paths: if path and os.path.exists(path): logger.info(f"Found Chromium at: {path}") return path logger.warning("Chromium not found in standard paths") return '/usr/bin/chromium' # Fallback def _scrape_twitter_profile(self, source_name: str, source_info: Dict, timeout: int = 30) -> List[Dict]: """Scrape tweets from a single Twitter profile using Playwright""" if not PLAYWRIGHT_AVAILABLE: logger.warning("Playwright not available") return [] try: with sync_playwright() as p: # Launch lightweight browser with aggressive performance flags browser = p.chromium.launch( executable_path=self.chromium_path, headless=True, args=[ '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', # Overcome limited resource problems '--no-sandbox', # Required for some environments '--disable-setuid-sandbox', '--disable-gpu', # Not needed in headless '--disable-software-rasterizer' ] ) context = browser.new_context( user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' ) page = context.new_page() # Block images, fonts, css, and videos for speed def route_intercept(route): if route.request.resource_type in ["image", "media", "font", "stylesheet", "video"]: route.abort() else: route.continue_() page.route("**/*", route_intercept) # Navigate to profile with increased timeout logger.info(f"Scraping {source_name}...") page.goto(source_info['url'], timeout=timeout * 1000, wait_until="domcontentloaded") # Wait for tweets to load with increased timeout try: page.wait_for_selector("article", timeout=15000) # Increased to 15 seconds except PlaywrightTimeoutError: logger.warning(f"Timeout waiting for tweets from {source_name}") browser.close() return [] # Extract tweet texts (limit to 15) tweet_elements = page.locator("article div[data-testid='tweetText']").all() news_items = [] for idx, element in enumerate(tweet_elements[:15]): # Reduced from 20 to 15 for speed try: text = element.text_content() if not text or len(text) < 10: continue # Clean text text = text.strip() text = re.sub(r'\s+', ' ', text) # Skip retweets and replies if text.startswith('RT @') or text.startswith('@'): continue # Categorize and analyze category = self._categorize_text(text, source_info['specialization']) sentiment = self._analyze_sentiment(text) impact = self._assess_impact(source_info['weight'], text) is_breaking = self._detect_breaking_news(text) # Create summary summary = self._extract_summary(text) if len(text) > 150 else text news_items.append({ 'id': hash(f"{source_name}_{idx}_{datetime.now().isoformat()}"), 'title': text, 'summary': summary, 'source': source_info['handle'], 'category': category, 'timestamp': datetime.now() - timedelta(minutes=idx), # Approximate time 'sentiment': sentiment, 'impact': impact, 'url': source_info['url'], 'likes': 0, 'retweets': 0, 'is_breaking': is_breaking, 'source_weight': source_info['weight'], 'from_web': True }) except Exception as e: logger.debug(f"Error parsing tweet from {source_name}: {e}") continue browser.close() logger.info(f"Scraped {len(news_items)} tweets from {source_name}") return news_items except Exception as e: logger.error(f"Error scraping {source_name}: {e}") return [] def scrape_twitter_news(self, max_tweets: int = 100) -> List[Dict]: """ Scrape latest financial news from Twitter using Playwright Runs in parallel for better performance - 19 sources in ~30-45 seconds """ if not PLAYWRIGHT_AVAILABLE: logger.info("Playwright not available - using mock data") return self._get_mock_news() all_news = [] seen_texts = set() # Sort sources by weight (priority) - scrape high-value sources first sorted_sources = sorted( self.SOURCES.items(), key=lambda x: x[1]['weight'], reverse=True ) # Scrape sources in parallel with moderate concurrency # 8 workers = 19 sources in 3 batches (~60-90 seconds total) with ThreadPoolExecutor(max_workers=8) as executor: futures = [] for name, info in sorted_sources: # Increased timeout for better success rate future = executor.submit(self._scrape_twitter_profile, name, info, timeout=30) futures.append((future, name)) for future, source_name in futures: try: # Wait max 35 seconds per source (increased for reliability) news_items = future.result(timeout=35) # Deduplicate based on text similarity unique_items = [] for item in news_items: text_hash = hash(item['title'][:100]) if text_hash not in seen_texts: seen_texts.add(text_hash) unique_items.append(item) all_news.extend(unique_items) if len(unique_items) > 0: logger.info(f"Fetched {len(unique_items)} unique tweets from {source_name}") except FuturesTimeoutError: logger.warning(f"Timeout scraping {source_name} - skipping") except Exception as e: logger.error(f"Error processing {source_name}: {e}") # If no news was fetched, use mock data if not all_news: logger.warning("No tweets fetched - using mock data") return self._get_mock_news() # Sort by breaking news, then impact, then timestamp all_news.sort( key=lambda x: (x['is_breaking'], x['impact'] == 'high', x['timestamp']), reverse=True ) logger.info(f"Total unique tweets: {len(all_news)}") return all_news[:max_tweets] def _categorize_text(self, text: str, source_specialization: List[str]) -> str: """Categorize news based on keywords and source specialization""" text_lower = text.lower() # Count keyword matches macro_score = sum(1 for kw in self.MACRO_KEYWORDS if kw.lower() in text_lower) market_score = sum(1 for kw in self.MARKET_KEYWORDS if kw.lower() in text_lower) geo_score = sum(1 for kw in self.GEOPOLITICAL_KEYWORDS if kw.lower() in text_lower) # Boost scores based on source specialization if 'macro' in source_specialization: macro_score *= 1.5 if 'markets' in source_specialization: market_score *= 1.5 if 'geopolitical' in source_specialization: geo_score *= 1.5 # Return category with highest score scores = {'macro': macro_score, 'markets': market_score, 'geopolitical': geo_score} return max(scores, key=scores.get) def _analyze_sentiment(self, text: str) -> str: """Simple keyword-based sentiment analysis for trading""" text_lower = text.lower() positive_keywords = ['surge', 'rally', 'gain', 'rise', 'up', 'bullish', 'strong', 'beat', 'exceed'] negative_keywords = ['crash', 'plunge', 'fall', 'down', 'bearish', 'weak', 'miss', 'below', 'loss'] pos_count = sum(1 for kw in positive_keywords if kw in text_lower) neg_count = sum(1 for kw in negative_keywords if kw in text_lower) if pos_count > neg_count: return 'positive' elif neg_count > pos_count: return 'negative' return 'neutral' def _assess_impact(self, source_weight: float, text: str) -> str: """Assess market impact based on source weight and keywords""" text_lower = text.lower() high_impact_keywords = ['breaking', 'alert', 'urgent', 'flash', 'fed', 'powell', 'rate', 'war'] impact_score = sum(1 for kw in high_impact_keywords if kw in text_lower) # Combine source weight and keyword impact total_impact = source_weight + (impact_score * 0.3) if total_impact >= 1.8: return 'high' elif total_impact >= 1.4: return 'medium' return 'low' def _detect_breaking_news(self, text: str) -> bool: """Detect if news is breaking/urgent""" text_lower = text.lower() breaking_keywords = ['breaking', 'alert', 'urgent', 'flash', '*breaking*', '🚨'] return any(kw in text_lower for kw in breaking_keywords) def _extract_summary(self, text: str) -> str: """Extract first 150 characters as summary""" if len(text) <= 150: return text return text[:147] + "..." def _get_mock_news(self) -> List[Dict]: """Return mock data when scraping fails""" mock_news = [ { 'id': hash('mock1'), 'title': 'Fed signals potential rate pause as inflation moderates', 'summary': 'Fed signals potential rate pause as inflation moderates', 'source': 'Mock Data', 'category': 'macro', 'timestamp': datetime.now() - timedelta(minutes=5), 'sentiment': 'neutral', 'impact': 'high', 'url': 'https://x.com', 'likes': 0, 'retweets': 0, 'is_breaking': False, 'source_weight': 1.5, 'from_web': True }, { 'id': hash('mock2'), 'title': 'S&P 500 futures rise ahead of key earnings reports', 'summary': 'S&P 500 futures rise ahead of key earnings reports', 'source': 'Mock Data', 'category': 'markets', 'timestamp': datetime.now() - timedelta(minutes=15), 'sentiment': 'positive', 'impact': 'medium', 'url': 'https://x.com', 'likes': 0, 'retweets': 0, 'is_breaking': False, 'source_weight': 1.5, 'from_web': True } ] return mock_news def get_statistics(self) -> Dict: """ Get statistics about cached news Note: Statistics are now managed by NewsCacheManager This method returns empty stats for backward compatibility """ return { 'total': 0, 'high_impact': 0, 'breaking': 0, 'last_update': 'Managed by cache', 'by_category': {} }