| |
| """ |
| BeigeTTS - Standard Inference Script |
| Research release for high-quality neural speech synthesis |
| Based on BlandAI's production Khaki TTS system |
| """ |
|
|
| import torch |
| import soundfile as sf |
| import numpy as np |
| from neucodec import NeuCodec |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| import argparse |
| from typing import Optional, List, Tuple |
|
|
| |
| |
| |
|
|
class TTSConfig:
    """Constants shared by the BeigeTTS inference pipeline."""

    # --- Special token ids in the language-model vocabulary ---------------
    AUDIO_START_TOKEN = 262_145
    # Used both as a stop id for generation and as the end marker when
    # audio tokens are extracted from the generated sequence.
    AUDIO_END_TOKEN = 262_146

    # --- NeuCodec code range embedded in the language-model vocabulary ----
    NEUCODEC_VOCABULARY_SIZE = 1 << 16  # 65536 codec codes
    # Subtracted from a generated token id to obtain the codec code.
    NEUCODEC_BASE_OFFSET = 262_154
    AUDIO_TOKEN_MIN = NEUCODEC_BASE_OFFSET  # inclusive lower bound
    # Exclusive upper bound: valid ids satisfy MIN <= token_id < MAX.
    AUDIO_TOKEN_MAX = AUDIO_TOKEN_MIN + NEUCODEC_VOCABULARY_SIZE

    # --- Defaults applied when the caller does not pass a value -----------
    DEFAULT_TEMPERATURE = 0.1
    DEFAULT_TOP_P = 0.97
    DEFAULT_MAX_TOKENS = 500

    # Sample rate (Hz) reported alongside the synthesized waveform.
    SAMPLE_RATE = 24_000

    # Hard cap on the number of audio tokens kept from one generation.
    MAX_AUDIO_TOKENS = 1_000
