import os, sys, time, json, uuid, shutil, threading, subprocess import nest_asyncio import uvicorn from fastapi import FastAPI, Form, File, UploadFile, HTTPException from fastapi.responses import HTMLResponse, FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from huggingface_hub import HfApi, snapshot_download # ── البيئة والتكوين ─────────────────────────────────────────────────────── os.environ["COQUI_TOS_AGREED"] = "1" DATA_DIR = "data" VOICE_LIB = os.path.join(DATA_DIR, "voice_library") OUTPUT_DIR = os.path.join(DATA_DIR, "outputs") HISTORY_FILE = os.path.join(DATA_DIR, "history.json") for d in [DATA_DIR, VOICE_LIB, OUTPUT_DIR]: os.makedirs(d, exist_ok=True) HF_TOKEN = os.environ.get("HF_TOKEN") HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO") def pull_data_from_hf(): """تحميل البيانات من Dataset عند التشغيل""" if HF_TOKEN and HF_DATASET_REPO: try: print(f"[*] جاري تحميل البيانات من: {HF_DATASET_REPO}...") snapshot_download( repo_id=HF_DATASET_REPO, repo_type="dataset", local_dir=DATA_DIR, token=HF_TOKEN ) print("[✓] تم التحميل بنجاح.") except Exception as e: print(f"[!] تنبيه: لم يتم مزامنة البيانات (قد تكون المساحة جديدة): {e}") def push_data_to_hf(): """رفع البيانات إلى Dataset في الخلفية""" if HF_TOKEN and HF_DATASET_REPO: try: api = HfApi() api.upload_folder( folder_path=DATA_DIR, repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN, commit_message=f"Auto-sync {time.strftime('%Y-%m-%d %H:%M:%S')}" ) except Exception as e: print(f"[!] خطأ في المزامنة الخلفية: {e}") def background_sync(): threading.Thread(target=push_data_to_hf, daemon=True).start() # تنفيذ السحب الأولي pull_data_from_hf() # ── إعداد محرك TTS ────────────────────────────────────────────────────────── try: from TTS.api import TTS except ImportError: subprocess.check_call([sys.executable, "-m", "pip", "install", "coqui-tts"]) from TTS.api import TTS import torch device = "cuda" if torch.cuda.is_available() else "cpu" print(f"[*] جاري تحميل نموذج XTTS v2 على {device.upper()}...") xtts_engine = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device) print("[✓] النموذج جاهز للاستخدام.") # ── المساعدون ───────────────────────────────────────────────────────────── def load_history(): if os.path.exists(HISTORY_FILE): try: with open(HISTORY_FILE, "r", encoding="utf-8") as f: return json.load(f) except: return [] return [] def save_history(h): with open(HISTORY_FILE, "w", encoding="utf-8") as f: json.dump(h, f, ensure_ascii=False, indent=2) # ── تطبيق FastAPI ──────────────────────────────────────────────────────── app = FastAPI(title="XTTS Studio Pro") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) LANGUAGES = { "ar": "العربية", "en": "English", "es": "Español", "fr": "Français", "de": "Deutsch", "it": "Italiano", "pt": "Português", "ru": "Русский", "zh-cn": "中文", "ja": "日本語", "ko": "한국어", "tr": "Türkçe" } # ── واجهة المستخدم (React) ──────────────────────────────────────────────── HTML_TEMPLATE = r""" XTTS Voice Studio
""" # ── المسارات ───────────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def ui(): content = HTML_TEMPLATE.replace("{LANGUAGES_JSON}", json.dumps(LANGUAGES, ensure_ascii=False)) content = content.replace("{DEVICE_NAME}", device) return content @app.post("/generate") async def generate( text: str = Form(...), language: str = Form("ar"), voice_name: str = Form(None), files: list[UploadFile] = File(default=[]) ): ref_paths = [] temp_dir = f"temp_{uuid.uuid4().hex}" os.makedirs(temp_dir, exist_ok=True) try: # التعامل مع الملفات المرفوعة for f in files: p = os.path.join(temp_dir, f.filename) with open(p, "wb") as b: shutil.copyfileobj(f.file, b) ref_paths.append(p) # التعامل مع الأصوات المحفوظة if voice_name: v_path = os.path.join(VOICE_LIB, voice_name) if os.path.isdir(v_path): ref_paths += [os.path.join(v_path, x) for x in os.listdir(v_path) if x.lower().endswith(('.wav', '.mp3'))] if not ref_paths: raise HTTPException(400, "يجب توفير صوت مرجعي") out_name = f"gen_{uuid.uuid4().hex[:8]}.wav" out_path = os.path.join(OUTPUT_DIR, out_name) xtts_engine.tts_to_file( text=text, speaker_wav=ref_paths, language=language, file_path=out_path, enable_text_splitting=True ) # تحديث السجل hist = load_history() hist.append({"filename": out_name, "text": text[:100], "language": language, "ts": int(time.time())}) save_history(hist) background_sync() return {"filename": out_name} finally: shutil.rmtree(temp_dir, ignore_errors=True) @app.get("/audio/{filename}") async def get_audio(filename: str): p = os.path.join(OUTPUT_DIR, filename) if os.path.exists(p): return FileResponse(p) raise HTTPException(404) @app.get("/history") def get_history(): return load_history() @app.get("/voices") def list_voices(): return [d for d in os.listdir(VOICE_LIB) if os.path.isdir(os.path.join(VOICE_LIB, d))] @app.post("/voices/save") async def save_voice(name: str = Form(...), file: UploadFile = File(...)): v_dir = os.path.join(VOICE_LIB, name.strip()) os.makedirs(v_dir, exist_ok=True) dest = os.path.join(v_dir, file.filename) with open(dest, "wb") as b: shutil.copyfileobj(file.file, b) background_sync() return {"status": "saved"} # ── التشغيل ────────────────────────────────────────────────────────────── if __name__ == "__main__": nest_asyncio.apply() uvicorn.run(app, host="0.0.0.0", port=7860)