mga-basic-test / prompts.txt
phvv8's picture
We need a frontend web UI that ties together the backend, which consists of our insight-scraping algorithms:
6b2a750 verified
we need a frontend web ui which ties together the backend consisting of our insight scraping algorithms:
import asyncio
import aiohttp
import json
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from collections import defaultdict
import sqlite3
from textblob import TextBlob
import re
from urllib.parse import quote_plus
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TrendData:
    """One observation of keyword activity from a single source.

    Produced by the ``TrendScraper.scrape_*`` methods and persisted to the
    ``trends`` SQLite table (``metadata`` is JSON-serialized on insert).
    """
    keyword: str           # search term this observation belongs to
    source: str            # origin platform: 'reddit', 'twitter', 'google_trends', 'github', 'news'
    volume: int            # source-specific magnitude (Reddit score, GitHub stars, Trends interest, ...)
    sentiment: float       # polarity score; TextBlob-derived where available, fixed 0.0/0.5 otherwise
    timestamp: datetime    # when the underlying post/record was created or updated
    metadata: Dict         # free-form per-source extras (subreddit, forks, author, ...)
@dataclass
class MarketGap:
    """An identified market gap produced by ``MarketGapAnalyzer``.

    Persisted to the ``market_gaps`` table with the list fields
    JSON-serialized.
    """
    gap_description: str          # human-readable summary of the opportunity
    evidence_score: float         # combined demand/supply score, capped at 1.0
    demand_indicators: List[str]  # formatted "name: value" indicator strings
    supply_gaps: List[str]        # textual descriptions of supply-side gaps
    related_keywords: List[str]   # co-occurring keywords (top 5 by frequency)
    confidence_level: float       # data-volume-based confidence in [0, 1]
class APIClient:
    """Generic async HTTP GET client with rate limiting and error handling.

    Use as an async context manager so the underlying aiohttp session is
    always closed.
    """

    def __init__(self, base_url: str, rate_limit: float = 1.0):
        """
        Args:
            base_url: Root URL; endpoint paths are appended to it.
            rate_limit: Minimum number of seconds between two requests.
        """
        self.base_url = base_url
        self.rate_limit = rate_limit
        # Monotonic timestamp of the last attempt; immune to wall-clock jumps
        # (time.time() can move backwards on NTP adjustments).
        self.last_request = 0.0
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def make_request(self, endpoint: str, params: Dict = None, headers: Dict = None):
        """Make a rate-limited GET request and return the parsed JSON body.

        Returns:
            The decoded JSON payload, or None on a non-200 status or any
            transport/parse error (errors are logged, not raised).
        """
        # Sleep just long enough to honour the configured rate limit.
        elapsed = time.monotonic() - self.last_request
        if elapsed < self.rate_limit:
            await asyncio.sleep(self.rate_limit - elapsed)
        try:
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            async with self.session.get(url, params=params, headers=headers) as response:
                if response.status == 200:
                    return await response.json()
                logger.warning(f"API request failed: {response.status}")
                return None
        except Exception as e:
            logger.error(f"API request error: {e}")
            return None
        finally:
            # Record the attempt even on failure so a failing endpoint is
            # still rate-limited (the original only updated on success).
            self.last_request = time.monotonic()
class TrendScraper:
    """Scrape trend data for keywords from multiple public sources.

    Each ``scrape_*`` coroutine returns a list of TrendData records; the
    caller is responsible for persisting them (see main_analysis_pipeline).
    Only the SQLite schema is created here.
    """

    def __init__(self):
        # All pipeline components share this single SQLite file.
        self.db_path = "market_trends.db"
        self.initialize_database()

    def initialize_database(self):
        """Create the ``trends`` and ``market_gaps`` tables if missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Raw trend observations (metadata stored as a JSON string).
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS trends (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT,
                    source TEXT,
                    volume INTEGER,
                    sentiment REAL,
                    timestamp TEXT,
                    metadata TEXT
                )
            ''')
            # Gap-analysis results (list fields stored as JSON strings).
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS market_gaps (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    gap_description TEXT,
                    evidence_score REAL,
                    demand_indicators TEXT,
                    supply_gaps TEXT,
                    related_keywords TEXT,
                    confidence_level REAL,
                    created_at TEXT
                )
            ''')
            conn.commit()
        finally:
            # Close the handle even if table creation fails.
            conn.close()

    async def scrape_reddit_trends(self, keywords: List[str]) -> List[TrendData]:
        """Scrape Reddit submissions for each keyword via the Pushshift API.

        Looks back 30 days; sentiment is TextBlob polarity over the
        concatenated title + selftext. Returns [] when the API is
        unreachable (APIClient returns None on errors).
        """
        trends = []
        async with APIClient("https://api.pushshift.io/reddit/search", rate_limit=1.0) as client:
            for keyword in keywords:
                params = {
                    'q': keyword,
                    'subreddit': 'all',
                    'sort': 'desc',
                    'sort_type': 'created_utc',
                    'size': 100,
                    'after': int((datetime.now() - timedelta(days=30)).timestamp())
                }
                data = await client.make_request("submission", params)
                if data and 'data' in data:
                    for post in data['data']:
                        # `title`/`selftext` may be present but None; coalesce
                        # to '' so string concatenation cannot raise TypeError.
                        text = (post.get('title') or '') + ' ' + (post.get('selftext') or '')
                        sentiment = TextBlob(text).sentiment.polarity
                        trends.append(TrendData(
                            keyword=keyword,
                            source='reddit',
                            volume=post.get('score', 0),
                            sentiment=sentiment,
                            timestamp=datetime.fromtimestamp(post.get('created_utc', 0)),
                            metadata={'subreddit': post.get('subreddit'), 'comments': post.get('num_comments', 0)}
                        ))
        return trends

    async def scrape_twitter_trends(self, keywords: List[str]) -> List[TrendData]:
        """Simulate Twitter trend scraping (requires Twitter API credentials).

        Placeholder: emits one random volume/sentiment record per keyword
        until a real Twitter API integration is added.
        """
        trends = []
        for keyword in keywords:
            trends.append(TrendData(
                keyword=keyword,
                source='twitter',
                volume=np.random.randint(100, 10000),
                sentiment=np.random.uniform(-1, 1),
                timestamp=datetime.now(),
                metadata={'hashtags': f"#{keyword}", 'retweets': np.random.randint(10, 1000)}
            ))
        return trends

    async def scrape_google_trends(self, keywords: List[str]) -> List[TrendData]:
        """Fetch 3-month interest-over-time per keyword via pytrends.

        Skips with a warning when pytrends is not installed. Google Trends
        has no sentiment signal, so sentiment is fixed at 0.0.
        """
        trends = []
        try:
            from pytrends.request import TrendReq
            pytrends = TrendReq(hl='en-US', tz=360)
            for keyword in keywords:
                pytrends.build_payload([keyword], timeframe='today 3-m')
                interest_over_time = pytrends.interest_over_time()
                if not interest_over_time.empty:
                    for date, row in interest_over_time.iterrows():
                        trends.append(TrendData(
                            keyword=keyword,
                            source='google_trends',
                            volume=int(row[keyword]),
                            sentiment=0.0,  # Google Trends doesn't provide sentiment
                            timestamp=date,
                            metadata={'isPartial': row.get('isPartial', False)}
                        ))
        except ImportError:
            logger.warning("pytrends not installed. Skipping Google Trends scraping.")
        return trends

    async def scrape_github_trends(self, keywords: List[str]) -> List[TrendData]:
        """Search GitHub repositories per keyword, most recently updated first.

        Volume is the star count; sentiment is fixed at a neutral 0.5.
        """
        trends = []
        async with APIClient("https://api.github.com", rate_limit=1.0) as client:
            headers = {'Accept': 'application/vnd.github.v3+json'}
            for keyword in keywords:
                params = {
                    'q': keyword,
                    'sort': 'updated',
                    'order': 'desc',
                    'per_page': 100
                }
                data = await client.make_request("search/repositories", params, headers)
                if data and 'items' in data:
                    for repo in data['items']:
                        # Skip records with an absent/malformed timestamp
                        # instead of letting fromisoformat('') raise.
                        raw_updated = (repo.get('updated_at') or '').replace('Z', '+00:00')
                        try:
                            updated_at = datetime.fromisoformat(raw_updated)
                        except ValueError:
                            continue
                        trends.append(TrendData(
                            keyword=keyword,
                            source='github',
                            volume=repo.get('stargazers_count', 0),
                            sentiment=0.5,  # Neutral sentiment for GitHub
                            timestamp=updated_at,
                            metadata={
                                'language': repo.get('language'),
                                'forks': repo.get('forks_count', 0),
                                'issues': repo.get('open_issues_count', 0)
                            }
                        ))
        return trends

    async def scrape_news_trends(self, keywords: List[str]) -> List[TrendData]:
        """Scrape recent news per keyword via NewsAPI (requires an API key).

        No-ops (returns []) until API_KEY is replaced with a real key from
        https://newsapi.org/. Volume is always 1 because articles carry no
        engagement metric here.
        """
        trends = []
        API_KEY = "YOUR_NEWSAPI_KEY"  # Replace with actual key
        if API_KEY != "YOUR_NEWSAPI_KEY":
            async with APIClient("https://newsapi.org/v2", rate_limit=1.0) as client:
                headers = {'X-API-Key': API_KEY}
                for keyword in keywords:
                    params = {
                        'q': keyword,
                        'sortBy': 'popularity',
                        'pageSize': 100,
                        'from': (datetime.now() - timedelta(days=30)).isoformat()
                    }
                    data = await client.make_request("everything", params, headers)
                    if data and 'articles' in data:
                        for article in data['articles']:
                            # Coalesce nullable text fields before TextBlob.
                            text = (article.get('title') or '') + ' ' + (article.get('description') or '')
                            sentiment = TextBlob(text).sentiment.polarity
                            # Skip articles without a parseable publish date.
                            raw_published = (article.get('publishedAt') or '').replace('Z', '+00:00')
                            try:
                                published_at = datetime.fromisoformat(raw_published)
                            except ValueError:
                                continue
                            trends.append(TrendData(
                                keyword=keyword,
                                source='news',
                                volume=1,  # News articles don't have volume metrics
                                sentiment=sentiment,
                                timestamp=published_at,
                                metadata={
                                    'source': article.get('source', {}).get('name'),
                                    'author': article.get('author')
                                }
                            ))
        return trends
class MarketGapAnalyzer:
    """Analyze stored trend data to identify and score market gaps."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def load_trend_data(self, keywords: List[str]) -> pd.DataFrame:
        """Load all trend rows for *keywords*, newest first.

        Uses a parameterized IN clause so keywords containing quotes cannot
        break (or inject into) the SQL statement.
        """
        if not keywords:
            # An empty IN () is a SQLite syntax error; short-circuit instead.
            return pd.DataFrame()
        placeholders = ','.join('?' for _ in keywords)
        query = f"""
        SELECT * FROM trends
        WHERE keyword IN ({placeholders})
        ORDER BY timestamp DESC
        """
        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query(query, conn, params=list(keywords))
        finally:
            conn.close()
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['metadata'] = df['metadata'].apply(json.loads)
        return df

    def calculate_demand_indicators(self, df: pd.DataFrame) -> Dict[str, float]:
        """Compute demand-side indicators from one keyword's trend rows."""
        indicators = {}
        # Work on a copy: derived columns are added below and the caller's
        # (possibly sliced) frame must not be mutated.
        df = df.copy()
        # Volume trend: compare the 10 newest rows against the 10 oldest.
        df_sorted = df.sort_values('timestamp')
        if len(df_sorted) > 1:
            recent_volume = df_sorted.tail(10)['volume'].mean()
            older_volume = df_sorted.head(10)['volume'].mean()
            indicators['volume_growth'] = (recent_volume - older_volume) / max(older_volume, 1)
        # Sentiment level and spread.
        indicators['avg_sentiment'] = df['sentiment'].mean()
        indicators['sentiment_volatility'] = df['sentiment'].std()
        # How many platforms mention the keyword, and how evenly.
        source_counts = df['source'].value_counts()
        indicators['source_diversity'] = len(source_counts)
        indicators['cross_platform_consistency'] = 1 - (source_counts.std() / source_counts.mean()) if source_counts.mean() > 0 else 0
        # Temporal clustering of activity (NaN when all rows share one hour,
        # matching the original behavior).
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.dayofweek
        hourly = df.groupby('hour').size()
        indicators['temporal_concentration'] = hourly.std() / hourly.mean()
        return indicators

    def identify_supply_gaps(self, keyword: str, df: pd.DataFrame) -> List[str]:
        """Derive textual supply-gap hypotheses from a keyword's trend rows."""
        gaps = []
        # Strong negative sentiment suggests unmet needs.
        if df['sentiment'].mean() < -0.2:
            gaps.append(f"High negative sentiment around {keyword} suggests user dissatisfaction")
        # Sparse GitHub activity hints at a tooling gap.
        github_data = df[df['source'] == 'github']
        if not github_data.empty:
            avg_stars = github_data['volume'].mean()
            if avg_stars < 100:
                gaps.append(f"Low GitHub engagement suggests limited open-source solutions for {keyword}")
        # Many highly-commented Reddit posts indicate active demand.
        reddit_data = df[df['source'] == 'reddit']
        if not reddit_data.empty:
            high_comment_posts = reddit_data[
                reddit_data['metadata'].apply(lambda x: x.get('comments', 0) > 50)
            ]
            if len(high_comment_posts) > len(reddit_data) * 0.3:
                gaps.append(f"High discussion volume on Reddit indicates active community seeking solutions for {keyword}")
        return gaps

    def calculate_market_gap_score(self, demand_indicators: Dict[str, float], supply_gaps: List[str]) -> float:
        """Combine demand indicators and supply gaps into a score in [0, 1]."""
        score = 0.0
        # Demand components (weights are heuristic).
        score += max(demand_indicators.get('volume_growth', 0) * 0.3, 0)
        score += (demand_indicators.get('avg_sentiment', 0) + 1) * 0.1  # shift sentiment into [0, 2]
        score += demand_indicators.get('source_diversity', 0) * 0.1
        score += demand_indicators.get('cross_platform_consistency', 0) * 0.2
        # Each identified supply gap adds evidence.
        score += len(supply_gaps) * 0.1
        # Concentrated temporal activity may signal an urgent need.
        if demand_indicators.get('temporal_concentration', 0) > 1:
            score += 0.2
        return min(score, 1.0)  # cap at 1.0

    def analyze_market_gaps(self, keywords: List[str]) -> List[MarketGap]:
        """Build one MarketGap per keyword with data, sorted by evidence score."""
        df = self.load_trend_data(keywords)
        gaps = []
        for keyword in keywords:
            keyword_data = df[df['keyword'] == keyword]
            if keyword_data.empty:
                continue  # no evidence for this keyword
            demand_indicators = self.calculate_demand_indicators(keyword_data)
            supply_gaps = self.identify_supply_gaps(keyword, keyword_data)
            # Compose a human-readable description from the indicators.
            gap_description = f"Market opportunity in {keyword} sector"
            if demand_indicators.get('volume_growth', 0) > 0.5:
                gap_description += " with high growth potential"
            if demand_indicators.get('avg_sentiment', 0) < -0.2:
                gap_description += " addressing user dissatisfaction"
            gap_score = self.calculate_market_gap_score(demand_indicators, supply_gaps)
            # Simple co-occurrence analysis for related keywords.
            related_keywords = self.find_related_keywords(keyword, df)
            gaps.append(MarketGap(
                gap_description=gap_description,
                evidence_score=gap_score,
                demand_indicators=[f"{k}: {v:.3f}" for k, v in demand_indicators.items()],
                supply_gaps=supply_gaps,
                related_keywords=related_keywords,
                confidence_level=min(len(keyword_data) / 100.0, 1.0)  # based on data volume
            ))
        return sorted(gaps, key=lambda x: x.evidence_score, reverse=True)

    def find_related_keywords(self, keyword: str, df: pd.DataFrame) -> List[str]:
        """Return up to five keywords co-occurring within ±24h of *keyword* rows."""
        from collections import Counter  # local: only needed here
        related = []
        keyword_data = df[df['keyword'] == keyword]
        time_window = timedelta(hours=24)  # loop-invariant, hoisted
        for _, row in keyword_data.iterrows():
            nearby_data = df[
                (df['timestamp'] >= row['timestamp'] - time_window) &
                (df['timestamp'] <= row['timestamp'] + time_window) &
                (df['keyword'] != keyword)
            ]
            related.extend(nearby_data['keyword'].unique())
        related_counts = Counter(related)
        return [k for k, v in related_counts.most_common(5)]

    def save_market_gaps(self, gaps: List[MarketGap]):
        """Persist MarketGap records; list fields are JSON-serialized."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.executemany('''
                INSERT INTO market_gaps
                (gap_description, evidence_score, demand_indicators, supply_gaps, related_keywords, confidence_level, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', [
                (gap.gap_description,
                 gap.evidence_score,
                 json.dumps(gap.demand_indicators),
                 json.dumps(gap.supply_gaps),
                 json.dumps(gap.related_keywords),
                 gap.confidence_level,
                 datetime.now().isoformat())
                for gap in gaps
            ])
            conn.commit()
        finally:
            conn.close()
class TrendCorrelationEngine:
    """Correlate per-day volume and sentiment for one keyword across sources."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def calculate_cross_platform_correlation(self, keyword: str) -> Dict[str, float]:
        """Return daily volume/sentiment correlation matrices between sources.

        The result maps 'volume_correlations' / 'sentiment_correlations' to
        nested ``{source: {source: coefficient}}`` dictionaries. Either key
        is omitted when fewer than two sources have data; an empty dict is
        returned when the keyword has no rows at all.
        """
        sql = """
        SELECT source, DATE(timestamp) as date, AVG(volume) as avg_volume, AVG(sentiment) as avg_sentiment
        FROM trends
        WHERE keyword = ?
        GROUP BY source, date
        ORDER BY date
        """
        connection = sqlite3.connect(self.db_path)
        daily = pd.read_sql_query(sql, connection, params=[keyword])
        connection.close()

        if daily.empty:
            return {}

        result = {}
        # One wide table per metric: rows are days, columns are sources.
        for metric, label in (('avg_volume', 'volume_correlations'),
                              ('avg_sentiment', 'sentiment_correlations')):
            wide = daily.pivot(index='date', columns='source', values=metric).fillna(0)
            if len(wide.columns) > 1:
                result[label] = wide.corr().to_dict()
        return result
