import asyncio
import importlib
import json
import os
import sys
import tempfile
import textwrap
from datetime import datetime

import torch
import gradio as gr
import pydub
import edge_tts
import pysrt
from pydub import AudioSegment
# Put the "src" directory that sits next to this script at the front of
# sys.path so the imports below can find the chatterbox package there.
script_dir = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.join(script_dir, "src")
if src_path not in sys.path:
    sys.path.insert(0, src_path)

# Import the module, reload it, and only then bind ChatterboxVC, so the class
# comes from the freshly re-executed chatterbox.vc module.
import chatterbox.vc
importlib.reload(chatterbox.vc)
from chatterbox.vc import ChatterboxVC
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Module-wide model instance; stays None until get_vc_model() is first called.
_vc_model = None


def get_vc_model():
    """Return the shared ``ChatterboxVC`` model, loading it on first use.

    The model is created once with ``ChatterboxVC.from_pretrained(DEVICE)``
    and cached in the module-level ``_vc_model``; later calls return the
    cached instance. Progress is printed to stdout around the load.
    """
    global _vc_model
    if _vc_model is not None:
        return _vc_model

    print(f"[VC] Đang tải model trên {DEVICE}…")
    _vc_model = ChatterboxVC.from_pretrained(DEVICE)
    print("[VC] Model sẵn sàng.")
    return _vc_model
# Log lines accumulated for the UI log box (module-level, shared by all calls).
global_log_messages_vc = []


def yield_vc_updates(log_msg=None, audio_data=None, file_list=None, log_append=True):
    """Yield a single ``(log, audio, files)`` tuple of Gradio updates.

    Args:
        log_msg: Message to record, prefixed with the current ``[HH:MM:SS]``
            time. ``None`` leaves the log unchanged.
        audio_data: Value for the audio output; the component is shown only
            when this is not ``None``.
        file_list: Value for the files output; the component is shown only
            when this is not ``None`` (an empty list is sent otherwise).
        log_append: When true the message is appended to the existing log;
            when false the log is replaced by this single message.

    Yields:
        One tuple of three ``gr.update(...)`` results, in the order
        log text, audio, files.
    """
    global global_log_messages_vc

    if log_msg is not None:
        stamped = f"{datetime.now().strftime('[%H:%M:%S]')} {log_msg}"
        if not log_append:
            # Start a fresh log containing only this line.
            global_log_messages_vc = [stamped]
        else:
            global_log_messages_vc.append(stamped)

    has_audio = audio_data is not None
    has_files = file_list is not None

    yield (
        gr.update(value="\n".join(global_log_messages_vc)),
        gr.update(visible=has_audio, value=audio_data),
        gr.update(visible=has_files, value=file_list if has_files else []),
    )
def load_edge_tts_voices(json_path="voices.json"):
    """Load the Edge TTS voice catalogue and build the dropdown data.

    The JSON document must map ``language -> gender -> list of voices``,
    where every voice entry has ``display_name`` and ``voice_code`` keys.

    Args:
        json_path: Path of the catalogue file. A relative path is first tried
            as given (i.e. against the current working directory); if no such
            file exists, the directory containing this module is tried, so
            the app also starts when launched from another directory.

    Returns:
        A ``(display_list, code_map)`` pair: the labels in file order, and a
        dict mapping each label to its voice code.

    Raises:
        FileNotFoundError: The catalogue could not be found.
        json.JSONDecodeError: The file is not valid JSON.
        KeyError: A voice entry lacks ``display_name`` or ``voice_code``.
    """
    if not os.path.isabs(json_path) and not os.path.exists(json_path):
        module_dir = os.path.dirname(os.path.abspath(__file__))
        candidate = os.path.join(module_dir, json_path)
        if os.path.exists(candidate):
            json_path = candidate

    with open(json_path, "r", encoding="utf-8") as f:
        voices = json.load(f)

    display_list, code_map = [], {}
    for lang, genders in voices.items():
        for gender, items in genders.items():
            for v in items:
                disp = f"{lang} - {gender} - {v['display_name']} ({v['voice_code']})"
                display_list.append(disp)
                code_map[disp] = v["voice_code"]
    return display_list, code_map
