"""Multi-modal RAG chat app (Streamlit + Milvus + Hugging Face Inference API).

Users log in with a bare username, ingest TEXT / PDF / URL / YouTube-video
content into per-user Milvus collections, and chat over each collection via
a ConversationalRetrievalChain backed by an HF-hosted chat model.
"""

import os
import time
from typing import List, Optional

import moviepy.editor
import streamlit as st
import whisper
from dotenv import load_dotenv
from goose3 import Goose
from huggingface_hub import InferenceClient
from langchain.chains import ConversationalRetrievalChain
from langchain.embeddings.base import Embeddings
from langchain.llms.base import LLM
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import Milvus
from pymilvus import connections
from PyPDF2 import PdfReader
from pytube import YouTube

# -------------------- INIT --------------------
load_dotenv()

# One set of connection parameters, shared by every Milvus call below.
MILVUS_CONN = {"host": "localhost", "port": "19530"}

connections.connect(alias="default", **MILVUS_CONN)

HF_TOKEN = os.getenv("HF_TOKEN")


# -------------------- HF EMBEDDINGS --------------------
class HFInferenceEmbeddings(Embeddings):
    """Embeddings backed by the HF Inference API (all-MiniLM-L6-v2)."""

    def __init__(self):
        self.client = InferenceClient(api_key=HF_TOKEN)
        self.model = "sentence-transformers/all-MiniLM-L6-v2"

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        # feature_extraction returns a numpy array; convert to plain lists
        # so the vector store receives JSON-serializable floats.
        vectors = self.client.feature_extraction(texts, model=self.model)
        return [[float(x) for x in vec] for vec in vectors]

    def embed_query(self, text: str) -> List[float]:
        vector = self.client.feature_extraction(text, model=self.model)
        return [float(x) for x in vector]


# -------------------- HF LLM --------------------
class HFChatLLM(LLM):
    """Chat LLM backed by the HF Inference API.

    NOTE: LangChain's ``LLM`` is a pydantic model, so instance attributes
    must be declared as fields — assigning ``self.client`` in a custom
    ``__init__`` raises at construction time.  The client is therefore
    created lazily inside ``_call``.
    """

    model: str = "deepseek-ai/DeepSeek-V3.2:novita"

    @property
    def _llm_type(self) -> str:
        return "hf_chat"

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        """Send *prompt* as a single user turn and return the reply text."""
        client = InferenceClient(api_key=HF_TOKEN)
        completion = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": "Answer only from the given context. Be concise and accurate.",
                },
                {"role": "user", "content": prompt},
            ],
        )
        return completion.choices[0].message.content


def get_embeddings() -> HFInferenceEmbeddings:
    """Factory for the shared embedding wrapper."""
    return HFInferenceEmbeddings()


def get_llm() -> HFChatLLM:
    """Factory for the chat LLM wrapper."""
    return HFChatLLM()


def get_collection(user_id: str, name: str) -> str:
    """Per-user, per-modality Milvus collection name."""
    return f"multigpt_{user_id}_{name}"


# -------------------- AUTH --------------------
def login():
    """Minimal username-only login; stores the id in session state."""
    st.title("🔐 Login")
    user = st.text_input("Enter username")
    if st.button("Login"):
        if user:
            st.session_state["user_id"] = user.strip().lower()
            st.success(f"Logged in as {user}")
            st.rerun()
        else:
            st.error("Enter username")


# -------------------- INGESTION --------------------
def store_data(chunks: List[str], collection_name: str):
    """Embed *chunks* and write them into the named Milvus collection."""
    Milvus.from_texts(
        chunks,
        embedding=get_embeddings(),
        collection_name=collection_name,
        connection_args=MILVUS_CONN,
    )


def _split_and_store(text: str, chunk_size: int, name: str):
    """Chunk *text* and store it in the current user's *name* collection.

    BUG FIX: chunk_size/chunk_overlap must be passed as keywords —
    positionally they bind to CharacterTextSplitter's is_separator_regex
    and are silently ignored.
    """
    user_id = st.session_state["user_id"]
    splitter = CharacterTextSplitter(
        separator="\n", chunk_size=chunk_size, chunk_overlap=0
    )
    chunks = splitter.split_text(text)
    process.success("Chunking done")
    store_data(chunks, get_collection(user_id, name))
    process.success("Uploaded")


