| |
| """ |
| Script d'analyse des transcriptions avec Hugging Face Transformers |
| Analyse les fichiers txt dans output/transcriptions et génère un résumé structuré |
| """ |
|
|
| import os |
| from pathlib import Path |
| import torch |
| from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM |
| from datetime import datetime |
| import re |
| import traceback |
| from dotenv import load_dotenv |
|
|
| |
| try: |
| from portable_env import setup_portable_env |
| setup_portable_env() |
| except Exception: |
| pass |
|
|
| |
| try: |
| load_dotenv(Path(__file__).parent.parent / ".env") |
| except Exception: |
| pass |
|
|
| |
| BASE_DIR = Path(os.environ.get("BOB_BASE_DIR", Path(__file__).parent.parent)) |
| TRANSCRIPTIONS_DIR = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", BASE_DIR / "output" / "transcriptions")) |
| OUTPUT_FILE = Path(os.environ.get("BOB_OUTPUT_FILE", BASE_DIR / "output" / "resume_bob.txt")) |
| HF_MODEL = os.environ.get("HF_MODEL", "google/gemma-3-4b-pt") |
|
|
| def get_hf_model(): |
| """Récupère le modèle Hugging Face depuis la variable d'environnement""" |
| return os.environ.get("HF_MODEL", "google/gemma-3-4b-pt") |
|
|
|
|
| def load_hf_model(): |
| """Charge un modèle Hugging Face""" |
| try: |
| hf_model = get_hf_model() |
| print(f"Chargement du modèle Hugging Face: {hf_model}") |
| |
| |
| generator = pipeline( |
| "text-generation", |
| model=hf_model, |
| torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
| device_map="auto" if torch.cuda.is_available() else "cpu", |
| token=os.environ.get("HF_TOKEN") |
| ) |
| |
| print(f"✅ Modèle {hf_model} chargé avec succès") |
| return generator |
| except Exception as e: |
| print(f"❌ Erreur lors du chargement du modèle Hugging Face: {e}") |
| print(f"Traceback: {traceback.format_exc()}") |
| print("Assurez-vous que le modèle est disponible et que vous avez les permissions nécessaires") |
| return None |
|
|
| def create_analysis_prompt(): |
| """Crée le prompt d'analyse""" |
| return """RÔLE: Expert en classification de contenu journalistique RTL. |
| |
| TÂCHE: Extraire 3 informations précises de cette transcription radio : |
| |
| 1. AUTEUR : Nom complet du journaliste/présentateur |
| - Chercher "les précisions pour RTL de [NOM]" ou signature en fin |
| - Si absent : "Inconnu" |
| |
| 2. QUALIFICATION du format (TRÈS IMPORTANT) : |
| - P = PAPIER seul : Lecture continue par le journaliste, pas d'interviews |
| • Phrases à la 3e personne uniquement |
| • Aucune citation directe de témoins |
| • Style narratif/descriptif pur |
| |
| - P+S = PAPIER + SON : Reportage avec interviews/témoignages |
| • Présence de citations directes ("Je...", "Nous...") |
| • Témoignages de personnes citées par leur prénom |
| • Alternance narratif + paroles rapportées |
| • Phrases comme "explique Alexandre", "témoigne Lucas" |
| |
| - QR = QUESTIONS-RÉPONSES : Interview/débat en direct |
| • Format conversationnel |
| • Questions-réponses explicites |
| • Dialogue en temps réel |
| |
| 3. TITRE : Sujet principal en 4-6 mots, MAJUSCULES, style presse |
| |
| INDICES DE DÉTECTION P+S : |
| - Citations à la 1ère personne : "J'ai été hospitalisé", "Nous avons commencé" |
| - Prénoms + témoignages : "Alexandre explique", "Lucas raconte" |
| - Discours rapporté : "Il dit que", "Ils nous ont dit" |
| - Changement de ton narratif |
| |
| FORMAT OBLIGATOIRE : |
| AUTEUR|QUALIFICATION|TITRE""" |
|
|
| def detect_format_indicators(text): |
| """Détecte automatiquement les indicateurs de format P/P+S/QR/MT""" |
| indicators = { |
| 'p_plus_s': 0, |
| 'qr': 0, |
| 'mt': 0, |
| 'p_only': 0 |
| } |
| |
| text_lower = text.lower() |
| |
| |
| mt_patterns = [ |
| r'\bmoi je (?:trouve|pense|crois|dis)', |
| r'\bje trouve (?:que|ça|dommage)', |
| r'\bje pense que', |
| r'\bpour moi', |
| r'\bà mon avis', |
| r'\bfranchement', |
| r'en arrivant', |
| r'je viens (?:de|d\')', |
| r'aujourd\'hui', |
| r'c\'est dommage', |
| r'malheureusement', |
| r'donc je (?:voulais|pense)', |
| r'quand même', |
| r'un petit peu', |
| r'vraiment dommage', |
| ] |
| |
| |
| p_plus_s_patterns = [ |
| r'\bje\s+(?:suis|ai|me|pense|crois|vais|veux|dois)', |
| r'\bnous\s+(?:avons|sommes|étions|allons|devons)', |
| r'\bj\'(?:ai|étais|avais|irai|aurais)', |
| r'\bmon\s+(?:père|fils|mari|frère)', |
| r'\bma\s+(?:mère|fille|femme|sœur)', |
| r'\b(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)\s+\w+', |
| r'\b\w+\s+(?:explique|témoigne|raconte|confie|précise|ajoute|poursuit)', |
| r'selon\s+\w+', |
| r'(?:il|elle|ils|elles)\s+(?:dit|disent|explique|expliquent|affirme|assure)\s+que', |
| r'pour\s+\w+\s*,', |
| r'comme\s+(?:le\s+)?(?:dit|explique|précise)\s+\w+', |
| r'voilà ce à quoi', |
| r'c\'est qu?\'?à? partir', |
| r'certains d\'entre (?:nous|eux)', |
| r'parmi les\s+\d+', |
| r'\b[A-Z][a-z]+\s+qui\s+(?:est|a|était)', |
| r'comme\s+[A-Z][a-z]+', |
| r'fièvre\s+et\s+\w+', |
| r'hospitalisé', |
| r'symptômes', |
| r'malade', |
| ] |
| |
| |
| qr_patterns = [ |
| r'\?.*[A-Z]', |
| r'question\s*:', |
| r'réponse\s*:', |
| r'vous\s+(?:pensez|croyez|dites)', |
| r'que\s+pensez-vous', |
| r'interview', |
| r'débat', |
| ] |
| |
| |
| for pattern in mt_patterns: |
| indicators['mt'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) |
| |
| for pattern in p_plus_s_patterns: |
| indicators['p_plus_s'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) |
| |
| for pattern in qr_patterns: |
| indicators['qr'] += len(re.findall(pattern, text_lower, re.IGNORECASE)) |
| |
| |
| if indicators['mt'] < 3 and indicators['p_plus_s'] < 3 and indicators['qr'] < 2: |
| indicators['p_only'] = 5 |
| |
| return indicators |
|
|
| def analyze_transcription(generator, transcription_text, filename): |
| """Analyse une transcription avec Hugging Face""" |
| try: |
| |
| format_indicators = detect_format_indicators(transcription_text) |
| |
| |
| max_score = max(format_indicators.values()) |
| likely_format = [k for k, v in format_indicators.items() if v == max_score][0] |
| |
| hint_map = { |
| 'p_plus_s': "ATTENTION: Nombreux témoignages détectés → OBLIGATOIREMENT P+S", |
| 'qr': "ATTENTION: Format questions-réponses détecté → OBLIGATOIREMENT QR", |
| 'p_only': "ATTENTION: Aucun témoignage/interview → OBLIGATOIREMENT P" |
| } |
| |
| |
| force_format = "" |
| if format_indicators['p_plus_s'] >= 5: |
| force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P+S' pour la qualification." |
| elif format_indicators['qr'] >= 3: |
| force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'QR' pour la qualification." |
| elif format_indicators['p_plus_s'] <= 1 and format_indicators['qr'] <= 1: |
| force_format = "\nFORMAT IMPOSÉ: Utilise OBLIGATOIREMENT 'P' pour la qualification." |
| |
| format_hint = hint_map.get(likely_format, "") |
| |
| prompt = create_analysis_prompt() |
| enhanced_prompt = f"{prompt}\n\n{format_hint}{force_format}" |
| |
| |
| full_prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{enhanced_prompt}\n\nTRANSCRIPTION:\n{transcription_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n" |
| |
| print(f"Analyse de: {filename}") |
| print(f" Indices détectés: MT={format_indicators['mt']}, P+S={format_indicators['p_plus_s']}, QR={format_indicators['qr']}, P={format_indicators['p_only']}") |
| |
| |
| response = generator( |
| full_prompt, |
| max_new_tokens=150, |
| temperature=0.2, |
| top_k=20, |
| top_p=0.8, |
| repetition_penalty=1.15, |
| do_sample=True, |
| pad_token_id=generator.tokenizer.eos_token_id |
| ) |
| |
| result = response[0]['generated_text'].replace(full_prompt, '').strip() |
| |
| |
| result = result.replace('\n', ' ').strip() |
| |
| if "|" in result: |
| |
| lines = result.split('\n') |
| for line in lines: |
| if "|" in line and line.count("|") >= 2: |
| parts = line.split("|") |
| if len(parts) >= 3: |
| auteur = parts[0].strip() |
| qualification = parts[1].strip() |
| titre = parts[2].strip() |
| |
| |
| if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]: |
| auteur = "Inconnu" |
| |
| |
| if qualification.upper() not in ["P", "P+S", "SON", "MT", "QR"]: |
| qualification = "P" |
| |
| return { |
| "success": True, |
| "auteur": auteur, |
| "qualification": qualification.upper(), |
| "titre": titre.upper(), |
| "filename": filename |
| } |
| |
| |
| if " - " in result: |
| parts = result.split(" - ") |
| if len(parts) >= 3: |
| auteur = parts[0].strip() |
| qualification = parts[1].strip() |
| titre = " - ".join(parts[2:]).strip() |
| |
| if auteur.lower() in ["inconnu", "non mentionné", "auteur", ""]: |
| auteur = "Inconnu" |
| |
| |
| if qualification.upper() not in ["P", "P+S", "QR"]: |
| qualification = "P" |
| |
| return { |
| "success": True, |
| "auteur": auteur, |
| "qualification": qualification.upper(), |
| "titre": titre.upper(), |
| "filename": filename |
| } |
| |
| |
| return { |
| "success": False, |
| "error": f"Format de réponse incorrect: {result}", |
| "filename": filename, |
| "raw_response": result |
| } |
| |
| except Exception as e: |
| return { |
| "success": False, |
| "error": str(e), |
| "filename": filename |
| } |
|
|
| def read_transcription_file(file_path): |
| """Lit le contenu d'un fichier de transcription et extrait les métadonnées""" |
| try: |
| with open(file_path, 'r', encoding='utf-8') as f: |
| content = f.read() |
| |
| |
| metadata = {} |
| lines = content.split('\n') |
| |
| for line in lines[:10]: |
| if line.startswith('Fichier source:'): |
| metadata['filename'] = line.replace('Fichier source:', '').strip() |
| elif line.startswith('Durée de traitement:'): |
| metadata['processing_time'] = line.replace('Durée de traitement:', '').strip() |
| |
| |
| if "--------------------------------------------------" in content: |
| parts = content.split("--------------------------------------------------") |
| if len(parts) > 1: |
| text_content = parts[1].strip() |
| return text_content, metadata |
| |
| return content.strip(), metadata |
| |
| except Exception as e: |
| print(f"Erreur lors de la lecture de {file_path}: {e}") |
| return None, {} |
|
|
| def apply_duration_correction(result, duration_seconds, format_indicators=None): |
| """Applique une correction probabiliste basée sur la durée et les patterns détectés""" |
| if not duration_seconds: |
| return result |
| |
| original_qualification = result.get("qualification", "") |
| corrected = False |
| |
| |
| if format_indicators and format_indicators.get('mt', 0) >= 8 and duration_seconds < 60: |
| if original_qualification in ["P", "P+S", "SON"]: |
| result["qualification"] = "MT" |
| corrected = True |
| print(f" → Correction MT détecté: {original_qualification} → MT (patterns={format_indicators['mt']})") |
| |
| |
| elif not corrected: |
| if duration_seconds < 30: |
| |
| if original_qualification in ["P", "P+S"]: |
| result["qualification"] = "SON" |
| corrected = True |
| print(f" → Correction durée < 30s: {original_qualification} → SON") |
| |
| elif 30 <= duration_seconds <= 40: |
| |
| if original_qualification == "P+S": |
| result["qualification"] = "SON" |
| corrected = True |
| print(f" → Correction durée 30-40s: P+S → SON") |
| |
| return result |
|
|
| def extract_author_from_filename(filename): |
| """Extrait le nom du journaliste depuis le nom du fichier""" |
| try: |
| import re |
| |
| |
| clean_name = filename.replace('_transcription.txt', '').replace('.mp3', '').replace('.MP3', '') |
| |
| |
| |
| clean_name = re.sub(r'^\d+\s+', '', clean_name).strip() |
| |
| |
| if clean_name: |
| return clean_name |
| |
| return "Inconnu" |
| |
| except Exception as e: |
| print(f"Erreur extraction auteur: {e}") |
| return "Inconnu" |
|
|
| def get_audio_duration(audio_filename, input_dir): |
| """Calcule la durée d'un fichier audio en secondes totales (version unique)""" |
| try: |
| |
| from pydub import AudioSegment |
| audio_path = None |
| audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov'] |
| |
| for ext in audio_extensions: |
| potential_path = input_dir / audio_filename.replace('_transcription.txt', ext) |
| if potential_path.exists(): |
| audio_path = potential_path |
| break |
| base_name = audio_filename.replace('_transcription.txt', '') |
| potential_path = input_dir / f"{base_name}{ext}" |
| if potential_path.exists(): |
| audio_path = potential_path |
| break |
| if audio_path: |
| audio = AudioSegment.from_file(str(audio_path)) |
| duration_seconds = len(audio) / 1000 |
| minutes = int(duration_seconds // 60) |
| seconds = int(duration_seconds % 60) |
| return minutes * 100 + seconds |
| return None |
| except Exception as e: |
| print(f"Erreur calcul durée pour {audio_filename}: {e}") |
| return None |
|
|
| def get_transcription_files(transcriptions_dir): |
| """Récupère tous les fichiers de transcription (unique)""" |
| if not transcriptions_dir.exists(): |
| print(f"Le dossier {transcriptions_dir} n'existe pas") |
| return [] |
| txt_files = list(transcriptions_dir.glob("*_transcription.txt")) |
| return sorted(txt_files) |
|
|
| def main(): |
| """Fonction principale""" |
| print("=" * 60) |
| print("ANALYSE DES BOB AVEC HUGGING FACE") |
| print("=" * 60) |
| |
| |
| script_dir = Path(__file__).parent.absolute() |
| transcriptions_dir = Path(os.environ.get("BOB_TRANSCRIPTIONS_DIR", script_dir.parent / "output" / "transcriptions")) |
| output_file = Path(os.environ.get("BOB_OUTPUT_FILE", script_dir.parent / "output" / "resume_bob.txt")) |
| input_dir = Path(os.environ.get("BOB_INPUT_DIR", script_dir.parent / "input")) |
|
|
| |
| analyze_files_hf( |
| transcriptions_dir=transcriptions_dir, |
| input_dir=input_dir, |
| output_file=output_file, |
| log_fn=print, |
| progress_fn=None, |
| cancel_fn=None, |
| ) |
|
|
| if __name__ == "__main__": |
| main() |
|
|
| |
| def analyze_files_hf(transcriptions_dir: Path, input_dir: Path, output_file: Path, log_fn=print, progress_fn=None, cancel_fn=None): |
| """Analyse tous les fichiers de transcription avec Hugging Face""" |
| log = log_fn or (lambda *a, **k: None) |
|
|
| log("Dossier transcriptions: {}".format(transcriptions_dir)) |
| log("Fichier de sortie: {}".format(output_file)) |
| log("") |
|
|
| transcription_files = get_transcription_files(transcriptions_dir) |
| log(f"🔍 Recherche de fichiers dans: {transcriptions_dir}") |
| log(f"📁 Contenu du dossier: {list(transcriptions_dir.iterdir()) if transcriptions_dir.exists() else 'Dossier non trouvé'}") |
| if not transcription_files: |
| log("❌ Aucun fichier de transcription trouvé") |
| log("Assurez-vous d'avoir exécuté le script de transcription d'abord") |
| return {"success": False, "count": 0} |
|
|
| log(f"Trouvé {len(transcription_files)} fichier(s) de transcription:") |
| for i, file in enumerate(transcription_files, 1): |
| log(f" {i}. {file.name}") |
| log("") |
|
|
| |
| generator = load_hf_model() |
| if not generator: |
| log("❌ Modèle Hugging Face indisponible") |
| |
| try: |
| output_file.parent.mkdir(parents=True, exist_ok=True) |
| with open(output_file, 'w', encoding='utf-8') as f: |
| f.write("# ERREUR - Modèle indisponible\n") |
| f.write("Erreur|P|MODÈLE INDISPONIBLE|000\n") |
| log(f"📄 Fichier d'erreur créé: {output_file}") |
| return {"success": False, "error": "Modèle Hugging Face indisponible", "count": 0} |
| except Exception as write_error: |
| log(f"❌ Erreur lors de la création du fichier d'erreur: {write_error}") |
| return {"success": False, "error": "Modèle Hugging Face indisponible"} |
| |
| log("✅ Modèle chargé, début de l'analyse...") |
|
|
| results = [] |
| success_count = 0 |
| total = len(transcription_files) |
|
|
| for i, file_path in enumerate(transcription_files, 1): |
| if cancel_fn and cancel_fn(): |
| log("⏹️ Analyse annulée") |
| break |
|
|
| log(f"[{i}/{total}] ") |
|
|
| transcription_text, metadata = read_transcription_file(file_path) |
| if not transcription_text: |
| log(f"✗ Impossible de lire {file_path.name}") |
| if progress_fn: |
| progress_fn(i, total) |
| continue |
|
|
| duration = get_audio_duration(file_path.name, input_dir) |
| author_from_filename = extract_author_from_filename(file_path.name) |
|
|
| result = analyze_transcription(generator, transcription_text, file_path.name) |
|
|
| if result["success"]: |
| format_indicators = detect_format_indicators(transcription_text) |
| result = apply_duration_correction(result, duration, format_indicators) |
| if result["auteur"].lower() in ["inconnu", "non mentionné", "auteur", ""]: |
| result["auteur"] = author_from_filename |
| result["duree"] = duration if duration else "000" |
| result["filename_source"] = metadata.get("filename", file_path.name) |
| log(f"✓ {result['auteur']} - {result['qualification']} - {result['titre']} - {result['duree']}") |
| results.append(result) |
| success_count += 1 |
| else: |
| log(f"✗ Erreur: {result['error']}") |
| if "raw_response" in result: |
| log(f" Réponse brute: {result['raw_response']}") |
|
|
| if progress_fn: |
| progress_fn(i, total) |
| log("") |
|
|
| |
| if results: |
| try: |
| log(f"💾 Tentative de création du fichier: {output_file}") |
| log(f"📁 Dossier parent existe: {output_file.parent.exists()}") |
| |
| |
| output_file.parent.mkdir(parents=True, exist_ok=True) |
| log(f"📁 Dossier parent créé/vérifié: {output_file.parent}") |
| |
| |
| with open(output_file, 'w', encoding='utf-8') as f: |
| f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") |
| f.write(f"# Format: Auteur | Qualification | Titre | Durée\n") |
| f.write("# Qualification: P=papier, P+S=papier+son, QR=question-réponse\n") |
| f.write("# Durée: format MMss (ex: 1min04 = 104)\n") |
| f.write("# " + "="*70 + "\n\n") |
| for r in results: |
| line = f"{r['auteur']} | {r['qualification']} | {r['titre']} | {r['duree']}" |
| f.write(line + "\n") |
| |
| log(f"✅ Fichier écrit avec succès: {output_file}") |
| log(f"📄 Contenu du fichier:") |
| with open(output_file, 'r', encoding='utf-8') as f: |
| content = f.read() |
| for line in content.split('\n')[:10]: |
| if line.strip(): |
| log(f" {line}") |
| |
| log("=" * 60) |
| log("RÉSUMÉ GÉNÉRÉ") |
| log("=" * 60) |
| log(f"Fichiers analysés: {total}") |
| log(f"Analyses réussies: {success_count}") |
| log(f"Analyses échouées: {total - success_count}") |
| log(f"Fichier de résumé: {output_file}") |
| log(f"📁 Contenu final du dossier output: {list(output_file.parent.iterdir())}") |
| |
| return {"success": True, "count": total, "ok": success_count, "results": results} |
| |
| except Exception as write_error: |
| log(f"❌ Erreur lors de l'écriture du fichier: {write_error}") |
| log(f"Traceback: {traceback.format_exc()}") |
| return {"success": False, "error": f"Erreur d'écriture: {write_error}"} |
| else: |
| log("⚠️ Aucune analyse réussie, création d'un fichier factice...") |
| |
| try: |
| output_file.parent.mkdir(parents=True, exist_ok=True) |
| with open(output_file, 'w', encoding='utf-8') as f: |
| f.write(f"# RÉSUMÉ DES BOB - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") |
| f.write("# Aucune analyse réussie\n") |
| log(f"📄 Fichier factice créé: {output_file}") |
| return {"success": False, "count": total, "ok": 0} |
| except Exception as write_error: |
| log(f"❌ Erreur lors de la création du fichier factice: {write_error}") |
| return {"success": False, "error": f"Erreur d'écriture: {write_error}"} |
| |