| |
| import os |
| import chromadb |
| from dotenv import load_dotenv |
| import json |
|
|
| |
| from langchain_core.documents import Document |
| from langchain_core.runnables import RunnablePassthrough |
| from langchain_core.output_parsers import StrOutputParser |
| from langchain.prompts import ChatPromptTemplate |
| from langchain.chains.query_constructor.base import AttributeInfo |
| from langchain.retrievers.self_query.base import SelfQueryRetriever |
| from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker |
| from langchain.retrievers import ContextualCompressionRetriever |
|
|
| |
| from langchain_community.vectorstores import Chroma |
| from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader |
| from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
| from langchain_experimental.text_splitter import SemanticChunker |
| from langchain.text_splitter import ( |
| CharacterTextSplitter, |
| RecursiveCharacterTextSplitter |
| ) |
| from langchain_core.tools import tool |
| from langchain.agents import create_tool_calling_agent, AgentExecutor |
| from langchain_core.prompts import ChatPromptTemplate |
|
|
| |
| from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI |
| from langchain.embeddings.openai import OpenAIEmbeddings |
| from langchain_openai import ChatOpenAI |
|
|
| |
| from llama_parse import LlamaParse |
| from llama_index.core import Settings, SimpleDirectoryReader |
|
|
| |
| from langgraph.graph import StateGraph, END, START |
|
|
| |
| from pydantic import BaseModel |
|
|
| |
| from typing import Dict, List, Tuple, Any, TypedDict |
|
|
| |
| import numpy as np |
| from groq import Groq |
| from mem0 import MemoryClient |
| import streamlit as st |
| from datetime import datetime |
|
|
| |
| |
# Populate os.environ from a local .env file when one exists. Without this
# call the imported load_dotenv is never used and the os.getenv lookups below
# only see variables exported by the hosting environment. Variables that are
# already set in the real environment are not overridden.
load_dotenv()

# Credentials and endpoints, all read from the environment (None when unset).
api_key = os.getenv("API_KEY")
endpoint = os.getenv("OPENAI_API_BASE")
llama_api_key = os.getenv('GROQ_API_KEY')  # passed to the Groq client below
MEM0_api_key = os.getenv('mem0')

# ChromaDB-native embedding function bound to the same endpoint and key.
# It is not referenced again in the visible code.
embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction(
    api_base=endpoint,
    api_key=api_key,
    model_name='text-embedding-ada-002'
)

# LangChain embedding model; this is the one handed to the Chroma vector store.
embedding_model = OpenAIEmbeddings(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model='text-embedding-ada-002'
)

# Chat model shared by every prompt chain in the workflow nodes.
llm = ChatOpenAI(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model="gpt-4o-mini",
    streaming=False
)

# Register the models on llama_index's global Settings object.
Settings.llm = llm
# NOTE(review): llama_index documents the embedding slot as
# `Settings.embed_model`; `Settings.embedding` looks like a plain attribute
# that nothing reads. Left unchanged because switching may require an extra
# LangChain adapter package -- confirm before renaming.
Settings.embedding = embedding_model
|
|
| |
|
|
class AgentState(TypedDict):
    """Shared state dictionary passed between the workflow nodes."""

    # --- query side ---
    query: str            # the user's query as supplied to the workflow
    expanded_query: str   # written by expand_query, read by retrieve_context

    # --- retrieval and answer ---
    context: List[Dict[str, Any]]  # entries of {"content", "metadata"} from retrieve_context
    response: str                  # latest answer produced by craft_response

    # --- evaluation scores ---
    precision_score: float
    groundedness_score: float

    # --- loop bookkeeping (each counter is incremented by its scoring node) ---
    groundedness_loop_count: int
    precision_loop_count: int

    # --- refinement feedback ---
    feedback: str         # written by refine_response, fed back into craft_response
    query_feedback: str   # written by refine_query, fed back into expand_query

    groundedness_check: bool  # declared but not read or written in the visible code
    loop_max_iter: int        # limit the routing functions compare the counters against
