# NCAkit/modules/video_creator/services/short_creator.py
# Author: ismdrobiul489
# perf: Remove normalize_video step - saves 50-90 sec per scene (commit e93be7e)
import asyncio
import logging
import uuid
from pathlib import Path
from typing import List, Dict, Optional, Any
import aiohttp
import requests
# Use relative imports for module-local schemas
from ..schemas import (
SceneInput, RenderConfig, VideoStatus, Scene, Caption
)
from .libraries.tts_client import TTSClient
from .libraries.whisper_client import WhisperClient
from .libraries.pexels_client import PexelsClient
from .libraries.ffmpeg_utils import FFmpegUtils
from .libraries.video_composer import VideoComposer
from .music_manager import MusicManager
logger = logging.getLogger(__name__)
class ShortCreator:
    """Main video creation orchestrator.

    Queues render jobs, turns each scene's text into TTS audio plus
    whisper-generated captions, finds stock footage (Pexels first,
    Pixabay fallback, then a photo fallback) as visuals, and composes
    the final short with background music.
    """
    def __init__(
        self,
        config: Any,  # NCAkitConfig from parent; must expose temp_dir_path / videos_dir_path
        tts_client: TTSClient,
        whisper_client: WhisperClient,
        pexels_client: Optional[PexelsClient] = None,
        pixabay_client: Optional[Any] = None,  # Optional PixabayClient (type not imported here)
        music_manager: Optional[MusicManager] = None
    ):
        """Store service clients and initialise the in-memory job queue.

        Args:
            config: Parent NCAkitConfig providing temp/videos directory paths.
            tts_client: Text-to-speech backend.
            whisper_client: Caption/transcription backend.
            pexels_client: Optional Pexels stock-media client.
            pixabay_client: Optional Pixabay stock-media client (fallback).
            music_manager: Optional background-music selector.
        """
        self.config = config
        self.tts = tts_client
        self.whisper = whisper_client
        self.pexels = pexels_client
        self.pixabay = pixabay_client
        self.music_manager = music_manager
        # FIFO of pending jobs, each {"id": str, "scenes": [...], "config": RenderConfig}
        self.queue: List[Dict] = []
        # True while process_queue() is draining the queue (re-entrancy guard)
        self.processing = False
def add_to_queue(self, scenes: List[SceneInput], config: RenderConfig) -> str:
"""
Add video to processing queue
Returns:
video_id for tracking
"""
video_id = str(uuid.uuid4()).replace('-', '')[:24] # Similar to cuid
self.queue.append({
"id": video_id,
"scenes": scenes,
"config": config
})
logger.info(f"Added video {video_id} to queue. Queue length: {len(self.queue)}")
# Start processing if not already running
if not self.processing:
asyncio.create_task(self.process_queue())
return video_id
    async def process_queue(self):
        """Drain the job queue sequentially, one video at a time.

        Re-entrancy guard: returns immediately if another call is already
        draining.  A failed job is recorded as a ``<id>.failed`` marker file
        so get_status() can still report the failure after the job leaves
        the queue.
        """
        if self.processing:
            return
        self.processing = True
        try:
            while self.queue:
                # Peek (don't pop yet): while the job stays at queue[0],
                # get_status() keeps reporting it as "processing".
                item = self.queue[0]
                video_id = item["id"]
                logger.info(f"Processing video {video_id}")
                try:
                    # Run video creation in a background thread to keep API responsive
                    # This allows status checks while video is being processed
                    await asyncio.to_thread(
                        self._create_short_sync,
                        video_id,
                        item["scenes"],
                        item["config"]
                    )
                    logger.info(f"Successfully created video {video_id}")
                except Exception as e:
                    logger.error(f"Failed to create video {video_id}: {e}", exc_info=True)
                    # Mark as failed by creating a .failed marker file
                    failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
                    failed_marker.write_text(str(e))
                finally:
                    # Job finished (either way): remove it from the queue.
                    self.queue.pop(0)
        finally:
            # Always clear the guard so a later add_to_queue() can restart us.
            self.processing = False
