| """ |
Content Extractor Agent - web content extraction and cleaning.
Extracts content from web pages, PDFs and other documents.
| """ |
|
|
| import asyncio |
| from typing import List, Optional |
| from datetime import datetime |
|
|
| from src.agents.base_agent import BaseAgent |
| from src.models.document_models import Document, ExtractionInput, ExtractionResult |
| from src.models.research_models import ResearchOutput |
from src.models.state_models import AgentType
| from src.services.content_extraction import ContentExtractionManager, ContentExtractionError |
| from src.core.logging import setup_logger |
|
|
|
|
| class ContentExtractorAgent(BaseAgent[ExtractionInput, ExtractionResult]): |
| """ |
    Agent responsible for extracting content from URLs.

    Features:
    - HTML content extraction with intelligent cleaning
    - Support for PDFs and other formats
    - Parallel processing of multiple URLs
    - Error handling and automatic retries
    - Content structuring and cleaning
| """ |
| |
| def __init__(self, max_concurrent_extractions: int = 5, max_retries: int = 2): |
| super().__init__( |
| agent_type=AgentType.CONTENT_EXTRACTOR, |
| name="content_extractor", |
| max_retries=max_retries, |
| timeout=300.0 |
| ) |
| self.extraction_manager = ContentExtractionManager( |
| max_concurrent=max_concurrent_extractions, |
| max_retries=max_retries |
| ) |
| |
| def validate_input(self, input_data: ExtractionInput) -> bool: |
| """ |
        Validate the input data for extraction.

        Args:
            input_data: Input containing the URLs to extract

        Returns:
            True if the data is valid
| """ |
| if not input_data.urls: |
            self.logger.error("No URLs provided for extraction")
| return False |
| |
| if len(input_data.urls) > 50: |
            self.logger.error(f"Too many URLs ({len(input_data.urls)}), maximum is 50")
| return False |
| |
| |
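        # Require at least one URL that passes basic format validation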
| valid_urls = self._filter_valid_urls(input_data.urls) |
| if not valid_urls: |
            self.logger.error("No valid URLs found")
| return False |
| |
| return True |
| |
| async def process_from_research_output(self, research_output: ResearchOutput) -> ExtractionResult: |
| """ |
        Process a ResearchOutput directly and extract content from its URLs.

        Args:
            research_output: Research results with the URLs to extract

        Returns:
            ExtractionResult with the extracted documents
| """ |
| |
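        # Collect all URLs returned by the research step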
| urls = [str(result.url) for result in research_output.results] |
| |
        self.logger.info(f"Extracting content from ResearchOutput: {len(urls)} URLs")
        self.logger.info(f"Research topic: {research_output.query.topic}")
| |
| |
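        # Build the extraction input: length filters plus the research keywords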
| extraction_input = ExtractionInput( |
| urls=urls, |
| content_filters={ |
| 'min_content_length': 200, |
| 'max_content_length': 50000, |
| 'required_keywords': research_output.query.keywords |
| }, |
| extraction_options={ |
| 'source_query': research_output.query.topic, |
| 'search_keywords': research_output.query.keywords |
| } |
| ) |
| |
| |
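        # Delegate to the standard extraction pipeline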
| return await self.process(extraction_input) |
| |
| async def process(self, input_data: ExtractionInput) -> ExtractionResult: |
| """ |
        Run content extraction for the provided URLs.

        Args:
            input_data: Input containing the URLs to extract and the extraction options

        Returns:
            ExtractionResult with the extracted documents

        Raises:
            ValueError: If the URLs are invalid
            ContentExtractionError: If the extraction fails
| """ |
| start_time = datetime.now() |
        self.logger.info(f"Starting content extraction for {len(input_data.urls)} URLs")
| |
| |
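        # Drop malformed URLs before launching the extraction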
| valid_urls = self._filter_valid_urls(input_data.urls) |
        self.logger.info(f"Valid URLs to process: {len(valid_urls)}/{len(input_data.urls)}")
| |
| try: |
| |
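            # Fetch and parse all valid URLs concurrently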
| documents = await self._extract_all_content(valid_urls, input_data) |
| |
| |
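            # Clean and validate the extracted documents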
| processed_documents = self._post_process_documents(documents, input_data) |
| |
| |
| execution_time = (datetime.now() - start_time).total_seconds() |
| |
| |
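            # Determine which valid URLs produced no document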
| successful_urls = {str(doc.url) for doc in processed_documents} |
| failed_urls = [url for url in valid_urls if url not in successful_urls] |
| |
| |
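            # Assemble the final extraction result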
| result = ExtractionResult( |
| documents=processed_documents, |
| total_urls=len(input_data.urls), |
| successful_extractions=len(processed_documents), |
| failed_extractions=len(input_data.urls) - len(processed_documents), |
| failed_urls=failed_urls, |
| execution_time=execution_time, |
| extraction_stats=self._calculate_stats(processed_documents) |
| ) |
| |
            self.logger.info(
                f"Extraction finished: {result.successful_extractions}/{result.total_urls} "
                f"successes in {execution_time:.2f}s"
            )
| |
| return result |
| |
| except Exception as e: |
            self.logger.error(f"Error during extraction: {str(e)}")
            raise ContentExtractionError(f"Content extraction failed: {str(e)}") from e
| |
    def _filter_valid_urls(self, urls: List[str]) -> List[str]:
        """Filter and validate URLs."""
| import re |
| from urllib.parse import urlparse |
| |
| valid_urls = [] |
| url_pattern = re.compile( |
| r'^https?://' |
| r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' |
| r'localhost|' |
| r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' |
| r'(?::\d+)?' |
| r'(?:/?|[/?]\S+)$', re.IGNORECASE) |
| |
| for url in urls: |
| if not url or not isinstance(url, str): |
                self.logger.warning(f"Invalid URL ignored: {url}")
| continue |
| |
| url = url.strip() |
| if not url: |
| continue |
| |
| |
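            # Reject URLs that do not match the basic pattern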
| if not url_pattern.match(url): |
                self.logger.warning(f"Invalid URL format: {url}")
| continue |
| |
| |
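            # Double-check that the URL parses and has a network location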
| try: |
| parsed = urlparse(url) |
| if not parsed.netloc: |
                    self.logger.warning(f"URL without a domain: {url}")
| continue |
| |
| valid_urls.append(url) |
| |
| except Exception as e: |
                self.logger.warning(f"Error parsing URL {url}: {e}")
| continue |
| |
| return valid_urls |
| |
    async def _extract_all_content(self, urls: List[str], input_data: ExtractionInput) -> List[Document]:
        """Extract content from all URLs."""
| try: |
| |
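            # Run the concurrent extraction through the manager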
| documents = await self.extraction_manager.extract_multiple(urls) |
| |
| |
| if input_data.content_filters: |
| documents = self._apply_content_filters(documents, input_data.content_filters) |
| |
| return documents |
| |
| except Exception as e: |
            self.logger.error(f"Error during batch extraction: {str(e)}")
| raise |
| |
    def _apply_content_filters(self, documents: List[Document], filters: dict) -> List[Document]:
        """Apply the content filters to the documents."""
| filtered_documents = [] |
| |
| for doc in documents: |
| |
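            # Skip documents that are too short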
| min_length = filters.get('min_content_length', 100) |
| if len(doc.content) < min_length: |
                self.logger.debug(f"Document {doc.title} too short: {len(doc.content)} characters")
| continue |
| |
| |
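            # Truncate documents that are too long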
| max_length = filters.get('max_content_length', 100000) |
| if len(doc.content) > max_length: |
                self.logger.debug(f"Document {doc.title} too long, truncating")
                doc.content = doc.content[:max_length] + "... [Content truncated]"
| |
| |
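            # Optional language filter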
| required_language = filters.get('language') |
| if required_language and doc.language != required_language: |
                self.logger.debug(f"Document {doc.title} skipped: language {doc.language}")
| continue |
| |
| |
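            # Require at least one of the expected keywords in the content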
| required_keywords = filters.get('required_keywords', []) |
| if required_keywords: |
| content_lower = doc.content.lower() |
| if not any(keyword.lower() in content_lower for keyword in required_keywords): |
                    self.logger.debug(f"Document {doc.title} skipped: required keywords missing")
| continue |
| |
| filtered_documents.append(doc) |
| |
        self.logger.info(f"Filters applied: {len(filtered_documents)}/{len(documents)} documents kept")
| return filtered_documents |
| |
    def _post_process_documents(self, documents: List[Document], input_data: ExtractionInput) -> List[Document]:
        """Post-process the extracted documents."""
| processed_docs = [] |
| |
| for doc in documents: |
| |
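            # Clean the extracted content (whitespace, control characters)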
| doc.content = self._clean_content(doc.content) |
| |
| |
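            # Recompute the word count on the cleaned content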
| doc.word_count = len(doc.content.split()) |
| |
| |
| if self._is_valid_document(doc, input_data): |
| processed_docs.append(doc) |
| else: |
                self.logger.debug(f"Document {doc.title} rejected during final validation")
| |
| return processed_docs |
| |
    def _clean_content(self, content: str) -> str:
        """Advanced content cleanup."""
| import re |
| |
| if not content: |
| return "" |
| |
| |
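        # Strip control characters (tabs, newlines and carriage returns are kept)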
| content = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', content) |
| |
| |
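        # Collapse runs of spaces and tabs into a single space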
| content = re.sub(r'[ \t]+', ' ', content) |
| |
| |
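        # Collapse sequences of blank lines into a single blank line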
| content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content) |
| |
| |
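        # Trim leading and trailing whitespace on each line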
| lines = content.split('\n') |
| lines = [line.strip() for line in lines] |
| content = '\n'.join(lines) |
| |
| |
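        # Collapse any blank-line runs reintroduced by the trimming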
| content = re.sub(r'\n{3,}', '\n\n', content) |
| |
| return content.strip() |
| |
    def _is_valid_document(self, doc: Document, input_data: ExtractionInput) -> bool:
        """Validate an extracted document."""
| |
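        # Reject empty or near-empty documents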
| if not doc.content or not doc.content.strip(): |
| return False |
| |
| if len(doc.content) < 50: |
| return False |
| |
| |
| if doc.word_count < 20: |
| return False |
| |
| |
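        # Optionally enforce a minimum quality score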
| if hasattr(input_data, 'min_quality_score'): |
| quality_score = self._calculate_content_quality(doc) |
| if quality_score < input_data.min_quality_score: |
| return False |
| |
| return True |
| |
    def _calculate_content_quality(self, doc: Document) -> float:
        """Compute a quality score for the content (0-1)."""
| score = 0.0 |
| |
| |
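        # Reward longer documents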
| if doc.word_count > 100: |
| score += 0.3 |
| elif doc.word_count > 50: |
| score += 0.1 |
| |
| |
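        # Reward useful metadata: informative title, author, publication date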
| if doc.title and len(doc.title) > 10: |
| score += 0.2 |
| |
| if doc.author: |
| score += 0.1 |
| |
| if doc.published_date: |
| score += 0.1 |
| |
| |
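        # Reward documents with recognizable structure markers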
| content = doc.content.lower() |
| if any(marker in content for marker in ['conclusion', 'introduction', 'sommaire']): |
| score += 0.2 |
| |
| |
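        # Penalize highly repetitive content (low ratio of unique lines)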
| lines = doc.content.split('\n') |
| unique_lines = set(line.strip() for line in lines if line.strip()) |
| if len(lines) > 0: |
| uniqueness_ratio = len(unique_lines) / len(lines) |
| if uniqueness_ratio < 0.5: |
| score -= 0.2 |
| |
| return max(0.0, min(1.0, score)) |
| |
    def _calculate_stats(self, documents: List[Document]) -> dict:
        """Compute extraction statistics."""
| if not documents: |
| return { |
| 'total_words': 0, |
| 'average_words_per_doc': 0, |
| 'doc_types': {}, |
| 'languages': {}, |
| 'has_authors': 0, |
| 'has_dates': 0 |
| } |
| |
| total_words = sum(doc.word_count for doc in documents) |
| |
| |
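        # Count documents per type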
| doc_types = {} |
| for doc in documents: |
| doc_type = doc.doc_type.value if doc.doc_type else 'unknown' |
| doc_types[doc_type] = doc_types.get(doc_type, 0) + 1 |
| |
| |
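        # Count documents per language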
| languages = {} |
| for doc in documents: |
| lang = doc.language or 'unknown' |
| languages[lang] = languages.get(lang, 0) + 1 |
| |
| |
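        # Count documents that carry author and publication-date metadata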
| has_authors = sum(1 for doc in documents if doc.author) |
| has_dates = sum(1 for doc in documents if doc.published_date) |
| |
| return { |
| 'total_words': total_words, |
| 'average_words_per_doc': total_words // len(documents), |
| 'doc_types': doc_types, |
| 'languages': languages, |
| 'has_authors': has_authors, |
| 'has_dates': has_dates |
| } |
|
|
|
|
| |
| async def extract_content_from_urls(urls: List[str], **options) -> List[Document]: |
| """ |
    Utility function to extract content from a list of URLs.

    Args:
        urls: List of URLs to extract
        **options: Extraction options (content_filters, extraction_options, etc.)

    Returns:
        List of extracted documents
| """ |
| agent = ContentExtractorAgent() |
| |
| input_data = ExtractionInput( |
| urls=urls, |
| content_filters=options.get('content_filters', {}), |
| extraction_options=options.get('extraction_options', {}) |
| ) |
| |
| result = await agent.execute(input_data) |
| return result.documents |
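
# Minimal usage sketch (assumes an active asyncio event loop):
#   docs = await extract_content_from_urls(
#       ["https://example.com"],
#       content_filters={"min_content_length": 200},
#   )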
|
|
|
|
| |
| async def extract_from_search_results(search_results: List[dict]) -> List[Document]: |
| """ |
    Extract content from search results.

    Args:
        search_results: Search results containing URLs

    Returns:
        List of extracted documents
| """ |
| urls = [] |
| for result in search_results: |
| if isinstance(result, dict) and 'url' in result: |
| urls.append(result['url']) |
| elif hasattr(result, 'url'): |
| urls.append(result.url) |
| |
| if not urls: |
| return [] |
| |
| return await extract_content_from_urls(urls) |
|
|
|
|
| |
def save_extraction_result(result: ExtractionResult, filename: Optional[str] = None) -> str:
| """ |
    Save an ExtractionResult to a JSON file.

    Args:
        result: Extraction result to save
        filename: Output file name (optional)

    Returns:
        Name of the saved file
| """ |
| import json |
| from datetime import datetime |
| |
| if not filename: |
| |
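        # Generate a timestamped default filename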
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| filename = f"extraction_result_{result.successful_extractions}docs_{timestamp}.json" |
| |
| try: |
| |
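        # Serialize the Pydantic model to a JSON-compatible dict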
| result_dict = result.model_dump(mode='json') |
| |
| |
| with open(filename, 'w', encoding='utf-8') as f: |
| json.dump(result_dict, f, indent=2, ensure_ascii=False) |
| |
| return filename |
| |
| except Exception as e: |
        raise Exception(f"Error while saving: {e}") from e
|
|
|
|
| def load_extraction_result(filename: str) -> ExtractionResult: |
| """ |
    Load an ExtractionResult from a JSON file.

    Args:
        filename: Name of the file to load

    Returns:
        The loaded ExtractionResult
| """ |
| import json |
| |
| try: |
| with open(filename, 'r', encoding='utf-8') as f: |
| data = json.load(f) |
| |
| |
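        # Rebuild the ExtractionResult model from the parsed JSON data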
| return ExtractionResult(**data) |
| |
| except Exception as e: |
        raise Exception(f"Error while loading: {e}") from e
|
|
|
|
| |
| logger = setup_logger("ContentExtractorAgent") |
| |
| if __name__ == "__main__": |
    import json
| |
    async def test_with_research_output():
        """Test using a saved ResearchOutput file."""
| |
| research_file = "research_output_impact_de_lintelligence_artifi_20251116_141136.json" |
| |
| try: |
| |
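            # Load the saved research output from disk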
| with open(research_file, 'r', encoding='utf-8') as f: |
| research_data = json.load(f) |
| |
| research_output = ResearchOutput(**research_data) |
            logger.info("=== LOADING THE RESEARCH OUTPUT ===")
            logger.info(f"Topic: {research_output.query.topic}")
            logger.info(f"URLs to extract: {len(research_output.results)}")
| |
| |
| agent = ContentExtractorAgent() |
| |
            logger.info("=== STARTING CONTENT EXTRACTION ===")
| extraction_result = await agent.process_from_research_output(research_output) |
| |
            logger.info("=== EXTRACTION RESULTS ===")
            logger.info(f"URLs processed: {extraction_result.total_urls}")
            logger.info(f"Successful extractions: {extraction_result.successful_extractions}")
            logger.info(f"Failed extractions: {extraction_result.failed_extractions}")
            logger.info(f"Execution time: {extraction_result.execution_time:.2f}s")
| |
| |
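            # Log a short preview of each extracted document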
| for i, doc in enumerate(extraction_result.documents, 1): |
| logger.info(f"\n{i}. {doc.title}") |
                logger.info(f"   URL: {doc.url}")
                logger.info(f"   Words: {doc.word_count}")
                logger.info(f"   Language: {doc.language}")
                logger.info(f"   Type: {doc.doc_type}")
                logger.info(f"   Content (preview): {doc.content[:200]}...")
| |
| |
| if extraction_result.failed_urls: |
                logger.info("\n❌ Failed URLs:")
| for url in extraction_result.failed_urls: |
| logger.info(f" • {url}") |
| |
| |
            logger.info("\n=== SAVING THE EXTRACTION RESULT ===")
| |
| try: |
| filename = save_extraction_result(extraction_result) |
                logger.info(f"✅ ExtractionResult saved to: {filename}")
| |
| |
                logger.info("📄 Saved content:")
                logger.info(f"   • Extracted documents: {len(extraction_result.documents)}")
                logger.info(f"   • Extraction time: {extraction_result.execution_time:.2f}s")
                logger.info(f"   • Statistics: {extraction_result.extraction_stats}")
| |
| |
                logger.info("=== Reload test ===")
                loaded_result = load_extraction_result(filename)
                logger.info("✅ ExtractionResult reloaded successfully")
                logger.info(f"   • Check: {len(loaded_result.documents)} documents loaded")
| |
| |
| if loaded_result.successful_extractions == extraction_result.successful_extractions: |
                    logger.info("✅ Data integrity verified")
                else:
                    logger.error("❌ Data integrity error")
| |
| |
                logger.info("\n📋 EXAMPLE OF THE SAVED JSON FORMAT:")
| logger.info("-" * 50) |
| |
| |
| example_result = { |
| "documents": [ |
| { |
| "title": doc.title, |
| "url": str(doc.url), |
| "content": doc.content[:200] + "...", |
| "word_count": doc.word_count, |
| "language": doc.language, |
| "doc_type": doc.doc_type.value if doc.doc_type else None |
| } for doc in extraction_result.documents[:2] |
| ], |
| "total_urls": extraction_result.total_urls, |
| "successful_extractions": extraction_result.successful_extractions, |
| "failed_extractions": extraction_result.failed_extractions, |
| "failed_urls": extraction_result.failed_urls, |
| "execution_time": extraction_result.execution_time, |
| "extraction_stats": extraction_result.extraction_stats |
| } |
| |
| print(json.dumps(example_result, indent=2, ensure_ascii=False)) |
| |
| except Exception as save_error: |
                logger.error(f"❌ Error while saving: {save_error}")
| |
| except FileNotFoundError: |
            logger.error(f"❌ ResearchOutput file not found: {research_file}")
            logger.info("Falling back to the example with direct URLs...")
| await test_with_direct_urls() |
| except Exception as e: |
            logger.error(f"❌ Error during processing: {e}")
| |
    async def test_with_direct_urls():
        """Test with direct URLs."""
| urls = [ |
| 'https://www.iana.org/help/example-domains', |
| ] |
| |
        logger.info("=== TEST WITH DIRECT URLS ===")
| documents = await extract_content_from_urls(urls) |
| for doc in documents: |
| logger.info(f"Title: {doc.title}, URL: {doc.url}, Word Count: {doc.word_count}, Language: {doc.language}, Content Length: {len(doc.content)}") |
| |
| |
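    # Pick the test mode from the command line (--direct runs the URL-only test)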
| import sys |
| if len(sys.argv) > 1 and sys.argv[1] == "--direct": |
| asyncio.run(test_with_direct_urls()) |
| else: |
| asyncio.run(test_with_research_output()) |