| |
| import os |
| import re |
| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import shap |
| import lime.lime_tabular |
| import optuna |
| import wandb |
| import json |
| import time |
| import psutil |
| import shutil |
| import ast |
| from smolagents import HfApiModel, CodeAgent |
| from huggingface_hub import login |
| from sklearn.model_selection import train_test_split, cross_val_score |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score |
| from sklearn.metrics import ConfusionMatrixDisplay |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.preprocessing import LabelEncoder |
| from datetime import datetime |
| from PIL import Image |
|
|
|
|
| |
| hf_token = os.getenv("HF_TOKEN") |
| login(token=hf_token) |
|
|
|
|
| |
| model = HfApiModel("mistralai/Mixtral-8x7B-Instruct-v0.1", token=hf_token) |
|
|
|
|
| |
| df_global = None |
| target_column_global = None |
|
|
|
|
| |
| def upload_file(file): |
| global df_global, data_summary_global |
| if file is None: |
| return pd.DataFrame({"Error": ["No file uploaded."]}), gr.update(choices=[]) |
| |
| ext = os.path.splitext(file.name)[-1] |
| df = pd.read_csv(file.name) if ext == ".csv" else pd.read_excel(file.name) |
| df = clean_data(df) |
| df_global = df |
| return df.head(), gr.update(choices=df.columns.tolist()) |
|
|
| def set_target_column(col_name): |
| global target_column_global |
| target_column_global = col_name |
| return f"✅ Target column set to: {col_name}" |
|
|
| def clean_data(df): |
| from sklearn.preprocessing import LabelEncoder |
| import numpy as np |
|
|
| |
| df = df.dropna(how='all', axis=1).dropna(how='all', axis=0) |
|
|
| |
| for col in df.columns: |
| if df[col].dtype == 'object': |
| |
| try: |
| cleaned = df[col].str.replace(r'[$,]', '', regex=True).str.strip() |
| df[col] = pd.to_numeric(cleaned, errors='ignore') |
| except Exception: |
| pass |
|
|
| |
| for col in df.select_dtypes(include='object').columns: |
| try: |
| df[col] = df[col].astype(str) |
| df[col] = LabelEncoder().fit_transform(df[col]) |
| except Exception: |
| pass |
|
|
| |
| df = df.fillna(df.mean(numeric_only=True)) |
|
|
| return df |
|
|
|
|
|
|
|
|
|
|
|
|
| |
|
|
| import json |
| import re |
| import ast |
|
|
| def extract_json_from_codeagent_output(raw_output): |
| try: |
| |
| if isinstance(raw_output, dict): |
| |
| if "output" in raw_output and isinstance(raw_output["output"], str): |
| try: |
| return json.loads(raw_output["output"]) |
| except json.JSONDecodeError: |
| pass |
| return raw_output |
|
|
| |
| if isinstance(raw_output, str): |
| try: |
| return json.loads(raw_output) |
| except json.JSONDecodeError: |
| pass |
|
|
| |
| code_blocks = re.findall(r"```(?:json|py|python)?\n([\s\S]*?)```", raw_output, re.DOTALL) |
|
|
| for block in code_blocks: |
| for pattern in [ |
| r"print\(\s*json\.dumps\(\s*(\{[\s\S]*?\})\s*\)\s*\)", |
| r"json\.dumps\(\s*(\{[\s\S]*?\})\s*\)", |
| r"result\s*=\s*(\{[\s\S]*?\})", |
| r"final_answer\s*\(\s*(\{[\s\S]*?\})\s*\)", |
| r"^(\{[\s\S]*\})$" |
| ]: |
| match = re.search(pattern, block, re.DOTALL) |
| if match: |
| try: |
| return json.loads(match.group(1)) |
| except json.JSONDecodeError: |
| return ast.literal_eval(match.group(1)) |
|
|
| |
| fallback = re.search(r"\{[\s\S]+?\}", raw_output) |
| if fallback: |
| try: |
| return json.loads(fallback.group(0)) |
| except json.JSONDecodeError: |
| return ast.literal_eval(fallback.group(0)) |
|
|
| except Exception as e: |
| print(f"[extract_json] Error: {e}") |
|
|
| |
| return {"error": "Failed to extract structured JSON"} |
|
|
|
|
|
|
|
|
| import pandas as pd |
| import tempfile |
|
|
| def analyze_data(csv_file, additional_notes=""): |
| start_time = time.time() |
| process = psutil.Process(os.getpid()) |
| initial_memory = process.memory_info().rss / 1024 ** 2 |
|
|
| |
| try: |
| df = pd.read_csv(csv_file) |
| df = clean_data(df) |
| except Exception as e: |
| return f"<p style='color:red'><b>Error loading or cleaning CSV:</b> {e}</p>", [] |
|
|
| |
| cleaned_csv_path = "./cleaned_data.csv" |
| df.to_csv(cleaned_csv_path, index=False) |
|
|
| |
| if os.path.exists('./figures'): |
| shutil.rmtree('./figures') |
| os.makedirs('./figures', exist_ok=True) |
|
|
| |
| wandb.login(key=os.environ.get('WANDB_API_KEY')) |
| run = wandb.init(project="huggingface-data-analysis", config={ |
| "model": "mistralai/Mixtral-8x7B-Instruct-v0.1", |
| "additional_notes": additional_notes, |
| "source_file": cleaned_csv_path |
| }) |
|
|
| |
| agent = CodeAgent( |
| tools=[], |
| model=model, |
| additional_authorized_imports=[ |
| "numpy", "pandas", "matplotlib.pyplot", "seaborn", "sklearn", "json" |
| ] |
| ) |
|
|
| |
| raw_output = agent.run(""" |
| You are a data analysis agent. Output in JSON only. Follow these instructions EXACTLY: |
| 1. Load the data from the given `source_file` ONLY. DO NOT create your OWN DATA. |
| 2. Analyze the data structure and generate up to 3 visualizations and 3 insights. |
| 3. Save all figures to `./figures` as PNG using matplotlib or seaborn. |
| 4. Use only authorized imports: `pandas`, `numpy`, `matplotlib.pyplot`, `seaborn`, `json`. |
| 5. DO NOT return any explanations, thoughts, or narration outside the final JSON block |
| 6. Run only 5 iteration and return output quickly. |
| 7. DO NOT include any natural language (e.g., "Thoughts:", "Code:", explanations). |
| 8. ONLY output a single, valid JSON block. No markdown or extra text. |
| 9. Output ONLY the following JSON code block format, exactly: |
| { |
| 'observations': { |
| 'observation_1_key': 'observation_1_value', |
| ... |
| }, |
| 'insights': { |
| 'insight_1_key': 'insight_1_value', |
| ... |
| } |
| } |
| """, additional_args={"additional_notes": additional_notes, "source_file": cleaned_csv_path}) |
|
|
|
|
| if isinstance(raw_output, dict) and "output" in raw_output: |
| print(f"Raw output: {raw_output['output'][:1000]}") |
| else: |
| print(f"Raw output: {str(raw_output)[:1000]}") |
|
|
|
|
| |
| parsed_result = extract_json_from_codeagent_output(raw_output) or { |
| "error": "Failed to extract structured JSON" |
| } |
|
|
| |
| execution_time = time.time() - start_time |
| final_memory = process.memory_info().rss / 1024 ** 2 |
| memory_usage = final_memory - initial_memory |
|
|
| wandb.log({ |
| "execution_time_sec": round(execution_time, 2), |
| "memory_usage_mb": round(memory_usage, 2) |
| }) |
|
|
| |
| visuals = [os.path.join('./figures', f) for f in os.listdir('./figures') if f.lower().endswith(('.png', '.jpg', '.jpeg'))] |
| for viz in visuals: |
| wandb.log({os.path.basename(viz): wandb.Image(viz)}) |
|
|
| run.finish() |
|
|
| |
| summary_html = "<h3>📊 Data Analysis Summary</h3>" |
| if "observations" in parsed_result: |
| summary_html += "<h4>🔍 Observations</h4><ul>" + "".join( |
| f"<li><b>{k}:</b> {v}</li>" for k, v in parsed_result["observations"].items() |
| ) + "</ul>" |
| if "insights" in parsed_result: |
| summary_html += "<h4>💡 Insights</h4><ul>" + "".join( |
| f"<li><b>{k}:</b> {v}</li>" for k, v in parsed_result["insights"].items() |
| ) + "</ul>" |
| if "error" in parsed_result: |
| summary_html += f"<p style='color:red'><b>Error:</b> {parsed_result['error']}</p>" |
|
|
| return summary_html, visuals |
|
|
|
|
|
|
|
|
| def format_analysis_report(raw_output, visuals): |
| import json |
|
|
| try: |
| if isinstance(raw_output, dict): |
| analysis_dict = raw_output |
| else: |
| try: |
| analysis_dict = json.loads(str(raw_output)) |
| except (json.JSONDecodeError, TypeError) as e: |
| print(f"Error parsing CodeAgent output: {e}") |
| return f"<pre>{str(raw_output)}</pre>", visuals |
|
|
| report = f""" |
| <div style="font-family: Arial, sans-serif; padding: 20px; color: #333;"> |
| <h1 style="color: #2B547E; border-bottom: 2px solid #2B547E; padding-bottom: 10px;">📊 Data Analysis Report</h1> |
| <div style="margin-top: 25px; background: #f8f9fa; padding: 20px; border-radius: 8px;"> |
| <h2 style="color: #2B547E;">🔍 Key Observations</h2> |
| {format_observations(analysis_dict.get('observations', {}))} |
| </div> |
| <div style="margin-top: 30px;"> |
| <h2 style="color: #2B547E;">💡 Insights & Visualizations</h2> |
| {format_insights(analysis_dict.get('insights', {}), visuals)} |
| </div> |
| </div> |
| """ |
| return report, visuals |
|
|
| except Exception as e: |
| print(f"Error in format_analysis_report: {e}") |
| return f"<pre>{str(raw_output)}</pre>", visuals |
|
|
|
|
| def format_observations(observations): |
| return '\n'.join([ |
| f""" |
| <div style="margin: 15px 0; padding: 15px; background: white; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);"> |
| <h3 style="margin: 0 0 10px 0; color: #4A708B;">{key.replace('_', ' ').title()}</h3> |
| <pre style=" |
| margin: 0; |
| padding: 10px; |
| background: #eef2f7; |
| border-radius: 4px; |
| color: #1f2d3d; |
| font-size: 14px; |
| font-family: 'Courier New', Courier, monospace; |
| white-space: pre-wrap; |
| opacity: 1; |
| ">{value}</pre> |
| </div> |
| """ for key, value in observations.items() |
| ]) |
|
|
|
|
| def format_insights(insights, visuals): |
| if isinstance(insights, dict): |
| |
| insight_items = list(insights.items()) |
| elif isinstance(insights, list): |
| |
| insight_items = [(item.get("category", f"Insight {idx+1}"), item["insight"]) for idx, item in enumerate(insights)] |
| else: |
| return "<p>No insights available or incorrect format.</p>" |
|
|
| return '\n'.join([ |
| f""" |
| <div style="margin: 20px 0; padding: 20px; background: white; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);"> |
| <div style="display: flex; align-items: center; gap: 10px;"> |
| <div style="background: #2B547E; color: white; width: 30px; height: 30px; border-radius: 50%; display: flex; align-items: center; justify-content: center;">{idx+1}</div> |
| <div> |
| <h4 style="margin: 0; color: #2B547E;">{title}</h4> |
| <p style="margin: 5px 0 0 0; font-size: 16px; color: #333; font-weight: 500;">{insight}</p> |
| </div> |
| </div> |
| {f'<img src="file/{os.path.basename(visuals[idx])}" style="max-width: 100%; height: auto; margin-top: 10px; border-radius: 6px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">' if idx < len(visuals) else ''} |
| </div> |
| """ for idx, (title, insight) in enumerate(insight_items) |
| ]) |
|
|
|
|
| from sklearn.model_selection import StratifiedKFold, GridSearchCV |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.preprocessing import StandardScaler |
| from sklearn.metrics import f1_score, precision_score, recall_score |
| import optuna |
|
|
| def compare_models(): |
| import seaborn as sns |
| from sklearn.model_selection import cross_val_predict, cross_val_score |
|
|
| if df_global is None: |
| return pd.DataFrame({"Error": ["Please upload and preprocess a dataset first."]}), None |
|
|
| global target_column_global |
| target = target_column_global |
| X = df_global.drop(target, axis=1) |
| y = df_global[target] |
|
|
| |
| if y.dtype == 'object': |
| y = LabelEncoder().fit_transform(y) |
|
|
| |
| scaler = StandardScaler() |
| X_scaled = scaler.fit_transform(X) |
|
|
| |
| models = { |
| "RandomForest": RandomForestClassifier(), |
| "LogisticRegression": LogisticRegression(max_iter=1000), |
| "GradientBoosting": GradientBoostingClassifier(), |
| |
| } |
|
|
| |
| ensemble_model = VotingClassifier(estimators=[('rf', RandomForestClassifier()), |
| ('lr', LogisticRegression(max_iter=1000)), |
| ('gb', GradientBoostingClassifier())], voting='hard') |
|
|
| |
| models["Voting Classifier"] = ensemble_model |
|
|
| results = [] |
| for name, model in models.items(): |
| |
| scores = cross_val_score(model, X_scaled, y, cv=5) |
| |
| |
| y_pred = cross_val_predict(model, X_scaled, y, cv=5) |
|
|
| metrics = { |
| "Model": name, |
| "CV Mean Accuracy": np.mean(scores), |
| "CV Std Dev": np.std(scores), |
| "F1 Score": f1_score(y, y_pred, average="weighted", zero_division=0), |
| "Precision": precision_score(y, y_pred, average="weighted", zero_division=0), |
| "Recall": recall_score(y, y_pred, average="weighted", zero_division=0), |
| } |
| |
| if wandb.run is None: |
| wandb.init(project="model_comparison", name="compare_models", reinit=True) |
| wandb.log({f"{name}_{k.replace(' ', '_').lower()}": v for k, v in metrics.items() if isinstance(v, (float, int))}) |
| results.append(metrics) |
|
|
| results_df = pd.DataFrame(results) |
|
|
| |
| plt.figure(figsize=(8, 5)) |
| sns.barplot(data=results_df, x="Model", y="CV Mean Accuracy", palette="Blues_d") |
| plt.title("Model Comparison (CV Mean Accuracy)") |
| plt.ylim(0, 1) |
| plt.tight_layout() |
|
|
| plot_path = "./model_comparison.png" |
| plt.savefig(plot_path) |
| plt.close() |
|
|
| return results_df, plot_path |
|
|
|
|
|
|
|
|
|
|
| |
| |
| def prepare_data(df): |
| global target_column_global |
| from sklearn.model_selection import train_test_split |
|
|
| |
| if target_column_global is None: |
| raise ValueError("Target column not set.") |
| |
| X = df.drop(columns=[target_column_global]) |
| y = df[target_column_global] |
| |
| return train_test_split(X, y, test_size=0.3, random_state=42) |
|
|
| def train_model(_): |
| try: |
| wandb.login(key=os.environ.get("WANDB_API_KEY")) |
| wandb_run = wandb.init( |
| project="huggingface-data-analysis", |
| name=f"Optuna_Run_{datetime.now().strftime('%Y%m%d_%H%M%S')}", |
| reinit=True |
| ) |
|
|
| X_train, X_test, y_train, y_test = prepare_data(df_global) |
|
|
| def objective(trial): |
| params = { |
| "n_estimators": trial.suggest_int("n_estimators", 50, 200), |
| "max_depth": trial.suggest_int("max_depth", 3, 10), |
| } |
| model = RandomForestClassifier(**params) |
| score = cross_val_score(model, X_train, y_train, cv=3).mean() |
| wandb.log({**params, "cv_score": score}) |
| return score |
|
|
| study = optuna.create_study(direction="maximize") |
| study.optimize(objective, n_trials=15) |
|
|
| best_params = study.best_params |
| model = RandomForestClassifier(**best_params) |
| model.fit(X_train, y_train) |
| y_pred = model.predict(X_test) |
|
|
| metrics = { |
| "accuracy": accuracy_score(y_test, y_pred), |
| "precision": precision_score(y_test, y_pred, average="weighted", zero_division=0), |
| "recall": recall_score(y_test, y_pred, average="weighted", zero_division=0), |
| "f1_score": f1_score(y_test, y_pred, average="weighted", zero_division=0), |
| } |
|
|
| wandb.log(metrics) |
| wandb_run.finish() |
|
|
| |
| top_trials = sorted(study.trials, key=lambda x: x.value, reverse=True)[:7] |
| trial_rows = [dict(**t.params, score=t.value) for t in top_trials] |
| trials_df = pd.DataFrame(trial_rows) |
|
|
| return metrics, trials_df |
|
|
| except Exception as e: |
| print(f"Training Error: {e}") |
| return {}, pd.DataFrame() |
|
|
|
|
|
|
| def explainability(_): |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| global target_column_global |
| target = target_column_global |
| X = df_global.drop(target, axis=1) |
| y = df_global[target] |
|
|
| if y.dtype == "object": |
| y = LabelEncoder().fit_transform(y) |
|
|
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) |
|
|
| model = RandomForestClassifier() |
| model.fit(X_train, y_train) |
|
|
| explainer = shap.TreeExplainer(model) |
| shap_values = explainer.shap_values(X_test) |
|
|
| try: |
| if isinstance(shap_values, list): |
| class_idx = 0 |
| sv = shap_values[class_idx] |
| else: |
| sv = shap_values |
|
|
| |
| if len(sv.shape) > 2: |
| sv = sv.reshape(sv.shape[0], -1) |
|
|
| |
| num_features = sv.shape[1] |
| if num_features <= X_test.shape[1]: |
| feature_names = X_test.columns[:num_features] |
| else: |
| feature_names = [f"Feature_{i}" for i in range(num_features)] |
|
|
| X_shap_safe = pd.DataFrame(np.zeros_like(sv), columns=feature_names) |
|
|
| shap.summary_plot(sv, X_shap_safe, show=False) |
| shap_path = "./shap_plot.png" |
| plt.title("SHAP Summary") |
| plt.savefig(shap_path) |
| if wandb.run: |
| wandb.log({"shap_summary": wandb.Image(shap_path)}) |
| plt.clf() |
|
|
| except Exception as e: |
| shap_path = "./shap_error.png" |
| print("SHAP plotting failed:", e) |
| plt.figure(figsize=(6, 3)) |
| plt.text(0.5, 0.5, f"SHAP Error:\n{str(e)}", ha='center', va='center') |
| plt.axis('off') |
| plt.savefig(shap_path) |
| if wandb.run: |
| wandb.log({"shap_error": wandb.Image(shap_path)}) |
| plt.clf() |
|
|
| |
| lime_explainer = lime.lime_tabular.LimeTabularExplainer( |
| X_train.values, |
| feature_names=X_train.columns.tolist(), |
| class_names=[str(c) for c in np.unique(y_train)], |
| mode='classification' |
| ) |
| lime_exp = lime_explainer.explain_instance(X_test.iloc[0].values, model.predict_proba) |
| lime_fig = lime_exp.as_pyplot_figure() |
| lime_path = "./lime_plot.png" |
| lime_fig.savefig(lime_path) |
| if wandb.run: |
| wandb.log({"lime_explanation": wandb.Image(lime_path)}) |
| plt.clf() |
|
|
| return shap_path, lime_path |
|
|
| |
|
|
| def update_target_choices(): |
| global df_global |
| if df_global is not None: |
| return gr.update(choices=df_global.columns.tolist()) |
| else: |
| return gr.update(choices=[]) |
|
|
|
|
| with gr.Blocks() as demo: |
| gr.Markdown("## 📊 AI-Powered Data Analysis with Hyperparameter Optimization") |
|
|
| with gr.Row(): |
| with gr.Column(): |
| file_input = gr.File(label="Upload CSV or Excel", type="filepath") |
| df_output = gr.DataFrame(label="Cleaned Data Preview") |
| target_dropdown = gr.Dropdown(label="Select Target Column", choices=[], interactive=True) |
| target_status = gr.Textbox(label="Target Column Status", interactive=False) |
|
|
| file_input.change(fn=upload_file, inputs=file_input, outputs=[df_output, target_dropdown]) |
| |
| target_dropdown.change(fn=set_target_column, inputs=target_dropdown, outputs=target_status) |
|
|
| with gr.Column(): |
| insights_output = gr.HTML(label="Insights from SmolAgent") |
| visual_output = gr.Gallery(label="Visualizations (Auto-generated by Agent)", columns=2) |
| agent_btn = gr.Button("Run AI Agent (5 Insights + 5 Visualizations)") |
|
|
| with gr.Row(): |
| train_btn = gr.Button("Train Model with Optuna + WandB") |
| metrics_output = gr.JSON(label="Performance Metrics") |
| trials_output = gr.DataFrame(label="Top 7 Hyperparameter Trials") |
|
|
| with gr.Row(): |
| explain_btn = gr.Button("SHAP + LIME Explainability") |
| shap_img = gr.Image(label="SHAP Summary Plot") |
| lime_img = gr.Image(label="LIME Explanation") |
|
|
| with gr.Row(): |
| compare_btn = gr.Button("Compare Models (A/B Testing)") |
| compare_output = gr.DataFrame(label="Model Comparison (CV + Metrics)") |
| compare_img = gr.Image(label="Model Accuracy Plot") |
|
|
| agent_btn.click(fn=analyze_data, inputs=[file_input], outputs=[insights_output, visual_output]) |
| train_btn.click(fn=train_model, inputs=[file_input], outputs=[metrics_output, trials_output]) |
| explain_btn.click(fn=explainability, inputs=[], outputs=[shap_img, lime_img]) |
| compare_btn.click(fn=compare_models, inputs=[], outputs=[compare_output, compare_img]) |
|
|
| demo.launch(debug=True) |
|
|