feat: Major optimizations - Quiz dynamic fonts, TTS 1.2x speed, Video stream copy (10x faster), Single API call, Fact Image dynamic fonts, Text Story position fix
ee36c8e | """ | |
| Text Story Router - FastAPI endpoints for fake iMessage chat video generation. | |
| """ | |
import asyncio
import logging
import os
import uuid
from typing import Any, Dict, Optional

from fastapi import APIRouter, BackgroundTasks, HTTPException
from fastapi.responses import FileResponse

from .schemas import TextStoryRequest, TextStoryResponse, TextStoryStatus
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/api/text-story", tags=["Text Story"]) | |
| # Job storage (in-memory for now) | |
| jobs: Dict[str, Dict[str, Any]] = {} | |
| def update_job(job_id: str, status: str, progress: int, step: str = None, | |
| video_url: str = None, error: str = None): | |
| """Update job status.""" | |
| if job_id in jobs: | |
| jobs[job_id].update({ | |
| "status": status, | |
| "progress": progress, | |
| "current_step": step, | |
| "video_url": video_url, | |
| "error": error | |
| }) | |
| async def generate_text_story_video(job_id: str, request: TextStoryRequest): | |
| """ | |
| Main video generation pipeline. | |
| Pipeline: | |
| 1. Setup temp directory | |
| 2. Generate TTS for each message | |
| 3. Create chat UI frames | |
| 4. Load gameplay background | |
| 5. Compose final video | |
| """ | |
| try: | |
| import tempfile | |
| import shutil | |
| temp_dir = tempfile.mkdtemp(prefix="text_story_") | |
| logger.info(f"TextStory: Starting job {job_id}") | |
| # ============ STEP 1: TTS Generation ============ | |
| update_job(job_id, "processing", 10, "Generating voices...") | |
| from .services.tts_handler import TTSHandler | |
| tts = TTSHandler() | |
| audio_files = [] | |
| for i, msg in enumerate(request.messages): | |
| voice = request.voice_a if msg.sender == "A" else request.voice_b | |
| audio_path = os.path.join(temp_dir, f"msg_{i:03d}.wav") | |
| duration = await tts.generate_tts(msg.text, voice, audio_path) | |
| audio_files.append({ | |
| "path": audio_path, | |
| "duration": duration, | |
| "sender": msg.sender, | |
| "text": msg.text | |
| }) | |
| progress = 10 + int((i + 1) / len(request.messages) * 30) | |
| update_job(job_id, "processing", progress, f"Voice {i+1}/{len(request.messages)}") | |
| # ============ STEP 2: Create Chat Frames ============ | |
| update_job(job_id, "processing", 45, "Rendering chat UI...") | |
| from .services.renderer import ChatRenderer | |
| renderer = ChatRenderer( | |
| person_a_name=request.person_a_name, | |
| person_b_name=request.person_b_name, | |
| person_b_avatar=request.person_b_avatar | |
| ) | |
| # ============ STEP 3: Load Background ============ | |
| update_job(job_id, "processing", 55, "Loading background...") | |
| from .services.background import BackgroundHandler | |
| bg_handler = BackgroundHandler() | |
| # ============ STEP 4: Compose Video ============ | |
| update_job(job_id, "processing", 65, "Composing video...") | |
| from .services.video_composer import VideoComposer | |
| composer = VideoComposer( | |
| renderer=renderer, | |
| bg_handler=bg_handler, | |
| tts_handler=tts | |
| ) | |
| output_path = await composer.compose( | |
| messages=audio_files, | |
| ending_text=request.ending_text, | |
| output_dir=temp_dir | |
| ) | |
| # ============ STEP 5: Upload to HF Dataset ============ | |
| update_job(job_id, "processing", 90, "Uploading to cloud...") | |
| # Create local output directory | |
| videos_dir = os.path.join("videos", "text_story") | |
| os.makedirs(videos_dir, exist_ok=True) | |
| final_path = os.path.join(videos_dir, f"text_story_{job_id}.mp4") | |
| shutil.copy2(output_path, final_path) | |
| # Upload to HF Dataset using shared module | |
| cloud_url = None | |
| try: | |
| from pathlib import Path | |
| from modules.shared.services.hf_storage import HFStorageClient | |
| hf_repo = os.getenv("HF_REPO", "") | |
| hf_token = os.getenv("HF_TOKEN", "") | |
| if hf_repo and hf_token: | |
| hf_client = HFStorageClient(repo_id=hf_repo, token=hf_token) | |
| cloud_url = hf_client.upload_video( | |
| local_path=Path(final_path), | |
| video_id=f"text_story_{job_id}", | |
| folder="text_story" # Upload to text_story folder | |
| ) | |
| if cloud_url: | |
| logger.info(f"TextStory: Uploaded to HF: {cloud_url}") | |
| # Delete local file after successful upload | |
| if os.path.exists(final_path): | |
| os.remove(final_path) | |
| logger.info(f"TextStory: Deleted local file after HF upload") | |
| except Exception as e: | |
| logger.warning(f"TextStory: HF upload failed, keeping local file: {e}") | |
| # Cleanup temp | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| # Use cloud URL if available, otherwise local endpoint for direct streaming | |
| video_url = cloud_url or f"/api/text-story/{job_id}/video" | |
| update_job(job_id, "ready", 100, "Complete!", video_url=video_url) | |
| logger.info(f"TextStory: Job {job_id} completed successfully") | |
| except Exception as e: | |
| logger.error(f"TextStory: Job {job_id} failed - {e}") | |
| import traceback | |
| logger.error(traceback.format_exc()) | |
| update_job(job_id, "failed", 0, error=str(e)) | |
| # Cleanup on failure | |
| if 'temp_dir' in locals(): | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
async def generate_text_story(
    request: TextStoryRequest,
    background_tasks: BackgroundTasks
):
    """
    Start text story video generation.

    Registers a new job in the in-memory store, schedules the rendering
    pipeline as a background task, and immediately returns a job_id the
    client can poll for status.
    """
    new_job_id = uuid.uuid4().hex[:12]
    message_total = len(request.messages)

    # Register the job before scheduling work so a status poll issued right
    # after this response can never 404.
    jobs[new_job_id] = dict(
        status="processing",
        progress=0,
        current_step="Starting...",
        video_url=None,
        error=None,
        request=request.model_dump(),
    )

    # Start background generation
    background_tasks.add_task(generate_text_story_video, new_job_id, request)
    logger.info(f"TextStory: Started job {new_job_id} with {message_total} messages")

    return TextStoryResponse(
        job_id=new_job_id,
        status="processing",
        message=f"Started generating text story with {message_total} messages"
    )
