import base64
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Tuple

from openai import OpenAI
from pydub import AudioSegment

from script_generation import PodScript
@dataclass
class AudioConfig:
    """Settings used by ScriptToAudio when rendering a script to audio.

    Attributes:
        output_path: Directory under which a per-run output folder is created.
        api_key: OpenAI API key. Defaults to the OPENAI_API_KEY environment
            variable, looked up each time an instance is created; an empty
            string when the variable is unset.
        model: Text-to-speech model name passed to the OpenAI API.
        mocked: When True, ScriptToAudio.run returns a fixed sample path
            instead of generating audio.
    """

    output_path: str = "./src/public/output"
    # default_factory defers the environment lookup to instantiation time; a
    # plain default would be frozen when this module is first imported and miss
    # a key exported afterwards. repr=False keeps the secret out of logs.
    api_key: str = field(
        default_factory=lambda: os.getenv("OPENAI_API_KEY", ""),
        repr=False,
    )
    model: str = "gpt-4o-mini-tts"
    mocked: bool = False
|
|
class ScriptToAudio:
    """Render a two-speaker podcast script to a single WAV file with OpenAI TTS.

    Each line of the script is synthesized into its own WAV file, alternating
    between two voices, and the per-line files are then concatenated.
    """

    def __init__(self, config: AudioConfig):
        """Store the configuration and create the OpenAI client from its API key.

        Args:
            config (AudioConfig): Output location, API key, model and mock flag.
        """
        self._config = config
        self._llm_client = OpenAI(api_key=config.api_key)

    async def _text_to_speech(
        self, script: PodScript, voice_1: str, voice_2: str
    ) -> Tuple[List[str], str]:
        """Converts every line of the script to speech, one WAV file per line.

        Even-indexed lines are spoken with voice_1 and odd-indexed lines with
        voice_2.

        Args:
            script (PodScript): The script whose `conversation` lines are converted.
            voice_1 (str): The voice for the first speaker.
            voice_2 (str): The voice for the second speaker.
        Returns:
            Tuple[List[str], str]: A tuple containing a list of filenames of the
                generated audio files (in script order) and the output directory.
        Raises:
            ValueError: If the script has no conversation lines.
            RuntimeError: If the text-to-speech conversion fails.
        """
        # Fail before touching the disk so an empty script leaves no orphan
        # output directory behind.
        if not script.conversation:
            raise ValueError("No conversation lines provided")

        output_dir = Path(self._create_output_directory())
        filenames = []

        try:
            for i, line in enumerate(script.conversation):
                filename = output_dir / f"output_{i}.wav"
                voice = voice_1 if i % 2 == 0 else voice_2

                tmp_path = await self._generate_audio(line.text, voice)
                # shutil.move falls back to copy + delete when the temp
                # directory and the output directory are on different
                # filesystems, where os.rename would raise OSError.
                shutil.move(tmp_path, str(filename))
                filenames.append(str(filename))

            return filenames, str(output_dir)
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}") from e

    async def _generate_audio(self, text: str, voice: str) -> str:
        """Generates audio from the given text using the specified voice.

        Args:
            text (str): The input text to be converted to audio.
            voice (str): The voice to be used; it is lower-cased before being
                sent to the API.
        Returns:
            str: The path of a temporary WAV file holding the generated audio.
                The caller owns the file and is responsible for moving or
                deleting it.
        Raises:
            RuntimeError: If the audio generation fails.
        """
        tmp_path = None
        try:
            # NOTE(review): this is the synchronous OpenAI client, so the call
            # blocks the event loop for the duration of the request. Consider
            # AsyncOpenAI or running it in an executor.
            response = self._llm_client.audio.speech.create(
                model=self._config.model,
                voice=voice.lower(),
                input=text,
                response_format="wav",
            )
            # The temp file is created only after the request succeeded, so a
            # failed API call cannot leave an empty file behind. Writing through
            # the open handle avoids reopening the file by name, which fails on
            # Windows while the first handle is still open.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
                tmp_path = tmp_file.name
                tmp_file.write(response.content)
            return tmp_path
        except Exception as e:
            if tmp_path is not None:
                # Best-effort cleanup of a partially written file.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
            raise RuntimeError(f"Failed to generate audio: {e}") from e

    def _create_output_directory(self) -> str:
        """Creates a unique output directory for the generated audio files.

        The directory name is the URL-safe base64 encoding of 8 random bytes
        and is created under the configured `output_path`.

        Returns:
            str: The path of the created output directory.
        Raises:
            RuntimeError: If the directory creation fails.
        """
        random_bytes = os.urandom(8)
        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
        final_folder_name = os.path.join(self._config.output_path, folder_name)
        try:
            os.makedirs(final_folder_name, exist_ok=True)
        except OSError as e:
            raise RuntimeError(f"Failed to create output directory: {e}") from e
        return final_folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Combines multiple WAV files into a single WAV file.

        The input files are deleted once the combined file has been exported.

        Args:
            filenames (List[str]): The WAV files to concatenate, in playback order.
            output_file (str): The path of the combined WAV file to write.
        Raises:
            ValueError: If the input filenames list is empty.
            RuntimeError: If the audio file combination fails.
        """
        if not filenames:
            raise ValueError("No input files provided")

        try:
            # The per-line files are WAV (see _generate_audio), so load them
            # with the WAV reader rather than the MP3 one.
            audio_segments = [AudioSegment.from_wav(name) for name in filenames]
            combined = sum(audio_segments, AudioSegment.empty())
            combined.export(output_file, format="wav")

            # Remove the inputs only after the export succeeded.
            for name in filenames:
                os.remove(name)
        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}") from e

    async def run(self, script: PodScript, voice_1: str, voice_2: str) -> str:
        """Converts a script into one combined WAV file and returns its path.

        Args:
            script (PodScript): The script to convert.
            voice_1 (str): The voice for the first speaker.
            voice_2 (str): The voice for the second speaker.
        Returns:
            str: The path of the combined WAV file, or a fixed sample path when
                the configuration has `mocked` set.
        Raises:
            RuntimeError: If any step of the conversion fails.
        """
        if self._config.mocked:
            return './articles/mocked_java9_openai_tts.wav'

        try:
            audio_files, folder_name = await self._text_to_speech(
                script, voice_1, voice_2
            )
            final_output = os.path.join(folder_name, "combined_output.wav")
            self._combine_audio_files(audio_files, final_output)
            return final_output
        except Exception as e:
            raise RuntimeError(f"Failed to convert article to podcast: {e}") from e