| import os |
| import sys |
| import importlib |
| from collections import defaultdict |
|
|
| from concurrent.futures import Future, ThreadPoolExecutor |
|
|
| from datetime import datetime, timedelta |
| import pandas as pd |
| from langsmith import Client |
| from tqdm.auto import tqdm |
|
|
| import chainlit as cl |
|
|
async def get_trace(apiKey, task_list):
    """Return a text listing of recent root-run queries from LangSmith.

    Root runs of the ``agent-collaboratif-avid`` project started during the
    last 30 days are listed, their trace IDs are collected, and the matching
    runs are re-queried in batches of 20 IDs on a small thread pool. The
    first 20 collected rows are rendered as text with the query, its start
    time and its latency in seconds.

    Progress is reported by adding tasks to ``task_list`` and re-sending it
    after each stage.

    Args:
        apiKey: API key handed to ``langsmith.Client``.
        task_list: Object exposing awaitable ``add_task(task)`` and
            ``send()`` methods (presumably a ``chainlit.TaskList`` --
            confirm against the caller).

    Returns:
        The formatted listing (an empty string when no run was found), or
        the fixed message ``"Aucune connexion à LangSmith"`` when any step
        raises.
    """
    # Imported locally so this function carries its own dependency.
    import logging

    project_name = "agent-collaboratif-avid"
    num_days = 30
    trace_batch_size = 20

    try:
        client = Client(api_key=apiKey)

        def fetch_root_runs(run_ids):
            # ``list_runs`` hands back a lazy iterator (per the LangSmith
            # SDK reference), so it must be consumed here for the requests
            # to actually run on the worker thread instead of being
            # deferred to whoever iterates the future's result.
            return list(
                client.list_runs(
                    project_name=project_name,
                    run_ids=run_ids,
                    select=["inputs", "trace_id"],
                )
            )

        tool_runs = client.list_runs(
            project_name=project_name,
            start_time=datetime.now() - timedelta(days=num_days),
            is_root=True,
            select=["inputs", "trace_id"],
        )

        trace_ids = []  # unique trace IDs, in first-seen order
        seen_trace_ids = set()
        trace_cursor = 0  # index of the first trace ID not yet submitted
        futures: list[Future] = []
        rows = []

        # NOTE(review): iterating ``tool_runs`` and calling
        # ``future.result()`` are synchronous calls made from a coroutine.
        with ThreadPoolExecutor(max_workers=2) as executor:
            task2 = cl.Task(title="Grouper les outils invoqués dans une trace et les organiser par parent run ID")
            await task_list.add_task(task2)

            for run in tqdm(tool_runs):
                if run.trace_id not in seen_trace_ids:
                    seen_trace_ids.add(run.trace_id)
                    trace_ids.append(run.trace_id)

                # Submit a batch as soon as a full one is pending, so the
                # workers fetch while the listing is still being scanned.
                if len(trace_ids) - trace_cursor >= trace_batch_size:
                    batch = trace_ids[trace_cursor:trace_cursor + trace_batch_size]
                    trace_cursor += trace_batch_size
                    futures.append(executor.submit(fetch_root_runs, batch))

            await task_list.send()

            # Whatever is left over forms a final, shorter batch.
            remaining = trace_ids[trace_cursor:]
            if remaining:
                futures.append(executor.submit(fetch_root_runs, remaining))

            task2.status = cl.TaskStatus.DONE
            await task_list.send()

            task3 = cl.Task(title="Rechercher les données d'actions des utilisateurs de l'agent collabroatif AVID et les organiser par parent run ID dans un DataFrame")
            await task_list.add_task(task3)

            for future in tqdm(futures):
                for root_run in future.result():
                    rows.append(
                        {
                            "inputs": root_run.inputs,
                            "start_time": root_run.start_time,
                            "end_time": root_run.end_time,
                        }
                    )

        task3.status = cl.TaskStatus.DONE
        await task_list.send()

        if rows:
            df_inputs = pd.DataFrame(rows)
            # Tolerate runs whose inputs are missing or not a mapping.
            df_inputs["query"] = df_inputs["inputs"].apply(
                lambda inputs: inputs.get("query") if isinstance(inputs, dict) else None
            )
            elapsed = df_inputs["end_time"] - df_inputs["start_time"]
            df_inputs["latency"] = elapsed.apply(lambda delta: delta.total_seconds())
            df_inputs = df_inputs[["query", "latency", "start_time"]].copy()
            list_inputs = df_inputs.head(20).values.tolist()
        else:
            # No run at all is a valid outcome, not a connection failure:
            # skip the column accesses that would raise on an empty frame.
            list_inputs = []

        task4 = cl.Task(title="Conversion des données d'actions des utilisateurs de l'agent collabroatif AVID et les afficher au format texte")
        await task_list.add_task(task4)

        str_inputs = "".join(
            f"* Requête : {query!s}\nDate : {start_time!s}\nDurée de la requête : {latency!s}\n\n"
            for query, latency, start_time in list_inputs
        )

        task4.status = cl.TaskStatus.DONE
        await task_list.send()
        return str_inputs
    except Exception:
        # Top-level boundary: callers receive a fixed message, so keep the
        # real cause in the log instead of discarding it.
        logging.getLogger(__name__).exception(
            "Failed to retrieve LangSmith traces for project %s", project_name
        )
        return "Aucune connexion à LangSmith"