# Dropdown labels and the label -> voice-code lookup, loaded once at import.
edge_choices, edge_code_map = load_edge_tts_voices()


async def _edge_tts_async(text, disp, rate_pct, vol_pct):
    """Synthesize *text* with Edge TTS and return the output file path.

    Args:
        text: Text to speak.
        disp: Dropdown label of the voice; translated to a voice code through
            ``edge_code_map`` (an unknown label yields ``None``).
        rate_pct: Speaking-rate offset, formatted as a signed integer percent.
        vol_pct: Volume offset, formatted as a signed integer percent.

    Returns:
        The fixed relative path ``"temp_edge_tts.wav"``.
    """
    # NOTE(review): the output name is fixed, so every call overwrites the
    # same file in the current working directory.
    out = "temp_edge_tts.wav"
    communicator = edge_tts.Communicate(
        text,
        voice=edge_code_map.get(disp),
        rate=f"{rate_pct:+d}%",
        volume=f"{vol_pct:+d}%",
    )
    await communicator.save(out)
    return out
def run_edge_tts(text, disp, rate_pct, vol_pct):
    """Blocking wrapper around ``_edge_tts_async``.

    Runs the coroutine to completion with ``asyncio.run`` and returns the
    generated file path twice, because the button handler that uses this
    function feeds two audio components from it.
    """
    wav_path = asyncio.run(_edge_tts_async(text, disp, rate_pct, vol_pct))
    return (wav_path,) * 2
async def _tts_save_segment(text: str, voice_code: str, rate_pct: int, vol_pct: int, path: str) -> bool:
    """Render one piece of text to an audio file with Edge TTS.

    Args:
        text: Text to speak.
        voice_code: Edge TTS voice identifier.
        rate_pct: Speaking-rate offset, sent as a signed integer percent.
        vol_pct: Volume offset, sent as a signed integer percent.
        path: Destination file for the audio.

    Returns:
        ``True`` when the save completed, ``False`` when Edge TTS raised
        ``NoAudioReceived``. Any other exception propagates.
    """
    rate_str = f"{rate_pct:+d}%"
    vol_str = f"{vol_pct:+d}%"
    try:
        communicator = edge_tts.Communicate(
            text, voice=voice_code, rate=rate_str, volume=vol_str
        )
        await communicator.save(path)
    except edge_tts.exceptions.NoAudioReceived:
        # The service returned no audio for this text; report it to the caller.
        return False
    else:
        return True
def _split_text_for_tts(text: str, limit: int = 200) -> list:
    """Split *text* into pieces of at most *limit* characters on whitespace."""
    if len(text) <= limit:
        return [text]
    # textwrap breaks between words and only cuts inside a word when that
    # single word is itself longer than the limit.
    return textwrap.wrap(text, width=limit)


async def _generate_audio_from_srt(
    srt_path: str,
    tmp_dir: str,
    out_path: str,
    voice_code: str,
    rate_pct: int,
    vol_pct: int
):
    """Synthesize every subtitle of an SRT file and export one WAV file.

    Each subtitle's text (newlines replaced by spaces) is split into pieces
    of at most 200 characters, each piece is rendered with Edge TTS into
    ``tmp_dir``, and the resulting clips are concatenated in subtitle order.
    Subtitle timestamps are not used. Pieces for which Edge TTS returned no
    audio are skipped.

    Args:
        srt_path: Path of the UTF-8 encoded ``.srt`` file.
        tmp_dir: Existing directory for the per-piece audio files.
        out_path: Destination of the combined WAV file.
        voice_code: Edge TTS voice identifier.
        rate_pct: Speaking-rate offset in percent.
        vol_pct: Volume offset in percent.

    Returns:
        None. ``out_path`` is written only when the SRT file contains at
        least one subtitle.
    """
    subs = pysrt.open(srt_path, encoding='utf-8')
    segments = []

    for i, sub in enumerate(subs):
        text = sub.text.replace('\n', ' ')

        # Start from an empty clip so a subtitle with no audio still
        # contributes a (zero-length) segment.
        seg = AudioSegment.silent(duration=0)
        for j, part in enumerate(_split_text_for_tts(text)):
            seg_path = os.path.join(tmp_dir, f"seg_{i}_{j}.wav")
            ok = await _tts_save_segment(part, voice_code, rate_pct, vol_pct, seg_path)
            if ok:
                seg += AudioSegment.from_file(seg_path)
        segments.append(seg)

    if segments:
        combined = segments[0]
        for seg in segments[1:]:
            combined += seg
        combined.export(out_path, format="wav")
