import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import torch
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import io
import base64
from textblob import TextBlob
from collections import defaultdict, Counter
from tabulate import tabulate
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sentence_transformers import SentenceTransformer
from sklearn.decomposition import PCA


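# Fine-tuned sentiment classifier: the script assumes a 3-class checkpoint
# (negative / neutral / positive) saved locally under ./final_model.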
model_path = "./final_model"
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSequenceClassification.from_pretrained(model_path)


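# Optional abstractive summarizer (DistilBART distilled on CNN/DailyMail, CPU only).
# If the pipeline cannot be loaded, the app falls back to the plain template summary.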
try:
    summarizer = pipeline(
        "summarization",
        model="sshleifer/distilbart-cnn-6-6",
        device=-1
    )
except Exception as e:
    print(f"Error loading summarizer: {str(e)}")
    summarizer = None


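# dataset.csv is expected to contain at least the columns
# 'reviews.text', 'reviews.rating', 'name' and 'categories'.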
def load_dataset():
    try:
        df = pd.read_csv("dataset.csv")

        required_columns = ['reviews.text', 'reviews.rating', 'name', 'categories']
        if not all(col in df.columns for col in required_columns):
            raise ValueError("Missing required columns in dataset.csv")
        return df
    except Exception as e:
        print(f"Error loading dataset: {str(e)}")
        return None


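# Builds the "Dataset Overview" shown when the app starts: overall statistics,
# automatically derived product clusters, and per-category summary tables as HTML.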
def get_initial_summary():
    df = load_dataset()
    if df is None:
        return "Error: Could not load dataset.csv"

    try:
        if 'cluster_name' not in df.columns:
            df = create_clusters(df)

        summaries = generate_category_summaries(df)

        html_output = []

        unique_count = df['name'].nunique()
        total_count = len(df)
        avg_rating = df['reviews.rating'].mean()

        html_output.append(f"""
        <h2>Dataset Statistics</h2>
        <ul>
            <li>Total Reviews: {total_count}</li>
            <li>Unique Products: {unique_count}</li>
            <li>Average Rating: {avg_rating:.2f}⭐</li>
        </ul>
        """)

        for category, tables in summaries.items():
            html_output.append(f"<h2>CATEGORY: {category}</h2>")

            for table in tables:
                html_output.append(f"<h3>{table['section']}</h3>")

                table_html = tabulate(
                    table['data'],
                    headers=table['headers'],
                    tablefmt="html",
                    stralign="left",
                    numalign="center"
                )

                styled_table = f"""
                <style>
                    table {{
                        border-collapse: collapse;
                        margin: 15px 0;
                        width: 100%;
                        box-shadow: 0 1px 3px rgba(0,0,0,0.2);
                    }}
                    th, td {{
                        padding: 12px;
                        border: 1px solid #ddd;
                        text-align: left;
                    }}
                    th {{
                        background-color: #f5f5f5;
                        font-weight: bold;
                    }}
                    tr:nth-child(even) {{
                        background-color: #f9f9f9;
                    }}
                    tr:hover {{
                        background-color: #f5f5f5;
                    }}
                </style>
                {table_html}
                """
                html_output.append(styled_table)

            html_output.append("<hr>")

        return "\n".join(html_output)
    except Exception as e:
        import traceback
        print(traceback.format_exc())
        return f"Error generating initial summary: {str(e)}"


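# Single-review inference with the fine-tuned model. The index-to-label mapping
# below assumes the checkpoint was trained with 0=Negative, 1=Neutral, 2=Positive.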
def predict_sentiment(text):
    """Classify a single review and return (label, per-class probabilities)."""
    text = text.lower()

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        probabilities = torch.nn.functional.softmax(logits, dim=-1)
        predicted_class = torch.argmax(probabilities, dim=-1).item()

    sentiment_map = {0: "Negative", 1: "Neutral", 2: "Positive"}
    sentiment = sentiment_map[predicted_class]

    probs = probabilities[0].tolist()
    prob_dict = {sentiment_map[i]: f"{prob*100:.2f}%" for i, prob in enumerate(probs)}

    return sentiment, prob_dict


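# Lightweight pros/cons mining with TextBlob: nouns and adjectives from clearly
# positive sentences (polarity > 0.3) count as pros, from clearly negative ones
# (polarity < -0.3) as cons. TextBlob's sentence splitting and POS tagging rely on
# NLTK corpora (install them with: python -m textblob.download_corpora).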
def analyze_sentiment(reviews):
    """Perform sentiment analysis on reviews"""
    pros = defaultdict(int)
    cons = defaultdict(int)

    for review in reviews:
        blob = TextBlob(str(review))
        for sentence in blob.sentences:
            polarity = sentence.sentiment.polarity
            # Tag the current sentence (not the whole review) so words are only
            # attributed to the sentence that carries the sentiment.
            words = [word for word, tag in sentence.tags
                     if tag in ('NN', 'NNS', 'JJ', 'JJR', 'JJS')]

            if polarity > 0.3:
                for word in words:
                    pros[word] += 1
            elif polarity < -0.3:
                for word in words:
                    cons[word] += 1

    pros_sorted = [k for k, _ in sorted(pros.items(), key=lambda x: -x[1])] if pros else []
    cons_sorted = [k for k, _ in sorted(cons.items(), key=lambda x: -x[1])] if cons else []

    return pros_sorted, cons_sorted


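# Standalone text-box summary helper. It is not wired into the Gradio interface
# below (analyze_reviews uses generate_category_summaries instead), but it can be
# called directly with newline-separated reviews.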
def generate_category_summary(reviews_text):
    """Generate summary for a set of reviews"""
    reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()]

    if not reviews:
        return "Please enter at least one review."

    pros, cons = analyze_sentiment(reviews)

    summary_text = f"""
    Review Analysis Summary:

    PROS:
    {', '.join(pros[:5]) if pros else 'No significant positive feedback'}

    CONS:
    {', '.join(cons[:5]) if cons else 'No major complaints'}

    Based on {len(reviews)} reviews analyzed.
    """

    if summarizer and len(summary_text) > 100:
        try:
            generated_summary = summarizer(
                summary_text,
                max_length=150,
                min_length=50,
                do_sample=False,
                truncation=True
            )[0]['summary_text']
        except Exception as e:
            generated_summary = f"Error generating summary: {str(e)}"
    else:
        generated_summary = summary_text

    return generated_summary


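# Callback for the "Analyze Reviews" button: runs per-review sentiment, renders the
# distribution plot, and builds summary tables. New reviews carry no metadata, so a
# neutral rating of 3 and the placeholder name/cluster "New Review(s)" are used.
# Note that generate_category_summaries only emits tables for categories with enough
# reviews and products, so a handful of new reviews may yield an empty summary.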
def analyze_reviews(reviews_text):
    reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()]
    if not reviews:
        return pd.DataFrame(), None, "Please enter at least one review."

    df, plot_html = analyze_reviews_sentiment(reviews_text)

    # Column names must match what generate_category_summaries expects
    # ('reviews.text' / 'reviews.rating'), otherwise its groupby fails.
    temp_df = pd.DataFrame({
        'reviews.text': reviews,
        'reviews.rating': [3] * len(reviews),
        'name': ['New Review'] * len(reviews),
        'cluster_name': ['New Reviews'] * len(reviews)
    })

    summaries = generate_category_summaries(temp_df)

    html_output = []
    for category, tables in summaries.items():
        for table in tables:
            html_output.append(f"<h3>{table['section']}</h3>")
            table_html = tabulate(
                table['data'],
                headers=table['headers'],
                tablefmt="html",
                stralign="left",
                numalign="center"
            )
            html_output.append(table_html)

    summary_html = "\n".join(html_output)

    return df, plot_html, summary_html


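# Classifies each review with predict_sentiment and returns a results DataFrame plus
# a bar chart of the sentiment distribution, embedded as a base64-encoded PNG so it
# can be displayed through a gr.HTML component.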
def analyze_reviews_sentiment(reviews_text):
    reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()]

    if not reviews:
        # Return an empty DataFrame instead of a string so the Gradio outputs stay valid.
        return pd.DataFrame(), None

    results = []
    for review in reviews:
        sentiment, probs = predict_sentiment(review)
        results.append({
            'Review': review,
            'Sentiment': sentiment,
            'Confidence': probs
        })

    df = pd.DataFrame(results)

    plt.figure(figsize=(10, 6))
    sentiment_counts = df['Sentiment'].value_counts()
    plt.bar(sentiment_counts.index, sentiment_counts.values)
    plt.title('Sentiment Distribution')
    plt.xlabel('Sentiment')
    plt.ylabel('Count')

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    plot_base64 = base64.b64encode(buf.read()).decode('utf-8')
    plt.close()

    return df, f'<img src="data:image/png;base64,{plot_base64}" style="max-width:100%;">'


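# Gradio Blocks layout: the static dataset overview on top, followed by a textbox for
# new reviews whose results are rendered into a DataFrame, a plot and an HTML summary.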
def create_interface():
    initial_summary = get_initial_summary()

    with gr.Blocks() as demo:
        gr.Markdown("# Review Analysis System")

        with gr.Tab("Review Analysis"):
            gr.Markdown("## Dataset Overview")
            gr.HTML(initial_summary)

            gr.Markdown("## Analyze New Reviews")
            reviews_input = gr.Textbox(
                label="Enter reviews (one per line)",
                placeholder="Enter product reviews here...",
                lines=5
            )
            analyze_button = gr.Button("Analyze Reviews")

            with gr.Row():
                with gr.Column():
                    sentiment_output = gr.Dataframe(
                        label="Sentiment Analysis Results"
                    )
                    plot_output = gr.HTML(label="Sentiment Distribution")

                with gr.Column():
                    summary_output = gr.HTML(
                        label="Review Summary"
                    )

            analyze_button.click(
                analyze_reviews,
                inputs=[reviews_input],
                outputs=[sentiment_output, plot_output, summary_output]
            )

    return demo


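# Alternative clustering helper based on TF-IDF + KMeans with hard-coded category
# labels. It expects a 'text' column and is currently not called anywhere in this
# script (create_clusters below is used instead).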
def add_clusters_to_df(df):
    """Add cluster names to the DataFrame if they don't exist"""
    vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
    text_features = vectorizer.fit_transform(df['text'])

    n_clusters = 4
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    df['cluster_name'] = kmeans.fit_predict(text_features)

    cluster_names = {
        0: "Electronics",
        1: "Home & Kitchen",
        2: "Books & Media",
        3: "Other Products"
    }
    df['cluster_name'] = df['cluster_name'].map(cluster_names)

    return df


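# Builds the per-category tables: a category needs at least 10 reviews, a product at
# least 5 reviews, and a category at least 3 qualifying products to be reported.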
def generate_category_summaries(df):
    """Generate product summaries in table format"""
    summaries = {}

    for category in df['cluster_name'].unique():
        category_df = df[df['cluster_name'] == category]

        if len(category_df) < 10:
            continue

        product_stats = category_df.groupby('name').agg({
            'reviews.rating': ['mean', 'count'],
            'reviews.text': list
        }).reset_index()

        product_stats.columns = ['name', 'avg_rating', 'review_count', 'reviews']
        product_stats = product_stats[product_stats['review_count'] >= 5]

        if len(product_stats) < 3:
            continue

        top_3 = product_stats.nlargest(3, 'avg_rating')
        worst_product = product_stats.nsmallest(1, 'avg_rating')

        product_details = []
        for _, product in top_3.iterrows():
            pros, cons = analyze_sentiment(product['reviews'])
            product_details.append({
                'name': product['name'],
                'rating': product['avg_rating'],
                'review_count': product['review_count'],
                'pros': pros[:3] or ["No significant positive feedback"],
                'cons': cons[:3] or ["No major complaints"]
            })

        tables = []

        top_table = []
        for product in product_details:
            top_table.append([
                product['name'],
                f"★{product['rating']:.1f}",
                product['review_count'],
                "\n".join(product['pros']),
                "\n".join(product['cons'])
            ])

        tables.append({
            'section': f"TOP PRODUCTS IN {category.upper()}",
            'headers': ["Product", "Rating", "Reviews", "Pros", "Cons"],
            'data': top_table
        })

        if not worst_product.empty:
            worst = worst_product.iloc[0]
            pros, cons = analyze_sentiment(worst['reviews'])
            tables.append({
                'section': "PRODUCT TO AVOID",
                'headers': ["Product", "Rating", "Reasons to Avoid"],
                'data': [[
                    worst['name'],
                    f"★{worst['avg_rating']:.1f}",
                    ", ".join(cons[:3]) if cons else "Consistently poor ratings"
                ]]
            })

        summaries[category] = tables

    return summaries


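# Clusters products (not individual reviews) by embedding "name + categories" with a
# sentence-transformers model and running KMeans; each cluster is labelled with its
# most frequent product-name words, and the label is mapped back onto every review.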
def create_clusters(df):
    """Create clusters from product data"""
    # Work on a copy so the assignments below do not trigger SettingWithCopyWarning.
    products = df[['name', 'categories']].drop_duplicates().copy()
    product_texts = (products['name'] + " " + products['categories']).tolist()

    # Use a separate name for the embedding model to avoid shadowing the global
    # sentiment classifier `model`.
    embedder = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = embedder.encode(product_texts, show_progress_bar=True)

    num_clusters = 4
    kmeans = KMeans(n_clusters=num_clusters, random_state=42)
    clusters = kmeans.fit_predict(embeddings)
    products['cluster'] = clusters

    cluster_names = {}
    for cluster_num in range(num_clusters):
        cluster_df = products[products['cluster'] == cluster_num]

        words = []
        for name in cluster_df['name']:
            words += name.lower().split()

        top_words = [word for word, count in Counter(words).most_common(10)
                     if len(word) > 3][:3]
        label = ' '.join(top_words)
        cluster_names[cluster_num] = label

    product_to_cluster = dict(zip(products['name'], products['cluster']))
    df['cluster'] = df['name'].map(product_to_cluster)
    df['cluster_name'] = df['cluster'].map(cluster_names)

    return df


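# Entry point: build the interface and start the local Gradio server
# (pass share=True to demo.launch() if a temporary public link is needed).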
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()