| import os |
| import sys |
| import re |
| import gradio as gr |
| import json |
| import tempfile |
| import base64 |
| import io |
| from typing import List, Dict, Any, Optional, Tuple, Union |
| import logging |
| import pandas as pd |
| import plotly.express as px |
| import plotly.graph_objects as go |
| from plotly.subplots import make_subplots |
| try: |
| from sqlalchemy import text as sa_text |
| except Exception: |
| sa_text = None |
|
|
| try: |
| |
| from langchain_community.agent_toolkits import create_sql_agent |
| from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit |
| from langchain_community.utilities import SQLDatabase |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain.agents.agent_types import AgentType |
| from langchain.memory import ConversationBufferWindowMemory |
| from langchain_core.messages import AIMessage, HumanMessage, SystemMessage |
| import pymysql |
| from dotenv import load_dotenv |
| |
| DEPENDENCIES_AVAILABLE = True |
| except ImportError as e: |
| logger.warning(f"Some dependencies are not available: {e}") |
| DEPENDENCIES_AVAILABLE = False |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
|
|
| def generate_chart(data: Union[Dict, List[Dict], pd.DataFrame], |
| chart_type: str, |
| x: str, |
| y: str = None, |
| title: str = "", |
| x_label: str = None, |
| y_label: str = None): |
| """ |
| Generate an interactive Plotly figure from data. |
| |
| Args: |
| data: The data to plot (can be a list of dicts or a pandas DataFrame) |
| chart_type: Type of chart to generate (bar, line, pie, scatter, histogram) |
| x: Column name for x-axis (names for pie) |
| y: Column name for y-axis (values for pie) |
| title: Chart title |
| x_label: Label for x-axis |
| y_label: Label for y-axis |
| |
| Returns: |
| A Plotly Figure object (interactive) or None on error |
| """ |
| try: |
| |
| if isinstance(data, list): |
| df = pd.DataFrame(data) |
| elif isinstance(data, dict): |
| df = pd.DataFrame([data]) |
| else: |
| df = data |
| |
| if not isinstance(df, pd.DataFrame): |
| return None |
| |
| |
| fig = None |
| if chart_type == 'bar': |
| fig = px.bar(df, x=x, y=y, title=title) |
| elif chart_type == 'line': |
| fig = px.line(df, x=x, y=y, title=title) |
| elif chart_type == 'pie': |
| fig = px.pie(df, names=x, values=y, title=title, hole=0) |
| elif chart_type == 'scatter': |
| fig = px.scatter(df, x=x, y=y, title=title) |
| elif chart_type == 'histogram': |
| fig = px.histogram(df, x=x, title=title) |
| else: |
| return None |
| |
| |
| fig.update_layout( |
| xaxis_title=x_label or x, |
| yaxis_title=y_label or (y if y != x else ''), |
| title=title or f"{chart_type.capitalize()} Chart of {x} vs {y}" if y else f"{chart_type.capitalize()} Chart of {x}", |
| template="plotly_white", |
| margin=dict(l=20, r=20, t=40, b=20), |
| height=400 |
| ) |
| |
| return fig |
| |
| except Exception as e: |
| error_msg = f"Error generating chart: {str(e)}" |
| logger.error(error_msg, exc_info=True) |
| return None |
|
|
| logger = logging.getLogger(__name__) |
|
|
| def check_environment(): |
| """Verifica si el entorno está configurado correctamente.""" |
| if not DEPENDENCIES_AVAILABLE: |
| return False, "Missing required Python packages. Please install them with: pip install -r requirements.txt" |
| |
| |
| required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"] |
| missing_vars = [var for var in required_vars if not os.getenv(var)] |
| |
| if missing_vars: |
| return False, f"Missing required environment variables: {', '.join(missing_vars)}" |
| |
| return True, "Environment is properly configured" |
|
|
| def setup_database_connection(): |
| """Intenta establecer una conexión a la base de datos.""" |
| if not DEPENDENCIES_AVAILABLE: |
| return None, "Dependencies not available" |
| |
| try: |
| load_dotenv(override=True) |
| |
| |
| logger.info("Environment variables:") |
| for key, value in os.environ.items(): |
| if any(s in key.lower() for s in ['pass', 'key', 'secret']): |
| logger.info(f" {key}: {'*' * 8} (hidden for security)") |
| else: |
| logger.info(f" {key}: {value}") |
| |
| db_user = os.getenv("DB_USER") |
| db_password = os.getenv("DB_PASSWORD") |
| db_host = os.getenv("DB_HOST") |
| db_name = os.getenv("DB_NAME") |
| |
| |
| logger.info(f"Database connection attempt - Host: {db_host}, User: {db_user}, DB: {db_name}") |
| if not all([db_user, db_password, db_host, db_name]): |
| missing = [var for var, val in [ |
| ("DB_USER", db_user), |
| ("DB_PASSWORD", "*" if db_password else ""), |
| ("DB_HOST", db_host), |
| ("DB_NAME", db_name) |
| ] if not val] |
| logger.error(f"Missing required database configuration: {', '.join(missing)}") |
| return None, f"Missing database configuration: {', '.join(missing)}" |
| |
| if not all([db_user, db_password, db_host, db_name]): |
| return None, "Missing database configuration" |
| |
| logger.info(f"Connecting to database: {db_user}@{db_host}/{db_name}") |
| |
| |
| connection = pymysql.connect( |
| host=db_host, |
| user=db_user, |
| password=db_password, |
| database=db_name, |
| connect_timeout=5, |
| cursorclass=pymysql.cursors.DictCursor |
| ) |
| connection.close() |
| |
| |
| db_uri = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}" |
| logger.info("Database connection successful") |
| return SQLDatabase.from_uri(db_uri), "" |
| |
| except Exception as e: |
| error_msg = f"Error connecting to database: {str(e)}" |
| logger.error(error_msg) |
| return None, error_msg |
|
|
| def initialize_llm(): |
| """Inicializa el modelo de lenguaje.""" |
| if not DEPENDENCIES_AVAILABLE: |
| error_msg = "Dependencies not available. Make sure all required packages are installed." |
| logger.error(error_msg) |
| return None, error_msg |
| |
| google_api_key = os.getenv("GOOGLE_API_KEY") |
| logger.info(f"GOOGLE_API_KEY found: {'Yes' if google_api_key else 'No'}") |
| |
| if not google_api_key: |
| error_msg = "GOOGLE_API_KEY not found in environment variables. Please check your Hugging Face Space secrets." |
| logger.error(error_msg) |
| return None, error_msg |
| |
| try: |
| logger.info("Initializing Google Generative AI...") |
| llm = ChatGoogleGenerativeAI( |
| model="gemini-2.0-flash", |
| temperature=0, |
| google_api_key=google_api_key, |
| convert_system_message_to_human=True |
| ) |
| |
| |
| test_prompt = "Hello, this is a test." |
| logger.info(f"Testing model with prompt: {test_prompt}") |
| test_response = llm.invoke(test_prompt) |
| logger.info(f"Model test response: {str(test_response)[:100]}...") |
| |
| logger.info("Google Generative AI initialized successfully") |
| return llm, "" |
| |
| except Exception as e: |
| error_msg = f"Error initializing Google Generative AI: {str(e)}" |
| logger.error(error_msg, exc_info=True) |
| return None, error_msg |
|
|
| def create_agent(): |
| """Crea el agente SQL si es posible.""" |
| if not DEPENDENCIES_AVAILABLE: |
| error_msg = "Dependencies not available. Please check if all required packages are installed." |
| logger.error(error_msg) |
| return None, error_msg |
| |
| logger.info("Starting agent creation process...") |
| |
| def create_agent(llm, db_connection): |
| """Create and return a SQL database agent with conversation memory.""" |
| if not llm: |
| error_msg = "Cannot create agent: LLM is not available" |
| logger.error(error_msg) |
| return None, error_msg |
| |
| if not db_connection: |
| error_msg = "Cannot create agent: Database connection is not available" |
| logger.error(error_msg) |
| return None, error_msg |
| |
| try: |
| logger.info("Creating SQL agent with memory...") |
| |
| |
| memory = ConversationBufferWindowMemory( |
| memory_key="chat_history", |
| k=5, |
| return_messages=True, |
| output_key="output" |
| ) |
| |
| |
| toolkit = SQLDatabaseToolkit( |
| db=db_connection, |
| llm=llm |
| ) |
| |
| |
| agent = create_sql_agent( |
| llm=llm, |
| toolkit=toolkit, |
| agent_type=AgentType.OPENAI_FUNCTIONS, |
| verbose=True, |
| handle_parsing_errors=True, |
| max_iterations=10, |
| early_stopping_method="generate", |
| memory=memory, |
| return_intermediate_steps=True |
| ) |
| |
| |
| logger.info("Testing agent with a simple query...") |
| try: |
| test_query = "SELECT 1" |
| test_result = agent.run(test_query) |
| logger.info(f"Agent test query successful: {str(test_result)[:200]}...") |
| except Exception as e: |
| logger.warning(f"Agent test query failed (this might be expected): {str(e)}") |
| |
| |
| logger.info("SQL agent created successfully") |
| return agent, "" |
| |
| except Exception as e: |
| error_msg = f"Error creating SQL agent: {str(e)}" |
| logger.error(error_msg, exc_info=True) |
| return None, error_msg |
|
|
| |
| logger.info("="*50) |
| logger.info("Starting application initialization...") |
| logger.info(f"Python version: {sys.version}") |
| logger.info(f"Current working directory: {os.getcwd()}") |
| logger.info(f"Files in working directory: {os.listdir()}") |
|
|
| |
| logger.info("Checking environment variables...") |
| for var in ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME", "GOOGLE_API_KEY"]: |
| logger.info(f"{var}: {'*' * 8 if os.getenv(var) else 'NOT SET'}") |
|
|
| |
| logger.info("Initializing database connection...") |
| db_connection, db_error = setup_database_connection() |
| if db_error: |
| logger.error(f"Failed to initialize database: {db_error}") |
|
|
| logger.info("Initializing language model...") |
| llm, llm_error = initialize_llm() |
| if llm_error: |
| logger.error(f"Failed to initialize language model: {llm_error}") |
|
|
| logger.info("Initializing agent...") |
| agent, agent_error = create_agent(llm, db_connection) |
| db_connected = agent is not None |
|
|
| if agent: |
| logger.info("Agent initialized successfully") |
| else: |
| logger.error(f"Failed to initialize agent: {agent_error}") |
|
|
| logger.info("="*50) |
|
|
| def looks_like_sql(s: str) -> bool: |
| """Heuristic to check if a string looks like an executable SQL statement.""" |
| if not s: |
| return False |
| s_strip = s.strip().lstrip("-- ") |
| |
| return bool(re.match(r"^(WITH|SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE)\b", s_strip, re.IGNORECASE)) |
|
|
|
|
| def extract_sql_query(text): |
| """Extrae consultas SQL del texto. Acepta solo bloques etiquetados como ```sql |
| o cadenas que claramente parezcan SQL. Evita ejecutar texto genérico. |
| """ |
| if not text: |
| return None |
|
|
| |
| for m in re.finditer(r"```(\w+)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE): |
| lang = (m.group(1) or '').lower() |
| body = (m.group(2) or '').strip() |
| if lang in {"sql", "postgresql", "mysql"} and looks_like_sql(body): |
| return body |
|
|
| |
| simple = re.search(r"(WITH|SELECT|INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE)[\s\S]*?;", text, re.IGNORECASE) |
| if simple: |
| candidate = simple.group(0).strip() |
| if looks_like_sql(candidate): |
| return candidate |
|
|
| return None |
|
|
| def execute_sql_query(query, db_connection): |
| """Ejecuta una consulta SQL y devuelve los resultados como una cadena.""" |
| if not db_connection: |
| return "Error: No hay conexión a la base de datos" |
| |
| try: |
| with db_connection._engine.connect() as connection: |
| |
| if sa_text is not None and isinstance(query, str): |
| result = connection.execute(sa_text(query)) |
| else: |
| result = connection.execute(query) |
|
|
| |
| columns = list(result.keys()) if hasattr(result, "keys") else [] |
| rows = result.fetchall() |
|
|
| |
| if not rows: |
| return "La consulta no devolvió resultados" |
|
|
| |
| try: |
| if len(rows) == 1 and len(rows[0]) == 1: |
| return str(rows[0][0]) |
| except Exception: |
| pass |
|
|
| |
| try: |
| import pandas as pd |
|
|
| |
| if columns: |
| data = [ |
| {col: val for col, val in zip(columns, tuple(row))} |
| for row in rows |
| ] |
| df = pd.DataFrame(data) |
| else: |
| |
| df = pd.DataFrame(rows) |
|
|
| |
| try: |
| return df.to_markdown(index=False) |
| except Exception: |
| |
| headers = list(map(str, df.columns)) |
| header_line = "| " + " | ".join(headers) + " |" |
| sep_line = "| " + " | ".join(["---"] * len(headers)) + " |" |
| body_lines = [] |
| for _, r in df.iterrows(): |
| body_lines.append("| " + " | ".join(map(lambda v: str(v), r.values)) + " |") |
| return "\n".join([header_line, sep_line, *body_lines]) |
| except ImportError: |
| |
| return "\n".join([str(row) for row in rows]) |
|
|
| except Exception as e: |
| return f"Error ejecutando la consulta: {str(e)}" |
|
|
| def detect_chart_preferences(question: str) -> Tuple[bool, str]: |
| """Detect whether the user is asking for a chart and infer desired type. |
| |
| Returns (wants_chart, chart_type) where chart_type is one of |
| {'bar', 'pie', 'line', 'scatter', 'histogram'}. |
| Defaults to 'bar' when ambiguous. |
| """ |
| try: |
| q = (question or "").lower() |
|
|
| |
| chart_triggers = [ |
| "grafico", "gráfico", "grafica", "gráfica", "chart", "graph", |
| "visualizacion", "visualización", "plot", "plotly", "diagrama" |
| ] |
| wants_chart = any(k in q for k in chart_triggers) |
|
|
| |
| if any(k in q for k in ["pastel", "pie", "circular", "donut", "dona", "anillo"]): |
| return wants_chart or True, "pie" |
| if any(k in q for k in ["linea", "línea", "line", "tendencia"]): |
| return wants_chart or True, "line" |
| if any(k in q for k in ["dispersión", "dispersion", "scatter", "puntos"]): |
| return wants_chart or True, "scatter" |
| if any(k in q for k in ["histograma", "histogram"]): |
| return wants_chart or True, "histogram" |
| if any(k in q for k in ["barra", "barras", "columnas", "column"]): |
| return wants_chart or True, "bar" |
|
|
| |
| return wants_chart, "bar" |
| except Exception: |
| return False, "bar" |
|
|
| def generate_plot(data, x_col, y_col, title, x_label, y_label): |
| """Generate a plot from data and return the file path.""" |
| plt.figure(figsize=(10, 6)) |
| plt.bar(data[x_col], data[y_col]) |
| plt.title(title) |
| plt.xlabel(x_label) |
| plt.ylabel(y_label) |
| plt.xticks(rotation=45) |
| plt.tight_layout() |
| |
| |
| temp_dir = tempfile.mkdtemp() |
| plot_path = os.path.join(temp_dir, "plot.png") |
| plt.savefig(plot_path) |
| plt.close() |
| |
| return plot_path |
|
|
| def convert_to_messages_format(chat_history): |
| """Convert chat history to the format expected by Gradio 5.x""" |
| if not chat_history: |
| return [] |
| |
| messages = [] |
| |
| |
| if isinstance(chat_history[0], list): |
| for msg in chat_history: |
| if isinstance(msg, list) and len(msg) == 2: |
| |
| user_msg, bot_msg = msg |
| if user_msg: |
| messages.append({"role": "user", "content": user_msg}) |
| if bot_msg: |
| messages.append({"role": "assistant", "content": bot_msg}) |
| else: |
| |
| for msg in chat_history: |
| if isinstance(msg, dict) and "role" in msg and "content" in msg: |
| messages.append(msg) |
| elif isinstance(msg, str): |
| |
| messages.append({"role": "user", "content": msg}) |
| |
| return messages |
|
|
| async def stream_agent_response(question: str, chat_history: List[List[str]]) -> Tuple[str, Optional["go.Figure"]]: |
| """Procesa la pregunta del usuario y devuelve la respuesta del agente con memoria de conversación.""" |
| global agent |
| |
| |
| response_text = "" |
| chart_fig = None |
| messages = [] |
| |
| |
| for msg_pair in chat_history: |
| if len(msg_pair) >= 1 and msg_pair[0]: |
| messages.append(HumanMessage(content=msg_pair[0])) |
| if len(msg_pair) >= 2 and msg_pair[1]: |
| messages.append(AIMessage(content=msg_pair[1])) |
| |
| |
| user_message = HumanMessage(content=question) |
| messages.append(user_message) |
| |
| if not agent: |
| error_msg = ( |
| "## ⚠️ Error: Agente no inicializado\n\n" |
| "No se pudo inicializar el agente de base de datos. Por favor, verifica que:\n" |
| "1. Todas las variables de entorno estén configuradas correctamente\n" |
| "2. La base de datos esté accesible\n" |
| f"3. El modelo de lenguaje esté disponible\n\n" |
| f"Error: {agent_error}" |
| ) |
| return error_msg, None |
| |
| |
| try: |
| |
| if hasattr(agent, 'memory') and agent.memory is not None: |
| agent.memory.clear() |
| for i in range(0, len(messages)-1, 2): |
| if i+1 < len(messages): |
| agent.memory.save_context( |
| {"input": messages[i].content}, |
| {"output": messages[i+1].content} |
| ) |
| except Exception as e: |
| logger.error(f"Error updating agent memory: {str(e)}", exc_info=True) |
| |
| try: |
| |
| assistant_message = {"role": "assistant", "content": ""} |
| messages.append(assistant_message) |
| |
| |
| try: |
| |
| response = await agent.ainvoke({"input": question}) |
| logger.info(f"Agent response type: {type(response)}") |
| logger.info(f"Agent response content: {str(response)[:500]}...") |
| |
| |
| if hasattr(response, 'output') and response.output: |
| response_text = response.output |
| elif isinstance(response, str): |
| response_text = response |
| elif hasattr(response, 'get') and callable(response.get) and 'output' in response: |
| response_text = response['output'] |
| else: |
| response_text = str(response) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| if chart_fig is None: |
| wants_chart, default_type = detect_chart_preferences(question) |
| if wants_chart: |
| try: |
| logger.info("Second pass: asking agent for ONLY SQL query in fenced block.") |
| sql_only_prompt = ( |
| "Devuelve SOLO la consulta SQL en un bloque ```sql``` para responder a: " |
| f"{question}. No incluyas explicación ni texto adicional." |
| ) |
| sql_only_resp = await agent.ainvoke({"input": sql_only_prompt}) |
| sql_only_text = str(sql_only_resp) |
| sql_query2 = extract_sql_query(sql_only_text) |
| if sql_query2 and looks_like_sql(sql_query2): |
| logger.info(f"Second pass SQL detected: {sql_query2}") |
| db_connection, _ = setup_database_connection() |
| if db_connection: |
| query_result = execute_sql_query(sql_query2, db_connection) |
| |
| data = None |
| if isinstance(query_result, str): |
| try: |
| import pandas as pd |
| df = pd.read_csv(io.StringIO(query_result), sep="|") |
| data = df |
| except Exception: |
| pass |
| |
| if data is not None and hasattr(data, "empty") and not data.empty: |
| |
| x_col = data.columns[0] |
| |
| y_col = None |
| for col in data.columns[1:]: |
| try: |
| pd.to_numeric(data[col]) |
| y_col = col |
| break |
| except Exception: |
| continue |
| if y_col: |
| desired_type = default_type |
| chart_fig = generate_chart( |
| data=data, |
| chart_type=desired_type, |
| x=x_col, |
| y=y_col, |
| title=f"{y_col} por {x_col}" |
| ) |
| if chart_fig is not None: |
| logger.info("Chart generated from second-pass SQL execution.") |
| else: |
| logger.info("No DB connection on second pass; skipping.") |
| except Exception as e: |
| logger.error(f"Second-pass SQL synthesis failed: {e}") |
| |
| |
| |
| if chart_fig is None: |
| wants_chart, desired_type = detect_chart_preferences(question) |
| if wants_chart: |
| |
| candidate_text = "" |
| if chat_history: |
| for pair in reversed(chat_history): |
| if len(pair) >= 2 and isinstance(pair[1], str) and pair[1].strip(): |
| candidate_text = pair[1] |
| break |
| |
| if not candidate_text and isinstance(response_text, str) and response_text.strip(): |
| candidate_text = response_text |
| if candidate_text: |
| raw_lines = candidate_text.split('\n') |
| |
| norm_lines = [] |
| for l in raw_lines: |
| s = l.strip() |
| if not s: |
| continue |
| s = s.lstrip("•*-\t ") |
| |
| norm_lines.append(s) |
| data = [] |
| for l in norm_lines: |
| |
| m = re.match(r"^(.+?):\s*([0-9][0-9.,]*)$", l) |
| if m: |
| label = m.group(1).strip() |
| |
| label = re.sub(r"[*_`]+", "", label).strip() |
| try: |
| val = float(m.group(2).replace(',', '')) |
| except Exception: |
| continue |
| data.append({"label": label, "value": val}) |
| logger.info(f"Fallback parse from text: extracted {len(data)} items for potential chart") |
| if len(data) >= 2: |
| chart_fig = generate_chart( |
| data=data, |
| chart_type=desired_type, |
| x="label", |
| y="value", |
| title="Distribución" |
| ) |
| if chart_fig is not None: |
| logger.info(f"Chart generated from text fallback: type={desired_type}, items={len(data)}") |
| |
| |
| assistant_message["content"] = response_text |
| |
| except Exception as e: |
| error_msg = f"Error al ejecutar el agente: {str(e)}" |
| logger.error(error_msg, exc_info=True) |
| assistant_message["content"] = f"## ❌ Error\n\n{error_msg}" |
| |
| |
| |
| |
| message_content = "" |
| |
| if isinstance(assistant_message, dict) and "content" in assistant_message: |
| message_content = assistant_message["content"] |
| elif isinstance(assistant_message, str): |
| message_content = assistant_message |
| else: |
| message_content = str(assistant_message) |
| |
| |
| if chart_fig is None: |
| logger.info("No chart generated for this turn.") |
| else: |
| logger.info("Returning a chart figure to UI.") |
| return message_content, chart_fig |
| |
| except Exception as e: |
| error_msg = f"## ❌ Error\n\nOcurrió un error al procesar tu solicitud:\n\n```\n{str(e)}\n```" |
| logger.error(f"Error in stream_agent_response: {str(e)}", exc_info=True) |
| |
| return error_msg, None |
|
|
| |
| custom_css = """ |
| .gradio-container { |
| max-width: 1200px !important; |
| margin: 0 auto !important; |
| font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; |
| } |
| |
| #chatbot { |
| min-height: 500px; |
| border: 1px solid #e0e0e0; |
| border-radius: 8px; |
| margin-bottom: 20px; |
| padding: 20px; |
| background-color: #f9f9f9; |
| } |
| |
| .user-message, .bot-message { |
| padding: 12px 16px; |
| border-radius: 18px; |
| margin: 8px 0; |
| max-width: 80%; |
| line-height: 1.5; |
| } |
| |
| .user-message { |
| background-color: #007bff; |
| color: white; |
| margin-left: auto; |
| border-bottom-right-radius: 4px; |
| } |
| |
| .bot-message { |
| background-color: #f1f1f1; |
| color: #333; |
| margin-right: auto; |
| border-bottom-left-radius: 4px; |
| } |
| |
| #question-input textarea { |
| min-height: 50px !important; |
| border-radius: 8px !important; |
| padding: 12px !important; |
| font-size: 16px !important; |
| } |
| |
| #send-button { |
| height: 100%; |
| background-color: #007bff !important; |
| color: white !important; |
| border: none !important; |
| border-radius: 8px !important; |
| font-weight: 500 !important; |
| transition: background-color 0.2s !important; |
| } |
| |
| #send-button:hover { |
| background-color: #0056b3 !important; |
| } |
| |
| .status-message { |
| text-align: center; |
| color: #666; |
| font-style: italic; |
| margin: 10px 0; |
| } |
| """ |
|
|
| def create_ui(): |
| """Crea y devuelve los componentes de la interfaz de usuario de Gradio.""" |
| |
| env_ok, env_message = check_environment() |
| |
| |
| theme = gr.themes.Soft( |
| primary_hue="blue", |
| secondary_hue="indigo", |
| neutral_hue="slate" |
| ) |
| |
| with gr.Blocks( |
| css=custom_css, |
| title="Asistente de Base de Datos SQL", |
| theme=theme |
| ) as demo: |
| |
| gr.Markdown(""" |
| # 🤖 Asistente de Base de Datos SQL |
| |
| Haz preguntas en lenguaje natural sobre tu base de datos y obtén resultados de consultas SQL. |
| """) |
| |
| |
| if not env_ok: |
| gr.Warning("⚠️ " + env_message) |
| |
| |
| with gr.Row(): |
| chatbot = gr.Chatbot( |
| value=[], |
| elem_id="chatbot", |
| type="messages", |
| avatar_images=( |
| None, |
| (os.path.join(os.path.dirname(__file__), "logo.svg")), |
| ), |
| height=600, |
| render_markdown=True, |
| show_label=False, |
| show_share_button=False, |
| container=True, |
| layout="panel" |
| ) |
| |
| |
| |
| chart_display = gr.Plot( |
| label="📊 Visualización", |
| ) |
| |
| |
| with gr.Row(): |
| question_input = gr.Textbox( |
| label="", |
| placeholder="Escribe tu pregunta aquí...", |
| container=False, |
| scale=5, |
| min_width=300, |
| max_lines=3, |
| autofocus=True, |
| elem_id="question-input" |
| ) |
| submit_button = gr.Button( |
| "Enviar", |
| variant="primary", |
| min_width=100, |
| scale=1, |
| elem_id="send-button" |
| ) |
| |
| |
| with gr.Accordion("ℹ️ Estado del sistema", open=not env_ok): |
| if not DEPENDENCIES_AVAILABLE: |
| gr.Markdown(""" |
| ## ❌ Dependencias faltantes |
| |
| Para ejecutar esta aplicación localmente, necesitas instalar las dependencias: |
| |
| ```bash |
| pip install -r requirements.txt |
| ``` |
| """) |
| else: |
| if not agent: |
| gr.Markdown(f""" |
| ## ⚠️ Configuración incompleta |
| |
| No se pudo inicializar el agente de base de datos. Por favor, verifica que: |
| |
| 1. Todas las variables de entorno estén configuradas correctamente |
| 2. La base de datos esté accesible |
| 3. La API de Google Gemini esté configurada |
| |
| **Error:** {agent_error if agent_error else 'No se pudo determinar el error'} |
| |
| ### Configuración local |
| |
| Crea un archivo `.env` en la raíz del proyecto con las siguientes variables: |
| |
| ``` |
| DB_USER=tu_usuario |
| DB_PASSWORD=tu_contraseña |
| DB_HOST=tu_servidor |
| DB_NAME=tu_base_de_datos |
| GOOGLE_API_KEY=tu_api_key_de_google |
| ``` |
| """) |
| else: |
| if os.getenv('SPACE_ID'): |
| |
| gr.Markdown(""" |
| ## 🚀 Modo Demo |
| |
| Esta es una demostración del asistente de base de datos SQL. Para usar la versión completa con conexión a base de datos: |
| |
| 1. Clona este espacio en tu cuenta de Hugging Face |
| 2. Configura las variables de entorno en la configuración del espacio: |
| - `DB_USER`: Tu usuario de base de datos |
| - `DB_PASSWORD`: Tu contraseña de base de datos |
| - `DB_HOST`: La dirección del servidor de base de datos |
| - `DB_NAME`: El nombre de la base de datos |
| - `GOOGLE_API_KEY`: Tu clave de API de Google Gemini |
| |
| **Nota:** Actualmente estás en modo de solo demostración. |
| """) |
| else: |
| gr.Markdown(""" |
| ## ✅ Sistema listo |
| |
| El asistente está listo para responder tus preguntas sobre la base de datos. |
| """) |
| |
| |
| streaming_output_display = gr.Textbox(visible=False) |
| |
| return demo, chatbot, chart_display, question_input, submit_button, streaming_output_display |
|
|
| def create_application(): |
| """Create and configure the Gradio application.""" |
| |
| demo, chatbot, chart_display, question_input, submit_button, streaming_output_display = create_ui() |
| |
| def user_message(user_input: str, chat_history: List[Dict[str, str]]) -> Tuple[str, List[Dict[str, str]]]: |
| """Add user message to chat history (messages format) and clear input.""" |
| if not user_input.strip(): |
| return "", chat_history |
|
|
| logger.info(f"User message: {user_input}") |
|
|
| if chat_history is None: |
| chat_history = [] |
|
|
| |
| chat_history.append({"role": "user", "content": user_input}) |
|
|
| return "", chat_history |
| |
| async def bot_response(chat_history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], Optional[go.Figure]]: |
| """Generate bot response for messages-format chat history and return optional chart figure.""" |
| if not chat_history: |
| return chat_history, None |
|
|
| |
| last = chat_history[-1] |
| if not isinstance(last, dict) or last.get("role") != "user" or not last.get("content"): |
| return chat_history, None |
|
|
| try: |
| question = last["content"] |
| logger.info(f"Processing question: {question}") |
|
|
| |
| pair_history: List[List[str]] = [] |
| i = 0 |
| while i < len(chat_history) - 1: |
| m1 = chat_history[i] |
| m2 = chat_history[i + 1] if i + 1 < len(chat_history) else None |
| if ( |
| isinstance(m1, dict) |
| and m1.get("role") == "user" |
| and isinstance(m2, dict) |
| and m2.get("role") == "assistant" |
| ): |
| pair_history.append([m1.get("content", ""), m2.get("content", "")]) |
| i += 2 |
| else: |
| i += 1 |
|
|
| |
| assistant_message, chart_fig = await stream_agent_response(question, pair_history) |
|
|
| |
| chat_history.append({"role": "assistant", "content": assistant_message}) |
|
|
| |
| |
| if chart_fig is None: |
| wants_chart, desired_type = detect_chart_preferences(question) |
| if wants_chart and isinstance(assistant_message, str): |
| candidate_text = assistant_message |
| raw_lines = candidate_text.split('\n') |
| norm_lines = [] |
| for l in raw_lines: |
| s = l.strip().lstrip("•*\t -") |
| if s: |
| norm_lines.append(s) |
| data = [] |
| for l in norm_lines: |
| m = re.match(r"^(.+?):\s*([0-9][0-9.,]*)$", l) |
| if m: |
| label = re.sub(r"[*_`]+", "", m.group(1)).strip() |
| try: |
| val = float(m.group(2).replace(',', '')) |
| except Exception: |
| continue |
| data.append({"label": label, "value": val}) |
| if len(data) >= 2: |
| chart_fig = generate_chart( |
| data=data, |
| chart_type=desired_type, |
| x="label", |
| y="value", |
| title="Distribución" |
| ) |
|
|
| logger.info("Response generation complete") |
| return chat_history, chart_fig |
|
|
| except Exception as e: |
| error_msg = f"## ❌ Error\n\nError al procesar la solicitud:\n\n```\n{str(e)}\n```" |
| logger.error(error_msg, exc_info=True) |
| |
| chat_history.append({"role": "assistant", "content": error_msg}) |
| return chat_history, None |
| |
| |
| with demo: |
| |
| msg_submit = question_input.submit( |
| fn=user_message, |
| inputs=[question_input, chatbot], |
| outputs=[question_input, chatbot], |
| queue=True |
| ).then( |
| fn=bot_response, |
| inputs=[chatbot], |
| outputs=[chatbot, chart_display], |
| api_name="ask" |
| ) |
| |
| |
| btn_click = submit_button.click( |
| fn=user_message, |
| inputs=[question_input, chatbot], |
| outputs=[question_input, chatbot], |
| queue=True |
| ).then( |
| fn=bot_response, |
| inputs=[chatbot], |
| outputs=[chatbot, chart_display] |
| ) |
| |
| return demo |
|
|
| |
| demo = create_application() |
|
|
| |
| def get_app(): |
| """Obtiene la instancia de la aplicación Gradio para Hugging Face Spaces.""" |
| |
| if os.getenv('SPACE_ID'): |
| |
| demo.title = "🤖 Asistente de Base de Datos SQL (Demo)" |
| demo.description = """ |
| Este es un demo del asistente de base de datos SQL. |
| Para usar la versión completa con conexión a base de datos, clona este espacio y configura las variables de entorno. |
| """ |
| |
| return demo |
|
|
| |
| if __name__ == "__main__": |
| |
| demo.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| debug=True, |
| share=False |
| ) |
|
|