| import os |
| import getpass |
| import pandas as pd |
| from typing import Optional, Dict, Any |
|
|
| |
| try: |
| from langchain_core.runnables.base import Runnable |
| except ImportError: |
| try: |
| from langchain.runnables.base import Runnable |
| except ImportError: |
| raise ImportError("Cannot find Runnable class. Please upgrade LangChain or check your installation.") |
|
|
| from langchain.docstore.document import Document |
| from langchain.embeddings import HuggingFaceEmbeddings |
| from langchain.vectorstores import FAISS |
| from langchain.chains import RetrievalQA |
|
|
| from smolagents import CodeAgent, DuckDuckGoSearchTool, ManagedAgent, LiteLLMModel |
| import litellm |
|
|
| from classification_chain import get_classification_chain |
| from refusal_chain import get_refusal_chain |
| from tailor_chain import get_tailor_chain |
| from cleaner_chain import get_cleaner_chain |
| from contextualize_chain import get_contextualize_chain |
|
|
| from langchain.llms.base import LLM |
|
|
|
|
| |
| |
| |
| if not os.environ.get("GEMINI_API_KEY"): |
| os.environ["GEMINI_API_KEY"] = getpass.getpass("Enter your Gemini API Key: ") |
| if not os.environ.get("GROQ_API_KEY"): |
| os.environ["GROQ_API_KEY"] = getpass.getpass("Enter your GROQ API Key: ") |
|
|
| |
| |
| |
| def build_or_load_vectorstore(csv_path: str, store_dir: str) -> FAISS: |
| if os.path.exists(store_dir): |
| print(f"DEBUG: Found existing FAISS store at '{store_dir}'. Loading from disk.") |
| embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/multi-qa-mpnet-base-dot-v1") |
| vectorstore = FAISS.load_local(store_dir, embeddings) |
| return vectorstore |
| else: |
| print(f"DEBUG: Building new store from CSV: {csv_path}") |
| df = pd.read_csv(csv_path) |
| df = df.loc[:, ~df.columns.str.contains('^Unnamed')] |
| df.columns = df.columns.str.strip() |
|
|
| if "Answer" in df.columns: |
| df.rename(columns={"Answer": "Answers"}, inplace=True) |
| if "Question" not in df.columns and "Question " in df.columns: |
| df.rename(columns={"Question ": "Question"}, inplace=True) |
|
|
| if "Question" not in df.columns or "Answers" not in df.columns: |
| raise ValueError("CSV must have 'Question' and 'Answers' columns.") |
|
|
| docs = [] |
| for _, row in df.iterrows(): |
| q = str(row["Question"]) |
| ans = str(row["Answers"]) |
| doc = Document(page_content=ans, metadata={"question": q}) |
| docs.append(doc) |
|
|
| embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/multi-qa-mpnet-base-dot-v1") |
| vectorstore = FAISS.from_documents(docs, embedding=embeddings) |
| vectorstore.save_local(store_dir) |
| return vectorstore |
|
|
| |
| |
| |
| def build_rag_chain(llm_model: LiteLLMModel, vectorstore: FAISS) -> RetrievalQA: |
| class GeminiLangChainLLM(LLM): |
| def _call(self, prompt: str, stop: Optional[list] = None, **kwargs) -> str: |
| messages = [{"role": "user", "content": prompt}] |
| return llm_model(messages, stop_sequences=stop) |
|
|
| @property |
| def _llm_type(self) -> str: |
| return "custom_gemini" |
|
|
| retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3}) |
| gemini_as_llm = GeminiLangChainLLM() |
| rag_chain = RetrievalQA.from_chain_type( |
| llm=gemini_as_llm, |
| chain_type="stuff", |
| retriever=retriever, |
| return_source_documents=True |
| ) |
| return rag_chain |
|
|
| |
| |
| |
| classification_chain = get_classification_chain() |
| refusal_chain = get_refusal_chain() |
| tailor_chain = get_tailor_chain() |
| cleaner_chain = get_cleaner_chain() |
| contextualize_chain = get_contextualize_chain() |
|
|
| |
| |
| |
| gemini_llm = LiteLLMModel(model_id="gemini/gemini-pro", api_key=os.environ.get("GEMINI_API_KEY")) |
|
|
| wellness_csv = "AIChatbot.csv" |
| brand_csv = "BrandAI.csv" |
| wellness_store_dir = "faiss_wellness_store" |
| brand_store_dir = "faiss_brand_store" |
|
|
| wellness_vectorstore = build_or_load_vectorstore(wellness_csv, wellness_store_dir) |
| brand_vectorstore = build_or_load_vectorstore(brand_csv, brand_store_dir) |
|
|
| wellness_rag_chain = build_rag_chain(gemini_llm, wellness_vectorstore) |
| brand_rag_chain = build_rag_chain(gemini_llm, brand_vectorstore) |
|
|
| search_tool = DuckDuckGoSearchTool() |
| web_agent = CodeAgent(tools=[search_tool], model=gemini_llm) |
| managed_web_agent = ManagedAgent(agent=web_agent, name="web_search", description="Runs web search for you.") |
| manager_agent = CodeAgent(tools=[], model=gemini_llm, managed_agents=[managed_web_agent]) |
|
|
| def do_web_search(query: str) -> str: |
| print("DEBUG: Attempting web search for more info...") |
| search_query = f"Give me relevant info: {query}" |
| response = manager_agent.run(search_query) |
| return response |
|
|
| |
| |
| |
| def run_with_chain_context(inputs: Dict[str, Any]) -> Dict[str, str]: |
| user_query = inputs["input"] |
| chat_history = inputs.get("chat_history", []) |
|
|
| contextualized_query = contextualize_chain.invoke({"user_query": user_query, "chat_history": chat_history}) |
|
|
| |
| class_result = classification_chain.invoke({"query": contextualized_query, "chat_history": chat_history}) |
| classification = class_result.get("text", "").strip() |
|
|
| if classification == "OutOfScope": |
| refusal_text = refusal_chain.run({"chat_history": chat_history}) |
| final_refusal = tailor_chain.run({"response": refusal_text, "chat_history": chat_history}) |
| return {"answer": final_refusal.strip()} |
|
|
| if classification == "Wellness": |
| rag_result = wellness_rag_chain.invoke({ |
| "query": contextualized_query, |
| "chat_history": chat_history |
| }) |
| csv_answer = rag_result["result"].strip() |
| web_answer = do_web_search(contextualized_query) if not csv_answer else "" |
| final_merged = cleaner_chain.merge(kb=csv_answer, web=web_answer, chat_history=chat_history) |
| final_answer = tailor_chain.run({"response": final_merged, "chat_history": chat_history}).strip() |
| return {"answer": final_answer} |
|
|
| if classification == "Brand": |
| rag_result = brand_rag_chain.invoke({ |
| "query": contextualized_query, |
| "chat_history": chat_history |
| }) |
| csv_answer = rag_result["result"].strip() |
| final_merged = cleaner_chain.merge(kb=csv_answer, web="", chat_history=chat_history) |
| final_answer = tailor_chain.run({"response": final_merged, "chat_history": chat_history}).strip() |
| return {"answer": final_answer} |
|
|
| refusal_text = refusal_chain.run({"chat_history": chat_history}) |
| final_refusal = tailor_chain.run({"response": refusal_text, "chat_history": chat_history}).strip() |
| return {"answer": final_refusal} |
|
|
| |
| |
| |
| class PipelineRunnable(Runnable[Dict[str, Any], Dict[str, str]]): |
| def invoke(self, input: Dict[str, Any], config: Optional[Any] = None) -> Dict[str, str]: |
| return run_with_chain_context(input) |
|
|
| pipeline_runnable = PipelineRunnable() |
|
|