we need a frontend web ui which ties together the backend consisting of our insight scraping algorithms: import asyncio import aiohttp import json import time import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from collections import defaultdict import sqlite3 from textblob import TextBlob import re from urllib.parse import quote_plus import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class TrendData: """Data structure for trend information""" keyword: str source: str volume: int sentiment: float timestamp: datetime metadata: Dict @dataclass class MarketGap: """Data structure for identified market gaps""" gap_description: str evidence_score: float demand_indicators: List[str] supply_gaps: List[str] related_keywords: List[str] confidence_level: float class APIClient: """Generic API client with rate limiting and error handling""" def __init__(self, base_url: str, rate_limit: float = 1.0): self.base_url = base_url self.rate_limit = rate_limit self.last_request = 0 self.session = None async def __aenter__(self): self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() async def make_request(self, endpoint: str, params: Dict = None, headers: Dict = None): """Make rate-limited API request""" # Rate limiting now = time.time() time_since_last = now - self.last_request if time_since_last < self.rate_limit: await asyncio.sleep(self.rate_limit - time_since_last) try: url = f"{self.base_url}/{endpoint.lstrip('/')}" async with self.session.get(url, params=params, headers=headers) as response: self.last_request = time.time() if response.status == 200: return await response.json() else: logger.warning(f"API request failed: {response.status}") return None except Exception as e: logger.error(f"API request error: {e}") return None class TrendScraper: """Main class for scraping trend data from multiple sources""" def __init__(self): self.db_path = "market_trends.db" self.initialize_database() def initialize_database(self): """Initialize SQLite database for storing trend data""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create trends table cursor.execute(''' CREATE TABLE IF NOT EXISTS trends ( id INTEGER PRIMARY KEY AUTOINCREMENT, keyword TEXT, source TEXT, volume INTEGER, sentiment REAL, timestamp TEXT, metadata TEXT ) ''') # Create market_gaps table cursor.execute(''' CREATE TABLE IF NOT EXISTS market_gaps ( id INTEGER PRIMARY KEY AUTOINCREMENT, gap_description TEXT, evidence_score REAL, demand_indicators TEXT, supply_gaps TEXT, related_keywords TEXT, confidence_level REAL, created_at TEXT ) ''') conn.commit() conn.close() async def scrape_reddit_trends(self, keywords: List[str]) -> List[TrendData]: """Scrape Reddit for trend data using pushshift API""" trends = [] async with APIClient("https://api.pushshift.io/reddit/search", rate_limit=1.0) as client: for keyword in keywords: params = { 'q': keyword, 'subreddit': 'all', 'sort': 'desc', 'sort_type': 'created_utc', 'size': 100, 'after': int((datetime.now() - timedelta(days=30)).timestamp()) } data = await client.make_request("submission", params) if data and 'data' in data: for post in data['data']: sentiment = TextBlob(post.get('title', '') + ' ' + post.get('selftext', '')).sentiment.polarity trends.append(TrendData( keyword=keyword, source='reddit', volume=post.get('score', 0), sentiment=sentiment, timestamp=datetime.fromtimestamp(post.get('created_utc', 0)), metadata={'subreddit': post.get('subreddit'), 'comments': post.get('num_comments', 0)} )) return trends async def scrape_twitter_trends(self, keywords: List[str]) -> List[TrendData]: """Simulate Twitter trend scraping (requires Twitter API credentials)""" # This is a placeholder - you'll need to implement with actual Twitter API trends = [] # Simulated data for demonstration for keyword in keywords: trends.append(TrendData( keyword=keyword, source='twitter', volume=np.random.randint(100, 10000), sentiment=np.random.uniform(-1, 1), timestamp=datetime.now(), metadata={'hashtags': f"#{keyword}", 'retweets': np.random.randint(10, 1000)} )) return trends async def scrape_google_trends(self, keywords: List[str]) -> List[TrendData]: """Scrape Google Trends data (requires pytrends library)""" trends = [] try: from pytrends.request import TrendReq pytrends = TrendReq(hl='en-US', tz=360) for keyword in keywords: pytrends.build_payload([keyword], timeframe='today 3-m') interest_over_time = pytrends.interest_over_time() if not interest_over_time.empty: for date, row in interest_over_time.iterrows(): trends.append(TrendData( keyword=keyword, source='google_trends', volume=int(row[keyword]), sentiment=0.0, # Google Trends doesn't provide sentiment timestamp=date, metadata={'isPartial': row.get('isPartial', False)} )) except ImportError: logger.warning("pytrends not installed. Skipping Google Trends scraping.") return trends async def scrape_github_trends(self, keywords: List[str]) -> List[TrendData]: """Scrape GitHub for repository trends""" trends = [] async with APIClient("https://api.github.com", rate_limit=1.0) as client: headers = {'Accept': 'application/vnd.github.v3+json'} for keyword in keywords: params = { 'q': keyword, 'sort': 'updated', 'order': 'desc', 'per_page': 100 } data = await client.make_request("search/repositories", params, headers) if data and 'items' in data: for repo in data['items']: trends.append(TrendData( keyword=keyword, source='github', volume=repo.get('stargazers_count', 0), sentiment=0.5, # Neutral sentiment for GitHub timestamp=datetime.fromisoformat(repo.get('updated_at', '').replace('Z', '+00:00')), metadata={ 'language': repo.get('language'), 'forks': repo.get('forks_count', 0), 'issues': repo.get('open_issues_count', 0) } )) return trends async def scrape_news_trends(self, keywords: List[str]) -> List[TrendData]: """Scrape news trends using NewsAPI (requires API key)""" trends = [] # Placeholder for NewsAPI integration # You'll need to register for a NewsAPI key at https://newsapi.org/ API_KEY = "YOUR_NEWSAPI_KEY" # Replace with actual key if API_KEY != "YOUR_NEWSAPI_KEY": async with APIClient("https://newsapi.org/v2", rate_limit=1.0) as client: headers = {'X-API-Key': API_KEY} for keyword in keywords: params = { 'q': keyword, 'sortBy': 'popularity', 'pageSize': 100, 'from': (datetime.now() - timedelta(days=30)).isoformat() } data = await client.make_request("everything", params, headers) if data and 'articles' in data: for article in data['articles']: sentiment = TextBlob(article.get('title', '') + ' ' + article.get('description', '')).sentiment.polarity trends.append(TrendData( keyword=keyword, source='news', volume=1, # News articles don't have volume metrics sentiment=sentiment, timestamp=datetime.fromisoformat(article.get('publishedAt', '').replace('Z', '+00:00')), metadata={ 'source': article.get('source', {}).get('name'), 'author': article.get('author') } )) return trends class MarketGapAnalyzer: """Analyze trends to identify market gaps""" def __init__(self, db_path: str): self.db_path = db_path def load_trend_data(self, keywords: List[str]) -> pd.DataFrame: """Load trend data from database""" conn = sqlite3.connect(self.db_path) keyword_filter = "'" + "','".join(keywords) + "'" query = f""" SELECT * FROM trends WHERE keyword IN ({keyword_filter}) ORDER BY timestamp DESC """ df = pd.read_sql_query(query, conn) conn.close() if not df.empty: df['timestamp'] = pd.to_datetime(df['timestamp']) df['metadata'] = df['metadata'].apply(json.loads) return df def calculate_demand_indicators(self, df: pd.DataFrame) -> Dict[str, float]: """Calculate various demand indicators""" indicators = {} # Volume trend analysis df_sorted = df.sort_values('timestamp') if len(df_sorted) > 1: recent_volume = df_sorted.tail(10)['volume'].mean() older_volume = df_sorted.head(10)['volume'].mean() indicators['volume_growth'] = (recent_volume - older_volume) / max(older_volume, 1) # Sentiment analysis indicators['avg_sentiment'] = df['sentiment'].mean() indicators['sentiment_volatility'] = df['sentiment'].std() # Source diversity source_counts = df['source'].value_counts() indicators['source_diversity'] = len(source_counts) indicators['cross_platform_consistency'] = 1 - (source_counts.std() / source_counts.mean()) if source_counts.mean() > 0 else 0 # Temporal patterns df['hour'] = df['timestamp'].dt.hour df['day_of_week'] = df['timestamp'].dt.dayofweek indicators['temporal_concentration'] = df.groupby('hour').size().std() / df.groupby('hour').size().mean() return indicators def identify_supply_gaps(self, keyword: str, df: pd.DataFrame) -> List[str]: """Identify potential supply gaps based on trend analysis""" gaps = [] # Analyze sentiment patterns if df['sentiment'].mean() < -0.2: gaps.append(f"High negative sentiment around {keyword} suggests user dissatisfaction") # Analyze GitHub data for tech gaps github_data = df[df['source'] == 'github'] if not github_data.empty: avg_stars = github_data['volume'].mean() if avg_stars < 100: gaps.append(f"Low GitHub engagement suggests limited open-source solutions for {keyword}") # Analyze Reddit discussions for unmet needs reddit_data = df[df['source'] == 'reddit'] if not reddit_data.empty: high_comment_posts = reddit_data[ reddit_data['metadata'].apply(lambda x: x.get('comments', 0) > 50) ] if len(high_comment_posts) > len(reddit_data) * 0.3: gaps.append(f"High discussion volume on Reddit indicates active community seeking solutions for {keyword}") return gaps def calculate_market_gap_score(self, demand_indicators: Dict[str, float], supply_gaps: List[str]) -> float: """Calculate overall market gap score""" score = 0.0 # Demand components score += max(demand_indicators.get('volume_growth', 0) * 0.3, 0) score += (demand_indicators.get('avg_sentiment', 0) + 1) * 0.1 # Normalize sentiment to 0-2 range score += demand_indicators.get('source_diversity', 0) * 0.1 score += demand_indicators.get('cross_platform_consistency', 0) * 0.2 # Supply gap components score += len(supply_gaps) * 0.1 # Temporal activity (higher concentration might indicate urgent need) temporal_score = demand_indicators.get('temporal_concentration', 0) if temporal_score > 1: # High concentration score += 0.2 return min(score, 1.0) # Cap at 1.0 def analyze_market_gaps(self, keywords: List[str]) -> List[MarketGap]: """Main method to analyze market gaps for given keywords""" df = self.load_trend_data(keywords) gaps = [] for keyword in keywords: keyword_data = df[df['keyword'] == keyword] if keyword_data.empty: continue demand_indicators = self.calculate_demand_indicators(keyword_data) supply_gaps = self.identify_supply_gaps(keyword, keyword_data) # Generate gap description gap_description = f"Market opportunity in {keyword} sector" if demand_indicators.get('volume_growth', 0) > 0.5: gap_description += " with high growth potential" if demand_indicators.get('avg_sentiment', 0) < -0.2: gap_description += " addressing user dissatisfaction" gap_score = self.calculate_market_gap_score(demand_indicators, supply_gaps) # Find related keywords (simple co-occurrence analysis) related_keywords = self.find_related_keywords(keyword, df) gap = MarketGap( gap_description=gap_description, evidence_score=gap_score, demand_indicators=[f"{k}: {v:.3f}" for k, v in demand_indicators.items()], supply_gaps=supply_gaps, related_keywords=related_keywords, confidence_level=min(len(keyword_data) / 100.0, 1.0) # Based on data volume ) gaps.append(gap) return sorted(gaps, key=lambda x: x.evidence_score, reverse=True) def find_related_keywords(self, keyword: str, df: pd.DataFrame) -> List[str]: """Find keywords that often appear together with the target keyword""" related = [] # Simple co-occurrence based on temporal proximity keyword_data = df[df['keyword'] == keyword] for _, row in keyword_data.iterrows(): time_window = timedelta(hours=24) nearby_data = df[ (df['timestamp'] >= row['timestamp'] - time_window) & (df['timestamp'] <= row['timestamp'] + time_window) & (df['keyword'] != keyword) ] related.extend(nearby_data['keyword'].unique()) # Count occurrences and return top related keywords from collections import Counter related_counts = Counter(related) return [k for k, v in related_counts.most_common(5)] def save_market_gaps(self, gaps: List[MarketGap]): """Save identified market gaps to database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() for gap in gaps: cursor.execute(''' INSERT INTO market_gaps (gap_description, evidence_score, demand_indicators, supply_gaps, related_keywords, confidence_level, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( gap.gap_description, gap.evidence_score, json.dumps(gap.demand_indicators), json.dumps(gap.supply_gaps), json.dumps(gap.related_keywords), gap.confidence_level, datetime.now().isoformat() )) conn.commit() conn.close() class TrendCorrelationEngine: """Engine for correlating trends across different data sources""" def __init__(self, db_path: str): self.db_path = db_path def calculate_cross_platform_correlation(self, keyword: str) -> Dict[str, float]: """Calculate correlation between different platforms for a keyword""" conn = sqlite3.connect(self.db_path) query = """ SELECT source, DATE(timestamp) as date, AVG(volume) as avg_volume, AVG(sentiment) as avg_sentiment FROM trends WHERE keyword = ? GROUP BY source, date ORDER BY date """ df = pd.read_sql_query(query, conn, params=[keyword]) conn.close() if df.empty: return {} # Pivot data for correlation analysis volume_pivot = df.pivot(index='date', columns='source', values='avg_volume').fillna(0) sentiment_pivot = df.pivot(index='date', columns='source', values='avg_sentiment').fillna(0) correlations = {} # Volume correlations if len(volume_pivot.columns) > 1: vol_corr = volume_pivot.corr() correlations['volume_correlations'] = vol_corr.to_dict() # Sentiment correlations if len(sentiment_pivot.columns) > 1: sent_corr = sentiment_pivot.corr() correlations['sentiment_correlations'] = sent_corr.to_dict() return correlations async def main_analysis_pipeline(keywords: List[str]): """Main pipeline for trend scraping and market gap analysis""" logger.info(f"Starting analysis for keywords: {keywords}") # Initialize components scraper = TrendScraper() analyzer = MarketGapAnalyzer(scraper.db_path) correlation_engine = TrendCorrelationEngine(scraper.db_path) # Step 1: Scrape data from all sources all_trends = [] logger.info("Scraping Reddit trends...") reddit_trends = await scraper.scrape_reddit_trends(keywords) all_trends.extend(reddit_trends) logger.info("Scraping Twitter trends...") twitter_trends = await scraper.scrape_twitter_trends(keywords) all_trends.extend(twitter_trends) logger.info("Scraping Google trends...") google_trends = await scraper.scrape_google_trends(keywords) all_trends.extend(google_trends) logger.info("Scraping GitHub trends...") github_trends = await scraper.scrape_github_trends(keywords) all_trends.extend(github_trends) logger.info("Scraping news trends...") news_trends = await scraper.scrape_news_trends(keywords) all_trends.extend(news_trends) # Step 2: Store trends in database conn = sqlite3.connect(scraper.db_path) cursor = conn.cursor() for trend in all_trends: cursor.execute(''' INSERT INTO trends (keyword, source, volume, sentiment, timestamp, metadata) VALUES (?, ?, ?, ?, ?, ?) ''', ( trend.keyword, trend.source, trend.volume, trend.sentiment, trend.timestamp.isoformat(), json.dumps(trend.metadata) )) conn.commit() conn.close() logger.info(f"Stored {len(all_trends)} trend data points") # Step 3: Analyze market gaps logger.info("Analyzing market gaps...") market_gaps = analyzer.analyze_market_gaps(keywords) # Step 4: Calculate correlations logger.info("Calculating cross-platform correlations...") correlations = {} for keyword in keywords: correlations[keyword] = correlation_engine.calculate_cross_platform_correlation(keyword) # Step 5: Save results analyzer.save_market_gaps(market_gaps) # Step 6: Generate report logger.info("Generating analysis report...") report = { 'analysis_timestamp': datetime.now().isoformat(), 'keywords_analyzed': keywords, 'total_data_points': len(all_trends), 'market_gaps_identified': len(market_gaps), 'top_opportunities': [ { 'description': gap.gap_description, 'evidence_score': gap.evidence_score, 'confidence': gap.confidence_level, 'key_indicators': gap.demand_indicators[:3], # Top 3 indicators 'supply_gaps': gap.supply_gaps } for gap in market_gaps[:5] # Top 5 opportunities ], 'cross_platform_correlations': correlations } # Save report to file with open(f"market_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f: json.dump(report, f, indent=2, default=str) logger.info("Analysis complete. Report saved.") return report # Example usage if __name__ == "__main__": # Example keywords to analyze test_keywords = [ "sustainable packaging", "remote work tools", "mental health apps", "AI productivity", "electric vehicle charging" ] # Run the analysis asyncio.run(main_analysis_pipeline(test_keywords)) import asyncio import aiohttp import json import re from typing import List, Dict, Set import nltk from textblob import TextBlob from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.cluster import KMeans import numpy as np from collections import defaultdict, Counter import sqlite3 class KeywordExpansionEngine: """Engine for expanding keywords and identifying market segments""" def __init__(self, db_path: str): self.db_path = db_path self.stop_words = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by']) # Download required NLTK data try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt') def extract_keywords_from_text(self, text: str) -> List[str]: """Extract relevant keywords from text using NLP techniques""" if not text: return [] # Clean and tokenize text text = re.sub(r'[^\w\s]', '', text.lower()) blob = TextBlob(text) # Extract noun phrases and significant words keywords = [] # Get noun phrases for phrase in blob.noun_phrases: if len(phrase.split()) <= 3 and phrase not in self.stop_words: keywords.append(phrase) # Get significant single words (nouns, adjectives) words = blob.words for word, tag in blob.tags: if tag.startswith(('NN', 'JJ')) and len(word) > 3 and word not in self.stop_words: keywords.append(word) return list(set(keywords)) def expand_keywords_from_trends(self, base_keyword: str, limit: int = 20) -> List[str]: """Expand keywords based on existing trend data""" conn = sqlite3.connect(self.db_path) # Get related content from database query = """ SELECT metadata FROM trends WHERE keyword = ? OR keyword LIKE ? """ cursor = conn.execute(query, [base_keyword, f'%{base_keyword}%']) results = cursor.fetchall() conn.close() expanded_keywords = set() for row in results: try: metadata = json.loads(row[0]) # Extract keywords from different metadata fields text_fields = [] if isinstance(metadata, dict): for key, value in metadata.items(): if isinstance(value, str) and key in ['title', 'description', 'text', 'content']: text_fields.append(value) # Process text fields for text in text_fields: keywords = self.extract_keywords_from_text(text) expanded_keywords.update(keywords[:5]) # Limit per text except json.JSONDecodeError: continue # Filter and rank keywords relevant_keywords = [] for keyword in expanded_keywords: if (len(keyword.split()) <= 3 and len(keyword) > 2 and keyword != base_keyword and base_keyword.split()[0] in keyword or any(word in keyword for word in base_keyword.split())): relevant_keywords.append(keyword) return relevant_keywords[:limit] async def expand_keywords_with_search_suggestions(self, base_keyword: str) -> List[str]: """Expand keywords using search suggestion APIs""" suggestions = [] # Google Suggest API (no API key required) async with aiohttp.ClientSession() as session: try: url = "https://suggestqueries.google.com/complete/search" params = { 'client': 'firefox', 'q': base_keyword } async with session.get(url, params=params) as response: if response.status == 200: data = await response.text() # Parse the JSONP response if data.startswith('window.google.ac.h('): json_str = data[19:-1] # Remove JSONP wrapper suggestions_data = json.loads(json_str) if len(suggestions_data) > 1: suggestions.extend(suggestions_data[1][:10]) # Get top 10 except Exception as e: print(f"Error fetching Google suggestions: {e}") return suggestions def identify_market_segments(self, keywords: List[str]) -> Dict[str, List[str]]: """Identify market segments by clustering related keywords""" if len(keywords) < 5: return {"main_segment": keywords} # Get trend data for all keywords conn = sqlite3.connect(self.db_path) keyword_texts = {} for keyword in keywords: query = """ SELECT metadata FROM trends WHERE keyword = ? LIMIT 50 """ cursor = conn.execute(query, [keyword]) results = cursor.fetchall() texts = [] for row in results: try: metadata = json.loads(row[0]) if isinstance(metadata, dict): for key, value in metadata.items(): if isinstance(value, str) and len(value) > 10: texts.append(value) except: continue keyword_texts[keyword] = ' '.join(texts) conn.close() # Create text corpus for clustering corpus = [keyword_texts.get(keyword, keyword) for keyword in keywords] if len(corpus) < 3: return {"main_segment": keywords} try: # TF-IDF vectorization vectorizer = TfidfVectorizer(max_features=100, stop_words='english') tfidf_matrix = vectorizer.fit_transform(corpus) # K-means clustering n_clusters = min(max(2, len(keywords) // 3), 5) # 2-5 clusters kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) clusters = kmeans.fit_predict(tfidf_matrix) # Group keywords by cluster segments = defaultdict(list) for i, keyword in enumerate(keywords): cluster_id = clusters[i] segments[f"segment_{cluster_id}"].append(keyword) # Name segments based on common terms named_segments = {} feature_names = vectorizer.get_feature_names_out() for cluster_id in range(n_clusters): cluster_center = kmeans.cluster_centers_[cluster_id] top_indices = cluster_center.argsort()[-3:][::-1] # Top 3 features top_terms = [feature_names[i] for i in top_indices] segment_name = f"{top_terms[0]}_segment" segment_keywords = segments[f"segment_{cluster_id}"] named_segments[segment_name] = segment_keywords return named_segments except Exception as e: print(f"Clustering error: {e}") return {"main_segment": keywords} def analyze_keyword_difficulty(self, keyword: str) -> Dict[str, float]: """Analyze keyword difficulty and opportunity scores""" conn = sqlite3.connect(self.db_path) # Get trend data for keyword query = """ SELECT source, volume, sentiment, COUNT(*) as frequency FROM trends WHERE keyword = ? OR keyword LIKE ? GROUP BY source """ cursor = conn.execute(query, [keyword, f'%{keyword}%']) results = cursor.fetchall() conn.close() if not results: return { 'competition_score': 0.1, # Low competition if no data 'opportunity_score': 0.8, # High opportunity 'data_availability': 0.0 } # Calculate metrics total_volume = sum(row[1] for row in results) avg_sentiment = sum(row[2] for row in results) / len(results) source_diversity = len(results) total_frequency = sum(row[3] for row in results) # Competition score (higher volume = more competition) competition_score = min(total_volume / 10000, 1.0) # Normalize to 0-1 # Opportunity score (based on sentiment and engagement) opportunity_score = 0.5 if avg_sentiment < -0.2: # Negative sentiment indicates problems to solve opportunity_score += 0.3 if source_diversity >= 3: # Cross-platform interest opportunity_score += 0.2 opportunity_score = min(opportunity_score, 1.0) # Data availability score data_availability = min(total_frequency / 100, 1.0) return { 'competition_score': competition_score, 'opportunity_score': opportunity_score, 'data_availability': data_availability, 'total_mentions': total_frequency, 'source_diversity': source_diversity, 'avg_sentiment': avg_sentiment } def generate_keyword_variations(self, base_keyword: str) -> List[str]: """Generate variations of a base keyword""" variations = [] words = base_keyword.split() if len(words) == 1: # Single word variations base = words[0] variations.extend([ f"{base} app", f"{base} software", f"{base} tool", f"{base} platform", f"{base} service", f"{base} solution", f"best {base}", f"{base} alternative", f"cheap {base}", f"free {base}" ]) else: # Multi-word variations variations.extend([ f"{base_keyword} app", f"{base_keyword} software", f"{base_keyword} platform", f"best {base_keyword}", f"{base_keyword} solution", f"{base_keyword} alternative" ]) # Word order variations if len(words) == 2: variations.append(f"{words[1]} {words[0]}") return variations class MarketSegmentAnalyzer: """Analyze market segments and identify gaps within segments""" def __init__(self, db_path: str): self.db_path = db_path def analyze_segment_trends(self, segment_keywords: List[str]) -> Dict[str, any]: """Analyze trends within a market segment""" conn = sqlite3.connect(self.db_path) # Get trend data for all keywords in segment placeholders = ','.join(['?' for _ in segment_keywords]) query = f""" SELECT keyword, source, AVG(volume) as avg_volume, AVG(sentiment) as avg_sentiment, COUNT(*) as mentions, MIN(timestamp) as first_seen, MAX(timestamp) as last_seen FROM trends WHERE keyword IN ({placeholders}) GROUP BY keyword, source """ cursor = conn.execute(query, segment_keywords) results = cursor.fetchall() conn.close() if not results: return {} # Analyze segment characteristics segment_data = defaultdict(lambda: defaultdict(list)) for row in results: keyword, source, avg_vol, avg_sent, mentions, first, last = row segment_data[keyword]['sources'].append(source) segment_data[keyword]['avg_volume'] = segment_data[keyword].get('avg_volume', 0) + avg_vol segment_data[keyword]['avg_sentiment'] = segment_data[keyword].get('avg_sentiment', 0) + avg_sent segment_data[keyword]['total_mentions'] = segment_data[keyword].get('total_mentions', 0) + mentions segment_data[keyword]['date_range'] = (first, last) # Calculate segment-wide metrics total_volume = sum(data['avg_volume'] for data in segment_data.values()) total_mentions = sum(data['total_mentions'] for data in segment_data.values()) avg_segment_sentiment = sum(data['avg_sentiment'] for data in segment_data.values()) / len(segment_data) # Identify leaders and gaps in segment sorted_by_volume = sorted(segment_data.items(), key=lambda x: x[1]['avg_volume'], reverse=True) market_leaders = sorted_by_volume[:3] # Top 3 by volume underserved_areas = sorted_by_volume[-3:] # Bottom 3 by volume return { 'segment_size': len(segment_keywords), 'total_market_volume': total_volume, 'total_mentions': total_mentions, 'avg_sentiment': avg_segment_sentiment, 'market_leaders': [(kw, data['avg_volume']) for kw, data in market_leaders], 'underserved_areas': [(kw, data['avg_volume']) for kw, data in underserved_areas], 'keyword_details': dict(segment_data) } def identify_segment_gaps(self, segment_analysis: Dict[str, any]) -> List[str]: """Identify specific gaps within a market segment""" gaps = [] # Low volume but high sentiment areas (unmet demand) if segment_analysis: keyword_details = segment_analysis.get('keyword_details', {}) for keyword, data in keyword_details.items(): volume = data.get('avg_volume', 0) sentiment = data.get('avg_sentiment', 0) # High demand, low supply indicators if volume < segment_analysis.get('total_market_volume', 0) / len(keyword_details) * 0.5: if sentiment > 0.2: # Positive sentiment gaps.append(f"Underserved positive demand for {keyword}") elif sentiment < -0.2: # Negative sentiment gaps.append(f"User dissatisfaction with current {keyword} solutions") return gaps async def comprehensive_keyword_analysis(base_keywords: List[str], db_path: str): """Comprehensive analysis pipeline for keyword expansion and market segmentation""" print(f"Starting comprehensive analysis for: {base_keywords}") # Initialize engines keyword_engine = KeywordExpansionEngine(db_path) segment_analyzer = MarketSegmentAnalyzer(db_path) results = { 'base_keywords': base_keywords, 'expanded_keywords': {}, 'market_segments': {}, 'keyword_difficulty': {}, 'segment_analysis': {}, 'identified_gaps': {} } all_keywords = set(base_keywords) # Step 1: Expand keywords print("Expanding keywords...") for keyword in base_keywords: # Get variations variations = keyword_engine.generate_keyword_variations(keyword) # Get trend-based expansions trend_expansions = keyword_engine.expand_keywords_from_trends(keyword) # Get search suggestions search_suggestions = await keyword_engine.expand_keywords_with_search_suggestions(keyword) # Combine all expansions expanded = list(set(variations + trend_expansions + search_suggestions)) results['expanded_keywords'][keyword] = expanded all_keywords.update(expanded) # Step 2: Analyze keyword difficulty print("Analyzing keyword difficulty...") for keyword in all_keywords: difficulty = keyword_engine.analyze_keyword_difficulty(keyword) results['keyword_difficulty'][keyword] = difficulty # Step 3: Identify market segments print("Identifying market segments...") all_keywords_list = list(all_keywords) segments = keyword_engine.identify_market_segments(all_keywords_list) results['market_segments'] = segments # Step 4: Analyze each segment print("Analyzing market segments...") for segment_name, segment_keywords in segments.items(): segment_analysis = segment_analyzer.analyze_segment_trends(segment_keywords) results['segment_analysis'][segment_name] = segment_analysis # Identify gaps in segment segment_gaps = segment_analyzer.identify_segment_gaps(segment_analysis) results['identified_gaps'][segment_name] = segment_gaps # Step 5: Generate prioritized opportunities print("Generating opportunity priorities...") opportunities = [] for keyword, difficulty in results['keyword_difficulty'].items(): opportunity_score = difficulty.get('opportunity_score', 0) competition_score = difficulty.get('competition_score', 0) data_availability = difficulty.get('data_availability', 0) # Calculate combined opportunity score combined_score = (opportunity_score * 0.4 + (1 - competition_score) * 0.4 + data_availability * 0.2) opportunities.append({ 'keyword': keyword, 'opportunity_score': opportunity_score, 'competition_score': competition_score, 'combined_score': combined_score, 'data_points': difficulty.get('total_mentions', 0), 'sentiment': difficulty.get('avg_sentiment', 0) }) # Sort by combined score opportunities.sort(key=lambda x: x['combined_score'], reverse=True) results['prioritized_opportunities'] = opportunities[:20] # Top 20 # Step 6: Save results timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"keyword_analysis_report_{timestamp}.json" with open(filename, 'w') as f: json.dump(results, f, indent=2, default=str) print(f"Analysis complete. Results saved to {filename}") return results # Example usage and testing if __name__ == "__main__": import sys import os from datetime import datetime # Add the main script directory to path to import the main classes sys.path.append(os.path.dirname(__file__)) async def test_keyword_expansion(): # Test with sample keywords test_keywords = [ "productivity app", "sustainable fashion", "remote collaboration" ] # Use the database from the main trend scraper db_path = "market_trends.db" # Run comprehensive analysis results = await comprehensive_keyword_analysis(test_keywords, db_path) # Print summary print("\n=== KEYWORD EXPANSION RESULTS ===") print(f"Base keywords: {len(test_keywords)}") print(f"Total expanded keywords: {sum(len(v) for v in results['expanded_keywords'].values())}") print(f"Market segments identified: {len(results['market_segments'])}") print(f"Top opportunities: {len(results['prioritized_opportunities'])}") print("\n=== TOP 5 OPPORTUNITIES ===") for i, opp in enumerate(results['prioritized_opportunities'][:5], 1): print(f"{i}. {opp['keyword']}") print(f" Combined Score: {opp['combined_score']:.3f}") print(f" Opportunity: {opp['opportunity_score']:.3f}, Competition: {opp['competition_score']:.3f}") print() # Run the test asyncio.run(test_keyword_expansion()) import matplotlib.pyplot as plt import seaborn as sns import pandas as pd import numpy as np import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots import sqlite3 import json from datetime import datetime, timedelta from typing import Dict, List, Optional import networkx as nx from wordcloud import WordCloud import warnings warnings.filterwarnings('ignore') class TrendVisualizationDashboard: """Create visualizations and reports for trend analysis results""" def __init__(self, db_path: str): self.db_path = db_path plt.style.use('seaborn-v0_8') sns.set_palette("husl") def load_trend_data(self, keywords: Optional[List[str]] = None, days: int = 30) -> pd.DataFrame: """Load trend data from database""" conn = sqlite3.connect(self.db_path) base_query = """ SELECT keyword, source, volume, sentiment, timestamp, metadata FROM trends WHERE timestamp >= ? """ params = [(datetime.now() - timedelta(days=days)).isoformat()] if keywords: keyword_filter = "'" + "','".join(keywords) + "'" base_query += f" AND keyword IN ({keyword_filter})" base_query += " ORDER BY timestamp DESC" df = pd.read_sql_query(base_query, conn, params=params) conn.close() if not df.empty: df['timestamp'] = pd.to_datetime(df['timestamp']) df['date'] = df['timestamp'].dt.date df['hour'] = df['timestamp'].dt.hour try: df['metadata'] = df['metadata'].apply(json.loads) except: df['metadata'] = df['metadata'].apply(lambda x: {}) return df def load_market_gaps(self) -> pd.DataFrame: """Load market gap analysis results""" conn = sqlite3.connect(self.db_path) query = """ SELECT gap_description, evidence_score, demand_indicators, supply_gaps, related_keywords, confidence_level, created_at FROM market_gaps ORDER BY evidence_score DESC """ df = pd.read_sql_query(query, conn) conn.close() if not df.empty: df['created_at'] = pd.to_datetime(df['created_at']) df['demand_indicators'] = df['demand_indicators'].apply(json.loads) df['supply_gaps'] = df['supply_gaps'].apply(json.loads) df['related_keywords'] = df['related_keywords'].apply(json.loads) return df def create_trend_timeline(self, keywords: List[str], save_path: str = None) -> go.Figure: """Create interactive timeline of trend data""" df = self.load_trend_data(keywords) if df.empty: return go.Figure().add_annotation(text="No data available", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) fig = make_subplots( rows=2, cols=1, subplot_titles=('Volume Trends', 'Sentiment Trends'), vertical_spacing=0.1 ) # Volume trends by keyword for keyword in keywords: keyword_data = df[df['keyword'] == keyword] if not keyword_data.empty: daily_volume = keyword_data.groupby('date')['volume'].sum().reset_index() fig.add_trace( go.Scatter( x=daily_volume['date'], y=daily_volume['volume'], mode='lines+markers', name=f'{keyword} Volume', line=dict(width=3) ), row=1, col=1 ) # Sentiment trends by keyword for keyword in keywords: keyword_data = df[df['keyword'] == keyword] if not keyword_data.empty: daily_sentiment = keyword_data.groupby('date')['sentiment'].mean().reset_index() fig.add_trace( go.Scatter( x=daily_sentiment['date'], y=daily_sentiment['sentiment'], mode='lines+markers', name=f'{keyword} Sentiment', line=dict(width=3, dash='dash') ), row=2, col=1 ) fig.update_layout( title='Trend Analysis Timeline', height=600, showlegend=True, template='plotly_white' ) fig.update_xaxes(title_text="Date", row=2, col=1) fig.update_yaxes(title_text="Volume", row=1, col=1) fig.update_yaxes(title_text="Sentiment", row=2, col=1) if save_path: fig.write_html(save_path) return fig def create_source_distribution(self, keywords: List[str], save_path: str = None) -> go.Figure: """Create pie chart of data sources distribution""" df = self.load_trend_data(keywords) if df.empty: return go.Figure().add_annotation(text="No data available") source_counts = df['source'].value_counts() fig = go.Figure(data=[ go.Pie( labels=source_counts.index, values=source_counts.values, hole=.3, textinfo='label+percent', textposition='outside' ) ]) fig.update_layout( title='Data Sources Distribution', template='plotly_white', annotations=[dict(text='Sources', x=0.5, y=0.5, font_size=20, showarrow=False)] ) if save_path: fig.write_html(save_path) return fig def create_sentiment_heatmap(self, keywords: List[str], save_path: str = None): """Create sentiment heatmap across sources and keywords""" df = self.load_trend_data(keywords) if df.empty: print("No data available for heatmap") return # Create pivot table for heatmap sentiment_pivot = df.groupby(['keyword', 'source'])['sentiment'].mean().unstack(fill_value=0) plt.figure(figsize=(12, 8)) sns.heatmap(sentiment_pivot, annot=True, cmap='RdYlGn', center=0, fmt='.2f', cbar_kws={'label': 'Average Sentiment'}) plt.title('Sentiment Heatmap: Keywords vs Sources') plt.xlabel('Data Source') plt.ylabel('Keyword') plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') plt.show() def create_market_gap_visualization(self, save_path: str = None) -> go.Figure: """Visualize market gaps and opportunities""" df = self.load_market_gaps() if df.empty: return go.Figure().add_annotation(text="No market gaps data available") # Create bubble chart fig = go.Figure() # Extract confidence levels and evidence scores x_vals = df['confidence_level'] y_vals = df['evidence_score'] sizes = [len(gaps) * 10 + 20 for gaps in df['supply_gaps']] # Size based on number of supply gaps texts = [desc[:50] + "..." if len(desc) > 50 else desc for desc in df['gap_description']] fig.add_trace(go.Scatter( x=x_vals, y=y_vals, mode='markers', marker=dict( size=sizes, color=y_vals, colorscale='Viridis', showscale=True, colorbar=dict(title="Evidence Score"), line=dict(width=2, color='white') ), text=texts, textposition="middle center", hovertemplate='%{text}
' + 'Confidence: %{x:.2f}
' + 'Evidence Score: %{y:.2f}
' + '' )) fig.update_layout( title='Market Gap Opportunities', xaxis_title='Confidence Level', yaxis_title='Evidence Score', template='plotly_white', height=600 ) if save_path: fig.write_html(save_path) return fig def create_keyword_network(self, keywords: List[str], save_path: str = None): """Create network graph of keyword relationships""" df = self.load_trend_data(keywords) if df.empty: print("No data available for network") return # Create network graph G = nx.Graph() # Add nodes for each keyword for keyword in keywords: keyword_data = df[df['keyword'] == keyword] volume = keyword_data['volume'].sum() if not keyword_data.empty else 0 sentiment = keyword_data['sentiment'].mean() if not keyword_data.empty else 0 G.add_node(keyword, volume=volume, sentiment=sentiment) # Add edges based on co-occurrence in time windows for i, keyword1 in enumerate(keywords): for keyword2 in keywords[i+1:]: # Find temporal co-occurrence data1 = df[df['keyword'] == keyword1] data2 = df[df['keyword'] == keyword2] if not data1.empty and not data2.empty: # Simple co-occurrence based on overlapping time periods dates1 = set(data1['date']) dates2 = set(data2['date']) overlap = len(dates1.intersection(dates2)) if overlap > 0: weight = overlap / max(len(dates1), len(dates2)) G.add_edge(keyword1, keyword2, weight=weight) # Create visualization plt.figure(figsize=(14, 10)) pos = nx.spring_layout(G, k=3, iterations=50) # Node sizes based on volume node_sizes = [G.nodes[node]['volume'] * 0.1 + 300 for node in G.nodes()] # Node colors based on sentiment node_colors = [G.nodes[node]['sentiment'] for node in G.nodes()] # Draw network nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors, cmap='RdYlGn', alpha=0.8) nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold') # Draw edges with varying thickness based on weight edges = G.edges() weights = [G[u][v]['weight'] * 5 for u, v in edges] nx.draw_networkx_edges(G, pos, width=weights, alpha=0.6, edge_color='gray') plt.title('Keyword Relationship Network') plt.axis('off') plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') plt.show() def create_wordcloud(self, keywords: List[str], save_path: str = None): """Create word cloud from trend metadata""" df = self.load_trend_data(keywords) if df.empty: print("No data available for word cloud") return # Extract text from metadata text_data = [] for metadata in df['metadata']: if isinstance(metadata, dict): for key, value in metadata.items(): if isinstance(value, str) and len(value) > 3: text_data.append(value) if not text_data: print("No text data found in metadata") return # Combine all text combined_text = ' '.join(text_data) # Create word cloud wordcloud = WordCloud( width=800, height=400, background_color='white', max_words=100, colormap='viridis', relative_scaling=0.5 ).generate(combined_text) plt.figure(figsize=(12, 6)) plt.imshow(wordcloud, interpolation='bilinear') plt.axis('off') plt.title('Trending Topics Word Cloud') plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') plt.show() def generate_comprehensive_report(self, keywords: List[str], output_dir: str = "reports"): """Generate comprehensive analysis report with all visualizations""" import os # Create output directory os.makedirs(output_dir, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') print(f"Generating comprehensive report for keywords: {keywords}") # Load data for summary statistics df = self.load_trend_data(keywords) gaps_df = self.load_market_gaps() # Generate all visualizations print("Creating trend timeline...") timeline_fig = self.create_trend_timeline(keywords, f"{output_dir}/timeline_{timestamp}.html") print("Creating source distribution...") source_fig = self.create_source_distribution(keywords, f"{output_dir}/sources_{timestamp}.html") print("Creating sentiment heatmap...") self.create_sentiment_heatmap(keywords, f"{output_dir}/sentiment_heatmap_{timestamp}.png") print("Creating market gap visualization...") gap_fig = self.create_market_gap_visualization(f"{output_dir}/market_gaps_{timestamp}.html") print("Creating keyword network...") self.create_keyword_network(keywords, f"{output_dir}/keyword_network_{timestamp}.png") print("Creating word cloud...") self.create_wordcloud(keywords, f"{output_dir}/wordcloud_{timestamp}.png") # Generate summary statistics summary_stats = self.generate_summary_statistics(df, gaps_df, keywords) # Create HTML report html_report = self.create_html_report(summary_stats, keywords, timestamp) report_path = f"{output_dir}/comprehensive_report_{timestamp}.html" with open(report_path, 'w') as f: f.write(html_report) print(f"\nComprehensive report generated successfully!") print(f"Main report: {report_path}") print(f"All files saved in: {output_dir}/") return report_path def generate_summary_statistics(self, df: pd.DataFrame, gaps_df: pd.DataFrame, keywords: List[str]) -> Dict: """Generate summary statistics for the report""" if df.empty: return { 'total_data_points': 0, 'date_range': 'No data', 'keywords_analyzed': len(keywords), 'sources_used': 0, 'market_gaps_found': len(gaps_df) if not gaps_df.empty else 0 } stats = { 'total_data_points': len(df), 'date_range': f"{df['timestamp'].min().strftime('%Y-%m-%d')} to {df['timestamp'].max().strftime('%Y-%m-%d')}", 'keywords_analyzed': len(keywords), 'sources_used': df['source'].nunique(), 'avg_sentiment': df['sentiment'].mean(), 'sentiment_std': df['sentiment'].std(), 'total_volume': df['volume'].sum(), 'avg_volume': df['volume'].mean(), 'market_gaps_found': len(gaps_df) if not gaps_df.empty else 0, 'top_volume_keyword': df.groupby('keyword')['volume'].sum().idxmax() if len(df) > 0 else 'N/A', 'most_positive_keyword': df.groupby('keyword')['sentiment'].mean().idxmax() if len(df) > 0 else 'N/A', 'source_breakdown': df['source'].value_counts().to_dict() } return stats def create_html_report(self, stats: Dict, keywords: List[str], timestamp: str) -> str: """Create HTML report with embedded visualizations""" html_template = f""" Market Trend Analysis Report