|
|
def expand_query(state):
    """
    Expands the user query to improve retrieval of nutrition disorder-related information.

    The system prompt instructs the model to rewrite the query; the previous
    prompt asked it to answer from "provided context", which this step never
    supplies.

    Args:
        state (Dict): The current state of the workflow. Reads 'query' and,
            when present, 'query_feedback'.
    Returns:
        Dict: The same state object with 'expanded_query' set.
    """
    print("---------Expanding Query---------")
    system_message = (
        "You are a medical search assistant specializing in nutrition disorders. "
        "Rewrite and expand the user's query with relevant medical terms, synonyms, "
        "and related concepts so that it retrieves more relevant documents. "
        "Take any feedback provided into account. "
        "Return only the expanded query text."
    )

    expand_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Expand this query: {query} using the feedback: {query_feedback}"),
    ])

    chain = expand_prompt | llm | StrOutputParser()
    expanded_query = chain.invoke({
        "query": state['query'],
        # A missing feedback entry is treated as "no feedback yet".
        "query_feedback": state.get("query_feedback", ""),
    })
    print("expanded_query", expanded_query)
    state["expanded_query"] = expanded_query
    return state
|
|
|
|
| |
# Chroma collection persisted under ./nutritional_db; texts are embedded with
# the module-level embedding_model.
vector_store = Chroma(
    embedding_function=embedding_model,
    persist_directory="./nutritional_db",
    collection_name="nutritional_hypotheticals",
)

# Similarity-search retriever configured with k=3.
retriever = vector_store.as_retriever(
    search_kwargs={"k": 3},
    search_type="similarity",
)
|
|
def retrieve_context(state):
    """
    Retrieves context from the vector store using the expanded or original query.

    Args:
        state (Dict): The current state of the workflow. Reads 'expanded_query'
            and falls back to 'query' when the expansion is missing or empty.
    Returns:
        Dict: The same state object with 'context' set to a list of
            {"content", "metadata"} dictionaries, one per retrieved document.
    """
    print("---------retrieve_context---------")
    # Honour the documented contract: use the original query when there is no
    # usable expansion instead of searching with an empty string.
    query = state.get('expanded_query') or state['query']

    docs = retriever.invoke(query)
    print("Retrieved documents:", docs)

    # Keep only the text and metadata of each retrieved document.
    context = [
        {"content": doc.page_content, "metadata": doc.metadata}
        for doc in docs
    ]
    state['context'] = context
    print("Extracted context with metadata:", context)

    return state
|
|
|
|
|
|
def craft_response(state: Dict) -> Dict:
    """
    Generates a response using the retrieved context, focusing on nutrition disorders.

    Args:
        state (Dict): The current state of the workflow. Reads 'query',
            'context' (list of dicts with a "content" key) and 'feedback'.
    Returns:
        Dict: The same state object with 'response' set to the generated text.
    """
    print("---------craft_response---------")
    system_message = '''You are a medical assistant. Use the provided context to generate a clear, accurate, and grounded response to the user's query.'''

    response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}"),
    ])

    # StrOutputParser makes the chain return plain text, like every other
    # chain in this file. Without it 'response' holds a message object, which
    # contradicts AgentState.response: str and gets interpolated into the
    # later scoring and refinement prompts.
    chain = response_prompt | llm | StrOutputParser()
    response = chain.invoke({
        "query": state['query'],
        "context": "\n".join(doc["content"] for doc in state['context']),
        "feedback": state['feedback'],
    })
    state['response'] = response
    print("intermediate response: ", response)

    return state
|
|
|
|
|
|
def _parse_score(raw_output) -> float:
    """Extract the first number from raw LLM output and clamp it to [0, 1].

    Returns 0.0 when no number is found, so an unparseable reply counts as a
    failed check instead of raising ValueError in the middle of the workflow.
    """
    import re  # function-scope import: the module does not import re at the top

    match = re.search(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?", str(raw_output))
    if match is None:
        return 0.0
    return min(1.0, max(0.0, float(match.group())))


def score_groundedness(state: Dict) -> Dict:
    """
    Checks whether the response is grounded in the retrieved context.

    Args:
        state (Dict): The current state of the workflow. Reads 'context' and
            'response'.
    Returns:
        Dict: The same state object with 'groundedness_score' set and
            'groundedness_loop_count' incremented by one.
    """
    print("---------check_groundedness---------")
    system_message = '''Evaluate how well the response is grounded in the provided context. Return a score between 0 and 1.'''

    groundedness_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:"),
    ])

    chain = groundedness_prompt | llm | StrOutputParser()
    # Accept either plain text or a message-like object exposing `.content`.
    response_text = getattr(state['response'], "content", state['response'])
    groundedness_score = _parse_score(chain.invoke({
        "context": "\n".join(doc["content"] for doc in state['context']),
        "response": response_text,
    }))
    print("groundedness_score: ", groundedness_score)
    state['groundedness_loop_count'] += 1
    print("#########Groundedness Incremented###########")
    state['groundedness_score'] = groundedness_score

    return state


