| import streamlit as st |
| import pandas as pd |
| import numpy as np |
| import re |
| import random |
| import time |
|
|
| import streamlit as st |
| from dotenv import load_dotenv |
| from langchain_experimental.text_splitter import SemanticChunker |
| from langchain_community.embeddings import OpenAIEmbeddings |
| from langchain_community.vectorstores import FAISS |
| from langchain_community.chat_models import ChatOpenAI |
| from langchain import hub |
| from langchain_core.runnables import RunnablePassthrough |
| from langchain_community.document_loaders import WebBaseLoader,FireCrawlLoader,PyPDFLoader |
| from langchain_core.prompts.prompt import PromptTemplate |
| import os |
| from high_chart import test_chart |
| from chat_with_pps import get_response |
| from ecologits.tracers.utils import compute_llm_impacts |
| from codecarbon import EmissionsTracker |
|
|
| load_dotenv() |
|
|
def get_docs_from_website(urls):
    """Scrape the given URLs with WebBaseLoader and return the loaded documents.

    Args:
        urls: A URL string or list of URL strings to scrape.

    Returns:
        list | None: Loaded langchain documents, or None when scraping failed.
    """
    # Spoof a desktop browser user agent: many sites reject the default
    # python-requests agent.
    loader = WebBaseLoader(urls, header_template={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
    })
    try:
        return loader.load()
    except Exception:
        # Best-effort scrape: callers treat None as "could not fetch".
        return None
| |
|
|
def get_docs_from_website_fc(urls, firecrawl_api_key):
    """Scrape the given URLs with FireCrawl ("deep" analysis mode).

    Args:
        urls: List of URL strings to scrape.
        firecrawl_api_key: FireCrawl API key.

    Returns:
        list | None: All documents from all URLs, or None if any scrape failed
        (all-or-nothing, matching the caller's fallback to the quick scraper).
    """
    docs = []
    try:
        for url in urls:
            loader = FireCrawlLoader(api_key=firecrawl_api_key, url=url, mode="scrape")
            docs += loader.load()
        return docs
    except Exception:
        # Any failure invalidates the whole batch; caller retries quick mode.
        return None
| |
|
|
def get_doc_chunks(docs):
    """Split langchain documents into semantically coherent chunks."""
    splitter = SemanticChunker(OpenAIEmbeddings(model="text-embedding-3-small"))
    return splitter.split_documents(docs)
|
|
def get_doc_chunks_fc(docs):
    """Split raw text strings into semantic chunks, flattened into one list."""
    splitter = SemanticChunker(OpenAIEmbeddings(model="text-embedding-3-small"))
    chunks = []
    for raw_text in docs:
        chunks.extend(splitter.split_text(raw_text))
    return chunks
| |
|
|
def get_vectorstore_from_docs(doc_chunks):
    """Build a FAISS vector store from pre-chunked documents."""
    return FAISS.from_documents(
        documents=doc_chunks,
        embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
    )
|
|
def get_vectorstore_from_text(texts):
    """Build a FAISS vector store from raw text strings."""
    return FAISS.from_texts(
        texts=texts,
        embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
    )
|
|
def get_conversation_chain(vectorstore):
    """Assemble a RAG chain: retriever -> hub RAG prompt -> GPT-4o."""
    retriever = vectorstore.as_retriever()
    llm = ChatOpenAI(model="gpt-4o", temperature=0.5, max_tokens=2048)
    rag_prompt = hub.pull("rlm/rag-prompt")
    return (
        {"context": retriever, "question": RunnablePassthrough()}
        | rag_prompt
        | llm
    )
|
|
| |
def fill_promptQ_template(input_variables, template):
    """Render *template* with the given variables.

    Args:
        input_variables: Mapping of template variable names to values
            (e.g. {"BRAND_NAME": ..., "BRAND_DESCRIPTION": ...}).
        template: Template string with {PLACEHOLDER} slots.

    Returns:
        str: The formatted prompt.
    """
    # Derive the placeholder list from the mapping instead of hard-coding
    # BRAND_NAME/BRAND_DESCRIPTION, so any template/variable set works.
    prompt = PromptTemplate(input_variables=list(input_variables), template=template)
    return prompt.format(**input_variables)
