| import gradio as gr |
| import logging |
| import os |
| from huggingface_hub import InferenceClient |
| from datetime import datetime |
| import uuid |
| import json |
| import time |
|
|
| |
# Root logging setup: INFO and above goes to chatbot_logs.log and to the
# console stream, both using the same timestamped format.
logging.basicConfig(
    handlers=[logging.FileHandler("chatbot_logs.log"), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Named logger used by the rest of this module.
logger = logging.getLogger("CompanyChatbot")
|
|
| |
# Runtime settings. Each one is read from the environment variable of the
# same name; all but HF_API_TOKEN fall back to a built-in default.
HF_MODEL = os.getenv("HF_MODEL", "mistralai/Mixtral-8x7B-Instruct-v0.1")
HF_API_TOKEN = os.getenv("HF_API_TOKEN")
COMPANY_NAME = os.getenv("COMPANY_NAME", "RS")
DEFAULT_SYSTEM_PROMPT = os.getenv(
    "DEFAULT_SYSTEM_PROMPT",
    f"You are {COMPANY_NAME}'s professional AI assistant. Be helpful, accurate, and concise.",
)

# Stop at import time when the token is missing or empty.
if not HF_API_TOKEN:
    raise RuntimeError("HF_API_TOKEN environment variable is not set.")
|
|
| |
# Build the module-level inference client once, at import time.
try:
    client = InferenceClient(model=HF_MODEL, token=HF_API_TOKEN)
except Exception as e:
    # logger.exception records the traceback alongside the message, and
    # "from e" keeps the original error attached as the explicit cause.
    logger.exception("Failed to initialize InferenceClient")
    raise RuntimeError(f"Failed to initialize the model: {e}") from e
else:
    # Kept outside the try body so a logging problem is never reported as a
    # model initialization failure.
    logger.info(f"Successfully initialized InferenceClient with model: {HF_MODEL}")
|
|
| |
class ConfigState:
    """Mutable holder for the system prompt and sampling parameters."""

    def __init__(self):
        # The system prompt starts from the module-level default.
        self.system_message = DEFAULT_SYSTEM_PROMPT
        # Initial generation settings.
        self.max_tokens, self.temperature, self.top_p = 512, 0.7, 0.95


# Single module-level instance: read by respond(), written by update_config().
config_state = ConfigState()
|
|
| |
def save_conversation(user_id, conversation):
    """Write ``conversation`` as JSON under ``conversations/``.

    The file is named ``<user_id>_<YYYYmmdd_HHMMSS>.json``; the directory is
    created when it does not exist yet.

    Args:
        user_id: Identifier embedded in the file name and in the log line.
        conversation: JSON-serializable object to store.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    target = f"conversations/{user_id}_{stamp}.json"

    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, 'w') as out:
        json.dump(conversation, out)

    logger.info(f"Saved conversation for user {user_id}")
|
|
| |
def respond(message, chat_history, user_id):
    """Stream the assistant's reply as a sequence of updated chat histories.

    This function is a generator, so every value meant for the caller has to
    be yielded: a ``return <value>`` inside a generator is not delivered to
    code that iterates over it.

    Args:
        message: Text entered by the user; may be empty or ``None``.
        chat_history: List of ``[user_message, assistant_message]`` pairs, or
            ``None`` when there is no history yet.
        user_id: Identifier used in log lines and in the saved record.

    Yields:
        The chat history with the current exchange appended. While streaming,
        one value is yielded per non-empty chunk, each carrying the reply
        accumulated so far. On failure a single history whose last entry
        holds an error message is yielded instead.
    """
    chat_history = chat_history or []

    # Blank input: answer with a fixed prompt and never call the model.
    if not message or not message.strip():
        yield chat_history + [
            [message, "I'm sorry, I didn't receive any input. How can I help you today?"]
        ]
        return

    logger.info(f"User {user_id} sent message - Length: {len(message)}")

    try:
        # Snapshot the settings so the saved record matches what was actually
        # sent, even if the shared configuration changes during streaming.
        max_tokens = config_state.max_tokens
        temperature = config_state.temperature
        top_p = config_state.top_p

        messages = [{"role": "system", "content": config_state.system_message}]
        for user_msg, assistant_msg in chat_history:
            messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})
        messages.append({"role": "user", "content": message})

        start_time = datetime.now()
        full_response = ""

        for message_chunk in client.chat_completion(
            messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stream=True,
        ):
            # Skip chunks without any choice instead of failing on the [0]
            # index below.
            if not message_chunk.choices:
                continue
            piece = message_chunk.choices[0].delta.content
            if piece:
                full_response += piece
                yield chat_history + [[message, full_response]]

        time_taken = (datetime.now() - start_time).total_seconds()
    except Exception as e:
        logger.error(f"Error generating response for user {user_id}: {str(e)}")
        yield chat_history + [[message, f"An error occurred: {str(e)}"]]
        return

    logger.info(
        f"Response generated for user {user_id} in {time_taken:.2f}s - Length: {len(full_response)}"
    )

    conversation_data = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "messages": messages,
        "response": full_response,
        "parameters": {
            "max_tokens": max_tokens,
            "temperature": temperature,
            "top_p": top_p,
        },
        "time_taken": time_taken,
    }

    # Persistence is best effort: a storage failure must not replace a reply
    # that has already been delivered with an error message.
    try:
        save_conversation(user_id, conversation_data)
    except (OSError, TypeError, ValueError):
        logger.exception(f"Could not save conversation for user {user_id}")

    # With an empty reply nothing was yielded above, so hand back the
    # unchanged history explicitly.
    if not full_response:
        yield chat_history
|
|
| |
def update_config(system_msg, max_tok, temp, tp, role):
    """Overwrite the shared generation settings when ``role`` is ``"admin"``.

    Args:
        system_msg: New system prompt.
        max_tok: New maximum token count.
        temp: New sampling temperature.
        tp: New top-p value.
        role: Role of the caller; any value other than ``"admin"`` is refused.

    Returns:
        A status message describing the outcome.
    """
    # Guard clause: every non-admin caller leaves the settings untouched.
    if role != "admin":
        return "Only administrators can update configuration"

    config_state.system_message = system_msg
    config_state.max_tokens = max_tok
    config_state.temperature = temp
    config_state.top_p = tp

    logger.info("Configuration updated by admin")
    return "Configuration updated successfully"
|
|
| |
def authenticate(username, password):
    """Check a username/password pair against the built-in account table.

    Args:
        username: Account name supplied by the caller.
        password: Password supplied by the caller.

    Returns:
        ``(True, session_id, role)`` on success, where ``session_id`` is a
        fresh random UUID string and ``role`` is ``"admin"`` or ``"user"``;
        ``(False, None, None)`` otherwise, including for non-string input.
    """
    import hmac  # local import: keeps this function self-contained

    # NOTE(review): these credentials are hard-coded in plaintext. They are
    # kept only so existing logins keep working; they should move to hashed
    # values loaded from a secret store.
    valid_credentials = {
        "admin": {"password": "admin123", "role": "admin"},
        "user": {"password": "user123", "role": "user"}
    }

    # Non-string input can never match and would otherwise raise below
    # (for example, an unhashable username in the dictionary lookup).
    if not isinstance(username, str) or not isinstance(password, str):
        return False, None, None

    account = valid_credentials.get(username)
    expected = account["password"] if account is not None else ""

    # compare_digest is designed to avoid leaking the position of the first
    # mismatch. It runs for unknown usernames as well, and the values are
    # compared as bytes because its str mode only accepts ASCII.
    matches = hmac.compare_digest(password.encode("utf-8"), expected.encode("utf-8"))

    if account is not None and matches:
        return True, str(uuid.uuid4()), account["role"]
    return False, None, None
|
|
| |
def login(username, password):
    """Handle a login attempt and produce the matching UI updates.

    Args:
        username: Value of the username field.
        password: Value of the password field.

    Returns:
        A 5-tuple: update for the login group, update for the chat group,
        the session id (``None`` on failure), the role (``None`` on failure)
        and an update for the error message component.
    """

    def _rejected(reason):
        # Keep the login form visible, hide the chat and show ``reason``.
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            None,
            None,
            gr.update(visible=True, value=reason),
        )

    if not (username and password):
        return _rejected("Please enter both username and password")

    # Fixed half-second pause before each credential check.
    time.sleep(0.5)

    success, user_id, role = authenticate(username, password)
    if not success:
        return _rejected("Invalid username or password")

    # Success: swap the login form for the chat view and clear the error.
    return (
        gr.update(visible=False),
        gr.update(visible=True),
        user_id,
        role,
        gr.update(visible=False),
    )
|
|
|
|
# Stylesheet passed to gr.Blocks(css=css) below. The header, login, settings
# panel, chat container, error message and role badge classes are referenced
# by the layout in this file; the remaining rules (.container, .footer,
# .message-user, .message-bot, .setting-disabled) are not referenced in the
# code visible here.
css = """

body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background-color: #f9f9f9;
}

.container {
max-width: 1400px !important;
margin: auto;
}

.setting-panel {
background-color: #f0f4f8;
border-radius: 10px;
padding: 15px;
box-shadow: 0 2px 6px rgba(0,0,0,0.1);
}

.chat-container {
border-radius: 10px;
box-shadow: 0 2px 6px rgba(0,0,0,0.1);
background-color: white;
}

.company-header {
background-color: #2c3e50;
color: white;
padding: 15px;
border-radius: 10px 10px 0 0;
margin-bottom: 15px;
}

.footer {
text-align: center;
margin-top: 20px;
color: #666;
font-size: 0.8em;
}

.message-user {
background-color: #e6f7ff !important;
border-radius: 15px 15px 0 15px !important;
}

.message-bot {
background-color: #f0f0f0 !important;
border-radius: 15px 15px 15px 0 !important;
}

.login-container {
max-width: 500px;
margin: 50px auto;
padding: 30px;
background-color: white;
border-radius: 10px;
box-shadow: 0 4px 10px rgba(0,0,0,0.1);
}

.login-header {
text-align: center;
margin-bottom: 30px;
}

.error-message {
color: #e74c3c;
background-color: #fdedeb;
padding: 10px;
border-radius: 5px;
margin-bottom: 15px;
font-size: 14px;
}

.role-badge {
font-size: 12px;
padding: 3px 8px;
border-radius: 10px;
margin-left: 10px;
}

.admin-badge {
background-color: #e74c3c;
color: white;
}

.user-badge {
background-color: #3498db;
color: white;
}

.setting-disabled {
opacity: 0.5;
pointer-events: none;
}

"""
|
|
| |
# Application layout and event wiring: a login form that is swapped for the
# chat view after a successful login.
with gr.Blocks(css=css, title=f"{COMPANY_NAME} AI Assistant") as demo:
    # Per-session values filled in by login() and cleared by logout().
    user_id = gr.State(None)
    user_role = gr.State(None)

    with gr.Row():
        gr.Markdown(f"<div class='company-header'><h1>{COMPANY_NAME} AI Assistant</h1></div>")

    # Login view (visible first).
    with gr.Group(visible=True) as login_group:
        with gr.Column(elem_classes=["login-container"]):
            gr.Markdown(f"<div class='login-header'><h2>Welcome to {COMPANY_NAME}</h2><p>Please log in to continue</p></div>")
            error_message = gr.Markdown(visible=False, value="", elem_classes=["error-message"])
            username = gr.Textbox(label="Username", placeholder="Enter your username")
            password = gr.Textbox(label="Password", type="password", placeholder="Enter your password")
            login_button = gr.Button("Login", variant="primary", size="lg")

    # Chat view (hidden until login succeeds).
    with gr.Group(visible=False) as chat_group:
        with gr.Row():
            with gr.Column(scale=1, elem_classes=["setting-panel"]):
                role_indicator = gr.Markdown("", elem_id="role-indicator")
                gr.Markdown("### Configuration")
                with gr.Group() as config_group:
                    # Read-only by default; handle_role_permissions() enables
                    # these controls for administrators.
                    system_message = gr.Textbox(value=DEFAULT_SYSTEM_PROMPT, label="System Instructions", lines=4, interactive=False)
                    max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max Response Length", interactive=False)
                    temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature", interactive=False)
                    top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p", interactive=False)
                    update_config_btn = gr.Button("Update Configuration", visible=False)
                    config_status = gr.Markdown("")
                gr.Markdown("### Chat Actions")
                with gr.Row():
                    clear_btn = gr.Button("Clear Chat", variant="secondary")
                    export_btn = gr.Button("Export Conversation", variant="secondary")
                # Status line for chat actions. It lives inside the chat view
                # so that it is visible while the user is logged in.
                export_status = gr.Markdown("")
                logout_btn = gr.Button("Logout", variant="stop")

            with gr.Column(scale=2, elem_classes=["chat-container"]):
                chatbot = gr.Chatbot(elem_classes=["chatbox"])
                with gr.Row():
                    msg = gr.Textbox(show_label=False, placeholder="Type your message here...", container=False)
                    submit_btn = gr.Button("Send", variant="primary")

    def update_role_display(role):
        """Return the HTML badge for ``role`` (anything but "admin" is shown as a standard user)."""
        badge_class = "admin-badge" if role == "admin" else "user-badge"
        role_display = "Administrator" if role == "admin" else "Standard User"
        return f"<h3>Role: <span class='role-badge {badge_class}'>{role_display}</span></h3>"

    def handle_role_permissions(role):
        """Enable the four settings controls and show the update button for admins only."""
        is_admin = role == "admin"
        return [
            gr.update(interactive=is_admin),
            gr.update(interactive=is_admin),
            gr.update(interactive=is_admin),
            gr.update(interactive=is_admin),
            gr.update(visible=is_admin),
        ]

    # Login, then refresh the role badge and the settings permissions.
    login_button.click(login, inputs=[username, password], outputs=[login_group, chat_group, user_id, user_role, error_message]) \
        .then(update_role_display, inputs=[user_role], outputs=[role_indicator]) \
        .then(handle_role_permissions, inputs=[user_role], outputs=[system_message, max_tokens, temperature, top_p, update_config_btn])

    update_config_btn.click(update_config, inputs=[system_message, max_tokens, temperature, top_p, user_role], outputs=[config_status])

    # Enter key and Send button both stream a reply, then empty the input box.
    msg.submit(respond, inputs=[msg, chatbot, user_id], outputs=[chatbot]).then(lambda: "", None, [msg])

    submit_btn.click(respond, inputs=[msg, chatbot, user_id], outputs=[chatbot]).then(lambda: "", None, [msg])

    clear_btn.click(lambda: [], None, chatbot, queue=False)

    def export_conversation(chat_history, uid):
        """Write the current chat history to ``conversations/export_<uid>_<timestamp>.json``.

        Returns an update carrying a confirmation message with the file name.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"conversations/export_{uid}_{timestamp}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(chat_history, f)
        logger.info(f"Exported conversation for user {uid}")
        return gr.update(value=f"Conversation exported to {filename}", visible=True)

    # The confirmation is shown in the chat view; the login error component is
    # inside the login group, which is hidden while a user is logged in.
    export_btn.click(export_conversation, inputs=[chatbot, user_id], outputs=[export_status])

    def logout():
        """Return to the login view and wipe the session's on-screen data.

        Besides clearing the session id and role, this empties the chat
        transcript, the message box, the password field and both status
        lines so that nothing carries over to the next login.
        """
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            None,
            None,
            [],
            "",
            "",
            "",
            "",
        )

    logout_btn.click(
        logout,
        outputs=[login_group, chat_group, user_id, user_role, chatbot, msg, password, export_status, config_status],
    )
|
|
if __name__ == "__main__":
    # Options for the web server started when this file is run as a script.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
    }
    demo.launch(**launch_options)