| import os |
| import streamlit as st |
| from datasets import load_dataset |
| import chromadb |
| import string |
|
|
| from openai import OpenAI |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from scipy.spatial.distance import cosine |
|
|
| from typing import Dict, List |
|
|
def merge_dataframes(dataframes):
    '''
    Concatenates several Q&A DataFrames into one with a fixed column layout.
    Args:
        dataframes: A sequence of pandas DataFrames, each expected to carry
            the columns 'context', 'questions' and 'answers'.
    Returns:
        A DataFrame with a fresh 0..n-1 index that holds only the columns
        'context', 'questions' and 'answers', in that order. An empty list
        or tuple yields an empty DataFrame with those three columns.
    Raises:
        KeyError: If the concatenated data lacks one of the three columns.
    '''
    wanted_columns = ['context', 'questions', 'answers']

    # pd.concat raises ValueError when given nothing to concatenate; an empty
    # result with the expected schema is more useful to callers.
    if isinstance(dataframes, (list, tuple)) and not dataframes:
        return pd.DataFrame(columns=wanted_columns)

    combined_dataframe = pd.concat(dataframes, ignore_index=True)

    # Drop any extra columns and normalise the column order.
    return combined_dataframe[wanted_columns]
|
|
def call_chatgpt(prompt: str, directions: str) -> str:
    '''
    Sends a system message and a user message to the OpenAI chat completions
    API and returns the text of the first choice.
    Args:
        prompt: The content of the user message.
        directions: The content of the system message.
    Returns:
        The message content of the first completion choice.
    '''
    chat_messages = [
        {"role": "system", "content": directions},
        {"role": "user", "content": prompt},
    ]

    # The API key is read from the environment on every call.
    api_key = os.environ["OPENAI_API_KEY"]
    response = OpenAI(api_key=api_key).chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=chat_messages,
    )

    return response.choices[0].message.content
|
|
def openai_text_embedding(prompt: str) -> List[float]:
    '''
    Requests an embedding vector for a piece of text from the OpenAI API.
    Args:
        prompt: The text to embed.
    Returns:
        The embedding of the first (only) input, as a list of floats.
    '''
    # The file imports only the `OpenAI` client class, so the legacy
    # module-level `openai.Embedding.create` call raised NameError. Use the
    # client-based embeddings endpoint, configured the same way as
    # call_chatgpt.
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    response = client.embeddings.create(
        input=prompt, model="text-embedding-ada-002"
    )
    return response.data[0].embedding
|
|
def calculate_sts_openai_score(sentence1: str, sentence2: str) -> float:
    '''
    Scores the similarity of two sentences from their OpenAI embeddings.
    Args:
        sentence1: The first sentence.
        sentence2: The second sentence.
    Returns:
        One minus the cosine distance between the two embedding vectors.
    '''
    # Embed both sentences (first, then second) and turn them into arrays.
    vector_a, vector_b = (
        np.asarray(openai_text_embedding(text))
        for text in (sentence1, sentence2)
    )

    # scipy's `cosine` is a distance, so subtract from 1 to get a similarity.
    return 1 - cosine(vector_a, vector_b)
|
|
def add_dist_score_column(
    dataframe: pd.DataFrame, sentence: str,
) -> pd.DataFrame:
    '''
    Ranks the rows of a DataFrame by how similar each question is to a
    reference sentence.
    Args:
        dataframe: A DataFrame with a 'questions' column. It is modified in
            place: a 'stsopenai' column holding the similarity scores is
            added (or overwritten).
        sentence: The reference sentence every question is compared with.
    Returns:
        The five rows with the highest 'stsopenai' score, best first
        (fewer if the DataFrame has fewer than five rows).
    '''
    # Embed the reference sentence once instead of once per row; every
    # embedding is a separate API request.
    sentence_embedding = np.asarray(openai_text_embedding(sentence))

    def _similarity(question) -> float:
        # Same formula as calculate_sts_openai_score: 1 - cosine distance.
        question_embedding = np.asarray(openai_text_embedding(str(question)))
        return 1 - cosine(question_embedding, sentence_embedding)

    dataframe["stsopenai"] = dataframe["questions"].apply(_similarity)

    sorted_dataframe = dataframe.sort_values(by="stsopenai", ascending=False)

    return sorted_dataframe.iloc[:5, :]