async def get_text_story_status(job_id: str):
    """Return the current status snapshot of a text story generation job."""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    job = jobs[job_id]
    # Optional fields are fetched with .get() so a partially-initialized job
    # record never raises KeyError.
    optional_fields = {key: job.get(key) for key in ("current_step", "video_url", "error")}
    return TextStoryStatus(
        job_id=job_id,
        status=job["status"],
        progress=job["progress"],
        **optional_fields,
    )
async def download_text_story_video(job_id: str):
    """Stream the generated text story video for a completed job."""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    if jobs[job_id]["status"] != "ready":
        raise HTTPException(status_code=400, detail="Video not ready yet")

    filename = f"text_story_{job_id}.mp4"
    # Persistent storage is preferred; fall back to the local output dir.
    candidate_paths = (
        f"/data/videos/text_story/{filename}",
        f"videos/text_story/{filename}",
    )
    video_path = next((path for path in candidate_paths if os.path.exists(path)), None)
    if video_path is None:
        raise HTTPException(status_code=404, detail="Video file not found")

    return FileResponse(
        video_path,
        media_type="video/mp4",
        filename=filename
    )
| # ============================================ | |
| # AI CONVERSATION GENERATION | |
| # ============================================ | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| class AiGenerateRequest(BaseModel): | |
| """Request for AI-generated conversation.""" | |
| prompt: str | |
| person_a_name: str = "You" | |
| person_b_name: str = "My Ex" | |
| message_count: int = 7 | |
| tone: str = "emotional" | |
| class AiGenerateResponse(BaseModel): | |
| """Response with generated messages.""" | |
| messages: List[dict] | |
| ending_text: Optional[str] = None | |
| async def ai_generate_conversation(request: AiGenerateRequest): | |
| """ | |
| Generate a fake conversation using Groq AI (openai/gpt-oss-120b). | |
| Returns a list of messages for the text story. | |
| """ | |
| import aiohttp | |
| import json | |
| groq_api_key = os.getenv("GROQ_API", "") | |
| if not groq_api_key: | |
| raise HTTPException(status_code=500, detail="GROQ_API not configured") | |
| # Tone descriptions | |
| tone_prompts = { | |
| "emotional": "Make it emotional and dramatic with deep feelings.", | |
| "funny": "Make it funny and comedic with witty responses.", | |
| "shocking": "Include a shocking plot twist at the end.", | |
| "romantic": "Make it romantic with heartfelt messages.", | |
| "angry": "Make it an angry argument with heated exchanges." | |
| } | |
| tone_instruction = tone_prompts.get(request.tone, tone_prompts["emotional"]) | |
| system_prompt = f"""You are a viral content script writer. Generate a NATURAL fake text message conversation. | |
| RULES: | |
| 1. Create approximately {request.message_count} messages (can be slightly more or less) | |
| 2. Person A is "{request.person_a_name}" (sender: "A") - shown on right side (blue bubbles) | |
| 3. Person B is "{request.person_b_name}" (sender: "B") - shown on left side (gray bubbles) | |
| 4. {tone_instruction} | |
| 5. NATURAL CHAT PATTERN - same person CAN send multiple messages in a row! Examples: | |
| - Person sends a message, then follows up with another right after | |
| - "Wait" ... "I need to tell you something" | |
| - "Are you there?" ... "Hello??" | |
| - Breaking long thoughts into 2-3 short messages | |
| 6. Keep messages SHORT (1-2 sentences max per message) | |
| 7. Usually B starts first (the "other" person initiates) | |
| 8. End with impact (twist, cliffhanger, or emotional ending) | |
| 9. Make it viral-worthy and engaging | |
| OUTPUT FORMAT (strict JSON): | |
| {{ | |
| "messages": [ | |
| {{"sender": "B", "text": "Hey..."}}, | |
| {{"sender": "B", "text": "We need to talk"}}, | |
| {{"sender": "A", "text": "What's up?"}}, | |
| ... | |
| ], | |
| "ending_text": "" | |
| }} | |
| Only output valid JSON, nothing else.""" | |
| user_prompt = f"Create a conversation about: {request.prompt}" | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| payload = { | |
| "model": "meta-llama/llama-4-scout-17b-16e-instruct", | |
| "messages": [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt} | |
| ], | |
| "temperature": 0.8, | |
| "max_tokens": 2000 | |
| } | |
| headers = { | |
| "Authorization": f"Bearer {groq_api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| async with session.post( | |
| "https://api.groq.com/openai/v1/chat/completions", | |
| json=payload, | |
| headers=headers, | |
| timeout=aiohttp.ClientTimeout(total=30) | |
| ) as response: | |
| if response.status != 200: | |
| error_text = await response.text() | |
| logger.error(f"Groq API error: {error_text}") | |
| raise HTTPException(status_code=500, detail=f"Groq API error: {response.status}") | |
| data = await response.json() | |
| content = data["choices"][0]["message"]["content"] | |
| # Parse JSON response | |
| try: | |
| # Clean up content (remove markdown code blocks if present) | |
| content = content.strip() | |
| if content.startswith("```"): | |
| content = content.split("```")[1] | |
| if content.startswith("json"): | |
| content = content[4:] | |
| content = content.strip() | |
| result = json.loads(content) | |
| return AiGenerateResponse( | |
| messages=result.get("messages", []), | |
| ending_text=result.get("ending_text") | |
| ) | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to parse AI response: {content}") | |
| raise HTTPException(status_code=500, detail="AI returned invalid JSON") | |
| except aiohttp.ClientError as e: | |
| logger.error(f"Groq API request failed: {e}") | |
| raise HTTPException(status_code=500, detail=f"AI request failed: {str(e)}") | |