Market Trend Analysis Report

Generated on {timestamp}

Executive Summary

This report analyzes market trends and identifies opportunities for the following keywords: {', '.join(keywords)}. The analysis covers data from multiple sources including social media, news, GitHub repositories, and search trends to provide comprehensive market insights.

{stats.get('total_data_points', 0):,}
Total Data Points
{stats.get('keywords_analyzed', 0)}
Keywords Analyzed
{stats.get('sources_used', 0)}
Data Sources
{stats.get('market_gaps_found', 0)}
Market Gaps Found

Key Metrics

MetricValue
Analysis Period{stats.get('date_range', 'N/A')}
Average Sentiment{stats.get('avg_sentiment', 0):.3f}
Sentiment Volatility{stats.get('sentiment_std', 0):.3f}
Total Volume{stats.get('total_volume', 0):,}
Average Volume{stats.get('avg_volume', 0):,.1f}
Top Volume Keyword{stats.get('top_volume_keyword', 'N/A')}
Most Positive Keyword{stats.get('most_positive_keyword', 'N/A')}

Data Source Breakdown

{''.join([f"" for source, count in stats.get('source_breakdown', {}).items()])}
SourceData PointsPercentage
{source}{count:,}{count/stats.get('total_data_points', 1)*100:.1f}%