def synthesize_srt_audio(
    srt_path: str,
    disp_voice: str,
    work_dir: str,
    rate_pct: int,
    vol_pct: int
) -> str:
    """Synchronously render an SRT file to ``<work_dir>/srt_source.wav``.

    Args:
        srt_path: Path of the ``.srt`` file to read.
        disp_voice: Dropdown label of the Edge TTS voice; looked up in
            ``edge_code_map``.
        work_dir: Existing directory that receives ``srt_source.wav``.
        rate_pct: Speaking-rate offset in percent.
        vol_pct: Volume offset in percent.

    Returns:
        Path of the generated WAV file, for use as the conversion source.

    Raises:
        gr.Error: The voice label is unknown, or no audio file was produced
            from the SRT file.
    """
    voice_code = edge_code_map.get(disp_voice)
    if voice_code is None:
        raise gr.Error("Vui lòng chọn giọng Edge TTS cho file SRT.")

    out_path = os.path.join(work_dir, "srt_source.wav")
    # Remove the result of an earlier run: the generator writes nothing for an
    # empty SRT, and a leftover file must not be mistaken for fresh output.
    if os.path.exists(out_path):
        os.remove(out_path)

    # The per-piece files are only needed while the clips are being joined,
    # so the scratch directory is deleted as soon as generation finishes.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # asyncio.run creates the event loop and closes it again afterwards.
        asyncio.run(
            _generate_audio_from_srt(
                srt_path, tmp_dir, out_path,
                voice_code, rate_pct, vol_pct
            )
        )

    if not os.path.exists(out_path):
        raise gr.Error("Không tạo được audio từ file SRT.")
    return out_path
