# Streamlit RAG app (source captured from a Hugging Face Space; page status: "Sleeping").
| import os | |
| from dotenv import load_dotenv | |
| from PyPDF2 import PdfReader | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain import chains | |
| from goose3 import Goose | |
| import streamlit as st | |
| import whisper | |
| from pytube import YouTube | |
| import moviepy.editor | |
| import time | |
| from langchain_community.vectorstores import Milvus | |
| from pymilvus import connections | |
| # HF | |
| from huggingface_hub import InferenceClient | |
| from langchain.embeddings.base import Embeddings | |
| from langchain.llms.base import LLM | |
| from typing import Optional, List | |
| # -------------------- INIT -------------------- | |
| load_dotenv() | |
| connections.connect(alias="default", host="localhost", port="19530") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| # -------------------- HF EMBEDDINGS -------------------- | |
| class HFInferenceEmbeddings(Embeddings): | |
| def __init__(self): | |
| self.client = InferenceClient(api_key=HF_TOKEN) | |
| self.model = "sentence-transformers/all-MiniLM-L6-v2" | |
| def embed_documents(self, texts): | |
| return self.client.feature_extraction(texts, model=self.model) | |
| def embed_query(self, text): | |
| return self.client.feature_extraction(text, model=self.model) | |
| # -------------------- HF LLM -------------------- | |
| class HFChatLLM(LLM): | |
| def __init__(self): | |
| self.client = InferenceClient(api_key=HF_TOKEN) | |
| self.model = "deepseek-ai/DeepSeek-V3.2:novita" | |
| def _llm_type(self) -> str: | |
| return "hf_chat" | |
| def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: | |
| completion = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[ | |
| { | |
| "role": "system", | |
| "content": "Answer only from the given context. Be concise and accurate." | |
| }, | |
| { | |
| "role": "user", | |
| "content": prompt | |
| } | |
| ], | |
| ) | |
| return completion.choices[0].message.content | |
| def get_embeddings(): | |
| return HFInferenceEmbeddings() | |
| def get_llm(): | |
| return HFChatLLM() | |
| def get_collection(user_id, name): | |
| return f"multigpt_{user_id}_{name}" | |
| # -------------------- AUTH -------------------- | |
| def login(): | |
| st.title("🔐 Login") | |
| user = st.text_input("Enter username") | |
| if st.button("Login"): | |
| if user: | |
| st.session_state["user_id"] = user.strip().lower() | |
| st.success(f"Logged in as {user}") | |
| st.rerun() | |
| else: | |
| st.error("Enter username") | |
| # -------------------- INGESTION -------------------- | |
| def store_data(chunks, collection_name): | |
| Milvus.from_texts( | |
| chunks, | |
| embedding=get_embeddings(), | |
| collection_name=collection_name, | |
| connection_args={"host": "localhost", "port": "19530"} | |
| ) | |
| def txtread(file): | |
| user_id = st.session_state["user_id"] | |
| text = file.read().decode("utf-8") | |
| chunks = CharacterTextSplitter("\n", 1000, 0).split_text(text) | |
| process.success("Chunking done") | |
| store_data(chunks, get_collection(user_id, "txt")) | |
| process.success("Uploaded") | |
| def pdfread(file): | |
| user_id = st.session_state["user_id"] | |
| reader = PdfReader(file) | |
| text = "".join([p.extract_text() for p in reader.pages]) | |
| chunks = CharacterTextSplitter("\n", 4000, 0).split_text(text) | |
| process.success("Chunking done") | |
| store_data(chunks, get_collection(user_id, "pdf")) | |
| process.success("Uploaded") | |
| def urlread(url): | |
| user_id = st.session_state["user_id"] | |
| g = Goose() | |
| text = g.extract(url=url).cleaned_text | |
| chunks = CharacterTextSplitter("\n", 2000, 0).split_text(text) | |
| process.success("Chunking done") | |
| store_data(chunks, get_collection(user_id, "url")) | |
| process.success("Uploaded") | |
| def scrape(link): | |
| user_id = st.session_state["user_id"] | |
| yt = YouTube(link).streams.get_highest_resolution() | |
| yt.download(filename="video.mp4") | |
| process.success("Downloading video") | |
| while not os.path.exists("video.mp4"): | |
| time.sleep(5) | |
| video = moviepy.editor.VideoFileClip("video.mp4") | |
| process.warning("Extracting audio") | |
| audio = video.audio | |
| audio.write_audiofile("audio.mp3") | |
| process.warning("Transcribing") | |
| model = whisper.load_model("base") | |
| result = model.transcribe("audio.mp3") | |
| chunks = CharacterTextSplitter("\n", 1000, 0).split_text(result["text"]) | |
| process.success("Chunking done") | |
| store_data(chunks, get_collection(user_id, "vid")) | |
| process.success("Uploaded") | |
| # -------------------- QA -------------------- | |
| def chain(name): | |
| user_id = st.session_state["user_id"] | |
| db = Milvus( | |
| embedding_function=get_embeddings(), | |
| collection_name=get_collection(user_id, name), | |
| connection_args={"host": "localhost", "port": "19530"} | |
| ) | |
| retriever = db.as_retriever(search_kwargs={"k": 10}) | |
| return chains.ConversationalRetrievalChain.from_llm( | |
| llm=get_llm(), | |
| retriever=retriever | |
| ) | |
| def ai(qa, query): | |
| result = qa({"question": query, "chat_history": []}) | |
| process.success("Answer ready") | |
| return result | |
| # -------------------- UI -------------------- | |
| def upload(): | |
| placeholder.title("Upload Data") | |
| choice = st.sidebar.radio("Mode", ['', 'TEXT', 'PDF', 'URL', 'VIDEO']) | |
| if choice == 'TEXT': | |
| file = st.file_uploader("Upload txt") | |
| if file: | |
| txtread(file) | |
| elif choice == 'PDF': | |
| file = st.file_uploader("Upload PDF") | |
| if file: | |
| pdfread(file) | |
| elif choice == 'URL': | |
| url = st.text_input("Enter URL") | |
| if url: | |
| urlread(url) | |
| elif choice == 'VIDEO': | |
| link = st.text_input("YouTube link") | |
| if link: | |
| scrape(link) | |
| def chat(): | |
| placeholder.title("Chat with your data") | |
| choice = st.sidebar.radio("Mode", ['', 'TEXT', 'PDF', 'URL', 'VIDEO']) | |
| if choice: | |
| query = st.text_input("Ask your question") | |
| if query: | |
| qa = chain(choice.lower()) | |
| result = ai(qa, query) | |
| ph = st.empty() | |
| x = "" | |
| for i in result["answer"]: | |
| x += i | |
| time.sleep(0.01) | |
| ph.markdown(x) | |
| # -------------------- MAIN -------------------- | |
| def main(): | |
| global placeholder, process, data | |
| placeholder = st.empty() | |
| data = st.empty() | |
| process = st.empty() | |
| if "user_id" not in st.session_state: | |
| login() | |
| return | |
| st.sidebar.write(f"👤 {st.session_state['user_id']}") | |
| page = st.sidebar.radio("Navigate", ['Upload', 'Chat', 'Logout']) | |
| if page == "Upload": | |
| upload() | |
| elif page == "Chat": | |
| chat() | |
| elif page == "Logout": | |
| st.session_state.clear() | |
| st.rerun() | |
| if __name__ == "__main__": | |
| main() |