def check_precision(state: Dict) -> Dict:
    """
    Checks whether the response precisely addresses the user's query.

    Args:
        state (Dict): The current state of the workflow. Reads 'query' and
            'response'.
    Returns:
        Dict: The same state object with 'precision_score' set and
            'precision_loop_count' incremented by one.
    """
    print("---------check_precision---------")
    system_message = '''Evaluate how precisely the response addresses the user's query. Return a score between 0 and 1.'''

    precision_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:"),
    ])

    chain = precision_prompt | llm | StrOutputParser()
    # Accept either plain text or a message-like object exposing `.content`.
    response_text = getattr(state['response'], "content", state['response'])
    precision_score = _parse_score(chain.invoke({
        "query": state['query'],
        "response": response_text,
    }))
    state['precision_score'] = precision_score
    print("precision_score:", precision_score)
    state['precision_loop_count'] += 1
    print("#########Precision Incremented###########")
    return state
|
|
|
|
|
|
def refine_response(state: Dict) -> Dict:
    """
    Asks the LLM how the current response could be improved.

    Args:
        state (Dict): The current state of the workflow. Reads 'query' and
            'response'.
    Returns:
        Dict: The same state object with 'feedback' set to the previous
            response followed by the model's suggestions.
    """
    print("---------refine_response---------")

    messages = [
        ("system", '''You are a helpful assistant. Suggest improvements to the response to enhance accuracy and completeness.'''),
        ("user", "Query: {query}\nResponse: {response}\n\n"
                 "What improvements can be made to enhance accuracy and completeness?"),
    ]
    suggestion_chain = ChatPromptTemplate.from_messages(messages) | llm | StrOutputParser()

    previous_response = state['response']
    suggestions = suggestion_chain.invoke({'query': state['query'], 'response': previous_response})

    # Bundle the old answer with the critique so the next drafting pass sees both.
    feedback = f"Previous Response: {previous_response}\nSuggestions: {suggestions}"
    print("feedback: ", feedback)
    # The state is logged before 'feedback' is written into it.
    print(f"State: {state}")
    state['feedback'] = feedback
    return state
|
|
|
|
|
|
def refine_query(state: Dict) -> Dict:
    """
    Asks the LLM how the expanded query could be improved for search.

    Args:
        state (Dict): The current state of the workflow. Reads 'query' and
            'expanded_query'.
    Returns:
        Dict: The same state object with 'query_feedback' set to the previous
            expanded query followed by the model's suggestions.
    """
    print("---------refine_query---------")

    messages = [
        ("system", '''You are a helpful assistant. Suggest improvements to the expanded query to improve search relevance.'''),
        ("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\n"
                 "What improvements can be made for a better search?"),
    ]
    suggestion_chain = ChatPromptTemplate.from_messages(messages) | llm | StrOutputParser()

    previous_expansion = state['expanded_query']
    suggestions = suggestion_chain.invoke({'query': state['query'], 'expanded_query': previous_expansion})

    query_feedback = f"Previous Expanded Query: {previous_expansion}\nSuggestions: {suggestions}"
    print("query_feedback: ", query_feedback)
    # NOTE(review): this logs the groundedness counter although the function is
    # reached from the precision check -- possibly a copy-paste slip; confirm.
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    state['query_feedback'] = query_feedback
    return state
