| |
|
|
| import os |
| import time |
| import streamlit as st |
| from typing import List, Dict, Optional, Union, Tuple |
| from pathlib import Path |
|
|
| try: |
| from openai import OpenAI |
| OPENAI_AVAILABLE = True |
| except ImportError: |
| OPENAI_AVAILABLE = False |
| st.error("❌ OpenAI library nie jest dostępna. Zainstaluj: pip install openai") |
|
|
| from config import MODEL_SETTINGS, USER_MESSAGES |
|
|
class AudioTranscriber:
    """Transcribe audio files with the OpenAI Whisper API.

    Progress and errors are reported through Streamlit widgets, and simple
    usage statistics (file counts, estimated audio duration and API cost)
    are accumulated in ``transcription_stats``.
    """

    # Hard upload limit enforced by the Whisper API.
    MAX_FILE_SIZE_MB = 25
    # Published Whisper pricing: USD per minute of audio.
    COST_PER_MINUTE_USD = 0.006

    def __init__(self, api_key: str):
        """Bind the transcriber to an OpenAI API key.

        Raises:
            Exception: when the ``openai`` package is not importable.
        """
        if not OPENAI_AVAILABLE:
            raise Exception("OpenAI library nie jest dostępna")

        self.client = OpenAI(api_key=api_key)
        self.api_key = api_key
        # Running counters; see get_transcription_stats() for derived values.
        self.transcription_stats = {
            'total_files': 0,
            'successful': 0,
            'failed': 0,
            'total_duration': 0,
            'total_cost_estimate': 0
        }

    def transcribe_files(self, file_paths: Union[str, List[str]], language: str = "pl") -> str:
        """Transcribe one audio file or a list of files and join the results.

        Args:
            file_paths: A single path or a list of paths to audio files.
            language: Language code passed to Whisper; 'auto' lets the model
                detect the language itself.

        Returns:
            Combined transcription of all successfully processed files, with
            "=== CZĘŚĆ n ===" separators between parts.

        Raises:
            Exception: when no file could be transcribed at all.
        """
        # Accept a bare path for caller convenience.
        if isinstance(file_paths, str):
            file_paths = [file_paths]

        transcriptions = []

        for i, file_path in enumerate(file_paths):
            if not os.path.exists(file_path):
                st.error(f"❌ Plik nie istnieje: {file_path}")
                continue

            try:
                # Only announce part numbers for multi-part recordings.
                if len(file_paths) > 1:
                    st.info(f"🎙️ Transkrybuję część {i+1}/{len(file_paths)}")

                transcription = self.transcribe_with_retries(file_path, language)

                if transcription:
                    transcriptions.append(transcription)
                    self.transcription_stats['successful'] += 1
                    st.success(f"✅ Część {i+1} zakończona")
                else:
                    self.transcription_stats['failed'] += 1
                    st.error(f"❌ Błąd części {i+1}")

            except Exception as e:
                st.error(f"❌ Błąd transkrypcji części {i+1}: {str(e)}")
                self.transcription_stats['failed'] += 1

        if not transcriptions:
            raise Exception("Wszystkie transkrypcje zakończone błędem")

        # Join multi-part results with explicit separators; the first part
        # intentionally carries no header (matches previous output format).
        final_transcription = transcriptions[0]
        for i, text in enumerate(transcriptions[1:], 1):
            final_transcription += f"\n\n=== CZĘŚĆ {i+1} ===\n\n{text}"

        return final_transcription

    def transcribe_with_retries(self, file_path: str, language: str = "pl", max_retries: int = 3) -> Optional[str]:
        """Transcribe a single file, retrying transient API failures.

        Fails fast on permanent problems (missing or oversize file) instead
        of burning retries and sleep time on conditions that cannot change.

        Returns:
            The cleaned transcription, or None when every attempt failed.
        """
        # Count each file exactly once, regardless of how many attempts it
        # takes (previously this was incremented per attempt inside
        # _transcribe_single_file, inflating total_files and skewing
        # success_rate).
        self.transcription_stats['total_files'] += 1

        try:
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        except OSError:
            st.error(f"❌ Plik nie istnieje: {file_path}")
            return None

        # Oversize files are rejected by the API every time - retrying and
        # sleeping between attempts would be pure waste.
        if file_size_mb > self.MAX_FILE_SIZE_MB:
            st.error(f"❌ Plik za duży dla Whisper API: {file_size_mb:.1f}MB > {self.MAX_FILE_SIZE_MB}MB")
            return None

        for attempt in range(max_retries):
            try:
                result = self._transcribe_single_file(file_path, language)
                if result:
                    return result

            except Exception as e:
                error_msg = str(e).lower()
                st.warning(f"⚠️ Próba {attempt + 1}/{max_retries} nieudana: {str(e)}")

                if attempt < max_retries - 1:
                    # Back off longest for rate limits, moderately for
                    # timeouts, briefly for everything else.
                    if "rate limit" in error_msg:
                        wait_time = 60 + (attempt * 30)
                        st.info(f"⏳ Rate limit - czekam {wait_time}s...")
                    elif "timeout" in error_msg:
                        wait_time = 30 + (attempt * 15)
                        st.info(f"⏳ Timeout - czekam {wait_time}s...")
                    else:
                        wait_time = 15 + (attempt * 10)
                        st.info(f"⏳ Błąd - czekam {wait_time}s...")

                    time.sleep(wait_time)
                else:
                    st.error(f"❌ Wszystkie {max_retries} prób nieudane dla {file_path}")

        return None

    def _transcribe_single_file(self, file_path: str, language: str = "pl") -> Optional[str]:
        """Send one file to the Whisper API and return the cleaned text.

        Raises:
            Exception: for empty/oversize files, API failures, or an empty
                transcription result (re-raised for the retry loop).
        """
        try:
            file_size = os.path.getsize(file_path)
            file_size_mb = file_size / (1024 * 1024)

            # Defensive re-check; transcribe_with_retries filters this case
            # already, but this method may be exercised on its own.
            if file_size_mb > self.MAX_FILE_SIZE_MB:
                raise Exception(f"Plik za duży dla Whisper API: {file_size_mb:.1f}MB > {self.MAX_FILE_SIZE_MB}MB")

            if file_size == 0:
                raise Exception("Plik jest pusty")

            st.info(f"📤 Wysyłam do Whisper ({file_size_mb:.1f}MB)...")

            with open(file_path, 'rb') as audio_file:
                params = {
                    'model': MODEL_SETTINGS['whisper']['model'],
                    'file': audio_file,
                    'temperature': MODEL_SETTINGS['whisper']['temperature']
                }

                # 'auto' means: omit the hint and let Whisper detect the language.
                if language != 'auto':
                    params['language'] = language

                transcript = self.client.audio.transcriptions.create(**params)

            if not transcript or not hasattr(transcript, 'text') or len(transcript.text.strip()) == 0:
                raise Exception("Pusty wynik transkrypcji")

            # Rough usage accounting: assume ~1 minute (60 s) of audio per MB.
            estimated_duration = file_size_mb * 60
            estimated_cost = (estimated_duration / 60) * self.COST_PER_MINUTE_USD
            self.transcription_stats['total_duration'] += estimated_duration
            self.transcription_stats['total_cost_estimate'] += estimated_cost

            st.success(f"✅ Transkrypcja otrzymana ({len(transcript.text.split())} słów)")

            return self.clean_transcription(transcript.text)

        except Exception as e:
            st.error(f"❌ Błąd Whisper API: {str(e)}")
            raise  # bare raise preserves the original traceback for the retry loop

    def clean_transcription(self, transcription: str) -> str:
        """Normalize whitespace and group sentences into short paragraphs.

        Sentences (split on '. ') are grouped four per paragraph. On any
        formatting error the raw transcription is returned unchanged.
        """
        try:
            # Collapse all runs of whitespace into single spaces.
            cleaned = ' '.join(transcription.strip().split())

            sentences = cleaned.split('. ')
            paragraphs = []
            current_paragraph = []

            for sentence in sentences:
                current_paragraph.append(sentence)

                # Close a paragraph after four sentences.
                if len(current_paragraph) >= 4:
                    paragraphs.append('. '.join(current_paragraph) + '.')
                    current_paragraph = []

            # Flush the trailing, possibly shorter paragraph.
            if current_paragraph:
                paragraphs.append('. '.join(current_paragraph))

            return '\n\n'.join(paragraphs)

        except Exception as e:
            st.warning(f"⚠️ Błąd formatowania transkrypcji: {e}")
            return transcription

    def detect_interview_type(self, transcription: str) -> str:
        """Guess the interview type from transcript content.

        Returns:
            'fgi' (focus group), 'idi' (individual in-depth interview),
            or 'unknown' when the heuristics are inconclusive.
        """
        text_lower = transcription.lower()

        # Phrases typical for moderated group discussions.
        fgi_indicators = [
            'moderator', 'grupa', 'wszyscy', 'kto jeszcze', 'a państwo',
            'czy zgadzacie się', 'co myślicie', 'focus group',
            'uczestnicy', 'grupa fokusowa', 'dyskusja grupowa',
            'co sądzicie', 'może ktoś inny', 'a jak pan/pani'
        ]

        # Phrases typical for one-on-one interviews.
        idi_indicators = [
            'wywiad indywidualny', 'jeden na jeden', 'prywatnie',
            'osobiście', 'indywidualne', 'w cztery oczy',
            'tylko między nami', 'powiedz mi', 'jak się czujesz'
        ]

        fgi_score = sum(1 for indicator in fgi_indicators if indicator in text_lower)
        idi_score = sum(1 for indicator in idi_indicators if indicator in text_lower)

        # Frequent interruption / crosstalk markers suggest a group setting.
        interruption_patterns = ['...', '[niewyraźnie]', '[nakładanie się głosów]', '(śmiech)', '--']
        interruption_count = sum(text_lower.count(pattern) for pattern in interruption_patterns)

        word_count = len(transcription.split())

        # Decision cascade: keyword evidence first, then structural hints
        # (lots of crosstalk => group; very short transcript => individual).
        if fgi_score > idi_score * 1.5 and word_count > 1000:
            return 'fgi'
        elif idi_score > fgi_score * 1.5:
            return 'idi'
        elif interruption_count > 10 and word_count > 1500:
            return 'fgi'
        elif word_count < 800:
            return 'idi'
        else:
            return 'unknown'

    def validate_api_key(self) -> bool:
        """Check that the API key works and that whisper-1 is available."""
        try:
            # Listing models both authenticates the key and exposes the catalog.
            models = self.client.models.list()

            model_names = [model.id for model in models.data]
            if 'whisper-1' not in model_names:
                st.warning("⚠️ Model whisper-1 nie jest dostępny")
                return False

            return True

        except Exception as e:
            st.error(f"❌ Nieprawidłowy klucz API: {str(e)}")
            return False

    def get_transcription_stats(self) -> Dict:
        """Return a copy of the stats with a derived success_rate percentage."""
        stats = self.transcription_stats.copy()

        if stats['total_files'] > 0:
            stats['success_rate'] = (stats['successful'] / stats['total_files']) * 100
        else:
            stats['success_rate'] = 0

        return stats

    def estimate_transcription_time(self, file_paths: List[str]) -> Dict:
        """Estimate processing time and cost for a batch of files.

        Uses the ~1 MB per minute of audio heuristic. Paths that do not
        exist are skipped; files above the API size limit are reported in
        'files_too_large'.
        """
        valid_files = [path for path in file_paths if os.path.exists(path)]

        if not valid_files:
            return {
                'error': 'Brak prawidłowych plików',
                'files_count': 0
            }

        total_size = sum(os.path.getsize(path) for path in valid_files)
        total_size_mb = total_size / (1024 * 1024)

        estimated_duration_minutes = total_size_mb               # ~1 MB ≈ 1 min of audio
        estimated_api_time = estimated_duration_minutes * 0.1    # API ≈ 10x realtime
        estimated_cost = estimated_duration_minutes * self.COST_PER_MINUTE_USD

        # Flag files the Whisper API would reject outright.
        files_too_large = []
        for path in valid_files:
            file_size_mb = os.path.getsize(path) / (1024 * 1024)
            if file_size_mb > self.MAX_FILE_SIZE_MB:
                files_too_large.append((path, file_size_mb))

        return {
            'total_size_mb': total_size_mb,
            'estimated_audio_duration': estimated_duration_minutes,
            'estimated_processing_time': estimated_api_time,
            'estimated_cost_usd': estimated_cost,
            'files_count': len(valid_files),
            'files_too_large': files_too_large
        }
