| |
|
|
| import os |
| import tempfile |
| import math |
| from io import BytesIO |
| from typing import List, Dict, Tuple, Union |
| import streamlit as st |
|
|
| try: |
| from pydub import AudioSegment |
| PYDUB_AVAILABLE = True |
| except ImportError: |
| PYDUB_AVAILABLE = False |
| st.warning("⚠️ Pydub nie jest dostępny. Funkcje kompresji ograniczone.") |
|
|
| try: |
| import librosa |
| import soundfile as sf |
| LIBROSA_AVAILABLE = True |
| except ImportError: |
| LIBROSA_AVAILABLE = False |
|
|
| from config import FILE_PROCESSING, USER_MESSAGES |
|
|
class FileHandler:
    """Prepare uploaded audio/video files for the Whisper API (max 25MB per file).

    Files at or below the limit are written to a temporary file unchanged.
    Larger files (up to ``MAX_UPLOAD_SIZE_MB``) are first re-encoded to a
    low-bitrate mono MP3 and, if that is still too large, split into
    overlapping MP3 parts. Every temporary file that is handed back to the
    caller is recorded in ``self.temp_files`` so ``cleanup_temp_files`` can
    delete it later.
    """

    # Maximum size of a single file accepted for transcription, in MB.
    WHISPER_MAX_SIZE_MB = 25
    # Target upper bound for one split part; leaves headroom below the limit.
    SAFE_CHUNK_SIZE_MB = 20
    # Uploads above this size are rejected outright.
    MAX_UPLOAD_SIZE_MB = 100
    # Up to this size the processing plan predicts compression, above it splitting.
    COMPRESSION_LIMIT_MB = 50
    # Bitrate used when exporting split parts (kbit/s).
    SPLIT_BITRATE_KBPS = 96
    # Overlap added on each side of a part so words at the seams are not cut.
    SPLIT_OVERLAP_MS = 10 * 1000
    # File extensions accepted by validate_file_for_whisper.
    SUPPORTED_FORMATS = ('mp3', 'wav', 'mp4', 'm4a', 'aac', 'mov', 'avi')
    # Upper bound on the part of the upload name reused in temp file names.
    _MAX_STEM_LENGTH = 64
    _BYTES_PER_MB = 1024 * 1024

    def __init__(self):
        self.temp_files = []
        self.processing_stats = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _split_name(filename) -> Tuple[str, str]:
        """Return ``(stem, extension)`` of an upload name with any directory part removed.

        The extension is returned without the dot and in its original case;
        it is an empty string when the name contains no dot.
        """
        # Upload names are user-controlled: drop directory components (both
        # separator styles) so they can never leak into a temp-file path.
        base = os.path.basename(str(filename).replace("\\", "/"))
        stem, dot, ext = base.rpartition(".")
        if not dot:
            return base, ""
        return stem, ext

    @staticmethod
    def _read_upload(uploaded_file) -> bytes:
        """Read the whole upload from its start and leave it rewound."""
        uploaded_file.seek(0)
        try:
            return uploaded_file.read()
        finally:
            uploaded_file.seek(0)

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete ``path`` if possible; a file that cannot be removed is ignored."""
        try:
            os.remove(path)
        except OSError:
            pass

    def _write_temp_file(self, data: bytes, suffix: str, prefix: str = None) -> str:
        """Write ``data`` to a new temp file, register it for cleanup and return its path."""
        temp_fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        try:
            with os.fdopen(temp_fd, 'wb') as tmp_file:
                tmp_file.write(data)
        except Exception:
            # Do not leave a half-written, untracked file behind.
            self._remove_quietly(temp_path)
            raise
        self.temp_files.append(temp_path)
        return temp_path

    @classmethod
    def _plan_chunks(cls, total_duration_ms: int, file_size_mb: float,
                     chunk_size_mb: float) -> Tuple[int, int]:
        """Return ``(number_of_parts, part_duration_ms)`` for splitting.

        The part count is the larger of two estimates: the original
        size-based one (``file_size_mb / chunk_size_mb``) and a
        duration-based one derived from the export bitrate, so a part
        (including its overlap on both sides) stays within
        ``chunk_size_mb`` after re-encoding.
        """
        # bytes * 8 = bits; bits / (kbit/s) = milliseconds of audio.
        max_chunk_ms = int(chunk_size_mb * cls._BYTES_PER_MB * 8 / cls.SPLIT_BITRATE_KBPS)
        usable_ms = max(1, max_chunk_ms - 2 * cls.SPLIT_OVERLAP_MS)

        parts_by_size = math.ceil(file_size_mb / chunk_size_mb)
        parts_by_duration = math.ceil(total_duration_ms / usable_ms)
        parts = max(1, parts_by_size, parts_by_duration)
        return parts, max(1, total_duration_ms // parts)

    def _export_chunk(self, chunk, stem: str, part_number: int) -> str:
        """Export one audio segment to a fresh temp MP3 and return its path.

        The file is removed again if the export fails. The caller decides
        whether to register the path in ``self.temp_files``.
        """
        temp_fd, temp_path = tempfile.mkstemp(
            suffix=f"_part{part_number:02d}.mp3",
            prefix=f"{stem}_"
        )
        os.close(temp_fd)
        try:
            exported = chunk.export(
                temp_path, format="mp3", bitrate=f"{self.SPLIT_BITRATE_KBPS}k"
            )
            # export() hands back the file object it opened; close it so the
            # handle is released before the file is measured or removed.
            exported.close()
        except Exception:
            self._remove_quietly(temp_path)
            raise
        return temp_path

    # ------------------------------------------------------------------
    # Processing pipeline
    # ------------------------------------------------------------------

    def process_file(self, uploaded_file, max_chunk_size_mb: int = 20, auto_compress: bool = True) -> List[str]:
        """Turn an upload into one or more temp files ready for transcription.

        Args:
            uploaded_file: File-like upload exposing ``name``, ``size``,
                ``read`` and ``seek``.
            max_chunk_size_mb: Requested upper bound for a split part; capped
                at ``SAFE_CHUNK_SIZE_MB``.
            auto_compress: Try re-encoding before falling back to splitting.

        Returns:
            Paths of the prepared files (each at most ``WHISPER_MAX_SIZE_MB``),
            or an empty list when the file is rejected or processing fails.
            Errors are reported through Streamlit, not raised.
        """
        try:
            file_size_mb = uploaded_file.size / self._BYTES_PER_MB

            st.info(f"🔄 Przetwarzam {uploaded_file.name} ({file_size_mb:.1f}MB)")

            # Small enough already: store as-is.
            if file_size_mb <= self.WHISPER_MAX_SIZE_MB:
                temp_path = self._save_temp_file(uploaded_file)
                if not temp_path:
                    return []
                st.success(f"✅ Plik gotowy do transkrypcji ({file_size_mb:.1f}MB)")
                return [temp_path]

            if file_size_mb > self.MAX_UPLOAD_SIZE_MB:
                st.error(
                    f"❌ Plik zbyt duży ({file_size_mb:.1f}MB). "
                    f"Maksymalnie {self.MAX_UPLOAD_SIZE_MB}MB."
                )
                return []

            if auto_compress:
                compressed_file = self._compress_audio_for_whisper(uploaded_file)
                if compressed_file is not None:
                    compressed_size_mb = len(compressed_file.getvalue()) / self._BYTES_PER_MB
                    if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB:
                        temp_path = self._save_bytesio_to_temp(compressed_file, uploaded_file.name)
                        if temp_path:
                            st.success(f"✅ Skompresowano: {file_size_mb:.1f}MB → {compressed_size_mb:.1f}MB")
                            return [temp_path]

            # Compression disabled, failed or insufficient: split instead.
            return self._split_audio_for_whisper(uploaded_file, max_chunk_size_mb)

        except Exception as e:
            st.error(f"❌ Błąd przetwarzania {uploaded_file.name}: {str(e)}")
            return []

    def _compress_audio_for_whisper(self, uploaded_file) -> Union[BytesIO, None]:
        """Re-encode the upload as 16 kHz mono MP3 at a size-dependent bitrate.

        Returns:
            A rewound ``BytesIO`` with the MP3 data when the result fits in
            ``WHISPER_MAX_SIZE_MB``; ``None`` when pydub is unavailable, the
            result is still too large, or encoding fails.
        """
        if not PYDUB_AVAILABLE:
            st.warning("Pydub niedostępny - pomijam kompresję")
            return None

        try:
            st.info("🗜️ Kompresuję audio...")

            audio = AudioSegment.from_file(BytesIO(self._read_upload(uploaded_file)))
            compressed = audio.set_channels(1).set_frame_rate(16000)

            # Larger inputs get a lower bitrate.
            original_size_mb = uploaded_file.size / self._BYTES_PER_MB
            if original_size_mb > 50:
                bitrate = "32k"
            elif original_size_mb > 35:
                bitrate = "48k"
            else:
                bitrate = "64k"

            output = BytesIO()
            compressed.export(
                output,
                format="mp3",
                bitrate=bitrate,
                parameters=["-ac", "1", "-ar", "16000"]
            )
            output.seek(0)

            compressed_size_mb = len(output.getvalue()) / self._BYTES_PER_MB
            if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB:
                return output

            st.warning(f"⚠️ Kompresja niewystarczająca ({compressed_size_mb:.1f}MB). Przechodzę do dzielenia.")
            return None

        except Exception as e:
            st.warning(f"Kompresja nieudana: {str(e)}")
            return None

    def _split_audio_for_whisper(self, uploaded_file, max_chunk_size_mb: int) -> List[str]:
        """Split the upload into overlapping mono MP3 parts below the size limit.

        Args:
            uploaded_file: File-like upload exposing ``name``, ``size``,
                ``read`` and ``seek``.
            max_chunk_size_mb: Requested part size; capped at
                ``SAFE_CHUNK_SIZE_MB``. Non-positive values fall back to
                ``SAFE_CHUNK_SIZE_MB``.

        Returns:
            Paths of the created parts in order; an empty list when pydub is
            missing, the audio is empty or an error occurs.
        """
        try:
            if not PYDUB_AVAILABLE:
                st.error("❌ Pydub wymagany do dzielenia plików. Zainstaluj: pip install pydub")
                return []

            st.info("✂️ Dzielę plik na części...")

            audio = AudioSegment.from_file(BytesIO(self._read_upload(uploaded_file)))

            total_duration_ms = len(audio)
            if total_duration_ms <= 0:
                st.error("❌ Plik nie zawiera danych audio")
                return []

            file_size_mb = uploaded_file.size / self._BYTES_PER_MB

            if max_chunk_size_mb and max_chunk_size_mb > 0:
                safe_chunk_size_mb = min(max_chunk_size_mb, self.SAFE_CHUNK_SIZE_MB)
            else:
                safe_chunk_size_mb = self.SAFE_CHUNK_SIZE_MB

            estimated_parts, chunk_duration_ms = self._plan_chunks(
                total_duration_ms, file_size_mb, safe_chunk_size_mb
            )
            overlap_ms = self.SPLIT_OVERLAP_MS

            st.info(f"📂 Dzielę na {estimated_parts} części (~{chunk_duration_ms / 60000:.1f} min każda)")

            parts = []
            stem = self._split_name(uploaded_file.name)[0][:self._MAX_STEM_LENGTH]

            for i in range(estimated_parts):
                start_ms = max(0, i * chunk_duration_ms - (overlap_ms if i > 0 else 0))
                if i == estimated_parts - 1:
                    # The last part always runs to the end so the remainder
                    # of the integer division is never dropped.
                    end_ms = total_duration_ms
                else:
                    end_ms = min(total_duration_ms, (i + 1) * chunk_duration_ms + overlap_ms)

                if end_ms <= start_ms:
                    continue

                chunk = audio[start_ms:end_ms].set_channels(1).set_frame_rate(22050)

                temp_path = self._export_chunk(chunk, stem, i + 1)
                part_size_mb = os.path.getsize(temp_path) / self._BYTES_PER_MB

                # Safety net; the duration-based plan should prevent this.
                if part_size_mb > self.WHISPER_MAX_SIZE_MB:
                    st.error(f"❌ Część {i+1} nadal za duża ({part_size_mb:.1f}MB)")
                    self._remove_quietly(temp_path)
                    continue

                parts.append(temp_path)
                self.temp_files.append(temp_path)

                st.success(
                    f"✅ Część {i+1}/{estimated_parts}: {part_size_mb:.1f}MB, "
                    f"{(end_ms - start_ms) / 60000:.1f} min"
                )

            if not parts:
                st.error("❌ Nie udało się utworzyć żadnej prawidłowej części")

            return parts

        except Exception as e:
            st.error(f"❌ Błąd dzielenia pliku: {str(e)}")
            return []

    def _save_temp_file(self, uploaded_file) -> str:
        """Write the complete upload to a temp file that keeps its extension.

        Returns:
            The temp file path, or an empty string when saving fails.
        """
        try:
            ext = self._split_name(uploaded_file.name)[1]
            suffix = f".{ext}" if ext else ""
            return self._write_temp_file(self._read_upload(uploaded_file), suffix=suffix)

        except Exception as e:
            st.error(f"❌ Błąd zapisu tymczasowego: {str(e)}")
            return ""

    def _save_bytesio_to_temp(self, bytes_io: BytesIO, original_name: str) -> str:
        """Write an in-memory MP3 to a temp file named after the original upload.

        Returns:
            The temp file path, or an empty string when saving fails.
        """
        try:
            stem = self._split_name(original_name)[0][:self._MAX_STEM_LENGTH]
            return self._write_temp_file(
                bytes_io.getvalue(),
                suffix="_compressed.mp3",
                prefix=f"{stem}_"
            )

        except Exception as e:
            st.error(f"❌ Błąd zapisu skompresowanego: {str(e)}")
            return ""

    # ------------------------------------------------------------------
    # Validation and inspection
    # ------------------------------------------------------------------

    def validate_file_for_whisper(self, uploaded_file) -> Tuple[bool, str]:
        """Check size and extension of an upload.

        Returns:
            ``(ok, message)``. ``ok`` is False for empty files, files above
            ``MAX_UPLOAD_SIZE_MB``, missing or unsupported extensions, and
            when the check itself fails.
        """
        try:
            if uploaded_file.size == 0:
                return False, "Plik jest pusty"

            file_size_mb = uploaded_file.size / self._BYTES_PER_MB

            if file_size_mb > self.MAX_UPLOAD_SIZE_MB:
                return False, f"Plik za duży: {file_size_mb:.1f}MB > {self.MAX_UPLOAD_SIZE_MB}MB"

            file_ext = self._split_name(uploaded_file.name)[1].lower()
            if not file_ext:
                return False, "Nieobsługiwany format: brak rozszerzenia pliku"
            if file_ext not in self.SUPPORTED_FORMATS:
                return False, f"Nieobsługiwany format: .{file_ext}"

            if file_size_mb > self.WHISPER_MAX_SIZE_MB:
                return True, f"Plik wymaga przetwarzania ({file_size_mb:.1f}MB > {self.WHISPER_MAX_SIZE_MB}MB)"

            return True, "OK"

        except Exception as e:
            return False, f"Błąd walidacji: {str(e)}"

    def get_audio_duration(self, file_path: str) -> float:
        """Return the duration of an audio file in seconds.

        Tries librosa, then pydub; when neither is available or both fail,
        falls back to a rough estimate of 60 seconds per MB of file size.

        Raises:
            OSError: If the fallback estimate cannot read the file size.
        """
        if LIBROSA_AVAILABLE:
            try:
                try:
                    # Newer librosa releases take ``path=``.
                    return librosa.get_duration(path=file_path)
                except TypeError:
                    # Older releases only know the ``filename=`` keyword.
                    return librosa.get_duration(filename=file_path)
            except Exception:
                # Deliberate best effort: fall through to the next strategy.
                pass

        if PYDUB_AVAILABLE:
            try:
                return len(AudioSegment.from_file(file_path)) / 1000.0
            except Exception:
                # Deliberate best effort: fall through to the size estimate.
                pass

        return os.path.getsize(file_path) / self._BYTES_PER_MB * 60

    def estimate_processing_time(self, uploaded_files: List) -> Dict:
        """Estimate processing, transcription and report times in seconds.

        The figures are size-based heuristics: 60 s of audio per MB,
        2 s of pre-processing per MB for files above the Whisper limit,
        transcription at 10% of audio duration and 30 s of report time
        per file.
        """
        total_size_mb = sum(f.size for f in uploaded_files) / self._BYTES_PER_MB
        total_duration_est = total_size_mb * 60

        oversized_mb = [
            size_mb
            for size_mb in (f.size / self._BYTES_PER_MB for f in uploaded_files)
            if size_mb > self.WHISPER_MAX_SIZE_MB
        ]
        processing_time = sum(size_mb * 2 for size_mb in oversized_mb)
        transcription_time = total_duration_est * 0.1
        report_time = len(uploaded_files) * 30

        return {
            'total_size_mb': total_size_mb,
            'estimated_audio_duration': total_duration_est,
            'estimated_processing_time': processing_time,
            'estimated_transcription_time': transcription_time,
            'estimated_report_time': report_time,
            'total_estimated_time': processing_time + transcription_time + report_time,
            'files_needing_processing': len(oversized_mb)
        }

    def get_file_info(self, uploaded_file) -> Dict:
        """Describe an upload: name, size, format and predicted handling.

        Exactly one of ``whisper_ready``, ``needs_compression``,
        ``needs_splitting`` and ``too_large`` is True for any size.
        """
        file_size_mb = uploaded_file.size / self._BYTES_PER_MB
        file_ext = self._split_name(uploaded_file.name)[1].lower()

        return {
            'name': uploaded_file.name,
            'size_mb': file_size_mb,
            'format': file_ext,
            'whisper_ready': file_size_mb <= self.WHISPER_MAX_SIZE_MB,
            'needs_compression': self.WHISPER_MAX_SIZE_MB < file_size_mb <= self.COMPRESSION_LIMIT_MB,
            # Files above the upload limit are rejected, not split.
            'needs_splitting': self.COMPRESSION_LIMIT_MB < file_size_mb <= self.MAX_UPLOAD_SIZE_MB,
            'too_large': file_size_mb > self.MAX_UPLOAD_SIZE_MB,
            'estimated_duration': file_size_mb * 60,
            'estimated_processing_time': max(0, file_size_mb - self.WHISPER_MAX_SIZE_MB) * 2
        }

    def cleanup_temp_files(self):
        """Delete every tracked temp file and report the outcome via Streamlit."""
        cleaned = 0
        errors = 0

        for temp_file in self.temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
                    cleaned += 1
            except Exception as e:
                errors += 1
                st.warning(f"Nie można usunąć {temp_file}: {e}")

        self.temp_files = []

        if cleaned > 0:
            st.success(f"🧹 Wyczyszczono {cleaned} plików tymczasowych")

        if errors > 0:
            st.warning(f"⚠️ {errors} plików nie udało się usunąć")

    def get_processing_stats(self) -> Dict:
        """Return the handler's limits, tracked temp-file count and library availability."""
        return {
            'temp_files_count': len(self.temp_files),
            'whisper_max_size_mb': self.WHISPER_MAX_SIZE_MB,
            'safe_chunk_size_mb': self.SAFE_CHUNK_SIZE_MB,
            'processing_stats': self.processing_stats,
            'libraries_available': {
                'pydub': PYDUB_AVAILABLE,
                'librosa': LIBROSA_AVAILABLE
            }
        }

    def analyze_upload_batch(self, uploaded_files: List) -> Dict:
        """Classify every file of a batch and aggregate the counts.

        Returns:
            A dict with totals per category, the estimated number of split
            parts and the per-file ``get_file_info`` results under
            ``file_details``.
        """
        analysis = {
            'total_files': len(uploaded_files),
            'total_size_mb': 0,
            'whisper_ready': 0,
            'need_compression': 0,
            'need_splitting': 0,
            'too_large': 0,
            'estimated_parts': 0,
            'file_details': []
        }

        for file in uploaded_files:
            info = self.get_file_info(file)
            analysis['file_details'].append(info)
            analysis['total_size_mb'] += info['size_mb']

            # Rejection is checked first so an oversized file can never be
            # counted as "to be split".
            if info['too_large']:
                analysis['too_large'] += 1
            elif info['whisper_ready']:
                analysis['whisper_ready'] += 1
            elif info['needs_compression']:
                analysis['need_compression'] += 1
            elif info['needs_splitting']:
                analysis['need_splitting'] += 1
                analysis['estimated_parts'] += math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB)

        return analysis

    def create_processing_plan(self, uploaded_files: List) -> str:
        """Build a Markdown summary of how a batch of uploads will be handled."""
        analysis = self.analyze_upload_batch(uploaded_files)
        details = analysis['file_details']

        sections = [
            "\n"
            "📋 **PLAN PRZETWARZANIA**\n"
            "\n"
            "📊 **Podsumowanie:**\n"
            f"- Plików: {analysis['total_files']} ({analysis['total_size_mb']:.1f}MB)\n"
            f"- Gotowych do transkrypcji: {analysis['whisper_ready']}\n"
            f"- Wymagających kompresji: {analysis['need_compression']}\n"
            f"- Wymagających dzielenia: {analysis['need_splitting']}\n"
            f"- Za dużych: {analysis['too_large']}\n"
            "\n"
        ]

        if analysis['estimated_parts'] > 0:
            sections.append(f"- Szacowana liczba części: {analysis['estimated_parts']}\n")

        if analysis['too_large'] > 0:
            sections.append(f"\n❌ **PLIKI ZA DUŻE (>{self.MAX_UPLOAD_SIZE_MB}MB):**\n")
            sections.extend(
                f"- {info['name']}: {info['size_mb']:.1f}MB\n"
                for info in details if info['too_large']
            )

        if analysis['need_splitting'] > 0:
            sections.append("\n✂️ **PLIKI DO PODZIELENIA:**\n")
            for info in details:
                if info['needs_splitting']:
                    parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB)
                    sections.append(f"- {info['name']}: {info['size_mb']:.1f}MB → ~{parts} części\n")

        if analysis['need_compression'] > 0:
            sections.append("\n🗜️ **PLIKI DO KOMPRESJI:**\n")
            sections.extend(
                f"- {info['name']}: {info['size_mb']:.1f}MB\n"
                for info in details if info['needs_compression']
            )

        times = self.estimate_processing_time(uploaded_files)
        sections.append(
            "\n"
            "⏱️ **ESTYMACJA CZASÓW:**\n"
            f"- Przetwarzanie plików: ~{times['estimated_processing_time']:.1f}s\n"
            f"- Transkrypcja: ~{times['estimated_transcription_time']:.1f}s\n"
            f"- Generowanie raportu: ~{times['estimated_report_time']:.1f}s\n"
            f"- **ŁĄCZNIE: ~{times['total_estimated_time']:.1f}s "
            f"({times['total_estimated_time']/60:.1f} min)**\n"
        )

        return "".join(sections)