|
|
|
|
|
|
def should_continue_groundedness(state):
    """Route after groundedness scoring.

    Returns "check_precision" when the score is at least 0.8,
    "max_iterations_reached" when the loop counter exceeds 'loop_max_iter',
    and "refine_response" otherwise.
    """
    print("---------should_continue_groundedness---------")
    attempts = state['groundedness_loop_count']
    print("groundedness loop count: ", attempts)

    # Good enough: hand over to the precision check.
    if state['groundedness_score'] >= 0.8:
        print("Moving to precision")
        return "check_precision"

    # Out of budget: stop refining.
    if attempts > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Groundedness Score Threshold Not met. Refining Response-----------")
    return "refine_response"
|
|
|
|
def should_continue_precision(state: Dict) -> str:
    """Route after precision scoring.

    Returns "pass" when the score is at least 0.8, "max_iterations_reached"
    when the loop counter exceeds 'loop_max_iter', and "refine_query" otherwise.
    """
    print("---------should_continue_precision---------")
    attempts = state['precision_loop_count']
    print("precision loop count: ", attempts)

    # Good enough: the workflow can finish.
    if state['precision_score'] >= 0.8:
        return "pass"

    # Out of budget: stop refining.
    if attempts > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Precision Score Threshold Not met. Refining Query-----------")
    return "refine_query"
|
|
|
|
|
|
|
|
def max_iterations_reached(state: Dict) -> Dict:
    """Replace the response with a fixed fallback message once the loop budget is spent.

    Args:
        state (Dict): The current state of the workflow.
    Returns:
        Dict: The same state object with 'response' overwritten.
    """
    print("---------max_iterations_reached---------")
    state['response'] = (
        "I'm unable to refine the response further. "
        "Please provide more context or clarify your question."
    )
    return state
|
|
|
|
|
|
| from langgraph.graph import END, StateGraph, START |
|
|
def create_workflow() -> StateGraph:
    """Build (but do not compile) the state graph for the AI nutrition agent.

    Returns:
        StateGraph: Graph over AgentState with every node and edge registered.
    """
    graph = StateGraph(AgentState)

    # Node name -> handler, registered in this order.
    node_handlers = (
        ("expand_query", expand_query),
        ("retrieve_context", retrieve_context),
        ("craft_response", craft_response),
        ("score_groundedness", score_groundedness),
        ("refine_response", refine_response),
        ("check_precision", check_precision),
        ("refine_query", refine_query),
        ("max_iterations_reached", max_iterations_reached),
    )
    for node_name, handler in node_handlers:
        graph.add_node(node_name, handler)

    # Straight-line path: each stage feeds the next one.
    main_path = (START, "expand_query", "retrieve_context", "craft_response", "score_groundedness")
    for source, target in zip(main_path, main_path[1:]):
        graph.add_edge(source, target)

    # Groundedness routing: move on, redraft the answer, or give up.
    graph.add_conditional_edges(
        "score_groundedness",
        should_continue_groundedness,
        {
            "check_precision": "check_precision",
            "refine_response": "refine_response",
            "max_iterations_reached": "max_iterations_reached",
        },
    )
    graph.add_edge("refine_response", "craft_response")

    # Precision routing: finish, rework the query, or give up.
    graph.add_conditional_edges(
        "check_precision",
        should_continue_precision,
        {
            "pass": END,
            "refine_query": "refine_query",
            "max_iterations_reached": "max_iterations_reached",
        },
    )
    graph.add_edge("refine_query", "expand_query")

    graph.add_edge("max_iterations_reached", END)

    return graph
|
|
|
|
|
|
|
|
| |
# Compile the graph once at import time; every tool call reuses it.
WORKFLOW_APP = create_workflow().compile()


@tool
def agentic_rag(query: str):
    """
    Runs the RAG-based agent with conversation history for context-aware responses.
    Args:
    query (str): The current user query.
    Returns:
    Dict[str, Any]: The updated state with the generated response and conversation history.
    """
    # Fresh state for each call: empty text fields, zeroed scores and counters,
    # and a loop limit of three.
    initial_state = dict(
        query=query,
        expanded_query="",
        context=[],
        response="",
        precision_score=0.0,
        groundedness_score=0.0,
        groundedness_loop_count=0,
        precision_loop_count=0,
        feedback="",
        query_feedback="",
        loop_max_iter=3,
    )
    return WORKFLOW_APP.invoke(initial_state)