|
|
| |
def validate_audio_file(file_path: str) -> Tuple[bool, str]:
    """Validate that a path points to a usable audio file.

    Checks existence, non-zero size, the 25 MB Whisper API upload limit,
    and that the extension is a format the Whisper API accepts.

    Args:
        file_path: Path to the candidate audio file.

    Returns:
        (True, "OK") when the file is acceptable, otherwise
        (False, <reason>) with a Polish user-facing reason string.
    """
    if not os.path.exists(file_path):
        return False, "Plik nie istnieje"

    file_size = os.path.getsize(file_path)
    file_size_mb = file_size / (1024 * 1024)

    if file_size == 0:
        return False, "Plik jest pusty"

    if file_size_mb > 25:
        return False, f"Plik za duży: {file_size_mb:.1f}MB > 25MB"

    # Full set of formats accepted by the Whisper API, plus '.aac' which this
    # helper has always accepted (kept for backward compatibility).
    valid_extensions = [
        '.mp3', '.wav', '.mp4', '.m4a', '.aac',
        '.flac', '.mpeg', '.mpga', '.oga', '.ogg', '.webm',
    ]
    file_ext = Path(file_path).suffix.lower()

    if file_ext not in valid_extensions:
        return False, f"Nieobsługiwane rozszerzenie: {file_ext}"

    return True, "OK"
|
|
def get_file_duration_estimate(file_path: str) -> float:
    """Estimate the audio duration of a file in minutes.

    Uses a rough ~1 MB per minute heuristic based purely on file size.

    Returns:
        Estimated minutes, or 0.0 when the file cannot be stat'ed
        (missing path, permission error).
    """
    try:
        return os.path.getsize(file_path) / (1024 * 1024)
    except OSError:
        # Previously a bare `except:` - narrowed to OSError so programming
        # errors (e.g. passing None) are no longer silently swallowed.
        return 0.0
|
|
| |
if __name__ == "__main__":
    # Minimal smoke test: if we got here, the module imported cleanly.
    for message in ("🧪 Test AudioTranscriber", "✅ Import OK - wszystkie typy dostępne"):
        print(message)