""" Fact Creator Service Main orchestrator for generating fact-image videos """ import asyncio import logging import uuid import time from pathlib import Path from typing import Dict, List, Optional from datetime import datetime from ..schemas import JobStatus, ImageModel from .text_overlay import TextOverlay logger = logging.getLogger(__name__) # Master prompt for image generation (CLEAN, NO TEXT) IMAGE_SYSTEM_PROMPT = """Generate a clean, cinematic, vertical image (9:16 aspect ratio) based on this description: {image_prompt} IMPORTANT: - Do NOT add any text, words, letters, numbers, or watermarks in the image - Leave empty/soft area at bottom 40% for text overlay - Clean aesthetic, modern TikTok/Instagram style - High-quality, beautiful lighting""" class FactCreator: """ Main orchestrator for fact-image video generation. Pipeline: 1. Generate clean image (NVIDIA/Cloudflare/Pexels) 2. Add text overlay (PIL) 3. Create short video (MoviePy) 4. Upload to cloud (optional) """ # Video settings TARGET_WIDTH = 1080 TARGET_HEIGHT = 1920 FPS = 24 FADE_DURATION = 0.3 def __init__( self, config, nvidia_client=None, cloudflare_client=None, pexels_client=None ): self.config = config self.nvidia = nvidia_client self.cloudflare = cloudflare_client self.pexels = pexels_client self.text_overlay = TextOverlay() # Job tracking self.jobs: Dict[str, Dict] = {} self.queue: List[Dict] = [] self.processing = False def add_to_queue( self, model: ImageModel, image_prompt: str, fact_text: str, duration: int = 5, fact_heading: str = None, heading_background: dict = None ) -> str: """ Add fact-image job to queue. Returns: job_id for tracking """ job_id = str(uuid.uuid4()).replace('-', '')[:16] job = { "id": job_id, "model": model, "image_prompt": image_prompt, "fact_heading": fact_heading, "heading_background": heading_background, "fact_text": fact_text, "duration": duration, "status": JobStatus.queued, "progress": 0, "created_at": datetime.now().isoformat(), "video_url": None, "error": None } self.jobs[job_id] = job self.queue.append(job) logger.info(f"Added job {job_id} to queue. Queue length: {len(self.queue)}") # Start processing if not already running if not self.processing: asyncio.create_task(self.process_queue()) return job_id async def process_queue(self): """Process jobs in queue""" if self.processing: return self.processing = True try: while self.queue: job = self.queue[0] job_id = job["id"] logger.info(f"Processing job {job_id}") try: await self._process_job(job) job["status"] = JobStatus.ready job["progress"] = 100 logger.info(f"Job {job_id} completed successfully") except Exception as e: logger.error(f"Job {job_id} failed: {e}", exc_info=True) job["status"] = JobStatus.failed job["error"] = str(e) finally: self.queue.pop(0) finally: self.processing = False async def _process_job(self, job: Dict): """Process a single fact-image job""" job_id = job["id"] temp_dir = self.config.temp_dir_path / job_id temp_dir.mkdir(parents=True, exist_ok=True) try: # ==================== # Step 1: Generate Image # ==================== job["status"] = JobStatus.generating_image job["progress"] = 10 logger.info(f"[{job_id}] Generating image with {job['model']}...") image_path = temp_dir / "base_image.png" # Build full prompt full_prompt = IMAGE_SYSTEM_PROMPT.format(image_prompt=job["image_prompt"]) if job["model"] == ImageModel.nvidia and self.nvidia: # NVIDIA: uses aspect_ratio "9:16" internally (no width/height params) self.nvidia.generate_and_save( prompt=full_prompt, output_path=image_path ) elif job["model"] == ImageModel.cloudflare and self.cloudflare: # Cloudflare: supports width/height (1080x1920) self.cloudflare.generate_and_save( prompt=full_prompt, output_path=image_path, width=1080, height=1920 ) elif job["model"] == ImageModel.pexels: # Pexels: Direct API call to get FIRST (most relevant) photo import requests as pexels_requests import os pexels_key = os.getenv("PEXELS_API_KEY") if not pexels_key: raise Exception("PEXELS_API_KEY not configured") logger.info(f"[{job_id}] Searching Pexels for: {job['image_prompt']}") resp = pexels_requests.get( "https://api.pexels.com/v1/search", headers={"Authorization": pexels_key}, params={ "query": job["image_prompt"], "orientation": "portrait", "per_page": 5, "size": "large" }, timeout=15 ) if resp.status_code != 200: raise Exception(f"Pexels API error: {resp.status_code}") photos = resp.json().get("photos", []) if not photos: raise Exception(f"No Pexels photos found for: {job['image_prompt']}") # Select FIRST (most relevant) photo, NOT random photo = photos[0] photo_url = photo.get("src", {}).get("original") or photo.get("src", {}).get("large") if not photo_url: raise Exception("No valid photo URL from Pexels") logger.info(f"[{job_id}] Selected Pexels photo ID {photo['id']}") # Download photo img_resp = pexels_requests.get(photo_url, timeout=30) img_resp.raise_for_status() image_path.parent.mkdir(parents=True, exist_ok=True) image_path.write_bytes(img_resp.content) logger.info(f"[{job_id}] Downloaded Pexels photo") else: # Fallback to any available client if self.nvidia: self.nvidia.generate_and_save(full_prompt, image_path) elif self.cloudflare: self.cloudflare.generate_and_save(full_prompt, image_path, width=1080, height=1920) else: raise Exception("No image generation client available!") job["progress"] = 40 # ==================== # Step 2: Add Text Overlay # ==================== job["status"] = JobStatus.adding_text logger.info(f"[{job_id}] Adding text overlay...") overlay_path = temp_dir / "overlay_image.png" self.text_overlay.add_text( image_path=image_path, text=job["fact_text"], output_path=overlay_path, heading=job.get("fact_heading"), heading_background=job.get("heading_background") ) job["progress"] = 60 # ==================== # Step 3: Create Video # ==================== job["status"] = JobStatus.creating_video logger.info(f"[{job_id}] Creating {job['duration']}s video...") output_path = self.config.videos_dir_path / f"{job_id}.mp4" await self._create_video(overlay_path, output_path, job["duration"]) job["video_url"] = str(output_path) job["progress"] = 90 # ==================== # Step 4: Upload to Cloud (Optional) # ==================== from modules.shared.services.hf_storage import get_hf_storage hf_client = get_hf_storage() if hf_client and hf_client.enabled: logger.info(f"[{job_id}] Uploading to HF Hub...") cloud_url = hf_client.upload_video(output_path, job_id, "fact_image") if cloud_url: job["video_url"] = cloud_url job["storage"] = "cloud" # Save cloud URL to metadata file cloud_file = output_path.with_suffix('.cloud') cloud_file.write_text(cloud_url) # Delete local file output_path.unlink() logger.info(f"[{job_id}] Uploaded to cloud, local file deleted") logger.info(f"[{job_id}] Video ready: {job['video_url']}") finally: # Cleanup temp files import shutil if temp_dir.exists(): shutil.rmtree(temp_dir, ignore_errors=True) async def _create_video(self, image_path: Path, output_path: Path, duration: int): """Create video from single image with fade effects (9:16 = 1080x1920)""" from moviepy.editor import ImageClip # Create image clip clip = ImageClip(str(image_path)).set_duration(duration) # Ensure 9:16 aspect ratio (1080x1920) clip = clip.resize((self.TARGET_WIDTH, self.TARGET_HEIGHT)) # Add fade in/out clip = clip.fadein(self.FADE_DURATION) clip = clip.fadeout(self.FADE_DURATION) # Write video logger.info(f"Writing video: {duration}s, fade in/out {self.FADE_DURATION}s") clip.write_videofile( str(output_path), fps=self.FPS, codec='libx264', audio=False, # No audio for fact videos preset='ultrafast', # Fast export (3-5x faster) ffmpeg_params=[ '-pix_fmt', 'yuv420p', '-movflags', '+faststart', '-profile:v', 'baseline', '-level', '3.0' ] ) clip.close() def get_status(self, job_id: str) -> Dict: """Get job status""" job = self.jobs.get(job_id) if not job: # Check for .cloud file (cloud-stored video) cloud_file = self.config.videos_dir_path / f"{job_id}.cloud" if cloud_file.exists(): return { "job_id": job_id, "status": JobStatus.ready, "progress": 100, "video_url": cloud_file.read_text().strip() } return { "job_id": job_id, "status": JobStatus.failed, "progress": 0, "error": "Job not found" } return { "job_id": job["id"], "status": job["status"], "progress": job.get("progress", 0), "video_url": job.get("video_url"), "error": job.get("error") } def get_video_path(self, job_id: str) -> Optional[Path]: """Get video file path""" return self.config.videos_dir_path / f"{job_id}.mp4"