def generate_vc(
    source_audio_path,
    target_voice_path,
    cfg_rate: float,
    sigma_min: float,
    batch_mode: bool,
    batch_parameter: str,
    batch_values: str
):
    """Run voice conversion and stream Gradio updates while it progresses.

    Output files are written to ``outputs/vc/<YYYYMMDD>/``.

    * Batch mode converts the source once per value in ``batch_values``,
      substituting the value for the CFG rate when ``batch_parameter`` is
      ``"Inference CFG Rate"`` and for sigma-min otherwise.
    * Otherwise, a source longer than 40 seconds is converted in 40-second
      chunks that are joined into ``combined.wav``; a shorter source is
      converted in one pass.

    Args:
        source_audio_path: Path of the audio to convert.
        target_voice_path: Path of the reference (target) voice audio.
        cfg_rate: Value passed to the model as ``inference_cfg_rate``.
        sigma_min: Value passed to the model as ``sigma_min``.
        batch_mode: Enables the parameter sweep.
        batch_parameter: Name of the swept parameter.
        batch_values: Comma-separated numbers to sweep over.

    Yields:
        ``(log, audio, files)`` update tuples from ``yield_vc_updates``; the
        last one carries the first output file and the list of all outputs.

    Raises:
        gr.Error: ``batch_values`` is empty or contains a non-numeric item.
        Exception: Any conversion failure is logged to the UI and re-raised.
    """
    model = get_vc_model()
    yield from yield_vc_updates("Khởi tạo chuyển giọng…", log_append=False)

    date_folder = datetime.now().strftime("%Y%m%d")
    work_dir = os.path.join("outputs/vc", date_folder)
    os.makedirs(work_dir, exist_ok=True)

    def run_once(src, tgt, rate, sigma):
        return model.generate(src, target_voice_path=tgt, inference_cfg_rate=rate, sigma_min=sigma)

    outputs = []
    try:
        if batch_mode:
            batch_error = "Batch values phải là số, phân cách bởi dấu phẩy."
            try:
                vals = [
                    float(v.strip())
                    for v in (batch_values or "").split(",")
                    if v.strip()
                ]
            except ValueError as err:
                raise gr.Error(batch_error) from err
            if not vals:
                # A sweep without values would silently produce nothing.
                raise gr.Error(batch_error)

            yield from yield_vc_updates(f"Chạy batch '{batch_parameter}': {vals}")
            for idx, v in enumerate(vals, 1):
                r, s = cfg_rate, sigma_min
                if batch_parameter == "Inference CFG Rate":
                    r, tag = v, f"cfg_{v}"
                else:
                    s, tag = v, f"sigma_{v}"
                yield from yield_vc_updates(f" • Mục {idx}/{len(vals)}: {batch_parameter}={v}")
                wav = run_once(source_audio_path, target_voice_path, r, s)
                path = os.path.join(work_dir, f"{tag}_{idx}.wav")
                model.save_wav(wav, path)
                outputs.append(path)
                yield from yield_vc_updates(f"Đã lưu: {path}")
        else:
            audio = AudioSegment.from_file(source_audio_path)
            # len() of an AudioSegment is its duration in milliseconds.
            if len(audio) > 40_000:
                yield from yield_vc_updates("Audio dài >40s: tách thành đoạn 40s…")
                chunks = [audio[i:i+40_000] for i in range(0, len(audio), 40_000)]
                temp_paths = []
                for i, chunk in enumerate(chunks):
                    tmp = f"{source_audio_path}_chunk{i}.wav"
                    try:
                        chunk.export(tmp, format="wav")
                        wav = run_once(tmp, target_voice_path, cfg_rate, sigma_min)
                    finally:
                        # Always drop the chunk file, even if conversion fails.
                        if os.path.exists(tmp):
                            os.remove(tmp)
                    outp = os.path.join(work_dir, f"part{i}.wav")
                    model.save_wav(wav, outp)
                    temp_paths.append(outp)
                    yield from yield_vc_updates(f"Xử lý đoạn {i+1}/{len(chunks)}")

                combined = AudioSegment.empty()
                for p in temp_paths:
                    combined += AudioSegment.from_file(p)
                final = os.path.join(work_dir, "combined.wav")
                combined.export(final, format="wav")
                outputs.append(final)
                yield from yield_vc_updates("Chuyển xong.")
            else:
                yield from yield_vc_updates("Đang chuyển giọng…")
                wav = run_once(source_audio_path, target_voice_path, cfg_rate, sigma_min)
                outp = os.path.join(work_dir, f"LyTranTTS_{datetime.now().strftime('%H%M%S')}.wav")
                model.save_wav(wav, outp)
                outputs.append(outp)
                yield from yield_vc_updates("Hoàn thành.")
    except Exception as e:
        # Surface the failure in the log box, then let Gradio report it.
        yield from yield_vc_updates(f"Lỗi: {e}")
        raise

    first = outputs[0] if outputs else None
    yield from yield_vc_updates(log_msg=None, audio_data=first, file_list=outputs)