|
|
def text_to_list(text):
    """Parse bulleted "<name> <number>" lines into [[name, digits], ...] pairs.

    A leading "- " bullet marker is stripped; the last whitespace-separated
    token of each line is reduced to its digits only (drops "%", ".", etc.).

    Args:
        text: Multi-line string, one item per line.

    Returns:
        list[list[str]]: [[name, digits], ...] for each non-blank line.
    """
    raw_lines = text.replace("- ", "").split('\n')
    # Skip blank lines: an empty line would otherwise raise IndexError when
    # taking the trailing token below.
    tokenized = [line.split() for line in raw_lines if line.strip()]
    items = [[' '.join(tokens[:-1]), tokens[-1]] for tokens in tokenized]
    for item in items:
        item[1] = re.sub(r'\D', '', item[1])
    return items
|
|
def delete_pp(pps):
    """Remove each named stakeholder from the grouped-stakeholder list."""
    for target in pps:
        for idx, entry in enumerate(st.session_state['pp_grouped']):
            if entry['name'] == target:
                del st.session_state['pp_grouped'][idx]
                break
|
|
def display_list_urls():
    """Render each analysed source with a delete button and its stakeholders."""
    for idx, source in enumerate(st.session_state["urls"]):
        placeholder = st.empty()
        left, right = placeholder.columns([7, 3])

        # Deleting a source also removes the stakeholders it contributed.
        if right.button("❌", key=f"but{idx}"):
            removed = st.session_state['parties_prenantes'][idx]
            delete_pp(removed)
            del st.session_state.urls[idx]
            del st.session_state["parties_prenantes"][idx]
            st.rerun()

        # Guard against a stale index right after a deletion.
        if len(st.session_state.urls) > idx:
            with left.expander(f"Source {idx+1}: {source}"):
                stakeholders = st.session_state["parties_prenantes"][idx]
                st.write(pd.DataFrame(stakeholders, columns=["Partie prenante"]))
        else:
            placeholder.empty()
| |
def colored_circle(color):
    """Return an inline-HTML 15px dot filled with *color*."""
    style = (
        "display: inline-block; width: 15px; height: 15px; "
        f"border-radius: 50%; background-color: {color};"
    )
    return f'<span style="{style}"></span>'
|
|
def display_list_pps():
    """Render each grouped stakeholder as a colored dot + name, with delete."""
    for index, item in enumerate(st.session_state["pp_grouped"]):
        emp = st.empty()
        col1, col2 = emp.columns([7, 3])

        if col2.button("❌", key=f"butp{index}"):
            del st.session_state["pp_grouped"][index]
            st.rerun()

        # Guard against a stale index right after a deletion.
        if len(st.session_state["pp_grouped"]) > index:
            entry = st.session_state["pp_grouped"][index]
            col1.markdown(
                f'<p>{colored_circle(entry["color"])} {entry["name"]}</p>',
                unsafe_allow_html=True
            )
        else:
            emp.empty()
|
|
|
|
| |
def extract_pp(docs, input_variables):
    """Extract the stakeholder list of a brand from scraped documents.

    Builds a FAISS-backed RAG chain over *docs* and asks GPT-4o for the
    brand's stakeholders; also accounts the LLM call's estimated emissions.

    Args:
        docs: Documents loaded from the brand's website/PDF, or None.
        input_variables: Mapping with "BRAND_NAME" and "BRAND_DESCRIPTION".

    Returns:
        list[str]: Stakeholder names on success;
        "445" when no document could be loaded;
        "444" when the model could not identify stakeholders.
    """
    template_extraction_PP = """
    Objectif : Identifiez toutes les parties prenantes de la marque suivante :

    Le nom de la marque de référence est le suivant : {BRAND_NAME}

    TA RÉPONSE DOIT ÊTRE SOUS FORME DE LISTE DE NOMS DE MARQUES, CHAQUE NOM SUR UNE LIGNE SÉPARÉE.

    """

    # Sentinel: scraping failed upstream.
    if docs is None:
        return "445"

    text_chunks = get_doc_chunks(docs)
    vectorstore = get_vectorstore_from_docs(text_chunks)
    chain = get_conversation_chain(vectorstore)
    question = fill_promptQ_template(input_variables, template_extraction_PP)

    # Time the call: latency feeds the ecologits impact estimation below.
    start = time.perf_counter()
    response = chain.invoke(question)
    response_latency = time.perf_counter() - start

    # Sentinel: the model answered "je ne sais pas" (no stakeholders found).
    if "ne sais pas" in response.content:
        return "444"

    # Estimate the LLM call's environmental impact (GWP) and accumulate it
    # into the session's partial emissions.
    nbre_out_tokens = response.response_metadata["token_usage"]["completion_tokens"]
    provider = "openai"
    model = "gpt-4o"
    impact = compute_llm_impacts(
        provider=provider,
        model_name=model,
        output_token_count=nbre_out_tokens,
        request_latency=response_latency,
    )
    st.session_state["partial_emissions"]["extraction_pp"]["el"] += impact.gwp.value

    # One stakeholder per line; strip bullet markers and whitespace.
    partie_prenante = response.content.replace("- ", "").split('\n')
    partie_prenante = [item.strip() for item in partie_prenante]
    return partie_prenante