def txtread(file):
    """Ingest an uploaded plain-text file."""
    text = file.read().decode("utf-8")
    _split_and_store(text, 1000, "txt")


def pdfread(file):
    """Ingest an uploaded PDF (image-only pages yield no text)."""
    reader = PdfReader(file)
    # extract_text() can return None for scanned pages; coalesce to "".
    text = "".join([(p.extract_text() or "") for p in reader.pages])
    _split_and_store(text, 4000, "pdf")


def urlread(url: str):
    """Ingest the main article text of a web page."""
    g = Goose()
    text = g.extract(url=url).cleaned_text
    _split_and_store(text, 2000, "url")


def scrape(link: str):
    """Download a YouTube video, transcribe its audio, and ingest the text."""
    yt = YouTube(link).streams.get_highest_resolution()
    yt.download(filename="video.mp4")
    process.success("Downloading video")
    # Defensive wait: download is synchronous, so this normally passes
    # immediately; it guards against a delayed filesystem flush.
    while not os.path.exists("video.mp4"):
        time.sleep(5)
    video = moviepy.editor.VideoFileClip("video.mp4")
    process.warning("Extracting audio")
    audio = video.audio
    audio.write_audiofile("audio.mp3")
    process.warning("Transcribing")
    model = whisper.load_model("base")
    result = model.transcribe("audio.mp3")
    _split_and_store(result["text"], 1000, "vid")


# -------------------- QA --------------------
def chain(name: str):
    """Build a conversational retrieval chain over the user's collection."""
    user_id = st.session_state["user_id"]
    db = Milvus(
        embedding_function=get_embeddings(),
        collection_name=get_collection(user_id, name),
        connection_args=MILVUS_CONN,
    )
    retriever = db.as_retriever(search_kwargs={"k": 10})
    return ConversationalRetrievalChain.from_llm(
        llm=get_llm(),
        retriever=retriever,
    )


def ai(qa, query: str):
    """Run *query* through the chain (stateless: empty chat history)."""
    result = qa({"question": query, "chat_history": []})
    process.success("Answer ready")
    return result


# -------------------- UI --------------------
def upload():
    """Upload page: route the chosen modality to its ingestion function."""
    placeholder.title("Upload Data")
    choice = st.sidebar.radio("Mode", ['', 'TEXT', 'PDF', 'URL', 'VIDEO'])
    if choice == 'TEXT':
        file = st.file_uploader("Upload txt")
        if file:
            txtread(file)
    elif choice == 'PDF':
        file = st.file_uploader("Upload PDF")
        if file:
            pdfread(file)
    elif choice == 'URL':
        url = st.text_input("Enter URL")
        if url:
            urlread(url)
    elif choice == 'VIDEO':
        link = st.text_input("YouTube link")
        if link:
            scrape(link)


def chat():
    """Chat page: answer questions over the selected modality's collection."""
    placeholder.title("Chat with your data")
    choice = st.sidebar.radio("Mode", ['', 'TEXT', 'PDF', 'URL', 'VIDEO'])
    if choice:
        query = st.text_input("Ask your question")
        if query:
            qa = chain(choice.lower())
            result = ai(qa, query)
            # Typewriter-style streaming of the answer.
            ph = st.empty()
            shown = ""
            for ch in result["answer"]:
                shown += ch
                time.sleep(0.01)
                ph.markdown(shown)


# -------------------- MAIN --------------------
def main():
    """App entry point: login gate, then Upload / Chat / Logout navigation."""
    global placeholder, process, data
    # Module-level placeholders used by the ingestion/QA helpers for status.
    placeholder = st.empty()
    data = st.empty()
    process = st.empty()
    if "user_id" not in st.session_state:
        login()
        return
    st.sidebar.write(f"👤 {st.session_state['user_id']}")
    page = st.sidebar.radio("Navigate", ['Upload', 'Chat', 'Logout'])
    if page == "Upload":
        upload()
    elif page == "Chat":
        chat()
    elif page == "Logout":
        st.session_state.clear()
        st.rerun()


if __name__ == "__main__":
    main()