Key Insights

Interactive Visualizations

Click the links below to view interactive charts and detailed visualizations:

📈 Trend Timeline 📊 Source Distribution 🎯 Market Opportunities

Static Visualizations

Sentiment Heatmap

Sentiment Heatmap

Keyword Relationship Network

Keyword Network

Trending Topics Word Cloud

Word Cloud

Recommendations

""" return html_template def create_dashboard_app(db_path: str): """Create a simple Flask web dashboard (requires Flask installation)""" dashboard_code = f''' import flask from flask import Flask, render_template_string, request, jsonify import json from datetime import datetime app = Flask(__name__) viz_dashboard = TrendVisualizationDashboard("{db_path}") @app.route('/') def home(): return render_template_string(""" Market Trend Analysis Dashboard

Market Trend Analysis Dashboard

Enter keywords to analyze market trends and identify opportunities

Analyze Keywords

Generate Report

""") @app.route('/analyze', methods=['POST']) def analyze(): keywords = [kw.strip() for kw in request.form['keywords'].split(',')] try: # Quick analysis df = viz_dashboard.load_trend_data(keywords) if not df.empty: summary = {{ 'keywords': keywords, 'total_data_points': len(df), 'sources': df['source'].unique().tolist(), 'avg_sentiment': df['sentiment'].mean(), 'date_range': f"{{df['timestamp'].min()}} to {{df['timestamp'].max()}}" }} else: summary = {{'message': 'No data found for specified keywords'}} return jsonify(summary) except Exception as e: return jsonify({{'error': str(e)}}) @app.route('/report', methods=['POST']) def generate_report(): keywords = [kw.strip() for kw in request.form['keywords'].split(',')] try: report_path = viz_dashboard.generate_comprehensive_report(keywords) return jsonify({{'message': f'Report generated successfully: {{report_path}}'}}) except Exception as e: return jsonify({{'error': str(e)}}) if __name__ == '__main__': print("Starting Market Trend Analysis Dashboard...") print("Visit http://localhost:5000 to access the dashboard") app.run(debug=True, host='0.0.0.0', port=5000) ''' return dashboard_code # Example usage and testing functions def test_visualizations(): """Test all visualization functions""" # Initialize dashboard dashboard = TrendVisualizationDashboard("market_trends.db") # Test keywords test_keywords = ["sustainable packaging", "remote work tools", "AI productivity"] print("Testing visualizations...") try: # Test timeline print("1. Creating timeline visualization...") timeline_fig = dashboard.create_trend_timeline(test_keywords) timeline_fig.show() # Test source distribution print("2. Creating source distribution...") source_fig = dashboard.create_source_distribution(test_keywords) source_fig.show() # Test sentiment heatmap print("3. Creating sentiment heatmap...") dashboard.create_sentiment_heatmap(test_keywords) # Test market gaps print("4. Creating market gap visualization...") gap_fig = dashboard.create_market_gap_visualization() gap_fig.show() # Test network print("5. Creating keyword network...") dashboard.create_keyword_network(test_keywords) # Test word cloud print("6. Creating word cloud...") dashboard.create_wordcloud(test_keywords) print("All visualizations tested successfully!") except Exception as e: print(f"Error in visualization testing: {e}") def export_dashboard_code(filename: str = "dashboard_app.py"): """Export the dashboard application code to a file""" dashboard_code = create_dashboard_app("market_trends.db") with open(filename, 'w') as f: f.write("# Market Trend Analysis Dashboard\n") f.write("# Run this file to start the web dashboard\n\n") f.write("from visualization_reporting import TrendVisualizationDashboard\n") f.write(dashboard_code) print(f"Dashboard application code exported to {filename}") print("Install Flask with: pip install flask") print(f"Run the dashboard with: python {filename}") if __name__ == "__main__": # Run tests test_visualizations() # Export dashboard export_dashboard_code() #!/usr/bin/env python3 """ Main Execution Script for Market Gap Analysis System This script orchestrates the complete pipeline: 1. Keyword expansion and market segmentation 2. Multi-source trend data scraping 3. Market gap analysis and correlation 4. Visualization and reporting Usage: python main_execution.py --keywords "keyword1,keyword2,keyword3" --days 30 --output results/ python main_execution.py --config config.json python main_execution.py --interactive """ import asyncio import argparse import json import os import sys from datetime import datetime, timedelta from typing import List, Dict, Optional import logging # Import our custom modules from market_gap_backend import main_analysis_pipeline, TrendScraper, MarketGapAnalyzer from keyword_expansion import comprehensive_keyword_analysis, KeywordExpansionEngine from visualization_reporting import TrendVisualizationDashboard # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('market_analysis.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) class MarketAnalysisOrchestrator: """Main orchestrator class for the complete market analysis pipeline""" def __init__(self, config: Dict = None): self.config = config or self.get_default_config() self.db_path = self.config.get('db_path', 'market_trends.db') self.output_dir = self.config.get('output_dir', 'results') # Initialize components self.scraper = TrendScraper() self.analyzer = MarketGapAnalyzer(self.db_path) self.keyword_engine = KeywordExpansionEngine(self.db_path) self.viz_dashboard = TrendVisualizationDashboard(self.db_path) # Create output directory os.makedirs(self.output_dir, exist_ok=True) @staticmethod def get_default_config() -> Dict: """Get default configuration""" return { 'db_path': 'market_trends.db', 'output_dir': 'results', 'analysis_days': 30, 'max_keywords': 50, 'enable_keyword_expansion': True, 'enable_visualizations': True, 'generate_report': True, 'api_keys': { 'newsapi': 'YOUR_NEWSAPI_KEY', 'twitter_bearer': 'YOUR_TWITTER_BEARER_TOKEN' }, 'scraping_sources': { 'reddit': True, 'github': True, 'google_trends': True, 'news': False, # Requires API key 'twitter': False # Requires API key } } def validate_config(self) -> bool: """Validate configuration settings""" required_fields = ['db_path', 'output_dir', 'analysis_days'] for field in required_fields: if field not in self.config: logger.error(f"Missing required config field: {field}") return False if self.config['analysis_days'] < 1 or self.config['analysis_days'] > 365: logger.error("analysis_days must be between 1 and 365") return False return True async def expand_keywords(self, base_keywords: List[str]) -> List[str]: """Expand base keywords using various techniques""" if not self.config.get('enable_keyword_expansion', True): return base_keywords logger.info(f"Expanding {len(base_keywords)} base keywords...") try: # Run comprehensive keyword analysis results = await comprehensive_keyword_analysis(base_keywords, self.db_path) # Extract all expanded keywords all_keywords = set(base_keywords) for keyword, expansions in results['expanded_keywords'].items(): all_keywords.update(expansions) # Apply keyword filtering and ranking filtered_keywords = self.filter_and_rank_keywords( list(all_keywords), results.get('keyword_difficulty', {}) ) max_keywords = self.config.get('max_keywords', 50) final_keywords = filtered_keywords[:max_keywords] logger.info(f"Expanded to {len(final_keywords)} keywords after filtering") # Save keyword expansion results expansion_file = os.path.join(self.output_dir, 'keyword_expansion_results.json') with open(expansion_file, 'w') as f: json.dump(results, f, indent=2, default=str) return final_keywords except Exception as e: logger.error(f"Error in keyword expansion: {e}") return base_keywords def filter_and_rank_keywords(self, keywords: List[str], difficulty_scores: Dict) -> List[str]: """Filter and rank keywords based on opportunity scores""" scored_keywords = [] for keyword in keywords: scores = difficulty_scores.get(keyword, {}) opportunity = scores.get('opportunity_score', 0.5) competition = scores.get('competition_score', 0.5) data_availability = scores.get('data_availability', 0.1) # Calculate combined score combined_score = (opportunity * 0.4 + (1 - competition) * 0.4 + data_availability * 0.2) scored_keywords.append((keyword, combined_score)) # Sort by score and return keywords scored_keywords.sort(key=lambda x: x[1], reverse=True) return [kw for kw, score in scored_keywords] async def run_trend_analysis(self, keywords: List[str]) -> Dict: """Run the main trend analysis pipeline""" logger.info(f"Running trend analysis for {len(keywords)} keywords...") try: # Run the main analysis pipeline results = await main_analysis_pipeline(keywords) # Save results results_file = os.path.join(self.output_dir, 'trend_analysis_results.json') with open(results_file, 'w') as f: json.dump(results, f, indent=2, default=str) logger.info("Trend analysis completed successfully") return results except Exception as e: logger.error(f"Error in trend analysis: {e}") return {} def generate_visualizations(self, keywords: List[str]) -> str: """Generate all visualizations and reports""" if not self.config.get('enable_visualizations', True): logger.info("Visualizations disabled in config") return "" logger.info("Generating visualizations and reports...") try: # Generate comprehensive report with all visualizations report_path = self.viz_dashboard.generate_comprehensive_report( keywords, self.output_dir ) logger.info(f"Visualizations and report generated: {report_path}") return report_path except Exception as e: logger.error(f"Error generating visualizations: {e}") return "" def calculate_roi_estimates(self, market_gaps: List, keywords: List[str]) -> Dict: """Calculate rough ROI estimates for identified opportunities""" roi_estimates = {} for gap in market_gaps: # Simple ROI calculation based on evidence score and market indicators evidence_score = gap.get('evidence_score', 0) confidence = gap.get('confidence_level', 0) # Estimate market size based on trend data estimated_market_size = self.estimate_market_size(gap.get('related_keywords', [])) # Calculate potential ROI (simplified) potential_roi = { 'opportunity_score': evidence_score, 'confidence_level': confidence, 'estimated_market_size': estimated_market_size, 'risk_level': 'Low' if confidence > 0.7 else 'Medium' if confidence > 0.4 else 'High', 'time_to_market': self.estimate_time_to_market(gap), 'investment_category': self.categorize_investment_level(gap) } roi_estimates[gap.get('gap_description', 'Unknown')] = potential_roi return roi_estimates def estimate_market_size(self, related_keywords: List[str]) -> str: """Estimate market size category based on keyword data""" # Simple heuristic based on number of related keywords and their volume if len(related_keywords) > 10: return "Large" elif len(related_keywords) > 5: return "Medium" else: return "Small" def estimate_time_to_market(self, gap: Dict) -> str: """Estimate time to market based on gap characteristics""" supply_gaps = gap.get('supply_gaps', []) if any('dissatisfaction' in sg.lower() for sg in supply_gaps): return "3-6 months" # Improving existing solutions elif any('limited' in sg.lower() for sg in supply_gaps): return "6-12 months" # Creating new solutions else: return "12+ months" # New market creation def categorize_investment_level(self, gap: Dict) -> str: """Categorize required investment level""" evidence_score = gap.get('evidence_score', 0) if evidence_score > 0.7: return "High - Strong market validation" elif evidence_score > 0.4: return "Medium - Moderate validation needed" else: return "Low - High validation required" def generate_executive_summary(self, results: Dict, keywords: List[str], roi_estimates: Dict) -> str: """Generate executive summary of findings""" summary = f""" EXECUTIVE SUMMARY - Market Gap Analysis Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ANALYSIS SCOPE: - Keywords Analyzed: {len(keywords)} - Data Points Collected: {results.get('total_data_points', 0):,} - Market Gaps Identified: {results.get('market_gaps_identified', 0)} - Analysis Period: {self.config.get('analysis_days', 30)} days TOP OPPORTUNITIES: """ top_opportunities = results.get('top_opportunities', [])[:5] for i, opp in enumerate(top_opportunities, 1): summary += f""" {i}. {opp.get('description', 'Unknown')} Evidence Score: {opp.get('evidence_score', 0):.3f}/1.0 Confidence: {opp.get('confidence', 0):.3f}/1.0 Key Indicators: {', '.join(opp.get('key_indicators', [])[:2])} """ summary += f""" KEY INSIGHTS: - {len([k for k in keywords if any('app' in k.lower() for k in [k])])} app-related opportunities identified - Average market sentiment: {'Positive' if results.get('avg_sentiment', 0) > 0 else 'Negative'} - Cross-platform correlation strength: {'High' if len(results.get('cross_platform_correlations', {})) > 0 else 'Low'} INVESTMENT RECOMMENDATIONS: """ high_roi = [k for k, v in roi_estimates.items() if v.get('opportunity_score', 0) > 0.6] summary += f"- {len(high_roi)} high-opportunity investments identified\n" summary += f"- Risk distribution: {len([v for v in roi_estimates.values() if v.get('risk_level') == 'Low'])} low-risk, " summary += f"{len([v for v in roi_estimates.values() if v.get('risk_level') == 'Medium'])} medium-risk opportunities\n" return summary async def run_complete_analysis(self, base_keywords: List[str]) -> Dict: """Run the complete analysis pipeline""" start_time = datetime.now() logger.info(f"Starting complete market analysis pipeline at {start_time}") try: # Step 1: Expand keywords expanded_keywords = await self.expand_keywords(base_keywords) # Step 2: Run trend analysis trend_results = await self.run_trend_analysis(expanded_keywords) # Step 3: Calculate ROI estimates market_gaps = trend_results.get('top_opportunities', []) roi_estimates = self.calculate_roi_estimates(market_gaps, expanded_keywords) # Step 4: Generate visualizations report_path = self.generate_visualizations(expanded_keywords) # Step 5: Generate executive summary executive_summary = self.generate_executive_summary( trend_results, expanded_keywords, roi_estimates ) # Compile final results final_results = { 'analysis_metadata': { 'start_time': start_time.isoformat(), 'end_time': datetime.now().isoformat(), 'base_keywords': base_keywords, 'expanded_keywords': expanded_keywords, 'total_keywords_analyzed': len(expanded_keywords), 'config_used': self.config }, 'trend_analysis': trend_results, 'roi_estimates': roi_estimates, 'executive_summary': executive_summary, 'report_path': report_path, 'output_directory': self.output_dir } # Save final results final_results_path = os.path.join(self.output_dir, 'final_analysis_results.json') with open(final_results_path, 'w') as f: json.dump(final_results, f, indent=2, default=str) # Save executive summary as text file summary_path = os.path.join(self.output_dir, 'executive_summary.txt') with open(summary_path, 'w') as f: f.write(executive_summary) end_time = datetime.now() duration = end_time - start_time logger.info(f"Complete analysis finished in {duration}") logger.info(f"Results saved to: {self.output_dir}") logger.info(f"Main report: {report_path}") return final_results except Exception as e: logger.error(f"Error in complete analysis: {e}") raise def load_config_file(config_path: str) -> Dict: """Load configuration from JSON file""" try: with open(config_path, 'r') as f: return json.load(f) except Exception as e: logger.error(f"Error loading config file {config_path}: {e}") return {} def interactive_mode(): """Run analysis in interactive mode""" print("\n" + "="*60) print("MARKET GAP ANALYSIS SYSTEM - INTERACTIVE MODE") print("="*60) # Get keywords from user print("\nStep 1: Enter Keywords to Analyze") keywords_input = input("Enter keywords separated by commas: ").strip() if not keywords_input: print("No keywords provided. Exiting.") return base_keywords = [kw.strip() for kw in keywords_input.split(',')] print(f"Keywords to analyze: {base_keywords}") # Get analysis period print("\nStep 2: Analysis Period") try: days = int(input("Enter number of days to analyze (default: 30): ") or "30") except ValueError: days = 30 # Get output directory print("\nStep 3: Output Configuration") output_dir = input("Enter output directory (default: results/): ").strip() or "results/" # Configure options print("\nStep 4: Analysis Options") enable_expansion = input("Enable keyword expansion? (y/n, default: y): ").lower().startswith('y') enable_viz = input("Generate visualizations? (y/n, default: y): ").lower().startswith('y') # Create configuration config = { 'db_path': 'market_trends.db', 'output_dir': output_dir, 'analysis_days': days, 'max_keywords': 50, 'enable_keyword_expansion': enable_expansion, 'enable_visualizations': enable_viz, 'generate_report': True, 'scraping_sources': { 'reddit': True, 'github': True, 'google_trends': True, 'news': False, 'twitter': False } } print(f"\nConfiguration:") print(f"- Keywords: {base_keywords}") print(f"- Analysis period: {days} days") print(f"- Output directory: {output_dir}") print(f"- Keyword expansion: {enable_expansion}") print(f"- Visualizations: {enable_viz}") confirm = input("\nProceed with analysis? (y/n): ").lower().startswith('y') if not confirm: print("Analysis cancelled.") return # Run analysis orchestrator = MarketAnalysisOrchestrator(config) print("\nStarting analysis...") try: results = asyncio.run(orchestrator.run_complete_analysis(base_keywords)) print("\n" + "="*60) print("ANALYSIS COMPLETE!") print("="*60) print(f"Results saved to: {results['output_directory']}") print(f"Executive summary saved to: {os.path.join(results['output_directory'], 'executive_summary.txt')}") if results.get('report_path'): print(f"Full report: {results['report_path']}") # Display executive summary print("\nEXECUTIVE SUMMARY:") print("-" * 40) print(results['executive_summary']) except Exception as e: print(f"Error during analysis: {e}") logger.error(f"Interactive mode error: {e}") def create_sample_config(): """Create a sample configuration file""" sample_config = { "db_path": "market_trends.db", "output_dir": "results", "analysis_days": 30, "max_keywords": 50, "enable_keyword_expansion": True, "enable_visualizations": True, "generate_report": True, "api_keys": { "newsapi": "YOUR_NEWSAPI_KEY_HERE", "twitter_bearer": "YOUR_TWITTER_BEARER_TOKEN_HERE" }, "scraping_sources": { "reddit": True, "github": True, "google_trends": True, "news": False, "twitter": False }, "keyword_filters": { "min_length": 3, "max_length": 50, "exclude_patterns": ["spam", "fake", "scam"], "include_only": [] }, "analysis_settings": { "sentiment_threshold": 0.1, "volume_threshold": 10, "correlation_threshold": 0.3 } } config_path = "config_sample.json" with open(config_path, 'w') as f: json.dump(sample_config, f, indent=2) print(f"Sample configuration created: {config_path}") print("Edit this file with your API keys and preferences, then run:") print(f"python main_execution.py --config {config_path}") def main(): """Main entry point""" parser = argparse.ArgumentParser( description="Market Gap Analysis System", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python main_execution.py --keywords "AI tools,productivity apps" --days 30 python main_execution.py --config config.json python main_execution.py --interactive python main_execution.py --create-config """ ) parser.add_argument('--keywords', type=str, help='Comma-separated list of keywords to analyze') parser.add_argument('--days', type=int, default=30, help='Number of days to analyze (default: 30)') parser.add_argument('--output', type=str, default='results', help='Output directory (default: results)') parser.add_argument('--config', type=str, help='Path to configuration JSON file') parser.add_argument('--interactive', action='store_true', help='Run in interactive mode') parser.add_argument('--create-config', action='store_true', help='Create sample configuration file') parser.add_argument('--max-keywords', type=int, default=50, help='Maximum keywords to analyze (default: 50)') parser.add_argument('--no-expansion', action='store_true', help='Disable keyword expansion') parser.add_argument('--no-viz', action='store_true', help='Disable visualizations') parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') args = parser.parse_args() # Set logging level if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Create sample config if args.create_config: create_sample_config() return # Interactive mode if args.interactive: interactive_mode() return # Load configuration if args.config: config = load_config_file(args.config) if not config: print(f"Failed to load configuration from {args.config}") return else: # Create config from command line arguments config = { 'db_path': 'market_trends.db', 'output_dir': args.output, 'analysis_days': args.days, 'max_keywords': args.max_keywords, 'enable_keyword_expansion': not args.no_expansion, 'enable_visualizations': not args.no_viz, 'generate_report': True, 'scraping_sources': { 'reddit': True, 'github': True, 'google_trends': True, 'news': False, 'twitter': False } } # Get keywords if args.keywords: base_keywords = [kw.strip() for kw in args.keywords.split(',')] elif config.get('base_keywords'): base_keywords = config['base_keywords'] else: print("No keywords provided. Use --keywords, --config, or --interactive mode.") print("Run with --help for usage information.") return # Validate keywords if not base_keywords: print("No valid keywords found.") return print(f"Starting analysis with keywords: {base_keywords}") print(f"Configuration: {json.dumps(config, indent=2)}") # Run analysis try: orchestrator = MarketAnalysisOrchestrator(config) if not orchestrator.validate_config(): print("Configuration validation failed. Check your settings.") return # Run the complete analysis results = asyncio.run(orchestrator.run_complete_analysis(base_keywords)) print("\n" + "="*60) print("ANALYSIS COMPLETED SUCCESSFULLY!") print("="*60) print(f"Results directory: {results['output_directory']}") print(f"Analysis duration: {results['analysis_metadata']['end_time']}") print(f"Keywords analyzed: {results['analysis_metadata']['total_keywords_analyzed']}") if results.get('report_path'): print(f"Main report: {results['report_path']}") print("\nNext steps:") print("1. Review the executive summary in executive_summary.txt") print("2. Open the HTML reports for detailed visualizations") print("3. Check the JSON files for raw data and analysis results") except KeyboardInterrupt: print("\nAnalysis interrupted by user.") except Exception as e: print(f"Analysis failed: {e}") logger.error(f"Main execution error: {e}") return 1 return 0 # Additional utility functions def quick_analysis(keywords: List[str], days: int = 7) -> Dict: """Run a quick analysis for immediate insights""" print(f"Running quick analysis for: {', '.join(keywords)}") config = { 'db_path': 'market_trends.db', 'output_dir': f'quick_results_{datetime.now().strftime("%Y%m%d_%H%M%S")}', 'analysis_days': days, 'max_keywords': 20, 'enable_keyword_expansion': False, 'enable_visualizations': False, 'generate_report': False } orchestrator = MarketAnalysisOrchestrator(config) try: # Run just the trend analysis without expansions results = asyncio.run(orchestrator.run_trend_analysis(keywords)) # Quick summary summary = { 'keywords': keywords, 'opportunities_found': len(results.get('top_opportunities', [])), 'top_opportunity': results.get('top_opportunities', [{}])[0] if results.get('top_opportunities') else None, 'total_data_points': results.get('total_data_points', 0), 'analysis_timestamp': results.get('analysis_timestamp', datetime.now().isoformat()) } return summary except Exception as e: return {'error': str(e), 'keywords': keywords} def batch_analysis(keyword_batches: List[List[str]], output_base_dir: str = "batch_results") -> List[Dict]: """Run analysis on multiple keyword batches""" results = [] for i, batch in enumerate(keyword_batches): print(f"Processing batch {i+1}/{len(keyword_batches)}: {batch}") config = { 'db_path': 'market_trends.db', 'output_dir': f'{output_base_dir}/batch_{i+1}', 'analysis_days': 30, 'max_keywords': 30, 'enable_keyword_expansion': True, 'enable_visualizations': True, 'generate_report': True } orchestrator = MarketAnalysisOrchestrator(config) try: batch_results = asyncio.run(orchestrator.run_complete_analysis(batch)) results.append(batch_results) print(f"Batch {i+1} completed successfully") except Exception as e: print(f"Batch {i+1} failed: {e}") results.append({'error': str(e), 'batch': batch}) return results if __name__ == "__main__": exit_code = main() sys.exit(exit_code) make it functional and beautiful.