import asyncio
import atexit
import json
import logging
import os
import queue
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock, Semaphore, Thread
from typing import Optional, Tuple
# Thread-count env vars are read when the BLAS backends load, so they must be
# set before importing gradio/numpy for the caps to take effect.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["OPENBLAS_NUM_THREADS"] = "4"
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["MKL_NUM_THREADS"] = "4"
import gradio as gr
import numpy as np
DEFAULT_MODEL_REF = "openbmb/VoxCPM2"
if (
os.environ.get("NANOVLLM_MODEL", "").strip() == ""
and os.environ.get("NANOVLLM_MODEL_PATH", "").strip() == ""
and os.environ.get("HF_REPO_ID", "").strip() == ""
):
os.environ["HF_REPO_ID"] = DEFAULT_MODEL_REF
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
DEFAULT_ASR_MODEL_REF = "FunAudioLLM/SenseVoiceSmall"
DEFAULT_ZIPENHANCER_MODEL = "iic/speech_zipenhancer_ans_multiloss_16k_base"
MAX_REFERENCE_AUDIO_SECONDS = 50.0
_persistent_root = None
_request_log_dir = None
def _configure_persistent_dirs() -> None:
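    """Enable JSONL request logging when persistent storage (default /data) is mounted."""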
global _persistent_root, _request_log_dir
persistent_root = Path(os.environ.get("SPACE_PERSISTENT_ROOT", "/data")).expanduser()
if not persistent_root.exists():
logger.info("Persistent storage not detected. Request logs disabled.")
return
logs_dir = Path(
os.environ.get("REQUEST_LOG_DIR", str(persistent_root / "logs"))
).expanduser()
logs_dir.mkdir(parents=True, exist_ok=True)
_persistent_root = persistent_root
_request_log_dir = logs_dir
logger.info(f"Persistent storage detected at {persistent_root}")
logger.info(f"Request logs will be written to daily files under {_request_log_dir}")
_configure_persistent_dirs()
_asr_model = None
_voxcpm_server = None
_model_info = None
_denoiser = None
_asr_lock = Lock()
_server_lock = Lock()
_prewarm_lock = Lock()
_denoiser_lock = Lock()
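# Bound concurrent ZipEnhancer runs; callers wait up to 30 s for a slot before
# the request is rejected (see _prepare_audio_for_encoding).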
_denoise_semaphore = Semaphore(int(os.environ.get("DENOISE_MAX_CONCURRENT", "1")))
_prewarm_started = False
_runtime_diag_logged = False
_active_generation_requests = 0
_active_generation_lock = Lock()
def _get_int_env(name: str, default: int) -> int:
value = os.environ.get(name, "").strip()
if not value:
return default
return int(value)
def _get_float_env(name: str, default: float) -> float:
value = os.environ.get(name, "").strip()
if not value:
return default
return float(value)
def _get_bool_env(name: str, default: bool) -> bool:
value = os.environ.get(name, "").strip().lower()
if not value:
return default
if value in {"1", "true", "yes", "on"}:
return True
if value in {"0", "false", "no", "off"}:
return False
raise ValueError(f"Invalid boolean env: {name}={value!r}")
def _get_devices_env() -> list[int]:
raw = os.environ.get("NANOVLLM_SERVERPOOL_DEVICES", "0").strip()
values = [part.strip() for part in raw.split(",") if part.strip()]
if not values:
return [0]
return [int(part) for part in values]
def _resolve_model_ref() -> str:
for env_name in ("NANOVLLM_MODEL", "NANOVLLM_MODEL_PATH", "HF_REPO_ID"):
value = os.environ.get(env_name, "").strip()
if value:
return value
return DEFAULT_MODEL_REF
def _resolve_asr_model_ref() -> str:
return DEFAULT_ASR_MODEL_REF
def _resolve_zipenhancer_model_ref() -> str:
for env_name in ("ZIPENHANCER_MODEL_ID", "ZIPENHANCER_MODEL_PATH"):
value = os.environ.get(env_name, "").strip()
if value:
return value
return DEFAULT_ZIPENHANCER_MODEL
def _log_runtime_diagnostics_once() -> None:
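    """Log Python/torch/CUDA versions and model/device config once per process."""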
global _runtime_diag_logged
if _runtime_diag_logged:
return
import torch
info = {
"python": sys.version.split()[0],
"torch": torch.__version__,
"cuda": torch.version.cuda,
"cuda_available": torch.cuda.is_available(),
"cuda_device_count": torch.cuda.device_count(),
"cxx11abi": bool(torch._C._GLIBCXX_USE_CXX11_ABI),
"model_ref": _resolve_model_ref(),
"devices": _get_devices_env(),
}
logger.info(f"Runtime diagnostics: {info}")
_runtime_diag_logged = True
class _ZipEnhancer:
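    """Wraps the ModelScope ZipEnhancer noise-suppression pipeline.

    enhance() writes a denoised, loudness-normalized copy of the input to a new
    temp file and returns its path; the caller is responsible for deleting it.
    """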
def __init__(self, model_ref: str):
import torchaudio
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
self._torchaudio = torchaudio
self.model_ref = model_ref
self._pipeline = pipeline(Tasks.acoustic_noise_suppression, model=model_ref)
def _normalize_loudness(self, wav_path: str) -> None:
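        """Normalize the file in place to roughly -20 LUFS."""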
audio, sr = self._torchaudio.load(wav_path)
loudness = self._torchaudio.functional.loudness(audio, sr)
normalized_audio = self._torchaudio.functional.gain(audio, -20 - loudness)
self._torchaudio.save(wav_path, normalized_audio, sr)
def enhance(self, input_path: str) -> str:
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
output_path = tmp_file.name
try:
self._pipeline(input_path, output_path=output_path)
self._normalize_loudness(output_path)
return output_path
except Exception:
if os.path.exists(output_path):
try:
os.unlink(output_path)
except OSError:
pass
raise
def get_denoiser():
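    """Return the process-wide ZipEnhancer, creating it once under a double-checked lock."""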
global _denoiser
if _denoiser is not None:
return _denoiser
with _denoiser_lock:
if _denoiser is not None:
return _denoiser
model_ref = _resolve_zipenhancer_model_ref()
logger.info(f"Loading ZipEnhancer denoiser from {model_ref} ...")
_denoiser = _ZipEnhancer(model_ref)
logger.info("ZipEnhancer denoiser loaded.")
return _denoiser
def _extract_asr_text(asr_result) -> str:
if not asr_result:
return ""
first_item = asr_result[0]
if isinstance(first_item, dict):
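        # SenseVoice prefixes its output with tags such as <|zh|><|NEUTRAL|>...;
        # keep only the text after the final "|>".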
return str(first_item.get("text", "")).split("|>")[-1].strip()
return ""
def _read_audio_bytes(audio_path: Optional[str]) -> tuple[bytes | None, str | None]:
if audio_path is None or not audio_path.strip():
return None, None
path = Path(audio_path)
audio_format = path.suffix.lstrip(".").lower() or "wav"
return path.read_bytes(), audio_format
def _get_audio_duration_seconds(audio_path: str) -> float:
import soundfile as sf
info = sf.info(audio_path)
return float(info.frames) / float(info.samplerate)
def _begin_generation_request() -> None:
global _active_generation_requests
with _active_generation_lock:
_active_generation_requests += 1
def _end_generation_request() -> None:
global _active_generation_requests
with _active_generation_lock:
_active_generation_requests = max(0, _active_generation_requests - 1)
def _get_active_generation_requests() -> int:
with _active_generation_lock:
return _active_generation_requests
def _validate_reference_audio_duration(
audio_path: str, request: Optional[gr.Request] = None
) -> None:
duration_seconds = _get_audio_duration_seconds(audio_path)
if duration_seconds > MAX_REFERENCE_AUDIO_SECONDS:
raise gr.Error(_get_i18n_text("reference_audio_too_long_error", request))
def _prepare_audio_for_encoding(
audio_path: Optional[str],
*,
denoise: bool,
request: Optional[gr.Request] = None,
) -> tuple[bytes | None, str | None, Optional[str]]:
if audio_path is None or not audio_path.strip():
return None, None, None
_validate_reference_audio_duration(audio_path, request)
source_path = audio_path
temp_path = None
if denoise:
logger.info("Applying ZipEnhancer denoising to reference audio ...")
acquired = _denoise_semaphore.acquire(timeout=30)
if not acquired:
raise gr.Error(_get_i18n_text("denoise_busy_error", request))
try:
temp_path = get_denoiser().enhance(audio_path)
source_path = temp_path
except Exception as exc:
logger.exception("ZipEnhancer denoising failed")
raise gr.Error(_get_i18n_text("denoise_failed_error", request)) from exc
finally:
_denoise_semaphore.release()
audio_bytes, audio_format = _read_audio_bytes(source_path)
return audio_bytes, audio_format, temp_path
def _safe_prompt_wav_recognition(
use_prompt_text: bool, prompt_wav: Optional[str], request: Optional[gr.Request] = None
) -> str:
try:
return prompt_wav_recognition(use_prompt_text, prompt_wav)
except Exception as exc:
logger.warning(f"ASR recognition failed: {exc}")
raise gr.Error(_get_i18n_text("asr_failed_error", request)) from exc
def _stop_server_if_needed() -> None:
global _voxcpm_server, _model_info
if _voxcpm_server is None:
return
if isinstance(_voxcpm_server, _AsyncServerBridge):
_voxcpm_server.stop()
else:
stop = getattr(_voxcpm_server, "stop", None)
if callable(stop):
try:
stop()
except Exception as exc:
logger.warning(f"Failed to stop nano-vLLM server cleanly: {exc}")
_voxcpm_server = None
_model_info = None
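# Ensure the nano-vLLM server (and its event-loop thread) is torn down at process exit.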
atexit.register(_stop_server_if_needed)
# ---------- Inline i18n (en + zh-CN only) ----------
_USAGE_INSTRUCTIONS_EN = (
"**VoxCPM2 — Three Modes of Speech Generation:**\n\n"
"🎨 **Voice Design** — Create a brand-new voice \n"
"No reference audio required. Describe the desired voice characteristics "
"(gender, age, tone, emotion, pace …) in **Control Instruction**, and VoxCPM2 "
"will craft a unique voice from your description alone.\n\n"
"🎛️ **Controllable Cloning** — Clone a voice with optional style guidance \n"
"Upload a reference audio clip, then use **Control Instruction** to steer "
"emotion, speaking pace, and overall style while preserving the original timbre.\n\n"
"🎙️ **Ultimate Cloning** — Reproduce every vocal nuance through audio continuation \n"
"Turn on **Ultimate Cloning Mode** and provide (or auto-transcribe) the reference audio's transcript. "
"The model treats the reference clip as a spoken prefix and seamlessly **continues** from it, faithfully preserving every vocal detail."
"Note: This mode will disable Control Instruction.\n\n"
"### [A Voice Chef's Guide to VoxCPM2](https://voxcpm.readthedocs.io/en/latest/cookbook.html)"
)
_EXAMPLES_FOOTER_EN = (
"---\n"
"**💡 Voice Description Examples:** \n"
"Try the following Control Instructions to explore different voices: \n\n"
"**Example 1 — Gentle & Melancholic Girl** \n"
'`Control Instruction`: *"A young girl with a soft, sweet voice. '
'Speaks slowly with a melancholic, slightly tsundere tone."* \n'
'`Target Text`: *"I never asked you to stay… It\'s not like I care or anything. '
'But… why does it still hurt so much now that you\'re gone?"* \n\n'
"**Example 2 — Laid-Back Surfer Dude** \n"
'`Control Instruction`: *"Relaxed young male voice, slightly nasal, '
'lazy drawl, very casual and chill."* \n'
'`Target Text`: *"Dude, did you see that set? The waves out there are totally gnarly today. '
"Just catching barrels all morning — it's like, totally righteous, you know what I mean?\"*"
)
_USAGE_INSTRUCTIONS_ZH = (
"**VoxCPM2 — 三种语音生成方式:**\n\n"
"🎨 **声音设计(Voice Design)** \n"
"无需参考音频。在 **Control Instruction** 中描述目标音色特征"
"(性别、年龄、语气、情绪、语速等),VoxCPM2 即可为你从零创造独一无二的声音。\n\n"
"🎛️ **可控克隆(Controllable Cloning)** \n"
"上传参考音频,同时可选地使用 **Control Instruction** 来指定情绪、语速、风格等表达方式,"
"在保留原始音色的基础上灵活控制说话风格。\n\n"
"🎙️ **极致克隆(Ultimate Cloning)** \n"
"开启 **极致克隆模式** 并提供参考音频的文字内容(可自动识别)。"
"模型会将参考音频视为已说出的前文,以**音频续写**的方式完整还原参考音频中的所有声音细节。"
"注意:该模式与可控克隆模式互斥,将禁用Control Instruction。\n\n"
"### [VoxCPM 2 最佳实践指南](https://voxcpm.readthedocs.io/en/latest/cookbook.html)\n\n"
)
_EXAMPLES_FOOTER_ZH = (
"---\n"
"**💡 声音描述示例(中英文均可):** \n\n"
"**示例 1 — 深宫太后** \n"
'`Control Instruction`: *"中老年女性,声音低沉阴冷,语速缓慢而有力,'
'字字深思熟虑,带有深不可测的城府与威慑感。"* \n'
'`Target Text`: *"哀家在这深宫待了四十年,什么风浪没见过?你以为瞒得过哀家?"* \n\n'
"**示例 2 — 暴躁驾校教练** \n"
'`Control Instruction`: *"暴躁的中年男声,语速快,充满无奈和愤怒"* \n'
'`Target Text`: *"踩离合!踩刹车啊!你往哪儿开呢?前面是树你看不见吗?'
'我教了你八百遍了,打死方向盘!你是不是想把车给我开到沟里去?"* \n\n'
"---\n"
"**🗣️ 方言生成指南:** \n"
"要生成地道的方言语音,请在 **Target Text** 中直接使用方言词汇和句式,"
"并在 **Control Instruction** 中描述方言特征。 \n\n"
"**示例 — 广东话** \n"
'`Control Instruction`: *"粤语,中年男性,语气平淡"* \n'
'✅ 正确(粤语表达):*"伙計,唔該一個A餐,凍奶茶少甜!"* \n'
'❌ 错误(普通话原文):*"伙计,麻烦来一个A餐,冻奶茶少甜!"* \n\n'
"**示例 — 河南话** \n"
'`Control Instruction`: *"河南话,接地气的大叔"* \n'
'✅ 正确(河南话表达):*"恁这是弄啥嘞?晌午吃啥饭?"* \n'
'❌ 错误(普通话原文):*"你这是在干什么呢?中午吃什么饭?"* \n\n'
"🤖 **小技巧:** 不知道方言怎么写?可以用豆包、DeepSeek、Kimi 等 AI 助手"
"将普通话翻译为方言文本,再粘贴到 Target Text 中即可。 \n\n"
)
_I18N_TRANSLATIONS = {
"en": {
"reference_audio_label": "🎤 Reference Audio (optional — upload for cloning)",
"show_prompt_text_label": "🎙️ Ultimate Cloning Mode (transcript-guided cloning)",
"show_prompt_text_info": "Auto-transcribes reference audio for every vocal nuance reproduced. Control Instruction will be disabled when active.",
"prompt_text_label": "Transcript of Reference Audio (auto-filled via ASR, editable)",
"prompt_text_placeholder": "The transcript of your reference audio will appear here …",
"control_label": "🎛️ Control Instruction (optional — supports Chinese & English)",
"control_placeholder": "e.g. A warm young woman / 年轻女性,温柔甜美 / Excited and fast-paced",
"target_text_label": "✍️ Target Text — the content to speak",
"generate_btn": "🔊 Generate Speech",
"generated_audio_label": "Generated Audio",
"advanced_settings_title": "⚙️ Advanced Settings",
"ref_denoise_label": "Reference audio enhancement",
"ref_denoise_info": "Apply ZipEnhancer denoising to the reference audio before cloning",
"normalize_label": "Text normalization",
"normalize_info": "Normalize numbers, dates, and abbreviations via wetext",
"cfg_label": "CFG (guidance scale)",
"cfg_info": "Higher → closer to the prompt / reference; lower → more creative variation",
"reference_audio_too_long_error": "Reference audio is too long. Please upload audio no longer than 50 seconds.",
"denoise_busy_error": "Too many reference-audio enhancement requests are running. Please try again in a moment.",
"denoise_failed_error": "Reference audio enhancement failed. Please try disabling denoise or use a cleaner clip.",
"backend_retry_error": "The backend is temporarily unstable. Please try again in a moment.",
"asr_failed_error": "ASR failed. Please fill the transcript manually or try another reference audio.",
"usage_instructions": _USAGE_INSTRUCTIONS_EN,
"examples_footer": _EXAMPLES_FOOTER_EN,
},
"zh-CN": {
"reference_audio_label": "🎤 参考音频(可选 — 上传后用于克隆)",
"show_prompt_text_label": "🎙️ 极致克隆模式(基于文本引导的极致克隆)",
"show_prompt_text_info": "自动识别参考音频文本,完整还原音色、节奏、情感等全部声音细节。开启后 Control Instruction 将暂时禁用",
"prompt_text_label": "参考音频内容文本(ASR 自动填充,可手动编辑)",
"prompt_text_placeholder": "参考音频的文字内容将自动识别并显示在此处 …",
"control_label": "🎛️ Control Instruction(可选 — 支持中英文描述)",
"control_placeholder": "如:年轻女性,温柔甜美 / A warm young woman / 暴躁老哥,语速飞快",
"target_text_label": "✍️ Target Text — 要合成的目标文本",
"generate_btn": "🔊 开始生成",
"generated_audio_label": "生成结果",
"advanced_settings_title": "⚙️ 高级设置",
"ref_denoise_label": "参考音频降噪增强",
"ref_denoise_info": "克隆前使用 ZipEnhancer 对参考音频进行降噪处理",
"normalize_label": "文本规范化",
"normalize_info": "自动规范化数字、日期及缩写(基于 wetext)",
"cfg_label": "CFG(引导强度)",
"cfg_info": "数值越高 → 越贴合提示/参考音色;数值越低 → 生成风格更自由",
"reference_audio_too_long_error": "参考音频太长了,请上传不超过 50 秒的音频。",
"denoise_busy_error": "当前参考音频降噪请求过多,请稍后再试。",
"denoise_failed_error": "参考音频降噪失败,请尝试关闭降噪或更换更干净的音频。",
"backend_retry_error": "后端暂时不稳定,请稍后再试。",
"asr_failed_error": "ASR 识别失败,请手动填写参考音频文本,或更换一段参考音频后重试。",
"usage_instructions": _USAGE_INSTRUCTIONS_ZH,
"examples_footer": _EXAMPLES_FOOTER_ZH,
},
"zh-Hans": None,
"zh": None,
}
_I18N_TRANSLATIONS["zh-Hans"] = _I18N_TRANSLATIONS["zh-CN"]
_I18N_TRANSLATIONS["zh"] = _I18N_TRANSLATIONS["zh-CN"]
for _d in _I18N_TRANSLATIONS.values():
if _d is not None:
for _k, _v in _I18N_TRANSLATIONS["en"].items():
_d.setdefault(_k, _v)
I18N = gr.I18n(**_I18N_TRANSLATIONS)
def _resolve_ui_language(request: Optional[gr.Request] = None) -> str:
if request is None:
return "en"
accept_language = str(request.headers.get("accept-language", "")).lower()
if accept_language.startswith("zh"):
return "zh-CN"
return "en"
def _get_i18n_text(key: str, request: Optional[gr.Request] = None) -> str:
locale = _resolve_ui_language(request)
return _I18N_TRANSLATIONS.get(locale, _I18N_TRANSLATIONS["en"]).get(
key, _I18N_TRANSLATIONS["en"].get(key, key)
)
def _append_request_log(payload: dict) -> None:
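    """Append one JSON line to today's UTC log file; no-op without persistent storage."""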
if _request_log_dir is None:
return
now = datetime.now(timezone.utc)
record = {"timestamp": now.isoformat(), **payload}
log_path = _request_log_dir / f"{now.date().isoformat()}.jsonl"
with log_path.open("a", encoding="utf-8") as fp:
fp.write(json.dumps(record, ensure_ascii=False) + "\n")
DEFAULT_TARGET_TEXT = (
"VoxCPM2 is a creative multilingual TTS model from ModelBest, "
"designed to generate highly realistic speech."
)
_CUSTOM_CSS = """
.logo-container {
text-align: center;
margin: 0.5rem 0 1rem 0;
}
.logo-container img {
height: 80px;
width: auto;
max-width: 200px;
display: inline-block;
}
/* Toggle switch style */
.switch-toggle {
padding: 8px 12px;
border-radius: 8px;
background: var(--block-background-fill);
}
.switch-toggle input[type="checkbox"] {
appearance: none;
-webkit-appearance: none;
width: 44px;
height: 24px;
background: #ccc;
border-radius: 12px;
position: relative;
cursor: pointer;
transition: background 0.3s ease;
flex-shrink: 0;
}
.switch-toggle input[type="checkbox"]::after {
content: "";
position: absolute;
top: 2px;
left: 2px;
width: 20px;
height: 20px;
background: white;
border-radius: 50%;
transition: transform 0.3s ease;
box-shadow: 0 1px 3px rgba(0,0,0,0.2);
}
.switch-toggle input[type="checkbox"]:checked {
background: var(--color-accent);
}
.switch-toggle input[type="checkbox"]:checked::after {
transform: translateX(20px);
}
"""
_APP_THEME = gr.themes.Soft(
primary_hue="blue",
secondary_hue="gray",
neutral_hue="slate",
font=[gr.themes.GoogleFont("Inter"), "Arial", "sans-serif"],
)
def get_asr_model():
global _asr_model
if _asr_model is not None:
return _asr_model
with _asr_lock:
if _asr_model is not None:
return _asr_model
from funasr import AutoModel
from huggingface_hub import snapshot_download
device = os.environ.get("ASR_DEVICE", "cpu").strip() or "cpu"
asr_model_ref = _resolve_asr_model_ref()
logger.info(f"Downloading ASR model from Hugging Face: {asr_model_ref}")
asr_model_path = snapshot_download(repo_id=asr_model_ref)
logger.info(f"Loading ASR model on {device} ...")
_asr_model = AutoModel(
model=asr_model_path,
disable_update=True,
log_level="INFO",
device=device,
)
logger.info("ASR model loaded.")
return _asr_model
class _AsyncServerBridge:
"""Thread-safe bridge to AsyncVoxCPM2ServerPool running in a dedicated event loop."""
def __init__(self):
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[Thread] = None
self._server_pool = None
self._model_info: Optional[dict] = None
self._closed = False
def _run_loop(self) -> None:
assert self._loop is not None
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
def start(self) -> None:
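        """Start a dedicated event-loop thread, build the server pool on it, and cache model info."""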
_log_runtime_diagnostics_once()
model_ref = _resolve_model_ref()
logger.info(f"Loading nano-vLLM VoxCPM async server from {model_ref} ...")
self._loop = asyncio.new_event_loop()
self._thread = Thread(target=self._run_loop, name="nanovllm-event-loop", daemon=True)
self._thread.start()
try:
async def _init():
from nanovllm_voxcpm import VoxCPM
pool = VoxCPM.from_pretrained(
model=model_ref,
max_num_batched_tokens=_get_int_env("NANOVLLM_SERVERPOOL_MAX_NUM_BATCHED_TOKENS", 8192),
max_num_seqs=_get_int_env("NANOVLLM_SERVERPOOL_MAX_NUM_SEQS", 16),
max_model_len=_get_int_env("NANOVLLM_SERVERPOOL_MAX_MODEL_LEN", 4096),
gpu_memory_utilization=_get_float_env("NANOVLLM_SERVERPOOL_GPU_MEMORY_UTILIZATION", 0.95),
enforce_eager=_get_bool_env("NANOVLLM_SERVERPOOL_ENFORCE_EAGER", False),
devices=_get_devices_env(),
)
await pool.wait_for_ready()
return pool
future = asyncio.run_coroutine_threadsafe(_init(), self._loop)
self._server_pool = future.result()
info_future = asyncio.run_coroutine_threadsafe(
self._server_pool.get_model_info(), self._loop
)
self._model_info = info_future.result()
logger.info(f"nano-vLLM async server loaded: {self._model_info}")
except Exception:
self.stop()
raise
def get_model_info(self) -> dict:
assert self._model_info is not None
return self._model_info
def encode_latents(self, wav: bytes, wav_format: str, timeout: float = 120) -> bytes:
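        """Encode reference audio to latents on the loop thread, blocking up to `timeout` seconds."""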
if self._closed:
raise RuntimeError("nano-vLLM bridge is closed")
assert self._loop is not None and self._server_pool is not None
future = asyncio.run_coroutine_threadsafe(
self._server_pool.encode_latents(wav, wav_format), self._loop
)
try:
return future.result(timeout=timeout)
finally:
if not future.done():
future.cancel()
def generate(self, timeout: float = 300, **kwargs):
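        """Yield audio chunks synchronously by draining the async generator through a queue.

        A single wall-clock deadline covers the whole stream; on timeout or early
        caller exit the coroutine is cancelled so the loop thread is not left
        draining forever.
        """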
if self._closed:
raise RuntimeError("nano-vLLM bridge is closed")
assert self._loop is not None and self._server_pool is not None
result_queue: queue.Queue = queue.Queue()
import time as _time
async def _drain():
try:
async for chunk in self._server_pool.generate(**kwargs):
result_queue.put(chunk)
result_queue.put(None)
except Exception as exc:
result_queue.put(exc)
deadline = _time.monotonic() + timeout
future = asyncio.run_coroutine_threadsafe(_drain(), self._loop)
try:
while True:
remaining = deadline - _time.monotonic()
if remaining <= 0:
raise TimeoutError(f"Generation exceeded {timeout}s timeout")
try:
item = result_queue.get(timeout=min(0.5, remaining))
except queue.Empty:
if future.done():
exc = future.exception()
if exc is not None:
raise exc
continue
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
finally:
if not future.done():
future.cancel()
def stop(self) -> None:
if self._closed:
return
self._closed = True
try:
if self._loop is not None and self._server_pool is not None:
future = asyncio.run_coroutine_threadsafe(self._server_pool.stop(), self._loop)
future.result(timeout=10)
except Exception as exc:
logger.warning(f"Failed to stop async server pool cleanly: {exc}")
finally:
if self._loop is not None:
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread is not None:
self._thread.join(timeout=5)
if (
self._loop is not None
and not self._loop.is_closed()
and (self._thread is None or not self._thread.is_alive())
):
self._loop.close()
self._server_pool = None
self._model_info = None
self._thread = None
self._loop = None
def get_voxcpm_server() -> _AsyncServerBridge:
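    """Return the shared _AsyncServerBridge, starting it once under _server_lock."""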
global _voxcpm_server, _model_info
if _voxcpm_server is not None:
return _voxcpm_server
with _server_lock:
if _voxcpm_server is not None:
return _voxcpm_server
bridge = _AsyncServerBridge()
bridge.start()
_voxcpm_server = bridge
_model_info = bridge.get_model_info()
return _voxcpm_server
def get_model_info() -> dict:
global _model_info
if _model_info is None:
get_voxcpm_server()
assert _model_info is not None
return _model_info
def _prewarm_backend() -> None:
try:
logger.info("Starting backend prewarm ...")
get_voxcpm_server()
logger.info("Backend prewarm completed.")
except Exception as exc:
logger.warning(f"Backend prewarm failed: {exc}")
def _start_background_prewarm() -> None:
global _prewarm_started
if not _get_bool_env("NANOVLLM_PREWARM", True):
return
with _prewarm_lock:
if _prewarm_started:
return
_prewarm_started = True
Thread(target=_prewarm_backend, name="nanovllm-prewarm", daemon=True).start()
# ---------- Inference (ASR + TTS) ----------
def prompt_wav_recognition(use_prompt_text: bool, prompt_wav: Optional[str]) -> str:
if not use_prompt_text or prompt_wav is None or not prompt_wav.strip():
return ""
asr_model = get_asr_model()
res = asr_model.generate(input=prompt_wav, language="auto", use_itn=True)
return _extract_asr_text(res)
def _float_audio_to_int16(wav: np.ndarray) -> np.ndarray:
clipped = np.clip(wav, -1.0, 1.0)
return (clipped * 32767.0).astype(np.int16, copy=False)
def _generate_tts_audio_once(
text_input: str,
control_instruction: str = "",
reference_wav_path_input: Optional[str] = None,
use_prompt_text: bool = False,
prompt_text_input: str = "",
cfg_value_input: float = 2.0,
do_normalize: bool = True,
denoise: bool = True,
request: Optional[gr.Request] = None,
) -> Tuple[int, np.ndarray]:
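    """Run one synthesis pass.

    Mode selection: reference audio + transcript -> Ultimate Cloning (audio
    continuation); reference audio only -> Controllable Cloning; neither ->
    Voice Design driven purely by the control instruction.
    """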
temp_audio_path = None
try:
server = get_voxcpm_server()
model_info = get_model_info()
text = (text_input or "").strip()
if len(text) == 0:
raise ValueError("Please input text to synthesize.")
control = (control_instruction or "").strip()
final_text = f"({control}){text}" if control and not use_prompt_text else text
audio_bytes, audio_format, temp_audio_path = _prepare_audio_for_encoding(
reference_wav_path_input,
denoise=bool(denoise),
request=request,
)
prompt_text_clean = (prompt_text_input or "").strip()
if use_prompt_text and audio_bytes is None:
raise ValueError("Ultimate Cloning Mode requires a reference audio clip.")
if use_prompt_text and not prompt_text_clean:
raise ValueError(
"Ultimate Cloning Mode requires a transcript. Please wait for ASR or fill it in manually."
)
if not use_prompt_text:
prompt_text_clean = ""
if do_normalize:
logger.info(
"Ignoring normalize option: nano-vLLM backend does not support per-request text normalization."
)
prompt_latents = None
ref_audio_latents = None
if audio_bytes is not None and audio_format is not None and use_prompt_text:
logger.info(f"[Ultimate Cloning] encoding prompt audio as {audio_format}")
prompt_latents = server.encode_latents(audio_bytes, audio_format)
elif audio_bytes is not None and audio_format is not None:
logger.info(f"[Controllable Cloning] encoding reference audio as {audio_format}")
ref_audio_latents = server.encode_latents(audio_bytes, audio_format)
if prompt_latents is not None:
logger.info("[Ultimate Cloning] reference audio + transcript")
elif ref_audio_latents is not None:
logger.info("[Controllable Cloning] reference audio only")
else:
logger.info(f"[Voice Design] control: {control[:50] if control else 'None'}")
chunks: list[np.ndarray] = []
logger.info(f"Generating: '{final_text[:80]}...'")
for chunk in server.generate(
target_text=final_text,
prompt_latents=prompt_latents,
prompt_text=prompt_text_clean if prompt_latents is not None else "",
max_generate_length=_get_int_env("NANOVLLM_MAX_GENERATE_LENGTH", 2000),
temperature=_get_float_env("NANOVLLM_TEMPERATURE", 1.0),
cfg_value=float(cfg_value_input),
ref_audio_latents=ref_audio_latents,
):
chunks.append(chunk)
if not chunks:
raise RuntimeError("The model returned no audio chunks.")
wav = np.concatenate(chunks, axis=0).astype(np.float32, copy=False)
wav = _float_audio_to_int16(wav)
return (int(model_info["sample_rate"]), wav)
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
try:
os.unlink(temp_audio_path)
except OSError:
pass
def generate_tts_audio(
text_input: str,
control_instruction: str = "",
reference_wav_path_input: Optional[str] = None,
use_prompt_text: bool = False,
prompt_text_input: str = "",
cfg_value_input: float = 2.0,
do_normalize: bool = True,
denoise: bool = True,
request: Optional[gr.Request] = None,
) -> Tuple[int, np.ndarray]:
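    """Gradio entry point: log the request, synthesize, and on unexpected failure
    restart the backend and retry once (skipped when other requests are in flight).
    """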
_begin_generation_request()
request_payload = {
"event": "tts_request",
"ui_language": _resolve_ui_language(request),
"text": (text_input or "").strip(),
"control_instruction": (control_instruction or "").strip(),
"use_prompt_text": bool(use_prompt_text),
"prompt_text": (prompt_text_input or "").strip(),
"cfg_value": float(cfg_value_input),
"do_normalize": bool(do_normalize),
"denoise": bool(denoise),
"has_reference_audio": bool(reference_wav_path_input and reference_wav_path_input.strip()),
}
if request_payload["has_reference_audio"]:
try:
request_payload["reference_audio_duration_seconds"] = round(
_get_audio_duration_seconds(reference_wav_path_input), 3
)
except Exception as exc:
request_payload["reference_audio_duration_error"] = str(exc)
try:
try:
result = _generate_tts_audio_once(
text_input=text_input,
control_instruction=control_instruction,
reference_wav_path_input=reference_wav_path_input,
use_prompt_text=use_prompt_text,
prompt_text_input=prompt_text_input,
cfg_value_input=cfg_value_input,
do_normalize=do_normalize,
denoise=denoise,
request=request,
)
try:
_append_request_log({**request_payload, "status": "success"})
except Exception as exc:
logger.warning(f"Failed to append request log: {exc}")
return result
except (ValueError, gr.Error) as exc:
try:
_append_request_log(
{**request_payload, "status": "rejected", "error": str(exc)}
)
except Exception as log_exc:
logger.warning(f"Failed to append request log: {log_exc}")
if isinstance(exc, gr.Error):
raise
raise gr.Error(str(exc)) from exc
except Exception as exc:
logger.exception("Generation failed")
try:
_append_request_log({**request_payload, "status": "error", "error": str(exc)})
except Exception as log_exc:
logger.warning(f"Failed to append request log: {log_exc}")
active_requests = _get_active_generation_requests()
if active_requests > 1:
logger.warning(
"Generation failed with %s active requests; skipping shared backend restart: %s",
active_requests,
exc,
)
raise gr.Error(_get_i18n_text("backend_retry_error", request)) from exc
logger.warning(f"Generation failed, restarting backend and retrying once: {exc}")
with _server_lock:
_stop_server_if_needed()
try:
result = _generate_tts_audio_once(
text_input=text_input,
control_instruction=control_instruction,
reference_wav_path_input=reference_wav_path_input,
use_prompt_text=use_prompt_text,
prompt_text_input=prompt_text_input,
cfg_value_input=cfg_value_input,
do_normalize=do_normalize,
denoise=denoise,
request=request,
)
try:
_append_request_log({**request_payload, "status": "success_after_retry"})
except Exception as log_exc:
logger.warning(f"Failed to append request log: {log_exc}")
return result
except Exception as retry_exc:
logger.exception("Retry failed")
try:
_append_request_log(
{**request_payload, "status": "retry_failed", "error": str(retry_exc)}
)
except Exception as log_exc:
logger.warning(f"Failed to append request log: {log_exc}")
raise gr.Error(_get_i18n_text("backend_retry_error", request)) from retry_exc
finally:
_end_generation_request()
# ---------- UI ----------
def create_demo_interface():
assets_dir = Path.cwd().absolute() / "assets"
if assets_dir.exists():
gr.set_static_paths(paths=[assets_dir])
def _on_toggle_instant(checked):
if checked:
return (
gr.update(visible=True, value="", placeholder="Recognizing reference audio..."),
gr.update(visible=False),
)
return (
gr.update(visible=False),
gr.update(visible=True, interactive=True),
)
    def _run_asr_if_needed(checked, audio_path, request: Optional[gr.Request] = None):
if not checked or not audio_path:
return gr.update()
logger.info("Running ASR on reference audio...")
asr_text = _safe_prompt_wav_recognition(True, audio_path, request=request)
logger.info(f"ASR result: {asr_text[:60]}...")
return gr.update(
value=asr_text,
placeholder=_get_i18n_text("prompt_text_placeholder", request),
)
    with gr.Blocks(theme=_APP_THEME, css=_CUSTOM_CSS) as interface:
if (assets_dir / "voxcpm_logo.png").exists():
gr.HTML(
'<div class="logo-container">'
'<img src="/gradio_api/file=assets/voxcpm_logo.png" alt="VoxCPM Logo">'
"</div>"
)
gr.Markdown(I18N("usage_instructions"))
with gr.Row():
with gr.Column():
reference_wav = gr.Audio(
sources=["upload", "microphone"],
type="filepath",
label=I18N("reference_audio_label"),
)
show_prompt_text = gr.Checkbox(
value=False,
label=I18N("show_prompt_text_label"),
info=I18N("show_prompt_text_info"),
elem_classes=["switch-toggle"],
)
prompt_text = gr.Textbox(
value="",
label=I18N("prompt_text_label"),
placeholder=I18N("prompt_text_placeholder"),
lines=2,
visible=False,
)
control_instruction = gr.Textbox(
value="",
label=I18N("control_label"),
placeholder=I18N("control_placeholder"),
lines=2,
)
text = gr.Textbox(
value=DEFAULT_TARGET_TEXT,
label=I18N("target_text_label"),
lines=3,
)
with gr.Accordion(I18N("advanced_settings_title"), open=False):
DoDenoisePromptAudio = gr.Checkbox(
value=False,
label=I18N("ref_denoise_label"),
elem_classes=["switch-toggle"],
info=I18N("ref_denoise_info"),
)
DoNormalizeText = gr.Checkbox(
value=False,
label=I18N("normalize_label"),
elem_classes=["switch-toggle"],
info=I18N("normalize_info"),
)
cfg_value = gr.Slider(
minimum=1.0,
maximum=3.0,
value=2.0,
step=0.1,
label=I18N("cfg_label"),
info=I18N("cfg_info"),
)
run_btn = gr.Button(I18N("generate_btn"), variant="primary", size="lg")
with gr.Column():
audio_output = gr.Audio(label=I18N("generated_audio_label"))
gr.Markdown(I18N("examples_footer"))
show_prompt_text.change(
fn=_on_toggle_instant,
inputs=[show_prompt_text],
outputs=[prompt_text, control_instruction],
).then(
fn=_run_asr_if_needed,
inputs=[show_prompt_text, reference_wav],
outputs=[prompt_text],
)
run_btn.click(
fn=generate_tts_audio,
inputs=[
text,
control_instruction,
reference_wav,
show_prompt_text,
prompt_text,
cfg_value,
DoNormalizeText,
DoDenoisePromptAudio,
],
outputs=[audio_output],
show_progress=True,
api_name="generate",
)
return interface
def run_demo(
server_name: str = "0.0.0.0", server_port: int = 7860, show_error: bool = True
):
interface = create_demo_interface()
_start_background_prewarm()
interface.queue(
max_size=_get_int_env("GRADIO_QUEUE_MAX_SIZE", 10),
default_concurrency_limit=_get_int_env("GRADIO_DEFAULT_CONCURRENCY_LIMIT", 4),
).launch(
server_name=server_name,
server_port=int(os.environ.get("PORT", server_port)),
show_error=show_error,
i18n=I18N,
ssr_mode=_get_bool_env("GRADIO_SSR_MODE", False),
)
if __name__ == "__main__":
run_demo()