| from dotenv import load_dotenv
|
| import os
|
| import requests
|
| import gradio as gr
|
| from pypdf import PdfReader
|
| import google.generativeai as genai
|
| from chromadb import Documents, EmbeddingFunction, Embeddings
|
| from typing import Dict, List
|
| import numpy as np
|
| from sklearn.metrics.pairwise import cosine_similarity
|
| import re
|
| import pickle
|
| import json
|
| from embed import *
|
# Load variables from a local .env file; override=True lets the file win over
# values already present in the process environment.
load_dotenv(override=True)

# The Gemini client reads its key from the GEMINI_API environment variable.
genai.configure(api_key=os.getenv("GEMINI_API"))

# Pushover credentials and endpoint used by push() for notifications.
pushover_user, pushover_token = os.getenv("PUSHOVER_USER"), os.getenv("PUSHOVER_API")
pushover_url = "https://api.pushover.net/1/messages.json"
|
|
|
|
|
def push(message: str, timeout: float = 10.0) -> None:
    """Send ``message`` as a Pushover notification.

    Posts the module-level Pushover user/token together with ``message`` to
    ``pushover_url``.

    Args:
        message: Text of the notification.
        timeout: Seconds to wait for the Pushover API. Without a timeout
            ``requests`` can block indefinitely, which would stall the chat
            turn that triggered the notification.

    Raises:
        requests.RequestException: If the request cannot be completed
            (connection failure, timeout, ...).
    """
    print("Pushing to Pushover ", message)
    payload = {"user": pushover_user, "token": pushover_token, "message": message}
    response = requests.post(pushover_url, data=payload, timeout=timeout)
    # Surface rejected notifications (e.g. bad credentials) instead of
    # dropping them silently; do not raise so the chat turn can continue.
    if not response.ok:
        print(f"Pushover request failed with status {response.status_code}")
|
|
|
def record_user_details(email: str,
                        name: str = "Name not provided",
                        notes: str = "not provided") -> Dict[str, str]:
    """Record a user's contact details by pushing them as a notification.

    This function is invoked with keyword arguments supplied by the model.
    The tool schema does not list ``notes`` as required, so the optional
    fields carry defaults; otherwise a call without them would raise
    ``TypeError`` when dispatched with ``tool(**arguments)``.

    Args:
        email: The user's email address.
        name: The user's name, if they provided it.
        notes: Extra context about the conversation worth recording.

    Returns:
        ``{"recorded": "ok"}`` once the notification has been pushed.
    """
    push(f"Email: {email}\nName: {name}\nNotes: {notes}")
    return {"recorded": "ok"}
|
|
|
|
|
def record_unknown_question(question: str) -> Dict[str, str]:
    """Push a notification containing a question that could not be answered.

    Args:
        question: The question text to record.

    Returns:
        ``{"recorded": "ok"}`` after the notification has been pushed.
    """
    notification = f"Question: {question}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
|
|
|
|
|
def _resolve_tool(tool_name: str):
    """Return the callable for a declared tool, or ``None`` if not allowed.

    Only names that appear in the module-level ``tools`` declarations are
    eligible. The tool name is chosen by the model, so an unrestricted
    ``globals()`` lookup would let it call any module-level callable.
    """
    declared_names = {spec["name"] for spec in tools}
    if tool_name not in declared_names:
        return None
    candidate = globals().get(tool_name)
    return candidate if callable(candidate) else None


def handle_tool_calls(tool_calls: List) -> List[Dict[str, dict]]:
    """Execute the function calls requested by the model.

    Args:
        tool_calls: Function-call objects exposing ``name`` and ``args``.

    Returns:
        One ``{"function_response": {"name": ..., "response": ...}}`` entry
        per call, in the same order as ``tool_calls``. A call to a tool that
        is not declared in ``tools`` yields an ``{"error": ...}`` response
        instead of being executed.
    """
    results = []
    for tool_call in tool_calls:
        tool_name = tool_call.name
        arguments = dict(tool_call.args)
        print(f"Tool called: {tool_name} with arguments: {arguments}")

        tool = _resolve_tool(tool_name)
        if tool is None:
            # Tell the model explicitly rather than returning an empty result.
            result = {"error": f"Unknown tool: {tool_name}"}
        else:
            result = tool(**arguments)

        results.append({
            "function_response": {
                "name": tool_name,
                "response": result,
            }
        })
    return results
