import asyncio import atexit import json import logging import os import queue import sys import tempfile from datetime import datetime, timezone from pathlib import Path from threading import Lock, Semaphore, Thread from typing import Optional, Tuple import gradio as gr import numpy as np os.environ["TOKENIZERS_PARALLELISM"] = "false" os.environ["OPENBLAS_NUM_THREADS"] = "4" os.environ["OMP_NUM_THREADS"] = "4" os.environ["MKL_NUM_THREADS"] = "4" DEFAULT_MODEL_REF = "openbmb/VoxCPM2" if ( os.environ.get("NANOVLLM_MODEL", "").strip() == "" and os.environ.get("NANOVLLM_MODEL_PATH", "").strip() == "" and os.environ.get("HF_REPO_ID", "").strip() == "" ): os.environ["HF_REPO_ID"] = DEFAULT_MODEL_REF logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger(__name__) DEFAULT_ASR_MODEL_REF = "FunAudioLLM/SenseVoiceSmall" DEFAULT_ZIPENHANCER_MODEL = "iic/speech_zipenhancer_ans_multiloss_16k_base" MAX_REFERENCE_AUDIO_SECONDS = 50.0 _persistent_root = None _request_log_dir = None def _configure_cache_dirs() -> None: global _persistent_root, _request_log_dir persistent_root = Path(os.environ.get("SPACE_PERSISTENT_ROOT", "/data")).expanduser() if not persistent_root.exists(): logger.info("Persistent storage not detected. Request logs disabled.") return logs_dir = Path( os.environ.get("REQUEST_LOG_DIR", str(persistent_root / "logs")) ).expanduser() logs_dir.mkdir(parents=True, exist_ok=True) _persistent_root = persistent_root _request_log_dir = logs_dir logger.info(f"Persistent storage detected at {persistent_root}") logger.info(f"Request logs will be written to daily files under {_request_log_dir}") _configure_cache_dirs() _asr_model = None _voxcpm_server = None _model_info = None _denoiser = None _asr_lock = Lock() _server_lock = Lock() _prewarm_lock = Lock() _denoiser_lock = Lock() _denoise_semaphore = Semaphore(int(os.environ.get("DENOISE_MAX_CONCURRENT", "1"))) _prewarm_started = False _runtime_diag_logged = False _active_generation_requests = 0 _active_generation_lock = Lock() def _get_int_env(name: str, default: int) -> int: value = os.environ.get(name, "").strip() if not value: return default return int(value) def _get_float_env(name: str, default: float) -> float: value = os.environ.get(name, "").strip() if not value: return default return float(value) def _get_bool_env(name: str, default: bool) -> bool: value = os.environ.get(name, "").strip().lower() if not value: return default if value in {"1", "true", "yes", "on"}: return True if value in {"0", "false", "no", "off"}: return False raise ValueError(f"Invalid boolean env: {name}={value!r}") def _get_devices_env() -> list[int]: raw = os.environ.get("NANOVLLM_SERVERPOOL_DEVICES", "0").strip() values = [part.strip() for part in raw.split(",") if part.strip()] if not values: return [0] return [int(part) for part in values] def _resolve_model_ref() -> str: for env_name in ("NANOVLLM_MODEL", "NANOVLLM_MODEL_PATH", "HF_REPO_ID"): value = os.environ.get(env_name, "").strip() if value: return value return DEFAULT_MODEL_REF def _resolve_asr_model_ref() -> str: return DEFAULT_ASR_MODEL_REF def _resolve_zipenhancer_model_ref() -> str: for env_name in ("ZIPENHANCER_MODEL_ID", "ZIPENHANCER_MODEL_PATH"): value = os.environ.get(env_name, "").strip() if value: return value return DEFAULT_ZIPENHANCER_MODEL def _log_runtime_diagnostics_once() -> None: global _runtime_diag_logged if _runtime_diag_logged: return import torch info = { "python": sys.version.split()[0], "torch": torch.__version__, "cuda": torch.version.cuda, "cuda_available": torch.cuda.is_available(), "cuda_device_count": torch.cuda.device_count(), "cxx11abi": bool(torch._C._GLIBCXX_USE_CXX11_ABI), "model_ref": _resolve_model_ref(), "devices": _get_devices_env(), } logger.info(f"Runtime diagnostics: {info}") _runtime_diag_logged = True class _ZipEnhancer: def __init__(self, model_ref: str): import torchaudio from modelscope.pipelines import pipeline from modelscope.utils.constant import Tasks self._torchaudio = torchaudio self.model_ref = model_ref self._pipeline = pipeline(Tasks.acoustic_noise_suppression, model=model_ref) def _normalize_loudness(self, wav_path: str) -> None: audio, sr = self._torchaudio.load(wav_path) loudness = self._torchaudio.functional.loudness(audio, sr) normalized_audio = self._torchaudio.functional.gain(audio, -20 - loudness) self._torchaudio.save(wav_path, normalized_audio, sr) def enhance(self, input_path: str) -> str: with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: output_path = tmp_file.name try: self._pipeline(input_path, output_path=output_path) self._normalize_loudness(output_path) return output_path except Exception: if os.path.exists(output_path): try: os.unlink(output_path) except OSError: pass raise def get_denoiser(): global _denoiser if _denoiser is not None: return _denoiser with _denoiser_lock: if _denoiser is not None: return _denoiser model_ref = _resolve_zipenhancer_model_ref() logger.info(f"Loading ZipEnhancer denoiser from {model_ref} ...") _denoiser = _ZipEnhancer(model_ref) logger.info("ZipEnhancer denoiser loaded.") return _denoiser def _extract_asr_text(asr_result) -> str: if not asr_result: return "" first_item = asr_result[0] if isinstance(first_item, dict): return str(first_item.get("text", "")).split("|>")[-1].strip() return "" def _read_audio_bytes(audio_path: Optional[str]) -> tuple[bytes | None, str | None]: if audio_path is None or not audio_path.strip(): return None, None path = Path(audio_path) audio_format = path.suffix.lstrip(".").lower() or "wav" return path.read_bytes(), audio_format def _get_audio_duration_seconds(audio_path: str) -> float: import soundfile as sf info = sf.info(audio_path) return float(info.frames) / float(info.samplerate) def _begin_generation_request() -> None: global _active_generation_requests with _active_generation_lock: _active_generation_requests += 1 def _end_generation_request() -> None: global _active_generation_requests with _active_generation_lock: _active_generation_requests = max(0, _active_generation_requests - 1) def _get_active_generation_requests() -> int: with _active_generation_lock: return _active_generation_requests def _validate_reference_audio_duration( audio_path: str, request: Optional[gr.Request] = None ) -> None: duration_seconds = _get_audio_duration_seconds(audio_path) if duration_seconds > MAX_REFERENCE_AUDIO_SECONDS: raise gr.Error(_get_i18n_text("reference_audio_too_long_error", request)) def _prepare_audio_for_encoding( audio_path: Optional[str], *, denoise: bool, request: Optional[gr.Request] = None, ) -> tuple[bytes | None, str | None, Optional[str]]: if audio_path is None or not audio_path.strip(): return None, None, None _validate_reference_audio_duration(audio_path, request) source_path = audio_path temp_path = None if denoise: logger.info("Applying ZipEnhancer denoising to reference audio ...") acquired = _denoise_semaphore.acquire(timeout=30) if not acquired: raise gr.Error(_get_i18n_text("denoise_busy_error", request)) try: temp_path = get_denoiser().enhance(audio_path) source_path = temp_path except Exception as exc: logger.exception("ZipEnhancer denoising failed") raise gr.Error(_get_i18n_text("denoise_failed_error", request)) from exc finally: _denoise_semaphore.release() audio_bytes, audio_format = _read_audio_bytes(source_path) return audio_bytes, audio_format, temp_path def _safe_prompt_wav_recognition( use_prompt_text: bool, prompt_wav: Optional[str], request: Optional[gr.Request] = None ) -> str: try: return prompt_wav_recognition(use_prompt_text, prompt_wav) except Exception as exc: logger.warning(f"ASR recognition failed: {exc}") raise gr.Error(_get_i18n_text("asr_failed_error", request)) from exc def _stop_server_if_needed() -> None: global _voxcpm_server, _model_info if _voxcpm_server is None: return if isinstance(_voxcpm_server, _AsyncServerBridge): _voxcpm_server.stop() else: stop = getattr(_voxcpm_server, "stop", None) if callable(stop): try: stop() except Exception as exc: logger.warning(f"Failed to stop nano-vLLM server cleanly: {exc}") _voxcpm_server = None _model_info = None atexit.register(_stop_server_if_needed) # ---------- Inline i18n (en + zh-CN only) ---------- _USAGE_INSTRUCTIONS_EN = ( "**VoxCPM2 — Three Modes of Speech Generation:**\n\n" "🎨 **Voice Design** — Create a brand-new voice \n" "No reference audio required. Describe the desired voice characteristics " "(gender, age, tone, emotion, pace …) in **Control Instruction**, and VoxCPM2 " "will craft a unique voice from your description alone.\n\n" "🎛️ **Controllable Cloning** — Clone a voice with optional style guidance \n" "Upload a reference audio clip, then use **Control Instruction** to steer " "emotion, speaking pace, and overall style while preserving the original timbre.\n\n" "🎙️ **Ultimate Cloning** — Reproduce every vocal nuance through audio continuation \n" "Turn on **Ultimate Cloning Mode** and provide (or auto-transcribe) the reference audio's transcript. " "The model treats the reference clip as a spoken prefix and seamlessly **continues** from it, faithfully preserving every vocal detail." "Note: This mode will disable Control Instruction.\n\n" "### [A Voice Chef's Guide to VoxCPM2](https://voxcpm.readthedocs.io/en/latest/cookbook.html)" ) _EXAMPLES_FOOTER_EN = ( "---\n" "**💡 Voice Description Examples:** \n" "Try the following Control Instructions to explore different voices: \n\n" "**Example 1 — Gentle & Melancholic Girl** \n" '`Control Instruction`: *"A young girl with a soft, sweet voice. ' 'Speaks slowly with a melancholic, slightly tsundere tone."* \n' '`Target Text`: *"I never asked you to stay… It\'s not like I care or anything. ' 'But… why does it still hurt so much now that you\'re gone?"* \n\n' "**Example 2 — Laid-Back Surfer Dude** \n" '`Control Instruction`: *"Relaxed young male voice, slightly nasal, ' 'lazy drawl, very casual and chill."* \n' '`Target Text`: *"Dude, did you see that set? The waves out there are totally gnarly today. ' "Just catching barrels all morning — it's like, totally righteous, you know what I mean?\"*" ) _USAGE_INSTRUCTIONS_ZH = ( "**VoxCPM2 — 三种语音生成方式:**\n\n" "🎨 **声音设计(Voice Design)** \n" "无需参考音频。在 **Control Instruction** 中描述目标音色特征" "(性别、年龄、语气、情绪、语速等),VoxCPM2 即可为你从零创造独一无二的声音。\n\n" "🎛️ **可控克隆(Controllable Cloning)** \n" "上传参考音频,同时可选地使用 **Control Instruction** 来指定情绪、语速、风格等表达方式," "在保留原始音色的基础上灵活控制说话风格。\n\n" "🎙️ **极致克隆(Ultimate Cloning)** \n" "开启 **极致克隆模式** 并提供参考音频的文字内容(可自动识别)。" "模型会将参考音频视为已说出的前文,以**音频续写**的方式完整还原参考音频中的所有声音细节。" "注意:该模式与可控克隆模式互斥,将禁用Control Instruction。\n\n" "### [VoxCPM 2 最佳实践指南](https://voxcpm.readthedocs.io/en/latest/cookbook.html)\n\n" ) _EXAMPLES_FOOTER_ZH = ( "---\n" "**💡 声音描述示例(中英文均可):** \n\n" "**示例 1 — 深宫太后** \n" '`Control Instruction`: *"中老年女性,声音低沉阴冷,语速缓慢而有力,' '字字深思熟虑,带有深不可测的城府与威慑感。"* \n' '`Target Text`: *"哀家在这深宫待了四十年,什么风浪没见过?你以为瞒得过哀家?"* \n\n' "**示例 2 — 暴躁驾校教练** \n" '`Control Instruction`: *"暴躁的中年男声,语速快,充满无奈和愤怒"* \n' '`Target Text`: *"踩离合!踩刹车啊!你往哪儿开呢?前面是树你看不见吗?' '我教了你八百遍了,打死方向盘!你是不是想把车给我开到沟里去?"* \n\n' "---\n" "**🗣️ 方言生成指南:** \n" "要生成地道的方言语音,请在 **Target Text** 中直接使用方言词汇和句式," "并在 **Control Instruction** 中描述方言特征。 \n\n" "**示例 — 广东话** \n" '`Control Instruction`: *"粤语,中年男性,语气平淡"* \n' '✅ 正确(粤语表达):*"伙計,唔該一個A餐,凍奶茶少甜!"* \n' '❌ 错误(普通话原文):*"伙计,麻烦来一个A餐,冻奶茶少甜!"* \n\n' "**示例 — 河南话** \n" '`Control Instruction`: *"河南话,接地气的大叔"* \n' '✅ 正确(河南话表达):*"恁这是弄啥嘞?晌午吃啥饭?"* \n' '❌ 错误(普通话原文):*"你这是在干什么呢?中午吃什么饭?"* \n\n' "🤖 **小技巧:** 不知道方言怎么写?可以用豆包、DeepSeek、Kimi 等 AI 助手" "将普通话翻译为方言文本,再粘贴到 Target Text 中即可。 \n\n" ) _I18N_TRANSLATIONS = { "en": { "reference_audio_label": "🎤 Reference Audio (optional — upload for cloning)", "show_prompt_text_label": "🎙️ Ultimate Cloning Mode (transcript-guided cloning)", "show_prompt_text_info": "Auto-transcribes reference audio for every vocal nuance reproduced. Control Instruction will be disabled when active.", "prompt_text_label": "Transcript of Reference Audio (auto-filled via ASR, editable)", "prompt_text_placeholder": "The transcript of your reference audio will appear here …", "control_label": "🎛️ Control Instruction (optional — supports Chinese & English)", "control_placeholder": "e.g. A warm young woman / 年轻女性,温柔甜美 / Excited and fast-paced", "target_text_label": "✍️ Target Text — the content to speak", "generate_btn": "🔊 Generate Speech", "generated_audio_label": "Generated Audio", "advanced_settings_title": "⚙️ Advanced Settings", "ref_denoise_label": "Reference audio enhancement", "ref_denoise_info": "Apply ZipEnhancer denoising to the reference audio before cloning", "normalize_label": "Text normalization", "normalize_info": "Normalize numbers, dates, and abbreviations via wetext", "cfg_label": "CFG (guidance scale)", "cfg_info": "Higher → closer to the prompt / reference; lower → more creative variation", "reference_audio_too_long_error": "Reference audio is too long. Please upload audio no longer than 50 seconds.", "denoise_busy_error": "Too many reference-audio enhancement requests are running. Please try again in a moment.", "denoise_failed_error": "Reference audio enhancement failed. Please try disabling denoise or use a cleaner clip.", "backend_retry_error": "The backend is temporarily unstable. Please try again in a moment.", "asr_failed_error": "ASR failed. Please fill the transcript manually or try another reference audio.", "usage_instructions": _USAGE_INSTRUCTIONS_EN, "examples_footer": _EXAMPLES_FOOTER_EN, }, "zh-CN": { "reference_audio_label": "🎤 参考音频(可选 — 上传后用于克隆)", "show_prompt_text_label": "🎙️ 极致克隆模式(基于文本引导的极致克隆)", "show_prompt_text_info": "自动识别参考音频文本,完整还原音色、节奏、情感等全部声音细节。开启后 Control Instruction 将暂时禁用", "prompt_text_label": "参考音频内容文本(ASR 自动填充,可手动编辑)", "prompt_text_placeholder": "参考音频的文字内容将自动识别并显示在此处 …", "control_label": "🎛️ Control Instruction(可选 — 支持中英文描述)", "control_placeholder": "如:年轻女性,温柔甜美 / A warm young woman / 暴躁老哥,语速飞快", "target_text_label": "✍️ Target Text — 要合成的目标文本", "generate_btn": "🔊 开始生成", "generated_audio_label": "生成结果", "advanced_settings_title": "⚙️ 高级设置", "ref_denoise_label": "参考音频降噪增强", "ref_denoise_info": "克隆前使用 ZipEnhancer 对参考音频进行降噪处理", "normalize_label": "文本规范化", "normalize_info": "自动规范化数字、日期及缩写(基于 wetext)", "cfg_label": "CFG(引导强度)", "cfg_info": "数值越高 → 越贴合提示/参考音色;数值越低 → 生成风格更自由", "reference_audio_too_long_error": "参考音频太长了,请上传不超过 50 秒的音频。", "denoise_busy_error": "当前参考音频降噪请求过多,请稍后再试。", "denoise_failed_error": "参考音频降噪失败,请尝试关闭降噪或更换更干净的音频。", "backend_retry_error": "后端暂时不稳定,请稍后再试。", "asr_failed_error": "ASR 识别失败,请手动填写参考音频文本,或更换一段参考音频后重试。", "usage_instructions": _USAGE_INSTRUCTIONS_ZH, "examples_footer": _EXAMPLES_FOOTER_ZH, }, "zh-Hans": None, "zh": None, } _I18N_TRANSLATIONS["zh-Hans"] = _I18N_TRANSLATIONS["zh-CN"] _I18N_TRANSLATIONS["zh"] = _I18N_TRANSLATIONS["zh-CN"] for _d in _I18N_TRANSLATIONS.values(): if _d is not None: for _k, _v in _I18N_TRANSLATIONS["en"].items(): _d.setdefault(_k, _v) I18N = gr.I18n(**_I18N_TRANSLATIONS) def _resolve_ui_language(request: Optional[gr.Request] = None) -> str: if request is None: return "en" accept_language = str(request.headers.get("accept-language", "")).lower() if accept_language.startswith("zh"): return "zh-CN" return "en" def _get_i18n_text(key: str, request: Optional[gr.Request] = None) -> str: locale = _resolve_ui_language(request) return _I18N_TRANSLATIONS.get(locale, _I18N_TRANSLATIONS["en"]).get( key, _I18N_TRANSLATIONS["en"].get(key, key) ) def _append_request_log(payload: dict) -> None: if _request_log_dir is None: return now = datetime.now(timezone.utc) record = {"timestamp": now.isoformat(), **payload} log_path = _request_log_dir / f"{now.date().isoformat()}.jsonl" with log_path.open("a", encoding="utf-8") as fp: fp.write(json.dumps(record, ensure_ascii=False) + "\n") DEFAULT_TARGET_TEXT = ( "VoxCPM2 is a creative multilingual TTS model from ModelBest, " "designed to generate highly realistic speech." ) _CUSTOM_CSS = """ .logo-container { text-align: center; margin: 0.5rem 0 1rem 0; } .logo-container img { height: 80px; width: auto; max-width: 200px; display: inline-block; } /* Toggle switch style */ .switch-toggle { padding: 8px 12px; border-radius: 8px; background: var(--block-background-fill); } .switch-toggle input[type="checkbox"] { appearance: none; -webkit-appearance: none; width: 44px; height: 24px; background: #ccc; border-radius: 12px; position: relative; cursor: pointer; transition: background 0.3s ease; flex-shrink: 0; } .switch-toggle input[type="checkbox"]::after { content: ""; position: absolute; top: 2px; left: 2px; width: 20px; height: 20px; background: white; border-radius: 50%; transition: transform 0.3s ease; box-shadow: 0 1px 3px rgba(0,0,0,0.2); } .switch-toggle input[type="checkbox"]:checked { background: var(--color-accent); } .switch-toggle input[type="checkbox"]:checked::after { transform: translateX(20px); } """ _APP_THEME = gr.themes.Soft( primary_hue="blue", secondary_hue="gray", neutral_hue="slate", font=[gr.themes.GoogleFont("Inter"), "Arial", "sans-serif"], ) def get_asr_model(): global _asr_model if _asr_model is not None: return _asr_model with _asr_lock: if _asr_model is not None: return _asr_model from funasr import AutoModel from huggingface_hub import snapshot_download device = os.environ.get("ASR_DEVICE", "cpu").strip() or "cpu" asr_model_ref = _resolve_asr_model_ref() logger.info(f"Downloading ASR model from Hugging Face: {asr_model_ref}") asr_model_path = snapshot_download(repo_id=asr_model_ref) logger.info(f"Loading ASR model on {device} ...") _asr_model = AutoModel( model=asr_model_path, disable_update=True, log_level="INFO", device=device, ) logger.info("ASR model loaded.") return _asr_model class _AsyncServerBridge: """Thread-safe bridge to AsyncVoxCPM2ServerPool running in a dedicated event loop.""" def __init__(self): self._loop: Optional[asyncio.AbstractEventLoop] = None self._thread: Optional[Thread] = None self._server_pool = None self._model_info: Optional[dict] = None self._closed = False def _run_loop(self) -> None: assert self._loop is not None asyncio.set_event_loop(self._loop) self._loop.run_forever() def start(self) -> None: _log_runtime_diagnostics_once() model_ref = _resolve_model_ref() logger.info(f"Loading nano-vLLM VoxCPM async server from {model_ref} ...") self._loop = asyncio.new_event_loop() self._thread = Thread(target=self._run_loop, name="nanovllm-event-loop", daemon=True) self._thread.start() try: async def _init(): from nanovllm_voxcpm import VoxCPM pool = VoxCPM.from_pretrained( model=model_ref, max_num_batched_tokens=_get_int_env("NANOVLLM_SERVERPOOL_MAX_NUM_BATCHED_TOKENS", 8192), max_num_seqs=_get_int_env("NANOVLLM_SERVERPOOL_MAX_NUM_SEQS", 16), max_model_len=_get_int_env("NANOVLLM_SERVERPOOL_MAX_MODEL_LEN", 4096), gpu_memory_utilization=_get_float_env("NANOVLLM_SERVERPOOL_GPU_MEMORY_UTILIZATION", 0.95), enforce_eager=_get_bool_env("NANOVLLM_SERVERPOOL_ENFORCE_EAGER", False), devices=_get_devices_env(), ) await pool.wait_for_ready() return pool future = asyncio.run_coroutine_threadsafe(_init(), self._loop) self._server_pool = future.result() info_future = asyncio.run_coroutine_threadsafe( self._server_pool.get_model_info(), self._loop ) self._model_info = info_future.result() logger.info(f"nano-vLLM async server loaded: {self._model_info}") except Exception: self.stop() raise def get_model_info(self) -> dict: assert self._model_info is not None return self._model_info def encode_latents(self, wav: bytes, wav_format: str, timeout: float = 120) -> bytes: if self._closed: raise RuntimeError("nano-vLLM bridge is closed") assert self._loop is not None and self._server_pool is not None future = asyncio.run_coroutine_threadsafe( self._server_pool.encode_latents(wav, wav_format), self._loop ) try: return future.result(timeout=timeout) finally: if not future.done(): future.cancel() def generate(self, timeout: float = 300, **kwargs): if self._closed: raise RuntimeError("nano-vLLM bridge is closed") assert self._loop is not None and self._server_pool is not None result_queue: queue.Queue = queue.Queue() import time as _time async def _drain(): try: async for chunk in self._server_pool.generate(**kwargs): result_queue.put(chunk) result_queue.put(None) except Exception as exc: result_queue.put(exc) deadline = _time.monotonic() + timeout future = asyncio.run_coroutine_threadsafe(_drain(), self._loop) try: while True: remaining = deadline - _time.monotonic() if remaining <= 0: raise TimeoutError(f"Generation exceeded {timeout}s timeout") try: item = result_queue.get(timeout=min(0.5, remaining)) except queue.Empty: if future.done(): exc = future.exception() if exc is not None: raise exc continue if item is None: break if isinstance(item, Exception): raise item yield item finally: if not future.done(): future.cancel() def stop(self) -> None: if self._closed: return self._closed = True try: if self._loop is not None and self._server_pool is not None: future = asyncio.run_coroutine_threadsafe(self._server_pool.stop(), self._loop) future.result(timeout=10) except Exception as exc: logger.warning(f"Failed to stop async server pool cleanly: {exc}") finally: if self._loop is not None: self._loop.call_soon_threadsafe(self._loop.stop) if self._thread is not None: self._thread.join(timeout=5) if ( self._loop is not None and not self._loop.is_closed() and (self._thread is None or not self._thread.is_alive()) ): self._loop.close() self._server_pool = None self._model_info = None self._thread = None self._loop = None def get_voxcpm_server() -> _AsyncServerBridge: global _voxcpm_server, _model_info if _voxcpm_server is not None: return _voxcpm_server with _server_lock: if _voxcpm_server is not None: return _voxcpm_server bridge = _AsyncServerBridge() bridge.start() _voxcpm_server = bridge _model_info = bridge.get_model_info() return _voxcpm_server def get_model_info() -> dict: global _model_info if _model_info is None: get_voxcpm_server() assert _model_info is not None return _model_info def _prewarm_backend() -> None: try: logger.info("Starting backend prewarm ...") get_voxcpm_server() logger.info("Backend prewarm completed.") except Exception as exc: logger.warning(f"Backend prewarm failed: {exc}") def _start_background_prewarm() -> None: global _prewarm_started if not _get_bool_env("NANOVLLM_PREWARM", True): return with _prewarm_lock: if _prewarm_started: return _prewarm_started = True Thread(target=_prewarm_backend, name="nanovllm-prewarm", daemon=True).start() # ---------- GPU-accelerated inference ---------- def prompt_wav_recognition(use_prompt_text: bool, prompt_wav: Optional[str]) -> str: if not use_prompt_text or prompt_wav is None or not prompt_wav.strip(): return "" asr_model = get_asr_model() res = asr_model.generate(input=prompt_wav, language="auto", use_itn=True) return _extract_asr_text(res) def _float_audio_to_int16(wav: np.ndarray) -> np.ndarray: clipped = np.clip(wav, -1.0, 1.0) return (clipped * 32767.0).astype(np.int16, copy=False) def _generate_tts_audio_once( text_input: str, control_instruction: str = "", reference_wav_path_input: Optional[str] = None, use_prompt_text: bool = False, prompt_text_input: str = "", cfg_value_input: float = 2.0, do_normalize: bool = True, denoise: bool = True, request: Optional[gr.Request] = None, ) -> Tuple[int, np.ndarray]: temp_audio_path = None try: server = get_voxcpm_server() model_info = get_model_info() text = (text_input or "").strip() if len(text) == 0: raise ValueError("Please input text to synthesize.") control = (control_instruction or "").strip() final_text = f"({control}){text}" if control and not use_prompt_text else text audio_bytes, audio_format, temp_audio_path = _prepare_audio_for_encoding( reference_wav_path_input, denoise=bool(denoise), request=request, ) prompt_text_clean = (prompt_text_input or "").strip() if use_prompt_text and audio_bytes is None: raise ValueError("Ultimate Cloning Mode requires a reference audio clip.") if use_prompt_text and not prompt_text_clean: raise ValueError( "Ultimate Cloning Mode requires a transcript. Please wait for ASR or fill it in manually." ) if not use_prompt_text: prompt_text_clean = "" if do_normalize: logger.info( "Ignoring normalize option: nano-vLLM backend does not support per-request text normalization." ) prompt_latents = None ref_audio_latents = None if audio_bytes is not None and audio_format is not None and use_prompt_text: logger.info(f"[Ultimate Cloning] encoding prompt audio as {audio_format}") prompt_latents = server.encode_latents(audio_bytes, audio_format) elif audio_bytes is not None and audio_format is not None: logger.info(f"[Controllable Cloning] encoding reference audio as {audio_format}") ref_audio_latents = server.encode_latents(audio_bytes, audio_format) if prompt_latents is not None: logger.info("[Ultimate Cloning] reference audio + transcript") elif ref_audio_latents is not None: logger.info("[Controllable Cloning] reference audio only") else: logger.info(f"[Voice Design] control: {control[:50] if control else 'None'}") chunks: list[np.ndarray] = [] logger.info(f"Generating: '{final_text[:80]}...'") for chunk in server.generate( target_text=final_text, prompt_latents=prompt_latents, prompt_text=prompt_text_clean if prompt_latents is not None else "", max_generate_length=_get_int_env("NANOVLLM_MAX_GENERATE_LENGTH", 2000), temperature=_get_float_env("NANOVLLM_TEMPERATURE", 1.0), cfg_value=float(cfg_value_input), ref_audio_latents=ref_audio_latents, ): chunks.append(chunk) if not chunks: raise RuntimeError("The model returned no audio chunks.") wav = np.concatenate(chunks, axis=0).astype(np.float32, copy=False) wav = _float_audio_to_int16(wav) return (int(model_info["sample_rate"]), wav) finally: if temp_audio_path and os.path.exists(temp_audio_path): try: os.unlink(temp_audio_path) except OSError: pass def generate_tts_audio( text_input: str, control_instruction: str = "", reference_wav_path_input: Optional[str] = None, use_prompt_text: bool = False, prompt_text_input: str = "", cfg_value_input: float = 2.0, do_normalize: bool = True, denoise: bool = True, request: Optional[gr.Request] = None, ) -> Tuple[int, np.ndarray]: _begin_generation_request() request_payload = { "event": "tts_request", "ui_language": _resolve_ui_language(request), "text": (text_input or "").strip(), "control_instruction": (control_instruction or "").strip(), "use_prompt_text": bool(use_prompt_text), "prompt_text": (prompt_text_input or "").strip(), "cfg_value": float(cfg_value_input), "do_normalize": bool(do_normalize), "denoise": bool(denoise), "has_reference_audio": bool(reference_wav_path_input and reference_wav_path_input.strip()), } if request_payload["has_reference_audio"]: try: request_payload["reference_audio_duration_seconds"] = round( _get_audio_duration_seconds(reference_wav_path_input), 3 ) except Exception as exc: request_payload["reference_audio_duration_error"] = str(exc) try: try: result = _generate_tts_audio_once( text_input=text_input, control_instruction=control_instruction, reference_wav_path_input=reference_wav_path_input, use_prompt_text=use_prompt_text, prompt_text_input=prompt_text_input, cfg_value_input=cfg_value_input, do_normalize=do_normalize, denoise=denoise, request=request, ) try: _append_request_log({**request_payload, "status": "success"}) except Exception as exc: logger.warning(f"Failed to append request log: {exc}") return result except (ValueError, gr.Error) as exc: try: _append_request_log( {**request_payload, "status": "rejected", "error": str(exc)} ) except Exception as log_exc: logger.warning(f"Failed to append request log: {log_exc}") if isinstance(exc, gr.Error): raise raise gr.Error(str(exc)) from exc except Exception as exc: logger.exception("Generation failed") try: _append_request_log({**request_payload, "status": "error", "error": str(exc)}) except Exception as log_exc: logger.warning(f"Failed to append request log: {log_exc}") active_requests = _get_active_generation_requests() if active_requests > 1: logger.warning( "Generation failed with %s active requests; skipping shared backend restart: %s", active_requests, exc, ) raise gr.Error(_get_i18n_text("backend_retry_error", request)) from exc logger.warning(f"Generation failed, restarting backend and retrying once: {exc}") with _server_lock: _stop_server_if_needed() try: result = _generate_tts_audio_once( text_input=text_input, control_instruction=control_instruction, reference_wav_path_input=reference_wav_path_input, use_prompt_text=use_prompt_text, prompt_text_input=prompt_text_input, cfg_value_input=cfg_value_input, do_normalize=do_normalize, denoise=denoise, request=request, ) try: _append_request_log({**request_payload, "status": "success_after_retry"}) except Exception as log_exc: logger.warning(f"Failed to append request log: {log_exc}") return result except Exception as retry_exc: logger.exception("Retry failed") try: _append_request_log( {**request_payload, "status": "retry_failed", "error": str(retry_exc)} ) except Exception as log_exc: logger.warning(f"Failed to append request log: {log_exc}") raise gr.Error(_get_i18n_text("backend_retry_error", request)) from retry_exc finally: _end_generation_request() # ---------- UI ---------- def create_demo_interface(): assets_dir = Path.cwd().absolute() / "assets" if assets_dir.exists(): gr.set_static_paths(paths=[assets_dir]) def _on_toggle_instant(checked): if checked: return ( gr.update(visible=True, value="", placeholder="Recognizing reference audio..."), gr.update(visible=False), ) return ( gr.update(visible=False), gr.update(visible=True, interactive=True), ) def _run_asr_if_needed(checked, audio_path, request: gr.Request = None): if not checked or not audio_path: return gr.update() logger.info("Running ASR on reference audio...") asr_text = _safe_prompt_wav_recognition(True, audio_path, request=request) logger.info(f"ASR result: {asr_text[:60]}...") return gr.update( value=asr_text, placeholder=_get_i18n_text("prompt_text_placeholder", request), ) with gr.Blocks() as interface: if (assets_dir / "voxcpm_logo.png").exists(): gr.HTML( '
' 'VoxCPM Logo' "
" ) gr.Markdown(I18N("usage_instructions")) with gr.Row(): with gr.Column(): reference_wav = gr.Audio( sources=["upload", "microphone"], type="filepath", label=I18N("reference_audio_label"), ) show_prompt_text = gr.Checkbox( value=False, label=I18N("show_prompt_text_label"), info=I18N("show_prompt_text_info"), elem_classes=["switch-toggle"], ) prompt_text = gr.Textbox( value="", label=I18N("prompt_text_label"), placeholder=I18N("prompt_text_placeholder"), lines=2, visible=False, ) control_instruction = gr.Textbox( value="", label=I18N("control_label"), placeholder=I18N("control_placeholder"), lines=2, ) text = gr.Textbox( value=DEFAULT_TARGET_TEXT, label=I18N("target_text_label"), lines=3, ) with gr.Accordion(I18N("advanced_settings_title"), open=False): DoDenoisePromptAudio = gr.Checkbox( value=False, label=I18N("ref_denoise_label"), elem_classes=["switch-toggle"], info=I18N("ref_denoise_info"), ) DoNormalizeText = gr.Checkbox( value=False, label=I18N("normalize_label"), elem_classes=["switch-toggle"], info=I18N("normalize_info"), ) cfg_value = gr.Slider( minimum=1.0, maximum=3.0, value=2.0, step=0.1, label=I18N("cfg_label"), info=I18N("cfg_info"), ) run_btn = gr.Button(I18N("generate_btn"), variant="primary", size="lg") with gr.Column(): audio_output = gr.Audio(label=I18N("generated_audio_label")) gr.Markdown(I18N("examples_footer")) show_prompt_text.change( fn=_on_toggle_instant, inputs=[show_prompt_text], outputs=[prompt_text, control_instruction], ).then( fn=_run_asr_if_needed, inputs=[show_prompt_text, reference_wav], outputs=[prompt_text], ) run_btn.click( fn=generate_tts_audio, inputs=[ text, control_instruction, reference_wav, show_prompt_text, prompt_text, cfg_value, DoNormalizeText, DoDenoisePromptAudio, ], outputs=[audio_output], show_progress=True, api_name="generate", ) return interface def run_demo( server_name: str = "0.0.0.0", server_port: int = 7860, show_error: bool = True ): interface = create_demo_interface() _start_background_prewarm() interface.queue( max_size=_get_int_env("GRADIO_QUEUE_MAX_SIZE", 10), default_concurrency_limit=_get_int_env("GRADIO_DEFAULT_CONCURRENCY_LIMIT", 4), ).launch( server_name=server_name, server_port=int(os.environ.get("PORT", server_port)), show_error=show_error, i18n=I18N, theme=_APP_THEME, css=_CUSTOM_CSS, ssr_mode=_get_bool_env("GRADIO_SSR_MODE", False), ) if __name__ == "__main__": run_demo()