def run_vc_from_srt_or_file(
    use_srt: bool,
    srt_file, srt_voice, srt_rate, srt_vol,
    edge_text, edge_voice, edge_rate, edge_vol,
    src_audio, tgt_audio,
    cfg_rate, sigma_min,
    batch_mode, batch_parameter, batch_values
):
    """Pick the source audio, then stream the voice-conversion updates.

    The source is chosen in this order:

    1. ``use_srt`` is set: audio synthesized from the uploaded SRT file.
    2. Both ``edge_text`` and ``edge_voice`` are set: audio synthesized from
       the text with Edge TTS.
    3. Otherwise: the uploaded or recorded ``src_audio``.

    The remaining arguments are forwarded unchanged to ``generate_vc``.

    Yields:
        ``(log, audio, files)`` update tuples for the three output components.

    Raises:
        gr.Error: SRT mode is selected but no SRT file was uploaded.
    """
    yield from yield_vc_updates("Bắt đầu…", log_append=False)

    date_folder = datetime.now().strftime("%Y%m%d")
    work_dir = os.path.join("outputs/vc", date_folder)
    os.makedirs(work_dir, exist_ok=True)

    if use_srt:
        if srt_file is None:
            raise gr.Error("Vui lòng tải lên file .srt.")
        # Accept both an upload object exposing ``.name`` and a plain path.
        srt_path = getattr(srt_file, "name", srt_file)
        yield from yield_vc_updates("Sinh audio từ SRT…")
        source = synthesize_srt_audio(
            srt_path, srt_voice, work_dir,
            rate_pct=srt_rate, vol_pct=srt_vol
        )
    elif edge_text and edge_voice:
        yield from yield_vc_updates("Sinh audio từ Edge TTS…")
        source, _ = run_edge_tts(edge_text, edge_voice, edge_rate, edge_vol)
    else:
        source = src_audio

    yield from generate_vc(
        source, tgt_audio,
        cfg_rate, sigma_min,
        batch_mode, batch_parameter, batch_values
    )