|
|
|
# Function declaration handed to the model for the record_user_details tool.
# NOTE(review): "name" is listed as required although its description says
# "if they provided it" -- confirm which of the two is intended.
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "email": {
                "type": "STRING",
                "description": "The email address of this user",
            },
            "name": {
                "type": "STRING",
                "description": "The user's name, if they provided it",
            },
            "notes": {
                "type": "STRING",
                "description": "Any additional information about the conversation that's worth recording to give context",
            },
        },
        "required": ["name", "email"],
    },
}
|
|
|
# Function declaration handed to the model for the record_unknown_question tool.
record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "question": {
                "type": "STRING",
                "description": "The question that couldn't be answered",
            },
        },
        "required": ["question"],
    },
}
|
|
|
# Tool declarations passed to the Gemini model (see chat_with_gemini).
tools = [record_user_details_json, record_unknown_question_json]
|
|
|
|
|
|
|
class App:
    """Chat backend that answers questions with Gemini using retrieved passages.

    The document store is obtained from ``load_chroma_db`` and queried with
    ``get_relevant_passage`` for every incoming message; the retrieved text
    is embedded into the prompt sent to the model.
    """

    # Gemini model used for every chat turn.
    MODEL_NAME = "gemini-2.0-flash"
    # Upper bound on consecutive tool-call rounds within a single chat turn.
    MAX_TOOL_ROUNDS = 3
    # Number of passages requested from the document store per question.
    N_RESULTS = 3

    def __init__(self, path: str = "Week_1/Data_w1", name: str = "RAG_DB"):
        """Load the document store.

        Args:
            path: Location passed to ``load_chroma_db``.
            name: Collection name passed to ``load_chroma_db``.
        """
        self.db = load_chroma_db(path=path, name=name)

    def rag_prompt(self, query: str, relevant_passages: str) -> str:
        """Build the prompt that combines the question with retrieved text.

        Single quotes, double quotes are removed from ``relevant_passages``
        and newlines are replaced by spaces before it is inserted.

        Args:
            query: The user's question.
            relevant_passages: Retrieved reference text.

        Returns:
            The full prompt string.
        """
        escaped = relevant_passages.replace("'", "").replace('"', "").replace("\n", " ")
        prompt = f'''
        Please answer questions using text from the reference passage included below. \
        Be sure to respond in a complete sentence, being comprehensive, including all relevant background information. \
        However, you are talking to a non-technical audience, so be sure to break down complicated concepts and \
        strike a friendly and conversational tone. \
        If the passage is irrelevant to the question, you should respond with "I do not have an answer for that." and use record_unknown_question tool to record the question. \
        QUESTION: {query} \
        PASSAGE: {escaped}
        '''
        return prompt

    def system_prompt(self) -> str:
        """Return the default system instruction shown in the UI."""
        return '''
        You are acting as Ed Donner. You are answering questions on Ed Donner's website, \
        particularly questions related to Ed Donner's career, background, skills and experience. \
        Your responsibility is to represent Ed Donner for interactions on the website as faithfully as possible. \
        Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
        If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \
        If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool.
        '''

    @staticmethod
    def _to_gemini_history(history) -> List[Dict]:
        """Convert role/content chat messages into Gemini history entries.

        ``user`` messages keep their role, ``assistant`` messages are mapped
        to the ``model`` role, and messages with any other role are skipped.
        """
        role_map = {"user": "user", "assistant": "model"}
        return [
            {"role": role_map[msg["role"]], "parts": [msg["content"]]}
            for msg in history
            if msg["role"] in role_map
        ]

    def chat_with_gemini(self, message, history, system_prompt):
        """Answer ``message`` with Gemini, running requested tools in between.

        Args:
            message: The user's latest message.
            history: Earlier messages as dicts with ``role`` and ``content``.
            system_prompt: System instruction for the model.

        Returns:
            The model's text reply, a fallback notice when the tool-round
            budget is used up without a plain-text answer, or
            ``"Error: ..."`` if anything raised along the way.
        """
        try:
            model = genai.GenerativeModel(
                self.MODEL_NAME,
                system_instruction=system_prompt,
                tools=tools,
            )
            chat_session = model.start_chat(
                history=self._to_gemini_history(history)
            )

            relevant_passage = get_relevant_passage(
                query=message,
                db=self.db,
                n_results=self.N_RESULTS,
            )
            # The prompt must be built from the incoming message; the name
            # `current_message` does not exist before this assignment.
            current_message = self.rag_prompt(
                query=message,
                relevant_passages=" ".join(relevant_passage),
            )

            last_text = ""
            for _ in range(self.MAX_TOOL_ROUNDS):
                response = chat_session.send_message(current_message)
                parts = response.candidates[0].content.parts
                print(f"Response parts: {list(parts)}")

                function_calls = []
                text_parts = []
                for part in parts:
                    if hasattr(part, "function_call") and part.function_call:
                        function_calls.append(part.function_call)
                        print("Function calls list not empty")
                    elif hasattr(part, "text"):
                        text_parts.append(part.text)

                if not function_calls:
                    return "".join(text_parts) if text_parts else response.text

                # Feed the tool results back to the model on the next round.
                last_text = "".join(text_parts)
                current_message = handle_tool_calls(function_calls)

            # Round budget exhausted: never hand the UI an empty reply.
            return last_text or "I wasn't able to finish processing that request. Please try again."
        except Exception as e:
            # UI boundary: report the failure as chat text instead of raising.
            return f"Error: {e}"
|
|
|
|
|
if __name__ == "__main__":
    # Build the backend first so the UI can display its default system prompt.
    backend = App()

    with gr.Blocks() as demo:
        gr.Markdown("# Chat with Google Gemini")

        # Editable system prompt; its value is passed to every chat call.
        prompt_box = gr.Textbox(
            label="System Prompt",
            value=backend.system_prompt(),
            lines=2,
            placeholder="Enter system instructions for the AI...",
        )

        # History is exchanged as role/content message dicts.
        chat_ui = gr.ChatInterface(
            fn=backend.chat_with_gemini,
            type='messages',
            additional_inputs=[prompt_box],
            title="",
            cache_examples=False,
        )

    demo.launch()
|
|
|