|
|
def generate_random_color():
    """Return a random color as a '#rrggbb' hex string."""
    # Draw the three channels in r, g, b order (same call sequence as before).
    channels = [random.randint(0, 255) for _ in range(3)]
    return '#' + ''.join('{:02x}'.format(c) for c in channels)
|
|
|
|
def format_pp_add_viz(pp):
    # Place stakeholder *pp* on the power/influence chart and register it.
    # New points start at the chart centre (50, 50); when a slot is taken,
    # the candidate y is bumped by 5, wrapping past 95 into the next x column.
    # NOTE(review): each existing point is compared once against the *current*
    # candidate slot, which relies on insertion order matching slot order —
    # confirm earlier entries never collide with a slot chosen after a bump.
    y = 50
    x = 50
    for i in range(len(st.session_state['pp_grouped'])):
        # This existing point occupies the candidate slot: probe the next one.
        if st.session_state['pp_grouped'][i]['y'] == y and st.session_state['pp_grouped'][i]['x'] == x:
            y += 5
            if y > 95:
                y = 50
                x += 5
        # Already registered: nothing to add.
        if st.session_state['pp_grouped'][i]['name'] == pp:
            return None
    else:
        # for-else: runs when the loop completed without returning,
        # i.e. *pp* is new — append it with a random display color.
        st.session_state['pp_grouped'].append({'name':pp, 'x':x,'y':y, 'color':generate_random_color()})
|
|
def add_pp(new_pp, default_value=50):
    """Normalize, sort and register a batch of new stakeholder names.

    NOTE: ``default_value`` is part of the public signature but unused here.
    """
    # Sort first, then normalize: lowercase -> Capitalize -> strip, in that
    # exact order (matters for names with leading whitespace).
    normalized = [entry.lower().capitalize().strip() for entry in sorted(new_pp)]
    st.session_state['parties_prenantes'].append(normalized)
    for entry in normalized:
        format_pp_add_viz(entry)
|
|
def add_existing_pps(pp, pouvoir, influence):
    """Set (pouvoir, influence) for stakeholder *pp*, creating it if absent."""
    for entry in st.session_state['pp_grouped']:
        if entry['name'] == pp:
            entry['x'] = influence
            entry['y'] = pouvoir
            return None
    st.session_state['pp_grouped'].append(
        {'name': pp, 'x': influence, 'y': pouvoir, 'color': generate_random_color()}
    )
|
|
def load_csv(file):
    """Import an existing stakeholder map from a CSV file (one row per pp)."""
    frame = pd.read_csv(file)
    for _, row in frame.iterrows():
        add_existing_pps(row['parties prenantes'], row['pouvoir'], row['influence'])
|
|
|
|
def add_pp_input_text():
    """Text input + button to add a single stakeholder by hand."""
    candidate = st.text_input("Ajouter une partie prenante")
    if st.button("Ajouter", key="add_single_pp"):
        format_pp_add_viz(candidate)
|
|
|
|
def complete_and_verify_url(partial_url):
    """Normalize a possibly-partial URL and validate it.

    Adds "https://" (and "www." when no scheme/www prefix is present), then
    checks the result against a URL regex.

    Args:
        partial_url: User-entered URL, possibly missing scheme and/or "www.".

    Returns:
        tuple[bool, str]: (is_valid, completed_url).
    """
    # The original compiled a first, stricter pattern and immediately
    # overwrote it (dead code) — only this one was ever used.
    url_pattern = re.compile(
        r'^(?:http|ftp)s?://'                                               # scheme
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,8}\.?|'     # domain
        r'localhost|'                                                       # localhost
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'                              # IPv4
        r'(?::\d+)?'                                                        # optional port
        r'(?:[/?#][^\s]*)?$',                                               # path/query/fragment
        re.IGNORECASE)

    if not partial_url.startswith(('http://', 'https://', 'www.')):
        # Bare domain: add scheme and www. (the original's inner "not www."
        # test was always true here, so its else branch was unreachable).
        complete_url = 'https://www.' + partial_url
    elif partial_url.startswith('www.'):
        complete_url = 'https://' + partial_url
    else:
        complete_url = partial_url

    return (bool(url_pattern.match(complete_url)), complete_url)
