| """ |
| CompI Real-Time Data Processing Utilities |
| |
| This module provides utilities for Phase 2.D: Real-Time Data Feeds Integration |
| - Weather data fetching from multiple APIs |
| - News headlines and RSS feed processing |
| - Social media trends and sentiment analysis |
| - Stock market and financial data integration |
| - Data summarization and context generation |
| - Real-time data caching and rate limiting |
| """ |
|
|
| import os |
| import json |
| import time |
| import hashlib |
| import requests |
| import feedparser |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Optional, Tuple, Union, Any |
| from dataclasses import dataclass, asdict |
| from enum import Enum |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class DataFeedType(Enum): |
| """Types of real-time data feeds""" |
| WEATHER = "weather" |
| NEWS = "news" |
| SOCIAL = "social" |
| FINANCIAL = "financial" |
| SPORTS = "sports" |
| TECHNOLOGY = "technology" |
| CUSTOM_RSS = "custom_rss" |
|
|
| @dataclass |
| class RealTimeDataPoint: |
| """Container for a single real-time data point""" |
| |
| feed_type: DataFeedType |
| source: str |
| timestamp: datetime |
| title: str |
| content: str |
| metadata: Dict[str, Any] |
| sentiment_score: Optional[float] = None |
| relevance_score: Optional[float] = None |
| |
| def to_dict(self) -> Dict[str, Any]: |
| """Convert to dictionary for JSON serialization""" |
| return { |
| 'feed_type': self.feed_type.value, |
| 'source': self.source, |
| 'timestamp': self.timestamp.isoformat(), |
| 'title': self.title, |
| 'content': self.content, |
| 'metadata': self.metadata, |
| 'sentiment_score': self.sentiment_score, |
| 'relevance_score': self.relevance_score |
| } |
|
|
| @dataclass |
| class RealTimeContext: |
| """Container for processed real-time context""" |
| |
| data_points: List[RealTimeDataPoint] |
| summary: str |
| mood_indicators: List[str] |
| key_themes: List[str] |
| temporal_context: str |
| artistic_inspiration: str |
| |
| def to_dict(self) -> Dict[str, Any]: |
| """Convert to dictionary for JSON serialization""" |
| return { |
| 'data_points': [dp.to_dict() for dp in self.data_points], |
| 'summary': self.summary, |
| 'mood_indicators': self.mood_indicators, |
| 'key_themes': self.key_themes, |
| 'temporal_context': self.temporal_context, |
| 'artistic_inspiration': self.artistic_inspiration |
| } |
|
|
| class DataFeedCache: |
| """Simple caching system for real-time data to respect rate limits""" |
| |
| def __init__(self, cache_duration_minutes: int = 15): |
| """ |
| Initialize cache |
| |
| Args: |
| cache_duration_minutes: How long to cache data in minutes |
| """ |
| self.cache = {} |
| self.cache_duration = timedelta(minutes=cache_duration_minutes) |
| |
| def get_cache_key(self, feed_type: str, params: Dict[str, Any]) -> str: |
| """Generate cache key from feed type and parameters""" |
| param_str = json.dumps(params, sort_keys=True) |
| return hashlib.md5(f"{feed_type}_{param_str}".encode()).hexdigest() |
| |
| def get(self, feed_type: str, params: Dict[str, Any]) -> Optional[Any]: |
| """Get cached data if still valid""" |
| cache_key = self.get_cache_key(feed_type, params) |
| |
| if cache_key in self.cache: |
| data, timestamp = self.cache[cache_key] |
| if datetime.now() - timestamp < self.cache_duration: |
| logger.info(f"Using cached data for {feed_type}") |
| return data |
| else: |
| |
| del self.cache[cache_key] |
| |
| return None |
| |
| def set(self, feed_type: str, params: Dict[str, Any], data: Any): |
| """Cache data with timestamp""" |
| cache_key = self.get_cache_key(feed_type, params) |
| self.cache[cache_key] = (data, datetime.now()) |
| logger.info(f"Cached data for {feed_type}") |
|
|
| class WeatherDataFetcher: |
| """Fetch weather data from multiple sources""" |
| |
| def __init__(self, api_key: Optional[str] = None): |
| """ |
| Initialize weather fetcher |
| |
| Args: |
| api_key: OpenWeatherMap API key (optional, uses demo key if not provided) |
| """ |
| self.api_key = api_key or "9a524f695a4940f392150142250107" |
| self.base_url = "https://api.openweathermap.org/data/2.5/weather" |
| |
| def fetch_weather(self, city: str, country_code: Optional[str] = None) -> RealTimeDataPoint: |
| """ |
| Fetch current weather for a city |
| |
| Args: |
| city: City name |
| country_code: Optional country code (e.g., 'US', 'UK') |
| |
| Returns: |
| RealTimeDataPoint with weather information |
| """ |
| logger.info(f"Fetching weather for {city}") |
| |
| |
| query = city |
| if country_code: |
| query += f",{country_code}" |
| |
| params = { |
| "q": query, |
| "units": "metric", |
| "appid": self.api_key |
| } |
| |
| try: |
| response = requests.get(self.base_url, params=params, timeout=10) |
| response.raise_for_status() |
| data = response.json() |
| |
| |
| weather_main = data['weather'][0]['main'] |
| weather_desc = data['weather'][0]['description'] |
| temp = data['main']['temp'] |
| feels_like = data['main']['feels_like'] |
| humidity = data['main']['humidity'] |
| pressure = data['main']['pressure'] |
| |
| |
| content = f"Current weather in {city}: {weather_desc}, {temp:.1f}°C (feels like {feels_like:.1f}°C), humidity {humidity}%, pressure {pressure} hPa" |
| |
| |
| mood_mapping = { |
| 'clear': 'bright and optimistic', |
| 'clouds': 'contemplative and soft', |
| 'rain': 'melancholic and reflective', |
| 'drizzle': 'gentle and soothing', |
| 'thunderstorm': 'dramatic and intense', |
| 'snow': 'serene and peaceful', |
| 'mist': 'mysterious and ethereal', |
| 'fog': 'mysterious and ethereal' |
| } |
| |
| mood = mood_mapping.get(weather_main.lower(), 'neutral') |
| |
| return RealTimeDataPoint( |
| feed_type=DataFeedType.WEATHER, |
| source="OpenWeatherMap", |
| timestamp=datetime.now(), |
| title=f"Weather in {city}", |
| content=content, |
| metadata={ |
| 'city': city, |
| 'country_code': country_code, |
| 'temperature': temp, |
| 'feels_like': feels_like, |
| 'humidity': humidity, |
| 'pressure': pressure, |
| 'weather_main': weather_main, |
| 'weather_description': weather_desc, |
| 'mood': mood |
| } |
| ) |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Error fetching weather data: {e}") |
| return RealTimeDataPoint( |
| feed_type=DataFeedType.WEATHER, |
| source="OpenWeatherMap", |
| timestamp=datetime.now(), |
| title=f"Weather in {city}", |
| content=f"Unable to fetch weather data for {city}: {str(e)}", |
| metadata={'error': str(e), 'city': city} |
| ) |
|
|
| class NewsDataFetcher: |
| """Fetch news data from multiple sources""" |
| |
| def __init__(self, api_key: Optional[str] = None): |
| """ |
| Initialize news fetcher |
| |
| Args: |
| api_key: NewsAPI key (optional, uses RSS feeds if not provided) |
| """ |
| self.api_key = api_key |
| self.newsapi_url = "https://newsapi.org/v2/top-headlines" |
| |
| |
| self.rss_feeds = { |
| 'general': 'https://feeds.bbci.co.uk/news/rss.xml', |
| 'technology': 'https://feeds.bbci.co.uk/news/technology/rss.xml', |
| 'science': 'https://feeds.bbci.co.uk/news/science_and_environment/rss.xml', |
| 'world': 'https://feeds.bbci.co.uk/news/world/rss.xml', |
| 'business': 'https://feeds.bbci.co.uk/news/business/rss.xml' |
| } |
| |
| def fetch_news_headlines(self, category: str = 'general', max_headlines: int = 5) -> List[RealTimeDataPoint]: |
| """ |
| Fetch news headlines |
| |
| Args: |
| category: News category |
| max_headlines: Maximum number of headlines to fetch |
| |
| Returns: |
| List of RealTimeDataPoint objects with news data |
| """ |
| logger.info(f"Fetching {max_headlines} news headlines for category: {category}") |
| |
| if self.api_key: |
| return self._fetch_from_newsapi(category, max_headlines) |
| else: |
| return self._fetch_from_rss(category, max_headlines) |
| |
| def _fetch_from_newsapi(self, category: str, max_headlines: int) -> List[RealTimeDataPoint]: |
| """Fetch news from NewsAPI (requires API key)""" |
| params = { |
| 'apiKey': self.api_key, |
| 'category': category, |
| 'pageSize': max_headlines, |
| 'language': 'en' |
| } |
| |
| try: |
| response = requests.get(self.newsapi_url, params=params, timeout=10) |
| response.raise_for_status() |
| data = response.json() |
| |
| news_points = [] |
| for article in data.get('articles', []): |
| news_point = RealTimeDataPoint( |
| feed_type=DataFeedType.NEWS, |
| source=article.get('source', {}).get('name', 'Unknown'), |
| timestamp=datetime.now(), |
| title=article.get('title', ''), |
| content=article.get('description', ''), |
| metadata={ |
| 'url': article.get('url', ''), |
| 'published_at': article.get('publishedAt', ''), |
| 'category': category |
| } |
| ) |
| news_points.append(news_point) |
| |
| return news_points |
| |
| except Exception as e: |
| logger.error(f"Error fetching news from NewsAPI: {e}") |
| return [] |
| |
| def _fetch_from_rss(self, category: str, max_headlines: int) -> List[RealTimeDataPoint]: |
| """Fetch news from RSS feeds (free, no API key required)""" |
| feed_url = self.rss_feeds.get(category, self.rss_feeds['general']) |
| |
| try: |
| feed = feedparser.parse(feed_url) |
| news_points = [] |
| |
| for entry in feed.entries[:max_headlines]: |
| news_point = RealTimeDataPoint( |
| feed_type=DataFeedType.NEWS, |
| source=feed.feed.get('title', 'BBC News'), |
| timestamp=datetime.now(), |
| title=entry.get('title', ''), |
| content=entry.get('summary', ''), |
| metadata={ |
| 'url': entry.get('link', ''), |
| 'published': entry.get('published', ''), |
| 'category': category |
| } |
| ) |
| news_points.append(news_point) |
| |
| return news_points |
| |
| except Exception as e: |
| logger.error(f"Error fetching RSS news: {e}") |
| return [] |
|
|
|
|
| class FinancialDataFetcher: |
| """Fetch financial and market data""" |
|
|
| def __init__(self): |
| """Initialize financial data fetcher""" |
| |
| self.crypto_url = "https://api.coindesk.com/v1/bpi/currentprice.json" |
| self.forex_url = "https://api.exchangerate-api.com/v4/latest/USD" |
|
|
| def fetch_market_summary(self) -> List[RealTimeDataPoint]: |
| """ |
| Fetch basic market data |
| |
| Returns: |
| List of RealTimeDataPoint objects with financial data |
| """ |
| logger.info("Fetching market summary") |
| data_points = [] |
|
|
| |
| try: |
| response = requests.get(self.crypto_url, timeout=10) |
| response.raise_for_status() |
| btc_data = response.json() |
|
|
| btc_price = btc_data['bpi']['USD']['rate'] |
| btc_point = RealTimeDataPoint( |
| feed_type=DataFeedType.FINANCIAL, |
| source="CoinDesk", |
| timestamp=datetime.now(), |
| title="Bitcoin Price", |
| content=f"Bitcoin (BTC): {btc_price}", |
| metadata={ |
| 'currency': 'USD', |
| 'asset': 'Bitcoin', |
| 'symbol': 'BTC' |
| } |
| ) |
| data_points.append(btc_point) |
|
|
| except Exception as e: |
| logger.error(f"Error fetching Bitcoin data: {e}") |
|
|
| |
| try: |
| response = requests.get(self.forex_url, timeout=10) |
| response.raise_for_status() |
| forex_data = response.json() |
|
|
| eur_rate = forex_data['rates'].get('EUR', 'N/A') |
| gbp_rate = forex_data['rates'].get('GBP', 'N/A') |
|
|
| forex_point = RealTimeDataPoint( |
| feed_type=DataFeedType.FINANCIAL, |
| source="ExchangeRate-API", |
| timestamp=datetime.now(), |
| title="Currency Exchange", |
| content=f"USD/EUR: {eur_rate}, USD/GBP: {gbp_rate}", |
| metadata={ |
| 'base_currency': 'USD', |
| 'eur_rate': eur_rate, |
| 'gbp_rate': gbp_rate |
| } |
| ) |
| data_points.append(forex_point) |
|
|
| except Exception as e: |
| logger.error(f"Error fetching forex data: {e}") |
|
|
| return data_points |
|
|
|
|
| class RealTimeDataProcessor: |
| """Process and contextualize real-time data for artistic inspiration""" |
|
|
| def __init__(self): |
| """Initialize the data processor""" |
| self.cache = DataFeedCache() |
| self.weather_fetcher = WeatherDataFetcher() |
| self.news_fetcher = NewsDataFetcher() |
| self.financial_fetcher = FinancialDataFetcher() |
|
|
| |
| self.mood_keywords = { |
| 'positive': ['sunny', 'clear', 'bright', 'growth', 'success', 'celebration', 'victory'], |
| 'negative': ['storm', 'rain', 'decline', 'crisis', 'conflict', 'tragedy', 'loss'], |
| 'neutral': ['cloudy', 'stable', 'steady', 'normal', 'routine', 'regular'], |
| 'dramatic': ['thunderstorm', 'breaking', 'urgent', 'major', 'significant', 'dramatic'], |
| 'peaceful': ['calm', 'gentle', 'quiet', 'serene', 'peaceful', 'tranquil'] |
| } |
|
|
| def fetch_realtime_context( |
| self, |
| include_weather: bool = False, |
| weather_city: str = "New York", |
| include_news: bool = False, |
| news_category: str = "general", |
| max_news: int = 3, |
| include_financial: bool = False, |
| weather_api_key: Optional[str] = None, |
| news_api_key: Optional[str] = None |
| ) -> RealTimeContext: |
| """ |
| Fetch and process real-time data from multiple sources |
| |
| Args: |
| include_weather: Whether to include weather data |
| weather_city: City for weather data |
| include_news: Whether to include news data |
| news_category: Category of news to fetch |
| max_news: Maximum number of news items |
| include_financial: Whether to include financial data |
| weather_api_key: Optional weather API key |
| news_api_key: Optional news API key |
| |
| Returns: |
| RealTimeContext with processed data |
| """ |
| logger.info("Fetching real-time context") |
|
|
| data_points = [] |
|
|
| |
| if include_weather: |
| cache_key = f"weather_{weather_city}" |
| cached_weather = self.cache.get("weather", {"city": weather_city}) |
|
|
| if cached_weather: |
| data_points.append(cached_weather) |
| else: |
| if weather_api_key: |
| self.weather_fetcher.api_key = weather_api_key |
|
|
| weather_data = self.weather_fetcher.fetch_weather(weather_city) |
| data_points.append(weather_data) |
| self.cache.set("weather", {"city": weather_city}, weather_data) |
|
|
| |
| if include_news: |
| cache_key = f"news_{news_category}_{max_news}" |
| cached_news = self.cache.get("news", {"category": news_category, "max": max_news}) |
|
|
| if cached_news: |
| data_points.extend(cached_news) |
| else: |
| if news_api_key: |
| self.news_fetcher.api_key = news_api_key |
|
|
| news_data = self.news_fetcher.fetch_news_headlines(news_category, max_news) |
| data_points.extend(news_data) |
| self.cache.set("news", {"category": news_category, "max": max_news}, news_data) |
|
|
| |
| if include_financial: |
| cached_financial = self.cache.get("financial", {}) |
|
|
| if cached_financial: |
| data_points.extend(cached_financial) |
| else: |
| financial_data = self.financial_fetcher.fetch_market_summary() |
| data_points.extend(financial_data) |
| self.cache.set("financial", {}, financial_data) |
|
|
| |
| return self._process_data_points(data_points) |
|
|
| def _process_data_points(self, data_points: List[RealTimeDataPoint]) -> RealTimeContext: |
| """Process data points into artistic context""" |
|
|
| if not data_points: |
| return RealTimeContext( |
| data_points=[], |
| summary="No real-time data available", |
| mood_indicators=[], |
| key_themes=[], |
| temporal_context="", |
| artistic_inspiration="" |
| ) |
|
|
| |
| summaries = [] |
| for dp in data_points: |
| summaries.append(f"{dp.title}: {dp.content}") |
|
|
| summary = "; ".join(summaries) |
|
|
| |
| mood_indicators = self._extract_mood_indicators(data_points) |
|
|
| |
| key_themes = self._extract_key_themes(data_points) |
|
|
| |
| temporal_context = self._generate_temporal_context(data_points) |
|
|
| |
| artistic_inspiration = self._generate_artistic_inspiration(data_points, mood_indicators, key_themes) |
|
|
| return RealTimeContext( |
| data_points=data_points, |
| summary=summary, |
| mood_indicators=mood_indicators, |
| key_themes=key_themes, |
| temporal_context=temporal_context, |
| artistic_inspiration=artistic_inspiration |
| ) |
|
|
| def _extract_mood_indicators(self, data_points: List[RealTimeDataPoint]) -> List[str]: |
| """Extract mood indicators from data points""" |
| moods = [] |
|
|
| for dp in data_points: |
| content_lower = dp.content.lower() |
|
|
| |
| if dp.feed_type == DataFeedType.WEATHER: |
| weather_mood = dp.metadata.get('mood', '') |
| if weather_mood: |
| moods.append(weather_mood) |
|
|
| |
| for mood, keywords in self.mood_keywords.items(): |
| if any(keyword in content_lower for keyword in keywords): |
| moods.append(mood) |
| break |
|
|
| return list(set(moods)) |
|
|
| def _extract_key_themes(self, data_points: List[RealTimeDataPoint]) -> List[str]: |
| """Extract key themes from data points""" |
| themes = [] |
|
|
| for dp in data_points: |
| if dp.feed_type == DataFeedType.WEATHER: |
| themes.append("nature") |
| themes.append("environment") |
| elif dp.feed_type == DataFeedType.NEWS: |
| themes.append("current events") |
| themes.append("society") |
| elif dp.feed_type == DataFeedType.FINANCIAL: |
| themes.append("economy") |
| themes.append("markets") |
|
|
| return list(set(themes)) |
|
|
| def _generate_temporal_context(self, data_points: List[RealTimeDataPoint]) -> str: |
| """Generate temporal context description""" |
| now = datetime.now() |
| time_desc = now.strftime("%A, %B %d, %Y at %I:%M %p") |
|
|
| return f"Real-time context captured on {time_desc}" |
|
|
| def _generate_artistic_inspiration( |
| self, |
| data_points: List[RealTimeDataPoint], |
| mood_indicators: List[str], |
| key_themes: List[str] |
| ) -> str: |
| """Generate artistic inspiration text from processed data""" |
|
|
| inspiration_parts = [] |
|
|
| |
| if mood_indicators: |
| mood_text = ", ".join(mood_indicators) |
| inspiration_parts.append(f"reflecting a {mood_text} atmosphere") |
|
|
| |
| if key_themes: |
| theme_text = " and ".join(key_themes) |
| inspiration_parts.append(f"inspired by {theme_text}") |
|
|
| |
| for dp in data_points: |
| if dp.feed_type == DataFeedType.WEATHER: |
| weather_desc = dp.metadata.get('weather_description', '') |
| if weather_desc: |
| inspiration_parts.append(f"with {weather_desc} weather influences") |
|
|
| elif dp.feed_type == DataFeedType.NEWS: |
| inspiration_parts.append("capturing the pulse of current events") |
|
|
| elif dp.feed_type == DataFeedType.FINANCIAL: |
| inspiration_parts.append("reflecting market dynamics and economic energy") |
|
|
| if inspiration_parts: |
| return ", ".join(inspiration_parts) |
| else: |
| return "drawing from the current moment in time" |
|
|