| """ |
| ADDING NEW ANALYSIS TASKS: |
| ========================== |
| |
| 1. Add task configuration in TaskRegistry.get_config(): |
| "Your Data Type": { |
| "Your Task Name": TaskConfig( |
| name="Your Task Name", |
| data_type="Your Data Type", |
| requires_params=True, # Set to True if needs parameters |
| param_components=[...], # Define parameter types |
| output_tabs=["original", "summary", ...] # Which tabs to show |
| ) |
| } |
| |
| 2. Add execution method in AnalysisExecutor: |
| @staticmethod |
| def execute_your_task(df, param1, param2): |
| # Your analysis logic |
| return "✓ Done", { |
| "original": df, |
| "summary": summary_data, |
| ... |
| } |
| |
| 3. In create_interface(), add parameter group in the LEFT panel: |
| with gr.Group(visible=False) as your_task_param_group: |
| gr.Markdown("**Your Task Parameters**") |
| your_param1 = gr.Slider(minimum=0, maximum=1, label="Threshold") |
| your_param2 = gr.Dropdown(choices=["A", "B"], label="Option") |
| |
| 4. In UIManager.on_tasks_change() |
| """ |
|
|
| import pandas as pd |
| import matplotlib.pyplot as plt |
| import numpy as np |
| from typing import Optional, Tuple, List, Dict, Any |
| import io |
| import base64 |
| from tqdm.auto import tqdm |
| from dataclasses import dataclass |
| import gradio as gr |
| |
|
|
|
|
| from pipeline.deduplication import find_near_duplicates |
| from pipeline.featurizer import custom_featurizer |
| from pipeline.issues import find_issues |
| from pipeline.pipeline import make_step, run_pipeline |
|
|
|
|
|
|
| from pipeline import make_step, run_pipeline |
|
|
| |
| |
| |
|
|
| @dataclass |
| class TaskConfig: |
| """Configuration for each analysis task""" |
| name: str |
| data_type: str |
| requires_params: bool |
| param_components: List[Dict[str, Any]] |
| output_tabs: List[str] |
| |
| class TaskRegistry: |
| """Registry mapping tasks to their configurations""" |
| |
| @staticmethod |
| def get_config(data_type: str, task_name: str) -> TaskConfig: |
| """Get configuration for a specific task""" |
| configs = { |
| "EHR Data": { |
| "Near-Duplicate Detection": TaskConfig( |
| name="Near-Duplicate Detection", |
| data_type="EHR Data", |
| requires_params=True, |
| param_components=[ |
| {"type": "dropdown", "label": "Label Column", "elem_id": "ndd_label"} |
| ], |
| output_tabs=["original", "processed", "summary"] |
| ), |
| "Find Mislabeled Data": TaskConfig( |
| name="Find Mislabeled Data", |
| data_type="EHR Data", |
| requires_params=True, |
| param_components=[ |
| {"type": "dropdown", "label": "Label Column", "elem_id": "mislabel_label"} |
| ], |
| output_tabs=["original", "summary"] |
| ) |
| }, |
| "ECG Data": { |
| "ECG Visualization": TaskConfig( |
| name="ECG Visualization", |
| data_type="ECG Data", |
| requires_params=False, |
| param_components=[], |
| output_tabs=["visualization", "summary"] |
| ), |
| "Statistical Summary": TaskConfig( |
| name="Statistical Summary", |
| data_type="ECG Data", |
| requires_params=False, |
| param_components=[], |
| output_tabs=["summary"] |
| ) |
| } |
| } |
| return configs.get(data_type, {}).get(task_name) |
| |
| @staticmethod |
| def get_tasks_for_data_type(data_type: str) -> List[str]: |
| """Get available tasks for a data type""" |
| tasks = { |
| "EHR Data": ["Near-Duplicate Detection", "Find Mislabeled Data"], |
| "ECG Data": ["ECG Visualization", "Statistical Summary"] |
| } |
| return tasks.get(data_type, []) |
|
|
|
|
| |
| |
| |
|
|
| class AnalysisExecutor: |
| """Executes analysis tasks and returns results""" |
| |
| @staticmethod |
| def execute_near_duplicate_detection(df: pd.DataFrame, label: str) -> Tuple[str, Dict[str, Any]]: |
| """Execute near-duplicate detection pipeline""" |
| try: |
| if not label: |
| return "⚠ Label column required", { |
| "original": df, |
| "processed": None, |
| "summary": None |
| } |
| |
| bar = tqdm(total=100, leave=False, desc="Pipeline Progress") |
| steps = [ |
| make_step(find_near_duplicates, name="dedup")(progress=bar), |
| make_step(custom_featurizer, name="featurize")( |
| label=label, nan_strategy="impute", on_pipeline_error="drop", progress=bar |
| ), |
| make_step(find_issues, name="find_label_issues")(label=label, progress=bar), |
| ] |
| results_df, summary_list = run_pipeline(steps, df=df) |
| bar.close() |
| |
| return "✓ Near-duplicate detection completed", { |
| "original": df, |
| "processed": results_df, |
| "summary": summary_list |
| } |
| except Exception as e: |
| return f"✗ Error: {str(e)}", { |
| "original": df, |
| "processed": None, |
| "summary": None |
| } |
| |
| @staticmethod |
| def execute_find_mislabeled(df: pd.DataFrame, label: str) -> Tuple[str, Dict[str, Any]]: |
| """Execute mislabeled data detection""" |
| try: |
| if not label: |
| return "⚠ Label column required", { |
| "original": df, |
| "summary": None |
| } |
| |
| |
| summary = { |
| "task": "Find Mislabeled Data", |
| "label_column": label, |
| "total_samples": len(df), |
| "suspicious_samples": 0, |
| "message": "Mislabeled detection analysis completed" |
| } |
| |
| return "✓ Mislabeled data analysis completed", { |
| "original": df, |
| "summary": summary |
| } |
| except Exception as e: |
| return f"✗ Error: {str(e)}", { |
| "original": df, |
| "summary": None |
| } |
| |
| @staticmethod |
| def execute_ecg_visualization(df: pd.DataFrame) -> Tuple[str, Dict[str, Any]]: |
| """Execute ECG visualization""" |
| try: |
| fig, ax = plt.subplots(figsize=(12, 6)) |
| if len(df.columns) > 1: |
| for col in df.columns[1:]: |
| ax.plot(df.iloc[:, 0], df[col], label=str(col), linewidth=0.8) |
| ax.set_xlabel('Time (ms)') |
| ax.set_ylabel('Amplitude (mV)') |
| ax.set_title('ECG Signal Visualization') |
| ax.legend() |
| else: |
| ax.plot(df.iloc[:, 0], linewidth=0.8) |
| ax.set_xlabel('Sample Index') |
| ax.set_ylabel('Amplitude') |
| ax.set_title('ECG Signal') |
| ax.grid(True, alpha=0.3) |
| plt.tight_layout() |
| |
| |
| buf = io.BytesIO() |
| fig.savefig(buf, format='png', dpi=100, bbox_inches='tight') |
| buf.seek(0) |
| img_base64 = base64.b64encode(buf.read()).decode('utf-8') |
| plt.close(fig) |
| |
| viz_html = f'<div style="text-align:center;"><img src="data:image/png;base64,{img_base64}" style="max-width:100%;"/></div>' |
| |
| summary = { |
| "task": "ECG Visualization", |
| "samples": len(df), |
| "channels": len(df.columns) - 1 if len(df.columns) > 1 else 1 |
| } |
| |
| return "✓ ECG visualization created", { |
| "visualization": viz_html, |
| "summary": summary |
| } |
| except Exception as e: |
| return f"✗ Error: {str(e)}", { |
| "visualization": None, |
| "summary": None |
| } |
| |
| @staticmethod |
| def execute_statistical_summary(df: pd.DataFrame) -> Tuple[str, Dict[str, Any]]: |
| """Execute statistical summary""" |
| try: |
| stats = df.describe().to_html(classes='preview-table') |
| summary_html = f"<h3>Statistical Summary</h3><div style='overflow-x:auto;'>{stats}</div>" |
| |
| summary = { |
| "task": "Statistical Summary", |
| "rows": len(df), |
| "columns": len(df.columns), |
| "numeric_columns": len(df.select_dtypes(include=[np.number]).columns), |
| "html": summary_html |
| } |
| |
| return "✓ Statistical summary generated", { |
| "summary": summary, |
| "visualization": summary_html |
| } |
| except Exception as e: |
| return f"✗ Error: {str(e)}", { |
| "summary": None, |
| "visualization": None |
| } |
|
|
|
|
| |
| |
| |
|
|
| class UIManager: |
| """Manages UI state and dynamic updates""" |
| |
| def __init__(self): |
| self.current_df = None |
| self.current_data_type = "EHR Data" |
| self.chatbot_context = {} |
| |
| def load_csv(self, file) -> Tuple[str, Optional[pd.DataFrame]]: |
| """Load CSV file""" |
| if file is None: |
| return "⚠ No file uploaded", None |
| try: |
| df = pd.read_csv(file.name) |
| self.current_df = df |
| return f"✓ Loaded {len(df)} rows, {len(df.columns)} columns", df |
| except Exception as e: |
| return f"✗ Error: {str(e)}", None |
| |
| def on_file_upload(self, file, data_type: str): |
| """Handle file upload - returns updates for all components""" |
| status, df = self.load_csv(file) |
| |
| if df is None: |
| return ( |
| status, |
| gr.update(value=None), |
| gr.update(choices=[], value=[], interactive=True), |
| gr.update(visible=False), |
| gr.update(visible=False), |
| gr.update(choices=[]), |
| gr.update(choices=[]), |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), |
| ) |
| |
| self.chatbot_context = {"file": file.name, "type": data_type, "df": df} |
| available_tasks = TaskRegistry.get_tasks_for_data_type(data_type) |
| col_choices = list(df.columns) |
| |
| return ( |
| status, |
| gr.update(value=df.head(200)), |
| gr.update(choices=available_tasks, value=[], interactive=True), |
| gr.update(visible=False), |
| gr.update(visible=False), |
| gr.update(choices=col_choices, value=None), |
| gr.update(choices=col_choices, value=None), |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), |
| ) |
| |
| def on_data_type_change(self, data_type: str, file): |
| """Handle data type change""" |
| self.current_data_type = data_type |
| available_tasks = TaskRegistry.get_tasks_for_data_type(data_type) |
| |
| if file and self.current_df is not None: |
| self.chatbot_context["type"] = data_type |
| |
| return ( |
| gr.update(choices=available_tasks, value=[]), |
| gr.update(visible=False), |
| gr.update(visible=False), |
| f"Data type changed to: {data_type}" |
| ) |
| |
| def on_tasks_change(self, selected_tasks: List[str], data_type: str, df_columns: List[str]): |
| """Handle task selection change - show/hide parameter groups for all selected tasks""" |
| if not selected_tasks: |
| |
| return ( |
| gr.update(visible=False), |
| gr.update(visible=False), |
| ) |
| |
| |
| show_ndd_params = "Near-Duplicate Detection" in selected_tasks |
| show_mislabel_params = "Find Mislabeled Data" in selected_tasks |
| |
| return ( |
| gr.update(visible=show_ndd_params), |
| gr.update(visible=show_mislabel_params), |
| ) |
| |
| def process_analysis(self, file, data_type: str, selected_tasks: List[str], ndd_label: str, mislabel_label: str): |
| """Process selected analysis tasks - handles multiple tasks""" |
| status, df = self.load_csv(file) |
| |
| if df is None: |
| return ( |
| status, |
| None, None, None, None, |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) |
| ) |
| |
| if not selected_tasks: |
| return ( |
| "⚠ No tasks selected", |
| df.head(200), None, None, None, |
| gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) |
| ) |
| |
| |
| all_tabs = set() |
| all_results = { |
| "original": df.head(200), |
| "processed": None, |
| "summary": [], |
| "visualization": "" |
| } |
| |
| status_messages = [] |
| executor = AnalysisExecutor() |
| |
| |
| for task_name in selected_tasks: |
| config = TaskRegistry.get_config(data_type, task_name) |
| |
| if not config: |
| status_messages.append(f"✗ Unknown task: {task_name}") |
| continue |
| |
| |
| all_tabs.update(config.output_tabs) |
| |
| |
| if task_name == "Near-Duplicate Detection": |
| status_msg, results = executor.execute_near_duplicate_detection(df, ndd_label) |
| status_messages.append(f"{task_name}: {status_msg}") |
| if results.get("processed") is not None: |
| all_results["processed"] = results["processed"] |
| if results.get("summary") is not None: |
| all_results["summary"].append({"task": task_name, "data": results["summary"]}) |
| |
| elif task_name == "Find Mislabeled Data": |
| status_msg, results = executor.execute_find_mislabeled(df, mislabel_label) |
| status_messages.append(f"{task_name}: {status_msg}") |
| if results.get("summary") is not None: |
| all_results["summary"].append({"task": task_name, "data": results["summary"]}) |
| |
| elif task_name == "ECG Visualization": |
| status_msg, results = executor.execute_ecg_visualization(df) |
| status_messages.append(f"{task_name}: {status_msg}") |
| if results.get("visualization"): |
| all_results["visualization"] += results["visualization"] |
| if results.get("summary") is not None: |
| all_results["summary"].append({"task": task_name, "data": results["summary"]}) |
| |
| elif task_name == "Statistical Summary": |
| status_msg, results = executor.execute_statistical_summary(df) |
| status_messages.append(f"{task_name}: {status_msg}") |
| if results.get("visualization"): |
| all_results["visualization"] += results["visualization"] |
| if results.get("summary") is not None: |
| all_results["summary"].append({"task": task_name, "data": results["summary"]}) |
| |
| |
| show_original = gr.update(visible="original" in all_tabs) |
| show_processed = gr.update(visible="processed" in all_tabs) |
| show_summary = gr.update(visible="summary" in all_tabs) |
| show_viz = gr.update(visible="visualization" in all_tabs) |
| |
| |
| final_status = "\n".join(status_messages) |
| |
| return ( |
| final_status, |
| all_results["original"], |
| all_results["processed"], |
| all_results["summary"] if all_results["summary"] else None, |
| all_results["visualization"] if all_results["visualization"] else None, |
| show_original, |
| show_processed, |
| show_summary, |
| show_viz |
| ) |
| |
| def _format_results(self, status: str, output_tabs: List[str], results: Dict[str, Any]): |
| """Format results based on output tabs configuration""" |
| |
| original_out = None |
| processed_out = None |
| summary_out = None |
| viz_out = None |
| |
| |
| show_original = gr.update(visible=False) |
| show_processed = gr.update(visible=False) |
| show_summary = gr.update(visible=False) |
| show_viz = gr.update(visible=False) |
| |
| |
| if "original" in output_tabs: |
| original_out = results.get("original") |
| show_original = gr.update(visible=True) |
| |
| if "processed" in output_tabs: |
| processed_out = results.get("processed") |
| show_processed = gr.update(visible=True) |
| |
| if "summary" in output_tabs: |
| summary_data = results.get("summary") |
| if results.get("summary_html"): |
| summary_out = results.get("summary_html") |
| else: |
| summary_out = summary_data |
| show_summary = gr.update(visible=True) |
| |
| if "visualization" in output_tabs: |
| viz_out = results.get("visualization") |
| show_viz = gr.update(visible=True) |
| |
| return ( |
| status, |
| original_out, |
| processed_out, |
| summary_out, |
| viz_out, |
| show_original, |
| show_processed, |
| show_summary, |
| show_viz |
| ) |
| |
| def chatbot_respond(self, message: str, history: List): |
| """Chatbot response""" |
| if history is None: |
| history = [] |
| if not message or message.strip() == "": |
| return history, "" |
| |
| df = self.chatbot_context.get("df") |
| |
| if "column" in message.lower(): |
| response = (f"Dataset has {len(df.columns)} columns: {', '.join(map(str, df.columns))}" |
| if df is not None else "Please upload a file first.") |
| elif "row" in message.lower(): |
| response = f"Dataset has {len(df)} rows." if df is not None else "Please upload a file first." |
| elif "help" in message.lower(): |
| response = "Ask about 'columns', 'rows', or your data analysis." |
| else: |
| response = f"You asked: '{message}'. Analyzing: {self.chatbot_context.get('type', 'No data')}." |
| |
| history.append((message, response)) |
| return history, "" |
|
|
|
|
| |
| |
| |
|
|
| def create_interface(): |
| """Build the Gradio interface""" |
| |
| ui_manager = UIManager() |
| |
| custom_css = """ |
| * { box-sizing: border-box; } |
| html, body { margin: 0; padding: 0; height: 100vh; overflow: hidden; } |
| .gradio-container { height: 100vh !important; max-width: 100% !important; padding: 0 !important; } |
| |
| #app-container { |
| height: 100vh; |
| display: flex; |
| flex-direction: column; |
| padding: 0.75rem; |
| gap: 0.75rem; |
| } |
| |
| #main-row { |
| flex: 1; |
| min-height: 0; |
| display: flex; |
| gap: 0.75rem; |
| } |
| |
| #left-panel { |
| display: flex; |
| flex-direction: column; |
| height: 100%; |
| background: #f9fafb; |
| border-radius: 10px; |
| padding: 0.75rem; |
| gap: 0.5rem; |
| } |
| |
| #task-section { |
| flex: 1; |
| min-height: 0; |
| overflow-y: auto; |
| display: flex; |
| flex-direction: column; |
| gap: 0.5rem; |
| } |
| |
| #action-section { |
| flex-shrink: 0; |
| } |
| |
| #middle-panel { |
| display: flex; |
| flex-direction: column; |
| height: 100%; |
| min-width: 0; |
| } |
| |
| #tabs-container { |
| flex: 1; |
| min-height: 0; |
| display: flex; |
| flex-direction: column; |
| } |
| |
| #tabs-container .tabs { |
| height: 100%; |
| display: flex; |
| flex-direction: column; |
| } |
| |
| #tabs-container .tab-nav { |
| flex-shrink: 0; |
| } |
| |
| #tabs-container .tabitem { |
| flex: 1; |
| min-height: 0; |
| overflow: auto; |
| } |
| |
| #chat-panel { |
| display: flex; |
| flex-direction: column; |
| height: 100%; |
| background: #f9fafb; |
| border-radius: 10px; |
| padding: 0.75rem; |
| } |
| |
| #chat-header { |
| flex-shrink: 0; |
| margin-bottom: 0.5rem; |
| } |
| |
| #chat-history { |
| flex: 1; |
| min-height: 0; |
| overflow-y: auto; |
| margin-bottom: 0.5rem; |
| } |
| |
| #chat-input-row { |
| flex-shrink: 0; |
| display: flex; |
| gap: 0.5rem; |
| align-items: flex-end; |
| } |
| |
| #chat-input-row .textbox { |
| flex: 1; |
| } |
| |
| #chat-input-row button { |
| flex-shrink: 0; |
| } |
| |
| .preview-table { |
| border-collapse: collapse; |
| width: 100%; |
| font-size: 0.875rem; |
| } |
| .preview-table th { |
| background-color: #3498db; |
| color: white; |
| padding: 8px; |
| text-align: left; |
| position: sticky; |
| top: 0; |
| } |
| .preview-table td { |
| padding: 6px; |
| border-bottom: 1px solid #ddd; |
| } |
| |
| .compact-header { font-size: 0.95rem; margin: 0; } |
| """ |
| |
| with gr.Blocks(css=custom_css, theme=gr.themes.Soft(), title="Medical Data Analysis Platform") as demo: |
| |
| with gr.Column(elem_id="app-container"): |
| |
| gr.Markdown("# 🏥 Medical Data Analysis Platform", elem_classes=["compact-header"]) |
| |
| |
| with gr.Row(): |
| file_input = gr.File(label="Upload CSV", file_types=[".csv"], scale=2) |
| data_type = gr.Dropdown( |
| choices=["EHR Data", "ECG Data"], |
| value="EHR Data", |
| label="Data Type", |
| scale=1 |
| ) |
| |
| |
| with gr.Row(elem_id="main-row"): |
| |
| |
| with gr.Column(scale=2, elem_id="left-panel"): |
| with gr.Group(elem_id="task-section"): |
| gr.Markdown("#### Analysis Tasks") |
| task_selector = gr.CheckboxGroup( |
| choices=TaskRegistry.get_tasks_for_data_type("EHR Data"), |
| label=None, |
| elem_id="task-checkboxes" |
| ) |
| |
| |
| with gr.Group(visible=False) as ndd_param_group: |
| gr.Markdown("**Near-Duplicate Detection Parameters**") |
| ndd_label_dropdown = gr.Dropdown( |
| choices=[], |
| label="Label Column", |
| elem_id="ndd-label-dropdown" |
| ) |
| |
| with gr.Group(visible=False) as mislabel_param_group: |
| gr.Markdown("**Find Mislabeled Data Parameters**") |
| mislabel_label_dropdown = gr.Dropdown( |
| choices=[], |
| label="Label Column", |
| elem_id="mislabel-label-dropdown" |
| ) |
| |
| with gr.Group(elem_id="action-section"): |
| process_btn = gr.Button("▶ Process", variant="primary", size="lg") |
| status_output = gr.Textbox(label="Status", interactive=False, lines=2) |
| |
| |
| with gr.Column(scale=7, elem_id="middle-panel"): |
| with gr.Tabs(elem_id="tabs-container") as result_tabs: |
| with gr.TabItem("Original Data", visible=False) as tab_original: |
| original_df_output = gr.Dataframe(interactive=False) |
| |
| with gr.TabItem("Processed Data", visible=False) as tab_processed: |
| processed_df_output = gr.Dataframe(interactive=False) |
| |
| with gr.TabItem("Summary", visible=False) as tab_summary: |
| summary_output = gr.JSON() |
| |
| with gr.TabItem("Visualization", visible=False) as tab_viz: |
| viz_output = gr.HTML() |
| |
| |
| with gr.Column(scale=3, elem_id="chat-panel"): |
| gr.Markdown("### 💬 AI Assistant", elem_id="chat-header") |
| chatbot = gr.Chatbot(elem_id="chat-history", height=None, label=None) |
| with gr.Row(elem_id="chat-input-row"): |
| msg_input = gr.Textbox( |
| placeholder="Ask about your data...", |
| label="", |
| scale=4, |
| container=False, |
| lines=1 |
| ) |
| send_btn = gr.Button("Send", scale=1, size="sm") |
| |
| |
| |
| |
| file_input.change( |
| fn=ui_manager.on_file_upload, |
| inputs=[file_input, data_type], |
| outputs=[ |
| status_output, |
| original_df_output, |
| task_selector, |
| ndd_param_group, |
| mislabel_param_group, |
| ndd_label_dropdown, |
| mislabel_label_dropdown, |
| tab_original, tab_processed, tab_summary, tab_viz, |
| ] |
| ) |
| |
| |
| data_type.change( |
| fn=ui_manager.on_data_type_change, |
| inputs=[data_type, file_input], |
| outputs=[task_selector, ndd_param_group, mislabel_param_group, status_output] |
| ) |
| |
| |
| task_selector.change( |
| fn=ui_manager.on_tasks_change, |
| inputs=[task_selector, data_type, ndd_label_dropdown], |
| outputs=[ndd_param_group, mislabel_param_group] |
| ) |
| |
| |
| process_btn.click( |
| fn=ui_manager.process_analysis, |
| inputs=[file_input, data_type, task_selector, ndd_label_dropdown, mislabel_label_dropdown], |
| outputs=[ |
| status_output, |
| original_df_output, |
| processed_df_output, |
| summary_output, |
| viz_output, |
| tab_original, |
| tab_processed, |
| tab_summary, |
| tab_viz |
| ] |
| ) |
| |
| |
| send_btn.click( |
| fn=ui_manager.chatbot_respond, |
| inputs=[msg_input, chatbot], |
| outputs=[chatbot, msg_input] |
| ) |
| |
| msg_input.submit( |
| fn=ui_manager.chatbot_respond, |
| inputs=[msg_input, chatbot], |
| outputs=[chatbot, msg_input] |
| ) |
| |
| return demo |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| demo = create_interface() |
| demo.launch(share=False, server_name="0.0.0.0", server_port=7860) |
|
|
|
|
| |
| |
| |
|
|
| """ |
| ADDING NEW ANALYSIS TASKS: |
| ========================== |
| |
| 1. Add task configuration in TaskRegistry: |
| - Define name, data_type, requires_params, param_components, output_tabs |
| |
| 2. Add execution method in AnalysisExecutor: |
| - Create execute_your_task() method |
| - Return (status_message, results_dict) |
| |
| 3. Add condition in UIManager.process_analysis(): |
| - elif task_name == "Your Task Name": |
| - status_msg, results = executor.execute_your_task(df, params) |
| - return self._format_results(status_msg, config.output_tabs, results) |
| |
| |
| EXAMPLE - Adding a new ECG task with parameters: |
| ================================================ |
| |
| In TaskRegistry.get_config(): |
| "ECG Data": { |
| ... |
| "ECG Quality Check": TaskConfig( |
| name="ECG Quality Check", |
| data_type="ECG Data", |
| requires_params=True, |
| param_components=[ |
| {"type": "slider", "label": "Quality Threshold", "elem_id": "quality_thresh"} |
| ], |
| output_tabs=["original", "summary", "visualization"] |
| ) |
| } |
| |
| In AnalysisExecutor: |
| @staticmethod |
| def execute_ecg_quality_check(df: pd.DataFrame, threshold: float) -> Tuple[str, Dict[str, Any]]: |
| # Your analysis logic here |
| return "✓ Quality check completed", { |
| "original": df, |
| "summary": quality_summary, |
| "visualization": viz_html |
| } |
| |
| In UIManager.process_analysis(): |
| elif task_name == "ECG Quality Check": |
| status_msg, results = executor.execute_ecg_quality_check(df, float(param_value)) |
| return self._format_results(status_msg, config.output_tabs, results) |
| |
| |
| The architecture is now: |
| - Simple and clean |
| - Easy to extend with new tasks |
| - Dynamic UI based on task configuration |
| - Fixed chat scrolling issue |
| """ |