# Gradio interface: source selection and parameters on the left, log and
# results on the right.
with gr.Blocks(title="Chuyển Giọng Nói AI") as demo:
    gr.Markdown("## 📣 Chuyển Giọng Nói AI")
    gr.Markdown("> Tác giả: **Lý Trần**")

    with gr.Row():
        with gr.Column():
            # Source option 1: synthesize the source from an SRT file.
            use_srt = gr.Checkbox(label="Sử dụng file SRT làm nguồn?", value=False)
            srt_file = gr.File(file_types=[".srt"], label="Tải lên file .srt", visible=False)
            srt_voice = gr.Dropdown(choices=edge_choices, label="Giọng Edge TTS (SRT)", visible=False)
            srt_rate = gr.Slider(-100, 100, value=0, step=1, label="Tốc độ SRT (% chuẩn)", visible=False)
            srt_vol = gr.Slider(-100, 100, value=0, step=1, label="Âm lượng SRT (% chuẩn)", visible=False)

            # Source option 2: synthesize the source from typed text.
            use_edge = gr.Checkbox(label="Tạo nguồn qua Edge TTS?", value=False)
            edge_text = gr.Textbox(label="Văn bản cho Edge TTS", visible=False)
            edge_voice = gr.Dropdown(choices=edge_choices, label="Giọng Edge TTS", visible=False)
            edge_rate = gr.Slider(-100, 100, value=0, step=1, label="Tốc độ Edge (% chuẩn)", visible=False)
            edge_vol = gr.Slider(-100, 100, value=0, step=1, label="Âm lượng Edge (% chuẩn)", visible=False)
            gen_edge_btn = gr.Button("🗣️ Tạo Edge TTS", visible=False)
            edge_audio = gr.Audio(label="Nguồn Edge TTS", type="filepath", visible=False)

            # Source option 3 (default): uploaded or recorded audio.
            src_audio = gr.Audio(sources=["upload","microphone"], type="filepath",
                                 label="Tải lên / Ghi âm nguồn")

            # Reference voice that the source is converted to.
            gr.Markdown("### Giọng tham chiếu (mục tiêu)")
            tgt_audio = gr.Audio(sources=["upload","microphone"], type="filepath",
                                 label="Tải lên / Ghi âm giọng mục tiêu")

            # Conversion parameters.
            gr.Markdown("### Tham số chuyển giọng")
            cfg_slider = gr.Slider(0.0, 30.0, value=0.5, step=0.1, label="CFG Rate")
            sigma_input = gr.Number(1e-6, label="Sigma Min",
                                    minimum=1e-7, maximum=1e-5, step=1e-7)

            # Optional sweep over one parameter.
            with gr.Accordion("Tùy chọn Batch Sweep", open=False):
                batch_chk = gr.Checkbox(label="Kích hoạt Batch Sweep", value=False)
                batch_param = gr.Dropdown(choices=["Inference CFG Rate","Sigma Min"],
                                          label="Tham số thay đổi")
                batch_vals = gr.Textbox(placeholder="ví dụ: 0.5,1.0,2.0",
                                        label="Giá trị phân cách dấu phẩy")

            run_btn = gr.Button("🚀 Chuyển giọng")

        with gr.Column():
            gr.Markdown("### Nhật ký")
            log_box = gr.Textbox(interactive=False, lines=12)
            gr.Markdown("### Kết quả")
            out_audio = gr.Audio(label="Âm thanh kết quả", type="filepath", visible=False)
            out_files = gr.Files(label="Tải xuống file đầu ra", visible=False)

    def toggle_srt(v, edge_on=False):
        """Show the SRT controls when *v* is set and hide the other sources.

        When SRT mode is switched off again, the Edge TTS controls and the
        upload widget are restored according to the Edge checkbox
        (*edge_on*) instead of all being made visible at once.
        """
        edge_visible = (not v) and bool(edge_on)
        upload_visible = (not v) and not edge_on
        updates = [gr.update(visible=v) for _ in range(4)]          # srt_file, srt_voice, srt_rate, srt_vol
        updates.append(gr.update(visible=not v))                    # use_edge
        updates.extend(gr.update(visible=edge_visible) for _ in range(6))  # edge_text .. edge_audio
        updates.append(gr.update(visible=upload_visible))           # src_audio
        return tuple(updates)

    use_srt.change(
        fn=toggle_srt,
        inputs=[use_srt, use_edge],
        outputs=[
            srt_file, srt_voice, srt_rate, srt_vol,
            use_edge, edge_text, edge_voice, edge_rate, edge_vol,
            gen_edge_btn, edge_audio, src_audio
        ]
    )

    def toggle_edge(v):
        """Show the Edge TTS controls when *v* is set, else the upload widget."""
        return (
            gr.update(visible=v),      # edge_text
            gr.update(visible=v),      # edge_voice
            gr.update(visible=v),      # edge_rate
            gr.update(visible=v),      # edge_vol
            gr.update(visible=v),      # gen_edge_btn
            gr.update(visible=v),      # edge_audio
            gr.update(visible=not v)   # src_audio
        )

    use_edge.change(
        fn=toggle_edge,
        inputs=[use_edge],
        outputs=[edge_text, edge_voice, edge_rate, edge_vol, gen_edge_btn, edge_audio, src_audio]
    )

    # Preview button: the generated file fills both the Edge preview player
    # and the source-audio component.
    gen_edge_btn.click(
        fn=run_edge_tts,
        inputs=[edge_text, edge_voice, edge_rate, edge_vol],
        outputs=[edge_audio, src_audio]
    )

    # Main action: streams log, audio and file updates while converting.
    run_btn.click(
        fn=run_vc_from_srt_or_file,
        inputs=[
            use_srt, srt_file, srt_voice, srt_rate, srt_vol,
            edge_text, edge_voice, edge_rate, edge_vol,
            src_audio, tgt_audio,
            cfg_slider, sigma_input,
            batch_chk, batch_param, batch_vals
        ],
        outputs=[log_box, out_audio, out_files],
        show_progress="minimal"
    )


if __name__ == "__main__":
    # share=True asks Gradio to create a public share link for the app.
    demo.launch(share=True)