| |
| """ |
| Script orchestrateur pour le traitement automatique des BOB (Hugging Face version) |
| 1. Transcrit tous les fichiers audio du dossier input avec Whisper |
| 2. Analyse toutes les transcriptions avec Hugging Face |
| 3. Génère le fichier de résumé final |
| """ |
|
|
| import os |
| import sys |
| import subprocess |
| from pathlib import Path |
| from datetime import datetime |
| import time |
|
|
| |
| SCRIPT_DIR = Path(__file__).parent.absolute() |
| TRANSCRIBE_SCRIPT = SCRIPT_DIR / "transcribe_audio.py" |
| ANALYZE_SCRIPT = SCRIPT_DIR / "analyze_bob_hf.py" |
| INPUT_DIR = SCRIPT_DIR.parent / "input" |
| OUTPUT_DIR = SCRIPT_DIR.parent / "output" |
| TRANSCRIPTIONS_DIR = OUTPUT_DIR / "transcriptions" |
| RESUME_FILE = OUTPUT_DIR / "resume_bob.txt" |
|
|
| def print_header(): |
| """Affiche l'en-tête du script""" |
| print("=" * 70) |
| print("🎙️ TRAITEMENT AUTOMATIQUE DES BOB (Hugging Face)") |
| print(" Transcription audio → Analyse IA → Résumé") |
| print("=" * 70) |
| print(f"Démarré le: {datetime.now().strftime('%d/%m/%Y à %H:%M:%S')}") |
| print() |
|
|
| def check_prerequisites(): |
| """Vérifie que tous les prérequis sont en place""" |
| print("🔍 Vérification des prérequis...") |
| |
| |
| if not INPUT_DIR.exists(): |
| print(f"❌ Le dossier input n'existe pas: {INPUT_DIR}") |
| return False |
| |
| if not TRANSCRIBE_SCRIPT.exists(): |
| print(f"❌ Script de transcription introuvable: {TRANSCRIBE_SCRIPT}") |
| return False |
| |
| if not ANALYZE_SCRIPT.exists(): |
| print(f"❌ Script d'analyse introuvable: {ANALYZE_SCRIPT}") |
| return False |
| |
| |
| audio_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.mp4', '.avi', '.mov'] |
| audio_files = [] |
| for ext in audio_extensions: |
| audio_files.extend(INPUT_DIR.glob(f"*{ext}")) |
| audio_files.extend(INPUT_DIR.glob(f"*{ext.upper()}")) |
| |
| if not audio_files: |
| print(f"⚠️ Aucun fichier audio trouvé dans {INPUT_DIR}") |
| print(f" Formats supportés: {', '.join(audio_extensions)}") |
| return False |
| |
| print(f"✅ Trouvé {len(audio_files)} fichier(s) audio à traiter:") |
| for i, file in enumerate(sorted(set(audio_files)), 1): |
| print(f" {i}. {file.name}") |
| |
| print("✅ Scripts de traitement trouvés") |
| print("✅ Prérequis validés") |
| print() |
| return True |
|
|
| def run_script(script_path, step_name, python_executable=None): |
| """Exécute un script Python et retourne le succès""" |
| if python_executable is None: |
| python_executable = sys.executable |
| |
| print(f"🚀 Étape {step_name}...") |
| print(f" Exécution: {script_path.name}") |
| |
| start_time = time.time() |
| |
| try: |
| |
| result = subprocess.run( |
| [python_executable, str(script_path)], |
| cwd=str(script_path.parent), |
| capture_output=True, |
| text=True, |
| encoding='utf-8', |
| errors='replace' |
| ) |
| |
| end_time = time.time() |
| duration = end_time - start_time |
| |
| if result.returncode == 0: |
| print(f"✅ {step_name} terminée avec succès") |
| print(f" Durée: {duration:.1f} secondes") |
| |
| |
| output_lines = result.stdout.strip().split('\n') |
| if output_lines: |
| print(" Résultat:") |
| for line in output_lines[-3:]: |
| if line.strip(): |
| print(f" > {line.strip()}") |
| print() |
| return True |
| else: |
| print(f"❌ {step_name} a échoué") |
| print(f" Code d'erreur: {result.returncode}") |
| if result.stderr: |
| print(f" Erreur: {result.stderr.strip()}") |
| if result.stdout: |
| print(f" Sortie: {result.stdout.strip()}") |
| print() |
| return False |
| |
| except Exception as e: |
| end_time = time.time() |
| duration = end_time - start_time |
| print(f"❌ Erreur lors de l'exécution de {step_name}") |
| print(f" Exception: {e}") |
| print(f" Durée avant erreur: {duration:.1f} secondes") |
| print() |
| return False |
|
|
| def check_results(): |
| """Vérifie et affiche les résultats finaux""" |
| print("📊 Vérification des résultats...") |
| |
| |
| if TRANSCRIPTIONS_DIR.exists(): |
| transcription_files = list(TRANSCRIPTIONS_DIR.glob("*_transcription.txt")) |
| print(f"✅ {len(transcription_files)} transcription(s) générée(s)") |
| |
| for file in transcription_files: |
| size_kb = file.stat().st_size / 1024 |
| print(f" • {file.name} ({size_kb:.1f} KB)") |
| else: |
| print("❌ Dossier de transcriptions non trouvé") |
| return False |
| |
| |
| if RESUME_FILE.exists(): |
| print(f"✅ Fichier de résumé généré: {RESUME_FILE.name}") |
| |
| |
| try: |
| with open(RESUME_FILE, 'r', encoding='utf-8') as f: |
| content = f.read().strip() |
| |
| print("📝 Contenu du résumé:") |
| print("-" * 50) |
| |
| lines = content.split('\n') |
| for line in lines: |
| if line.strip() and not line.startswith('#'): |
| print(f" {line}") |
| |
| print("-" * 50) |
| except Exception as e: |
| print(f" Erreur lors de la lecture: {e}") |
| |
| return True |
| else: |
| print("❌ Fichier de résumé non généré") |
| return False |
|
|
| def main(): |
| """Fonction principale""" |
| start_total = time.time() |
| |
| print_header() |
| |
| |
| if not check_prerequisites(): |
| print("❌ Impossible de continuer sans les prérequis") |
| return 1 |
| |
| |
| print("👀 Prêt à démarrer le traitement automatique des BOB") |
| response = input(" Continuer ? (o/N): ").lower().strip() |
| if response not in ['o', 'oui', 'y', 'yes']: |
| print("🚫 Traitement annulé par l'utilisateur") |
| return 0 |
| |
| print() |
| |
| |
| python_exe = sys.executable |
| print(f"🐍 Utilisation de Python: {python_exe}") |
| print() |
| |
| |
| success_transcription = run_script( |
| TRANSCRIBE_SCRIPT, |
| "1/2 - Transcription audio (Whisper)", |
| python_exe |
| ) |
| |
| if not success_transcription: |
| print("❌ Échec de la transcription. Arrêt du traitement.") |
| return 1 |
| |
| |
| success_analysis = run_script( |
| ANALYZE_SCRIPT, |
| "2/2 - Analyse des transcriptions (Hugging Face)", |
| python_exe |
| ) |
| |
| if not success_analysis: |
| print("❌ Échec de l'analyse. Vérifiez que les modèles Hugging Face sont accessibles.") |
| return 1 |
| |
| |
| print() |
| results_ok = check_results() |
| |
| |
| end_total = time.time() |
| total_duration = end_total - start_total |
| |
| print() |
| print("=" * 70) |
| if results_ok: |
| print("🎉 TRAITEMENT TERMINÉ AVEC SUCCÈS") |
| print(f"⏱️ Durée totale: {total_duration:.1f} secondes ({total_duration/60:.1f} minutes)") |
| print(f"📁 Fichier de résumé: {RESUME_FILE}") |
| print("✅ Tous vos BOB ont été traités automatiquement !") |
| else: |
| print("⚠️ TRAITEMENT PARTIELLEMENT RÉUSSI") |
| print(f"⏱️ Durée totale: {total_duration:.1f} secondes") |
| print("🔍 Vérifiez les fichiers de sortie manuellement") |
| print("=" * 70) |
| |
| return 0 if results_ok else 1 |
|
|
| if __name__ == "__main__": |
| try: |
| exit_code = main() |
| sys.exit(exit_code) |
| except KeyboardInterrupt: |
| print("\n🚫 Traitement interrompu par l'utilisateur") |
| sys.exit(1) |
| except Exception as e: |
| print(f"\n💥 Erreur inattendue: {e}") |
| sys.exit(1) |
|
|