| """ |
| Service de chunking pour la gestion des textes longs. |
| Divise intelligemment les documents en chunks pour le traitement par LLM. |
| """ |
|
|
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from src.core.logging import setup_logger
|
|
|
|
| @dataclass |
| class TextChunk: |
| """Représente un chunk de texte avec métadonnées.""" |
| content: str |
| start_index: int |
| end_index: int |
| chunk_id: int |
| total_chunks: int |
| word_count: int |
| has_heading: bool = False |
| heading_text: Optional[str] = None |
|
|
|
|
| class TextChunker: |
| """ |
| Service de découpage intelligent de texte pour le traitement par LLM. |
| |
| Fonctionnalités: |
| - Découpage respectant les phrases et paragraphes |
| - Préservation des titres et structure |
| - Gestion du chevauchement entre chunks |
| - Optimisation pour les limites de tokens LLM |
| """ |
| |
| def __init__( |
| self, |
| max_chunk_size: int = 4000, |
| overlap_size: int = 200, |
| min_chunk_size: int = 500 |
| ): |
| self.max_chunk_size = max_chunk_size |
| self.overlap_size = overlap_size |
| self.min_chunk_size = min_chunk_size |
| self.logger = setup_logger("text_chunker") |
| |
| |
| self.heading_patterns = [ |
| r'^#{1,6}\s+.+$', |
| r'^\d+\.\s+.+$', |
| r'^[A-Z\s]{5,}$', |
| r'^\w+:$', |
| ] |
| |
| self.sentence_endings = r'[.!?]+(?:\s|$)' |
| self.paragraph_breaks = r'\n\s*\n' |
| |
| def chunk_text(self, text: str, preserve_structure: bool = True) -> List[TextChunk]: |
| """ |
| Découpe un texte en chunks intelligents. |
| |
| Args: |
| text: Texte à découper |
| preserve_structure: Préserver la structure (titres, paragraphes) |
| |
| Returns: |
| Liste des chunks créés |
| """ |
| if not text or len(text.strip()) == 0: |
| return [] |
| |
| |
| text = self._clean_text(text) |
| |
| |
| if len(text) <= self.max_chunk_size: |
| return [TextChunk( |
| content=text, |
| start_index=0, |
| end_index=len(text), |
| chunk_id=1, |
| total_chunks=1, |
| word_count=len(text.split()) |
| )] |
| |
| |
| if preserve_structure: |
| chunks = self._chunk_with_structure(text) |
| else: |
| chunks = self._chunk_simple(text) |
| |
| |
| chunks = self._post_process_chunks(chunks) |
| |
| self.logger.info(f"Texte découpé en {len(chunks)} chunks (taille moyenne: {sum(len(c.content) for c in chunks) // len(chunks)} caractères)") |
| |
| return chunks |
| |
| def _clean_text(self, text: str) -> str: |
| """Nettoie le texte avant découpage.""" |
| |
| text = re.sub(r'\s+', ' ', text) |
| |
| |
| text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) |
| |
| |
| text = text.strip() |
| |
| return text |
| |
| def _chunk_with_structure(self, text: str) -> List[TextChunk]: |
| """Découpage en préservant la structure du document.""" |
| chunks = [] |
| current_chunk = "" |
| current_start = 0 |
| |
| |
| paragraphs = re.split(self.paragraph_breaks, text) |
| text_position = 0 |
| |
| for paragraph in paragraphs: |
| if not paragraph.strip(): |
| continue |
| |
| |
| is_heading, heading_text = self._detect_heading(paragraph) |
| |
| |
| if len(current_chunk) + len(paragraph) > self.max_chunk_size and current_chunk: |
| |
| chunk = self._create_chunk( |
| current_chunk.strip(), |
| current_start, |
| text_position, |
| len(chunks) + 1 |
| ) |
| chunks.append(chunk) |
| |
| |
| overlap_text = self._get_overlap_text(current_chunk) |
| current_chunk = overlap_text + paragraph |
| current_start = text_position - len(overlap_text) |
| else: |
| |
| if current_chunk: |
| current_chunk += "\n\n" + paragraph |
| else: |
| current_chunk = paragraph |
| current_start = text_position |
| |
| text_position += len(paragraph) + 2 |
| |
| |
| if current_chunk.strip(): |
| chunk = self._create_chunk( |
| current_chunk.strip(), |
| current_start, |
| len(text), |
| len(chunks) + 1 |
| ) |
| chunks.append(chunk) |
| |
| return chunks |
| |
| def _chunk_simple(self, text: str) -> List[TextChunk]: |
| """Découpage simple par phrases.""" |
| chunks = [] |
| sentences = re.split(self.sentence_endings, text) |
| |
| current_chunk = "" |
| current_start = 0 |
| text_position = 0 |
| |
| for sentence in sentences: |
| sentence = sentence.strip() |
| if not sentence: |
| continue |
| |
| |
| sentence_in_text = sentence + "." |
| |
| if len(current_chunk) + len(sentence_in_text) > self.max_chunk_size and current_chunk: |
| |
| chunk = self._create_chunk( |
| current_chunk.strip(), |
| current_start, |
| text_position, |
| len(chunks) + 1 |
| ) |
| chunks.append(chunk) |
| |
| |
| overlap_text = self._get_overlap_text(current_chunk) |
| current_chunk = overlap_text + sentence_in_text |
| current_start = text_position - len(overlap_text) |
| else: |
| if current_chunk: |
| current_chunk += " " + sentence_in_text |
| else: |
| current_chunk = sentence_in_text |
| current_start = text_position |
| |
| text_position += len(sentence_in_text) |
| |
| |
| if current_chunk.strip(): |
| chunk = self._create_chunk( |
| current_chunk.strip(), |
| current_start, |
| len(text), |
| len(chunks) + 1 |
| ) |
| chunks.append(chunk) |
| |
| return chunks |
| |
| def _detect_heading(self, paragraph: str) -> Tuple[bool, Optional[str]]: |
| """Détecte si un paragraphe est un titre.""" |
| lines = paragraph.strip().split('\n') |
| first_line = lines[0].strip() |
| |
| for pattern in self.heading_patterns: |
| if re.match(pattern, first_line, re.MULTILINE): |
| return True, first_line |
| |
| |
| if (len(first_line) < 100 and |
| len(first_line.split()) < 10 and |
| first_line[0].isupper()): |
| return True, first_line |
| |
| return False, None |
| |
| def _get_overlap_text(self, chunk: str) -> str: |
| """Extrait le texte de chevauchement à la fin d'un chunk.""" |
| if len(chunk) <= self.overlap_size: |
| return "" |
| |
| |
| sentences = re.split(self.sentence_endings, chunk[-self.overlap_size:]) |
| |
| if len(sentences) > 1: |
| |
| return ". ".join(sentences[1:]) + ". " |
| else: |
| |
| words = chunk.split() |
| overlap_words = [] |
| char_count = 0 |
| |
| for word in reversed(words): |
| if char_count + len(word) > self.overlap_size: |
| break |
| overlap_words.insert(0, word) |
| char_count += len(word) + 1 |
| |
| return " ".join(overlap_words) + " " if overlap_words else "" |
| |
| def _create_chunk(self, content: str, start: int, end: int, chunk_id: int) -> TextChunk: |
| """Crée un objet TextChunk avec métadonnées.""" |
| is_heading, heading_text = self._detect_heading(content) |
| |
| return TextChunk( |
| content=content, |
| start_index=start, |
| end_index=end, |
| chunk_id=chunk_id, |
| total_chunks=0, |
| word_count=len(content.split()), |
| has_heading=is_heading, |
| heading_text=heading_text |
| ) |
| |
| def _post_process_chunks(self, chunks: List[TextChunk]) -> List[TextChunk]: |
| """Post-traitement des chunks.""" |
| total_chunks = len(chunks) |
| |
| |
| for chunk in chunks: |
| chunk.total_chunks = total_chunks |
| |
| |
| merged_chunks = [] |
| i = 0 |
| |
| while i < len(chunks): |
| current_chunk = chunks[i] |
| |
| |
| if (len(current_chunk.content) < self.min_chunk_size and |
| i + 1 < len(chunks) and |
| len(current_chunk.content) + len(chunks[i + 1].content) <= self.max_chunk_size): |
| |
| |
| next_chunk = chunks[i + 1] |
| merged_content = current_chunk.content + "\n\n" + next_chunk.content |
| |
| merged_chunk = TextChunk( |
| content=merged_content, |
| start_index=current_chunk.start_index, |
| end_index=next_chunk.end_index, |
| chunk_id=len(merged_chunks) + 1, |
| total_chunks=0, |
| word_count=len(merged_content.split()), |
| has_heading=current_chunk.has_heading or next_chunk.has_heading, |
| heading_text=current_chunk.heading_text or next_chunk.heading_text |
| ) |
| |
| merged_chunks.append(merged_chunk) |
| i += 2 |
| else: |
| |
| current_chunk.chunk_id = len(merged_chunks) + 1 |
| merged_chunks.append(current_chunk) |
| i += 1 |
| |
| |
| for chunk in merged_chunks: |
| chunk.total_chunks = len(merged_chunks) |
| |
| return merged_chunks |
| |
| def get_chunking_stats(self, chunks: List[TextChunk]) -> Dict[str, any]: |
| """Calcule les statistiques de découpage.""" |
| if not chunks: |
| return {} |
| |
| chunk_sizes = [len(chunk.content) for chunk in chunks] |
| word_counts = [chunk.word_count for chunk in chunks] |
| |
| return { |
| "total_chunks": len(chunks), |
| "total_characters": sum(chunk_sizes), |
| "total_words": sum(word_counts), |
| "average_chunk_size": sum(chunk_sizes) // len(chunks), |
| "average_words_per_chunk": sum(word_counts) // len(chunks), |
| "min_chunk_size": min(chunk_sizes), |
| "max_chunk_size": max(chunk_sizes), |
| "chunks_with_headings": sum(1 for chunk in chunks if chunk.has_heading) |
| } |
|
|
|
|
| class ChunkingManager: |
| """ |
| Gestionnaire de chunking avec différentes stratégies. |
| """ |
| |
| def __init__(self): |
| self.logger = setup_logger("chunking_manager") |
| |
| |
| self.chunkers = { |
| "default": TextChunker(max_chunk_size=4000, overlap_size=200), |
| "small": TextChunker(max_chunk_size=2000, overlap_size=100), |
| "large": TextChunker(max_chunk_size=20000, overlap_size=300), |
| "precise": TextChunker(max_chunk_size=3000, overlap_size=150, min_chunk_size=800) |
| } |
| |
| def chunk_document( |
| self, |
| content: str, |
| strategy: str = "default", |
| preserve_structure: bool = True |
| ) -> List[TextChunk]: |
| """ |
| Découpe un document selon la stratégie spécifiée. |
| |
| Args: |
| content: Contenu à découper |
| strategy: Stratégie de découpage (default, small, large, precise) |
| preserve_structure: Préserver la structure du document |
| |
| Returns: |
| Liste des chunks créés |
| """ |
| if strategy not in self.chunkers: |
| self.logger.warning(f"Stratégie inconnue '{strategy}', utilisation de 'default'") |
| strategy = "default" |
| |
| chunker = self.chunkers[strategy] |
| chunks = chunker.chunk_text(content, preserve_structure) |
| |
| |
| stats = chunker.get_chunking_stats(chunks) |
| self.logger.info(f"Chunking '{strategy}': {stats['total_chunks']} chunks créés") |
| |
| return chunks |
| |
| def auto_select_strategy(self, content: str) -> str: |
| """Sélectionne automatiquement la meilleure stratégie de chunking.""" |
| content_length = len(content) |
| word_count = len(content.split()) |
| |
| |
| if content_length < 5000: |
| return "small" |
| elif content_length > 20000: |
| return "large" |
| elif word_count > 3000: |
| return "precise" |
| else: |
| return "default" |