Spaces:
Running
Running
We need a frontend web UI that ties together the backend, which consists of our insight-scraping algorithms:
| import asyncio | |
| import aiohttp | |
| import json | |
| import time | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Tuple | |
| from dataclasses import dataclass | |
| from collections import defaultdict | |
| import sqlite3 | |
| from textblob import TextBlob | |
| import re | |
| from urllib.parse import quote_plus | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| @dataclass | |
| class TrendData: | |
| """Data structure for trend information""" | |
| keyword: str | |
| source: str | |
| volume: int | |
| sentiment: float | |
| timestamp: datetime | |
| metadata: Dict | |
| @dataclass | |
| class MarketGap: | |
| """Data structure for identified market gaps""" | |
| gap_description: str | |
| evidence_score: float | |
| demand_indicators: List[str] | |
| supply_gaps: List[str] | |
| related_keywords: List[str] | |
| confidence_level: float | |
| class APIClient: | |
| """Generic API client with rate limiting and error handling""" | |
| def __init__(self, base_url: str, rate_limit: float = 1.0): | |
| self.base_url = base_url | |
| self.rate_limit = rate_limit | |
| self.last_request = 0 | |
| self.session = None | |
| async def __aenter__(self): | |
| self.session = aiohttp.ClientSession() | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| if self.session: | |
| await self.session.close() | |
| async def make_request(self, endpoint: str, params: Dict = None, headers: Dict = None): | |
| """Make rate-limited API request""" | |
| # Rate limiting | |
| now = time.time() | |
| time_since_last = now - self.last_request | |
| if time_since_last < self.rate_limit: | |
| await asyncio.sleep(self.rate_limit - time_since_last) | |
| try: | |
| url = f"{self.base_url}/{endpoint.lstrip('/')}" | |
| async with self.session.get(url, params=params, headers=headers) as response: | |
| self.last_request = time.time() | |
| if response.status == 200: | |
| return await response.json() | |
| else: | |
| logger.warning(f"API request failed: {response.status}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"API request error: {e}") | |
| return None | |
| class TrendScraper: | |
| """Main class for scraping trend data from multiple sources""" | |
| def __init__(self): | |
| self.db_path = "market_trends.db" | |
| self.initialize_database() | |
| def initialize_database(self): | |
| """Initialize SQLite database for storing trend data""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Create trends table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS trends ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| keyword TEXT, | |
| source TEXT, | |
| volume INTEGER, | |
| sentiment REAL, | |
| timestamp TEXT, | |
| metadata TEXT | |
| ) | |
| ''') | |
| # Create market_gaps table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS market_gaps ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| gap_description TEXT, | |
| evidence_score REAL, | |
| demand_indicators TEXT, | |
| supply_gaps TEXT, | |
| related_keywords TEXT, | |
| confidence_level REAL, | |
| created_at TEXT | |
| ) | |
| ''') | |
| conn.commit() | |
| conn.close() | |
| async def scrape_reddit_trends(self, keywords: List[str]) -> List[TrendData]: | |
| """Scrape Reddit for trend data using pushshift API""" | |
| trends = [] | |
| async with APIClient("https://api.pushshift.io/reddit/search", rate_limit=1.0) as client: | |
| for keyword in keywords: | |
| params = { | |
| 'q': keyword, | |
| 'subreddit': 'all', | |
| 'sort': 'desc', | |
| 'sort_type': 'created_utc', | |
| 'size': 100, | |
| 'after': int((datetime.now() - timedelta(days=30)).timestamp()) | |
| } | |
| data = await client.make_request("submission", params) | |
| if data and 'data' in data: | |
| for post in data['data']: | |
| sentiment = TextBlob(post.get('title', '') + ' ' + post.get('selftext', '')).sentiment.polarity | |
| trends.append(TrendData( | |
| keyword=keyword, | |
| source='reddit', | |
| volume=post.get('score', 0), | |
| sentiment=sentiment, | |
| timestamp=datetime.fromtimestamp(post.get('created_utc', 0)), | |
| metadata={'subreddit': post.get('subreddit'), 'comments': post.get('num_comments', 0)} | |
| )) | |
| return trends | |
| async def scrape_twitter_trends(self, keywords: List[str]) -> List[TrendData]: | |
| """Simulate Twitter trend scraping (requires Twitter API credentials)""" | |
| # This is a placeholder - you'll need to implement with actual Twitter API | |
| trends = [] | |
| # Simulated data for demonstration | |
| for keyword in keywords: | |
| trends.append(TrendData( | |
| keyword=keyword, | |
| source='twitter', | |
| volume=np.random.randint(100, 10000), | |
| sentiment=np.random.uniform(-1, 1), | |
| timestamp=datetime.now(), | |
| metadata={'hashtags': f"#{keyword}", 'retweets': np.random.randint(10, 1000)} | |
| )) | |
| return trends | |
| async def scrape_google_trends(self, keywords: List[str]) -> List[TrendData]: | |
| """Scrape Google Trends data (requires pytrends library)""" | |
| trends = [] | |
| try: | |
| from pytrends.request import TrendReq | |
| pytrends = TrendReq(hl='en-US', tz=360) | |
| for keyword in keywords: | |
| pytrends.build_payload([keyword], timeframe='today 3-m') | |
| interest_over_time = pytrends.interest_over_time() | |
| if not interest_over_time.empty: | |
| for date, row in interest_over_time.iterrows(): | |
| trends.append(TrendData( | |
| keyword=keyword, | |
| source='google_trends', | |
| volume=int(row[keyword]), | |
| sentiment=0.0, # Google Trends doesn't provide sentiment | |
| timestamp=date, | |
| metadata={'isPartial': row.get('isPartial', False)} | |
| )) | |
| except ImportError: | |
| logger.warning("pytrends not installed. Skipping Google Trends scraping.") | |
| return trends | |
| async def scrape_github_trends(self, keywords: List[str]) -> List[TrendData]: | |
| """Scrape GitHub for repository trends""" | |
| trends = [] | |
| async with APIClient("https://api.github.com", rate_limit=1.0) as client: | |
| headers = {'Accept': 'application/vnd.github.v3+json'} | |
| for keyword in keywords: | |
| params = { | |
| 'q': keyword, | |
| 'sort': 'updated', | |
| 'order': 'desc', | |
| 'per_page': 100 | |
| } | |
| data = await client.make_request("search/repositories", params, headers) | |
| if data and 'items' in data: | |
| for repo in data['items']: | |
| trends.append(TrendData( | |
| keyword=keyword, | |
| source='github', | |
| volume=repo.get('stargazers_count', 0), | |
| sentiment=0.5, # Neutral sentiment for GitHub | |
| timestamp=datetime.fromisoformat(repo.get('updated_at', '').replace('Z', '+00:00')), | |
| metadata={ | |
| 'language': repo.get('language'), | |
| 'forks': repo.get('forks_count', 0), | |
| 'issues': repo.get('open_issues_count', 0) | |
| } | |
| )) | |
| return trends | |
| async def scrape_news_trends(self, keywords: List[str]) -> List[TrendData]: | |
| """Scrape news trends using NewsAPI (requires API key)""" | |
| trends = [] | |
| # Placeholder for NewsAPI integration | |
| # You'll need to register for a NewsAPI key at https://newsapi.org/ | |
| API_KEY = "YOUR_NEWSAPI_KEY" # Replace with actual key | |
| if API_KEY != "YOUR_NEWSAPI_KEY": | |
| async with APIClient("https://newsapi.org/v2", rate_limit=1.0) as client: | |
| headers = {'X-API-Key': API_KEY} | |
| for keyword in keywords: | |
| params = { | |
| 'q': keyword, | |
| 'sortBy': 'popularity', | |
| 'pageSize': 100, | |
| 'from': (datetime.now() - timedelta(days=30)).isoformat() | |
| } | |
| data = await client.make_request("everything", params, headers) | |
| if data and 'articles' in data: | |
| for article in data['articles']: | |
| sentiment = TextBlob(article.get('title', '') + ' ' + article.get('description', '')).sentiment.polarity | |
| trends.append(TrendData( | |
| keyword=keyword, | |
| source='news', | |
| volume=1, # News articles don't have volume metrics | |
| sentiment=sentiment, | |
| timestamp=datetime.fromisoformat(article.get('publishedAt', '').replace('Z', '+00:00')), | |
| metadata={ | |
| 'source': article.get('source', {}).get('name'), | |
| 'author': article.get('author') | |
| } | |
| )) | |
| return trends | |
| class MarketGapAnalyzer: | |
| """Analyze trends to identify market gaps""" | |
| def __init__(self, db_path: str): | |
| self.db_path = db_path | |
| def load_trend_data(self, keywords: List[str]) -> pd.DataFrame: | |
| """Load trend data from database""" | |
| conn = sqlite3.connect(self.db_path) | |
| keyword_filter = "'" + "','".join(keywords) + "'" | |
| query = f""" | |
| SELECT * FROM trends | |
| WHERE keyword IN ({keyword_filter}) | |
| ORDER BY timestamp DESC | |
| """ | |
| df = pd.read_sql_query(query, conn) | |
| conn.close() | |
| if not df.empty: | |
| df['timestamp'] = pd.to_datetime(df['timestamp']) | |
| df['metadata'] = df['metadata'].apply(json.loads) | |
| return df | |
| def calculate_demand_indicators(self, df: pd.DataFrame) -> Dict[str, float]: | |
| """Calculate various demand indicators""" | |
| indicators = {} | |
| # Volume trend analysis | |
| df_sorted = df.sort_values('timestamp') | |
| if len(df_sorted) > 1: | |
| recent_volume = df_sorted.tail(10)['volume'].mean() | |
| older_volume = df_sorted.head(10)['volume'].mean() | |
| indicators['volume_growth'] = (recent_volume - older_volume) / max(older_volume, 1) | |
| # Sentiment analysis | |
| indicators['avg_sentiment'] = df['sentiment'].mean() | |
| indicators['sentiment_volatility'] = df['sentiment'].std() | |
| # Source diversity | |
| source_counts = df['source'].value_counts() | |
| indicators['source_diversity'] = len(source_counts) | |
| indicators['cross_platform_consistency'] = 1 - (source_counts.std() / source_counts.mean()) if source_counts.mean() > 0 else 0 | |
| # Temporal patterns | |
| df['hour'] = df['timestamp'].dt.hour | |
| df['day_of_week'] = df['timestamp'].dt.dayofweek | |
| indicators['temporal_concentration'] = df.groupby('hour').size().std() / df.groupby('hour').size().mean() | |
| return indicators | |
| def identify_supply_gaps(self, keyword: str, df: pd.DataFrame) -> List[str]: | |
| """Identify potential supply gaps based on trend analysis""" | |
| gaps = [] | |
| # Analyze sentiment patterns | |
| if df['sentiment'].mean() < -0.2: | |
| gaps.append(f"High negative sentiment around {keyword} suggests user dissatisfaction") | |
| # Analyze GitHub data for tech gaps | |
| github_data = df[df['source'] == 'github'] | |
| if not github_data.empty: | |
| avg_stars = github_data['volume'].mean() | |
| if avg_stars < 100: | |
| gaps.append(f"Low GitHub engagement suggests limited open-source solutions for {keyword}") | |
| # Analyze Reddit discussions for unmet needs | |
| reddit_data = df[df['source'] == 'reddit'] | |
| if not reddit_data.empty: | |
| high_comment_posts = reddit_data[ | |
| reddit_data['metadata'].apply(lambda x: x.get('comments', 0) > 50) | |
| ] | |
| if len(high_comment_posts) > len(reddit_data) * 0.3: | |
| gaps.append(f"High discussion volume on Reddit indicates active community seeking solutions for {keyword}") | |
| return gaps | |
| def calculate_market_gap_score(self, demand_indicators: Dict[str, float], supply_gaps: List[str]) -> float: | |
| """Calculate overall market gap score""" | |
| score = 0.0 | |
| # Demand components | |
| score += max(demand_indicators.get('volume_growth', 0) * 0.3, 0) | |
| score += (demand_indicators.get('avg_sentiment', 0) + 1) * 0.1 # Normalize sentiment to 0-2 range | |
| score += demand_indicators.get('source_diversity', 0) * 0.1 | |
| score += demand_indicators.get('cross_platform_consistency', 0) * 0.2 | |
| # Supply gap components | |
| score += len(supply_gaps) * 0.1 | |
| # Temporal activity (higher concentration might indicate urgent need) | |
| temporal_score = demand_indicators.get('temporal_concentration', 0) | |
| if temporal_score > 1: # High concentration | |
| score += 0.2 | |
| return min(score, 1.0) # Cap at 1.0 | |
| def analyze_market_gaps(self, keywords: List[str]) -> List[MarketGap]: | |
| """Main method to analyze market gaps for given keywords""" | |
| df = self.load_trend_data(keywords) | |
| gaps = [] | |
| for keyword in keywords: | |
| keyword_data = df[df['keyword'] == keyword] | |
| if keyword_data.empty: | |
| continue | |
| demand_indicators = self.calculate_demand_indicators(keyword_data) | |
| supply_gaps = self.identify_supply_gaps(keyword, keyword_data) | |
| # Generate gap description | |
| gap_description = f"Market opportunity in {keyword} sector" | |
| if demand_indicators.get('volume_growth', 0) > 0.5: | |
| gap_description += " with high growth potential" | |
| if demand_indicators.get('avg_sentiment', 0) < -0.2: | |
| gap_description += " addressing user dissatisfaction" | |
| gap_score = self.calculate_market_gap_score(demand_indicators, supply_gaps) | |
| # Find related keywords (simple co-occurrence analysis) | |
| related_keywords = self.find_related_keywords(keyword, df) | |
| gap = MarketGap( | |
| gap_description=gap_description, | |
| evidence_score=gap_score, | |
| demand_indicators=[f"{k}: {v:.3f}" for k, v in demand_indicators.items()], | |
| supply_gaps=supply_gaps, | |
| related_keywords=related_keywords, | |
| confidence_level=min(len(keyword_data) / 100.0, 1.0) # Based on data volume | |
| ) | |
| gaps.append(gap) | |
| return sorted(gaps, key=lambda x: x.evidence_score, reverse=True) | |
| def find_related_keywords(self, keyword: str, df: pd.DataFrame) -> List[str]: | |
| """Find keywords that often appear together with the target keyword""" | |
| related = [] | |
| # Simple co-occurrence based on temporal proximity | |
| keyword_data = df[df['keyword'] == keyword] | |
| for _, row in keyword_data.iterrows(): | |
| time_window = timedelta(hours=24) | |
| nearby_data = df[ | |
| (df['timestamp'] >= row['timestamp'] - time_window) & | |
| (df['timestamp'] <= row['timestamp'] + time_window) & | |
| (df['keyword'] != keyword) | |
| ] | |
| related.extend(nearby_data['keyword'].unique()) | |
| # Count occurrences and return top related keywords | |
| from collections import Counter | |
| related_counts = Counter(related) | |
| return [k for k, v in related_counts.most_common(5)] | |
| def save_market_gaps(self, gaps: List[MarketGap]): | |
| """Save identified market gaps to database""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| for gap in gaps: | |
| cursor.execute(''' | |
| INSERT INTO market_gaps | |
| (gap_description, evidence_score, demand_indicators, supply_gaps, related_keywords, confidence_level, created_at) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| gap.gap_description, | |
| gap.evidence_score, | |
| json.dumps(gap.demand_indicators), | |
| json.dumps(gap.supply_gaps), | |
| json.dumps(gap.related_keywords), | |
| gap.confidence_level, | |
| datetime.now().isoformat() | |
| )) | |
| conn.commit() | |
| conn.close() | |
| class TrendCorrelationEngine: | |
| """Engine for correlating trends across different data sources""" | |
| def __init__(self, db_path: str): | |
| self.db_path = db_path | |
| def calculate_cross_platform_correlation(self, keyword: str) -> Dict[str, float]: | |
| """Calculate correlation between different platforms for a keyword""" | |
| conn = sqlite3.connect(self.db_path) | |
| query = """ | |
| SELECT source, DATE(timestamp) as date, AVG(volume) as avg_volume, AVG(sentiment) as avg_sentiment | |
| FROM trends | |
| WHERE keyword = ? | |
| GROUP BY source, date | |
| ORDER BY date | |
| """ | |
| df = pd.read_sql_query(query, conn, params=[keyword]) | |
| conn.close() | |
| if df.empty: | |
| return {} | |
| # Pivot data for correlation analysis | |
| volume_pivot = df.pivot(index='date', columns='source', values='avg_volume').fillna(0) | |
| sentiment_pivot = df.pivot(index='date', columns='source', values='avg_sentiment').fillna(0) | |
| correlations = {} | |
| # Volume correlations | |
| if len(volume_pivot.columns) > 1: | |
| vol_corr = volume_pivot.corr() | |
| correlations['volume_correlations'] = vol_corr.to_dict() | |
| # Sentiment correlations | |
| if len(sentiment_pivot.columns) > 1: | |
| sent_corr = sentiment_pivot.corr() | |
| correlations['sentiment_correlations'] = sent_corr.to_dict() | |
| return correlations | |
| async def main_analysis_pipeline(keywords: List[str]): | |
| """Main pipeline for trend scraping and market gap analysis""" | |
| logger.info(f"Starting analysis for keywords: {keywords}") | |
| # Initialize components | |
| scraper = TrendScraper() | |
| analyzer = MarketGapAnalyzer(scraper.db_path) | |
| correlation_engine = TrendCorrelationEngine(scraper.db_path) | |
| # Step 1: Scrape data from all sources | |
| all_trends = [] | |
| logger.info("Scraping Reddit trends...") | |
| reddit_trends = await scraper.scrape_reddit_trends(keywords) | |
| all_trends.extend(reddit_trends) | |
| logger.info("Scraping Twitter trends...") | |
| twitter_trends = await scraper.scrape_twitter_trends(keywords) | |
| all_trends.extend(twitter_trends) | |
| logger.info("Scraping Google trends...") | |
| google_trends = await scraper.scrape_google_trends(keywords) | |
| all_trends.extend(google_trends) | |
| logger.info("Scraping GitHub trends...") | |
| github_trends = await scraper.scrape_github_trends(keywords) | |
| all_trends.extend(github_trends) | |
| logger.info("Scraping news trends...") | |
| news_trends = await scraper.scrape_news_trends(keywords) | |
| all_trends.extend(news_trends) | |
| # Step 2: Store trends in database | |
| conn = sqlite3.connect(scraper.db_path) | |
| cursor = conn.cursor() | |
| for trend in all_trends: | |
| cursor.execute(''' | |
| INSERT INTO trends (keyword, source, volume, sentiment, timestamp, metadata) | |
| VALUES (?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| trend.keyword, | |
| trend.source, | |
| trend.volume, | |
| trend.sentiment, | |
| trend.timestamp.isoformat(), | |
| json.dumps(trend.metadata) | |
| )) | |
| conn.commit() | |
| conn.close() | |
| logger.info(f"Stored {len(all_trends)} trend data points") | |
| # Step 3: Analyze market gaps | |
| logger.info("Analyzing market gaps...") | |
| market_gaps = analyzer.analyze_market_gaps(keywords) | |
| # Step 4: Calculate correlations | |
| logger.info("Calculating cross-platform correlations...") | |
| correlations = {} | |
| for keyword in keywords: | |
| correlations[keyword] = correlation_engine.calculate_cross_platform_correlation(keyword) | |
| # Step 5: Save results | |
| analyzer.save_market_gaps(market_gaps) | |
| # Step 6: Generate report | |
| logger.info("Generating analysis report...") | |
| report = { | |
| 'analysis_timestamp': datetime.now().isoformat(), | |
| 'keywords_analyzed': keywords, | |
| 'total_data_points': len(all_trends), | |
| 'market_gaps_identified': len(market_gaps), | |
| 'top_opportunities': [ | |
| { | |
| 'description': gap.gap_description, | |
| 'evidence_score': gap.evidence_score, | |
| 'confidence': gap.confidence_level, | |
| 'key_indicators': gap.demand_indicators[:3], # Top 3 indicators | |
| 'supply_gaps': gap.supply_gaps | |
| } | |
| for gap in market_gaps[:5] # Top 5 opportunities | |
| ], | |
| 'cross_platform_correlations': correlations | |
| } | |
| # Save report to file | |
| with open(f"market_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f: | |
| json.dump(report, f, indent=2, default=str) | |
| logger.info("Analysis complete. Report saved.") | |
| return report | |
| # Example usage | |
| if __name__ == "__main__": | |
| # Example keywords to analyze | |
| test_keywords = [ | |
| "sustainable packaging", | |
| "remote work tools", | |
| "mental health apps", | |
| "AI productivity", | |
| "electric vehicle charging" | |
| ] | |
| # Run the analysis | |
| asyncio.run(main_analysis_pipeline(test_keywords)) | |
import asyncio
import json
import re
import sqlite3
from collections import Counter, defaultdict
from typing import Any, Dict, List, Set

import aiohttp
import nltk
import numpy as np
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from textblob import TextBlob
| class KeywordExpansionEngine: | |
| """Engine for expanding keywords and identifying market segments""" | |
| def __init__(self, db_path: str): | |
| self.db_path = db_path | |
| self.stop_words = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by']) | |
| # Download required NLTK data | |
| try: | |
| nltk.data.find('tokenizers/punkt') | |
| except LookupError: | |
| nltk.download('punkt') | |
| def extract_keywords_from_text(self, text: str) -> List[str]: | |
| """Extract relevant keywords from text using NLP techniques""" | |
| if not text: | |
| return [] | |
| # Clean and tokenize text | |
| text = re.sub(r'[^\w\s]', '', text.lower()) | |
| blob = TextBlob(text) | |
| # Extract noun phrases and significant words | |
| keywords = [] | |
| # Get noun phrases | |
| for phrase in blob.noun_phrases: | |
| if len(phrase.split()) <= 3 and phrase not in self.stop_words: | |
| keywords.append(phrase) | |
| # Get significant single words (nouns, adjectives) | |
| words = blob.words | |
| for word, tag in blob.tags: | |
| if tag.startswith(('NN', 'JJ')) and len(word) > 3 and word not in self.stop_words: | |
| keywords.append(word) | |
| return list(set(keywords)) | |
| def expand_keywords_from_trends(self, base_keyword: str, limit: int = 20) -> List[str]: | |
| """Expand keywords based on existing trend data""" | |
| conn = sqlite3.connect(self.db_path) | |
| # Get related content from database | |
| query = """ | |
| SELECT metadata FROM trends | |
| WHERE keyword = ? OR keyword LIKE ? | |
| """ | |
| cursor = conn.execute(query, [base_keyword, f'%{base_keyword}%']) | |
| results = cursor.fetchall() | |
| conn.close() | |
| expanded_keywords = set() | |
| for row in results: | |
| try: | |
| metadata = json.loads(row[0]) | |
| # Extract keywords from different metadata fields | |
| text_fields = [] | |
| if isinstance(metadata, dict): | |
| for key, value in metadata.items(): | |
| if isinstance(value, str) and key in ['title', 'description', 'text', 'content']: | |
| text_fields.append(value) | |
| # Process text fields | |
| for text in text_fields: | |
| keywords = self.extract_keywords_from_text(text) | |
| expanded_keywords.update(keywords[:5]) # Limit per text | |
| except json.JSONDecodeError: | |
| continue | |
| # Filter and rank keywords | |
| relevant_keywords = [] | |
| for keyword in expanded_keywords: | |
| if (len(keyword.split()) <= 3 and | |
| len(keyword) > 2 and | |
| keyword != base_keyword and | |
| base_keyword.split()[0] in keyword or any(word in keyword for word in base_keyword.split())): | |
| relevant_keywords.append(keyword) | |
| return relevant_keywords[:limit] | |
| async def expand_keywords_with_search_suggestions(self, base_keyword: str) -> List[str]: | |
| """Expand keywords using search suggestion APIs""" | |
| suggestions = [] | |
| # Google Suggest API (no API key required) | |
| async with aiohttp.ClientSession() as session: | |
| try: | |
| url = "https://suggestqueries.google.com/complete/search" | |
| params = { | |
| 'client': 'firefox', | |
| 'q': base_keyword | |
| } | |
| async with session.get(url, params=params) as response: | |
| if response.status == 200: | |
| data = await response.text() | |
| # Parse the JSONP response | |
| if data.startswith('window.google.ac.h('): | |
| json_str = data[19:-1] # Remove JSONP wrapper | |
| suggestions_data = json.loads(json_str) | |
| if len(suggestions_data) > 1: | |
| suggestions.extend(suggestions_data[1][:10]) # Get top 10 | |
| except Exception as e: | |
| print(f"Error fetching Google suggestions: {e}") | |
| return suggestions | |
| def identify_market_segments(self, keywords: List[str]) -> Dict[str, List[str]]: | |
| """Identify market segments by clustering related keywords""" | |
| if len(keywords) < 5: | |
| return {"main_segment": keywords} | |
| # Get trend data for all keywords | |
| conn = sqlite3.connect(self.db_path) | |
| keyword_texts = {} | |
| for keyword in keywords: | |
| query = """ | |
| SELECT metadata FROM trends WHERE keyword = ? | |
| LIMIT 50 | |
| """ | |
| cursor = conn.execute(query, [keyword]) | |
| results = cursor.fetchall() | |
| texts = [] | |
| for row in results: | |
| try: | |
| metadata = json.loads(row[0]) | |
| if isinstance(metadata, dict): | |
| for key, value in metadata.items(): | |
| if isinstance(value, str) and len(value) > 10: | |
| texts.append(value) | |
| except: | |
| continue | |
| keyword_texts[keyword] = ' '.join(texts) | |
| conn.close() | |
| # Create text corpus for clustering | |
| corpus = [keyword_texts.get(keyword, keyword) for keyword in keywords] | |
| if len(corpus) < 3: | |
| return {"main_segment": keywords} | |
| try: | |
| # TF-IDF vectorization | |
| vectorizer = TfidfVectorizer(max_features=100, stop_words='english') | |
| tfidf_matrix = vectorizer.fit_transform(corpus) | |
| # K-means clustering | |
| n_clusters = min(max(2, len(keywords) // 3), 5) # 2-5 clusters | |
| kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) | |
| clusters = kmeans.fit_predict(tfidf_matrix) | |
| # Group keywords by cluster | |
| segments = defaultdict(list) | |
| for i, keyword in enumerate(keywords): | |
| cluster_id = clusters[i] | |
| segments[f"segment_{cluster_id}"].append(keyword) | |
| # Name segments based on common terms | |
| named_segments = {} | |
| feature_names = vectorizer.get_feature_names_out() | |
| for cluster_id in range(n_clusters): | |
| cluster_center = kmeans.cluster_centers_[cluster_id] | |
| top_indices = cluster_center.argsort()[-3:][::-1] # Top 3 features | |
| top_terms = [feature_names[i] for i in top_indices] | |
| segment_name = f"{top_terms[0]}_segment" | |
| segment_keywords = segments[f"segment_{cluster_id}"] | |
| named_segments[segment_name] = segment_keywords | |
| return named_segments | |
| except Exception as e: | |
| print(f"Clustering error: {e}") | |
| return {"main_segment": keywords} | |
| def analyze_keyword_difficulty(self, keyword: str) -> Dict[str, float]: | |
| """Analyze keyword difficulty and opportunity scores""" | |
| conn = sqlite3.connect(self.db_path) | |
| # Get trend data for keyword | |
| query = """ | |
| SELECT source, volume, sentiment, COUNT(*) as frequency | |
| FROM trends | |
| WHERE keyword = ? OR keyword LIKE ? | |
| GROUP BY source | |
| """ | |
| cursor = conn.execute(query, [keyword, f'%{keyword}%']) | |
| results = cursor.fetchall() | |
| conn.close() | |
| if not results: | |
| return { | |
| 'competition_score': 0.1, # Low competition if no data | |
| 'opportunity_score': 0.8, # High opportunity | |
| 'data_availability': 0.0 | |
| } | |
| # Calculate metrics | |
| total_volume = sum(row[1] for row in results) | |
| avg_sentiment = sum(row[2] for row in results) / len(results) | |
| source_diversity = len(results) | |
| total_frequency = sum(row[3] for row in results) | |
| # Competition score (higher volume = more competition) | |
| competition_score = min(total_volume / 10000, 1.0) # Normalize to 0-1 | |
| # Opportunity score (based on sentiment and engagement) | |
| opportunity_score = 0.5 | |
| if avg_sentiment < -0.2: # Negative sentiment indicates problems to solve | |
| opportunity_score += 0.3 | |
| if source_diversity >= 3: # Cross-platform interest | |
| opportunity_score += 0.2 | |
| opportunity_score = min(opportunity_score, 1.0) | |
| # Data availability score | |
| data_availability = min(total_frequency / 100, 1.0) | |
| return { | |
| 'competition_score': competition_score, | |
| 'opportunity_score': opportunity_score, | |
| 'data_availability': data_availability, | |
| 'total_mentions': total_frequency, | |
| 'source_diversity': source_diversity, | |
| 'avg_sentiment': avg_sentiment | |
| } | |
| def generate_keyword_variations(self, base_keyword: str) -> List[str]: | |
| """Generate variations of a base keyword""" | |
| variations = [] | |
| words = base_keyword.split() | |
| if len(words) == 1: | |
| # Single word variations | |
| base = words[0] | |
| variations.extend([ | |
| f"{base} app", | |
| f"{base} software", | |
| f"{base} tool", | |
| f"{base} platform", | |
| f"{base} service", | |
| f"{base} solution", | |
| f"best {base}", | |
| f"{base} alternative", | |
| f"cheap {base}", | |
| f"free {base}" | |
| ]) | |
| else: | |
| # Multi-word variations | |
| variations.extend([ | |
| f"{base_keyword} app", | |
| f"{base_keyword} software", | |
| f"{base_keyword} platform", | |
| f"best {base_keyword}", | |
| f"{base_keyword} solution", | |
| f"{base_keyword} alternative" | |
| ]) | |
| # Word order variations | |
| if len(words) == 2: | |
| variations.append(f"{words[1]} {words[0]}") | |
| return variations | |
| class MarketSegmentAnalyzer: | |
| """Analyze market segments and identify gaps within segments""" | |
| def __init__(self, db_path: str): | |
| self.db_path = db_path | |
| def analyze_segment_trends(self, segment_keywords: List[str]) -> Dict[str, any]: | |
| """Analyze trends within a market segment""" | |
| conn = sqlite3.connect(self.db_path) | |
| # Get trend data for all keywords in segment | |
| placeholders = ','.join(['?' for _ in segment_keywords]) | |
| query = f""" | |
| SELECT keyword, source, AVG(volume) as avg_volume, | |
| AVG(sentiment) as avg_sentiment, COUNT(*) as mentions, | |
| MIN(timestamp) as first_seen, MAX(timestamp) as last_seen | |
| FROM trends | |
| WHERE keyword IN ({placeholders}) | |
| GROUP BY keyword, source | |
| """ | |
| cursor = conn.execute(query, segment_keywords) | |
| results = cursor.fetchall() | |
| conn.close() | |
| if not results: | |
| return {} | |
| # Analyze segment characteristics | |
| segment_data = defaultdict(lambda: defaultdict(list)) | |
| for row in results: | |
| keyword, source, avg_vol, avg_sent, mentions, first, last = row | |
| segment_data[keyword]['sources'].append(source) | |
| segment_data[keyword]['avg_volume'] = segment_data[keyword].get('avg_volume', 0) + avg_vol | |
| segment_data[keyword]['avg_sentiment'] = segment_data[keyword].get('avg_sentiment', 0) + avg_sent | |
| segment_data[keyword]['total_mentions'] = segment_data[keyword].get('total_mentions', 0) + mentions | |
| segment_data[keyword]['date_range'] = (first, last) | |
| # Calculate segment-wide metrics | |
| total_volume = sum(data['avg_volume'] for data in segment_data.values()) | |
| total_mentions = sum(data['total_mentions'] for data in segment_data.values()) | |
| avg_segment_sentiment = sum(data['avg_sentiment'] for data in segment_data.values()) / len(segment_data) | |
| # Identify leaders and gaps in segment | |
| sorted_by_volume = sorted(segment_data.items(), key=lambda x: x[1]['avg_volume'], reverse=True) | |
| market_leaders = sorted_by_volume[:3] # Top 3 by volume | |
| underserved_areas = sorted_by_volume[-3:] # Bottom 3 by volume | |
| return { | |
| 'segment_size': len(segment_keywords), | |
| 'total_market_volume': total_volume, | |
| 'total_mentions': total_mentions, | |
| 'avg_sentiment': avg_segment_sentiment, | |
| 'market_leaders': [(kw, data['avg_volume']) for kw, data in market_leaders], | |
| 'underserved_areas': [(kw, data['avg_volume']) for kw, data in underserved_areas], | |
| 'keyword_details': dict(segment_data) | |
| } | |
def identify_segment_gaps(self, segment_analysis: Dict[str, object]) -> List[str]:
    """Identify specific gap opportunities within one market segment.

    Flags keywords whose volume sits well below the segment average:
    positive sentiment there suggests unmet demand, while clearly negative
    sentiment suggests dissatisfaction with existing solutions.

    Args:
        segment_analysis: Output of ``analyze_segment_trends`` (may be
            empty / falsy when the segment had no data).

    Returns:
        Human-readable gap descriptions; empty list when nothing to report.
    """
    # NOTE: the original annotation was ``Dict[str, any]`` (the builtin
    # function, not a type); ``object`` is the honest "anything" type here.
    gaps: List[str] = []
    if not segment_analysis:
        return gaps
    keyword_details = segment_analysis.get('keyword_details', {})
    total_market_volume = segment_analysis.get('total_market_volume', 0)
    for keyword, data in keyword_details.items():
        volume = data.get('avg_volume', 0)
        sentiment = data.get('avg_sentiment', 0)
        # "Low supply" signal: volume under half the per-keyword average.
        # (len() is safe here — the loop body only runs when non-empty.)
        average_volume = total_market_volume / len(keyword_details)
        if volume < average_volume * 0.5:
            if sentiment > 0.2:  # clearly positive -> unmet demand
                gaps.append(f"Underserved positive demand for {keyword}")
            elif sentiment < -0.2:  # clearly negative -> pain point
                gaps.append(f"User dissatisfaction with current {keyword} solutions")
    return gaps
async def comprehensive_keyword_analysis(base_keywords: List[str], db_path: str):
    """Run the full keyword-expansion and market-segmentation pipeline.

    Steps: expand each base keyword (variations, trend-based expansions,
    search suggestions), score keyword difficulty, cluster the expanded pool
    into market segments, analyze each segment for gaps, rank opportunities,
    and dump everything to a timestamped JSON report file.

    Args:
        base_keywords: Seed keywords to expand and analyze.
        db_path: Path to the SQLite trends database shared by the engines.

    Returns:
        Dict with expanded keywords, segments, difficulty scores, per-segment
        gaps, and the top-20 prioritized opportunities.
    """
    print(f"Starting comprehensive analysis for: {base_keywords}")
    # Initialize engines
    keyword_engine = KeywordExpansionEngine(db_path)
    segment_analyzer = MarketSegmentAnalyzer(db_path)
    results = {
        'base_keywords': base_keywords,
        'expanded_keywords': {},
        'market_segments': {},
        'keyword_difficulty': {},
        'segment_analysis': {},
        'identified_gaps': {}
    }
    all_keywords = set(base_keywords)

    # Step 1: expand every base keyword through all three expansion sources.
    print("Expanding keywords...")
    for keyword in base_keywords:
        variations = keyword_engine.generate_keyword_variations(keyword)
        trend_expansions = keyword_engine.expand_keywords_from_trends(keyword)
        search_suggestions = await keyword_engine.expand_keywords_with_search_suggestions(keyword)
        # De-duplicate across the three expansion strategies.
        expanded = list(set(variations + trend_expansions + search_suggestions))
        results['expanded_keywords'][keyword] = expanded
        all_keywords.update(expanded)

    # Step 2: difficulty/competition scoring per (expanded) keyword.
    print("Analyzing keyword difficulty...")
    for keyword in all_keywords:
        results['keyword_difficulty'][keyword] = keyword_engine.analyze_keyword_difficulty(keyword)

    # Step 3: cluster the expanded keyword pool into market segments.
    print("Identifying market segments...")
    segments = keyword_engine.identify_market_segments(list(all_keywords))
    results['market_segments'] = segments

    # Step 4: per-segment trend analysis and gap detection.
    print("Analyzing market segments...")
    for segment_name, segment_keywords in segments.items():
        segment_analysis = segment_analyzer.analyze_segment_trends(segment_keywords)
        results['segment_analysis'][segment_name] = segment_analysis
        results['identified_gaps'][segment_name] = segment_analyzer.identify_segment_gaps(segment_analysis)

    # Step 5: rank opportunities by a weighted blend of opportunity score,
    # inverse competition, and data availability (0.4 / 0.4 / 0.2).
    print("Generating opportunity priorities...")
    opportunities = []
    for keyword, difficulty in results['keyword_difficulty'].items():
        opportunity_score = difficulty.get('opportunity_score', 0)
        competition_score = difficulty.get('competition_score', 0)
        data_availability = difficulty.get('data_availability', 0)
        combined_score = (opportunity_score * 0.4 +
                          (1 - competition_score) * 0.4 +
                          data_availability * 0.2)
        opportunities.append({
            'keyword': keyword,
            'opportunity_score': opportunity_score,
            'competition_score': competition_score,
            'combined_score': combined_score,
            'data_points': difficulty.get('total_mentions', 0),
            'sentiment': difficulty.get('avg_sentiment', 0)
        })
    opportunities.sort(key=lambda x: x['combined_score'], reverse=True)
    results['prioritized_opportunities'] = opportunities[:20]  # Top 20

    # Step 6: persist the full result set as JSON (default=str handles
    # datetimes and other non-JSON-native values).
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"keyword_analysis_report_{timestamp}.json"
    with open(filename, 'w') as f:
        json.dump(results, f, indent=2, default=str)
    # BUG FIX: previously printed a literal "(unknown)" placeholder instead
    # of the actual report filename.
    print(f"Analysis complete. Results saved to {filename}")
    return results
# Example usage and testing
if __name__ == "__main__":
    import os
    import sys
    from datetime import datetime

    # Make sibling modules importable when this file is run as a script.
    sys.path.append(os.path.dirname(__file__))

    async def test_keyword_expansion():
        """Smoke-test the full pipeline against a few sample keywords."""
        sample_keywords = [
            "productivity app",
            "sustainable fashion",
            "remote collaboration"
        ]
        # Shared database produced by the main trend scraper.
        database = "market_trends.db"
        analysis = await comprehensive_keyword_analysis(sample_keywords, database)

        # Print summary
        print("\n=== KEYWORD EXPANSION RESULTS ===")
        print(f"Base keywords: {len(sample_keywords)}")
        expanded_total = sum(len(v) for v in analysis['expanded_keywords'].values())
        print(f"Total expanded keywords: {expanded_total}")
        print(f"Market segments identified: {len(analysis['market_segments'])}")
        print(f"Top opportunities: {len(analysis['prioritized_opportunities'])}")

        print("\n=== TOP 5 OPPORTUNITIES ===")
        for rank, opportunity in enumerate(analysis['prioritized_opportunities'][:5], 1):
            print(f"{rank}. {opportunity['keyword']}")
            print(f"   Combined Score: {opportunity['combined_score']:.3f}")
            print(f"   Opportunity: {opportunity['opportunity_score']:.3f}, Competition: {opportunity['competition_score']:.3f}")
            print()

    asyncio.run(test_keyword_expansion())
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| from plotly.subplots import make_subplots | |
| import sqlite3 | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
| import networkx as nx | |
| from wordcloud import WordCloud | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
class TrendVisualizationDashboard:
    """Create visualizations and reports for trend analysis results.

    Reads from the SQLite database produced by the trend scraper and renders
    interactive (plotly) and static (matplotlib/seaborn) charts, plus a
    self-contained HTML report.
    """

    def __init__(self, db_path: str):
        # Path to the SQLite database produced by the trend scraper.
        self.db_path = db_path
        # Global plotting defaults for all matplotlib/seaborn figures.
        plt.style.use('seaborn-v0_8')
        sns.set_palette("husl")

    def load_trend_data(self, keywords: Optional[List[str]] = None, days: int = 30) -> pd.DataFrame:
        """Load trend data from database.

        Args:
            keywords: Optional keyword whitelist; all keywords when falsy.
            days: Lookback window relative to now.

        Returns:
            DataFrame with parsed timestamps, derived date/hour columns and
            metadata decoded from JSON (empty dict for undecodable rows).
        """
        conn = sqlite3.connect(self.db_path)
        base_query = """
            SELECT keyword, source, volume, sentiment, timestamp, metadata
            FROM trends
            WHERE timestamp >= ?
        """
        params = [(datetime.now() - timedelta(days=days)).isoformat()]
        if keywords:
            # BUG FIX: keywords were previously spliced straight into the SQL
            # string, which broke on embedded quotes and was an SQL-injection
            # vector. Bind them as qmark parameters instead.
            placeholders = ",".join("?" for _ in keywords)
            base_query += f" AND keyword IN ({placeholders})"
            params.extend(keywords)
        base_query += " ORDER BY timestamp DESC"
        df = pd.read_sql_query(base_query, conn, params=params)
        conn.close()
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['date'] = df['timestamp'].dt.date
            df['hour'] = df['timestamp'].dt.hour

            # BUG FIX: one malformed row used to blank out metadata for every
            # row (the old try/except wrapped the whole column, with a bare
            # except). Decode per value and catch only decode/type errors.
            def _parse_metadata(raw):
                try:
                    return json.loads(raw)
                except (TypeError, ValueError):
                    return {}

            df['metadata'] = df['metadata'].apply(_parse_metadata)
        return df

    def load_market_gaps(self) -> pd.DataFrame:
        """Load market gap analysis results, highest evidence score first."""
        conn = sqlite3.connect(self.db_path)
        query = """
            SELECT gap_description, evidence_score, demand_indicators,
                   supply_gaps, related_keywords, confidence_level, created_at
            FROM market_gaps
            ORDER BY evidence_score DESC
        """
        df = pd.read_sql_query(query, conn)
        conn.close()
        if not df.empty:
            df['created_at'] = pd.to_datetime(df['created_at'])
            # These columns are written by our own analyzer as JSON arrays.
            df['demand_indicators'] = df['demand_indicators'].apply(json.loads)
            df['supply_gaps'] = df['supply_gaps'].apply(json.loads)
            df['related_keywords'] = df['related_keywords'].apply(json.loads)
        return df

    def create_trend_timeline(self, keywords: List[str], save_path: str = None) -> go.Figure:
        """Create interactive timeline of daily volume and sentiment per keyword.

        Args:
            keywords: Keywords to plot (one trace per keyword per subplot).
            save_path: Optional path to also write the figure as HTML.
        """
        df = self.load_trend_data(keywords)
        if df.empty:
            return go.Figure().add_annotation(text="No data available",
                                              xref="paper", yref="paper",
                                              x=0.5, y=0.5, showarrow=False)
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=('Volume Trends', 'Sentiment Trends'),
            vertical_spacing=0.1
        )
        # Row 1: daily summed volume per keyword.
        for keyword in keywords:
            keyword_data = df[df['keyword'] == keyword]
            if not keyword_data.empty:
                daily_volume = keyword_data.groupby('date')['volume'].sum().reset_index()
                fig.add_trace(
                    go.Scatter(
                        x=daily_volume['date'],
                        y=daily_volume['volume'],
                        mode='lines+markers',
                        name=f'{keyword} Volume',
                        line=dict(width=3)
                    ),
                    row=1, col=1
                )
        # Row 2: daily mean sentiment per keyword (dashed to distinguish).
        for keyword in keywords:
            keyword_data = df[df['keyword'] == keyword]
            if not keyword_data.empty:
                daily_sentiment = keyword_data.groupby('date')['sentiment'].mean().reset_index()
                fig.add_trace(
                    go.Scatter(
                        x=daily_sentiment['date'],
                        y=daily_sentiment['sentiment'],
                        mode='lines+markers',
                        name=f'{keyword} Sentiment',
                        line=dict(width=3, dash='dash')
                    ),
                    row=2, col=1
                )
        fig.update_layout(
            title='Trend Analysis Timeline',
            height=600,
            showlegend=True,
            template='plotly_white'
        )
        fig.update_xaxes(title_text="Date", row=2, col=1)
        fig.update_yaxes(title_text="Volume", row=1, col=1)
        fig.update_yaxes(title_text="Sentiment", row=2, col=1)
        if save_path:
            fig.write_html(save_path)
        return fig

    def create_source_distribution(self, keywords: List[str], save_path: str = None) -> go.Figure:
        """Create donut chart of how many data points each source contributed."""
        df = self.load_trend_data(keywords)
        if df.empty:
            # Consistent with create_trend_timeline: anchor the placeholder
            # annotation to the paper so it renders without axes.
            return go.Figure().add_annotation(text="No data available",
                                              xref="paper", yref="paper",
                                              x=0.5, y=0.5, showarrow=False)
        source_counts = df['source'].value_counts()
        fig = go.Figure(data=[
            go.Pie(
                labels=source_counts.index,
                values=source_counts.values,
                hole=.3,
                textinfo='label+percent',
                textposition='outside'
            )
        ])
        fig.update_layout(
            title='Data Sources Distribution',
            template='plotly_white',
            annotations=[dict(text='Sources', x=0.5, y=0.5, font_size=20, showarrow=False)]
        )
        if save_path:
            fig.write_html(save_path)
        return fig

    def create_sentiment_heatmap(self, keywords: List[str], save_path: str = None):
        """Create static heatmap of mean sentiment per (keyword, source) pair.

        Saves a PNG when save_path is given; always shows the figure.
        """
        df = self.load_trend_data(keywords)
        if df.empty:
            print("No data available for heatmap")
            return
        # Pivot to keyword rows x source columns; missing pairs read as 0.
        sentiment_pivot = df.groupby(['keyword', 'source'])['sentiment'].mean().unstack(fill_value=0)
        plt.figure(figsize=(12, 8))
        sns.heatmap(sentiment_pivot, annot=True, cmap='RdYlGn', center=0,
                    fmt='.2f', cbar_kws={'label': 'Average Sentiment'})
        plt.title('Sentiment Heatmap: Keywords vs Sources')
        plt.xlabel('Data Source')
        plt.ylabel('Keyword')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()

    def create_market_gap_visualization(self, save_path: str = None) -> go.Figure:
        """Visualize market gaps as a bubble chart (confidence vs evidence)."""
        df = self.load_market_gaps()
        if df.empty:
            # Paper-anchored placeholder, matching the other figure builders.
            return go.Figure().add_annotation(text="No market gaps data available",
                                              xref="paper", yref="paper",
                                              x=0.5, y=0.5, showarrow=False)
        fig = go.Figure()
        x_vals = df['confidence_level']
        y_vals = df['evidence_score']
        # Bubble size scales with the number of supply gaps behind each entry.
        sizes = [len(gaps) * 10 + 20 for gaps in df['supply_gaps']]
        # Truncate long descriptions so hover text stays readable.
        texts = [desc[:50] + "..." if len(desc) > 50 else desc for desc in df['gap_description']]
        fig.add_trace(go.Scatter(
            x=x_vals,
            y=y_vals,
            mode='markers',
            marker=dict(
                size=sizes,
                color=y_vals,
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(title="Evidence Score"),
                line=dict(width=2, color='white')
            ),
            text=texts,
            textposition="middle center",
            hovertemplate='<b>%{text}</b><br>' +
                          'Confidence: %{x:.2f}<br>' +
                          'Evidence Score: %{y:.2f}<br>' +
                          '<extra></extra>'
        ))
        fig.update_layout(
            title='Market Gap Opportunities',
            xaxis_title='Confidence Level',
            yaxis_title='Evidence Score',
            template='plotly_white',
            height=600
        )
        if save_path:
            fig.write_html(save_path)
        return fig

    def create_keyword_network(self, keywords: List[str], save_path: str = None):
        """Create static network graph of keyword relationships.

        Nodes are keywords (size = total volume, color = mean sentiment);
        edges connect keywords whose activity overlaps in time, weighted by
        the fraction of shared active days.
        """
        df = self.load_trend_data(keywords)
        if df.empty:
            print("No data available for network")
            return
        G = nx.Graph()
        # One node per keyword, annotated with aggregate volume/sentiment.
        for keyword in keywords:
            keyword_data = df[df['keyword'] == keyword]
            volume = keyword_data['volume'].sum() if not keyword_data.empty else 0
            sentiment = keyword_data['sentiment'].mean() if not keyword_data.empty else 0
            G.add_node(keyword, volume=volume, sentiment=sentiment)
        # Edges from temporal co-occurrence: overlap of active-day sets.
        for i, keyword1 in enumerate(keywords):
            for keyword2 in keywords[i+1:]:
                data1 = df[df['keyword'] == keyword1]
                data2 = df[df['keyword'] == keyword2]
                if not data1.empty and not data2.empty:
                    dates1 = set(data1['date'])
                    dates2 = set(data2['date'])
                    overlap = len(dates1.intersection(dates2))
                    if overlap > 0:
                        # Normalize by the larger date set so weight is in (0, 1].
                        weight = overlap / max(len(dates1), len(dates2))
                        G.add_edge(keyword1, keyword2, weight=weight)
        plt.figure(figsize=(14, 10))
        pos = nx.spring_layout(G, k=3, iterations=50)
        node_sizes = [G.nodes[node]['volume'] * 0.1 + 300 for node in G.nodes()]
        node_colors = [G.nodes[node]['sentiment'] for node in G.nodes()]
        nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors,
                               cmap='RdYlGn', alpha=0.8)
        nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')
        # Edge thickness proportional to co-occurrence weight.
        edges = G.edges()
        weights = [G[u][v]['weight'] * 5 for u, v in edges]
        nx.draw_networkx_edges(G, pos, width=weights, alpha=0.6, edge_color='gray')
        plt.title('Keyword Relationship Network')
        plt.axis('off')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()

    def create_wordcloud(self, keywords: List[str], save_path: str = None):
        """Create word cloud from string values found in trend metadata."""
        df = self.load_trend_data(keywords)
        if df.empty:
            print("No data available for word cloud")
            return
        # Collect every non-trivial string value from each metadata dict.
        text_data = []
        for metadata in df['metadata']:
            if isinstance(metadata, dict):
                for key, value in metadata.items():
                    if isinstance(value, str) and len(value) > 3:
                        text_data.append(value)
        if not text_data:
            print("No text data found in metadata")
            return
        combined_text = ' '.join(text_data)
        wordcloud = WordCloud(
            width=800, height=400,
            background_color='white',
            max_words=100,
            colormap='viridis',
            relative_scaling=0.5
        ).generate(combined_text)
        plt.figure(figsize=(12, 6))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.title('Trending Topics Word Cloud')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()

    def generate_comprehensive_report(self, keywords: List[str], output_dir: str = "reports"):
        """Generate the full analysis report: all charts plus an HTML summary.

        Args:
            keywords: Keywords to analyze across every visualization.
            output_dir: Directory for all output files (created if missing).

        Returns:
            Path of the generated HTML report.
        """
        import os
        os.makedirs(output_dir, exist_ok=True)
        # Single timestamp shared by every artifact so one report's files
        # group together and the HTML's relative links resolve.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        print(f"Generating comprehensive report for keywords: {keywords}")
        # Load data once for the summary statistics.
        df = self.load_trend_data(keywords)
        gaps_df = self.load_market_gaps()
        # Generate all visualizations (each also writes its own file).
        print("Creating trend timeline...")
        self.create_trend_timeline(keywords,
                                   f"{output_dir}/timeline_{timestamp}.html")
        print("Creating source distribution...")
        self.create_source_distribution(keywords,
                                        f"{output_dir}/sources_{timestamp}.html")
        print("Creating sentiment heatmap...")
        self.create_sentiment_heatmap(keywords,
                                      f"{output_dir}/sentiment_heatmap_{timestamp}.png")
        print("Creating market gap visualization...")
        self.create_market_gap_visualization(f"{output_dir}/market_gaps_{timestamp}.html")
        print("Creating keyword network...")
        self.create_keyword_network(keywords,
                                    f"{output_dir}/keyword_network_{timestamp}.png")
        print("Creating word cloud...")
        self.create_wordcloud(keywords,
                              f"{output_dir}/wordcloud_{timestamp}.png")
        # Summary statistics + HTML wrapper linking to everything above.
        summary_stats = self.generate_summary_statistics(df, gaps_df, keywords)
        html_report = self.create_html_report(summary_stats, keywords, timestamp)
        report_path = f"{output_dir}/comprehensive_report_{timestamp}.html"
        with open(report_path, 'w') as f:
            f.write(html_report)
        print(f"\nComprehensive report generated successfully!")
        print(f"Main report: {report_path}")
        print(f"All files saved in: {output_dir}/")
        return report_path

    def generate_summary_statistics(self, df: pd.DataFrame, gaps_df: pd.DataFrame,
                                    keywords: List[str]) -> Dict:
        """Generate summary statistics for the report.

        Returns a reduced dict (zeros / 'No data') when df is empty so the
        HTML template can render either way.
        """
        if df.empty:
            return {
                'total_data_points': 0,
                'date_range': 'No data',
                'keywords_analyzed': len(keywords),
                'sources_used': 0,
                'market_gaps_found': len(gaps_df) if not gaps_df.empty else 0
            }
        stats = {
            'total_data_points': len(df),
            'date_range': f"{df['timestamp'].min().strftime('%Y-%m-%d')} to {df['timestamp'].max().strftime('%Y-%m-%d')}",
            'keywords_analyzed': len(keywords),
            'sources_used': df['source'].nunique(),
            'avg_sentiment': df['sentiment'].mean(),
            'sentiment_std': df['sentiment'].std(),
            'total_volume': df['volume'].sum(),
            'avg_volume': df['volume'].mean(),
            'market_gaps_found': len(gaps_df) if not gaps_df.empty else 0,
            'top_volume_keyword': df.groupby('keyword')['volume'].sum().idxmax() if len(df) > 0 else 'N/A',
            'most_positive_keyword': df.groupby('keyword')['sentiment'].mean().idxmax() if len(df) > 0 else 'N/A',
            'source_breakdown': df['source'].value_counts().to_dict()
        }
        return stats

    def create_html_report(self, stats: Dict, keywords: List[str], timestamp: str) -> str:
        """Create HTML report with links to the timestamped visualizations.

        The timestamp must match the one used when the chart files were
        written, since the report references them by relative filename.
        Literal CSS/JS braces are escaped as {{ }} inside the f-string.
        """
        html_template = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Market Trend Analysis Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }}
                .container {{ max-width: 1200px; margin: 0 auto; background-color: white; padding: 30px; border-radius: 10px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }}
                h1 {{ color: #2c3e50; text-align: center; border-bottom: 3px solid #3498db; padding-bottom: 10px; }}
                h2 {{ color: #34495e; margin-top: 30px; }}
                .stats-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 20px; margin: 20px 0; }}
                .stat-card {{ background-color: #ecf0f1; padding: 20px; border-radius: 8px; text-align: center; }}
                .stat-number {{ font-size: 2em; font-weight: bold; color: #3498db; }}
                .stat-label {{ color: #7f8c8d; margin-top: 5px; }}
                .keyword-list {{ background-color: #f8f9fa; padding: 15px; border-radius: 5px; }}
                .visualization {{ margin: 20px 0; text-align: center; }}
                .chart-link {{ display: inline-block; padding: 10px 20px; background-color: #3498db; color: white; text-decoration: none; border-radius: 5px; margin: 5px; }}
                .chart-link:hover {{ background-color: #2980b9; }}
                .summary {{ background-color: #e8f6f3; padding: 20px; border-radius: 8px; margin: 20px 0; }}
                table {{ width: 100%; border-collapse: collapse; margin: 15px 0; }}
                th, td {{ padding: 12px; text-align: left; border-bottom: 1px solid #ddd; }}
                th {{ background-color: #f2f2f2; font-weight: bold; }}
                .insights {{ background-color: #fff3cd; padding: 15px; border-left: 5px solid #ffc107; margin: 15px 0; }}
            </style>
        </head>
        <body>
            <div class="container">
                <h1>Market Trend Analysis Report</h1>
                <p style="text-align: center; color: #7f8c8d;">Generated on {timestamp}</p>
                <div class="summary">
                    <h2>Executive Summary</h2>
                    <p>This report analyzes market trends and identifies opportunities for the following keywords:
                    <strong>{', '.join(keywords)}</strong>. The analysis covers data from multiple sources including
                    social media, news, GitHub repositories, and search trends to provide comprehensive market insights.</p>
                </div>
                <div class="stats-grid">
                    <div class="stat-card">
                        <div class="stat-number">{stats.get('total_data_points', 0):,}</div>
                        <div class="stat-label">Total Data Points</div>
                    </div>
                    <div class="stat-card">
                        <div class="stat-number">{stats.get('keywords_analyzed', 0)}</div>
                        <div class="stat-label">Keywords Analyzed</div>
                    </div>
                    <div class="stat-card">
                        <div class="stat-number">{stats.get('sources_used', 0)}</div>
                        <div class="stat-label">Data Sources</div>
                    </div>
                    <div class="stat-card">
                        <div class="stat-number">{stats.get('market_gaps_found', 0)}</div>
                        <div class="stat-label">Market Gaps Found</div>
                    </div>
                </div>
                <h2>Key Metrics</h2>
                <table>
                    <tr><th>Metric</th><th>Value</th></tr>
                    <tr><td>Analysis Period</td><td>{stats.get('date_range', 'N/A')}</td></tr>
                    <tr><td>Average Sentiment</td><td>{stats.get('avg_sentiment', 0):.3f}</td></tr>
                    <tr><td>Sentiment Volatility</td><td>{stats.get('sentiment_std', 0):.3f}</td></tr>
                    <tr><td>Total Volume</td><td>{stats.get('total_volume', 0):,}</td></tr>
                    <tr><td>Average Volume</td><td>{stats.get('avg_volume', 0):,.1f}</td></tr>
                    <tr><td>Top Volume Keyword</td><td>{stats.get('top_volume_keyword', 'N/A')}</td></tr>
                    <tr><td>Most Positive Keyword</td><td>{stats.get('most_positive_keyword', 'N/A')}</td></tr>
                </table>
                <h2>Data Source Breakdown</h2>
                <table>
                    <tr><th>Source</th><th>Data Points</th><th>Percentage</th></tr>
                    {''.join([f"<tr><td>{source}</td><td>{count:,}</td><td>{count/stats.get('total_data_points', 1)*100:.1f}%</td></tr>"
                              for source, count in stats.get('source_breakdown', {}).items()])}
                </table>
                <div class="insights">
                    <h3>Key Insights</h3>
                    <ul>
                        <li><strong>Market Activity:</strong> Analyzed {stats.get('total_data_points', 0):,} data points across {stats.get('sources_used', 0)} different sources</li>
                        <li><strong>Sentiment Analysis:</strong> Overall market sentiment is {'positive' if stats.get('avg_sentiment', 0) > 0.1 else 'negative' if stats.get('avg_sentiment', 0) < -0.1 else 'neutral'} ({stats.get('avg_sentiment', 0):.3f})</li>
                        <li><strong>Top Performer:</strong> "{stats.get('top_volume_keyword', 'N/A')}" shows the highest engagement volume</li>
                        <li><strong>Opportunities:</strong> {stats.get('market_gaps_found', 0)} potential market gaps identified for further investigation</li>
                    </ul>
                </div>
                <h2>Interactive Visualizations</h2>
                <div class="visualization">
                    <p>Click the links below to view interactive charts and detailed visualizations:</p>
                    <a href="timeline_{timestamp}.html" class="chart-link">📈 Trend Timeline</a>
                    <a href="sources_{timestamp}.html" class="chart-link">📊 Source Distribution</a>
                    <a href="market_gaps_{timestamp}.html" class="chart-link">🎯 Market Opportunities</a>
                </div>
                <h2>Static Visualizations</h2>
                <div class="visualization">
                    <h3>Sentiment Heatmap</h3>
                    <img src="sentiment_heatmap_{timestamp}.png" alt="Sentiment Heatmap" style="max-width: 100%; height: auto;">
                    <h3>Keyword Relationship Network</h3>
                    <img src="keyword_network_{timestamp}.png" alt="Keyword Network" style="max-width: 100%; height: auto;">
                    <h3>Trending Topics Word Cloud</h3>
                    <img src="wordcloud_{timestamp}.png" alt="Word Cloud" style="max-width: 100%; height: auto;">
                </div>
                <div class="insights">
                    <h3>Recommendations</h3>
                    <ul>
                        <li><strong>Priority Keywords:</strong> Focus on keywords with high volume but low competition scores</li>
                        <li><strong>Sentiment Opportunities:</strong> Address negative sentiment areas as potential market gaps</li>
                        <li><strong>Cross-Platform Strategy:</strong> Leverage insights from multiple data sources for comprehensive market approach</li>
                        <li><strong>Trend Monitoring:</strong> Continue monitoring identified opportunities for market entry timing</li>
                    </ul>
                </div>
                <footer style="margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd; text-align: center; color: #7f8c8d;">
                    <p>Report generated by Market Trend Analysis System | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
                </footer>
            </div>
        </body>
        </html>
        """
        return html_template
def create_dashboard_app(db_path: str):
    """Create a simple Flask web dashboard (requires Flask installation)"""
    # NOTE: this function does not start a server. It returns Python SOURCE
    # CODE (as a string) for a standalone Flask app wired to the given
    # database path — see export_dashboard_code, which writes it to disk and
    # prepends the TrendVisualizationDashboard import the generated app needs.
    # The template is an f-string, so only {db_path} is interpolated here;
    # every literal brace in the generated code is escaped as {{ }}.
    dashboard_code = f'''
import flask
from flask import Flask, render_template_string, request, jsonify
import json
from datetime import datetime
app = Flask(__name__)
viz_dashboard = TrendVisualizationDashboard("{db_path}")
@app.route('/')
def home():
    return render_template_string("""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Market Trend Analysis Dashboard</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 0; padding: 20px; background-color: #f5f5f5; }}
            .container {{ max-width: 1200px; margin: 0 auto; }}
            .header {{ background-color: #3498db; color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
            .form-section {{ background-color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
            .results {{ background-color: white; padding: 20px; border-radius: 10px; }}
            input, button {{ padding: 10px; margin: 5px; }}
            button {{ background-color: #3498db; color: white; border: none; cursor: pointer; border-radius: 5px; }}
            button:hover {{ background-color: #2980b9; }}
            .keyword-input {{ width: 300px; }}
        </style>
    </head>
    <body>
        <div class="container">
            <div class="header">
                <h1>Market Trend Analysis Dashboard</h1>
                <p>Enter keywords to analyze market trends and identify opportunities</p>
            </div>
            <div class="form-section">
                <h2>Analyze Keywords</h2>
                <form method="POST" action="/analyze">
                    <input type="text" name="keywords" class="keyword-input"
                           placeholder="Enter keywords separated by commas" required>
                    <button type="submit">Analyze Trends</button>
                </form>
            </div>
            <div class="form-section">
                <h2>Generate Report</h2>
                <form method="POST" action="/report">
                    <input type="text" name="keywords" class="keyword-input"
                           placeholder="Enter keywords for comprehensive report" required>
                    <button type="submit">Generate Report</button>
                </form>
            </div>
            <div class="results" id="results" style="display: none;">
                <!-- Results will be displayed here -->
            </div>
        </div>
        <script>
            function showResults(data) {{
                const resultsDiv = document.getElementById('results');
                resultsDiv.innerHTML = '<h2>Analysis Results</h2><pre>' + JSON.stringify(data, null, 2) + '</pre>';
                resultsDiv.style.display = 'block';
            }}
        </script>
    </body>
    </html>
    """)
@app.route('/analyze', methods=['POST'])
def analyze():
    keywords = [kw.strip() for kw in request.form['keywords'].split(',')]
    try:
        # Quick analysis
        df = viz_dashboard.load_trend_data(keywords)
        if not df.empty:
            summary = {{
                'keywords': keywords,
                'total_data_points': len(df),
                'sources': df['source'].unique().tolist(),
                'avg_sentiment': df['sentiment'].mean(),
                'date_range': f"{{df['timestamp'].min()}} to {{df['timestamp'].max()}}"
            }}
        else:
            summary = {{'message': 'No data found for specified keywords'}}
        return jsonify(summary)
    except Exception as e:
        return jsonify({{'error': str(e)}})
@app.route('/report', methods=['POST'])
def generate_report():
    keywords = [kw.strip() for kw in request.form['keywords'].split(',')]
    try:
        report_path = viz_dashboard.generate_comprehensive_report(keywords)
        return jsonify({{'message': f'Report generated successfully: {{report_path}}'}})
    except Exception as e:
        return jsonify({{'error': str(e)}})
if __name__ == '__main__':
    print("Starting Market Trend Analysis Dashboard...")
    print("Visit http://localhost:5000 to access the dashboard")
    app.run(debug=True, host='0.0.0.0', port=5000)
'''
    # Returned verbatim; callers decide where (or whether) to write it.
    return dashboard_code
# Example usage and testing functions
def test_visualizations():
    """Exercise every chart type once against the local trends database."""
    dashboard = TrendVisualizationDashboard("market_trends.db")
    demo_keywords = ["sustainable packaging", "remote work tools", "AI productivity"]
    print("Testing visualizations...")
    try:
        print("1. Creating timeline visualization...")
        dashboard.create_trend_timeline(demo_keywords).show()

        print("2. Creating source distribution...")
        dashboard.create_source_distribution(demo_keywords).show()

        print("3. Creating sentiment heatmap...")
        dashboard.create_sentiment_heatmap(demo_keywords)

        print("4. Creating market gap visualization...")
        dashboard.create_market_gap_visualization().show()

        print("5. Creating keyword network...")
        dashboard.create_keyword_network(demo_keywords)

        print("6. Creating word cloud...")
        dashboard.create_wordcloud(demo_keywords)

        print("All visualizations tested successfully!")
    except Exception as e:
        # Best-effort smoke test: report the failure rather than crash.
        print(f"Error in visualization testing: {e}")
def export_dashboard_code(filename: str = "dashboard_app.py"):
    """Write the generated Flask dashboard app to *filename*.

    The exported script imports TrendVisualizationDashboard from this
    module, so it must be run next to visualization_reporting.py.

    Args:
        filename: Destination path for the generated dashboard script.
    """
    dashboard_code = create_dashboard_app("market_trends.db")
    with open(filename, 'w') as f:
        f.write("# Market Trend Analysis Dashboard\n")
        f.write("# Run this file to start the web dashboard\n\n")
        # The generated app references this class but does not import it
        # itself, so prepend the import here.
        f.write("from visualization_reporting import TrendVisualizationDashboard\n")
        f.write(dashboard_code)
    # BUG FIX: both messages previously printed a literal "(unknown)"
    # placeholder instead of the actual output filename.
    print(f"Dashboard application code exported to {filename}")
    print("Install Flask with: pip install flask")
    print(f"Run the dashboard with: python {filename}")
if __name__ == "__main__":
    # Run tests: exercises every chart type once against sample keywords.
    test_visualizations()
    # Export dashboard: writes a standalone Flask app script to disk.
    export_dashboard_code()
| #!/usr/bin/env python3 | |
| """ | |
| Main Execution Script for Market Gap Analysis System | |
| This script orchestrates the complete pipeline: | |
| 1. Keyword expansion and market segmentation | |
| 2. Multi-source trend data scraping | |
| 3. Market gap analysis and correlation | |
| 4. Visualization and reporting | |
| Usage: | |
| python main_execution.py --keywords "keyword1,keyword2,keyword3" --days 30 --output results/ | |
| python main_execution.py --config config.json | |
| python main_execution.py --interactive | |
| """ | |
| import asyncio | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional | |
| import logging | |
| # Import our custom modules | |
| from market_gap_backend import main_analysis_pipeline, TrendScraper, MarketGapAnalyzer | |
| from keyword_expansion import comprehensive_keyword_analysis, KeywordExpansionEngine | |
| from visualization_reporting import TrendVisualizationDashboard | |
# Configure logging
# Records go both to a persistent log file (audit trail for batch runs)
# and to stdout (immediate feedback for interactive runs).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('market_analysis.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
class MarketAnalysisOrchestrator:
    """Main orchestrator class for the complete market analysis pipeline.

    Coordinates keyword expansion, multi-source trend scraping, market-gap
    scoring, ROI estimation, and report generation, writing every
    intermediate artifact into the configured output directory.
    """

    def __init__(self, config: Dict = None):
        """Initialize pipeline components from *config* (or the defaults).

        Args:
            config: Optional configuration dict; see get_default_config()
                for the recognized keys.
        """
        self.config = config or self.get_default_config()
        self.db_path = self.config.get('db_path', 'market_trends.db')
        self.output_dir = self.config.get('output_dir', 'results')
        # Initialize components (all share the same SQLite database file).
        self.scraper = TrendScraper()
        self.analyzer = MarketGapAnalyzer(self.db_path)
        self.keyword_engine = KeywordExpansionEngine(self.db_path)
        self.viz_dashboard = TrendVisualizationDashboard(self.db_path)
        # Create output directory up front so every later save just works.
        os.makedirs(self.output_dir, exist_ok=True)

    @staticmethod
    def get_default_config() -> Dict:
        """Get default configuration.

        Returns:
            A fresh dict with safe defaults; news/twitter scraping is off
            because those sources require API keys.
        """
        return {
            'db_path': 'market_trends.db',
            'output_dir': 'results',
            'analysis_days': 30,
            'max_keywords': 50,
            'enable_keyword_expansion': True,
            'enable_visualizations': True,
            'generate_report': True,
            'api_keys': {
                'newsapi': 'YOUR_NEWSAPI_KEY',
                'twitter_bearer': 'YOUR_TWITTER_BEARER_TOKEN'
            },
            'scraping_sources': {
                'reddit': True,
                'github': True,
                'google_trends': True,
                'news': False,  # Requires API key
                'twitter': False  # Requires API key
            }
        }

    def validate_config(self) -> bool:
        """Validate configuration settings.

        Returns:
            True when all required keys exist and analysis_days is in
            [1, 365]; otherwise logs the problem and returns False.
        """
        required_fields = ['db_path', 'output_dir', 'analysis_days']
        for field in required_fields:
            if field not in self.config:
                logger.error(f"Missing required config field: {field}")
                return False
        if self.config['analysis_days'] < 1 or self.config['analysis_days'] > 365:
            logger.error("analysis_days must be between 1 and 365")
            return False
        return True

    async def expand_keywords(self, base_keywords: List[str]) -> List[str]:
        """Expand base keywords using various techniques.

        Runs the comprehensive keyword-analysis pipeline, merges all
        expansions with the originals, ranks them by opportunity, and
        truncates to the configured maximum. Falls back to the original
        list on any error.

        Args:
            base_keywords: Seed keywords supplied by the caller.

        Returns:
            Ranked, de-duplicated keyword list (at most max_keywords).
        """
        if not self.config.get('enable_keyword_expansion', True):
            return base_keywords
        logger.info(f"Expanding {len(base_keywords)} base keywords...")
        try:
            # Run comprehensive keyword analysis
            results = await comprehensive_keyword_analysis(base_keywords, self.db_path)
            # Merge originals with every expansion (set removes duplicates).
            all_keywords = set(base_keywords)
            for keyword, expansions in results['expanded_keywords'].items():
                all_keywords.update(expansions)
            # Apply keyword filtering and ranking
            filtered_keywords = self.filter_and_rank_keywords(
                list(all_keywords),
                results.get('keyword_difficulty', {})
            )
            max_keywords = self.config.get('max_keywords', 50)
            final_keywords = filtered_keywords[:max_keywords]
            logger.info(f"Expanded to {len(final_keywords)} keywords after filtering")
            # Save keyword expansion results for later inspection.
            expansion_file = os.path.join(self.output_dir, 'keyword_expansion_results.json')
            with open(expansion_file, 'w') as f:
                json.dump(results, f, indent=2, default=str)
            return final_keywords
        except Exception as e:
            # Best-effort: expansion is an enhancement, not a requirement.
            logger.error(f"Error in keyword expansion: {e}")
            return base_keywords

    def filter_and_rank_keywords(self, keywords: List[str], difficulty_scores: Dict) -> List[str]:
        """Filter and rank keywords based on opportunity scores.

        Args:
            keywords: Candidate keywords.
            difficulty_scores: Per-keyword dicts with opportunity_score,
                competition_score, and data_availability (missing keywords
                get neutral defaults).

        Returns:
            Keywords sorted best-first by the weighted combined score.
        """
        scored_keywords = []
        for keyword in keywords:
            scores = difficulty_scores.get(keyword, {})
            opportunity = scores.get('opportunity_score', 0.5)
            competition = scores.get('competition_score', 0.5)
            data_availability = scores.get('data_availability', 0.1)
            # Weighted blend: high opportunity and low competition dominate;
            # data availability is a smaller tiebreaker.
            combined_score = (opportunity * 0.4 +
                              (1 - competition) * 0.4 +
                              data_availability * 0.2)
            scored_keywords.append((keyword, combined_score))
        # Sort by score and return keywords
        scored_keywords.sort(key=lambda x: x[1], reverse=True)
        return [kw for kw, score in scored_keywords]

    async def run_trend_analysis(self, keywords: List[str]) -> Dict:
        """Run the main trend analysis pipeline.

        Args:
            keywords: Keywords to scrape and analyze.

        Returns:
            The pipeline's result dict, or {} if the pipeline failed.
        """
        logger.info(f"Running trend analysis for {len(keywords)} keywords...")
        try:
            # Run the main analysis pipeline
            results = await main_analysis_pipeline(keywords)
            # Save results
            results_file = os.path.join(self.output_dir, 'trend_analysis_results.json')
            with open(results_file, 'w') as f:
                json.dump(results, f, indent=2, default=str)
            logger.info("Trend analysis completed successfully")
            return results
        except Exception as e:
            logger.error(f"Error in trend analysis: {e}")
            return {}

    def generate_visualizations(self, keywords: List[str]) -> str:
        """Generate all visualizations and reports.

        Args:
            keywords: Keywords the report should cover.

        Returns:
            Path to the generated report, or "" if disabled or failed.
        """
        if not self.config.get('enable_visualizations', True):
            logger.info("Visualizations disabled in config")
            return ""
        logger.info("Generating visualizations and reports...")
        try:
            # Generate comprehensive report with all visualizations
            report_path = self.viz_dashboard.generate_comprehensive_report(
                keywords, self.output_dir
            )
            logger.info(f"Visualizations and report generated: {report_path}")
            return report_path
        except Exception as e:
            logger.error(f"Error generating visualizations: {e}")
            return ""

    def calculate_roi_estimates(self, market_gaps: List, keywords: List[str]) -> Dict:
        """Calculate rough ROI estimates for identified opportunities.

        Args:
            market_gaps: Gap dicts (evidence_score, confidence_level, ...).
            keywords: Kept for interface compatibility; not currently used.

        Returns:
            Mapping of gap description -> ROI-estimate dict.
        """
        roi_estimates = {}
        for gap in market_gaps:
            # Simple ROI calculation based on evidence score and market indicators
            evidence_score = gap.get('evidence_score', 0)
            confidence = gap.get('confidence_level', 0)
            # Estimate market size based on trend data
            estimated_market_size = self.estimate_market_size(gap.get('related_keywords', []))
            # Calculate potential ROI (simplified heuristic, not financial advice)
            potential_roi = {
                'opportunity_score': evidence_score,
                'confidence_level': confidence,
                'estimated_market_size': estimated_market_size,
                'risk_level': 'Low' if confidence > 0.7 else 'Medium' if confidence > 0.4 else 'High',
                'time_to_market': self.estimate_time_to_market(gap),
                'investment_category': self.categorize_investment_level(gap)
            }
            roi_estimates[gap.get('gap_description', 'Unknown')] = potential_roi
        return roi_estimates

    def estimate_market_size(self, related_keywords: List[str]) -> str:
        """Estimate market size category based on keyword data.

        Simple heuristic on the breadth of related keywords: >10 Large,
        >5 Medium, otherwise Small.
        """
        if len(related_keywords) > 10:
            return "Large"
        elif len(related_keywords) > 5:
            return "Medium"
        else:
            return "Small"

    def estimate_time_to_market(self, gap: Dict) -> str:
        """Estimate time to market based on gap characteristics.

        Improving an existing (disliked) solution is fastest; filling a
        limited-supply niche is slower; creating a new market is slowest.
        """
        supply_gaps = gap.get('supply_gaps', [])
        if any('dissatisfaction' in sg.lower() for sg in supply_gaps):
            return "3-6 months"  # Improving existing solutions
        elif any('limited' in sg.lower() for sg in supply_gaps):
            return "6-12 months"  # Creating new solutions
        else:
            return "12+ months"  # New market creation

    def categorize_investment_level(self, gap: Dict) -> str:
        """Categorize required investment level from the evidence score."""
        evidence_score = gap.get('evidence_score', 0)
        if evidence_score > 0.7:
            return "High - Strong market validation"
        elif evidence_score > 0.4:
            return "Medium - Moderate validation needed"
        else:
            return "Low - High validation required"

    def generate_executive_summary(self, results: Dict, keywords: List[str],
                                   roi_estimates: Dict) -> str:
        """Generate executive summary of findings.

        Args:
            results: Output dict of the trend-analysis pipeline.
            keywords: Full keyword list that was analyzed.
            roi_estimates: Output of calculate_roi_estimates().

        Returns:
            Plain-text, human-readable summary.
        """
        summary = f"""
EXECUTIVE SUMMARY - Market Gap Analysis
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
ANALYSIS SCOPE:
- Keywords Analyzed: {len(keywords)}
- Data Points Collected: {results.get('total_data_points', 0):,}
- Market Gaps Identified: {results.get('market_gaps_identified', 0)}
- Analysis Period: {self.config.get('analysis_days', 30)} days
TOP OPPORTUNITIES:
"""
        top_opportunities = results.get('top_opportunities', [])[:5]
        for i, opp in enumerate(top_opportunities, 1):
            summary += f"""
{i}. {opp.get('description', 'Unknown')}
Evidence Score: {opp.get('evidence_score', 0):.3f}/1.0
Confidence: {opp.get('confidence', 0):.3f}/1.0
Key Indicators: {', '.join(opp.get('key_indicators', [])[:2])}
"""
        # BUG FIX: the original wrote `any('app' in k.lower() for k in [k])`,
        # which shadowed `k` and reduced to a plain substring test anyway.
        # Same result, written directly.
        app_related = len([k for k in keywords if 'app' in k.lower()])
        summary += f"""
KEY INSIGHTS:
- {app_related} app-related opportunities identified
- Average market sentiment: {'Positive' if results.get('avg_sentiment', 0) > 0 else 'Negative'}
- Cross-platform correlation strength: {'High' if len(results.get('cross_platform_correlations', {})) > 0 else 'Low'}
INVESTMENT RECOMMENDATIONS:
"""
        high_roi = [k for k, v in roi_estimates.items() if v.get('opportunity_score', 0) > 0.6]
        summary += f"- {len(high_roi)} high-opportunity investments identified\n"
        summary += f"- Risk distribution: {len([v for v in roi_estimates.values() if v.get('risk_level') == 'Low'])} low-risk, "
        summary += f"{len([v for v in roi_estimates.values() if v.get('risk_level') == 'Medium'])} medium-risk opportunities\n"
        return summary

    async def run_complete_analysis(self, base_keywords: List[str]) -> Dict:
        """Run the complete analysis pipeline.

        Steps: keyword expansion -> trend analysis -> ROI estimation ->
        visualizations -> executive summary; all artifacts are saved to
        the output directory.

        Args:
            base_keywords: Seed keywords for the whole run.

        Returns:
            The compiled final-results dict (also saved as JSON).

        Raises:
            Exception: re-raised after logging when any step fails fatally.
        """
        start_time = datetime.now()
        logger.info(f"Starting complete market analysis pipeline at {start_time}")
        try:
            # Step 1: Expand keywords
            expanded_keywords = await self.expand_keywords(base_keywords)
            # Step 2: Run trend analysis
            trend_results = await self.run_trend_analysis(expanded_keywords)
            # Step 3: Calculate ROI estimates
            market_gaps = trend_results.get('top_opportunities', [])
            roi_estimates = self.calculate_roi_estimates(market_gaps, expanded_keywords)
            # Step 4: Generate visualizations
            report_path = self.generate_visualizations(expanded_keywords)
            # Step 5: Generate executive summary
            executive_summary = self.generate_executive_summary(
                trend_results, expanded_keywords, roi_estimates
            )
            # Compile final results
            final_results = {
                'analysis_metadata': {
                    'start_time': start_time.isoformat(),
                    'end_time': datetime.now().isoformat(),
                    'base_keywords': base_keywords,
                    'expanded_keywords': expanded_keywords,
                    'total_keywords_analyzed': len(expanded_keywords),
                    'config_used': self.config
                },
                'trend_analysis': trend_results,
                'roi_estimates': roi_estimates,
                'executive_summary': executive_summary,
                'report_path': report_path,
                'output_directory': self.output_dir
            }
            # Save final results (default=str handles datetimes and paths)
            final_results_path = os.path.join(self.output_dir, 'final_analysis_results.json')
            with open(final_results_path, 'w') as f:
                json.dump(final_results, f, indent=2, default=str)
            # Save executive summary as text file
            summary_path = os.path.join(self.output_dir, 'executive_summary.txt')
            with open(summary_path, 'w') as f:
                f.write(executive_summary)
            end_time = datetime.now()
            duration = end_time - start_time
            logger.info(f"Complete analysis finished in {duration}")
            logger.info(f"Results saved to: {self.output_dir}")
            logger.info(f"Main report: {report_path}")
            return final_results
        except Exception as e:
            logger.error(f"Error in complete analysis: {e}")
            raise
def load_config_file(config_path: str) -> Dict:
    """Load configuration from JSON file.

    Returns the parsed dict, or an empty dict when the file is missing
    or unparseable (the error is logged).
    """
    try:
        with open(config_path, 'r') as handle:
            return json.load(handle)
    except Exception as e:
        logger.error(f"Error loading config file {config_path}: {e}")
        return {}
def interactive_mode():
    """Run analysis in interactive mode.

    Walks the user through keyword entry, analysis period, output location,
    and feature toggles, then runs the full pipeline synchronously and
    prints the executive summary.
    """
    def _ask_yes_no(prompt: str, default: bool = True) -> bool:
        # BUG FIX: the prompts advertise "default: y", but the old code
        # treated an empty reply (just pressing Enter) as "no". Honor the
        # advertised default when the user gives no answer.
        reply = input(prompt).strip().lower()
        return default if not reply else reply.startswith('y')

    print("\n" + "="*60)
    print("MARKET GAP ANALYSIS SYSTEM - INTERACTIVE MODE")
    print("="*60)
    # Get keywords from user
    print("\nStep 1: Enter Keywords to Analyze")
    keywords_input = input("Enter keywords separated by commas: ").strip()
    if not keywords_input:
        print("No keywords provided. Exiting.")
        return
    base_keywords = [kw.strip() for kw in keywords_input.split(',')]
    print(f"Keywords to analyze: {base_keywords}")
    # Get analysis period (non-numeric input falls back to 30 days)
    print("\nStep 2: Analysis Period")
    try:
        days = int(input("Enter number of days to analyze (default: 30): ") or "30")
    except ValueError:
        days = 30
    # Get output directory
    print("\nStep 3: Output Configuration")
    output_dir = input("Enter output directory (default: results/): ").strip() or "results/"
    # Configure options (both default to "yes" when Enter is pressed)
    print("\nStep 4: Analysis Options")
    enable_expansion = _ask_yes_no("Enable keyword expansion? (y/n, default: y): ")
    enable_viz = _ask_yes_no("Generate visualizations? (y/n, default: y): ")
    # Create configuration
    config = {
        'db_path': 'market_trends.db',
        'output_dir': output_dir,
        'analysis_days': days,
        'max_keywords': 50,
        'enable_keyword_expansion': enable_expansion,
        'enable_visualizations': enable_viz,
        'generate_report': True,
        'scraping_sources': {
            'reddit': True,
            'github': True,
            'google_trends': True,
            'news': False,
            'twitter': False
        }
    }
    print(f"\nConfiguration:")
    print(f"- Keywords: {base_keywords}")
    print(f"- Analysis period: {days} days")
    print(f"- Output directory: {output_dir}")
    print(f"- Keyword expansion: {enable_expansion}")
    print(f"- Visualizations: {enable_viz}")
    # Confirmation has no advertised default: require an explicit "y".
    confirm = input("\nProceed with analysis? (y/n): ").lower().startswith('y')
    if not confirm:
        print("Analysis cancelled.")
        return
    # Run analysis
    orchestrator = MarketAnalysisOrchestrator(config)
    print("\nStarting analysis...")
    try:
        results = asyncio.run(orchestrator.run_complete_analysis(base_keywords))
        print("\n" + "="*60)
        print("ANALYSIS COMPLETE!")
        print("="*60)
        print(f"Results saved to: {results['output_directory']}")
        print(f"Executive summary saved to: {os.path.join(results['output_directory'], 'executive_summary.txt')}")
        if results.get('report_path'):
            print(f"Full report: {results['report_path']}")
        # Display executive summary
        print("\nEXECUTIVE SUMMARY:")
        print("-" * 40)
        print(results['executive_summary'])
    except Exception as e:
        print(f"Error during analysis: {e}")
        logger.error(f"Interactive mode error: {e}")
def create_sample_config():
    """Write a ready-to-edit sample configuration file to the working dir.

    The generated JSON contains placeholder API keys and sensible defaults;
    prints follow-up instructions for using it.
    """
    config_path = "config_sample.json"
    sample_config = {
        "db_path": "market_trends.db",
        "output_dir": "results",
        "analysis_days": 30,
        "max_keywords": 50,
        "enable_keyword_expansion": True,
        "enable_visualizations": True,
        "generate_report": True,
        "api_keys": {
            "newsapi": "YOUR_NEWSAPI_KEY_HERE",
            "twitter_bearer": "YOUR_TWITTER_BEARER_TOKEN_HERE"
        },
        "scraping_sources": {
            "reddit": True,
            "github": True,
            "google_trends": True,
            "news": False,
            "twitter": False
        },
        "keyword_filters": {
            "min_length": 3,
            "max_length": 50,
            "exclude_patterns": ["spam", "fake", "scam"],
            "include_only": []
        },
        "analysis_settings": {
            "sentiment_threshold": 0.1,
            "volume_threshold": 10,
            "correlation_threshold": 0.3
        }
    }
    with open(config_path, 'w') as f:
        f.write(json.dumps(sample_config, indent=2))
    print(f"Sample configuration created: {config_path}")
    print("Edit this file with your API keys and preferences, then run:")
    print(f"python main_execution.py --config {config_path}")
def main():
    """Main entry point for the CLI.

    Parses arguments, builds or loads the run configuration, and drives the
    complete analysis pipeline.

    Returns:
        Process exit code: 0 on success or benign early exit, 1 on failure.
        (BUG FIX: error paths previously returned None, which sys.exit()
        maps to exit code 0, hiding failures from shell scripts.)
    """
    parser = argparse.ArgumentParser(
        description="Market Gap Analysis System",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python main_execution.py --keywords "AI tools,productivity apps" --days 30
python main_execution.py --config config.json
python main_execution.py --interactive
python main_execution.py --create-config
"""
    )
    parser.add_argument('--keywords', type=str,
                        help='Comma-separated list of keywords to analyze')
    parser.add_argument('--days', type=int, default=30,
                        help='Number of days to analyze (default: 30)')
    parser.add_argument('--output', type=str, default='results',
                        help='Output directory (default: results)')
    parser.add_argument('--config', type=str,
                        help='Path to configuration JSON file')
    parser.add_argument('--interactive', action='store_true',
                        help='Run in interactive mode')
    parser.add_argument('--create-config', action='store_true',
                        help='Create sample configuration file')
    parser.add_argument('--max-keywords', type=int, default=50,
                        help='Maximum keywords to analyze (default: 50)')
    parser.add_argument('--no-expansion', action='store_true',
                        help='Disable keyword expansion')
    parser.add_argument('--no-viz', action='store_true',
                        help='Disable visualizations')
    parser.add_argument('--verbose', action='store_true',
                        help='Enable verbose logging')
    args = parser.parse_args()
    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Create sample config and exit
    if args.create_config:
        create_sample_config()
        return 0
    # Interactive mode handles its own flow end to end
    if args.interactive:
        interactive_mode()
        return 0
    # Load configuration: file takes precedence over CLI flags
    if args.config:
        config = load_config_file(args.config)
        if not config:
            print(f"Failed to load configuration from {args.config}")
            return 1
    else:
        # Create config from command line arguments
        config = {
            'db_path': 'market_trends.db',
            'output_dir': args.output,
            'analysis_days': args.days,
            'max_keywords': args.max_keywords,
            'enable_keyword_expansion': not args.no_expansion,
            'enable_visualizations': not args.no_viz,
            'generate_report': True,
            'scraping_sources': {
                'reddit': True,
                'github': True,
                'google_trends': True,
                'news': False,
                'twitter': False
            }
        }
    # Get keywords: CLI flag wins, then config file, else bail out
    if args.keywords:
        base_keywords = [kw.strip() for kw in args.keywords.split(',')]
    elif config.get('base_keywords'):
        base_keywords = config['base_keywords']
    else:
        print("No keywords provided. Use --keywords, --config, or --interactive mode.")
        print("Run with --help for usage information.")
        return 1
    # Validate keywords
    if not base_keywords:
        print("No valid keywords found.")
        return 1
    print(f"Starting analysis with keywords: {base_keywords}")
    print(f"Configuration: {json.dumps(config, indent=2)}")
    # Run analysis
    try:
        orchestrator = MarketAnalysisOrchestrator(config)
        if not orchestrator.validate_config():
            print("Configuration validation failed. Check your settings.")
            return 1
        # Run the complete analysis
        results = asyncio.run(orchestrator.run_complete_analysis(base_keywords))
        print("\n" + "="*60)
        print("ANALYSIS COMPLETED SUCCESSFULLY!")
        print("="*60)
        print(f"Results directory: {results['output_directory']}")
        # BUG FIX: the "duration" line used to print the raw end timestamp;
        # compute the actual elapsed time from the recorded metadata.
        meta = results['analysis_metadata']
        elapsed = (datetime.fromisoformat(meta['end_time'])
                   - datetime.fromisoformat(meta['start_time']))
        print(f"Analysis duration: {elapsed}")
        print(f"Keywords analyzed: {results['analysis_metadata']['total_keywords_analyzed']}")
        if results.get('report_path'):
            print(f"Main report: {results['report_path']}")
        print("\nNext steps:")
        print("1. Review the executive summary in executive_summary.txt")
        print("2. Open the HTML reports for detailed visualizations")
        print("3. Check the JSON files for raw data and analysis results")
    except KeyboardInterrupt:
        # User-initiated stop is not treated as a failure.
        print("\nAnalysis interrupted by user.")
    except Exception as e:
        print(f"Analysis failed: {e}")
        logger.error(f"Main execution error: {e}")
        return 1
    return 0
| # Additional utility functions | |
def quick_analysis(keywords: List[str], days: int = 7) -> Dict:
    """Run a fast, expansion-free trend analysis for immediate insights.

    Skips keyword expansion, visualizations, and report generation; results
    land in a timestamped quick_results_* directory.

    Returns:
        A compact summary dict, or {'error': ..., 'keywords': ...} on failure.
    """
    print(f"Running quick analysis for: {', '.join(keywords)}")
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    config = {
        'db_path': 'market_trends.db',
        'output_dir': f'quick_results_{stamp}',
        'analysis_days': days,
        'max_keywords': 20,
        'enable_keyword_expansion': False,
        'enable_visualizations': False,
        'generate_report': False
    }
    orchestrator = MarketAnalysisOrchestrator(config)
    try:
        # Only the scraping/analysis stage runs; everything else is skipped.
        trend_results = asyncio.run(orchestrator.run_trend_analysis(keywords))
        opportunities = trend_results.get('top_opportunities', [])
        return {
            'keywords': keywords,
            'opportunities_found': len(opportunities),
            'top_opportunity': opportunities[0] if opportunities else None,
            'total_data_points': trend_results.get('total_data_points', 0),
            'analysis_timestamp': trend_results.get('analysis_timestamp', datetime.now().isoformat())
        }
    except Exception as e:
        return {'error': str(e), 'keywords': keywords}
def batch_analysis(keyword_batches: List[List[str]], output_base_dir: str = "batch_results") -> List[Dict]:
    """Run the full pipeline once per keyword batch.

    Each batch gets its own numbered output subdirectory; a failing batch
    is recorded as an error entry instead of aborting the remaining ones.

    Returns:
        One result dict (or error dict) per batch, in input order.
    """
    total = len(keyword_batches)
    outcomes = []
    for idx, batch in enumerate(keyword_batches, start=1):
        print(f"Processing batch {idx}/{total}: {batch}")
        batch_config = {
            'db_path': 'market_trends.db',
            'output_dir': f'{output_base_dir}/batch_{idx}',
            'analysis_days': 30,
            'max_keywords': 30,
            'enable_keyword_expansion': True,
            'enable_visualizations': True,
            'generate_report': True
        }
        orchestrator = MarketAnalysisOrchestrator(batch_config)
        try:
            outcomes.append(asyncio.run(orchestrator.run_complete_analysis(batch)))
            print(f"Batch {idx} completed successfully")
        except Exception as e:
            print(f"Batch {idx} failed: {e}")
            outcomes.append({'error': str(e), 'batch': batch})
    return outcomes
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell (None is treated as 0).
    sys.exit(main())
Make the dashboard fully functional and visually polished.