import asyncio
import logging
import uuid
from pathlib import Path
from typing import List, Dict, Optional, Any

import aiohttp  # NOTE(review): unused in this file; kept in case other modules import it from here
import requests

# Use relative imports for module-local schemas
from ..schemas import (
    SceneInput, RenderConfig, VideoStatus, Scene, Caption
)
from .libraries.tts_client import TTSClient
from .libraries.whisper_client import WhisperClient
from .libraries.pexels_client import PexelsClient
from .libraries.ffmpeg_utils import FFmpegUtils
from .libraries.video_composer import VideoComposer
from .music_manager import MusicManager

logger = logging.getLogger(__name__)


class ShortCreator:
    """Main video creation orchestrator.

    Queues render requests and, for each queued video: generates TTS
    narration, Whisper captions, per-scene stock footage (Pexels first,
    Pixabay as fallback, photo-to-video as a last resort), then composes
    the final short with background music via FFmpeg.
    """

    def __init__(
        self,
        config: Any,  # NCAkitConfig from parent
        tts_client: TTSClient,
        whisper_client: WhisperClient,
        pexels_client: Optional[PexelsClient] = None,
        pixabay_client: Any = None,  # Optional PixabayClient
        music_manager: Optional[MusicManager] = None
    ):
        self.config = config
        self.tts = tts_client
        self.whisper = whisper_client
        self.pexels = pexels_client
        self.pixabay = pixabay_client
        self.music_manager = music_manager
        self.queue: List[Dict] = []   # FIFO of pending render jobs
        self.processing = False       # True while process_queue() is draining

    def add_to_queue(self, scenes: List[SceneInput], config: RenderConfig) -> str:
        """Add video to processing queue.

        Returns:
            video_id for tracking
        """
        # 24-char hex id, similar in shape to a cuid
        video_id = str(uuid.uuid4()).replace('-', '')[:24]
        self.queue.append({
            "id": video_id,
            "scenes": scenes,
            "config": config
        })
        logger.info(f"Added video {video_id} to queue. Queue length: {len(self.queue)}")

        # Start processing if not already running.
        # NOTE(review): asyncio.create_task requires a running event loop
        # (e.g. an async web handler); calling this from plain sync code
        # would raise RuntimeError — confirm against callers.
        if not self.processing:
            asyncio.create_task(self.process_queue())

        return video_id

    async def process_queue(self):
        """Process videos in the queue, one at a time, until it is empty."""
        if self.processing:
            return

        self.processing = True
        try:
            while self.queue:
                item = self.queue[0]
                video_id = item["id"]
                logger.info(f"Processing video {video_id}")
                try:
                    # Run video creation in a background thread to keep API responsive
                    # This allows status checks while video is being processed
                    await asyncio.to_thread(
                        self._create_short_sync,
                        video_id,
                        item["scenes"],
                        item["config"]
                    )
                    logger.info(f"Successfully created video {video_id}")
                except Exception as e:
                    logger.error(f"Failed to create video {video_id}: {e}", exc_info=True)
                    # Mark as failed by creating a .failed marker file
                    failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
                    failed_marker.write_text(str(e))
                finally:
                    # Pop regardless of outcome so one bad job can't wedge the queue
                    self.queue.pop(0)
        finally:
            self.processing = False

    def _create_short_sync(
        self,
        video_id: str,
        input_scenes: List[SceneInput],
        config: RenderConfig
    ):
        """Synchronous wrapper for create_short - runs in a separate thread."""
        # Create a new event loop for this thread (worker threads have none)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.create_short(video_id, input_scenes, config))
        finally:
            loop.close()

    @staticmethod
    def _download_file(url: str, dest: Path) -> None:
        """Stream *url* to *dest* in 8 KiB chunks; raises on HTTP errors."""
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        with open(dest, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    def _search_stock_video(
        self,
        search_keywords: List[str],
        search_duration: float,
        exclude_video_ids: List,
        orientation: str,
        keyword: str
    ):
        """Search Pexels first, then Pixabay, returning (video_dict, source_name).

        Only one platform is queried per success to save API quota; returns
        (None, None) when neither platform yields a result.
        """
        selected_video = None
        video_source = None

        # Try Pexels FIRST (if configured)
        if self.pexels:
            try:
                logger.debug(f"Searching Pexels for: {search_keywords}")
                selected_video = self.pexels.find_video(
                    search_keywords, search_duration, exclude_video_ids, orientation
                )
                if selected_video:
                    video_source = "Pexels"
                    logger.info(f"Found video on Pexels for '{keyword}'")
            except Exception as e:
                logger.warning(f"Pexels search failed: {e}")

        # FALLBACK: Try Pixabay ONLY if Pexels didn't find anything
        if not selected_video and self.pixabay:
            try:
                logger.debug(f"Searching Pixabay (fallback) for: {search_keywords}")
                selected_video = self.pixabay.find_video(
                    search_keywords, search_duration, exclude_video_ids, orientation
                )
                if selected_video:
                    video_source = "Pixabay"
                    logger.info(f"Found video on Pixabay for '{keyword}'")
            except Exception as e:
                logger.warning(f"Pixabay search failed: {e}")

        return selected_video, video_source

    async def create_short(
        self,
        video_id: str,
        input_scenes: List[SceneInput],
        config: RenderConfig
    ):
        """Create the short video.

        Builds each scene (TTS audio, captions, trimmed background clip),
        then renders the composition atomically to videos_dir_path and
        optionally uploads it to HF Hub.

        Raises:
            Exception: when no visual content can be found for a scene or
                rendering fails; intermediate files are cleaned up either way.
        """
        scenes = []
        total_duration = 0
        exclude_video_ids = []  # avoid reusing the same stock clip across scenes
        temp_files = []
        orientation = config.orientation.value

        try:
            # Process each scene
            for i, scene_input in enumerate(input_scenes):
                logger.debug(f"Processing scene {i + 1}/{len(input_scenes)}")

                # Generate TTS audio
                audio_data, tts_duration = await self.tts.generate(
                    scene_input.text, config.voice.value
                )

                # Save audio files
                temp_id = str(uuid.uuid4()).replace('-', '')[:12]
                wav_path = self.config.temp_dir_path / f"{temp_id}.wav"
                mp3_path = self.config.temp_dir_path / f"{temp_id}.mp3"
                video_path = self.config.temp_dir_path / f"{temp_id}.mp4"
                temp_files.extend([wav_path, mp3_path, video_path])

                # Save and convert audio
                FFmpegUtils.save_audio_as_wav(audio_data, wav_path)
                FFmpegUtils.save_audio_as_mp3(audio_data, mp3_path)

                # Get ACTUAL audio duration from WAV file (TTS estimate is often wrong!)
                # NOTE(review): get_video_duration is presumably ffprobe-based and
                # works on audio files too — confirm in FFmpegUtils.
                audio_duration = FFmpegUtils.get_video_duration(wav_path)
                logger.info(f"Scene {i+1}: TTS reported {tts_duration:.2f}s, actual WAV duration: {audio_duration:.2f}s")

                # Add padding to last scene (paddingBack is in milliseconds)
                if i + 1 == len(input_scenes) and config.paddingBack:
                    audio_duration += config.paddingBack / 1000

                # Generate captions
                captions = self.whisper.create_captions(str(wav_path))

                # Find and download background video(s)
                video_paths = []

                # Simplified Scene Construction: One Video Per Scene
                # User Request: "Remove restrictions. One video per scene equal to audio."
                # User Request: "Video must be 9:16. Use image if needed."

                # Force portrait for 9:16 (this also makes the final render
                # portrait regardless of config, since `orientation` is reused below)
                orientation = "portrait"

                keywords = scene_input.searchTerms
                if not keywords:
                    keywords = ["general"]

                # Handle both string and list inputs for searchTerms
                # If it's a string, use it directly; if list, use first item
                if isinstance(keywords, str):
                    keyword = keywords  # Use the whole string
                elif isinstance(keywords, list) and len(keywords) > 0:
                    keyword = keywords[0] if isinstance(keywords[0], str) else str(keywords[0])
                else:
                    keyword = "general"

                logger.debug(f"Using search keyword: '{keyword}' from searchTerms: {keywords}")

                # Try to find a video that is at least as long as the audio
                search_duration = max(audio_duration, 5.0)
                video_found = False
                video_path = None
                temp_vid_id = str(uuid.uuid4()).replace('-', '')[:12]

                try:
                    # 1. Try Video Search - pass full keywords list
                    # Ensure keywords is a list for find_video
                    search_keywords = keywords if isinstance(keywords, list) else [keywords]

                    selected_video, video_source = self._search_stock_video(
                        search_keywords, search_duration, exclude_video_ids, orientation, keyword
                    )

                    # If no video found on either platform
                    if not selected_video:
                        raise Exception(f"No video found for {search_keywords} on any platform")

                    video_path = self.config.temp_dir_path / f"{temp_vid_id}.mp4"
                    temp_files.append(video_path)

                    # Download video
                    logger.debug(f"Downloading {video_source} video for '{keyword}' (Target: {audio_duration:.2f}s)")
                    self._download_file(selected_video["url"], video_path)

                    # Verify file size
                    if video_path.stat().st_size < 1024:
                        logger.warning(f"Downloaded video {video_path} is too small")
                        raise Exception("Downloaded video is invalid")

                    # Skip normalize - Pexels/Pixabay videos are already H.264 encoded
                    # This saves 50-90 seconds per scene!
                    video_found = True
                    exclude_video_ids.append(selected_video["id"])

                except Exception as e:
                    logger.warning(f"Video search/download failed for '{keyword}': {e}. Trying photo fallback.")
                    video_found = False

                # 2. Photo Fallback (guard: pexels client may not be configured)
                if not video_found:
                    try:
                        logger.info(f"Attempting photo fallback for '{keyword}'")
                        pexels_photo = self.pexels.find_photo(keyword, orientation) if self.pexels else None
                        if pexels_photo:
                            # Download photo
                            photo_path = self.config.temp_dir_path / f"{temp_vid_id}.jpg"
                            temp_files.append(photo_path)
                            self._download_file(pexels_photo["url"], photo_path)

                            # Convert photo to video
                            video_path = self.config.temp_dir_path / f"{temp_vid_id}_img.mp4"
                            temp_files.append(video_path)
                            FFmpegUtils.image_to_video(photo_path, video_path, audio_duration)
                            video_found = True
                            logger.info(f"Created video from photo {pexels_photo['id']}")
                    except Exception as e:
                        logger.error(f"Photo fallback failed: {e}")

                if not video_found or not video_path or not video_path.exists():
                    raise Exception(f"Failed to find any visual content for '{keyword}'")

                # Get actual duration (whether video or image-video)
                vid_duration = FFmpegUtils.get_video_duration(video_path)

                # Determine cut duration
                take_duration = min(vid_duration, audio_duration)
                logger.info(f"Using {take_duration:.2f}s of content for scene (Audio: {audio_duration:.2f}s)")

                # Physically cut/trim to ensure exact match
                final_clip_path = self.config.temp_dir_path / f"{temp_vid_id}_cut.mp4"
                temp_files.append(final_clip_path)
                FFmpegUtils.cut_video(video_path, final_clip_path, 0, take_duration)

                # Verify actual cut duration
                actual_cut_dur = FFmpegUtils.get_video_duration(final_clip_path)

                video_paths.append({
                    "path": str(final_clip_path),
                    "duration": actual_cut_dur,
                    "keyword": keyword
                })

                # Build scene dict
                scenes.append({
                    "captions": [c.dict() for c in captions],
                    "video": video_paths,
                    "audio": {
                        "url": str(mp3_path),
                        "duration": audio_duration
                    }
                })

                total_duration += audio_duration

            # Add padding to total duration
            if config.paddingBack:
                total_duration += config.paddingBack / 1000

            # Select background music
            music_mood = config.music.value if config.music else None
            if self.music_manager is None:
                # Fail with a clear message instead of an opaque AttributeError
                raise Exception("No music manager configured; cannot select background music")
            selected_music = self.music_manager.find_music(music_mood)
            logger.info(f"Selected music: {selected_music['filename']} (mood: {selected_music['mood']})")

            # Render final video
            output_path = self.config.videos_dir_path / f"{video_id}.mp4"
            # Use a temp path for atomic write to prevent premature "ready" status
            temp_output_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"

            try:
                VideoComposer.render(
                    scenes=scenes,
                    music_path=selected_music["path"],
                    output_path=temp_output_path,
                    orientation=orientation,
                    caption_position=config.captionPosition.value,
                    caption_bg_color=config.captionBackgroundColor,
                    music_volume=config.musicVolume.value,
                    padding_back=config.paddingBack
                )

                # Atomic rename to final path
                if temp_output_path.exists():
                    temp_output_path.rename(output_path)
                    logger.info(f"Video {video_id} created successfully at {output_path}")

                    # Optional: Upload to HF Hub (if configured)
                    try:
                        from modules.shared.services.hf_storage import get_hf_storage
                        hf_storage = get_hf_storage()
                        if hf_storage and hf_storage.enabled:
                            cloud_url = hf_storage.upload_video(
                                local_path=output_path,
                                video_id=video_id,
                                folder="short_video"  # Module-specific folder
                            )
                            if cloud_url:
                                # Save cloud URL to metadata file for download redirect
                                cloud_meta_path = self.config.videos_dir_path / f"{video_id}.cloud"
                                cloud_meta_path.write_text(cloud_url)
                                # Delete local video file to save space
                                output_path.unlink()
                                logger.info(f"Video {video_id} uploaded to HF Hub, local file deleted")
                    except Exception as e:
                        logger.warning(f"HF upload failed for {video_id}, keeping local: {e}")
                else:
                    raise Exception("Rendered file not found at temp path")

            except Exception:
                # Cleanup temp output on failure, then re-raise with original traceback
                if temp_output_path.exists():
                    temp_output_path.unlink()
                raise
        finally:
            # Always remove intermediate artifacts — previously this ran only on
            # success, leaking every wav/mp3/mp4 when any step above raised.
            for temp_file in temp_files:
                if temp_file.exists():
                    temp_file.unlink()

    def get_status(self, video_id: str) -> VideoStatus:
        """Get video processing status."""
        # Check if in queue (waiting or being processed)
        if any(item["id"] == video_id for item in self.queue):
            return VideoStatus.processing

        # Check if video is in cloud (has .cloud metadata file)
        cloud_meta_path = self.config.videos_dir_path / f"{video_id}.cloud"
        if cloud_meta_path.exists():
            return VideoStatus.ready  # Cloud-uploaded videos are ready

        # Check if final video exists locally (READY)
        video_path = self.config.videos_dir_path / f"{video_id}.mp4"
        if video_path.exists():
            return VideoStatus.ready

        # Check if temp file exists (still rendering = PROCESSING)
        temp_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"
        if temp_path.exists():
            return VideoStatus.processing

        # Check if failed marker exists
        failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
        if failed_marker.exists():
            return VideoStatus.failed

        # If processing flag is active but video not found, it might be in early stages
        if self.processing:
            return VideoStatus.processing

        # Video not found at all
        return VideoStatus.failed

    def get_video_path(self, video_id: str) -> Path:
        """Get path to video file."""
        return self.config.videos_dir_path / f"{video_id}.mp4"

    def delete_video(self, video_id: str):
        """Delete video file (no-op if it does not exist locally)."""
        video_path = self.get_video_path(video_id)
        if video_path.exists():
            video_path.unlink()
            logger.info(f"Deleted video {video_id}")

    def list_all_videos(self) -> List[Dict]:
        """List all videos with their status."""
        videos = []

        # Get all MP4 files (exclude temp files)
        for video_file in self.config.videos_dir_path.glob("*.mp4"):
            # Skip temp files (*.tmp.mp4)
            if ".tmp." in video_file.name:
                continue
            video_id = video_file.stem
            videos.append({
                "id": video_id,
                "status": self.get_status(video_id).value
            })

        # Add videos in queue that have no file on disk yet
        for item in self.queue:
            if not any(v["id"] == item["id"] for v in videos):
                videos.append({
                    "id": item["id"],
                    "status": VideoStatus.processing.value
                })

        return videos

    def get_available_voices(self) -> List[str]:
        """Get list of available TTS voices."""
        return TTSClient.list_available_voices()

    def _plan_segments(self, duration: float) -> List[float]:
        """
        Deterministic segmentation algorithm (Even Split Strategy):
        - Segments between 2-5 seconds
        - Avoid 1-second clips
        - Sum exactly equals duration
        - Distribute duration evenly to maximize segment length
        """
        if duration <= 5.0:
            return [duration]

        # Calculate optimal number of segments
        # We want segments as close to 5.0 as possible, but >= 2.0
        num_segments = int(duration / 5.0)
        if duration % 5.0 > 0:
            num_segments += 1

        segment_duration = duration / num_segments

        # Create list of equal segments
        segments = [segment_duration] * num_segments

        # Handle floating point precision errors: pin the sum to `duration`
        current_sum = sum(segments)
        diff = duration - current_sum
        if abs(diff) > 0.0001:
            segments[-1] += diff

        return segments

    def get_available_music_tags(self) -> List[str]:
        """Get list of available music moods."""
        return self.music_manager.get_available_moods()