# TODO: build a frontend web UI that ties together this backend of insight-scraping algorithms.
import asyncio
import aiohttp
import json
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from collections import defaultdict
import sqlite3
from textblob import TextBlob
import re
from urllib.parse import quote_plus
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TrendData:
    """One observation of a keyword's activity on a single platform."""
    keyword: str         # search term this observation belongs to
    source: str          # originating platform: 'reddit', 'twitter', 'google_trends', 'github', or 'news'
    volume: int          # platform-specific engagement figure (post score, stars, interest index, ...)
    sentiment: float     # TextBlob polarity in [-1, 1]; 0.0/0.5 where the source provides none
    timestamp: datetime  # when the underlying post/row was created or sampled
    metadata: Dict       # source-specific extras (e.g. subreddit, forks, author)
@dataclass
class MarketGap:
    """An identified market-gap candidate produced by MarketGapAnalyzer."""
    gap_description: str           # human-readable summary of the opportunity
    evidence_score: float          # 0-1 score from calculate_market_gap_score
    demand_indicators: List[str]   # formatted "name: value" strings of demand metrics
    supply_gaps: List[str]         # textual descriptions of detected supply shortfalls
    related_keywords: List[str]    # co-occurring keywords (top 5 by frequency)
    confidence_level: float        # 0-1, scales with the amount of underlying data
class APIClient:
    """Generic async API client with simple rate limiting and error handling.

    Usage:
        async with APIClient("https://api.example.com", rate_limit=1.0) as client:
            data = await client.make_request("endpoint", params={...})
    """

    def __init__(self, base_url: str, rate_limit: float = 1.0):
        # base_url: scheme + host (+ base path); trailing slash optional.
        # rate_limit: minimum number of seconds between consecutive requests.
        self.base_url = base_url
        self.rate_limit = rate_limit
        self.last_request = 0.0  # wall-clock time of the last attempted request
        self.session = None      # created in __aenter__

    async def __aenter__(self):
        # Session is created lazily so the client can be constructed synchronously.
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def make_request(self, endpoint: str, params: Optional[Dict] = None,
                           headers: Optional[Dict] = None) -> Optional[Dict]:
        """Make a rate-limited GET request; return parsed JSON, or None on any failure."""
        # Rate limiting: sleep until at least `rate_limit` seconds have passed
        # since the previous attempt.
        now = time.time()
        time_since_last = now - self.last_request
        if time_since_last < self.rate_limit:
            await asyncio.sleep(self.rate_limit - time_since_last)
        try:
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            async with self.session.get(url, params=params, headers=headers) as response:
                if response.status == 200:
                    return await response.json()
                logger.warning(f"API request failed: {response.status}")
                return None
        except Exception as e:
            logger.error(f"API request error: {e}")
            return None
        finally:
            # Record the attempt time even when the request raises, so a burst
            # of failures is still rate limited (the original only updated this
            # after a response arrived, never on exceptions).
            self.last_request = time.time()
