| """ |
| Système de Mémoire et Stockage Vectoriel pour l'Assistant de Recherche |
| Gère : embeddings, recherche sémantique, historique et déduplication |
| """ |
|
|
| import chromadb |
| from chromadb.config import Settings |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain_community.vectorstores import Chroma |
| from langchain_core.documents import Document |
| from typing import List, Dict, Optional, Tuple |
| from datetime import datetime |
| import hashlib |
| import json |
| import pickle |
| from pathlib import Path |
| from collections import deque |
|
|
| |
| |
| |
|
|
| class VectorMemoryManager: |
| """Gère le stockage vectoriel des documents et résumés""" |
| |
| def __init__(self, |
| persist_directory: str = "./chroma_db", |
| collection_name: str = "research_documents", |
| embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"): |
| """ |
| Initialise le gestionnaire de mémoire vectorielle |
| |
| Args: |
| persist_directory: Dossier de persistance de ChromaDB |
| collection_name: Nom de la collection ChromaDB |
| embedding_model: Modèle d'embeddings HuggingFace |
| """ |
| self.persist_directory = Path(persist_directory) |
| self.persist_directory.mkdir(parents=True, exist_ok=True) |
| |
| print(f"🔧 Initialisation du système de mémoire vectorielle...") |
| |
| |
| self.embeddings = HuggingFaceEmbeddings( |
| model_name=embedding_model, |
| model_kwargs={'device': 'cpu'}, |
| encode_kwargs={'normalize_embeddings': True} |
| ) |
| |
| |
| self.client = chromadb.PersistentClient( |
| path=str(self.persist_directory), |
| settings=Settings( |
| anonymized_telemetry=False, |
| allow_reset=True |
| ) |
| ) |
| |
| |
| try: |
| self.collection = self.client.get_collection(collection_name) |
| print(f"✅ Collection '{collection_name}' récupérée ({self.collection.count()} documents)") |
| except: |
| self.collection = self.client.create_collection( |
| name=collection_name, |
| metadata={"hnsw:space": "cosine"} |
| ) |
| print(f"✅ Nouvelle collection '{collection_name}' créée") |
| |
| |
| self.vectorstore = Chroma( |
| client=self.client, |
| collection_name=collection_name, |
| embedding_function=self.embeddings |
| ) |
| |
| |
| self.content_hashes = set() |
| self._load_existing_hashes() |
| |
| def _load_existing_hashes(self): |
| """Charge les hashes des documents existants pour déduplication""" |
| try: |
| results = self.collection.get(include=['metadatas']) |
| for metadata in results['metadatas']: |
| if 'content_hash' in metadata: |
| self.content_hashes.add(metadata['content_hash']) |
| print(f"📋 {len(self.content_hashes)} hashes chargés pour déduplication") |
| except Exception as e: |
| print(f"⚠️ Erreur lors du chargement des hashes: {e}") |
| |
| def _compute_hash(self, content: str) -> str: |
| """Calcule le hash MD5 d'un contenu""" |
| return hashlib.md5(content.encode('utf-8')).hexdigest() |
| |
| def is_duplicate(self, content: str) -> bool: |
| """Vérifie si un document est un doublon""" |
| content_hash = self._compute_hash(content) |
| return content_hash in self.content_hashes |
| |
| def add_documents(self, |
| documents: List[Dict[str, any]], |
| source: str = "research", |
| check_duplicates: bool = True) -> Dict[str, int]: |
| """ |
| Ajoute des documents au vectorstore |
| |
| Args: |
| documents: Liste de dicts avec 'content', 'title', 'url', etc. |
| source: Source des documents (research, summary, synthesis) |
| check_duplicates: Vérifier les doublons avant ajout |
| |
| Returns: |
| Dict avec statistiques d'ajout |
| """ |
| print(f"\n📥 Ajout de {len(documents)} documents (source: {source})...") |
| |
| added = 0 |
| skipped = 0 |
| |
| docs_to_add = [] |
| metadatas_to_add = [] |
| ids_to_add = [] |
| |
| for doc in documents: |
| content = doc.get('content', '') |
| |
| |
| if check_duplicates and self.is_duplicate(content): |
| skipped += 1 |
| continue |
| |
| |
| content_hash = self._compute_hash(content) |
| doc_id = f"{source}_{content_hash[:8]}_{datetime.now().timestamp()}" |
| |
| metadata = { |
| 'title': doc.get('title', 'Sans titre'), |
| 'url': doc.get('url', ''), |
| 'source': source, |
| 'timestamp': datetime.now().isoformat(), |
| 'content_hash': content_hash, |
| 'word_count': len(content.split()) |
| } |
| |
| docs_to_add.append(content) |
| metadatas_to_add.append(metadata) |
| ids_to_add.append(doc_id) |
| self.content_hashes.add(content_hash) |
| added += 1 |
| |
| |
| if docs_to_add: |
| self.collection.add( |
| documents=docs_to_add, |
| metadatas=metadatas_to_add, |
| ids=ids_to_add |
| ) |
| |
| stats = { |
| 'added': added, |
| 'skipped': skipped, |
| 'total_in_db': self.collection.count() |
| } |
| |
| print(f"✅ Ajoutés: {added} | Doublons ignorés: {skipped} | Total DB: {stats['total_in_db']}") |
| return stats |
| |
| def semantic_search(self, |
| query: str, |
| k: int = 5, |
| filter_dict: Optional[Dict] = None) -> List[Tuple[Document, float]]: |
| """ |
| Recherche sémantique dans le vectorstore |
| |
| Args: |
| query: Requête de recherche |
| k: Nombre de résultats à retourner |
| filter_dict: Filtres sur les métadonnées (ex: {'source': 'research'}) |
| |
| Returns: |
| Liste de tuples (Document, score) |
| """ |
| print(f"\n🔍 Recherche sémantique: '{query}' (top-{k})") |
| |
| results = self.vectorstore.similarity_search_with_score( |
| query=query, |
| k=k, |
| filter=filter_dict |
| ) |
| |
| print(f"✅ {len(results)} résultats trouvés") |
| return results |
| |
| def get_relevant_context(self, |
| query: str, |
| k: int = 3, |
| source_filter: Optional[str] = None) -> str: |
| """ |
| Récupère le contexte pertinent pour une requête |
| |
| Args: |
| query: Requête |
| k: Nombre de documents à récupérer |
| source_filter: Filtrer par source (research, summary, etc.) |
| |
| Returns: |
| Contexte formaté en string |
| """ |
| filter_dict = {"source": source_filter} if source_filter else None |
| results = self.semantic_search(query, k=k, filter_dict=filter_dict) |
| |
| if not results: |
| return "" |
| |
| context_parts = [] |
| for i, (doc, score) in enumerate(results, 1): |
| context_parts.append( |
| f"[Source {i} - Pertinence: {score:.2f}]\n" |
| f"Titre: {doc.metadata.get('title', 'N/A')}\n" |
| f"{doc.page_content[:500]}...\n" |
| ) |
| |
| return "\n---\n".join(context_parts) |
| |
| def clear_old_documents(self, days: int = 30) -> int: |
| """ |
| Supprime les documents plus anciens que X jours |
| |
| Args: |
| days: Nombre de jours de rétention |
| |
| Returns: |
| Nombre de documents supprimés |
| """ |
| print(f"\n🧹 Nettoyage des documents > {days} jours...") |
| |
| from datetime import timedelta |
| cutoff_date = datetime.now() - timedelta(days=days) |
| |
| results = self.collection.get(include=['metadatas']) |
| ids_to_delete = [] |
| |
| for doc_id, metadata in zip(results['ids'], results['metadatas']): |
| timestamp_str = metadata.get('timestamp', '') |
| try: |
| doc_date = datetime.fromisoformat(timestamp_str) |
| if doc_date < cutoff_date: |
| ids_to_delete.append(doc_id) |
| hash_to_remove = metadata.get('content_hash') |
| if hash_to_remove: |
| self.content_hashes.discard(hash_to_remove) |
| except: |
| continue |
| |
| if ids_to_delete: |
| self.collection.delete(ids=ids_to_delete) |
| |
| print(f"✅ {len(ids_to_delete)} documents supprimés") |
| return len(ids_to_delete) |
|
|
|
|
| |
| |
| |
|
|
| class AgentMemoryManager: |
| """Gère l'historique des conversations et résumés""" |
| |
| def __init__(self, |
| memory_file: str = "./agent_memory.pkl", |
| max_history: int = 100, |
| compression_threshold: int = 50): |
| """ |
| Initialise le gestionnaire de mémoire d'agent |
| |
| Args: |
| memory_file: Fichier de sauvegarde de la mémoire |
| max_history: Nombre maximum d'entrées dans l'historique |
| compression_threshold: Seuil pour compression de mémoire |
| """ |
| self.memory_file = Path(memory_file) |
| self.max_history = max_history |
| self.compression_threshold = compression_threshold |
| |
| |
| self.conversation_history = deque(maxlen=max_history) |
| self.research_cache = {} |
| self.summary_cache = {} |
| self.topic_keywords = {} |
| |
| print(f"🧠 Initialisation du gestionnaire de mémoire d'agent...") |
| self._load_memory() |
| |
| def _load_memory(self): |
| """Charge la mémoire depuis le fichier""" |
| if self.memory_file.exists(): |
| try: |
| with open(self.memory_file, 'rb') as f: |
| data = pickle.load(f) |
| self.conversation_history = data.get('conversation_history', deque(maxlen=self.max_history)) |
| self.research_cache = data.get('research_cache', {}) |
| self.summary_cache = data.get('summary_cache', {}) |
| self.topic_keywords = data.get('topic_keywords', {}) |
| print(f"✅ Mémoire chargée: {len(self.conversation_history)} conversations, " |
| f"{len(self.research_cache)} recherches en cache") |
| except Exception as e: |
| print(f"⚠️ Erreur lors du chargement de la mémoire: {e}") |
| else: |
| print("ℹ️ Nouvelle mémoire initialisée") |
| |
| def _save_memory(self): |
| """Sauvegarde la mémoire dans le fichier""" |
| try: |
| data = { |
| 'conversation_history': self.conversation_history, |
| 'research_cache': self.research_cache, |
| 'summary_cache': self.summary_cache, |
| 'topic_keywords': self.topic_keywords |
| } |
| with open(self.memory_file, 'wb') as f: |
| pickle.dump(data, f) |
| except Exception as e: |
| print(f"⚠️ Erreur lors de la sauvegarde de la mémoire: {e}") |
| |
| def add_conversation(self, user_message: str, assistant_response: str, metadata: Optional[Dict] = None): |
| """Ajoute une conversation à l'historique""" |
| entry = { |
| 'timestamp': datetime.now().isoformat(), |
| 'user': user_message, |
| 'assistant': assistant_response, |
| 'metadata': metadata or {} |
| } |
| self.conversation_history.append(entry) |
| |
| |
| if len(self.conversation_history) >= self.compression_threshold: |
| self._compress_memory() |
| |
| self._save_memory() |
| |
| def add_research_result(self, topic: str, result: any, keywords: List[str]): |
| """Cache un résultat de recherche""" |
| self.research_cache[topic] = { |
| 'result': result, |
| 'timestamp': datetime.now().isoformat() |
| } |
| self.topic_keywords[topic] = keywords |
| self._save_memory() |
| |
| def get_research_result(self, topic: str, max_age_hours: int = 24) -> Optional[any]: |
| """Récupère un résultat de recherche en cache""" |
| if topic not in self.research_cache: |
| return None |
| |
| cached = self.research_cache[topic] |
| cached_time = datetime.fromisoformat(cached['timestamp']) |
| |
| from datetime import timedelta |
| if datetime.now() - cached_time > timedelta(hours=max_age_hours): |
| print(f"ℹ️ Cache expiré pour '{topic}'") |
| return None |
| |
| print(f"✅ Résultat récupéré du cache pour '{topic}'") |
| return cached['result'] |
| |
| def add_summary(self, topic: str, summary: str): |
| """Ajoute un résumé au cache""" |
| self.summary_cache[topic] = { |
| 'summary': summary, |
| 'timestamp': datetime.now().isoformat() |
| } |
| self._save_memory() |
| |
| def get_conversation_context(self, n_last: int = 5) -> str: |
| """Récupère le contexte des N dernières conversations""" |
| recent = list(self.conversation_history)[-n_last:] |
| |
| if not recent: |
| return "" |
| |
| context = "Contexte des conversations récentes:\n" |
| for i, conv in enumerate(recent, 1): |
| context += f"\n[Conversation {i}]\n" |
| context += f"User: {conv['user'][:100]}...\n" |
| context += f"Assistant: {conv['assistant'][:100]}...\n" |
| |
| return context |
| |
| def _compress_memory(self): |
| """Compresse la mémoire en gardant seulement les éléments importants""" |
| print("🗜️ Compression de la mémoire...") |
| |
| |
| from datetime import timedelta |
| cutoff = datetime.now() - timedelta(days=7) |
| |
| topics_to_remove = [] |
| for topic, data in self.research_cache.items(): |
| if datetime.fromisoformat(data['timestamp']) < cutoff: |
| topics_to_remove.append(topic) |
| |
| for topic in topics_to_remove: |
| del self.research_cache[topic] |
| if topic in self.topic_keywords: |
| del self.topic_keywords[topic] |
| |
| print(f"✅ {len(topics_to_remove)} anciennes recherches supprimées") |
| self._save_memory() |
| |
| def get_related_topics(self, topic: str, threshold: float = 0.5) -> List[str]: |
| """Trouve les topics similaires dans l'historique""" |
| from difflib import SequenceMatcher |
| |
| related = [] |
| for cached_topic in self.research_cache.keys(): |
| similarity = SequenceMatcher(None, topic.lower(), cached_topic.lower()).ratio() |
| if similarity > threshold: |
| related.append((cached_topic, similarity)) |
| |
| return [t for t, _ in sorted(related, key=lambda x: x[1], reverse=True)] |
| |
| def clear_all(self): |
| """Réinitialise complètement la mémoire""" |
| print("🗑️ Réinitialisation complète de la mémoire...") |
| self.conversation_history.clear() |
| self.research_cache.clear() |
| self.summary_cache.clear() |
| self.topic_keywords.clear() |
| self._save_memory() |
| print("✅ Mémoire réinitialisée") |
|
|
|
|
| |
| |
| |
|
|
| class IntegratedMemorySystem: |
| """Système de mémoire intégré combinant vectoriel et agent""" |
| |
| def __init__(self): |
| self.vector_memory = VectorMemoryManager() |
| self.agent_memory = AgentMemoryManager() |
| print("✨ Système de mémoire intégré initialisé\n") |
| |
| def process_research_result(self, |
| topic: str, |
| extraction_result: any, |
| summarization_result: any, |
| global_synthesis: any): |
| """ |
| Traite et stocke tous les résultats d'une recherche |
| |
| Args: |
| topic: Sujet de la recherche |
| extraction_result: Résultat de l'extraction |
| summarization_result: Résultat des résumés |
| global_synthesis: Synthèse globale |
| """ |
| print(f"\n💾 Stockage des résultats pour '{topic}'...") |
| |
| |
| if extraction_result and hasattr(extraction_result, 'documents'): |
| docs_to_store = [] |
| for doc in extraction_result.documents: |
| docs_to_store.append({ |
| 'content': doc.content, |
| 'title': doc.title, |
| 'url': str(doc.url) |
| }) |
| self.vector_memory.add_documents(docs_to_store, source='research') |
| |
| |
| if summarization_result and hasattr(summarization_result, 'summaries'): |
| summaries_to_store = [] |
| for summary in summarization_result.summaries: |
| summaries_to_store.append({ |
| 'content': summary.detailed_summary, |
| 'title': summary.title, |
| 'url': str(summary.url) |
| }) |
| self.vector_memory.add_documents(summaries_to_store, source='summary') |
| |
| |
| if global_synthesis and hasattr(global_synthesis, 'final_report'): |
| synthesis_text = global_synthesis.final_report.formatted_outputs.get('text', '') |
| self.vector_memory.add_documents([{ |
| 'content': synthesis_text, |
| 'title': f"Synthèse: {topic}", |
| 'url': '' |
| }], source='synthesis') |
| |
| |
| keywords = [] |
| if hasattr(extraction_result, 'documents'): |
| |
| all_text = ' '.join([doc.content[:100] for doc in extraction_result.documents[:3]]) |
| keywords = list(set(all_text.split()[:10])) |
| |
| self.agent_memory.add_research_result(topic, global_synthesis, keywords) |
| |
| print("✅ Tous les résultats stockés avec succès") |
| |
| def retrieve_context_for_query(self, query: str, use_cache: bool = True) -> Dict: |
| """ |
| Récupère le contexte pertinent pour une requête |
| |
| Args: |
| query: Requête de l'utilisateur |
| use_cache: Utiliser le cache si disponible |
| |
| Returns: |
| Dict avec le contexte vectoriel et conversationnel |
| """ |
| context = { |
| 'semantic_context': '', |
| 'conversation_context': '', |
| 'cached_result': None, |
| 'related_topics': [] |
| } |
| |
| |
| if use_cache: |
| context['cached_result'] = self.agent_memory.get_research_result(query) |
| |
| |
| context['semantic_context'] = self.vector_memory.get_relevant_context(query, k=3) |
| |
| |
| context['conversation_context'] = self.agent_memory.get_conversation_context(n_last=3) |
| |
| |
| context['related_topics'] = self.agent_memory.get_related_topics(query) |
| |
| return context |
|
|
|
|
| |
| |
| |
|
|
| |
| memory_system = IntegratedMemorySystem() |
|
|
| print("="*60) |
| print("✅ SYSTÈME DE MÉMOIRE PRÊT") |
| print("="*60) |