|
|
| |
| |
| |
|
|
class BeigeTTS:
    """BeigeTTS synthesis engine - research version of Khaki TTS.

    Pipeline: a causal language model generates token ids from a text
    prompt, the ids that fall in the audio range are mapped to NeuCodec
    codes, and NeuCodec decodes those codes into a waveform.
    """

    def __init__(self, model_path: str = "BlandAI/BeigeTTS", device: str = "auto"):
        """Initialize BeigeTTS engine

        Loads the language model, its tokenizer and the NeuCodec decoder.

        Args:
            model_path: HuggingFace model path or local directory
            device: Device for inference ("auto", "cuda", "cpu")
        """
        self.config = TTSConfig()
        self.device = self._setup_device(device)

        print("Loading BeigeTTS model (research release)...")
        # SECURITY NOTE: trust_remote_code=True allows Python code shipped
        # with the model repository to run locally. Only point model_path at
        # sources you trust.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            # Half precision only on CUDA; full precision everywhere else.
            torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
            device_map="auto" if device == "auto" else None,
            trust_remote_code=True,
        )
        # With device_map="auto" the loader places the weights itself; an
        # explicitly requested device needs a manual move.
        if device != "auto":
            self.model = self.model.to(self.device)
        self.model.eval()

        print("Loading tokenizer...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.tokenizer.pad_token is None:
            # generate() needs a pad id; reuse EOS when none is defined.
            self.tokenizer.pad_token = self.tokenizer.eos_token

        print("Loading NeuCodec...")
        self.neucodec = NeuCodec.from_pretrained("neuphonic/neucodec")
        self.neucodec.eval()
        if self.device.type == "cuda":
            self.neucodec = self.neucodec.to(self.device)

    def _setup_device(self, device: str) -> torch.device:
        """Resolve a device name; "auto" picks CUDA when available, else CPU."""
        if device == "auto":
            return torch.device("cuda" if torch.cuda.is_available() else "cpu")
        return torch.device(device)

    def synthesize(
        self,
        text: str,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        voice_prompt: Optional[str] = None,
    ) -> Tuple[np.ndarray, int]:
        """Synthesize speech from text

        Note: Full Khaki system supports 57 languages, voice cloning,
        and unlimited duration. This research release is English-only.

        Args:
            text: Input text to synthesize
            temperature: Sampling temperature (lower = more deterministic).
                ``None`` selects the configured default; ``0`` requests
                greedy decoding.
            top_p: Nucleus sampling parameter (``None`` = configured default)
            max_tokens: Maximum tokens to generate (``None`` = configured
                default)
            voice_prompt: Optional voice conditioning (limited in BeigeTTS)

        Returns:
            Tuple of (audio_array, sample_rate)

        Raises:
            ValueError: If ``text`` is empty, ``temperature`` is negative,
                ``max_tokens`` is not positive, or the model produced no
                audio tokens.
        """
        if not text or not text.strip():
            raise ValueError("Input text is empty")

        # Compare against None explicitly: `value or default` would silently
        # discard an explicit 0 / 0.0 passed by the caller.
        if temperature is None:
            temperature = self.config.DEFAULT_TEMPERATURE
        if top_p is None:
            top_p = self.config.DEFAULT_TOP_P
        if max_tokens is None:
            max_tokens = self.config.DEFAULT_MAX_TOKENS

        if temperature < 0:
            raise ValueError(f"temperature must be >= 0, got {temperature}")
        if max_tokens <= 0:
            raise ValueError(f"max_tokens must be positive, got {max_tokens}")

        prompt = self._format_prompt(text, voice_prompt)
        audio_tokens = self._generate_tokens(prompt, temperature, top_p, max_tokens)

        if not audio_tokens:
            raise ValueError("No audio tokens generated")

        audio = self._decode_audio(audio_tokens)
        return audio, self.config.SAMPLE_RATE

    def _format_prompt(self, text: str, voice_prompt: Optional[str] = None) -> str:
        """Format text into model prompt

        The text is wrapped in a user turn followed by an opened model turn
        that ends with ``<start_of_speech>``. A non-empty ``voice_prompt`` is
        prepended as a ``[Voice: ...]`` line.
        """
        base_prompt = (
            f"<start_of_turn>user\n{text}<end_of_turn>\n"
            "<start_of_turn>model\n<start_of_speech>"
        )
        if voice_prompt:
            base_prompt = f"[Voice: {voice_prompt}]\n{base_prompt}"
        return base_prompt

    def _generate_tokens(
        self,
        prompt: str,
        temperature: float,
        top_p: float,
        max_tokens: int,
    ) -> List[int]:
        """Generate audio tokens from prompt

        Args:
            prompt: Fully formatted model prompt
            temperature: Sampling temperature; values <= 0 switch to greedy
                decoding
            top_p: Nucleus sampling parameter (ignored for greedy decoding)
            max_tokens: Maximum number of new tokens to generate

        Returns:
            Codec codes (generated token id minus ``NEUCODEC_BASE_OFFSET``)
            in generation order, truncated at the audio end token or at
            ``MAX_AUDIO_TOKENS`` entries. Generated ids outside the audio
            range are skipped.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt")
        input_ids = inputs.input_ids.to(self.model.device)

        print(f"Generating audio tokens (temp={temperature}, top_p={top_p})...")

        # Stop on the tokenizer's EOS or on the audio end marker; drop a
        # missing (None) EOS id so the list holds only valid ids.
        stop_ids = [
            token_id
            for token_id in (self.tokenizer.eos_token_id, self.config.AUDIO_END_TOKEN)
            if token_id is not None
        ]
        generate_kwargs = {
            "max_new_tokens": max_tokens,
            "pad_token_id": self.tokenizer.pad_token_id,
            "eos_token_id": stop_ids,
        }

        # Forward the tokenizer's attention mask: the pad token may alias
        # the EOS token, so the model should not have to infer the mask.
        attention_mask = getattr(inputs, "attention_mask", None)
        if attention_mask is not None:
            generate_kwargs["attention_mask"] = attention_mask.to(self.model.device)

        if temperature > 0:
            generate_kwargs.update(do_sample=True, temperature=temperature, top_p=top_p)
        else:
            # Temperature 0 means "fully deterministic": decode greedily
            # instead of passing a non-positive temperature to sampling.
            generate_kwargs["do_sample"] = False

        with torch.no_grad():
            outputs = self.model.generate(input_ids, **generate_kwargs)

        # Keep only the newly generated part (the prompt ids come first).
        generated = outputs[0, input_ids.shape[1]:].cpu().tolist()
        audio_tokens: List[int] = []

        for token_id in generated:
            if self.config.AUDIO_TOKEN_MIN <= token_id < self.config.AUDIO_TOKEN_MAX:
                audio_tokens.append(token_id - self.config.NEUCODEC_BASE_OFFSET)
            elif token_id == self.config.AUDIO_END_TOKEN:
                break

            if len(audio_tokens) >= self.config.MAX_AUDIO_TOKENS:
                print(f"Reached maximum audio length ({self.config.MAX_AUDIO_TOKENS} tokens)")
                break

        print(f"Generated {len(audio_tokens)} audio tokens")
        return audio_tokens

    def _decode_audio(self, audio_tokens: List[int]) -> np.ndarray:
        """Decode audio tokens to waveform

        Args:
            audio_tokens: Codec codes as returned by ``_generate_tokens``

        Returns:
            Waveform as a NumPy array, scaled so that its peak absolute
            value is 0.95 (left untouched when it is all zeros or empty).
        """
        # Defensive clamp into the valid codec code range.
        audio_array = np.array(audio_tokens, dtype=np.int32)
        audio_array = np.clip(audio_array, 0, self.config.NEUCODEC_VOCABULARY_SIZE - 1)

        # Shape (T,) -> (1, 1, T); presumably (batch, codebook, time) as
        # expected by NeuCodec.decode_code -- confirm against NeuCodec docs.
        fsq_codes = torch.tensor(audio_array, dtype=torch.long)
        fsq_codes = fsq_codes.unsqueeze(0).unsqueeze(1)

        # Mirrors __init__, which moves NeuCodec off the CPU only for CUDA.
        if self.device.type == "cuda":
            fsq_codes = fsq_codes.to(self.device)

        print(f"Decoding audio (shape: {fsq_codes.shape})...")

        with torch.no_grad():
            wav = self.neucodec.decode_code(fsq_codes).cpu()

        # Strip leading singleton-style dimensions by taking the first item.
        if wav.dim() == 3:
            wav = wav[0, 0]
        elif wav.dim() == 2:
            wav = wav[0]

        wav = wav.numpy()

        # Peak-normalize to 0.95; compute the peak once and skip empty or
        # all-zero output to avoid max() on an empty array / division by 0.
        peak = np.abs(wav).max() if wav.size else 0.0
        if peak > 0:
            wav = wav / peak * 0.95

        return wav
|
|
| |
| |
| |
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: synthesize text and write it to a WAV file.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the normal CLI behavior.

    Returns:
        Process exit status: 0 on success, 1 if synthesis or writing failed.
    """
    parser = argparse.ArgumentParser(description="BeigeTTS Synthesis (Research Release)")
    parser.add_argument("text", type=str, help="Text to synthesize")
    parser.add_argument("-o", "--output", type=str, default="output.wav", help="Output WAV file")
    parser.add_argument("-m", "--model", type=str, default="BlandAI/BeigeTTS", help="Model path")
    parser.add_argument("-t", "--temperature", type=float, default=0.1, help="Sampling temperature")
    parser.add_argument("-p", "--top-p", type=float, default=0.97, help="Top-p sampling")
    parser.add_argument("--max-tokens", type=int, default=500, help="Maximum tokens to generate")
    parser.add_argument("--voice", type=str, help="Voice conditioning prompt")
    parser.add_argument("--device", type=str, default="auto", help="Device (auto/cuda/cpu)")

    args = parser.parse_args(argv)

    # Model loading happens outside the try block, so load failures surface
    # with their full traceback instead of the short message below.
    tts = BeigeTTS(model_path=args.model, device=args.device)

    try:
        audio, sr = tts.synthesize(
            text=args.text,
            temperature=args.temperature,
            top_p=args.top_p,
            max_tokens=args.max_tokens,
            voice_prompt=args.voice,
        )

        sf.write(args.output, audio, sr)
        duration = len(audio) / sr
        print(f"✅ Saved {duration:.1f}s of audio to {args.output}")
        print("Note: This is a research release. Production Khaki TTS supports 57 languages and unlimited duration.")

    except Exception as e:
        # Top-level CLI boundary: report the failure and signal it through
        # the exit status rather than a traceback.
        print(f"❌ Synthesis failed: {e}")
        return 1

    return 0
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is used instead of the `exit` helper, which is injected by the `site`
    # module for interactive use and is not guaranteed to exist (e.g. under
    # `python -S` or in frozen executables).
    raise SystemExit(main())
|
|