async def main_analysis_pipeline(keywords: List[str]):
    """Run the full pipeline: scrape, persist, analyze gaps, correlate, report.

    Args:
        keywords: Search terms to scrape and analyze.

    Returns:
        The report dictionary that is also written to a timestamped JSON file.
    """
    logger.info(f"Starting analysis for keywords: {keywords}")
    # Initialize components (all share one SQLite database file).
    scraper = TrendScraper()
    analyzer = MarketGapAnalyzer(scraper.db_path)
    correlation_engine = TrendCorrelationEngine(scraper.db_path)

    # Step 1: Scrape data from all sources (each scraper degrades gracefully).
    all_trends = []
    logger.info("Scraping Reddit trends...")
    all_trends.extend(await scraper.scrape_reddit_trends(keywords))
    logger.info("Scraping Twitter trends...")
    all_trends.extend(await scraper.scrape_twitter_trends(keywords))
    logger.info("Scraping Google trends...")
    all_trends.extend(await scraper.scrape_google_trends(keywords))
    logger.info("Scraping GitHub trends...")
    all_trends.extend(await scraper.scrape_github_trends(keywords))
    logger.info("Scraping news trends...")
    all_trends.extend(await scraper.scrape_news_trends(keywords))

    # Step 2: Bulk-insert all scraped rows in one executemany call, and make
    # sure the connection is closed even if the insert fails.
    conn = sqlite3.connect(scraper.db_path)
    try:
        conn.executemany('''
            INSERT INTO trends (keyword, source, volume, sentiment, timestamp, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', [
            (trend.keyword,
             trend.source,
             trend.volume,
             trend.sentiment,
             trend.timestamp.isoformat(),
             json.dumps(trend.metadata))
            for trend in all_trends
        ])
        conn.commit()
    finally:
        conn.close()
    logger.info(f"Stored {len(all_trends)} trend data points")

    # Step 3: Analyze market gaps.
    logger.info("Analyzing market gaps...")
    market_gaps = analyzer.analyze_market_gaps(keywords)

    # Step 4: Cross-platform correlations per keyword.
    logger.info("Calculating cross-platform correlations...")
    correlations = {
        keyword: correlation_engine.calculate_cross_platform_correlation(keyword)
        for keyword in keywords
    }

    # Step 5: Persist the gap analysis.
    analyzer.save_market_gaps(market_gaps)

    # Step 6: Generate and save the report.
    logger.info("Generating analysis report...")
    report = {
        'analysis_timestamp': datetime.now().isoformat(),
        'keywords_analyzed': keywords,
        'total_data_points': len(all_trends),
        'market_gaps_identified': len(market_gaps),
        'top_opportunities': [
            {
                'description': gap.gap_description,
                'evidence_score': gap.evidence_score,
                'confidence': gap.confidence_level,
                'key_indicators': gap.demand_indicators[:3],  # Top 3 indicators
                'supply_gaps': gap.supply_gaps
            }
            for gap in market_gaps[:5]  # Top 5 opportunities
        ],
        'cross_platform_correlations': correlations
    }
    with open(f"market_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
        json.dump(report, f, indent=2, default=str)
    logger.info("Analysis complete. Report saved.")
    return report
# Example usage
if __name__ == "__main__":
    # Demo keyword set spanning a few distinct niches.
    demo_keywords = [
        "sustainable packaging",
        "remote work tools",
        "mental health apps",
        "AI productivity",
        "electric vehicle charging"
    ]
    # Kick off the full async scraping/analysis pipeline.
    asyncio.run(main_analysis_pipeline(demo_keywords))
import asyncio
import aiohttp
import json
import re
from typing import List, Dict, Set
import nltk
from textblob import TextBlob
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import numpy as np
from collections import defaultdict, Counter
import sqlite3
class KeywordExpansionEngine:
    """Engine for expanding keywords and identifying market segments."""

    def __init__(self, db_path: str):
        """
        Args:
            db_path: Path to the SQLite database produced by the trend scraper.
        """
        self.db_path = db_path
        # Minimal stop-word list used to filter extracted keywords.
        self.stop_words = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'])
        # TextBlob needs NLTK's punkt tokenizer; download once if missing.
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')

    def extract_keywords_from_text(self, text: str) -> List[str]:
        """Extract candidate keywords (noun phrases, nouns, adjectives).

        Returns a de-duplicated list; empty/None input yields [].
        """
        if not text:
            return []
        # Strip punctuation and lowercase before tagging.
        text = re.sub(r'[^\w\s]', '', text.lower())
        blob = TextBlob(text)
        keywords = []
        # Short noun phrases (up to three words).
        for phrase in blob.noun_phrases:
            if len(phrase.split()) <= 3 and phrase not in self.stop_words:
                keywords.append(phrase)
        # Significant single words: nouns (NN*) and adjectives (JJ*).
        for word, tag in blob.tags:
            if tag.startswith(('NN', 'JJ')) and len(word) > 3 and word not in self.stop_words:
                keywords.append(word)
        return list(set(keywords))

    def expand_keywords_from_trends(self, base_keyword: str, limit: int = 20) -> List[str]:
        """Expand *base_keyword* using text stored in the trends metadata.

        Returns at most *limit* related keywords mined from stored titles,
        descriptions, and other text fields.
        """
        conn = sqlite3.connect(self.db_path)
        query = """
        SELECT metadata FROM trends
        WHERE keyword = ? OR keyword LIKE ?
        """
        cursor = conn.execute(query, [base_keyword, f'%{base_keyword}%'])
        results = cursor.fetchall()
        conn.close()
        expanded_keywords = set()
        for row in results:
            try:
                metadata = json.loads(row[0])
            except json.JSONDecodeError:
                continue
            # Only mine known free-text metadata fields.
            text_fields = []
            if isinstance(metadata, dict):
                for key, value in metadata.items():
                    if isinstance(value, str) and key in ['title', 'description', 'text', 'content']:
                        text_fields.append(value)
            for text in text_fields:
                keywords = self.extract_keywords_from_text(text)
                expanded_keywords.update(keywords[:5])  # cap contribution per text
        # Keep only short, non-trivial keywords that share a word with the
        # base keyword.
        # BUG FIX: the original condition mixed `and`/`or` without
        # parentheses, so the length/identity filters were bypassed whenever
        # any base word matched; the grouping below applies all filters.
        base_words = base_keyword.split()
        relevant_keywords = []
        for keyword in expanded_keywords:
            if (len(keyword.split()) <= 3 and
                    len(keyword) > 2 and
                    keyword != base_keyword and
                    any(word in keyword for word in base_words)):
                relevant_keywords.append(keyword)
        return relevant_keywords[:limit]

    async def expand_keywords_with_search_suggestions(self, base_keyword: str) -> List[str]:
        """Expand keywords using Google's public search-suggestion endpoint.

        Best-effort: returns [] on any network or parse failure.
        """
        suggestions = []
        async with aiohttp.ClientSession() as session:
            try:
                url = "https://suggestqueries.google.com/complete/search"
                params = {
                    'client': 'firefox',
                    'q': base_keyword
                }
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.text()
                        # The endpoint may answer with plain JSON
                        # (client=firefox) or a JSONP wrapper; handle both —
                        # the original only parsed the JSONP form, so plain
                        # JSON responses yielded no suggestions at all.
                        if data.startswith('window.google.ac.h('):
                            data = data[19:-1]  # strip the JSONP wrapper
                        try:
                            suggestions_data = json.loads(data)
                        except json.JSONDecodeError:
                            suggestions_data = None
                        if suggestions_data and len(suggestions_data) > 1:
                            suggestions.extend(suggestions_data[1][:10])  # top 10
            except Exception as e:
                print(f"Error fetching Google suggestions: {e}")
        return suggestions

    def identify_market_segments(self, keywords: List[str]) -> Dict[str, List[str]]:
        """Cluster keywords into named market segments via TF-IDF + k-means.

        Falls back to a single "main_segment" when there are too few
        keywords/texts or clustering fails.
        """
        if len(keywords) < 5:
            return {"main_segment": keywords}
        # Build one text document per keyword from stored trend metadata.
        conn = sqlite3.connect(self.db_path)
        keyword_texts = {}
        for keyword in keywords:
            query = """
            SELECT metadata FROM trends WHERE keyword = ?
            LIMIT 50
            """
            cursor = conn.execute(query, [keyword])
            results = cursor.fetchall()
            texts = []
            for row in results:
                # Skip unparseable metadata (narrowed from a bare except).
                try:
                    metadata = json.loads(row[0])
                except (json.JSONDecodeError, TypeError):
                    continue
                if isinstance(metadata, dict):
                    for key, value in metadata.items():
                        if isinstance(value, str) and len(value) > 10:
                            texts.append(value)
            keyword_texts[keyword] = ' '.join(texts)
        conn.close()
        # Defensive default: the keyword itself stands in for missing text.
        corpus = [keyword_texts.get(keyword, keyword) for keyword in keywords]
        if len(corpus) < 3:
            return {"main_segment": keywords}
        try:
            vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
            tfidf_matrix = vectorizer.fit_transform(corpus)
            n_clusters = min(max(2, len(keywords) // 3), 5)  # 2-5 clusters
            kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
            clusters = kmeans.fit_predict(tfidf_matrix)
            # Group keywords by assigned cluster.
            segments = defaultdict(list)
            for i, keyword in enumerate(keywords):
                segments[f"segment_{clusters[i]}"].append(keyword)
            # Name each segment after its strongest TF-IDF feature.
            named_segments = {}
            feature_names = vectorizer.get_feature_names_out()
            for cluster_id in range(n_clusters):
                cluster_center = kmeans.cluster_centers_[cluster_id]
                top_indices = cluster_center.argsort()[-3:][::-1]  # top 3 features
                top_terms = [feature_names[i] for i in top_indices]
                named_segments[f"{top_terms[0]}_segment"] = segments[f"segment_{cluster_id}"]
            return named_segments
        except Exception as e:
            # Clustering is best-effort; degrade to a single segment.
            print(f"Clustering error: {e}")
            return {"main_segment": keywords}

    def analyze_keyword_difficulty(self, keyword: str) -> Dict[str, float]:
        """Score competition, opportunity, and data availability for *keyword*.

        With no stored data the keyword is treated as low-competition /
        high-opportunity by construction.
        """
        conn = sqlite3.connect(self.db_path)
        # BUG FIX: aggregate explicitly — bare volume/sentiment columns under
        # GROUP BY yield an arbitrary row per source, not totals/means.
        query = """
        SELECT source, SUM(volume) as total_volume, AVG(sentiment) as avg_sentiment, COUNT(*) as frequency
        FROM trends
        WHERE keyword = ? OR keyword LIKE ?
        GROUP BY source
        """
        cursor = conn.execute(query, [keyword, f'%{keyword}%'])
        results = cursor.fetchall()
        conn.close()
        if not results:
            return {
                'competition_score': 0.1,  # low competition if no data
                'opportunity_score': 0.8,  # high opportunity
                'data_availability': 0.0
            }
        total_volume = sum(row[1] for row in results)
        avg_sentiment = sum(row[2] for row in results) / len(results)
        source_diversity = len(results)
        total_frequency = sum(row[3] for row in results)
        # More volume -> more established competition (normalized to [0, 1]).
        competition_score = min(total_volume / 10000, 1.0)
        opportunity_score = 0.5
        if avg_sentiment < -0.2:  # negative sentiment = problems to solve
            opportunity_score += 0.3
        if source_diversity >= 3:  # cross-platform interest
            opportunity_score += 0.2
        opportunity_score = min(opportunity_score, 1.0)
        data_availability = min(total_frequency / 100, 1.0)
        return {
            'competition_score': competition_score,
            'opportunity_score': opportunity_score,
            'data_availability': data_availability,
            'total_mentions': total_frequency,
            'source_diversity': source_diversity,
            'avg_sentiment': avg_sentiment
        }

    def generate_keyword_variations(self, base_keyword: str) -> List[str]:
        """Generate commercial-intent variations of *base_keyword*.

        Single words get a wider set of suffix/prefix templates; multi-word
        phrases get a smaller set plus a word-order swap for two-word
        phrases.
        """
        variations = []
        words = base_keyword.split()
        if len(words) == 1:
            base = words[0]
            variations.extend([
                f"{base} app",
                f"{base} software",
                f"{base} tool",
                f"{base} platform",
                f"{base} service",
                f"{base} solution",
                f"best {base}",
                f"{base} alternative",
                f"cheap {base}",
                f"free {base}"
            ])
        else:
            variations.extend([
                f"{base_keyword} app",
                f"{base_keyword} software",
                f"{base_keyword} platform",
                f"best {base_keyword}",
                f"{base_keyword} solution",
                f"{base_keyword} alternative"
            ])
            # Two-word phrases also get the reversed word order.
            if len(words) == 2:
                variations.append(f"{words[1]} {words[0]}")
        return variations
class MarketSegmentAnalyzer:
    """Analyze market segments and identify gaps within segments."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def analyze_segment_trends(self, segment_keywords: List[str]) -> Dict[str, any]:
        """Aggregate per-keyword trend statistics for one market segment.

        Returns {} when no trend rows exist for any keyword in the segment.
        """
        placeholders = ','.join('?' for _ in segment_keywords)
        query = f"""
        SELECT keyword, source, AVG(volume) as avg_volume,
               AVG(sentiment) as avg_sentiment, COUNT(*) as mentions,
               MIN(timestamp) as first_seen, MAX(timestamp) as last_seen
        FROM trends
        WHERE keyword IN ({placeholders})
        GROUP BY keyword, source
        """
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute(query, segment_keywords).fetchall()
        conn.close()
        if not rows:
            return {}

        # Fold the per-(keyword, source) rows into one record per keyword.
        per_keyword = {}
        for keyword, source, avg_vol, avg_sent, mentions, first, last in rows:
            record = per_keyword.setdefault(
                keyword,
                {'sources': [], 'avg_volume': 0, 'avg_sentiment': 0, 'total_mentions': 0},
            )
            record['sources'].append(source)
            record['avg_volume'] += avg_vol
            record['avg_sentiment'] += avg_sent
            record['total_mentions'] += mentions
            record['date_range'] = (first, last)

        # Segment-wide rollups.
        volume_total = sum(rec['avg_volume'] for rec in per_keyword.values())
        mention_total = sum(rec['total_mentions'] for rec in per_keyword.values())
        sentiment_mean = sum(rec['avg_sentiment'] for rec in per_keyword.values()) / len(per_keyword)

        # Rank keywords by summed average volume to spot leaders and laggards.
        ranked = sorted(per_keyword.items(), key=lambda item: item[1]['avg_volume'], reverse=True)
        return {
            'segment_size': len(segment_keywords),
            'total_market_volume': volume_total,
            'total_mentions': mention_total,
            'avg_sentiment': sentiment_mean,
            'market_leaders': [(kw, rec['avg_volume']) for kw, rec in ranked[:3]],
            'underserved_areas': [(kw, rec['avg_volume']) for kw, rec in ranked[-3:]],
            'keyword_details': per_keyword
        }

    def identify_segment_gaps(self, segment_analysis: Dict[str, any]) -> List[str]:
        """List concrete gap hypotheses for a previously analyzed segment."""
        gaps = []
        if not segment_analysis:
            return gaps
        details = segment_analysis.get('keyword_details', {})
        if not details:
            return gaps
        # Keywords trailing half the segment's mean volume are candidates.
        threshold = segment_analysis.get('total_market_volume', 0) / len(details) * 0.5
        for keyword, data in details.items():
            if data.get('avg_volume', 0) < threshold:
                sentiment = data.get('avg_sentiment', 0)
                if sentiment > 0.2:
                    gaps.append(f"Underserved positive demand for {keyword}")
                elif sentiment < -0.2:
                    gaps.append(f"User dissatisfaction with current {keyword} solutions")
        return gaps
async def comprehensive_keyword_analysis(base_keywords: List[str], db_path: str):
    """Comprehensive pipeline for keyword expansion and market segmentation.

    Expands *base_keywords*, scores difficulty, clusters the expanded set
    into segments, finds gaps per segment, and writes the full result set to
    a timestamped JSON report.

    Args:
        base_keywords: Seed keywords to expand and analyze.
        db_path: SQLite database produced by the trend scraper.

    Returns:
        The results dictionary that was written to disk.
    """
    # Imported locally: this module's top-level imports omit datetime, and
    # the __main__ guard's import is unavailable when this function is
    # imported from another module.
    from datetime import datetime

    print(f"Starting comprehensive analysis for: {base_keywords}")
    keyword_engine = KeywordExpansionEngine(db_path)
    segment_analyzer = MarketSegmentAnalyzer(db_path)
    results = {
        'base_keywords': base_keywords,
        'expanded_keywords': {},
        'market_segments': {},
        'keyword_difficulty': {},
        'segment_analysis': {},
        'identified_gaps': {}
    }
    all_keywords = set(base_keywords)

    # Step 1: expand each base keyword three ways and pool the results.
    print("Expanding keywords...")
    for keyword in base_keywords:
        variations = keyword_engine.generate_keyword_variations(keyword)
        trend_expansions = keyword_engine.expand_keywords_from_trends(keyword)
        search_suggestions = await keyword_engine.expand_keywords_with_search_suggestions(keyword)
        expanded = list(set(variations + trend_expansions + search_suggestions))
        results['expanded_keywords'][keyword] = expanded
        all_keywords.update(expanded)

    # Step 2: difficulty metrics for every keyword (base + expanded).
    print("Analyzing keyword difficulty...")
    for keyword in all_keywords:
        results['keyword_difficulty'][keyword] = keyword_engine.analyze_keyword_difficulty(keyword)

    # Step 3: cluster keywords into market segments.
    print("Identifying market segments...")
    segments = keyword_engine.identify_market_segments(list(all_keywords))
    results['market_segments'] = segments

    # Step 4: per-segment trend analysis and gap detection.
    print("Analyzing market segments...")
    for segment_name, segment_keywords in segments.items():
        segment_analysis = segment_analyzer.analyze_segment_trends(segment_keywords)
        results['segment_analysis'][segment_name] = segment_analysis
        results['identified_gaps'][segment_name] = segment_analyzer.identify_segment_gaps(segment_analysis)

    # Step 5: rank opportunities by a weighted blend of the difficulty scores.
    print("Generating opportunity priorities...")
    opportunities = []
    for keyword, difficulty in results['keyword_difficulty'].items():
        opportunity_score = difficulty.get('opportunity_score', 0)
        competition_score = difficulty.get('competition_score', 0)
        data_availability = difficulty.get('data_availability', 0)
        combined_score = (opportunity_score * 0.4 +
                          (1 - competition_score) * 0.4 +
                          data_availability * 0.2)
        opportunities.append({
            'keyword': keyword,
            'opportunity_score': opportunity_score,
            'competition_score': competition_score,
            'combined_score': combined_score,
            'data_points': difficulty.get('total_mentions', 0),
            'sentiment': difficulty.get('avg_sentiment', 0)
        })
    opportunities.sort(key=lambda x: x['combined_score'], reverse=True)
    results['prioritized_opportunities'] = opportunities[:20]  # top 20

    # Step 6: persist the full result set.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"keyword_analysis_report_{timestamp}.json"
    with open(filename, 'w') as f:
        json.dump(results, f, indent=2, default=str)
    # BUG FIX: interpolate the actual report filename into the message.
    print(f"Analysis complete. Results saved to {filename}")
    return results
# Example usage and testing
if __name__ == "__main__":
    import sys
    import os
    from datetime import datetime

    # Make sibling modules (the main trend scraper) importable.
    sys.path.append(os.path.dirname(__file__))

    async def test_keyword_expansion():
        """Run the expansion pipeline on sample keywords and print a summary."""
        sample_keywords = [
            "productivity app",
            "sustainable fashion",
            "remote collaboration"
        ]
        # Reuse the database produced by the main trend scraper.
        shared_db_path = "market_trends.db"
        report = await comprehensive_keyword_analysis(sample_keywords, shared_db_path)

        print("\n=== KEYWORD EXPANSION RESULTS ===")
        print(f"Base keywords: {len(sample_keywords)}")
        expanded_total = sum(len(v) for v in report['expanded_keywords'].values())
        print(f"Total expanded keywords: {expanded_total}")
        print(f"Market segments identified: {len(report['market_segments'])}")
        print(f"Top opportunities: {len(report['prioritized_opportunities'])}")

        print("\n=== TOP 5 OPPORTUNITIES ===")
        for rank, opp in enumerate(report['prioritized_opportunities'][:5], 1):
            print(f"{rank}. {opp['keyword']}")
            print(f" Combined Score: {opp['combined_score']:.3f}")
            print(f" Opportunity: {opp['opportunity_score']:.3f}, Competition: {opp['competition_score']:.3f}")
            print()

    asyncio.run(test_keyword_expansion())
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import sqlite3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import networkx as nx
from wordcloud import WordCloud
import warnings
warnings.filterwarnings('ignore')
class TrendVisualizationDashboard:
"""Create visualizations and reports for trend analysis results"""
def __init__(self, db_path: str):
self.db_path = db_path
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def load_trend_data(self, keywords: Optional[List[str]] = None, days: int = 30) -> pd.DataFrame:
"""Load trend data from database"""
conn = sqlite3.connect(self.db_path)
base_query = """
SELECT keyword, source, volume, sentiment, timestamp, metadata
FROM trends
WHERE timestamp >= ?
"""
params = [(datetime.now() - timedelta(days=days)).isoformat()]
if keywords:
keyword_filter = "'" + "','".join(keywords) + "'"
base_query += f" AND keyword IN ({keyword_filter})"
base_query += " ORDER BY timestamp DESC"
df = pd.read_sql_query(base_query, conn, params=params)
conn.close()
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['date'] = df['timestamp'].dt.date
df['hour'] = df['timestamp'].dt.hour
try:
df['metadata'] = df['metadata'].apply(json.loads)
except:
df['metadata'] = df['metadata'].apply(lambda x: {})
return df
def load_market_gaps(self) -> pd.DataFrame:
"""Load market gap analysis results"""
conn = sqlite3.connect(self.db_path)
query = """
SELECT gap_description, evidence_score, demand_indicators,
supply_gaps, related_keywords, confidence_level, created_at
FROM market_gaps
ORDER BY evidence_score DESC
"""
df = pd.read_sql_query(query, conn)
conn.close()
if not df.empty:
df['created_at'] = pd.to_datetime(df['created_at'])
df['demand_indicators'] = df['demand_indicators'].apply(json.loads)
df['supply_gaps'] = df['supply_gaps'].apply(json.loads)
df['related_keywords'] = df['related_keywords'].apply(json.loads)
return df
def create_trend_timeline(self, keywords: List[str], save_path: str = None) -> go.Figure:
"""Create interactive timeline of trend data"""
df = self.load_trend_data(keywords)
if df.empty:
return go.Figure().add_annotation(text="No data available",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False)
fig = make_subplots(
rows=2, cols=1,
subplot_titles=('Volume Trends', 'Sentiment Trends'),
vertical_spacing=0.1
)
# Volume trends by keyword
for keyword in keywords:
keyword_data = df[df['keyword'] == keyword]
if not keyword_data.empty:
daily_volume = keyword_data.groupby('date')['volume'].sum().reset_index()
fig.add_trace(
go.Scatter(
x=daily_volume['date'],
y=daily_volume['volume'],
mode='lines+markers',
name=f'{keyword} Volume',
line=dict(width=3)
),
row=1, col=1
)
# Sentiment trends by keyword
for keyword in keywords:
keyword_data = df[df['keyword'] == keyword]
if not keyword_data.empty:
daily_sentiment = keyword_data.groupby('date')['sentiment'].mean().reset_index()
fig.add_trace(
go.Scatter(
x=daily_sentiment['date'],
y=daily_sentiment['sentiment'],
mode='lines+markers',
name=f'{keyword} Sentiment',
line=dict(width=3, dash='dash')
),
row=2, col=1
)
fig.update_layout(
title='Trend Analysis Timeline',
height=600,
showlegend=True,
template='plotly_white'
)
fig.update_xaxes(title_text="Date", row=2, col=1)
fig.update_yaxes(title_text="Volume", row=1, col=1)
fig.update_yaxes(title_text="Sentiment", row=2, col=1)
if save_path:
fig.write_html(save_path)
return fig
def create_source_distribution(self, keywords: List[str], save_path: str = None) -> go.Figure:
"""Create pie chart of data sources distribution"""
df = self.load_trend_data(keywords)
if df.empty:
return go.Figure().add_annotation(text="No data available")
source_counts = df['source'].value_counts()
fig = go.Figure(data=[
go.Pie(
labels=source_counts.index,
values=source_counts.values,
hole=.3,
textinfo='label+percent',
textposition='outside'
)
])
fig.update_layout(
title='Data Sources Distribution',
template='plotly_white',
annotations=[dict(text='Sources', x=0.5, y=0.5, font_size=20, showarrow=False)]
)
if save_path:
fig.write_html(save_path)
return fig
def create_sentiment_heatmap(self, keywords: List[str], save_path: str = None):
"""Create sentiment heatmap across sources and keywords"""
df = self.load_trend_data(keywords)
if df.empty:
print("No data available for heatmap")
return
# Create pivot table for heatmap
sentiment_pivot = df.groupby(['keyword', 'source'])['sentiment'].mean().unstack(fill_value=0)
plt.figure(figsize=(12, 8))
sns.heatmap(sentiment_pivot, annot=True, cmap='RdYlGn', center=0,
fmt='.2f', cbar_kws={'label': 'Average Sentiment'})
plt.title('Sentiment Heatmap: Keywords vs Sources')
plt.xlabel('Data Source')
plt.ylabel('Keyword')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
def create_market_gap_visualization(self, save_path: str = None) -> go.Figure:
"""Visualize market gaps and opportunities"""
df = self.load_market_gaps()
if df.empty:
return go.Figure().add_annotation(text="No market gaps data available")
# Create bubble chart
fig = go.Figure()
# Extract confidence levels and evidence scores
x_vals = df['confidence_level']
y_vals = df['evidence_score']
sizes = [len(gaps) * 10 + 20 for gaps in df['supply_gaps']] # Size based on number of supply gaps
texts = [desc[:50] + "..." if len(desc) > 50 else desc for desc in df['gap_description']]
fig.add_trace(go.Scatter(
x=x_vals,
y=y_vals,
mode='markers',
marker=dict(
size=sizes,
color=y_vals,
colorscale='Viridis',
showscale=True,
colorbar=dict(title="Evidence Score"),
line=dict(width=2, color='white')
),
text=texts,
textposition="middle center",
hovertemplate='<b>%{text}</b><br>' +
'Confidence: %{x:.2f}<br>' +
'Evidence Score: %{y:.2f}<br>' +
'<extra></extra>'
))
fig.update_layout(
title='Market Gap Opportunities',
xaxis_title='Confidence Level',
yaxis_title='Evidence Score',
template='plotly_white',
height=600
)
if save_path:
fig.write_html(save_path)
return fig
def create_keyword_network(self, keywords: List[str], save_path: str = None):
"""Create network graph of keyword relationships"""
df = self.load_trend_data(keywords)
if df.empty:
print("No data available for network")
return
# Create network graph
G = nx.Graph()
# Add nodes for each keyword
for keyword in keywords:
keyword_data = df[df['keyword'] == keyword]
volume = keyword_data['volume'].sum() if not keyword_data.empty else 0
sentiment = keyword_data['sentiment'].mean() if not keyword_data.empty else 0
G.add_node(keyword, volume=volume, sentiment=sentiment)
# Add edges based on co-occurrence in time windows
for i, keyword1 in enumerate(keywords):
for keyword2 in keywords[i+1:]:
# Find temporal co-occurrence
data1 = df[df['keyword'] == keyword1]
data2 = df[df['keyword'] == keyword2]
if not data1.empty and not data2.empty:
# Simple co-occurrence based on overlapping time periods
dates1 = set(data1['date'])
dates2 = set(data2['date'])
overlap = len(dates1.intersection(dates2))
if overlap > 0:
weight = overlap / max(len(dates1), len(dates2))
G.add_edge(keyword1, keyword2, weight=weight)
# Create visualization
plt.figure(figsize=(14, 10))
pos = nx.spring_layout(G, k=3, iterations=50)
# Node sizes based on volume
node_sizes = [G.nodes[node]['volume'] * 0.1 + 300 for node in G.nodes()]
# Node colors based on sentiment
node_colors = [G.nodes[node]['sentiment'] for node in G.nodes()]
# Draw network
nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors,
cmap='RdYlGn', alpha=0.8)
nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')
# Draw edges with varying thickness based on weight
edges = G.edges()
weights = [G[u][v]['weight'] * 5 for u, v in edges]
nx.draw_networkx_edges(G, pos, width=weights, alpha=0.6, edge_color='gray')
plt.title('Keyword Relationship Network')
plt.axis('off')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
def create_wordcloud(self, keywords: List[str], save_path: str = None):
"""Create word cloud from trend metadata"""
df = self.load_trend_data(keywords)
if df.empty:
print("No data available for word cloud")
return
# Extract text from metadata
text_data = []
for metadata in df['metadata']:
if isinstance(metadata, dict):
for key, value in metadata.items():
if isinstance(value, str) and len(value) > 3:
text_data.append(value)
if not text_data:
print("No text data found in metadata")
return
# Combine all text
combined_text = ' '.join(text_data)
# Create word cloud
wordcloud = WordCloud(
width=800, height=400,
background_color='white',
max_words=100,
colormap='viridis',
relative_scaling=0.5
).generate(combined_text)
plt.figure(figsize=(12, 6))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title('Trending Topics Word Cloud')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
def generate_comprehensive_report(self, keywords: List[str], output_dir: str = "reports"):
"""Generate comprehensive analysis report with all visualizations"""
import os
# Create output directory
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
print(f"Generating comprehensive report for keywords: {keywords}")
# Load data for summary statistics
df = self.load_trend_data(keywords)
gaps_df = self.load_market_gaps()
# Generate all visualizations
print("Creating trend timeline...")
timeline_fig = self.create_trend_timeline(keywords,
f"{output_dir}/timeline_{timestamp}.html")
print("Creating source distribution...")
source_fig = self.create_source_distribution(keywords,
f"{output_dir}/sources_{timestamp}.html")
print("Creating sentiment heatmap...")
self.create_sentiment_heatmap(keywords,
f"{output_dir}/sentiment_heatmap_{timestamp}.png")
print("Creating market gap visualization...")
gap_fig = self.create_market_gap_visualization(f"{output_dir}/market_gaps_{timestamp}.html")
print("Creating keyword network...")
self.create_keyword_network(keywords,
f"{output_dir}/keyword_network_{timestamp}.png")
print("Creating word cloud...")
self.create_wordcloud(keywords,
f"{output_dir}/wordcloud_{timestamp}.png")
# Generate summary statistics
summary_stats = self.generate_summary_statistics(df, gaps_df, keywords)
# Create HTML report
html_report = self.create_html_report(summary_stats, keywords, timestamp)
report_path = f"{output_dir}/comprehensive_report_{timestamp}.html"
with open(report_path, 'w') as f:
f.write(html_report)
print(f"\nComprehensive report generated successfully!")
print(f"Main report: {report_path}")
print(f"All files saved in: {output_dir}/")
return report_path
def generate_summary_statistics(self, df: pd.DataFrame, gaps_df: pd.DataFrame,
keywords: List[str]) -> Dict:
"""Generate summary statistics for the report"""
if df.empty:
return {
'total_data_points': 0,
'date_range': 'No data',
'keywords_analyzed': len(keywords),
'sources_used': 0,
'market_gaps_found': len(gaps_df) if not gaps_df.empty else 0
}
stats = {
'total_data_points': len(df),
'date_range': f"{df['timestamp'].min().strftime('%Y-%m-%d')} to {df['timestamp'].max().strftime('%Y-%m-%d')}",
'keywords_analyzed': len(keywords),
'sources_used': df['source'].nunique(),
'avg_sentiment': df['sentiment'].mean(),
'sentiment_std': df['sentiment'].std(),
'total_volume': df['volume'].sum(),
'avg_volume': df['volume'].mean(),
'market_gaps_found': len(gaps_df) if not gaps_df.empty else 0,
'top_volume_keyword': df.groupby('keyword')['volume'].sum().idxmax() if len(df) > 0 else 'N/A',
'most_positive_keyword': df.groupby('keyword')['sentiment'].mean().idxmax() if len(df) > 0 else 'N/A',
'source_breakdown': df['source'].value_counts().to_dict()
}
return stats
def create_html_report(self, stats: Dict, keywords: List[str], timestamp: str) -> str:
"""Create HTML report with embedded visualizations"""
html_template = f"""
<!DOCTYPE html>
<html>
<head>
<title>Market Trend Analysis Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }}
.container {{ max-width: 1200px; margin: 0 auto; background-color: white; padding: 30px; border-radius: 10px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }}
h1 {{ color: #2c3e50; text-align: center; border-bottom: 3px solid #3498db; padding-bottom: 10px; }}
h2 {{ color: #34495e; margin-top: 30px; }}
.stats-grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 20px; margin: 20px 0; }}
.stat-card {{ background-color: #ecf0f1; padding: 20px; border-radius: 8px; text-align: center; }}
.stat-number {{ font-size: 2em; font-weight: bold; color: #3498db; }}
.stat-label {{ color: #7f8c8d; margin-top: 5px; }}
.keyword-list {{ background-color: #f8f9fa; padding: 15px; border-radius: 5px; }}
.visualization {{ margin: 20px 0; text-align: center; }}
.chart-link {{ display: inline-block; padding: 10px 20px; background-color: #3498db; color: white; text-decoration: none; border-radius: 5px; margin: 5px; }}
.chart-link:hover {{ background-color: #2980b9; }}
.summary {{ background-color: #e8f6f3; padding: 20px; border-radius: 8px; margin: 20px 0; }}
table {{ width: 100%; border-collapse: collapse; margin: 15px 0; }}
th, td {{ padding: 12px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; font-weight: bold; }}
.insights {{ background-color: #fff3cd; padding: 15px; border-left: 5px solid #ffc107; margin: 15px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>Market Trend Analysis Report</h1>
<p style="text-align: center; color: #7f8c8d;">Generated on {timestamp}</p>
<div class="summary">
<h2>Executive Summary</h2>
<p>This report analyzes market trends and identifies opportunities for the following keywords:
<strong>{', '.join(keywords)}</strong>. The analysis covers data from multiple sources including
social media, news, GitHub repositories, and search trends to provide comprehensive market insights.</p>
</div>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-number">{stats.get('total_data_points', 0):,}</div>
<div class="stat-label">Total Data Points</div>
</div>
<div class="stat-card">
<div class="stat-number">{stats.get('keywords_analyzed', 0)}</div>
<div class="stat-label">Keywords Analyzed</div>
</div>
<div class="stat-card">
<div class="stat-number">{stats.get('sources_used', 0)}</div>
<div class="stat-label">Data Sources</div>
</div>
<div class="stat-card">
<div class="stat-number">{stats.get('market_gaps_found', 0)}</div>
<div class="stat-label">Market Gaps Found</div>
</div>
</div>
<h2>Key Metrics</h2>
<table>
<tr><th>Metric</th><th>Value</th></tr>
<tr><td>Analysis Period</td><td>{stats.get('date_range', 'N/A')}</td></tr>
<tr><td>Average Sentiment</td><td>{stats.get('avg_sentiment', 0):.3f}</td></tr>
<tr><td>Sentiment Volatility</td><td>{stats.get('sentiment_std', 0):.3f}</td></tr>
<tr><td>Total Volume</td><td>{stats.get('total_volume', 0):,}</td></tr>
<tr><td>Average Volume</td><td>{stats.get('avg_volume', 0):,.1f}</td></tr>
<tr><td>Top Volume Keyword</td><td>{stats.get('top_volume_keyword', 'N/A')}</td></tr>
<tr><td>Most Positive Keyword</td><td>{stats.get('most_positive_keyword', 'N/A')}</td></tr>
</table>
<h2>Data Source Breakdown</h2>
<table>
<tr><th>Source</th><th>Data Points</th><th>Percentage</th></tr>
{''.join([f"<tr><td>{source}</td><td>{count:,}</td><td>{count/stats.get('total_data_points', 1)*100:.1f}%</td></tr>"
for source, count in stats.get('source_breakdown', {}).items()])}
</table>
<div class="insights">
<h3>Key Insights</h3>
<ul>
<li><strong>Market Activity:</strong> Analyzed {stats.get('total_data_points', 0):,} data points across {stats.get('sources_used', 0)} different sources</li>
<li><strong>Sentiment Analysis:</strong> Overall market sentiment is {'positive' if stats.get('avg_sentiment', 0) > 0.1 else 'negative' if stats.get('avg_sentiment', 0) < -0.1 else 'neutral'} ({stats.get('avg_sentiment', 0):.3f})</li>
<li><strong>Top Performer:</strong> "{stats.get('top_volume_keyword', 'N/A')}" shows the highest engagement volume</li>
<li><strong>Opportunities:</strong> {stats.get('market_gaps_found', 0)} potential market gaps identified for further investigation</li>
</ul>
</div>
<h2>Interactive Visualizations</h2>
<div class="visualization">
<p>Click the links below to view interactive charts and detailed visualizations:</p>
<a href="timeline_{timestamp}.html" class="chart-link">📈 Trend Timeline</a>
<a href="sources_{timestamp}.html" class="chart-link">📊 Source Distribution</a>
<a href="market_gaps_{timestamp}.html" class="chart-link">🎯 Market Opportunities</a>
</div>
<h2>Static Visualizations</h2>
<div class="visualization">
<h3>Sentiment Heatmap</h3>
<img src="sentiment_heatmap_{timestamp}.png" alt="Sentiment Heatmap" style="max-width: 100%; height: auto;">
<h3>Keyword Relationship Network</h3>
<img src="keyword_network_{timestamp}.png" alt="Keyword Network" style="max-width: 100%; height: auto;">
<h3>Trending Topics Word Cloud</h3>
<img src="wordcloud_{timestamp}.png" alt="Word Cloud" style="max-width: 100%; height: auto;">
</div>
<div class="insights">
<h3>Recommendations</h3>
<ul>
<li><strong>Priority Keywords:</strong> Focus on keywords with high volume but low competition scores</li>
<li><strong>Sentiment Opportunities:</strong> Address negative sentiment areas as potential market gaps</li>
<li><strong>Cross-Platform Strategy:</strong> Leverage insights from multiple data sources for comprehensive market approach</li>
<li><strong>Trend Monitoring:</strong> Continue monitoring identified opportunities for market entry timing</li>
</ul>
</div>
<footer style="margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd; text-align: center; color: #7f8c8d;">
<p>Report generated by Market Trend Analysis System | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</footer>
</div>
</body>
</html>
"""
return html_template
def create_dashboard_app(db_path: str) -> str:
    """Create a simple Flask web dashboard (requires Flask installation)

    Returns the source code of a standalone Flask application as a string;
    it is intended to be written to disk by ``export_dashboard_code``.  The
    generated script expects ``TrendVisualizationDashboard`` to be importable
    at runtime.

    NOTE: the template below is an f-string, so every literal brace that must
    survive into the generated code (dict literals, CSS rules, JS functions)
    is escaped as ``{{ }}``; only ``{db_path}`` is interpolated here.
    """
    dashboard_code = f'''
import flask
from flask import Flask, render_template_string, request, jsonify
import json
from datetime import datetime

app = Flask(__name__)
viz_dashboard = TrendVisualizationDashboard("{db_path}")

@app.route('/')
def home():
    return render_template_string("""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Market Trend Analysis Dashboard</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 0; padding: 20px; background-color: #f5f5f5; }}
            .container {{ max-width: 1200px; margin: 0 auto; }}
            .header {{ background-color: #3498db; color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
            .form-section {{ background-color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
            .results {{ background-color: white; padding: 20px; border-radius: 10px; }}
            input, button {{ padding: 10px; margin: 5px; }}
            button {{ background-color: #3498db; color: white; border: none; cursor: pointer; border-radius: 5px; }}
            button:hover {{ background-color: #2980b9; }}
            .keyword-input {{ width: 300px; }}
        </style>
    </head>
    <body>
        <div class="container">
            <div class="header">
                <h1>Market Trend Analysis Dashboard</h1>
                <p>Enter keywords to analyze market trends and identify opportunities</p>
            </div>
            <div class="form-section">
                <h2>Analyze Keywords</h2>
                <form method="POST" action="/analyze">
                    <input type="text" name="keywords" class="keyword-input"
                           placeholder="Enter keywords separated by commas" required>
                    <button type="submit">Analyze Trends</button>
                </form>
            </div>
            <div class="form-section">
                <h2>Generate Report</h2>
                <form method="POST" action="/report">
                    <input type="text" name="keywords" class="keyword-input"
                           placeholder="Enter keywords for comprehensive report" required>
                    <button type="submit">Generate Report</button>
                </form>
            </div>
            <div class="results" id="results" style="display: none;">
                <!-- Results will be displayed here -->
            </div>
        </div>
        <script>
            function showResults(data) {{
                const resultsDiv = document.getElementById('results');
                resultsDiv.innerHTML = '<h2>Analysis Results</h2><pre>' + JSON.stringify(data, null, 2) + '</pre>';
                resultsDiv.style.display = 'block';
            }}
        </script>
    </body>
    </html>
    """)

@app.route('/analyze', methods=['POST'])
def analyze():
    keywords = [kw.strip() for kw in request.form['keywords'].split(',')]
    try:
        # Quick analysis
        df = viz_dashboard.load_trend_data(keywords)
        if not df.empty:
            summary = {{
                'keywords': keywords,
                'total_data_points': len(df),
                'sources': df['source'].unique().tolist(),
                'avg_sentiment': df['sentiment'].mean(),
                'date_range': f"{{df['timestamp'].min()}} to {{df['timestamp'].max()}}"
            }}
        else:
            summary = {{'message': 'No data found for specified keywords'}}
        return jsonify(summary)
    except Exception as e:
        return jsonify({{'error': str(e)}})

@app.route('/report', methods=['POST'])
def generate_report():
    keywords = [kw.strip() for kw in request.form['keywords'].split(',')]
    try:
        report_path = viz_dashboard.generate_comprehensive_report(keywords)
        return jsonify({{'message': f'Report generated successfully: {{report_path}}'}})
    except Exception as e:
        return jsonify({{'error': str(e)}})

if __name__ == '__main__':
    print("Starting Market Trend Analysis Dashboard...")
    print("Visit http://localhost:5000 to access the dashboard")
    app.run(debug=True, host='0.0.0.0', port=5000)
'''
    return dashboard_code
# Example usage and testing functions
def test_visualizations():
    """Smoke-test every visualization helper against the local database."""
    dashboard = TrendVisualizationDashboard("market_trends.db")
    sample_keywords = ["sustainable packaging", "remote work tools", "AI productivity"]

    print("Testing visualizations...")
    try:
        # Interactive Plotly figures: build, then open in the browser/viewer.
        print("1. Creating timeline visualization...")
        dashboard.create_trend_timeline(sample_keywords).show()

        print("2. Creating source distribution...")
        dashboard.create_source_distribution(sample_keywords).show()

        # Matplotlib-based figure; shows itself internally.
        print("3. Creating sentiment heatmap...")
        dashboard.create_sentiment_heatmap(sample_keywords)

        print("4. Creating market gap visualization...")
        dashboard.create_market_gap_visualization().show()

        print("5. Creating keyword network...")
        dashboard.create_keyword_network(sample_keywords)

        print("6. Creating word cloud...")
        dashboard.create_wordcloud(sample_keywords)

        print("All visualizations tested successfully!")
    except Exception as e:
        # Best-effort smoke test: report the failure instead of crashing.
        print(f"Error in visualization testing: {e}")
def export_dashboard_code(filename: str = "dashboard_app.py"):
    """Export the dashboard application code to a file.

    Args:
        filename: destination path for the generated standalone Flask app.
    """
    dashboard_code = create_dashboard_app("market_trends.db")
    with open(filename, 'w') as f:
        f.write("# Market Trend Analysis Dashboard\n")
        f.write("# Run this file to start the web dashboard\n\n")
        f.write("from visualization_reporting import TrendVisualizationDashboard\n")
        f.write(dashboard_code)
    # Bug fix: these messages previously printed a literal placeholder
    # instead of the actual output filename.
    print(f"Dashboard application code exported to {filename}")
    print("Install Flask with: pip install flask")
    print(f"Run the dashboard with: python {filename}")
if __name__ == "__main__":
    # Run tests: smoke-test every visualization helper against the local DB.
    test_visualizations()
    # Export dashboard: write the standalone Flask app next to this module.
    export_dashboard_code()
#!/usr/bin/env python3
"""
Main Execution Script for Market Gap Analysis System
This script orchestrates the complete pipeline:
1. Keyword expansion and market segmentation
2. Multi-source trend data scraping
3. Market gap analysis and correlation
4. Visualization and reporting
Usage:
python main_execution.py --keywords "keyword1,keyword2,keyword3" --days 30 --output results/
python main_execution.py --config config.json
python main_execution.py --interactive
"""
import asyncio
import argparse
import json
import os
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import logging
# Import our custom modules
from market_gap_backend import main_analysis_pipeline, TrendScraper, MarketGapAnalyzer
from keyword_expansion import comprehensive_keyword_analysis, KeywordExpansionEngine
from visualization_reporting import TrendVisualizationDashboard
# Configure logging: emit to both a persistent log file and stdout, so long
# pipeline runs can be followed live and audited afterwards.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('market_analysis.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
# Module-level logger used by the orchestrator below.
logger = logging.getLogger(__name__)
class MarketAnalysisOrchestrator:
"""Main orchestrator class for the complete market analysis pipeline"""
def __init__(self, config: Dict = None):
self.config = config or self.get_default_config()
self.db_path = self.config.get('db_path', 'market_trends.db')
self.output_dir = self.config.get('output_dir', 'results')
# Initialize components
self.scraper = TrendScraper()
self.analyzer = MarketGapAnalyzer(self.db_path)
self.keyword_engine = KeywordExpansionEngine(self.db_path)
self.viz_dashboard = TrendVisualizationDashboard(self.db_path)
# Create output directory
os.makedirs(self.output_dir, exist_ok=True)
@staticmethod
def get_default_config() -> Dict:
"""Get default configuration"""
return {
'db_path': 'market_trends.db',
'output_dir': 'results',
'analysis_days': 30,
'max_keywords': 50,
'enable_keyword_expansion': True,
'enable_visualizations': True,
'generate_report': True,
'api_keys': {
'newsapi': 'YOUR_NEWSAPI_KEY',
'twitter_bearer': 'YOUR_TWITTER_BEARER_TOKEN'
},
'scraping_sources': {
'reddit': True,
'github': True,
'google_trends': True,
'news': False, # Requires API key
'twitter': False # Requires API key
}
}
def validate_config(self) -> bool:
"""Validate configuration settings"""
required_fields = ['db_path', 'output_dir', 'analysis_days']
for field in required_fields:
if field not in self.config:
logger.error(f"Missing required config field: {field}")
return False
if self.config['analysis_days'] < 1 or self.config['analysis_days'] > 365:
logger.error("analysis_days must be between 1 and 365")
return False
return True
async def expand_keywords(self, base_keywords: List[str]) -> List[str]:
"""Expand base keywords using various techniques"""
if not self.config.get('enable_keyword_expansion', True):
return base_keywords
logger.info(f"Expanding {len(base_keywords)} base keywords...")
try:
# Run comprehensive keyword analysis
results = await comprehensive_keyword_analysis(base_keywords, self.db_path)
# Extract all expanded keywords
all_keywords = set(base_keywords)
for keyword, expansions in results['expanded_keywords'].items():
all_keywords.update(expansions)
# Apply keyword filtering and ranking
filtered_keywords = self.filter_and_rank_keywords(
list(all_keywords),
results.get('keyword_difficulty', {})
)
max_keywords = self.config.get('max_keywords', 50)
final_keywords = filtered_keywords[:max_keywords]
logger.info(f"Expanded to {len(final_keywords)} keywords after filtering")
# Save keyword expansion results
expansion_file = os.path.join(self.output_dir, 'keyword_expansion_results.json')
with open(expansion_file, 'w') as f:
json.dump(results, f, indent=2, default=str)
return final_keywords
except Exception as e:
logger.error(f"Error in keyword expansion: {e}")
return base_keywords
def filter_and_rank_keywords(self, keywords: List[str], difficulty_scores: Dict) -> List[str]:
"""Filter and rank keywords based on opportunity scores"""
scored_keywords = []
for keyword in keywords:
scores = difficulty_scores.get(keyword, {})
opportunity = scores.get('opportunity_score', 0.5)
competition = scores.get('competition_score', 0.5)
data_availability = scores.get('data_availability', 0.1)
# Calculate combined score
combined_score = (opportunity * 0.4 +
(1 - competition) * 0.4 +
data_availability * 0.2)
scored_keywords.append((keyword, combined_score))
# Sort by score and return keywords
scored_keywords.sort(key=lambda x: x[1], reverse=True)
return [kw for kw, score in scored_keywords]
async def run_trend_analysis(self, keywords: List[str]) -> Dict:
"""Run the main trend analysis pipeline"""
logger.info(f"Running trend analysis for {len(keywords)} keywords...")
try:
# Run the main analysis pipeline
results = await main_analysis_pipeline(keywords)
# Save results
results_file = os.path.join(self.output_dir, 'trend_analysis_results.json')
with open(results_file, 'w') as f:
json.dump(results, f, indent=2, default=str)
logger.info("Trend analysis completed successfully")
return results
except Exception as e:
logger.error(f"Error in trend analysis: {e}")
return {}
def generate_visualizations(self, keywords: List[str]) -> str:
"""Generate all visualizations and reports"""
if not self.config.get('enable_visualizations', True):
logger.info("Visualizations disabled in config")
return ""
logger.info("Generating visualizations and reports...")
try:
# Generate comprehensive report with all visualizations
report_path = self.viz_dashboard.generate_comprehensive_report(
keywords, self.output_dir
)
logger.info(f"Visualizations and report generated: {report_path}")
return report_path
except Exception as e:
logger.error(f"Error generating visualizations: {e}")
return ""
def calculate_roi_estimates(self, market_gaps: List, keywords: List[str]) -> Dict:
"""Calculate rough ROI estimates for identified opportunities"""
roi_estimates = {}
for gap in market_gaps:
# Simple ROI calculation based on evidence score and market indicators
evidence_score = gap.get('evidence_score', 0)
confidence = gap.get('confidence_level', 0)
# Estimate market size based on trend data
estimated_market_size = self.estimate_market_size(gap.get('related_keywords', []))
# Calculate potential ROI (simplified)
potential_roi = {
'opportunity_score': evidence_score,
'confidence_level': confidence,
'estimated_market_size': estimated_market_size,
'risk_level': 'Low' if confidence > 0.7 else 'Medium' if confidence > 0.4 else 'High',
'time_to_market': self.estimate_time_to_market(gap),
'investment_category': self.categorize_investment_level(gap)
}
roi_estimates[gap.get('gap_description', 'Unknown')] = potential_roi
return roi_estimates
def estimate_market_size(self, related_keywords: List[str]) -> str:
"""Estimate market size category based on keyword data"""
# Simple heuristic based on number of related keywords and their volume
if len(related_keywords) > 10:
return "Large"
elif len(related_keywords) > 5:
return "Medium"
else:
return "Small"
def estimate_time_to_market(self, gap: Dict) -> str:
"""Estimate time to market based on gap characteristics"""
supply_gaps = gap.get('supply_gaps', [])
if any('dissatisfaction' in sg.lower() for sg in supply_gaps):
return "3-6 months" # Improving existing solutions
elif any('limited' in sg.lower() for sg in supply_gaps):
return "6-12 months" # Creating new solutions
else:
return "12+ months" # New market creation
def categorize_investment_level(self, gap: Dict) -> str:
"""Categorize required investment level"""
evidence_score = gap.get('evidence_score', 0)
if evidence_score > 0.7:
return "High - Strong market validation"
elif evidence_score > 0.4:
return "Medium - Moderate validation needed"
else:
return "Low - High validation required"
def generate_executive_summary(self, results: Dict, keywords: List[str],
roi_estimates: Dict) -> str:
"""Generate executive summary of findings"""
summary = f"""
EXECUTIVE SUMMARY - Market Gap Analysis
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
ANALYSIS SCOPE:
- Keywords Analyzed: {len(keywords)}
- Data Points Collected: {results.get('total_data_points', 0):,}
- Market Gaps Identified: {results.get('market_gaps_identified', 0)}
- Analysis Period: {self.config.get('analysis_days', 30)} days
TOP OPPORTUNITIES:
"""
top_opportunities = results.get('top_opportunities', [])[:5]
for i, opp in enumerate(top_opportunities, 1):
summary += f"""
{i}. {opp.get('description', 'Unknown')}
Evidence Score: {opp.get('evidence_score', 0):.3f}/1.0
Confidence: {opp.get('confidence', 0):.3f}/1.0
Key Indicators: {', '.join(opp.get('key_indicators', [])[:2])}
"""
summary += f"""
KEY INSIGHTS:
- {len([k for k in keywords if any('app' in k.lower() for k in [k])])} app-related opportunities identified
- Average market sentiment: {'Positive' if results.get('avg_sentiment', 0) > 0 else 'Negative'}
- Cross-platform correlation strength: {'High' if len(results.get('cross_platform_correlations', {})) > 0 else 'Low'}
INVESTMENT RECOMMENDATIONS:
"""
high_roi = [k for k, v in roi_estimates.items() if v.get('opportunity_score', 0) > 0.6]
summary += f"- {len(high_roi)} high-opportunity investments identified\n"
summary += f"- Risk distribution: {len([v for v in roi_estimates.values() if v.get('risk_level') == 'Low'])} low-risk, "
summary += f"{len([v for v in roi_estimates.values() if v.get('risk_level') == 'Medium'])} medium-risk opportunities\n"
return summary
async def run_complete_analysis(self, base_keywords: List[str]) -> Dict:
    """Run the complete analysis pipeline end to end.

    Steps: (1) expand the base keywords, (2) run trend analysis over the
    expanded set, (3) estimate ROI for the identified gaps, (4) generate
    visualizations, (5) build an executive summary. Writes
    ``final_analysis_results.json`` and ``executive_summary.txt`` into
    ``self.output_dir``.

    Args:
        base_keywords: Seed keywords supplied by the caller.

    Returns:
        Dict with keys 'analysis_metadata', 'trend_analysis',
        'roi_estimates', 'executive_summary', 'report_path' and
        'output_directory'.

    Raises:
        Exception: any pipeline-step failure is logged and re-raised.
    """
    start_time = datetime.now()
    logger.info(f"Starting complete market analysis pipeline at {start_time}")
    try:
        # Step 1: Expand keywords
        expanded_keywords = await self.expand_keywords(base_keywords)
        # Step 2: Run trend analysis
        trend_results = await self.run_trend_analysis(expanded_keywords)
        # Step 3: Calculate ROI estimates
        market_gaps = trend_results.get('top_opportunities', [])
        roi_estimates = self.calculate_roi_estimates(market_gaps, expanded_keywords)
        # Step 4: Generate visualizations
        # NOTE(review): only keywords are passed here, not trend_results —
        # presumably generate_visualizations re-reads stored data; confirm.
        report_path = self.generate_visualizations(expanded_keywords)
        # Step 5: Generate executive summary
        executive_summary = self.generate_executive_summary(
            trend_results, expanded_keywords, roi_estimates
        )
        # Compile final results
        final_results = {
            'analysis_metadata': {
                'start_time': start_time.isoformat(),
                'end_time': datetime.now().isoformat(),
                'base_keywords': base_keywords,
                'expanded_keywords': expanded_keywords,
                'total_keywords_analyzed': len(expanded_keywords),
                'config_used': self.config
            },
            'trend_analysis': trend_results,
            'roi_estimates': roi_estimates,
            'executive_summary': executive_summary,
            'report_path': report_path,
            'output_directory': self.output_dir
        }
        # Save final results
        # NOTE(review): `os` is not in the visible module import block —
        # confirm it is imported elsewhere in the file.
        final_results_path = os.path.join(self.output_dir, 'final_analysis_results.json')
        # default=str stringifies values json cannot encode (e.g. datetimes)
        with open(final_results_path, 'w') as f:
            json.dump(final_results, f, indent=2, default=str)
        # Save executive summary as text file
        summary_path = os.path.join(self.output_dir, 'executive_summary.txt')
        with open(summary_path, 'w') as f:
            f.write(executive_summary)
        end_time = datetime.now()
        duration = end_time - start_time
        logger.info(f"Complete analysis finished in {duration}")
        logger.info(f"Results saved to: {self.output_dir}")
        logger.info(f"Main report: {report_path}")
        return final_results
    except Exception as e:
        logger.error(f"Error in complete analysis: {e}")
        raise
def load_config_file(config_path: str) -> Dict:
    """Load a configuration dict from a JSON file.

    Args:
        config_path: Path to a JSON configuration file.

    Returns:
        The parsed dict, or ``{}`` when the file is missing, unreadable,
        or contains invalid JSON (the error is logged, never raised).
    """
    try:
        with open(config_path, 'r') as f:
            return json.load(f)
    # Narrowed from a blanket `except Exception`: OSError covers file-system
    # failures, ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    # Unrelated bugs now propagate instead of being silently swallowed.
    except (OSError, ValueError) as e:
        logger.error(f"Error loading config file {config_path}: {e}")
        return {}
def interactive_mode():
    """Prompt the user for keywords and options, then run the full pipeline.

    Collects keywords, analysis period, output directory and feature toggles
    from stdin, builds a configuration dict, and hands it to
    MarketAnalysisOrchestrator.run_complete_analysis. Prints the resulting
    executive summary on success.
    """
    import os  # `os` is used below but is not imported at module top

    def _ask_yes_no(prompt: str, default: str = 'y') -> bool:
        # Bug fix: the original used input(...).lower().startswith('y'),
        # so a bare Enter counted as "no" even though the prompts advertise
        # "default: y". Empty input now falls back to the default.
        return (input(prompt).strip().lower() or default).startswith('y')

    print("\n" + "="*60)
    print("MARKET GAP ANALYSIS SYSTEM - INTERACTIVE MODE")
    print("="*60)
    # Get keywords from user
    print("\nStep 1: Enter Keywords to Analyze")
    keywords_input = input("Enter keywords separated by commas: ").strip()
    if not keywords_input:
        print("No keywords provided. Exiting.")
        return
    base_keywords = [kw.strip() for kw in keywords_input.split(',')]
    print(f"Keywords to analyze: {base_keywords}")
    # Get analysis period (falls back to 30 on empty or invalid input)
    print("\nStep 2: Analysis Period")
    try:
        days = int(input("Enter number of days to analyze (default: 30): ") or "30")
    except ValueError:
        days = 30
    # Get output directory
    print("\nStep 3: Output Configuration")
    output_dir = input("Enter output directory (default: results/): ").strip() or "results/"
    # Configure options
    print("\nStep 4: Analysis Options")
    enable_expansion = _ask_yes_no("Enable keyword expansion? (y/n, default: y): ")
    enable_viz = _ask_yes_no("Generate visualizations? (y/n, default: y): ")
    # Create configuration
    config = {
        'db_path': 'market_trends.db',
        'output_dir': output_dir,
        'analysis_days': days,
        'max_keywords': 50,
        'enable_keyword_expansion': enable_expansion,
        'enable_visualizations': enable_viz,
        'generate_report': True,
        'scraping_sources': {
            'reddit': True,
            'github': True,
            'google_trends': True,
            'news': False,
            'twitter': False
        }
    }
    print(f"\nConfiguration:")
    print(f"- Keywords: {base_keywords}")
    print(f"- Analysis period: {days} days")
    print(f"- Output directory: {output_dir}")
    print(f"- Keyword expansion: {enable_expansion}")
    print(f"- Visualizations: {enable_viz}")
    # No advertised default here, so an empty answer still means "no".
    confirm = input("\nProceed with analysis? (y/n): ").lower().startswith('y')
    if not confirm:
        print("Analysis cancelled.")
        return
    # Run analysis
    orchestrator = MarketAnalysisOrchestrator(config)
    print("\nStarting analysis...")
    try:
        results = asyncio.run(orchestrator.run_complete_analysis(base_keywords))
        print("\n" + "="*60)
        print("ANALYSIS COMPLETE!")
        print("="*60)
        print(f"Results saved to: {results['output_directory']}")
        print(f"Executive summary saved to: {os.path.join(results['output_directory'], 'executive_summary.txt')}")
        if results.get('report_path'):
            print(f"Full report: {results['report_path']}")
        # Display executive summary
        print("\nEXECUTIVE SUMMARY:")
        print("-" * 40)
        print(results['executive_summary'])
    except Exception as e:
        print(f"Error during analysis: {e}")
        logger.error(f"Interactive mode error: {e}")
def create_sample_config():
    """Write a template configuration file to the current directory.

    Creates ``config_sample.json`` with placeholder API keys and default
    analysis settings, then prints instructions for using it.
    """
    template = {
        "db_path": "market_trends.db",
        "output_dir": "results",
        "analysis_days": 30,
        "max_keywords": 50,
        "enable_keyword_expansion": True,
        "enable_visualizations": True,
        "generate_report": True,
        "api_keys": {
            "newsapi": "YOUR_NEWSAPI_KEY_HERE",
            "twitter_bearer": "YOUR_TWITTER_BEARER_TOKEN_HERE"
        },
        "scraping_sources": {
            "reddit": True,
            "github": True,
            "google_trends": True,
            "news": False,
            "twitter": False
        },
        "keyword_filters": {
            "min_length": 3,
            "max_length": 50,
            "exclude_patterns": ["spam", "fake", "scam"],
            "include_only": []
        },
        "analysis_settings": {
            "sentiment_threshold": 0.1,
            "volume_threshold": 10,
            "correlation_threshold": 0.3
        }
    }
    destination = "config_sample.json"
    with open(destination, 'w') as fh:
        json.dump(template, fh, indent=2)
    for line in (
        f"Sample configuration created: {destination}",
        "Edit this file with your API keys and preferences, then run:",
        f"python main_execution.py --config {destination}",
    ):
        print(line)
def main():
    """Command-line entry point for the Market Gap Analysis System.

    Parses arguments, builds or loads a configuration, and runs the full
    analysis pipeline.

    Returns:
        int: process exit code — 0 on success, 1 on any failure or
        usage error.
    """
    import argparse  # used here but not imported at module top

    parser = argparse.ArgumentParser(
        description="Market Gap Analysis System",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python main_execution.py --keywords "AI tools,productivity apps" --days 30
python main_execution.py --config config.json
python main_execution.py --interactive
python main_execution.py --create-config
"""
    )
    parser.add_argument('--keywords', type=str,
                        help='Comma-separated list of keywords to analyze')
    parser.add_argument('--days', type=int, default=30,
                        help='Number of days to analyze (default: 30)')
    parser.add_argument('--output', type=str, default='results',
                        help='Output directory (default: results)')
    parser.add_argument('--config', type=str,
                        help='Path to configuration JSON file')
    parser.add_argument('--interactive', action='store_true',
                        help='Run in interactive mode')
    parser.add_argument('--create-config', action='store_true',
                        help='Create sample configuration file')
    parser.add_argument('--max-keywords', type=int, default=50,
                        help='Maximum keywords to analyze (default: 50)')
    parser.add_argument('--no-expansion', action='store_true',
                        help='Disable keyword expansion')
    parser.add_argument('--no-viz', action='store_true',
                        help='Disable visualizations')
    parser.add_argument('--verbose', action='store_true',
                        help='Enable verbose logging')
    args = parser.parse_args()
    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Create sample config (explicit exit codes — the original returned
    # None from these paths while returning 0/1 elsewhere)
    if args.create_config:
        create_sample_config()
        return 0
    # Interactive mode
    if args.interactive:
        interactive_mode()
        return 0
    # Load configuration
    if args.config:
        config = load_config_file(args.config)
        if not config:
            print(f"Failed to load configuration from {args.config}")
            return 1
    else:
        # Create config from command line arguments
        config = {
            'db_path': 'market_trends.db',
            'output_dir': args.output,
            'analysis_days': args.days,
            'max_keywords': args.max_keywords,
            'enable_keyword_expansion': not args.no_expansion,
            'enable_visualizations': not args.no_viz,
            'generate_report': True,
            'scraping_sources': {
                'reddit': True,
                'github': True,
                'google_trends': True,
                'news': False,
                'twitter': False
            }
        }
    # Get keywords: CLI flag wins, then config file, else usage error.
    if args.keywords:
        base_keywords = [kw.strip() for kw in args.keywords.split(',')]
    elif config.get('base_keywords'):
        base_keywords = config['base_keywords']
    else:
        print("No keywords provided. Use --keywords, --config, or --interactive mode.")
        print("Run with --help for usage information.")
        return 1
    # Validate keywords
    if not base_keywords:
        print("No valid keywords found.")
        return 1
    print(f"Starting analysis with keywords: {base_keywords}")
    print(f"Configuration: {json.dumps(config, indent=2)}")
    # Run analysis
    try:
        orchestrator = MarketAnalysisOrchestrator(config)
        if not orchestrator.validate_config():
            print("Configuration validation failed. Check your settings.")
            return 1
        # Run the complete analysis
        results = asyncio.run(orchestrator.run_complete_analysis(base_keywords))
        print("\n" + "="*60)
        print("ANALYSIS COMPLETED SUCCESSFULLY!")
        print("="*60)
        print(f"Results directory: {results['output_directory']}")
        # Bug fix: the original printed the raw end_time under the label
        # "Analysis duration"; compute the actual elapsed time instead.
        meta = results['analysis_metadata']
        duration = (datetime.fromisoformat(meta['end_time'])
                    - datetime.fromisoformat(meta['start_time']))
        print(f"Analysis duration: {duration}")
        print(f"Keywords analyzed: {meta['total_keywords_analyzed']}")
        if results.get('report_path'):
            print(f"Main report: {results['report_path']}")
        print("\nNext steps:")
        print("1. Review the executive summary in executive_summary.txt")
        print("2. Open the HTML reports for detailed visualizations")
        print("3. Check the JSON files for raw data and analysis results")
    except KeyboardInterrupt:
        print("\nAnalysis interrupted by user.")
        return 1
    except Exception as e:
        print(f"Analysis failed: {e}")
        logger.error(f"Main execution error: {e}")
        return 1
    return 0
# Additional utility functions
def quick_analysis(keywords: List[str], days: int = 7) -> Dict:
    """Run a fast trend-only analysis (no expansion, no visuals, no report).

    Args:
        keywords: Keywords to analyze as-is.
        days: Lookback window in days (default 7).

    Returns:
        A compact summary dict, or ``{'error': ..., 'keywords': ...}`` on
        failure.
    """
    print(f"Running quick analysis for: {', '.join(keywords)}")
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    config = {
        'db_path': 'market_trends.db',
        'output_dir': f'quick_results_{stamp}',
        'analysis_days': days,
        'max_keywords': 20,
        'enable_keyword_expansion': False,
        'enable_visualizations': False,
        'generate_report': False
    }
    orchestrator = MarketAnalysisOrchestrator(config)
    try:
        # Run just the trend analysis without expansions
        results = asyncio.run(orchestrator.run_trend_analysis(keywords))
        opportunities = results.get('top_opportunities', [])
        return {
            'keywords': keywords,
            'opportunities_found': len(opportunities),
            'top_opportunity': opportunities[0] if opportunities else None,
            'total_data_points': results.get('total_data_points', 0),
            'analysis_timestamp': results.get('analysis_timestamp', datetime.now().isoformat()),
        }
    except Exception as e:
        return {'error': str(e), 'keywords': keywords}
def batch_analysis(keyword_batches: List[List[str]], output_base_dir: str = "batch_results") -> List[Dict]:
    """Run the complete analysis pipeline once per keyword batch.

    Each batch gets its own output subdirectory under *output_base_dir*.
    A failing batch yields an ``{'error': ..., 'batch': ...}`` entry
    instead of aborting the remaining batches.
    """
    outcomes: List[Dict] = []
    total = len(keyword_batches)
    for index, batch in enumerate(keyword_batches, start=1):
        print(f"Processing batch {index}/{total}: {batch}")
        batch_config = {
            'db_path': 'market_trends.db',
            'output_dir': f'{output_base_dir}/batch_{index}',
            'analysis_days': 30,
            'max_keywords': 30,
            'enable_keyword_expansion': True,
            'enable_visualizations': True,
            'generate_report': True
        }
        orchestrator = MarketAnalysisOrchestrator(batch_config)
        try:
            outcomes.append(asyncio.run(orchestrator.run_complete_analysis(batch)))
            print(f"Batch {index} completed successfully")
        except Exception as e:
            print(f"Batch {index} failed: {e}")
            outcomes.append({'error': str(e), 'batch': batch})
    return outcomes
if __name__ == "__main__":
    # Bug fix: `sys` is used here but never imported at module top;
    # import it locally so the script actually exits with main()'s code.
    import sys
    sys.exit(main())
Make the web UI fully functional end to end (wired to the backend analysis pipeline above) and give it a clean, polished, modern visual design.