def _create_short_sync(
self,
video_id: str,
input_scenes: List[SceneInput],
config: RenderConfig
):
"""Synchronous wrapper for create_short - runs in a separate thread"""
# Create a new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.create_short(video_id, input_scenes, config))
finally:
loop.close()
async def create_short(
self,
video_id: str,
input_scenes: List[SceneInput],
config: RenderConfig
):
"""Create the short video"""
scenes = []
total_duration = 0
exclude_video_ids = []
temp_files = []
orientation = config.orientation.value
# Process each scene
for i, scene_input in enumerate(input_scenes):
logger.debug(f"Processing scene {i + 1}/{len(input_scenes)}")
# Generate TTS audio
audio_data, tts_duration = await self.tts.generate(
scene_input.text,
config.voice.value
)
# Save audio files
temp_id = str(uuid.uuid4()).replace('-', '')[:12]
wav_path = self.config.temp_dir_path / f"{temp_id}.wav"
mp3_path = self.config.temp_dir_path / f"{temp_id}.mp3"
video_path = self.config.temp_dir_path / f"{temp_id}.mp4"
temp_files.extend([wav_path, mp3_path, video_path])
# Save and convert audio
FFmpegUtils.save_audio_as_wav(audio_data, wav_path)
FFmpegUtils.save_audio_as_mp3(audio_data, mp3_path)
# Get ACTUAL audio duration from WAV file (TTS estimate is often wrong!)
audio_duration = FFmpegUtils.get_video_duration(wav_path)
logger.info(f"Scene {i+1}: TTS reported {tts_duration:.2f}s, actual WAV duration: {audio_duration:.2f}s")
# Add padding to last scene
if i + 1 == len(input_scenes) and config.paddingBack:
audio_duration += config.paddingBack / 1000
# Generate captions
captions = self.whisper.create_captions(str(wav_path))
# Find and download background video(s)
video_paths = []
# Simplified Scene Construction: One Video Per Scene
# User Request: "Remove restrictions. One video per scene equal to audio."
# User Request: "Video must be 9:16. Use image if needed."
# Force portrait for 9:16
orientation = "portrait"
keywords = scene_input.searchTerms
if not keywords:
keywords = ["general"]
# Handle both string and list inputs for searchTerms
# If it's a string, use it directly; if list, use first item
if isinstance(keywords, str):
keyword = keywords # Use the whole string
elif isinstance(keywords, list) and len(keywords) > 0:
keyword = keywords[0] if isinstance(keywords[0], str) else str(keywords[0])
else:
keyword = "general"
logger.debug(f"Using search keyword: '{keyword}' from searchTerms: {keywords}")
# Try to find a video that is at least as long as the audio
search_duration = max(audio_duration, 5.0)
video_found = False
video_path = None
temp_vid_id = str(uuid.uuid4()).replace('-', '')[:12]
try:
# 1. Try Video Search - pass full keywords list
# Ensure keywords is a list for find_video
search_keywords = keywords if isinstance(keywords, list) else [keywords]
# PRIORITY: Try ONE platform only to save API quota
# Priority: Pexels first, Pixabay as fallback
selected_video = None
video_source = None
# Try Pexels FIRST (if configured)
if self.pexels:
try:
logger.debug(f"Searching Pexels for: {search_keywords}")
selected_video = self.pexels.find_video(
search_keywords,
search_duration,
exclude_video_ids,
orientation
)
if selected_video:
video_source = "Pexels"
logger.info(f"Found video on Pexels for '{keyword}'")
except Exception as e:
logger.warning(f"Pexels search failed: {e}")
# FALLBACK: Try Pixabay ONLY if Pexels didn't find anything
if not selected_video and self.pixabay:
try:
logger.debug(f"Searching Pixabay (fallback) for: {search_keywords}")
selected_video = self.pixabay.find_video(
search_keywords,
search_duration,
exclude_video_ids,
orientation
)
if selected_video:
video_source = "Pixabay"
logger.info(f"Found video on Pixabay for '{keyword}'")
except Exception as e:
logger.warning(f"Pixabay search failed: {e}")
# If no video found on either platform
if not selected_video:
raise Exception(f"No video found for {search_keywords} on any platform")
video_path = self.config.temp_dir_path / f"{temp_vid_id}.mp4"
temp_files.append(video_path)
# Download video
logger.debug(f"Downloading {video_source} video for '{keyword}' (Target: {audio_duration:.2f}s)")
response = requests.get(selected_video["url"], stream=True, timeout=30)
response.raise_for_status()
with open(video_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Verify file size
if video_path.stat().st_size < 1024:
logger.warning(f"Downloaded video {video_path} is too small")
raise Exception("Downloaded video is invalid")
# Skip normalize - Pexels/Pixabay videos are already H.264 encoded
# This saves 50-90 seconds per scene!
video_found = True
exclude_video_ids.append(selected_video["id"]) # Fixed: use selected_video
except Exception as e:
logger.warning(f"Video search/download failed for '{keyword}': {e}. Trying photo fallback.")
video_found = False
# 2. Photo Fallback
if not video_found:
try:
logger.info(f"Attempting photo fallback for '{keyword}'")
pexels_photo = self.pexels.find_photo(keyword, orientation)
if pexels_photo:
# Download photo
photo_path = self.config.temp_dir_path / f"{temp_vid_id}.jpg"
temp_files.append(photo_path)
response = requests.get(pexels_photo["url"], stream=True, timeout=30)
response.raise_for_status()
with open(photo_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Convert photo to video
video_path = self.config.temp_dir_path / f"{temp_vid_id}_img.mp4"
temp_files.append(video_path)
FFmpegUtils.image_to_video(photo_path, video_path, audio_duration)
video_found = True
logger.info(f"Created video from photo {pexels_photo['id']}")
except Exception as e:
logger.error(f"Photo fallback failed: {e}")
if not video_found or not video_path or not video_path.exists():
raise Exception(f"Failed to find any visual content for '{keyword}'")
# Get actual duration (whether video or image-video)
vid_duration = FFmpegUtils.get_video_duration(video_path)
# Determine cut duration
take_duration = min(vid_duration, audio_duration)
logger.info(f"Using {take_duration:.2f}s of content for scene (Audio: {audio_duration:.2f}s)")
# Physically cut/trim to ensure exact match
final_clip_path = self.config.temp_dir_path / f"{temp_vid_id}_cut.mp4"
temp_files.append(final_clip_path)
FFmpegUtils.cut_video(video_path, final_clip_path, 0, take_duration)
# Verify actual cut duration
actual_cut_dur = FFmpegUtils.get_video_duration(final_clip_path)
video_paths.append({
"path": str(final_clip_path),
"duration": actual_cut_dur,
"keyword": keyword
})
# Build scene dict
scenes.append({
"captions": [c.dict() for c in captions],
"video": video_paths,
"audio": {
"url": str(mp3_path),
"duration": audio_duration
}
})
total_duration += audio_duration
# Add padding to total duration
if config.paddingBack:
total_duration += config.paddingBack / 1000
# Select background music
music_mood = config.music.value if config.music else None
selected_music = self.music_manager.find_music(music_mood)
logger.info(f"Selected music: {selected_music['filename']} (mood: {selected_music['mood']})")
# Render final video
output_path = self.config.videos_dir_path / f"{video_id}.mp4"
# Use a temp path for atomic write to prevent premature "ready" status
temp_output_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"
try:
VideoComposer.render(
scenes=scenes,
music_path=selected_music["path"],
output_path=temp_output_path,
orientation=orientation,
caption_position=config.captionPosition.value,
caption_bg_color=config.captionBackgroundColor,
music_volume=config.musicVolume.value,
padding_back=config.paddingBack
)
# Atomic rename to final path
if temp_output_path.exists():
temp_output_path.rename(output_path)
logger.info(f"Video {video_id} created successfully at {output_path}")
# Optional: Upload to HF Hub (if configured)
try:
from modules.shared.services.hf_storage import get_hf_storage
hf_storage = get_hf_storage()
if hf_storage and hf_storage.enabled:
cloud_url = hf_storage.upload_video(
local_path=output_path,
video_id=video_id,
folder="short_video" # Module-specific folder
)
if cloud_url:
# Save cloud URL to metadata file for download redirect
cloud_meta_path = self.config.videos_dir_path / f"{video_id}.cloud"
cloud_meta_path.write_text(cloud_url)
# Delete local video file to save space
output_path.unlink()
logger.info(f"Video {video_id} uploaded to HF Hub, local file deleted")
except Exception as e:
logger.warning(f"HF upload failed for {video_id}, keeping local: {e}")
else:
raise Exception("Rendered file not found at temp path")
except Exception as e:
# Cleanup temp file on failure
if temp_output_path.exists():
temp_output_path.unlink()
raise e
# Cleanup temp files
for temp_file in temp_files:
if temp_file.exists():
temp_file.unlink()
def get_status(self, video_id: str) -> VideoStatus:
"""Get video processing status"""
# Check if in queue (waiting or being processed)
if any(item["id"] == video_id for item in self.queue):
return VideoStatus.processing
# Check if video is in cloud (has .cloud metadata file)
cloud_meta_path = self.config.videos_dir_path / f"{video_id}.cloud"
if cloud_meta_path.exists():
return VideoStatus.ready # Cloud-uploaded videos are ready
# Check if final video exists locally (READY)
video_path = self.config.videos_dir_path / f"{video_id}.mp4"
if video_path.exists():
return VideoStatus.ready
# Check if temp file exists (still rendering = PROCESSING)
temp_path = self.config.videos_dir_path / f"{video_id}.tmp.mp4"
if temp_path.exists():
return VideoStatus.processing
# Check if failed marker exists
failed_marker = self.config.videos_dir_path / f"{video_id}.failed"
if failed_marker.exists():
return VideoStatus.failed
# If processing flag is active but video not found, it might be in early stages
if self.processing:
return VideoStatus.processing
# Video not found at all
return VideoStatus.failed
def get_video_path(self, video_id: str) -> Path:
"""Get path to video file"""
return self.config.videos_dir_path / f"{video_id}.mp4"
def delete_video(self, video_id: str):
"""Delete video file"""
video_path = self.get_video_path(video_id)
if video_path.exists():
video_path.unlink()
logger.info(f"Deleted video {video_id}")
def list_all_videos(self) -> List[Dict]:
"""List all videos with their status"""
videos = []
# Get all MP4 files (exclude temp files)
for video_file in self.config.videos_dir_path.glob("*.mp4"):
# Skip temp files (*.tmp.mp4)
if ".tmp." in video_file.name:
continue
video_id = video_file.stem
videos.append({
"id": video_id,
"status": self.get_status(video_id).value
})
# Add videos in queue
for item in self.queue:
if not any(v["id"] == item["id"] for v in videos):
videos.append({
"id": item["id"],
"status": VideoStatus.processing.value
})
return videos
    def get_available_voices(self) -> List[str]:
        """Get list of available TTS voices"""
        # Pure delegation to the TTS client's class-level helper;
        # no instance state is consulted.
        return TTSClient.list_available_voices()
def _plan_segments(self, duration: float) -> List[float]:
"""
Deterministic segmentation algorithm (Even Split Strategy):
- Segments between 2-5 seconds
- Avoid 1-second clips
- Sum exactly equals duration
- Distribute duration evenly to maximize segment length
"""
if duration <= 5.0:
return [duration]
# Calculate optimal number of segments
# We want segments as close to 5.0 as possible, but >= 2.0
num_segments = int(duration / 5.0)
if duration % 5.0 > 0:
num_segments += 1
segment_duration = duration / num_segments
# Create list of equal segments
segments = [segment_duration] * num_segments
# Handle floating point precision errors
current_sum = sum(segments)
diff = duration - current_sum
if abs(diff) > 0.0001:
segments[-1] += diff
return segments
    def get_available_music_tags(self) -> List[str]:
        """Get list of available music moods"""
        # Delegates to the music manager; assumes one was injected
        # (default is None) -- TODO confirm callers guarantee this.
        return self.music_manager.get_available_moods()