class TrendScraper:
    """Main class for scraping trend data from multiple sources.

    Each scrape_* method returns a list of TrendData records; persisting them
    to SQLite is done by the caller (see main_analysis_pipeline).
    """

    def __init__(self):
        # NOTE(review): path is hard-coded — every instance shares this file
        # in the current working directory.
        self.db_path = "market_trends.db"
        self.initialize_database()

    def initialize_database(self):
        """Initialize SQLite database for storing trend data.

        Creates the `trends` and `market_gaps` tables if they do not exist.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create trends table (one row per observation; metadata is JSON text)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS trends (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT,
                source TEXT,
                volume INTEGER,
                sentiment REAL,
                timestamp TEXT,
                metadata TEXT
            )
        ''')
        # Create market_gaps table (list fields stored as JSON text)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS market_gaps (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                gap_description TEXT,
                evidence_score REAL,
                demand_indicators TEXT,
                supply_gaps TEXT,
                related_keywords TEXT,
                confidence_level REAL,
                created_at TEXT
            )
        ''')
        conn.commit()
        conn.close()

    async def scrape_reddit_trends(self, keywords: List[str]) -> List[TrendData]:
        """Scrape Reddit for trend data using pushshift API.

        NOTE(review): the public pushshift endpoint has been unreliable/retired
        at times — confirm availability before relying on this source.
        """
        trends = []
        async with APIClient("https://api.pushshift.io/reddit/search", rate_limit=1.0) as client:
            for keyword in keywords:
                params = {
                    'q': keyword,
                    'subreddit': 'all',
                    'sort': 'desc',
                    'sort_type': 'created_utc',
                    'size': 100,
                    # Only posts from the last 30 days.
                    'after': int((datetime.now() - timedelta(days=30)).timestamp())
                }
                data = await client.make_request("submission", params)
                if data and 'data' in data:
                    for post in data['data']:
                        # Polarity over title + body text.
                        sentiment = TextBlob(post.get('title', '') + ' ' + post.get('selftext', '')).sentiment.polarity
                        trends.append(TrendData(
                            keyword=keyword,
                            source='reddit',
                            volume=post.get('score', 0),
                            sentiment=sentiment,
                            timestamp=datetime.fromtimestamp(post.get('created_utc', 0)),
                            metadata={'subreddit': post.get('subreddit'), 'comments': post.get('num_comments', 0)}
                        ))
        return trends

    async def scrape_twitter_trends(self, keywords: List[str]) -> List[TrendData]:
        """Simulate Twitter trend scraping (requires Twitter API credentials)"""
        # This is a placeholder - you'll need to implement with actual Twitter API
        trends = []
        # Simulated data for demonstration: random volume/sentiment per keyword.
        for keyword in keywords:
            trends.append(TrendData(
                keyword=keyword,
                source='twitter',
                volume=np.random.randint(100, 10000),
                sentiment=np.random.uniform(-1, 1),
                timestamp=datetime.now(),
                metadata={'hashtags': f"#{keyword}", 'retweets': np.random.randint(10, 1000)}
            ))
        return trends

    async def scrape_google_trends(self, keywords: List[str]) -> List[TrendData]:
        """Scrape Google Trends data (requires pytrends library)"""
        trends = []
        try:
            # Imported lazily so the rest of the scraper works without pytrends.
            from pytrends.request import TrendReq
            # NOTE(review): pytrends is synchronous — these calls block the
            # event loop despite the async signature.
            pytrends = TrendReq(hl='en-US', tz=360)
            for keyword in keywords:
                pytrends.build_payload([keyword], timeframe='today 3-m')
                interest_over_time = pytrends.interest_over_time()
                if not interest_over_time.empty:
                    # One TrendData per sampled date.
                    for date, row in interest_over_time.iterrows():
                        trends.append(TrendData(
                            keyword=keyword,
                            source='google_trends',
                            volume=int(row[keyword]),
                            sentiment=0.0,  # Google Trends doesn't provide sentiment
                            timestamp=date,
                            metadata={'isPartial': row.get('isPartial', False)}
                        ))
        except ImportError:
            logger.warning("pytrends not installed. Skipping Google Trends scraping.")
        return trends

    async def scrape_github_trends(self, keywords: List[str]) -> List[TrendData]:
        """Scrape GitHub for repository trends (stars as the volume metric)."""
        trends = []
        async with APIClient("https://api.github.com", rate_limit=1.0) as client:
            headers = {'Accept': 'application/vnd.github.v3+json'}
            for keyword in keywords:
                params = {
                    'q': keyword,
                    'sort': 'updated',
                    'order': 'desc',
                    'per_page': 100
                }
                data = await client.make_request("search/repositories", params, headers)
                if data and 'items' in data:
                    for repo in data['items']:
                        trends.append(TrendData(
                            keyword=keyword,
                            source='github',
                            volume=repo.get('stargazers_count', 0),
                            sentiment=0.5,  # Neutral sentiment for GitHub
                            # GitHub timestamps end in 'Z'; convert for fromisoformat.
                            timestamp=datetime.fromisoformat(repo.get('updated_at', '').replace('Z', '+00:00')),
                            metadata={
                                'language': repo.get('language'),
                                'forks': repo.get('forks_count', 0),
                                'issues': repo.get('open_issues_count', 0)
                            }
                        ))
        return trends

    async def scrape_news_trends(self, keywords: List[str]) -> List[TrendData]:
        """Scrape news trends using NewsAPI (requires API key)"""
        trends = []
        # Placeholder for NewsAPI integration
        # You'll need to register for a NewsAPI key at https://newsapi.org/
        API_KEY = "YOUR_NEWSAPI_KEY"  # Replace with actual key
        # Skips scraping entirely (returns []) until a real key is configured.
        if API_KEY != "YOUR_NEWSAPI_KEY":
            async with APIClient("https://newsapi.org/v2", rate_limit=1.0) as client:
                headers = {'X-API-Key': API_KEY}
                for keyword in keywords:
                    params = {
                        'q': keyword,
                        'sortBy': 'popularity',
                        'pageSize': 100,
                        'from': (datetime.now() - timedelta(days=30)).isoformat()
                    }
                    data = await client.make_request("everything", params, headers)
                    if data and 'articles' in data:
                        for article in data['articles']:
                            sentiment = TextBlob(article.get('title', '') + ' ' + article.get('description', '')).sentiment.polarity
                            trends.append(TrendData(
                                keyword=keyword,
                                source='news',
                                volume=1,  # News articles don't have volume metrics
                                sentiment=sentiment,
                                timestamp=datetime.fromisoformat(article.get('publishedAt', '').replace('Z', '+00:00')),
                                metadata={
                                    'source': article.get('source', {}).get('name'),
                                    'author': article.get('author')
                                }
                            ))
        return trends
class MarketGapAnalyzer:
"""Analyze trends to identify market gaps"""
def __init__(self, db_path: str):
self.db_path = db_path
def load_trend_data(self, keywords: List[str]) -> pd.DataFrame:
"""Load trend data from database"""
conn = sqlite3.connect(self.db_path)
keyword_filter = "'" + "','".join(keywords) + "'"
query = f"""
SELECT * FROM trends
WHERE keyword IN ({keyword_filter})
ORDER BY timestamp DESC
"""
df = pd.read_sql_query(query, conn)
conn.close()
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['metadata'] = df['metadata'].apply(json.loads)
return df
def calculate_demand_indicators(self, df: pd.DataFrame) -> Dict[str, float]:
"""Calculate various demand indicators"""
indicators = {}
# Volume trend analysis
df_sorted = df.sort_values('timestamp')
if len(df_sorted) > 1:
recent_volume = df_sorted.tail(10)['volume'].mean()
older_volume = df_sorted.head(10)['volume'].mean()
indicators['volume_growth'] = (recent_volume - older_volume) / max(older_volume, 1)
# Sentiment analysis
indicators['avg_sentiment'] = df['sentiment'].mean()
indicators['sentiment_volatility'] = df['sentiment'].std()
# Source diversity
source_counts = df['source'].value_counts()
indicators['source_diversity'] = len(source_counts)
indicators['cross_platform_consistency'] = 1 - (source_counts.std() / source_counts.mean()) if source_counts.mean() > 0 else 0
# Temporal patterns
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
indicators['temporal_concentration'] = df.groupby('hour').size().std() / df.groupby('hour').size().mean()
return indicators
def identify_supply_gaps(self, keyword: str, df: pd.DataFrame) -> List[str]:
"""Identify potential supply gaps based on trend analysis"""
gaps = []
# Analyze sentiment patterns
if df['sentiment'].mean() < -0.2:
gaps.append(f"High negative sentiment around {keyword} suggests user dissatisfaction")
# Analyze GitHub data for tech gaps
github_data = df[df['source'] == 'github']
if not github_data.empty:
avg_stars = github_data['volume'].mean()
if avg_stars < 100:
gaps.append(f"Low GitHub engagement suggests limited open-source solutions for {keyword}")
# Analyze Reddit discussions for unmet needs
reddit_data = df[df['source'] == 'reddit']
if not reddit_data.empty:
high_comment_posts = reddit_data[
reddit_data['metadata'].apply(lambda x: x.get('comments', 0) > 50)
]
if len(high_comment_posts) > len(reddit_data) * 0.3:
gaps.append(f"High discussion volume on Reddit indicates active community seeking solutions for {keyword}")
return gaps
def calculate_market_gap_score(self, demand_indicators: Dict[str, float], supply_gaps: List[str]) -> float:
"""Calculate overall market gap score"""
score = 0.0
# Demand components
score += max(demand_indicators.get('volume_growth', 0) * 0.3, 0)
score += (demand_indicators.get('avg_sentiment', 0) + 1) * 0.1 # Normalize sentiment to 0-2 range
score += demand_indicators.get('source_diversity', 0) * 0.1
score += demand_indicators.get('cross_platform_consistency', 0) * 0.2
# Supply gap components
score += len(supply_gaps) * 0.1
# Temporal activity (higher concentration might indicate urgent need)
temporal_score = demand_indicators.get('temporal_concentration', 0)
if temporal_score > 1: # High concentration
score += 0.2
return min(score, 1.0) # Cap at 1.0
def analyze_market_gaps(self, keywords: List[str]) -> List[MarketGap]:
"""Main method to analyze market gaps for given keywords"""
df = self.load_trend_data(keywords)
gaps = []
for keyword in keywords:
keyword_data = df[df['keyword'] == keyword]
if keyword_data.empty:
continue
demand_indicators = self.calculate_demand_indicators(keyword_data)
supply_gaps = self.identify_supply_gaps(keyword, keyword_data)
# Generate gap description
gap_description = f"Market opportunity in {keyword} sector"
if demand_indicators.get('volume_growth', 0) > 0.5:
gap_description += " with high growth potential"
if demand_indicators.get('avg_sentiment', 0) < -0.2:
gap_description += " addressing user dissatisfaction"
gap_score = self.calculate_market_gap_score(demand_indicators, supply_gaps)
# Find related keywords (simple co-occurrence analysis)
related_keywords = self.find_related_keywords(keyword, df)
gap = MarketGap(
gap_description=gap_description,
evidence_score=gap_score,
demand_indicators=[f"{k}: {v:.3f}" for k, v in demand_indicators.items()],
supply_gaps=supply_gaps,
related_keywords=related_keywords,
confidence_level=min(len(keyword_data) / 100.0, 1.0) # Based on data volume
)
gaps.append(gap)
return sorted(gaps, key=lambda x: x.evidence_score, reverse=True)
def find_related_keywords(self, keyword: str, df: pd.DataFrame) -> List[str]:
"""Find keywords that often appear together with the target keyword"""
related = []
# Simple co-occurrence based on temporal proximity
keyword_data = df[df['keyword'] == keyword]
for _, row in keyword_data.iterrows():
time_window = timedelta(hours=24)
nearby_data = df[
(df['timestamp'] >= row['timestamp'] - time_window) &
(df['timestamp'] <= row['timestamp'] + time_window) &
(df['keyword'] != keyword)
]
related.extend(nearby_data['keyword'].unique())
# Count occurrences and return top related keywords
from collections import Counter
related_counts = Counter(related)
return [k for k, v in related_counts.most_common(5)]
def save_market_gaps(self, gaps: List[MarketGap]):
"""Save identified market gaps to database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for gap in gaps:
cursor.execute('''
INSERT INTO market_gaps
(gap_description, evidence_score, demand_indicators, supply_gaps, related_keywords, confidence_level, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
gap.gap_description,
gap.evidence_score,
json.dumps(gap.demand_indicators),
json.dumps(gap.supply_gaps),
json.dumps(gap.related_keywords),
gap.confidence_level,
datetime.now().isoformat()
))
conn.commit()
conn.close()
class TrendCorrelationEngine:
    """Correlates a keyword's per-day activity across the different data sources."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def calculate_cross_platform_correlation(self, keyword: str) -> Dict[str, float]:
        """Return cross-source correlation matrices for one keyword.

        The result may contain 'volume_correlations' and/or
        'sentiment_correlations' (each a nested dict from DataFrame.corr());
        a key is only present when at least two sources have data. Returns an
        empty dict when the keyword has no stored data at all.
        """
        daily = self._load_daily_aggregates(keyword)
        if daily.empty:
            return {}
        result = {}
        for label, column in (('volume_correlations', 'avg_volume'),
                              ('sentiment_correlations', 'avg_sentiment')):
            # One column per source, one row per day; days a source is silent
            # are filled with 0 before correlating.
            pivot = daily.pivot(index='date', columns='source', values=column).fillna(0)
            if len(pivot.columns) > 1:
                result[label] = pivot.corr().to_dict()
        return result

    def _load_daily_aggregates(self, keyword: str) -> pd.DataFrame:
        """Fetch per-source, per-day average volume and sentiment for `keyword`."""
        query = """
        SELECT source, DATE(timestamp) as date, AVG(volume) as avg_volume, AVG(sentiment) as avg_sentiment
        FROM trends
        WHERE keyword = ?
        GROUP BY source, date
        ORDER BY date
        """
        conn = sqlite3.connect(self.db_path)
        frame = pd.read_sql_query(query, conn, params=[keyword])
        conn.close()
        return frame
async def main_analysis_pipeline(keywords: List[str]):
    """End-to-end pipeline: scrape all sources, persist to SQLite, analyze
    market gaps, correlate across platforms, and write a JSON report.

    Returns the report dict; also writes market_analysis_report_<timestamp>.json
    to the current working directory.
    """
    logger.info(f"Starting analysis for keywords: {keywords}")
    # Initialize components (all share the scraper's SQLite file).
    scraper = TrendScraper()
    analyzer = MarketGapAnalyzer(scraper.db_path)
    correlation_engine = TrendCorrelationEngine(scraper.db_path)
    # Step 1: Scrape data from all sources, sequentially — each client applies
    # its own per-request rate limiting.
    all_trends = []
    logger.info("Scraping Reddit trends...")
    reddit_trends = await scraper.scrape_reddit_trends(keywords)
    all_trends.extend(reddit_trends)
    logger.info("Scraping Twitter trends...")
    twitter_trends = await scraper.scrape_twitter_trends(keywords)
    all_trends.extend(twitter_trends)
    logger.info("Scraping Google trends...")
    google_trends = await scraper.scrape_google_trends(keywords)
    all_trends.extend(google_trends)
    logger.info("Scraping GitHub trends...")
    github_trends = await scraper.scrape_github_trends(keywords)
    all_trends.extend(github_trends)
    logger.info("Scraping news trends...")
    news_trends = await scraper.scrape_news_trends(keywords)
    all_trends.extend(news_trends)
    # Step 2: Store trends in database (metadata serialized to JSON text).
    conn = sqlite3.connect(scraper.db_path)
    cursor = conn.cursor()
    for trend in all_trends:
        cursor.execute('''
            INSERT INTO trends (keyword, source, volume, sentiment, timestamp, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            trend.keyword,
            trend.source,
            trend.volume,
            trend.sentiment,
            trend.timestamp.isoformat(),
            json.dumps(trend.metadata)
        ))
    conn.commit()
    conn.close()
    logger.info(f"Stored {len(all_trends)} trend data points")
    # Step 3: Analyze market gaps (reads back what was just stored).
    logger.info("Analyzing market gaps...")
    market_gaps = analyzer.analyze_market_gaps(keywords)
    # Step 4: Calculate cross-platform correlations per keyword.
    logger.info("Calculating cross-platform correlations...")
    correlations = {}
    for keyword in keywords:
        correlations[keyword] = correlation_engine.calculate_cross_platform_correlation(keyword)
    # Step 5: Save results
    analyzer.save_market_gaps(market_gaps)
    # Step 6: Generate report
    logger.info("Generating analysis report...")
    report = {
        'analysis_timestamp': datetime.now().isoformat(),
        'keywords_analyzed': keywords,
        'total_data_points': len(all_trends),
        'market_gaps_identified': len(market_gaps),
        'top_opportunities': [
            {
                'description': gap.gap_description,
                'evidence_score': gap.evidence_score,
                'confidence': gap.confidence_level,
                'key_indicators': gap.demand_indicators[:3],  # Top 3 indicators
                'supply_gaps': gap.supply_gaps
            }
            for gap in market_gaps[:5]  # Top 5 opportunities
        ],
        'cross_platform_correlations': correlations
    }
    # Save report to file; default=str stringifies anything json can't encode
    # (e.g. datetimes or numpy scalars).
    with open(f"market_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w') as f:
        json.dump(report, f, indent=2, default=str)
    logger.info("Analysis complete. Report saved.")
    return report
# Example usage
if __name__ == "__main__":
    # Keywords chosen to exercise several distinct market verticals.
    demo_keywords = [
        "sustainable packaging",
        "remote work tools",
        "mental health apps",
        "AI productivity",
        "electric vehicle charging",
    ]
    # Run the full scrape-analyze-report pipeline once.
    asyncio.run(main_analysis_pipeline(demo_keywords))
import asyncio
import aiohttp
import json
import re
from typing import List, Dict, Set
import nltk
from textblob import TextBlob
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import numpy as np
from collections import defaultdict, Counter
import sqlite3
class KeywordExpansionEngine:
    """Engine for expanding keywords and identifying market segments."""

    def __init__(self, db_path: str):
        # Path to the SQLite trends database populated by the scraper.
        self.db_path = db_path
        # Minimal stop-word list used to filter extracted terms.
        self.stop_words = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'])
        # Download required NLTK data (punkt tokenizer) on first use.
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')

    def extract_keywords_from_text(self, text: str) -> List[str]:
        """Extract candidate keywords (noun phrases + significant words) from text."""
        if not text:
            return []
        # Strip punctuation and lowercase before tokenizing.
        text = re.sub(r'[^\w\s]', '', text.lower())
        blob = TextBlob(text)
        keywords = []
        # Noun phrases of up to three words.
        for phrase in blob.noun_phrases:
            if len(phrase.split()) <= 3 and phrase not in self.stop_words:
                keywords.append(phrase)
        # Significant single words: nouns (NN*) and adjectives (JJ*) longer
        # than 3 characters.
        for word, tag in blob.tags:
            if tag.startswith(('NN', 'JJ')) and len(word) > 3 and word not in self.stop_words:
                keywords.append(word)
        return list(set(keywords))

    def expand_keywords_from_trends(self, base_keyword: str, limit: int = 20) -> List[str]:
        """Expand keywords by mining the stored metadata of matching trend rows."""
        conn = sqlite3.connect(self.db_path)
        query = """
            SELECT metadata FROM trends
            WHERE keyword = ? OR keyword LIKE ?
        """
        cursor = conn.execute(query, [base_keyword, f'%{base_keyword}%'])
        results = cursor.fetchall()
        conn.close()
        expanded_keywords = set()
        for row in results:
            try:
                metadata = json.loads(row[0])
            except (json.JSONDecodeError, TypeError):
                continue
            if not isinstance(metadata, dict):
                continue
            # Only free-text metadata fields are worth mining for keywords.
            text_fields = [value for key, value in metadata.items()
                           if isinstance(value, str) and key in ['title', 'description', 'text', 'content']]
            for text in text_fields:
                keywords = self.extract_keywords_from_text(text)
                expanded_keywords.update(keywords[:5])  # Limit per text
        # Keep short, non-trivial keywords that share a word with the base
        # keyword. (BUG FIX: the original condition mixed `and`/`or` without
        # parentheses, so any keyword sharing a word with the base bypassed
        # the length filters.)
        base_words = base_keyword.split()
        relevant_keywords = [
            keyword for keyword in expanded_keywords
            if (len(keyword.split()) <= 3
                and len(keyword) > 2
                and keyword != base_keyword
                and any(word in keyword for word in base_words))
        ]
        return relevant_keywords[:limit]

    async def expand_keywords_with_search_suggestions(self, base_keyword: str) -> List[str]:
        """Expand keywords using Google's public search-suggestion endpoint."""
        suggestions = []
        # Google Suggest API (no API key required)
        async with aiohttp.ClientSession() as session:
            try:
                url = "https://suggestqueries.google.com/complete/search"
                params = {
                    'client': 'firefox',
                    'q': base_keyword
                }
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.text()
                        # The 'firefox' client normally returns plain JSON of the
                        # form ["query", ["s1", ...]]; some clients wrap it in a
                        # JSONP call, so strip that wrapper when present. (The
                        # original only parsed the JSONP form, so plain-JSON
                        # responses always yielded zero suggestions.)
                        if data.startswith('window.google.ac.h('):
                            data = data[19:-1]  # remove JSONP wrapper
                        suggestions_data = json.loads(data)
                        if len(suggestions_data) > 1:
                            suggestions.extend(suggestions_data[1][:10])  # Get top 10
            except Exception as e:
                print(f"Error fetching Google suggestions: {e}")
        return suggestions

    def identify_market_segments(self, keywords: List[str]) -> Dict[str, List[str]]:
        """Cluster related keywords into named market segments (TF-IDF + KMeans)."""
        if len(keywords) < 5:
            return {"main_segment": keywords}
        # Build a text corpus: for each keyword, concatenate its stored
        # metadata strings (falling back to '' when nothing is stored).
        conn = sqlite3.connect(self.db_path)
        keyword_texts = {}
        for keyword in keywords:
            query = """
                SELECT metadata FROM trends WHERE keyword = ?
                LIMIT 50
            """
            cursor = conn.execute(query, [keyword])
            results = cursor.fetchall()
            texts = []
            for row in results:
                try:
                    metadata = json.loads(row[0])
                except (json.JSONDecodeError, TypeError):
                    continue
                if isinstance(metadata, dict):
                    for value in metadata.values():
                        if isinstance(value, str) and len(value) > 10:
                            texts.append(value)
            keyword_texts[keyword] = ' '.join(texts)
        conn.close()
        corpus = [keyword_texts.get(keyword, keyword) for keyword in keywords]
        if len(corpus) < 3:
            return {"main_segment": keywords}
        try:
            # TF-IDF vectorization
            vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
            tfidf_matrix = vectorizer.fit_transform(corpus)
            # K-means clustering with 2-5 clusters depending on keyword count.
            n_clusters = min(max(2, len(keywords) // 3), 5)
            kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
            clusters = kmeans.fit_predict(tfidf_matrix)
            # Group keywords by cluster id.
            segments = defaultdict(list)
            for i, keyword in enumerate(keywords):
                segments[f"segment_{clusters[i]}"].append(keyword)
            # Name each segment after its strongest TF-IDF feature.
            named_segments = {}
            feature_names = vectorizer.get_feature_names_out()
            for cluster_id in range(n_clusters):
                cluster_center = kmeans.cluster_centers_[cluster_id]
                top_indices = cluster_center.argsort()[-3:][::-1]  # Top 3 features
                top_terms = [feature_names[i] for i in top_indices]
                named_segments[f"{top_terms[0]}_segment"] = segments[f"segment_{cluster_id}"]
            return named_segments
        except Exception as e:
            # Clustering is best-effort; degrade to a single segment on failure
            # (e.g. TfidfVectorizer's empty-vocabulary ValueError).
            print(f"Clustering error: {e}")
            return {"main_segment": keywords}

    def analyze_keyword_difficulty(self, keyword: str) -> Dict[str, float]:
        """Score a keyword's competition, opportunity, and data availability (each 0-1)."""
        conn = sqlite3.connect(self.db_path)
        # Aggregate per source. (BUG FIX: the original selected bare
        # volume/sentiment columns alongside GROUP BY, which makes SQLite
        # return an arbitrary row per group; SUM/AVG reflect the intended
        # per-source totals.)
        query = """
            SELECT source, SUM(volume) as total_volume, AVG(sentiment) as avg_sentiment, COUNT(*) as frequency
            FROM trends
            WHERE keyword = ? OR keyword LIKE ?
            GROUP BY source
        """
        cursor = conn.execute(query, [keyword, f'%{keyword}%'])
        results = cursor.fetchall()
        conn.close()
        if not results:
            # No data at all: assume low competition and high opportunity.
            return {
                'competition_score': 0.1,
                'opportunity_score': 0.8,
                'data_availability': 0.0
            }
        total_volume = sum(row[1] or 0 for row in results)
        avg_sentiment = sum(row[2] or 0 for row in results) / len(results)
        source_diversity = len(results)
        total_frequency = sum(row[3] for row in results)
        # More observed volume -> more established competition.
        competition_score = min(total_volume / 10000, 1.0)
        # Opportunity rises with negative sentiment (problems to solve) and
        # with cross-platform interest.
        opportunity_score = 0.5
        if avg_sentiment < -0.2:
            opportunity_score += 0.3
        if source_diversity >= 3:
            opportunity_score += 0.2
        opportunity_score = min(opportunity_score, 1.0)
        # Data availability score
        data_availability = min(total_frequency / 100, 1.0)
        return {
            'competition_score': competition_score,
            'opportunity_score': opportunity_score,
            'data_availability': data_availability,
            'total_mentions': total_frequency,
            'source_diversity': source_diversity,
            'avg_sentiment': avg_sentiment
        }

    def generate_keyword_variations(self, base_keyword: str) -> List[str]:
        """Generate common search variations ('X app', 'best X', ...) of a keyword."""
        variations = []
        words = base_keyword.split()
        if len(words) == 1:
            # Single word variations
            base = words[0]
            variations.extend([
                f"{base} app",
                f"{base} software",
                f"{base} tool",
                f"{base} platform",
                f"{base} service",
                f"{base} solution",
                f"best {base}",
                f"{base} alternative",
                f"cheap {base}",
                f"free {base}"
            ])
        else:
            # Multi-word variations
            variations.extend([
                f"{base_keyword} app",
                f"{base_keyword} software",
                f"{base_keyword} platform",
                f"best {base_keyword}",
                f"{base_keyword} solution",
                f"{base_keyword} alternative"
            ])
            # Word order variations (two-word keywords only)
            if len(words) == 2:
                variations.append(f"{words[1]} {words[0]}")
        return variations
class MarketSegmentAnalyzer:
    """Analyze market segments and identify gaps within segments"""

    def __init__(self, db_path: str):
        # Path to the SQLite trends database populated by the scraper.
        self.db_path = db_path

    def analyze_segment_trends(self, segment_keywords: List[str]) -> Dict[str, object]:
        """Analyze trends within a market segment.

        Returns segment-wide totals plus per-keyword details, or {} when no
        trend rows exist for any keyword in the segment.
        """
        conn = sqlite3.connect(self.db_path)
        # Get trend data for all keywords in segment (parameterized IN-list).
        placeholders = ','.join(['?' for _ in segment_keywords])
        query = f"""
            SELECT keyword, source, AVG(volume) as avg_volume,
            AVG(sentiment) as avg_sentiment, COUNT(*) as mentions,
            MIN(timestamp) as first_seen, MAX(timestamp) as last_seen
            FROM trends
            WHERE keyword IN ({placeholders})
            GROUP BY keyword, source
        """
        cursor = conn.execute(query, segment_keywords)
        results = cursor.fetchall()
        conn.close()
        if not results:
            return {}
        # Aggregate per keyword across sources.
        # NOTE(review): the inner defaultdict defaults to list (used only for
        # 'sources'); the scalar fields below are accumulated via .get(...) +
        # assignment — per-source averages are SUMMED, not re-averaged.
        segment_data = defaultdict(lambda: defaultdict(list))
        for row in results:
            keyword, source, avg_vol, avg_sent, mentions, first, last = row
            segment_data[keyword]['sources'].append(source)
            segment_data[keyword]['avg_volume'] = segment_data[keyword].get('avg_volume', 0) + avg_vol
            segment_data[keyword]['avg_sentiment'] = segment_data[keyword].get('avg_sentiment', 0) + avg_sent
            segment_data[keyword]['total_mentions'] = segment_data[keyword].get('total_mentions', 0) + mentions
            segment_data[keyword]['date_range'] = (first, last)
        # Calculate segment-wide metrics
        total_volume = sum(data['avg_volume'] for data in segment_data.values())
        total_mentions = sum(data['total_mentions'] for data in segment_data.values())
        avg_segment_sentiment = sum(data['avg_sentiment'] for data in segment_data.values()) / len(segment_data)
        # Identify leaders and gaps in segment
        sorted_by_volume = sorted(segment_data.items(), key=lambda x: x[1]['avg_volume'], reverse=True)
        market_leaders = sorted_by_volume[:3]  # Top 3 by volume
        underserved_areas = sorted_by_volume[-3:]  # Bottom 3 by volume
        return {
            'segment_size': len(segment_keywords),
            'total_market_volume': total_volume,
            'total_mentions': total_mentions,
            'avg_sentiment': avg_segment_sentiment,
            'market_leaders': [(kw, data['avg_volume']) for kw, data in market_leaders],
            'underserved_areas': [(kw, data['avg_volume']) for kw, data in underserved_areas],
            'keyword_details': dict(segment_data)
        }

    def identify_segment_gaps(self, segment_analysis: Dict[str, object]) -> List[str]:
        """Identify specific gaps within a market segment.

        Flags keywords whose volume is below half the segment's per-keyword
        average, split by sentiment direction.
        """
        gaps = []
        # Low volume but high sentiment areas (unmet demand)
        if segment_analysis:
            keyword_details = segment_analysis.get('keyword_details', {})
            for keyword, data in keyword_details.items():
                volume = data.get('avg_volume', 0)
                sentiment = data.get('avg_sentiment', 0)
                # High demand, low supply indicators
                if volume < segment_analysis.get('total_market_volume', 0) / len(keyword_details) * 0.5:
                    if sentiment > 0.2:  # Positive sentiment
                        gaps.append(f"Underserved positive demand for {keyword}")
                    elif sentiment < -0.2:  # Negative sentiment
                        gaps.append(f"User dissatisfaction with current {keyword} solutions")
        return gaps
async def comprehensive_keyword_analysis(base_keywords: List[str], db_path: str):
    """Comprehensive pipeline: expand keywords, score difficulty, cluster into
    market segments, surface segment gaps, and rank opportunities.

    Args:
        base_keywords: seed keywords to expand and analyze.
        db_path: path to the SQLite trends database.

    Returns:
        Dict with expanded keywords, segments, difficulty scores, gap lists,
        and the top-20 prioritized opportunities; also written to a
        keyword_analysis_report_<timestamp>.json file.
    """
    print(f"Starting comprehensive analysis for: {base_keywords}")
    # Initialize engines
    keyword_engine = KeywordExpansionEngine(db_path)
    segment_analyzer = MarketSegmentAnalyzer(db_path)
    results = {
        'base_keywords': base_keywords,
        'expanded_keywords': {},
        'market_segments': {},
        'keyword_difficulty': {},
        'segment_analysis': {},
        'identified_gaps': {}
    }
    all_keywords = set(base_keywords)
    # Step 1: Expand keywords via templates, stored trend metadata, and
    # search suggestions.
    print("Expanding keywords...")
    for keyword in base_keywords:
        variations = keyword_engine.generate_keyword_variations(keyword)
        trend_expansions = keyword_engine.expand_keywords_from_trends(keyword)
        search_suggestions = await keyword_engine.expand_keywords_with_search_suggestions(keyword)
        # Combine all expansions (deduplicated).
        expanded = list(set(variations + trend_expansions + search_suggestions))
        results['expanded_keywords'][keyword] = expanded
        all_keywords.update(expanded)
    # Step 2: Analyze keyword difficulty for every base + expanded keyword.
    print("Analyzing keyword difficulty...")
    for keyword in all_keywords:
        results['keyword_difficulty'][keyword] = keyword_engine.analyze_keyword_difficulty(keyword)
    # Step 3: Identify market segments
    print("Identifying market segments...")
    segments = keyword_engine.identify_market_segments(list(all_keywords))
    results['market_segments'] = segments
    # Step 4: Analyze each segment and collect its gaps.
    print("Analyzing market segments...")
    for segment_name, segment_keywords in segments.items():
        segment_analysis = segment_analyzer.analyze_segment_trends(segment_keywords)
        results['segment_analysis'][segment_name] = segment_analysis
        results['identified_gaps'][segment_name] = segment_analyzer.identify_segment_gaps(segment_analysis)
    # Step 5: Rank opportunities — favor high opportunity, low competition,
    # and some supporting data.
    print("Generating opportunity priorities...")
    opportunities = []
    for keyword, difficulty in results['keyword_difficulty'].items():
        opportunity_score = difficulty.get('opportunity_score', 0)
        competition_score = difficulty.get('competition_score', 0)
        data_availability = difficulty.get('data_availability', 0)
        combined_score = (opportunity_score * 0.4 +
                          (1 - competition_score) * 0.4 +
                          data_availability * 0.2)
        opportunities.append({
            'keyword': keyword,
            'opportunity_score': opportunity_score,
            'competition_score': competition_score,
            'combined_score': combined_score,
            'data_points': difficulty.get('total_mentions', 0),
            'sentiment': difficulty.get('avg_sentiment', 0)
        })
    opportunities.sort(key=lambda x: x['combined_score'], reverse=True)
    results['prioritized_opportunities'] = opportunities[:20]  # Top 20
    # Step 6: Save results
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"keyword_analysis_report_{timestamp}.json"
    with open(filename, 'w') as f:
        json.dump(results, f, indent=2, default=str)
    # BUG FIX: the original printed the literal text "(unknown)" instead of
    # the actual report filename.
    print(f"Analysis complete. Results saved to {filename}")
    return results
# Example usage and testing
if __name__ == "__main__":
    import sys
    import os
    from datetime import datetime

    # Make sibling modules importable when run as a standalone script.
    sys.path.append(os.path.dirname(__file__))

    async def _demo_keyword_expansion():
        """Run the expansion pipeline on sample keywords and print a summary."""
        sample_keywords = [
            "productivity app",
            "sustainable fashion",
            "remote collaboration",
        ]
        # Reuse the database populated by the main trend scraper.
        results = await comprehensive_keyword_analysis(sample_keywords, "market_trends.db")
        print("\n=== KEYWORD EXPANSION RESULTS ===")
        print(f"Base keywords: {len(sample_keywords)}")
        print(f"Total expanded keywords: {sum(len(v) for v in results['expanded_keywords'].values())}")
        print(f"Market segments identified: {len(results['market_segments'])}")
        print(f"Top opportunities: {len(results['prioritized_opportunities'])}")
        print("\n=== TOP 5 OPPORTUNITIES ===")
        for rank, opp in enumerate(results['prioritized_opportunities'][:5], 1):
            print(f"{rank}. {opp['keyword']}")
            print(f" Combined Score: {opp['combined_score']:.3f}")
            print(f" Opportunity: {opp['opportunity_score']:.3f}, Competition: {opp['competition_score']:.3f}")
            print()

    asyncio.run(_demo_keyword_expansion())
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import sqlite3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import networkx as nx
from wordcloud import WordCloud
import warnings
warnings.filterwarnings('ignore')
class TrendVisualizationDashboard:
"""Create visualizations and reports for trend analysis results"""
def __init__(self, db_path: str):
    """Remember the SQLite database location and configure global plot styling.

    Args:
        db_path: path to the SQLite database written by the trend scraper.
    """
    self.db_path = db_path
    # Shared visual defaults for every chart this dashboard produces.
    sns.set_palette("husl")
    plt.style.use('seaborn-v0_8')
def load_trend_data(self, keywords: Optional[List[str]] = None, days: int = 30) -> pd.DataFrame:
"""Load trend data from database"""
conn = sqlite3.connect(self.db_path)
base_query = """
SELECT keyword, source, volume, sentiment, timestamp, metadata
FROM trends
WHERE timestamp >= ?
"""
params = [(datetime.now() - timedelta(days=days)).isoformat()]
if keywords:
keyword_filter = "'" + "','".join(keywords) + "'"
base_query += f" AND keyword IN ({keyword_filter})"
base_query += " ORDER BY timestamp DESC"
df = pd.read_sql_query(base_query, conn, params=params)
conn.close()
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['date'] = df['timestamp'].dt.date
df['hour'] = df['timestamp'].dt.hour
try:
df['metadata'] = df['metadata'].apply(json.loads)
except:
df['metadata'] = df['metadata'].apply(lambda x: {})
return df
def load_market_gaps(self) -> pd.DataFrame:
"""Load market gap analysis results"""
conn = sqlite3.connect(self.db_path)
query = """
SELECT gap_description, evidence_score, demand_indicators,
supply_gaps, related_keywords, confidence_level, created_at
FROM market_gaps
ORDER BY evidence_score DESC
"""
df = pd.read_sql_query(query, conn)
conn.close()
if not df.empty:
df['created_at'] = pd.to_datetime(df['created_at'])
df['demand_indicators'] = df['demand_indicators'].apply(json.loads)
df['supply_gaps'] = df['supply_gaps'].apply(json.loads)
df['related_keywords'] = df['related_keywords'].apply(json.loads)
return df
def create_trend_timeline(self, keywords: List[str], save_path: str = None) -> go.Figure:
    """Build an interactive two-panel timeline for the given keywords:
    daily total volume on top, daily mean sentiment (dashed) below.

    Args:
        keywords: keywords to plot; each contributes one trace per panel.
        save_path: optional path for an HTML export of the figure.

    Returns:
        A plotly Figure; a placeholder annotation when no data is available.
    """
    frame = self.load_trend_data(keywords)
    if frame.empty:
        placeholder = go.Figure()
        return placeholder.add_annotation(text="No data available",
                                          xref="paper", yref="paper",
                                          x=0.5, y=0.5, showarrow=False)

    fig = make_subplots(
        rows=2, cols=1,
        subplot_titles=('Volume Trends', 'Sentiment Trends'),
        vertical_spacing=0.1
    )

    # Panel 1: daily summed volume per keyword.
    for kw in keywords:
        subset = frame[frame['keyword'] == kw]
        if subset.empty:
            continue
        per_day = subset.groupby('date')['volume'].sum().reset_index()
        trace = go.Scatter(
            x=per_day['date'],
            y=per_day['volume'],
            mode='lines+markers',
            name=f'{kw} Volume',
            line=dict(width=3)
        )
        fig.add_trace(trace, row=1, col=1)

    # Panel 2: daily mean sentiment per keyword, drawn dashed to
    # distinguish it from the volume traces in the shared legend.
    for kw in keywords:
        subset = frame[frame['keyword'] == kw]
        if subset.empty:
            continue
        per_day = subset.groupby('date')['sentiment'].mean().reset_index()
        trace = go.Scatter(
            x=per_day['date'],
            y=per_day['sentiment'],
            mode='lines+markers',
            name=f'{kw} Sentiment',
            line=dict(width=3, dash='dash')
        )
        fig.add_trace(trace, row=2, col=1)

    fig.update_layout(
        title='Trend Analysis Timeline',
        height=600,
        showlegend=True,
        template='plotly_white'
    )
    fig.update_xaxes(title_text="Date", row=2, col=1)
    fig.update_yaxes(title_text="Volume", row=1, col=1)
    fig.update_yaxes(title_text="Sentiment", row=2, col=1)

    if save_path:
        fig.write_html(save_path)
    return fig
def create_source_distribution(self, keywords: List[str], save_path: str = None) -> go.Figure:
    """Render a donut chart of how many data points each source contributed.

    Args:
        keywords: keywords whose rows are counted per source.
        save_path: optional path for an HTML export of the figure.

    Returns:
        A plotly Figure; a placeholder annotation when no data is available.
    """
    frame = self.load_trend_data(keywords)
    if frame.empty:
        return go.Figure().add_annotation(text="No data available")

    counts = frame['source'].value_counts()
    donut = go.Pie(
        labels=counts.index,
        values=counts.values,
        hole=.3,  # donut hole so the center label fits
        textinfo='label+percent',
        textposition='outside'
    )
    fig = go.Figure(data=[donut])
    fig.update_layout(
        title='Data Sources Distribution',
        template='plotly_white',
        annotations=[dict(text='Sources', x=0.5, y=0.5, font_size=20, showarrow=False)]
    )

    if save_path:
        fig.write_html(save_path)
    return fig
def create_sentiment_heatmap(self, keywords: List[str], save_path: str = None):
    """Draw a seaborn heatmap of mean sentiment per (keyword, source) pair.

    Displays the figure via ``plt.show()``; optionally saves a PNG first.

    Args:
        keywords: keywords forming the heatmap rows.
        save_path: optional image path (saved at 300 dpi).
    """
    frame = self.load_trend_data(keywords)
    if frame.empty:
        print("No data available for heatmap")
        return

    # Rows = keywords, columns = sources; pairs with no data default to 0.
    pivot = frame.groupby(['keyword', 'source'])['sentiment'].mean().unstack(fill_value=0)

    plt.figure(figsize=(12, 8))
    sns.heatmap(pivot, annot=True, cmap='RdYlGn', center=0,
                fmt='.2f', cbar_kws={'label': 'Average Sentiment'})
    plt.title('Sentiment Heatmap: Keywords vs Sources')
    plt.xlabel('Data Source')
    plt.ylabel('Keyword')
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()
def create_market_gap_visualization(self, save_path: str = None) -> go.Figure:
    """Visualize market gaps as a bubble chart: x = confidence level,
    y = evidence score, bubble size = number of identified supply gaps.

    Args:
        save_path: optional path for an HTML export of the figure.

    Returns:
        A plotly Figure; a placeholder annotation when no gap data exists.
    """
    df = self.load_market_gaps()
    if df.empty:
        return go.Figure().add_annotation(text="No market gaps data available")

    fig = go.Figure()
    x_vals = df['confidence_level']
    y_vals = df['evidence_score']
    # Bubble size grows with the number of supply gaps behind each entry.
    sizes = [len(gaps) * 10 + 20 for gaps in df['supply_gaps']]
    # Truncate long descriptions so hover labels stay readable.
    texts = [desc[:50] + "..." if len(desc) > 50 else desc for desc in df['gap_description']]

    fig.add_trace(go.Scatter(
        x=x_vals,
        y=y_vals,
        mode='markers',
        marker=dict(
            size=sizes,
            color=y_vals,
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Evidence Score"),
            line=dict(width=2, color='white')
        ),
        text=texts,
        textposition="middle center",
        # NOTE(review): the original hovertemplate was corrupted in a text
        # export (its '<br>' tags were stripped, splitting the string literal
        # across lines). Reconstructed here; <extra></extra> hides the
        # secondary hover box, per plotly's hovertemplate convention.
        hovertemplate=('%{text}<br>'
                       'Confidence: %{x:.2f}<br>'
                       'Evidence Score: %{y:.2f}'
                       '<extra></extra>')
    ))

    # NOTE(review): the method's tail was destroyed by the same corruption;
    # layout/save/return reconstructed to match the sibling chart methods.
    fig.update_layout(
        title='Market Gap Opportunities',
        xaxis_title='Confidence Level',
        yaxis_title='Evidence Score',
        template='plotly_white'
    )

    if save_path:
        fig.write_html(save_path)
    return fig
Generated on {timestamp}
This report analyzes market trends and identifies opportunities for the following keywords: {', '.join(keywords)}. The analysis covers data from multiple sources including social media, news, GitHub repositories, and search trends to provide comprehensive market insights.
| Metric | Value |
|---|---|
| Analysis Period | {stats.get('date_range', 'N/A')} |
| Average Sentiment | {stats.get('avg_sentiment', 0):.3f} |
| Sentiment Volatility | {stats.get('sentiment_std', 0):.3f} |
| Total Volume | {stats.get('total_volume', 0):,} |
| Average Volume | {stats.get('avg_volume', 0):,.1f} |
| Top Volume Keyword | {stats.get('top_volume_keyword', 'N/A')} |
| Most Positive Keyword | {stats.get('most_positive_keyword', 'N/A')} |
| Source | Data Points | Percentage |
|---|---|---|
| {source} | {count:,} | {count/stats.get('total_data_points', 1)*100:.1f}% |
Click the links below to view interactive charts and detailed visualizations:
📈 Trend Timeline 📊 Source Distribution 🎯 Market Opportunities
Enter keywords to analyze market trends and identify opportunities