|
|
|
|
| |
# Groq client used for the Llama Guard calls below.
llama_guard_client = Groq(api_key=llama_api_key)


def filter_input_with_llama_guard(user_input, model="llama-guard-3-8b"):
    """
    Send the user input to a Llama Guard model and return its reply.

    Parameters:
    - user_input: The input provided by the user.
    - model: The Llama Guard model to query (default is "llama-guard-3-8b").
    Returns:
    - The model's reply with surrounding whitespace stripped, or None when
      the call fails for any reason (the error is printed).
    """
    guard_messages = [{"role": "user", "content": user_input}]
    try:
        completion = llama_guard_client.chat.completions.create(
            messages=guard_messages,
            model=model,
        )
        verdict = completion.choices[0].message.content
        return verdict.strip()
    except Exception as e:
        # Best effort: report the failure and let the caller decide what None means.
        print(f"Error with Llama Guard: {e}")
        return None
|
|
|
|
| |
|
|
class NutritionBot:
    """Tool-calling support agent for nutrition disorders with mem0-backed memory."""

    def __init__(self):
        """
        Initialize the NutritionBot class, setting up memory, the LLM client, tools, and the agent executor.

        Credentials come from the module-level values read from the environment
        (MEM0_api_key, api_key, endpoint). The previous code referenced
        `userdata` and `config`, which are not defined in this module.
        """
        # Memory client used to store and search past customer interactions.
        self.memory = MemoryClient(api_key=MEM0_api_key)

        # Same keyword names as the module-level `llm`; `endpoint=` is not a
        # ChatOpenAI parameter.
        self.client = ChatOpenAI(
            model="gpt-4o-mini",
            openai_api_key=api_key,
            openai_api_base=endpoint,
            temperature=0,
        )

        # The only tool exposed to the agent is the RAG workflow.
        tools = [agentic_rag]

        system_prompt = (
            "You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience.\n"
            "Guidelines for Interaction:\n"
            "Maintain a polite, professional, and reassuring tone.\n"
            "Show genuine empathy for customer concerns and health challenges.\n"
            "Reference past interactions to provide personalized and consistent advice.\n"
            "Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations.\n"
            "Ensure consistent and accurate information across conversations.\n"
            "If any detail is unclear or missing, proactively ask for clarification.\n"
            "Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights.\n"
            "Keep track of ongoing issues and follow-ups to ensure continuity in support.\n"
            "Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences.\n"
        )

        # System prompt, the human input, and a scratchpad slot for the agent.
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])

        agent = create_tool_calling_agent(self.client, tools, prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None):
        """
        Store customer interaction in memory for future reference.

        Args:
            user_id (str): Unique identifier for the customer.
            message (str): Customer's query or message.
            response (str): Chatbot's response.
            metadata (Dict, optional): Additional metadata for the interaction.
                The caller's dictionary is not modified.
        """
        # Work on a copy so adding the timestamp never leaks into the caller's dict.
        metadata = dict(metadata) if metadata else {}
        metadata["timestamp"] = datetime.now().isoformat()

        conversation = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response},
        ]

        self.memory.add(
            conversation,
            user_id=user_id,
            output_format="v1.1",
            metadata=metadata,
        )

    def get_relevant_history(self, user_id: str, query: str) -> List[Dict]:
        """
        Retrieve past interactions relevant to the current query.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): The customer's current query.
        Returns:
            List[Dict]: A list of relevant past interactions (at most 5 are requested).
        """
        results = self.memory.search(
            query=query,
            user_id=user_id,
            limit=5,
        )
        # Accept both a bare list and a wrapper dict of the form {"results": [...]}.
        if isinstance(results, dict):
            results = results.get("results", [])
        return list(results or [])

    def handle_customer_query(self, user_id: str, query: str) -> str:
        """
        Process a customer's query and provide a response, taking into account past interactions.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): Customer's query.
        Returns:
            str: Chatbot's response.
        """
        relevant_history = self.get_relevant_history(user_id, query)

        # One line per stored memory. The previous version printed the same
        # text twice, labelled once as "Customer" and once as "Support".
        history_lines = [
            f"- {memory['memory']}\n"
            for memory in relevant_history
            if isinstance(memory, dict) and memory.get("memory")
        ]
        context = "Previous relevant interactions:\n" + "".join(history_lines)

        print("Context: ", context)

        prompt = (
            "\nContext:\n"
            f"{context}\n"
            f"Current customer query: {query}\n"
            "Provide a helpful response that takes into account any relevant past interactions.\n"
        )

        response = self.agent_executor.invoke({"input": prompt})
        answer = response["output"]

        # Persist the exchange so later queries can draw on it.
        self.store_customer_interaction(
            user_id=user_id,
            message=query,
            response=answer,
            metadata={"type": "support_query"},
        )

        return answer
