| import asyncio |
| import logging |
| import uuid |
| from pathlib import Path |
| from typing import List, Dict, Optional, Any |
| import aiohttp |
| import requests |
|
|
| |
| from ..schemas import ( |
| SceneInput, RenderConfig, VideoStatus, Scene, Caption |
| ) |
| from .libraries.tts_client import TTSClient |
| from .libraries.whisper_client import WhisperClient |
| from .libraries.pexels_client import PexelsClient |
| from .libraries.ffmpeg_utils import FFmpegUtils |
| from .libraries.video_composer import VideoComposer |
| from .music_manager import MusicManager |
|
|
# Module-level logger, named after this module so log records can be filtered per module.
logger: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
class ShortCreator:
    """Main video creation orchestrator.

    Render jobs are queued with ``add_to_queue`` and processed one at a time
    by ``process_queue``. For each scene the narration is synthesised with
    the TTS client, captioned with the Whisper client and paired with stock
    footage (Pexels, then Pixabay, then a Pexels photo turned into a clip);
    ``VideoComposer`` then renders all scenes with background music.

    Job state lives on disk in ``config.videos_dir_path``:

    * ``<id>.tmp.mp4`` -- render in progress
    * ``<id>.mp4``     -- finished video stored locally
    * ``<id>.cloud``   -- value returned by the HF upload; local mp4 removed
    * ``<id>.failed``  -- error text of a failed job
    """

    def __init__(
        self,
        config: Any,
        tts_client: TTSClient,
        whisper_client: WhisperClient,
        pexels_client: Optional[PexelsClient] = None,
        pixabay_client: Any = None,
        music_manager: Optional[MusicManager] = None,
    ):
        """Store collaborators and initialise an empty job queue.

        Args:
            config: Settings object exposing ``videos_dir_path`` and
                ``temp_dir_path`` (used as ``pathlib.Path``-like directories).
            tts_client: Generates narration audio for each scene.
            whisper_client: Creates captions from the narration audio.
            pexels_client: Optional stock video/photo source, searched first.
            pixabay_client: Optional stock video source used as a fallback.
            music_manager: Supplies background music; ``create_short`` needs it.
        """
        self.config = config
        self.tts = tts_client
        self.whisper = whisper_client
        self.pexels = pexels_client
        self.pixabay = pixabay_client
        self.music_manager = music_manager
        # FIFO of pending jobs. The job being rendered stays at index 0 until
        # it finishes, so get_status keeps reporting it as processing.
        self.queue: List[Dict] = []
        self.processing = False
        # Strong reference to the worker task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected.
        self._queue_task: Optional[asyncio.Task] = None

    @staticmethod
    def _short_id(length: int) -> str:
        """Return the first ``length`` hex digits of a random UUID4."""
        return uuid.uuid4().hex[:length]

    def add_to_queue(self, scenes: List[SceneInput], config: RenderConfig) -> str:
        """Add a video to the processing queue and ensure a worker is running.

        When no worker is active this calls ``asyncio.create_task``, so it
        must be invoked on the thread that runs the event loop.

        Returns:
            video_id (24 hex characters) for tracking via ``get_status``.
        """
        video_id = self._short_id(24)
        self.queue.append({"id": video_id, "scenes": scenes, "config": config})
        logger.info(f"Added video {video_id} to queue. Queue length: {len(self.queue)}")

        # Only start a worker if none is running or already scheduled; an
        # active worker drains every item appended while it runs.
        worker = self._queue_task
        if not self.processing and (worker is None or worker.done()):
            self._queue_task = asyncio.create_task(self.process_queue())

        return video_id

    async def process_queue(self):
        """Drain the queue, rendering one job at a time in a worker thread.

        A failing job is logged and recorded as ``<id>.failed``; the loop then
        continues with the next job.
        """
        if self.processing:
            return

        self.processing = True
        try:
            while self.queue:
                item = self.queue[0]
                video_id = item["id"]
                logger.info(f"Processing video {video_id}")
                try:
                    # Rendering makes blocking calls (requests downloads,
                    # FFmpegUtils), so keep it off the event loop.
                    await asyncio.to_thread(
                        self._create_short_sync,
                        video_id,
                        item["scenes"],
                        item["config"],
                    )
                    logger.info(f"Successfully created video {video_id}")
                except Exception as e:
                    logger.error(f"Failed to create video {video_id}: {e}", exc_info=True)
                    self._write_failed_marker(video_id, e)
                finally:
                    self.queue.pop(0)
        finally:
            self.processing = False

    def _write_failed_marker(self, video_id: str, error: BaseException) -> None:
        """Record ``error`` in ``<video_id>.failed`` without ever raising.

        A failed marker write must not kill the worker loop and strand the
        remaining queued jobs.
        """
        failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
        try:
            failed_marker.write_text(str(error))
        except OSError as write_error:
            logger.error(f"Could not write failure marker for {video_id}: {write_error}")

    def _create_short_sync(
        self,
        video_id: str,
        input_scenes: List[SceneInput],
        config: RenderConfig
    ):
        """Run ``create_short`` to completion on a fresh event loop.

        Called through ``asyncio.to_thread``. ``asyncio.run`` creates the
        loop for this thread, then closes and unsets it afterwards.
        """
        asyncio.run(self.create_short(video_id, input_scenes, config))

    async def create_short(
        self,
        video_id: str,
        input_scenes: List[SceneInput],
        config: RenderConfig
    ):
        """Build every scene and render the final ``<video_id>.mp4``.

        Intermediate files in ``temp_dir_path`` are removed whether the job
        succeeds or fails.

        Raises:
            ValueError: if ``input_scenes`` is empty.
            Exception: anything raised by TTS, media lookup or rendering.
        """
        if not input_scenes:
            raise ValueError("input_scenes must contain at least one scene")

        # NOTE(review): config.orientation is ignored; stock-media search and
        # rendering always use "portrait" (existing behaviour, kept as is).
        # Confirm whether RenderConfig.orientation should be honoured.
        orientation = "portrait"

        temp_files: List[Path] = []
        try:
            scenes: List[Dict[str, Any]] = []
            exclude_video_ids: List[Any] = []
            scene_count = len(input_scenes)
            for index, scene_input in enumerate(input_scenes):
                scene = await self._build_scene(
                    index,
                    scene_count,
                    scene_input,
                    config,
                    orientation,
                    exclude_video_ids,
                    temp_files,
                )
                scenes.append(scene)

            self._render_and_publish(video_id, scenes, config, orientation)
        finally:
            self._cleanup_temp_files(temp_files)

    async def _build_scene(
        self,
        index: int,
        scene_count: int,
        scene_input: SceneInput,
        config: RenderConfig,
        orientation: str,
        exclude_video_ids: List[Any],
        temp_files: List[Path],
    ) -> Dict[str, Any]:
        """Produce narration, captions and a trimmed clip for one scene.

        Every temp file created is appended to ``temp_files``. The id of any
        downloaded stock video is appended to ``exclude_video_ids`` so later
        scenes are searched with it excluded.

        Returns:
            The scene dict consumed by ``VideoComposer.render``.
        """
        logger.debug(f"Processing scene {index + 1}/{scene_count}")

        audio_data, tts_duration = await self.tts.generate(
            scene_input.text,
            config.voice.value
        )

        temp_dir = self.config.temp_dir_path
        audio_id = self._short_id(12)
        wav_path = temp_dir / f"{audio_id}.wav"
        mp3_path = temp_dir / f"{audio_id}.mp3"
        # Register before writing so partially written files are cleaned up too.
        temp_files.extend([wav_path, mp3_path])

        FFmpegUtils.save_audio_as_wav(audio_data, wav_path)
        FFmpegUtils.save_audio_as_mp3(audio_data, mp3_path)

        # Measure the written file instead of trusting the TTS-reported length.
        audio_duration = FFmpegUtils.get_video_duration(wav_path)
        logger.info(f"Scene {index+1}: TTS reported {tts_duration:.2f}s, actual WAV duration: {audio_duration:.2f}s")

        # paddingBack (divided by 1000, i.e. ms -> s) only extends the last scene.
        if index + 1 == scene_count and config.paddingBack:
            audio_duration += config.paddingBack / 1000

        captions = self.whisper.create_captions(str(wav_path))

        keyword, search_keywords = self._resolve_keywords(scene_input.searchTerms)

        # Ask providers for at least 5 s of footage even for short narration.
        search_duration = max(audio_duration, 5.0)
        clip_id = self._short_id(12)
        source_path = self._fetch_visual(
            keyword,
            search_keywords,
            search_duration,
            audio_duration,
            exclude_video_ids,
            orientation,
            clip_id,
            temp_files,
        )

        vid_duration = FFmpegUtils.get_video_duration(source_path)
        take_duration = min(vid_duration, audio_duration)
        logger.info(f"Using {take_duration:.2f}s of content for scene (Audio: {audio_duration:.2f}s)")

        final_clip_path = temp_dir / f"{clip_id}_cut.mp4"
        temp_files.append(final_clip_path)
        FFmpegUtils.cut_video(source_path, final_clip_path, 0, take_duration)

        # Record the real cut length; it can differ slightly from take_duration.
        actual_cut_dur = FFmpegUtils.get_video_duration(final_clip_path)

        return {
            "captions": [c.dict() for c in captions],
            "video": [{
                "path": str(final_clip_path),
                "duration": actual_cut_dur,
                "keyword": keyword
            }],
            "audio": {
                "url": str(mp3_path),
                "duration": audio_duration
            }
        }

    @staticmethod
    def _resolve_keywords(search_terms: Any) -> "tuple[str, List[Any]]":
        """Derive the primary keyword and the provider search list.

        Empty/missing terms fall back to ``["general"]``. A string is used
        directly; for a list the first entry (stringified) is the keyword.

        Returns:
            ``(keyword, search_keywords)`` where ``search_keywords`` is always
            a list.
        """
        keywords = search_terms or ["general"]

        if isinstance(keywords, str):
            keyword = keywords
        elif isinstance(keywords, list):
            # Non-empty here: an empty list was replaced by ["general"] above.
            keyword = str(keywords[0])
        else:
            keyword = "general"

        logger.debug(f"Using search keyword: '{keyword}' from searchTerms: {keywords}")

        search_keywords = keywords if isinstance(keywords, list) else [keywords]
        return keyword, search_keywords

    def _find_stock_video(
        self,
        search_keywords: List[Any],
        keyword: str,
        search_duration: float,
        exclude_video_ids: List[Any],
        orientation: str,
    ) -> "tuple[Optional[Dict], Optional[str]]":
        """Search Pexels, then Pixabay, for a matching video.

        Provider errors are logged and the next provider is tried.

        Returns:
            ``(video, source_name)``, or ``(None, None)`` if nothing matched.
        """
        providers = (
            ("Pexels", "Pexels", self.pexels),
            ("Pixabay", "Pixabay (fallback)", self.pixabay),
        )
        for source, label, client in providers:
            if not client:
                continue
            try:
                logger.debug(f"Searching {label} for: {search_keywords}")
                video = client.find_video(
                    search_keywords,
                    search_duration,
                    exclude_video_ids,
                    orientation
                )
            except Exception as e:
                logger.warning(f"{source} search failed: {e}")
                continue
            if video:
                logger.info(f"Found video on {source} for '{keyword}'")
                return video, source
        return None, None

    @staticmethod
    def _download(url: str, destination: Path) -> None:
        """Stream ``url`` into ``destination``; raise on HTTP errors.

        The response is used as a context manager so the pooled connection
        is released even if writing fails part-way.
        """
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(destination, 'wb') as fh:
                for chunk in response.iter_content(chunk_size=8192):
                    fh.write(chunk)

    def _fetch_visual(
        self,
        keyword: str,
        search_keywords: List[Any],
        search_duration: float,
        audio_duration: float,
        exclude_video_ids: List[Any],
        orientation: str,
        clip_id: str,
        temp_files: List[Path],
    ) -> Path:
        """Download stock footage for a scene, falling back to a still photo.

        Returns:
            Path of a video file in ``temp_dir_path``.

        Raises:
            RuntimeError: if neither a video nor a photo could be obtained.
        """
        try:
            video, source = self._find_stock_video(
                search_keywords, keyword, search_duration, exclude_video_ids, orientation
            )
            if not video:
                raise RuntimeError(f"No video found for {search_keywords} on any platform")

            video_path = self.config.temp_dir_path / f"{clip_id}.mp4"
            temp_files.append(video_path)

            logger.debug(f"Downloading {source} video for '{keyword}' (Target: {audio_duration:.2f}s)")
            self._download(video["url"], video_path)

            # Anything under 1 KiB is treated as a failed download.
            if video_path.stat().st_size < 1024:
                logger.warning(f"Downloaded video {video_path} is too small")
                raise RuntimeError("Downloaded video is invalid")

            exclude_video_ids.append(video["id"])
            return video_path
        except Exception as e:
            logger.warning(f"Video search/download failed for '{keyword}': {e}. Trying photo fallback.")

        photo_clip = self._photo_fallback(keyword, orientation, audio_duration, clip_id, temp_files)
        if photo_clip is None or not photo_clip.exists():
            raise RuntimeError(f"Failed to find any visual content for '{keyword}'")
        return photo_clip

    def _photo_fallback(
        self,
        keyword: str,
        orientation: str,
        audio_duration: float,
        clip_id: str,
        temp_files: List[Path],
    ) -> Optional[Path]:
        """Turn a Pexels photo into a clip lasting ``audio_duration`` seconds.

        Returns:
            The clip path, or ``None`` if no usable photo was obtained
            (errors are logged, not raised).
        """
        if not self.pexels:
            logger.error("Photo fallback failed: no Pexels client configured")
            return None

        temp_dir = self.config.temp_dir_path
        try:
            logger.info(f"Attempting photo fallback for '{keyword}'")
            pexels_photo = self.pexels.find_photo(keyword, orientation)
            if not pexels_photo:
                return None

            photo_path = temp_dir / f"{clip_id}.jpg"
            temp_files.append(photo_path)
            self._download(pexels_photo["url"], photo_path)

            clip_path = temp_dir / f"{clip_id}_img.mp4"
            temp_files.append(clip_path)
            FFmpegUtils.image_to_video(photo_path, clip_path, audio_duration)
            logger.info(f"Created video from photo {pexels_photo['id']}")
            return clip_path
        except Exception as e:
            logger.error(f"Photo fallback failed: {e}")
            return None

    def _render_and_publish(
        self,
        video_id: str,
        scenes: List[Dict[str, Any]],
        config: RenderConfig,
        orientation: str,
    ) -> None:
        """Render ``scenes`` to ``<video_id>.mp4`` and try to upload it.

        The render goes to ``<video_id>.tmp.mp4`` and is moved into place only
        once complete, so a half-written file is never reported as ready.
        """
        music_mood = config.music.value if config.music else None
        # NOTE(review): raises AttributeError if no music_manager was supplied.
        selected_music = self.music_manager.find_music(music_mood)
        logger.info(f"Selected music: {selected_music['filename']} (mood: {selected_music['mood']})")

        videos_dir = self.config.videos_dir_path
        output_path = videos_dir / f"{video_id}.mp4"
        temp_output_path = videos_dir / f"{video_id}.tmp.mp4"

        try:
            VideoComposer.render(
                scenes=scenes,
                music_path=selected_music["path"],
                output_path=temp_output_path,
                orientation=orientation,
                caption_position=config.captionPosition.value,
                caption_bg_color=config.captionBackgroundColor,
                music_volume=config.musicVolume.value,
                padding_back=config.paddingBack
            )
            if not temp_output_path.exists():
                raise RuntimeError("Rendered file not found at temp path")
            # replace() overwrites an existing target on every platform
            # (rename() fails on Windows when the target exists).
            temp_output_path.replace(output_path)
        except Exception:
            temp_output_path.unlink(missing_ok=True)
            raise

        logger.info(f"Video {video_id} created successfully at {output_path}")
        self._upload_to_cloud(video_id, output_path)

    def _upload_to_cloud(self, video_id: str, output_path: Path) -> None:
        """Best-effort upload to HF storage.

        On success the returned value is stored in ``<video_id>.cloud`` and
        the local mp4 is deleted. Any failure (including the import) is
        logged and the local file is kept.
        """
        try:
            from modules.shared.services.hf_storage import get_hf_storage
            hf_storage = get_hf_storage()
            if not (hf_storage and hf_storage.enabled):
                return
            cloud_url = hf_storage.upload_video(
                local_path=output_path,
                video_id=video_id,
                folder="short_video"
            )
            if cloud_url:
                cloud_meta_path = self.config.videos_dir_path / f"{video_id}.cloud"
                cloud_meta_path.write_text(cloud_url)
                output_path.unlink()
                logger.info(f"Video {video_id} uploaded to HF Hub, local file deleted")
        except Exception as e:
            logger.warning(f"HF upload failed for {video_id}, keeping local: {e}")

    @staticmethod
    def _cleanup_temp_files(paths: List[Path]) -> None:
        """Best-effort removal of intermediate files; failures are only logged.

        Never raises, so cleanup cannot mask the job's real error or turn a
        successful render into a reported failure.
        """
        for path in paths:
            try:
                path.unlink(missing_ok=True)
            except OSError as e:
                logger.warning(f"Could not remove temp file {path}: {e}")

    def get_status(self, video_id: str) -> VideoStatus:
        """Get video processing status.

        Checked in order: queued/rendering -> processing, ``.cloud`` or
        ``.mp4`` -> ready, ``.tmp.mp4`` -> processing, ``.failed`` -> failed.
        """
        if any(item["id"] == video_id for item in self.queue):
            return VideoStatus.processing

        videos_dir = self.config.videos_dir_path

        if (videos_dir / f"{video_id}.cloud").exists():
            return VideoStatus.ready

        if (videos_dir / f"{video_id}.mp4").exists():
            return VideoStatus.ready

        if (videos_dir / f"{video_id}.tmp.mp4").exists():
            return VideoStatus.processing

        if (videos_dir / f"{video_id}.failed").exists():
            return VideoStatus.failed

        # NOTE(review): while any job is running, unknown ids are also reported
        # as processing; confirm callers rely on this.
        if self.processing:
            return VideoStatus.processing

        return VideoStatus.failed

    def get_video_path(self, video_id: str) -> Path:
        """Get path to the local video file (it may not exist)."""
        return self.config.videos_dir_path / f"{video_id}.mp4"

    def delete_video(self, video_id: str):
        """Delete the local video file and its ``.cloud``/``.failed`` markers.

        Removing the ``.cloud`` marker matters for uploaded videos: their local
        mp4 is already gone, and the marker alone keeps ``get_status``
        reporting them as ready.
        NOTE(review): the remote copy referenced by ``.cloud`` is not deleted.
        """
        videos_dir = self.config.videos_dir_path
        candidates = (
            self.get_video_path(video_id),
            videos_dir / f"{video_id}.cloud",
            videos_dir / f"{video_id}.failed",
        )
        deleted = False
        for path in candidates:
            if path.exists():
                path.unlink()
                deleted = True
        if deleted:
            logger.info(f"Deleted video {video_id}")

    def list_all_videos(self) -> List[Dict]:
        """List all videos with their status.

        Includes finished local videos (``*.mp4``), uploaded videos whose local
        file was removed (``*.cloud``), and queued jobs. In-progress
        ``*.tmp.mp4`` renders are covered by the queue entries.
        """
        videos: List[Dict] = []
        seen = set()

        def add(video_id: str, status: str) -> None:
            if video_id not in seen:
                seen.add(video_id)
                videos.append({"id": video_id, "status": status})

        videos_dir = self.config.videos_dir_path
        for video_file in videos_dir.glob("*.mp4"):
            if ".tmp." in video_file.name:
                continue
            add(video_file.stem, self.get_status(video_file.stem).value)

        for marker in videos_dir.glob("*.cloud"):
            add(marker.stem, self.get_status(marker.stem).value)

        for item in self.queue:
            add(item["id"], VideoStatus.processing.value)

        return videos

    def get_available_voices(self) -> List[str]:
        """Get list of available TTS voices."""
        return TTSClient.list_available_voices()

    def _plan_segments(self, duration: float) -> List[float]:
        """Split ``duration`` seconds into equal segments of at most 5 s.

        * ``duration <= 5`` is returned unchanged as one segment (this may be
          shorter than 2 s).
        * Otherwise ``n = ceil(duration / 5)`` equal segments are used, so
          each one lies roughly in (2.5, 5] seconds (no 1-second clips).
        * If the float sum drifts from ``duration`` by more than 1e-4, the
          last segment absorbs the difference.
        """
        if duration <= 5.0:
            return [duration]

        # ceil(duration / 5) expressed as floor plus a remainder test.
        num_segments = int(duration / 5.0)
        if duration % 5.0 > 0:
            num_segments += 1

        segment_duration = duration / num_segments
        segments = [segment_duration] * num_segments

        diff = duration - sum(segments)
        if abs(diff) > 0.0001:
            segments[-1] += diff

        return segments

    def get_available_music_tags(self) -> List[str]:
        """Get list of available music moods (empty without a music manager)."""
        if self.music_manager is None:
            return []
        return self.music_manager.get_available_moods()
|
|