# FinancialPlatform/app/services/news_monitor_twikit.py
# Author: Dmitry Beresnev (commit d34f6ef: "fix logger")
"""
Professional Finance News Monitor using Twikit
Real-time tracking: Macro, Markets, Geopolitical intelligence
Optimized for low-latency trading decisions
"""
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import streamlit as st
import os
import asyncio
import re
import logging
from dotenv import load_dotenv
# Configure logging for this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables (Twitter/X credentials are expected in .env).
load_dotenv()

# twikit is optional: when it is missing, the monitor degrades to mock data
# instead of raising at import time.
try:
    from twikit import Client
    TWIKIT_AVAILABLE = True
except ImportError:
    TWIKIT_AVAILABLE = False
    logger.warning("twikit not available. Install with: pip install twikit")
class FinanceNewsMonitor:
    """
    Professional-grade financial news aggregator using Twikit
    Sources: Bloomberg, Reuters, WSJ, FT, CNBC, and 18 more premium sources
    """
    # Premium financial sources - expanded coverage.
    # Each entry maps an internal key to:
    #   handle         - the account name on Twitter/X (no leading '@')
    #   weight         - source-trust multiplier used in impact scoring
    #   specialization - categories this source is strongest in; boosts
    #                    the matching keyword score during categorization
    SOURCES = {
        # ===== TIER 1: Major Financial News =====
        'reuters': {
            'handle': 'Reuters',
            'weight': 1.5,
            'specialization': ['macro', 'geopolitical', 'markets']
        },
        'bloomberg': {
            'handle': 'business',
            'weight': 1.5,
            'specialization': ['macro', 'markets']
        },
        'ft': {
            'handle': 'FT',
            'weight': 1.4,
            'specialization': ['macro', 'markets']
        },
        'economist': {
            'handle': 'TheEconomist',
            'weight': 1.3,
            'specialization': ['macro', 'geopolitical']
        },
        'wsj': {
            'handle': 'WSJ',
            'weight': 1.4,
            'specialization': ['markets', 'macro']
        },
        'bloomberg_terminal': {
            'handle': 'Bloomberg',
            'weight': 1.5,
            'specialization': ['macro', 'markets']
        },
        'cnbc': {
            'handle': 'CNBC',
            'weight': 1.2,
            'specialization': ['markets']
        },
        'marketwatch': {
            'handle': 'MarketWatch',
            'weight': 1.1,
            'specialization': ['markets']
        },
        # ===== TIER 2: Geopolitical Intelligence =====
        'bbc_world': {
            'handle': 'BBCWorld',
            'weight': 1.4,
            'specialization': ['geopolitical']
        },
        'afp': {
            'handle': 'AFP',
            'weight': 1.3,
            'specialization': ['geopolitical']
        },
        'aljazeera': {
            'handle': 'AlJazeera',
            'weight': 1.2,
            'specialization': ['geopolitical']
        },
        'politico': {
            'handle': 'politico',
            'weight': 1.2,
            'specialization': ['geopolitical', 'macro']
        },
        'dw_news': {
            'handle': 'dwnews',
            'weight': 1.2,
            'specialization': ['geopolitical']
        },
        # ===== TIER 3: Central Banks & Official Sources =====
        # Weights >= 1.8 here force 'high' impact in _assess_impact_twikit.
        'federal_reserve': {
            'handle': 'federalreserve',
            'weight': 2.0,  # Highest priority
            'specialization': ['macro']
        },
        'ecb': {
            'handle': 'ecb',
            'weight': 2.0,
            'specialization': ['macro']
        },
        'lagarde': {
            'handle': 'Lagarde',
            'weight': 1.9,  # ECB President
            'specialization': ['macro']
        },
        'bank_of_england': {
            'handle': 'bankofengland',
            'weight': 1.8,
            'specialization': ['macro']
        },
        'imf': {
            'handle': 'IMFNews',
            'weight': 1.7,
            'specialization': ['macro', 'geopolitical']
        },
        'world_bank': {
            'handle': 'worldbank',
            'weight': 1.6,
            'specialization': ['macro', 'geopolitical']
        },
        'us_treasury': {
            'handle': 'USTreasury',
            'weight': 1.8,
            'specialization': ['macro']
        },
        # ===== TIER 4: Alpha Accounts (Fast Breaking News) =====
        'zerohedge': {
            'handle': 'zerohedge',
            'weight': 1.0,
            'specialization': ['markets', 'macro']
        },
        'first_squawk': {
            'handle': 'FirstSquawk',
            'weight': 1.1,  # Fast alerts
            'specialization': ['markets', 'macro']
        },
        'live_squawk': {
            'handle': 'LiveSquawk',
            'weight': 1.1,  # Real-time market squawks
            'specialization': ['markets', 'macro']
        }
    }

    # Enhanced keyword detection for professional traders.
    # Matched case-insensitively as substrings by _categorize_tweet.
    MACRO_KEYWORDS = [
        # Central Banks & Policy
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'rate cut', 'rate hike', 'QE', 'quantitative',
        'monetary policy', 'inflation', 'CPI', 'PCE', 'tapering',
        # Economic Data
        'GDP', 'unemployment', 'jobs report', 'NFP', 'payroll',
        'PMI', 'manufacturing', 'services', 'consumer confidence',
        'retail sales', 'housing starts', 'durable goods'
    ]
    MARKET_KEYWORDS = [
        # Equities
        'S&P', 'Dow', 'Nasdaq', 'Russell', 'earnings', 'EPS',
        'stock', 'share', 'equity', 'rally', 'selloff', 'correction',
        # Corporate
        'merger', 'acquisition', 'IPO', 'buyback', 'dividend',
        'guidance', 'revenue', 'profit', 'loss', 'bankruptcy'
    ]
    GEOPOLITICAL_KEYWORDS = [
        # Conflicts & Relations
        'war', 'conflict', 'sanctions', 'trade', 'tariff', 'embargo',
        'summit', 'treaty', 'diplomacy', 'tension', 'crisis',
        # Regions
        'Ukraine', 'Russia', 'China', 'Taiwan', 'Middle East',
        'Iran', 'North Korea', 'EU', 'Brexit'
    ]
