| """ |
| Service d'extraction de contenu web. |
| Supporte HTML, PDF et autres formats de documents. |
| """ |
|
|
| import aiohttp |
| import asyncio |
| from typing import Optional, List, Dict, Any, TYPE_CHECKING |
| from urllib.parse import urljoin, urlparse |
| from datetime import datetime |
| import re |
| import mimetypes |
|
|
| from asyncssh import logger |
|
|
| from src.core.logging import setup_logger |
| from src.models.document_models import Document, DocumentType |
|
|
| |
try:
    from bs4 import BeautifulSoup
    BEAUTIFULSOUP_AVAILABLE = True
except ImportError:
    BEAUTIFULSOUP_AVAILABLE = False

if TYPE_CHECKING:
    from bs4 import BeautifulSoup

try:
    import PyPDF2
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    PyPDF2 = None

try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False
    requests = None
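
# The optional dependencies above degrade gracefully when missing. A suggested
# install, assuming the standard PyPI package names (python-dateutil backs the
# lazy dateutil import used for publication dates below):
#   pip install beautifulsoup4 PyPDF2 requests python-dateutil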


class ContentExtractionError(Exception):
    """Exception raised when content extraction fails."""


class WebContentExtractor:
    """
    Web content extractor with multi-format support.
    """

    def __init__(self, timeout: int = 30, max_content_length: int = 10_000_000):
        self.logger = setup_logger("content_extractor")
        self.timeout = timeout
        self.max_content_length = max_content_length

        # Browser-like headers to reduce the chance of naive bot blocking.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'fr-FR,fr;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

        self._check_dependencies()

    def _check_dependencies(self) -> None:
        """Warn about missing optional dependencies."""
        if not BEAUTIFULSOUP_AVAILABLE:
            self.logger.warning("BeautifulSoup4 not installed - HTML extraction limited")
        if not PDF_AVAILABLE:
            self.logger.warning("PyPDF2 not installed - PDF extraction unavailable")
        if not REQUESTS_AVAILABLE:
            self.logger.warning("requests not installed - synchronous extraction unavailable")

    async def extract_content(self, url: str) -> Document:
        """
        Extract the content of a URL.

        Args:
            url: URL to extract

        Returns:
            Document containing the extracted content

        Raises:
            ContentExtractionError: If extraction fails
        """
        self.logger.info(f"Extracting content: {url}")

        try:
            # Dispatch on the detected content type.
            content_type = await self._detect_content_type(url)

            if content_type.startswith('application/pdf'):
                return await self._extract_pdf_content(url)
            elif content_type.startswith('text/html') or 'html' in content_type:
                return await self._extract_html_content(url)
            else:
                # Fall back to generic text extraction for other types.
                return await self._extract_generic_content(url)

        except ContentExtractionError:
            raise
        except Exception as e:
            self.logger.error(f"Error while extracting {url}: {str(e)}")
            raise ContentExtractionError(f"Could not extract content from {url}: {str(e)}") from e

    async def _detect_content_type(self, url: str) -> str:
        """Detect the content type of a URL."""
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
                async with session.head(url, headers=self.headers) as response:
                    content_type = response.headers.get('content-type', '').lower()
                    if content_type:
                        return content_type.split(';')[0]

            # Fall back to guessing from the URL's file extension.
            parsed_url = urlparse(url)
            content_type, _ = mimetypes.guess_type(parsed_url.path)
            return content_type or 'text/html'

        except Exception as e:
            self.logger.warning(f"Could not detect content type for {url}: {e}")
            return 'text/html'
    async def _extract_html_content(self, url: str) -> Document:
        """Extract the content of an HTML page."""
        if not BEAUTIFULSOUP_AVAILABLE:
            raise ContentExtractionError("BeautifulSoup4 is not installed for HTML extraction")

        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            async with session.get(url, headers=self.headers) as response:
                if response.status != 200:
                    raise ContentExtractionError(f"HTTP error {response.status} for {url}")

                # Reject oversized responses before reading the body.
                content_length = response.headers.get('content-length')
                if content_length and int(content_length) > self.max_content_length:
                    raise ContentExtractionError(f"Content too large: {content_length} bytes")

                html_content = await response.text()

        # The guarded module-level import bound BeautifulSoup when available.
        soup = BeautifulSoup(html_content, 'html.parser')

        title = self._extract_title(soup)
        content = self._extract_main_content(soup)

        if len(content) > self.max_content_length:
            raise ContentExtractionError(f"Extracted content too large: {len(content)} characters")

        author = self._extract_author(soup)
        publish_date = self._extract_publish_date(soup)

        return Document(
            title=title,
            url=url,
            content=content,
            doc_type=DocumentType.ARTICLE,
            author=author,
            published_date=publish_date,
            word_count=len(content.split()),
            language='fr'
        )

    def _extract_title(self, soup: "BeautifulSoup") -> str:
        """Extract the page title."""
        # Prefer <title>, then Open Graph metadata, then the first <h1>.
        title_tag = soup.find('title')
        if title_tag and title_tag.get_text().strip():
            return title_tag.get_text().strip()

        og_title = soup.find('meta', {'property': 'og:title'})
        if og_title and og_title.get('content'):
            return og_title.get('content').strip()

        h1 = soup.find('h1')
        if h1 and h1.get_text().strip():
            return h1.get_text().strip()

        return "Title not found"

    def _extract_main_content(self, soup: "BeautifulSoup") -> str:
        """Extract the main content of the page."""
        # Drop non-content elements.
        for element in soup.find_all(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form']):
            element.decompose()

        # Strip HTML comments. BeautifulSoup exposes them as Comment nodes;
        # the original string test for '<!--' never matched, since a Comment's
        # text does not include the delimiters.
        from bs4 import Comment
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Common selectors for the main content area, in priority order.
        content_selectors = [
            'article',
            '[role="main"]',
            'main',
            '.content',
            '.post-content',
            '.entry-content',
            '.article-content',
            '#content',
            '.main-content'
        ]

        main_content = None
        for selector in content_selectors:
            element = soup.select_one(selector)
            if element:
                main_content = element
                break

        # Fall back to the whole <body> (or the document) if nothing matched.
        if not main_content:
            main_content = soup.find('body') or soup

        return self._clean_text(main_content.get_text())

    def _clean_text(self, text: str) -> str:
        """Clean and normalise extracted text."""
        if not text:
            return ""

        # Collapse runs of blank lines into single paragraph breaks first,
        # then collapse spaces and tabs. The original applied '\s+' -> ' '
        # first, which removed every newline and made the paragraph-break
        # substitution a no-op.
        text = re.sub(r'\n\s*\n', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)

        text = text.strip()

        # Hard cap to keep downstream processing manageable.
        if len(text) > 50000:
            text = text[:50000] + "... [Content truncated]"

        return text

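    # Illustrative behaviour of _clean_text, given the substitution order above:
    #     "a  b\n\n\n c"  ->  "a b\n\n c"
    # (blank-line runs become one paragraph break, spaces collapse, ends strip).
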
    def _extract_author(self, soup: "BeautifulSoup") -> Optional[str]:
        """Extract the article author."""
        # <meta name="author"> first.
        author_meta = soup.find('meta', {'name': 'author'})
        if author_meta and author_meta.get('content'):
            return author_meta.get('content').strip()

        # Then schema.org markup.
        author_schema = soup.find(attrs={'itemprop': 'author'})
        if author_schema:
            return author_schema.get_text().strip()

        # Finally, common byline CSS classes.
        author_selectors = [
            '.author',
            '.byline',
            '.post-author',
            '.article-author'
        ]

        for selector in author_selectors:
            element = soup.select_one(selector)
            if element:
                author_text = element.get_text().strip()
                # Reject implausibly long matches, which are usually layout text.
                if author_text and len(author_text) < 100:
                    return author_text

        return None

    def _extract_publish_date(self, soup: "BeautifulSoup") -> Optional[datetime]:
        """Extract the publication date."""
        try:
            from dateutil.parser import parse
        except ImportError:
            return None

        # Open Graph / article metadata first.
        time_meta = soup.find('meta', {'property': 'article:published_time'})
        if time_meta and time_meta.get('content'):
            try:
                return parse(time_meta.get('content'))
            except (ValueError, OverflowError):
                pass

        # Then schema.org markup.
        date_schema = soup.find(attrs={'itemprop': 'datePublished'})
        if date_schema:
            date_str = date_schema.get('datetime') or date_schema.get_text()
            try:
                return parse(date_str)
            except (ValueError, OverflowError):
                pass

        return None

    async def _extract_pdf_content(self, url: str) -> Document:
        """Extract the content of a PDF."""
        if not PDF_AVAILABLE:
            raise ContentExtractionError("PyPDF2 is not installed for PDF extraction")

        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            async with session.get(url, headers=self.headers) as response:
                if response.status != 200:
                    raise ContentExtractionError(f"HTTP error {response.status} for {url}")

                pdf_content = await response.read()

        try:
            pdf_file = io.BytesIO(pdf_content)
            pdf_reader = PyPDF2.PdfReader(pdf_file)

            # Prefer the title from the PDF metadata when available.
            title = "PDF document"
            if pdf_reader.metadata and pdf_reader.metadata.title:
                title = pdf_reader.metadata.title

            # Concatenate the text of every page.
            content = ""
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                content += page_text + "\n"

            content = self._clean_text(content)

            return Document(
                title=title,
                url=url,
                content=content,
                doc_type=DocumentType.ACADEMIC_PAPER,
                word_count=len(content.split()),
                language='fr'
            )

        except Exception as e:
            raise ContentExtractionError(f"Error during PDF extraction: {str(e)}") from e

    async def _extract_generic_content(self, url: str) -> Document:
        """Generic extraction for other content types."""
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        ) as session:
            async with session.get(url, headers=self.headers) as response:
                if response.status != 200:
                    raise ContentExtractionError(f"HTTP error {response.status} for {url}")

                content = await response.text()

        content = self._clean_text(content)

        return Document(
            title=f"Document from {urlparse(url).netloc}",
            url=url,
            content=content,
            doc_type=DocumentType.OTHER,
            word_count=len(content.split()),
            language='fr'
        )
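
# A minimal usage sketch for WebContentExtractor on its own (illustrative URL;
# ContentExtractionManager below adds concurrency limits and retries on top):
#
#     async def demo_single_extraction() -> None:
#         extractor = WebContentExtractor(timeout=15)
#         try:
#             doc = await extractor.extract_content("https://example.com")
#             print(doc.title, doc.word_count)
#         except ContentExtractionError as exc:
#             print(f"Extraction failed: {exc}")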

class ContentExtractionManager:
    """
    Content extraction manager with error handling and retries.
    """

    def __init__(self, max_concurrent: int = 5, max_retries: int = 2):
        self.logger = setup_logger("extraction_manager")
        self.extractor = WebContentExtractor()
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        # Bounds the number of simultaneous extractions.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def extract_multiple(self, urls: List[str]) -> List[Document]:
        """
        Extract the content of several URLs in parallel.

        Args:
            urls: List of URLs to extract

        Returns:
            List of extracted documents (may contain fewer items if some fail)
        """
        self.logger.info(f"Extracting content for {len(urls)} URLs")

        tasks = [self._extract_with_retry(url) for url in urls]

        # return_exceptions=True so one failure does not cancel the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        documents = []
        for i, result in enumerate(results):
            if isinstance(result, Document):
                documents.append(result)
            elif isinstance(result, Exception):
                self.logger.error(f"Extraction failed for {urls[i]}: {str(result)}")
            else:
                self.logger.warning(f"Unexpected result for {urls[i]}: {type(result)}")

        self.logger.info(f"Extraction finished: {len(documents)}/{len(urls)} succeeded")
        return documents

    async def _extract_with_retry(self, url: str) -> Document:
        """Extract the content of a URL with automatic retries."""
        async with self.semaphore:
            last_error = None

            for attempt in range(self.max_retries + 1):
                try:
                    if attempt > 0:
                        # Exponential backoff between attempts.
                        await asyncio.sleep(2 ** attempt)
                        self.logger.info(f"Attempt {attempt + 1}/{self.max_retries + 1} for {url}")

                    return await self.extractor.extract_content(url)

                except Exception as e:
                    last_error = e
                    if attempt < self.max_retries:
                        self.logger.warning(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
                    else:
                        self.logger.error(f"All attempts failed for {url}: {str(e)}")

            raise last_error or ContentExtractionError(f"Extraction failed for {url}")
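
    # With the default max_retries=2, the backoff above sleeps 2**1 = 2 s
    # before the second attempt and 2**2 = 4 s before the third, so each URL
    # is tried at most three times before the last error is re-raised.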


async def main():
    # Small demo: one HTML page and one PDF.
    extractor_manager = ContentExtractionManager(max_concurrent=3, max_retries=2)
    urls = [
        'https://www.iana.org/help/example-domains',
        'https://documents1.worldbank.org/curated/en/691261636143890139/pdf/Taxing-Pollution.pdf'
    ]
    documents = await extractor_manager.extract_multiple(urls)
    for doc in documents:
        print(
            f"Title: {doc.title}, URL: {doc.url}, Word Count: {doc.word_count}, "
            f"Language: {doc.language}, Content Length: {len(doc.content)},\n"
            f"Truncated content: {doc.content[:500]}"
        )


if __name__ == "__main__":
    asyncio.run(main())