| |
@st.dialog("Conseil IA",width="large")
def show_conseil_ia():
    """Modal dialog: stream AI advice on key stakeholders for a CSR approach.

    Uses the latest scraped document in session state as context.
    """
    prompt = "Prenant compte les données de l'entreprise (activité, produits, services ...), quelles sont les principales parties prenantes à animer pour une démarche RSE réussie ?"
    st.markdown(f"**{prompt}**")
    # Stream the answer directly; the return value was unused.
    st.write_stream(get_response(prompt, "", st.session_state["latest_doc"][0].page_content))
    st.warning("Quittez et saisissez une autre URL")
|
|
def display_pp():
    """Main Streamlit page: identify and manage a brand's stakeholders.

    Offers three sources (website scraping, PDF upload, existing CSV map),
    extracts stakeholders via the RAG pipeline, and renders the source list,
    the stakeholder list and the power/influence chart. Carbon emissions of
    the processing are tracked via codecarbon.
    """
    # Start a codecarbon tracker once per session.
    if "emission" not in st.session_state:
        tracker = EmissionsTracker()
        tracker.start()
        st.session_state["emission"] = tracker

    load_dotenv()
    fire_crawl_api_key = os.getenv("FIRECRAWL_API_KEY")

    # Initialize session-state defaults on first load.
    if "Nom de la marque" not in st.session_state:
        st.session_state["Nom de la marque"] = ""

    if "urls" not in st.session_state:
        st.session_state["urls"] = []
    if "parties_prenantes" not in st.session_state:
        st.session_state['parties_prenantes'] = []
    if "pp_grouped" not in st.session_state:
        st.session_state['pp_grouped'] = []
    if "latest_doc" not in st.session_state:
        st.session_state['latest_doc'] = ""
    # "not_pp" holds the "444" sentinel when extraction found no stakeholders.
    if "not_pp" not in st.session_state:
        st.session_state["not_pp"] = ""

    st.title("IDENTIFIER ET ANIMER VOS PARTIES PRENANTES")

    brand_name = st.text_input("Nom de la marque", st.session_state["Nom de la marque"])
    st.session_state["Nom de la marque"] = brand_name

    option = st.radio("Source", ("A partir de votre site web", "A partir de vos documents entreprise","A partir de cartographie existante"))

    # --- Source 1: scrape the brand's website ---
    if option == "A partir de votre site web":

        url = st.text_input("Ajouter une URL")

        captions = ["L’IA prend en compte uniquement les textes contenus dans les pages web analysées","L’IA prend en compte les textes, les images et les liens URL contenus dans les pages web analysées"]
        scraping_option = st.radio("Mode", ("Analyse rapide", "Analyse profonde"),horizontal=True,captions = captions)

        if st.button("ajouter",key="add_pp"):
            st.session_state["not_pp"] = ""

            is_valid,url = complete_and_verify_url(url)
            if not is_valid:
                st.error("URL invalide")
            elif url in st.session_state["urls"] :
                st.error("URL déjà ajoutée")

            else:
                # Deep mode uses FireCrawl, falling back to the quick scraper.
                if scraping_option == "Analyse profonde":
                    with st.spinner("Collecte des données..."):
                        docs = get_docs_from_website_fc([url],fire_crawl_api_key)
                    if docs is None:
                        st.warning("Erreur lors de la collecte des données, 2eme essai avec collecte rapide...")
                        with st.spinner("2eme essai, collecte rapide..."):
                            docs = get_docs_from_website([url])

                if scraping_option == "Analyse rapide":
                    with st.spinner("Collecte des données..."):
                        docs = get_docs_from_website([url])

                if docs is None:
                    st.error("Erreur lors de la collecte des données, URL unvalide")
                    st.session_state["latest_doc"] = ""
                else:
                    # Record scraping emissions, then run extraction.
                    st.session_state["partial_emissions"]["Scrapping"]["cc"] = st.session_state["emission"].stop()
                    st.session_state["latest_doc"] = docs

                    with st.spinner("Processing..."):
                        input_variables = {"BRAND_NAME": brand_name, "BRAND_DESCRIPTION": ""}
                        partie_prenante = extract_pp(docs, input_variables)

                        # "444": model found no stakeholders; "445": no document.
                        if "444" in partie_prenante:
                            st.session_state["not_pp"] = "444"

                        elif "445" in partie_prenante:
                            st.error("Aucun site web trouvé avec l'url donnée")
                            st.session_state["not_pp"] = ""
                        else:
                            st.session_state["not_pp"] = ""
                            partie_prenante = sorted(partie_prenante)
                            st.session_state["urls"].append(url)
                            add_pp(partie_prenante)
                            st.session_state["partial_emissions"]["extraction_pp"]["cc"] = st.session_state["emission"].stop()

    # --- Source 2: extract from an uploaded company PDF ---
    if option =="A partir de vos documents entreprise":

        uploaded_file = st.file_uploader("Télécharger le fichier PDF", type="pdf")
        if uploaded_file is not None:

            if st.button("ajouter",key="add_pp_pdf"):
                st.session_state["not_pp"] = ""

                with st.spinner("Processing..."):
                    # PyPDFLoader needs a path, so persist the upload to disk.
                    file_name = uploaded_file.name
                    with open(file_name, mode='wb') as w:
                        w.write(uploaded_file.getvalue())
                    pdf = PyPDFLoader(file_name)
                    text = pdf.load()
                    st.session_state["latest_doc"] = text
                    input_variables = {"BRAND_NAME": brand_name, "BRAND_DESCRIPTION": ""}
                    partie_prenante = extract_pp(text, input_variables)

                    if "444" in partie_prenante:
                        st.session_state["not_pp"] = "444"

                    elif "445" in partie_prenante:
                        st.error("Aucun site web trouvé avec l'url donnée")
                        st.session_state["not_pp"] = ""

                    else:
                        st.session_state["not_pp"] = ""
                        partie_prenante = sorted(partie_prenante)
                        # The PDF file name stands in for a URL in the source list.
                        st.session_state["urls"].append(file_name)
                        add_pp(partie_prenante)

    # --- Source 3: import an existing stakeholder map from CSV ---
    if option == "A partir de cartographie existante":
        uploaded_file = st.file_uploader("Télécharger le fichier CSV", type="csv")
        if uploaded_file is not None:
            if st.button("ajouter",key="add_pp_csv"):
                file_name = uploaded_file.name
                with open(file_name, mode='wb') as w:
                    w.write(uploaded_file.getvalue())

                try:
                    load_csv(file_name)
                    # Convention: the brand name is the second "-"-separated
                    # segment of the CSV file name.
                    brand_name_from_csv = file_name.split("-")[1]
                    st.session_state["Nom de la marque"] = brand_name_from_csv
                except Exception as e:
                    st.error("Erreur lors de la lecture du fichier")

    # Extraction found nothing: offer the AI-advice dialog instead.
    if st.session_state["not_pp"] == "444":
        st.warning("Aucune parties prenantes n'est identifiable sur l'URL fournie. Fournissez une autre URL ou bien cliquez sur le boutton ci-dessous pour un Conseils IA")

        if st.button("Conseil IA"):
            show_conseil_ia()

    # Let the user inspect/edit the latest scraped document (edit is cosmetic:
    # the "save" button only shows a success message).
    if st.session_state["latest_doc"] != "":
        with st.expander("Cliquez ici pour éditer et voir le document"):
            docs = st.session_state["latest_doc"]
            cleaned_text = re.sub(r'\n\n+', '\n\n', docs[0].page_content.strip())
            text_value = st.text_area("Modifier le texte ci-dessous:", value=cleaned_text, height=300)
            if st.button('Sauvegarder',key="save_doc_fake"):
                st.success("Texte sauvegardé avec succès!")

    # Render sources, stakeholder list and the power/influence chart.
    display_list_urls()
    with st.expander("Liste des parties prenantes"):
        add_pp_input_text()
        display_list_pps()
    test_chart()
|
|
| |
|
|
|
|
|
|