|
|
|
|
| |
def _guard_allows(verdict) -> bool:
    """Return True when a Llama Guard verdict should be let through.

    "safe" passes. An "unsafe" verdict passes only when every reported
    category is S6 or S7, the two categories the original allow-list
    accepted. Whitespace and commas are treated as separators, so
    "unsafe S7", "unsafe\\nS7" and "unsafe\\nS6,S7" are all recognised.
    None, empty or unrecognised verdicts are rejected.
    """
    if not verdict:
        return False
    tokens = str(verdict).replace(",", " ").split()
    if not tokens:
        return False
    if tokens == ["safe"]:
        return True
    if tokens[0] != "unsafe" or len(tokens) < 2:
        return False
    return all(category in ("S6", "S7") for category in tokens[1:])


def nutrition_disorder_streamlit():
    """
    A Streamlit-based UI for the Nutrition Disorder Specialist Agent.

    Shows a login form until a user name is stored in the session, then a chat
    view. Each query is screened with Llama Guard before being passed to
    NutritionBot; typing 'exit' logs the user out.
    """
    st.title("Nutrition Disorder Specialist")
    st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.")
    st.write("Type 'exit' to end the conversation.")

    # Session defaults.
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    if st.session_state.user_id is None:
        # Not logged in yet: ask for a name.
        with st.form("login_form", clear_on_submit=True):
            user_id = st.text_input("Please enter your name to begin:")
            submit_button = st.form_submit_button("Login")
            if submit_button and user_id:
                st.session_state.user_id = user_id
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?"
                })
                st.session_state.login_submitted = True
        # Rerun outside the form so the chat view is drawn straight away.
        if st.session_state.get("login_submitted", False):
            st.session_state.pop("login_submitted")
            st.rerun()
    else:
        # Replay the conversation so far.
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])

        user_query = st.chat_input('Type your question here or exit to end the conversation')
        if user_query:
            if user_query.lower() == "exit":
                st.session_state.chat_history.append({"role": "user", "content": "exit"})
                with st.chat_message("user"):
                    st.write("exit")
                goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders."
                st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg})
                with st.chat_message("assistant"):
                    st.write(goodbye_msg)
                st.session_state.user_id = None
                st.rerun()
                return

            st.session_state.chat_history.append({"role": "user", "content": user_query})
            with st.chat_message("user"):
                st.write(user_query)

            # Screen the input before it reaches the agent.
            filtered_result = filter_input_with_llama_guard(user_query)
            print(filtered_result)

            with st.chat_message("assistant"):
                # The verdict is normalized before checking: an exact match on
                # "unsafe S7" / "unsafe S6" misses verdicts that separate the
                # category with a newline.
                if _guard_allows(filtered_result):
                    try:
                        # Build the bot lazily and keep it for the session.
                        if 'chatbot' not in st.session_state:
                            st.session_state.chatbot = NutritionBot()

                        response = st.session_state.chatbot.handle_customer_query(
                            st.session_state.user_id,
                            user_query
                        )

                        st.write(response)
                        st.session_state.chat_history.append({"role": "assistant", "content": response})
                    except Exception as e:
                        error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}"
                        st.write(error_msg)
                        st.session_state.chat_history.append({"role": "assistant", "content": error_msg})
                else:
                    inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again."
                    st.write(inappropriate_msg)
                    st.session_state.chat_history.append({"role": "assistant", "content": inappropriate_msg})


if __name__ == "__main__":
    nutrition_disorder_streamlit()
|
|