def __init__(self):
"""Initialize monitor with caching"""
self.news_cache = []
self.last_fetch = None
self.cache_ttl = 180 # 3 minutes for low latency
self.client = None
self.authenticated = False
async def _authenticate_twikit(self):
"""Authenticate with Twitter using Twikit"""
if not TWIKIT_AVAILABLE:
return False
try:
self.client = Client('en-US')
# Get credentials from environment variables
username = os.getenv('TWITTER_USERNAME')
email = os.getenv('TWITTER_EMAIL')
password = os.getenv('TWITTER_PASSWORD')
if not all([username, email, password]):
logger.warning("Twitter credentials not found in environment variables")
logger.info("Set TWITTER_USERNAME, TWITTER_EMAIL, TWITTER_PASSWORD in .env")
return False
await self.client.login(
auth_info_1=username,
auth_info_2=email,
password=password
)
self.authenticated = True
logger.info("Successfully authenticated with Twitter/X")
return True
except Exception as e:
logger.error(f"Twitter authentication failed: {e}")
return False
async def _scrape_twitter_async(self, max_tweets: int = 100) -> List[Dict]:
"""Async method to scrape tweets using Twikit"""
if not self.authenticated:
auth_success = await self._authenticate_twikit()
if not auth_success:
return self._get_mock_news()
all_tweets = []
tweets_per_source = max(5, max_tweets // len(self.SOURCES))
failed_sources = 0
for source_name, source_info in self.SOURCES.items():
try:
handle = source_info['handle']
# Search for tweets from this user
tweets = await self.client.search_tweet(
f'from:{handle}',
product='Latest',
count=tweets_per_source
)
for tweet in tweets:
# Skip old tweets (>24h)
tweet_date = datetime.fromisoformat(tweet.created_at.replace('Z', '+00:00'))
if (datetime.now(tweet_date.tzinfo) - tweet_date).days > 1:
continue
# Skip retweets and replies
if hasattr(tweet, 'retweeted_tweet') or tweet.in_reply_to_user_id:
continue
# Categorize and analyze
category = self._categorize_tweet(tweet.text, source_info['specialization'])
sentiment = self._analyze_sentiment(tweet.text)
impact = self._assess_impact_twikit(tweet, source_info['weight'])
is_breaking = self._detect_breaking_news(tweet.text)
all_tweets.append({
'id': int(tweet.id),
'title': tweet.text,
'summary': self._extract_summary(tweet.text),
'source': source_name.replace('_', ' ').title(),
'category': category,
'timestamp': tweet_date.replace(tzinfo=None),
'sentiment': sentiment,
'impact': impact,
'url': f'https://twitter.com/{handle}/status/{tweet.id}',
'likes': tweet.favorite_count or 0,
'retweets': tweet.retweet_count or 0,
'is_breaking': is_breaking,
'source_weight': source_info['weight']
})
except Exception as e:
failed_sources += 1
error_msg = str(e).lower()
if 'rate limit' in error_msg:
logger.warning(f"Rate limited for {source_name}")
elif 'unauthorized' in error_msg or 'forbidden' in error_msg:
logger.warning(f"Access denied for {source_name}")
else:
logger.error(f"Error scraping {source_name}: {e}")
continue
# If all sources failed, fall back to mock data
if failed_sources >= len(self.SOURCES) or len(all_tweets) == 0:
logger.warning("Twitter/X scraping failed - falling back to mock data")
return self._get_mock_news()
# Sort by impact and timestamp
all_tweets.sort(
key=lambda x: (x['is_breaking'], x['impact'] == 'high', x['timestamp']),
reverse=True
)
return all_tweets
@st.cache_data(ttl=180)
def scrape_twitter_news(_self, max_tweets: int = 100) -> List[Dict]:
"""
Scrape latest financial news with caching (sync wrapper)
max_tweets: Total tweets to fetch (distributed across sources)
"""
if not TWIKIT_AVAILABLE:
logger.info("Twikit not available - using mock data")
return _self._get_mock_news()
try:
# Run async scraping in event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(_self._scrape_twitter_async(max_tweets))
loop.close()
return result
except Exception as e:
logger.error(f"Error in async scraping: {e}")
return _self._get_mock_news()
def _categorize_tweet(self, text: str, source_specialization: List[str]) -> str:
"""Advanced categorization with source specialization"""
text_lower = text.lower()
# Count keyword matches
macro_score = sum(1 for kw in self.MACRO_KEYWORDS if kw.lower() in text_lower)
market_score = sum(1 for kw in self.MARKET_KEYWORDS if kw.lower() in text_lower)
geo_score = sum(1 for kw in self.GEOPOLITICAL_KEYWORDS if kw.lower() in text_lower)
# Weight by source specialization
if 'macro' in source_specialization:
macro_score *= 1.5
if 'markets' in source_specialization:
market_score *= 1.5
if 'geopolitical' in source_specialization:
geo_score *= 1.5
# Return highest scoring category
scores = {'macro': macro_score, 'markets': market_score, 'geopolitical': geo_score}
return max(scores, key=scores.get) if max(scores.values()) > 0 else 'markets'
def _analyze_sentiment(self, text: str) -> str:
"""Professional sentiment analysis for traders"""
text_lower = text.lower()
positive_signals = ['surge', 'soar', 'rally', 'beat', 'upgrade', 'bullish',
'gain', 'rise', 'jump', 'boost', 'optimistic', 'positive']
negative_signals = ['plunge', 'crash', 'fall', 'miss', 'downgrade', 'bearish',
'loss', 'drop', 'slide', 'concern', 'worry', 'negative']
pos_count = sum(1 for signal in positive_signals if signal in text_lower)
neg_count = sum(1 for signal in negative_signals if signal in text_lower)
if pos_count > neg_count:
return 'positive'
elif neg_count > pos_count:
return 'negative'
return 'neutral'
def _assess_impact_twikit(self, tweet, source_weight: float) -> str:
"""Assess market impact using Twikit tweet object"""
engagement = (tweet.favorite_count or 0) + (tweet.retweet_count or 0) * 2
weighted_engagement = engagement * source_weight
if weighted_engagement > 5000 or source_weight >= 1.8:
return 'high'
elif weighted_engagement > 1000:
return 'medium'
return 'low'
def _detect_breaking_news(self, text: str) -> bool:
"""Detect breaking/urgent news"""
text_upper = text.upper()
breaking_signals = ['BREAKING', 'ALERT', 'URGENT', 'JUST IN',
'*FED', '*ECB', '*POWELL', '*LAGARDE']
return any(signal in text_upper for signal in breaking_signals)
def _extract_summary(self, text: str, max_length: int = 150) -> str:
"""Extract clean summary from tweet"""
# Remove URLs
text = re.sub(r'http\S+', '', text)
text = text.strip()
if len(text) <= max_length:
return text
return text[:max_length] + '...'
    def _get_mock_news(self) -> List[Dict]:
        """Mock news data when Twikit is unavailable.

        Returns a static list of ten dicts using the same schema produced by
        _scrape_twitter_async (id, title, summary, source, category,
        timestamp, sentiment, impact, url, likes, retweets, is_breaking,
        source_weight), with timestamps offset relative to "now" so the feed
        looks fresh.
        """
        return [
            {
                'id': 1,
                'title': 'BREAKING: Federal Reserve announces emergency rate cut of 50bps - Powell cites economic uncertainty',
                'summary': 'BREAKING: Fed emergency rate cut 50bps',
                'source': 'Federal Reserve',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=5),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://twitter.com/federalreserve',
                'likes': 5000,
                'retweets': 2000,
                'is_breaking': True,
                'source_weight': 2.0
            },
            {
                'id': 2,
                'title': '*FIRST SQUAWK: S&P 500 FUTURES DROP 2% AFTER FED ANNOUNCEMENT',
                'summary': '*FIRST SQUAWK: S&P 500 futures drop 2%',
                'source': 'First Squawk',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=10),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://twitter.com/FirstSquawk',
                'likes': 1500,
                'retweets': 600,
                'is_breaking': False,
                'source_weight': 1.1
            },
            {
                'id': 3,
                'title': 'Apple reports earnings beat with $123B revenue, raises dividend by 4% - Stock up 3% after hours',
                'summary': 'Apple beats earnings, raises dividend 4%',
                'source': 'Bloomberg',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=25),
                'sentiment': 'positive',
                'impact': 'high',
                'url': 'https://twitter.com/business',
                'likes': 2800,
                'retweets': 900,
                'is_breaking': False,
                'source_weight': 1.5
            },
            {
                'id': 4,
                'title': 'ECB President Lagarde: Inflation remains above target, rates to stay higher for longer',
                'summary': 'Lagarde: rates to stay higher for longer',
                'source': 'Lagarde',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=45),
                'sentiment': 'neutral',
                'impact': 'high',
                'url': 'https://twitter.com/Lagarde',
                'likes': 1200,
                'retweets': 400,
                'is_breaking': False,
                'source_weight': 1.9
            },
            {
                'id': 5,
                'title': 'Ukraine conflict: New peace talks scheduled as tensions ease in Eastern Europe',
                'summary': 'Ukraine: New peace talks scheduled',
                'source': 'BBC World',
                'category': 'geopolitical',
                'timestamp': datetime.now() - timedelta(hours=1),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://twitter.com/BBCWorld',
                'likes': 3500,
                'retweets': 1200,
                'is_breaking': False,
                'source_weight': 1.4
            },
            {
                'id': 6,
                'title': 'US GDP growth revised up to 2.8% in Q4, beating economists expectations of 2.5%',
                'summary': 'US GDP growth revised up to 2.8% in Q4',
                'source': 'Reuters',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(hours=2),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://twitter.com/Reuters',
                'likes': 1800,
                'retweets': 600,
                'is_breaking': False,
                'source_weight': 1.5
            },
            {
                'id': 7,
                'title': '*LIVE SQUAWK: Oil prices surge 5% on Middle East supply concerns, Brent crude at $92/barrel',
                'summary': '*LIVE SQUAWK: Oil surges 5% on supply fears',
                'source': 'Live Squawk',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(hours=3),
                'sentiment': 'neutral',
                'impact': 'medium',
                'url': 'https://twitter.com/LiveSquawk',
                'likes': 900,
                'retweets': 350,
                'is_breaking': False,
                'source_weight': 1.1
            },
            {
                'id': 8,
                'title': 'IMF upgrades global growth forecast to 3.2% for 2024, warns of recession risks in Europe',
                'summary': 'IMF upgrades global growth to 3.2%',
                'source': 'IMF',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(hours=4),
                'sentiment': 'neutral',
                'impact': 'medium',
                'url': 'https://twitter.com/IMFNews',
                'likes': 800,
                'retweets': 300,
                'is_breaking': False,
                'source_weight': 1.7
            },
            {
                'id': 9,
                'title': 'US-China trade talks resume in Washington, focus on technology transfer and tariffs',
                'summary': 'US-China trade talks resume',
                'source': 'Politico',
                'category': 'geopolitical',
                'timestamp': datetime.now() - timedelta(hours=5),
                'sentiment': 'neutral',
                'impact': 'low',
                'url': 'https://twitter.com/politico',
                'likes': 600,
                'retweets': 200,
                'is_breaking': False,
                'source_weight': 1.2
            },
            {
                'id': 10,
                'title': 'Bank of America cuts recession probability to 20%, cites resilient consumer spending',
                'summary': 'BofA cuts recession probability to 20%',
                'source': 'FT',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(hours=6),
                'sentiment': 'positive',
                'impact': 'low',
                'url': 'https://twitter.com/FT',
                'likes': 700,
                'retweets': 250,
                'is_breaking': False,
                'source_weight': 1.4
            }
        ]
