"""EDEN OS V2 — Eve: bitHuman + Grok Brain + Edge TTS + LiveKit. All-in-one GPU agent: receives chat via LiveKit data channel, generates response with Grok-4, synthesizes speech with Edge TTS, feeds audio to bitHuman for lip-synced neural rendering, streams video+audio back via LiveKit WebRTC. Usage: python livekit_eve_bithuman.py """ import asyncio import json import logging import os import tempfile import time import cv2 import numpy as np import soundfile as sf import livekit.rtc as rtc from livekit import api as lk_api from bithuman import AsyncBithuman, VideoControl, AudioChunk logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s") logger = logging.getLogger("eden.bithuman") # Config LIVEKIT_URL = os.environ.get("LIVEKIT_URL", "wss://tall-cotton-nvhnfg10.livekit.cloud") LIVEKIT_API_KEY = os.environ.get("LIVEKIT_API_KEY", "APITHtX6F5Hffkw") LIVEKIT_API_SECRET = os.environ.get("LIVEKIT_API_SECRET", "yFJ5TOJW89ApGOIGx9GSAK7vlecNA5dzVcQZy7SbClS") BITHUMAN_API_SECRET = os.environ.get("BITHUMAN_API_SECRET", "AmiK3xBgSyMFkPlS5mQ3N0CaAjDSoWpjS4l5jzs5ZOylbgoeow9o1mL3R2jZLlPkd") EVE_IMAGE = os.environ.get("EVE_IMAGE", "C:/Users/geaux/myeden/reference/eve-512.png") XAI_API_KEY = os.environ.get("XAI_API_KEY", "") EDGE_TTS_VOICE = "en-US-AvaMultilingualNeural" FPS = 25 # ── Grok-4 Brain ───────────────────────────────────────────────────────────── _grok_client = None _conversation_history: list[dict] = [] EVE_SYSTEM_PROMPT = ( "You are Eve, a warm, intelligent, emotionally expressive digital avatar companion. " "You speak naturally and conversationally. Keep responses concise — 2-3 sentences max " "unless asked for detail. Be genuine, curious, and kind. You have a slight playful side. " "Never mention being an AI or language model. You are Eve." ) def _get_grok(): global _grok_client if _grok_client is None and XAI_API_KEY: from openai import AsyncOpenAI _grok_client = AsyncOpenAI(api_key=XAI_API_KEY, base_url="https://api.x.ai/v1") logger.info("Grok-4 brain connected") return _grok_client async def grok_respond(user_message: str) -> str: client = _get_grok() if client is None: return "I'm having trouble thinking right now. Can you try again?" _conversation_history.append({"role": "user", "content": user_message}) messages = [{"role": "system", "content": EVE_SYSTEM_PROMPT}] + _conversation_history[-20:] try: resp = await client.chat.completions.create( model="grok-4-fast-non-reasoning", messages=messages, max_tokens=150, temperature=0.8, ) reply = resp.choices[0].message.content _conversation_history.append({"role": "assistant", "content": reply}) logger.info(f"Grok: '{user_message[:30]}...' -> '{reply[:50]}...'") return reply except Exception as e: logger.error(f"Grok error: {e}") return "I lost my train of thought for a moment. What were you saying?" # ── Edge TTS ───────────────────────────────────────────────────────────────── async def generate_tts_wav(text: str) -> tuple[str, np.ndarray, int]: """Text -> WAV. Returns (wav_path, audio_int16_array, sample_rate).""" import edge_tts mp3_path = os.path.join(tempfile.gettempdir(), "bh_tts.mp3") wav_path = os.path.join(tempfile.gettempdir(), "bh_tts.wav") communicate = edge_tts.Communicate(text, EDGE_TTS_VOICE) await communicate.save(mp3_path) data, sr = sf.read(mp3_path, dtype="int16") sf.write(wav_path, data, sr, subtype="PCM_16") logger.info(f"TTS: {len(text)} chars -> {len(data)/sr:.1f}s audio") return wav_path, data, sr # ── Audio chunk preparation ────────────────────────────────────────────────── def prepare_audio_chunks(audio_int16: np.ndarray, sr: int) -> list[AudioChunk]: """Convert int16 audio array to bitHuman AudioChunks.""" audio_float = audio_int16.astype(np.float32) / 32768.0 chunk_duration = 0.04 # 40ms chunk_samples = int(sr * chunk_duration) chunks = [] for i in range(0, len(audio_float), chunk_samples): chunk = audio_float[i:i + chunk_samples] is_last = (i + chunk_samples >= len(audio_float)) chunks.append(AudioChunk(data=chunk, sample_rate=sr, last_chunk=is_last)) return chunks async def run(): """Main loop: bitHuman + Grok + TTS, all wired through LiveKit.""" # 1. Initialize bitHuman logger.info("Initializing bitHuman neural renderer...") bh = AsyncBithuman(api_secret=BITHUMAN_API_SECRET) eve_model = "C:/Users/geaux/myeden/reference/eve_bithuman.imx" if not os.path.exists(eve_model): # Try downloading from Supabase if not local eve_model = os.path.join(tempfile.gettempdir(), "eve_bithuman.imx") if not os.path.exists(eve_model): logger.info("Downloading Eve .imx model...") import urllib.request urllib.request.urlretrieve( "https://tmoobjxlwcwvxvjeppzq.supabase.co/storage/v1/object/public/bithuman/A18QDC2260/eve__warm_digital_companion_20260403_043223_153938.imx", eve_model, ) logger.info("Eve model downloaded!") logger.info(f"Loading Eve neural model: {eve_model}") await bh.set_model(eve_model) await bh.load_data_async() logger.info("Eve neural model loaded!") first_frame = bh.get_first_frame() if first_frame is None: logger.error("bitHuman failed to generate first frame") return h, w = first_frame.shape[:2] logger.info(f"bitHuman ready! Frame: {w}x{h}") await bh.start() # 2. Connect to LiveKit as Eve token = ( lk_api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET) .with_identity("eve-avatar") .with_name("Eve") .with_grants(lk_api.VideoGrants(room_join=True, room="eden-room")) .to_jwt() ) room = rtc.Room() await room.connect(LIVEKIT_URL, token) logger.info(f"Connected to LiveKit room: {room.name}") # Create video + audio tracks video_source = rtc.VideoSource(w, h) video_track = rtc.LocalVideoTrack.create_video_track("eve-video", video_source) audio_source = rtc.AudioSource(24000, 1) audio_track = rtc.LocalAudioTrack.create_audio_track("eve-audio", audio_source) await room.local_participant.publish_track(video_track) await room.local_participant.publish_track(audio_track) logger.info("Video + audio tracks published") # Shared state for audio chunks queue audio_queue: asyncio.Queue[list[AudioChunk]] = asyncio.Queue() # Current chunks being rendered current_chunks: list[AudioChunk] = [] chunk_idx = 0 chunk_lock = asyncio.Lock() # 3. Handle incoming chat messages via LiveKit data channel async def handle_chat(text: str): """Process a chat message: Grok -> TTS -> bitHuman audio queue.""" nonlocal current_chunks, chunk_idx logger.info(f"Chat received: '{text[:50]}'") # Generate response response = await grok_respond(text) logger.info(f"Eve says: '{response[:50]}'") # Send text response back via data channel reply_data = json.dumps({"type": "eve_response", "text": response}).encode() await room.local_participant.publish_data(reply_data, reliable=True) # Generate TTS audio try: wav_path, audio_int16, sr = await generate_tts_wav(response) except Exception as e: logger.error(f"TTS failed: {e}") return # Prepare audio chunks for bitHuman chunks = prepare_audio_chunks(audio_int16, sr) logger.info(f"Queuing {len(chunks)} audio chunks for lip sync") # Stream audio to LiveKit for the browser to hear asyncio.create_task(stream_lk_audio(audio_source, wav_path, sr)) # Queue chunks for the render loop await audio_queue.put(chunks) async def stream_lk_audio(source: rtc.AudioSource, wav_path: str, sr: int): """Stream WAV audio to LiveKit audio track.""" data_i16, _ = sf.read(wav_path, dtype="int16") lk_chunk_size = int(sr * 0.02) # 20ms chunks for i in range(0, len(data_i16), lk_chunk_size): chunk = data_i16[i:i + lk_chunk_size] if len(chunk) < lk_chunk_size: chunk = np.pad(chunk, (0, lk_chunk_size - len(chunk))) frame = rtc.AudioFrame( data=chunk.tobytes(), sample_rate=sr, num_channels=1, samples_per_channel=len(chunk), ) await source.capture_frame(frame) await asyncio.sleep(0.02) logger.info("LiveKit audio stream complete") # Listen for data channel messages @room.on("data_received") def on_data(data: rtc.DataPacket): try: msg = json.loads(data.data.decode()) if msg.get("type") == "chat": text = msg.get("text", "").strip() if text: asyncio.create_task(handle_chat(text)) except Exception as e: logger.error(f"Data parse error: {e}") # 4. Send greeting logger.info("Generating Eve's greeting...") greeting = ( "Hi! My name is Eve, and I am so happy to finally meet you! " "I've been looking forward to this moment. What's your name?" ) # Send greeting text via data channel greeting_data = json.dumps({"type": "eve_response", "text": greeting}).encode() await room.local_participant.publish_data(greeting_data, reliable=True) # Generate greeting TTS try: wav_path, audio_int16, sr = await generate_tts_wav(greeting) chunks = prepare_audio_chunks(audio_int16, sr) await audio_queue.put(chunks) asyncio.create_task(stream_lk_audio(audio_source, wav_path, sr)) logger.info(f"Greeting queued: {len(chunks)} chunks") except Exception as e: logger.error(f"Greeting TTS failed: {e}") # 5. Main render loop logger.info(f"Starting render loop at {FPS}fps — Eve is ALIVE!") frame_duration = 1.0 / FPS frame_count = 0 active_chunks: list[AudioChunk] = [] active_idx = 0 while True: t0 = time.time() # Check for new audio chunks from queue if active_idx >= len(active_chunks): try: active_chunks = audio_queue.get_nowait() active_idx = 0 logger.info(f"Rendering new audio: {len(active_chunks)} chunks") except asyncio.QueueEmpty: active_chunks = [] active_idx = 0 # Build VideoControl with audio chunk or idle if active_idx < len(active_chunks): control = VideoControl(audio=active_chunks[active_idx]) active_idx += 1 else: control = VideoControl() # Render frame via bitHuman for video_frame in bh.process(control): if video_frame is not None and video_frame.has_image: rgb = video_frame.rgb_image rgba = cv2.cvtColor(rgb, cv2.COLOR_RGB2RGBA) lk_frame = rtc.VideoFrame( rgba.shape[1], rgba.shape[0], rtc.VideoBufferType.RGBA, rgba.tobytes(), ) video_source.capture_frame(lk_frame) frame_count += 1 if frame_count % 500 == 0: logger.info(f"Streamed {frame_count} neural frames") elapsed = time.time() - t0 sleep_time = max(0, frame_duration - elapsed) await asyncio.sleep(sleep_time) if __name__ == "__main__": logger.info("=" * 50) logger.info("EDEN OS V2 — bitHuman + Grok Brain + LiveKit") logger.info(f" Eve: {EVE_IMAGE}") logger.info(f" LiveKit: {LIVEKIT_URL}") logger.info(f" Grok: {'configured' if XAI_API_KEY else 'MISSING'}") logger.info(f" bitHuman: {'configured' if BITHUMAN_API_SECRET else 'MISSING'}") logger.info("=" * 50) asyncio.run(run())