|
|
| |
def check_file_size_for_whisper(file_path: str, max_size_mb: float = 25) -> Tuple[bool, float]:
    """Check whether a file on disk fits within the Whisper size limit.

    Args:
        file_path: Path of the file to measure.
        max_size_mb: Size limit in MB; defaults to 25.

    Returns:
        ``(fits, size_mb)``. When the size cannot be determined (missing
        file, invalid path) the result is ``(False, 0.0)``.
    """
    try:
        size_mb = os.path.getsize(file_path) / (1024 * 1024)
    except (OSError, TypeError, ValueError):
        # Only path/filesystem problems are treated as "does not fit";
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        return False, 0.0
    return size_mb <= max_size_mb, size_mb
|
|
def estimate_compression_ratio(file_ext: str) -> float:
    """Return the estimated compression ratio for a file extension.

    The lookup is case-insensitive and expects the extension without a
    leading dot; extensions not in the table yield 0.6.
    """
    fallback_ratio = 0.6
    known_ratios = dict(
        wav=0.1,
        aac=0.7,
        mp3=0.8,
        m4a=0.7,
        mp4=0.5,
        mov=0.5,
        avi=0.4,
    )

    normalized = file_ext.lower()
    if normalized in known_ratios:
        return known_ratios[normalized]
    return fallback_ratio
|
|
| |
if __name__ == "__main__":
    # Manual smoke test: build a handler and print its limits and the
    # availability of the optional audio libraries.
    print("🧪 Test FileHandler")
    report = FileHandler().get_processing_stats()

    summary_lines = (
        f"📊 Biblioteki: {report['libraries_available']}",
        f"🎯 Limit Whisper: {report['whisper_max_size_mb']}MB",
        f"🔒 Bezpieczny chunk: {report['safe_chunk_size_mb']}MB",
    )
    for summary_line in summary_lines:
        print(summary_line)

    print("✅ FileHandler gotowy do użycia")