def get_news(self, category: str = 'all', sentiment: str = 'all',
impact: str = 'all', refresh: bool = False) -> pd.DataFrame:
"""
Get filtered news with intelligent caching
Args:
category: 'all', 'macro', 'geopolitical', 'markets'
sentiment: 'all', 'positive', 'negative', 'neutral'
impact: 'all', 'high', 'medium', 'low'
refresh: Force refresh cache
"""
# Check cache freshness
if refresh or not self.last_fetch or \
(datetime.now() - self.last_fetch).seconds > self.cache_ttl:
self.news_cache = self.scrape_twitter_news(max_tweets=100)
self.last_fetch = datetime.now()
news = self.news_cache.copy()
# Apply filters
if category != 'all':
news = [n for n in news if n['category'] == category]
if sentiment != 'all':
news = [n for n in news if n['sentiment'] == sentiment]
if impact != 'all':
news = [n for n in news if n['impact'] == impact]
df = pd.DataFrame(news)
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'])
return df
def get_breaking_news(self) -> pd.DataFrame:
"""Get only breaking/high-impact news for alerts"""
return self.get_news(impact='high')
def get_statistics(self) -> Dict:
"""Get feed statistics"""
if not self.news_cache:
return {
'total': 0,
'high_impact': 0,
'breaking': 0,
'last_update': 'Never',
'by_category': {}
}
df = pd.DataFrame(self.news_cache)
return {
'total': len(df),
'high_impact': len(df[df['impact'] == 'high']),
'breaking': len(df[df['is_breaking'] == True]),
'last_update': self.last_fetch.strftime('%H:%M:%S') if self.last_fetch else 'Never',
'by_category': df['category'].value_counts().to_dict()
}