|
|
def convert_to_list_of_dict(df: pd.DataFrame) -> List[Dict[str, str]]:
    '''
    Flattens a Q&A DataFrame into an alternating list of chat messages.
    Args:
        df: A pandas DataFrame with columns named 'questions' and 'answers'.
    Returns:
        A list with two dictionaries per row, in row order: first
        {"role": "user", "content": <question>}, then
        {"role": "assistant", "content": <answer>}.
    '''
    result = []

    # Walk the two columns in lockstep; this avoids building a Series for
    # every row the way DataFrame.iterrows does.
    for question, answer in zip(df["questions"], df["answers"]):
        result.append({"role": "user", "content": question})
        result.append({"role": "assistant", "content": answer})

    return result
|
|
# Retrieval settings: the distance cut-off applied to query hits and the
# number of hits requested per query.
special_threshold = 0.3
n_results = 3

initial_input = "Tell me about YSA"

# Selectable topics; 'link' is the dataset identifier passed to load_dataset.
domain_info = [{'link': 'KeshavRa/About_YSA_Database', 'name': 'About YSA'}]

# Sidebar: blurb, topic picker and a button that wipes the conversation.
st.sidebar.markdown('''This is a chatbot to help you learn more about YSA''')
topic_names = [entry["name"] for entry in domain_info]
domain = st.sidebar.selectbox("Select a topic", topic_names)
clear_button = st.sidebar.button("Clear Conversation", key="clear")

if clear_button:
    st.session_state.curr_domain = ""
    st.session_state.messages = []

# Load the dataset that belongs to the selected topic.
for entry in domain_info:
    if entry['name'] != domain:
        continue
    dataset = load_dataset(entry['link'])
|
|
| |
# Chroma client created with default settings.
client = chromadb.Client()

# Build a random collection name: a 10-digit number followed by 10 random
# upper-case letters/digits.
# Integer bounds and an explicit 64-bit dtype are used because the upper
# bound (10**10) does not fit in a 32-bit integer, which is NumPy's default
# integer type on some platforms.
random_number: int = int(
    np.random.randint(low=10**9, high=10**10, dtype=np.int64)
)

random_string: str = "".join(
    np.random.choice(list(string.ascii_uppercase + string.digits), size=10)
)

combined_string: str = f"{random_number}{random_string}"

# NOTE(review): this runs on every execution of the script, so each run
# creates a brand-new collection — consider caching if that becomes costly.
collection = client.create_collection(combined_string)
|
|
st.title("YSA Chatbot")

# Make sure the per-session keys exist before anything reads them.
if "messages" not in st.session_state:
    st.session_state.messages = []

if "curr_domain" not in st.session_state:
    st.session_state.curr_domain = ""

# Index every training question in the Chroma collection. The document ids
# are the row positions as strings, so a query hit can be mapped back to its
# dataset row.
with st.spinner("Loading, please be patient with us ... 🙏"):
    # Read the column once and reuse it for both the count and the documents.
    train_questions = dataset["train"]["questions"]
    question_count = len(train_questions)

    collection.add(
        ids=[str(i) for i in range(question_count)],
        documents=train_questions,
        metadatas=[{"type": "support"} for _ in range(question_count)],
    )

# Switching topic starts a fresh conversation.
if st.session_state.curr_domain != domain:
    st.session_state.messages = []
    st.session_state.curr_domain = domain

# Replay the stored conversation.
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
|
|
| |
# Handle one chat turn: show the user message, retrieve the closest stored
# questions, ask the model to answer from their answers, and show the reply.
if prompt := st.chat_input("Tell me about a"):
    st.chat_message("user").markdown(prompt)
    st.session_state.messages.append({"role": "user", "content": prompt})

    question = prompt

    results = collection.query(query_texts=question, n_results=n_results)

    # Collection ids are the dataset row positions stored as strings.
    idx = [int(i) for i in results["ids"][0]]

    # Fetch each column once rather than once per hit inside the
    # comprehensions below.
    train_split = dataset["train"]
    train_questions = train_split["questions"]
    train_answers = train_split["answers"]

    ref = pd.DataFrame(
        {
            "idx": idx,
            "questions": [train_questions[i] for i in idx],
            "answers": [train_answers[i] for i in idx],
            "distances": results["distances"][0],
        }
    )

    # Prefer hits under the distance threshold; if none qualify, fall back
    # to all retrieved hits so the model still gets some context.
    filtered_ref = ref[ref["distances"] < special_threshold]
    final_ref = ref if filtered_ref.empty else filtered_ref
    ref_from_db_search = final_ref["answers"].str.cat(sep=" ")

    engineered_prompt = f'''
    Based on the context: {ref_from_db_search},
    answer the user question: {question}.
    '''
    answer = call_chatgpt(engineered_prompt, "You are a helpful assistant.")

    response = answer

    with st.chat_message("assistant"):
        st.markdown(response)
        with st.expander("See reference:"):
            st.table(final_ref)

    st.session_state.messages.append({"role": "assistant", "content": response})