""" Text Story Router - FastAPI endpoints for fake iMessage chat video generation. """ import os import uuid import logging import asyncio from typing import Dict, Any from fastapi import APIRouter, HTTPException, BackgroundTasks from fastapi.responses import FileResponse from .schemas import TextStoryRequest, TextStoryResponse, TextStoryStatus logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/text-story", tags=["Text Story"]) # Job storage (in-memory for now) jobs: Dict[str, Dict[str, Any]] = {} def update_job(job_id: str, status: str, progress: int, step: str = None, video_url: str = None, error: str = None): """Update job status.""" if job_id in jobs: jobs[job_id].update({ "status": status, "progress": progress, "current_step": step, "video_url": video_url, "error": error }) async def generate_text_story_video(job_id: str, request: TextStoryRequest): """ Main video generation pipeline. Pipeline: 1. Setup temp directory 2. Generate TTS for each message 3. Create chat UI frames 4. Load gameplay background 5. Compose final video """ try: import tempfile import shutil temp_dir = tempfile.mkdtemp(prefix="text_story_") logger.info(f"TextStory: Starting job {job_id}") # ============ STEP 1: TTS Generation ============ update_job(job_id, "processing", 10, "Generating voices...") from .services.tts_handler import TTSHandler tts = TTSHandler() audio_files = [] for i, msg in enumerate(request.messages): voice = request.voice_a if msg.sender == "A" else request.voice_b audio_path = os.path.join(temp_dir, f"msg_{i:03d}.wav") duration = await tts.generate_tts(msg.text, voice, audio_path) audio_files.append({ "path": audio_path, "duration": duration, "sender": msg.sender, "text": msg.text }) progress = 10 + int((i + 1) / len(request.messages) * 30) update_job(job_id, "processing", progress, f"Voice {i+1}/{len(request.messages)}") # ============ STEP 2: Create Chat Frames ============ update_job(job_id, "processing", 45, "Rendering chat UI...") from .services.renderer import ChatRenderer renderer = ChatRenderer( person_a_name=request.person_a_name, person_b_name=request.person_b_name, person_b_avatar=request.person_b_avatar ) # ============ STEP 3: Load Background ============ update_job(job_id, "processing", 55, "Loading background...") from .services.background import BackgroundHandler bg_handler = BackgroundHandler() # ============ STEP 4: Compose Video ============ update_job(job_id, "processing", 65, "Composing video...") from .services.video_composer import VideoComposer composer = VideoComposer( renderer=renderer, bg_handler=bg_handler, tts_handler=tts ) output_path = await composer.compose( messages=audio_files, ending_text=request.ending_text, output_dir=temp_dir ) # ============ STEP 5: Upload to HF Dataset ============ update_job(job_id, "processing", 90, "Uploading to cloud...") # Create local output directory videos_dir = os.path.join("videos", "text_story") os.makedirs(videos_dir, exist_ok=True) final_path = os.path.join(videos_dir, f"text_story_{job_id}.mp4") shutil.copy2(output_path, final_path) # Upload to HF Dataset using shared module cloud_url = None try: from pathlib import Path from modules.shared.services.hf_storage import HFStorageClient hf_repo = os.getenv("HF_REPO", "") hf_token = os.getenv("HF_TOKEN", "") if hf_repo and hf_token: hf_client = HFStorageClient(repo_id=hf_repo, token=hf_token) cloud_url = hf_client.upload_video( local_path=Path(final_path), video_id=f"text_story_{job_id}", folder="text_story" # Upload to text_story folder ) if cloud_url: logger.info(f"TextStory: Uploaded to HF: {cloud_url}") # Delete local file after successful upload if os.path.exists(final_path): os.remove(final_path) logger.info(f"TextStory: Deleted local file after HF upload") except Exception as e: logger.warning(f"TextStory: HF upload failed, keeping local file: {e}") # Cleanup temp shutil.rmtree(temp_dir, ignore_errors=True) # Use cloud URL if available, otherwise local endpoint for direct streaming video_url = cloud_url or f"/api/text-story/{job_id}/video" update_job(job_id, "ready", 100, "Complete!", video_url=video_url) logger.info(f"TextStory: Job {job_id} completed successfully") except Exception as e: logger.error(f"TextStory: Job {job_id} failed - {e}") import traceback logger.error(traceback.format_exc()) update_job(job_id, "failed", 0, error=str(e)) # Cleanup on failure if 'temp_dir' in locals(): shutil.rmtree(temp_dir, ignore_errors=True) @router.post("/generate", response_model=TextStoryResponse) async def generate_text_story( request: TextStoryRequest, background_tasks: BackgroundTasks ): """ Start text story video generation. Returns job_id for status polling. """ job_id = uuid.uuid4().hex[:12] # Initialize job jobs[job_id] = { "status": "processing", "progress": 0, "current_step": "Starting...", "video_url": None, "error": None, "request": request.model_dump() } # Start background generation background_tasks.add_task(generate_text_story_video, job_id, request) logger.info(f"TextStory: Started job {job_id} with {len(request.messages)} messages") return TextStoryResponse( job_id=job_id, status="processing", message=f"Started generating text story with {len(request.messages)} messages" ) @router.get("/{job_id}/status", response_model=TextStoryStatus) async def get_text_story_status(job_id: str): """Get status of a text story generation job.""" if job_id not in jobs: raise HTTPException(status_code=404, detail="Job not found") job = jobs[job_id] return TextStoryStatus( job_id=job_id, status=job["status"], progress=job["progress"], current_step=job.get("current_step"), video_url=job.get("video_url"), error=job.get("error") ) @router.get("/{job_id}/video") async def download_text_story_video(job_id: str): """Download the generated text story video.""" if job_id not in jobs: raise HTTPException(status_code=404, detail="Job not found") job = jobs[job_id] if job["status"] != "ready": raise HTTPException(status_code=400, detail="Video not ready yet") # Check persistent storage first persistent_path = f"/data/videos/text_story/text_story_{job_id}.mp4" local_path = f"videos/text_story/text_story_{job_id}.mp4" if os.path.exists(persistent_path): video_path = persistent_path elif os.path.exists(local_path): video_path = local_path else: raise HTTPException(status_code=404, detail="Video file not found") return FileResponse( video_path, media_type="video/mp4", filename=f"text_story_{job_id}.mp4" ) # ============================================ # AI CONVERSATION GENERATION # ============================================ from pydantic import BaseModel from typing import List, Optional class AiGenerateRequest(BaseModel): """Request for AI-generated conversation.""" prompt: str person_a_name: str = "You" person_b_name: str = "My Ex" message_count: int = 7 tone: str = "emotional" class AiGenerateResponse(BaseModel): """Response with generated messages.""" messages: List[dict] ending_text: Optional[str] = None @router.post("/ai-generate", response_model=AiGenerateResponse) async def ai_generate_conversation(request: AiGenerateRequest): """ Generate a fake conversation using Groq AI (openai/gpt-oss-120b). Returns a list of messages for the text story. """ import aiohttp import json groq_api_key = os.getenv("GROQ_API", "") if not groq_api_key: raise HTTPException(status_code=500, detail="GROQ_API not configured") # Tone descriptions tone_prompts = { "emotional": "Make it emotional and dramatic with deep feelings.", "funny": "Make it funny and comedic with witty responses.", "shocking": "Include a shocking plot twist at the end.", "romantic": "Make it romantic with heartfelt messages.", "angry": "Make it an angry argument with heated exchanges." } tone_instruction = tone_prompts.get(request.tone, tone_prompts["emotional"]) system_prompt = f"""You are a viral content script writer. Generate a NATURAL fake text message conversation. RULES: 1. Create approximately {request.message_count} messages (can be slightly more or less) 2. Person A is "{request.person_a_name}" (sender: "A") - shown on right side (blue bubbles) 3. Person B is "{request.person_b_name}" (sender: "B") - shown on left side (gray bubbles) 4. {tone_instruction} 5. NATURAL CHAT PATTERN - same person CAN send multiple messages in a row! Examples: - Person sends a message, then follows up with another right after - "Wait" ... "I need to tell you something" - "Are you there?" ... "Hello??" - Breaking long thoughts into 2-3 short messages 6. Keep messages SHORT (1-2 sentences max per message) 7. Usually B starts first (the "other" person initiates) 8. End with impact (twist, cliffhanger, or emotional ending) 9. Make it viral-worthy and engaging OUTPUT FORMAT (strict JSON): {{ "messages": [ {{"sender": "B", "text": "Hey..."}}, {{"sender": "B", "text": "We need to talk"}}, {{"sender": "A", "text": "What's up?"}}, ... ], "ending_text": "" }} Only output valid JSON, nothing else.""" user_prompt = f"Create a conversation about: {request.prompt}" try: async with aiohttp.ClientSession() as session: payload = { "model": "meta-llama/llama-4-scout-17b-16e-instruct", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.8, "max_tokens": 2000 } headers = { "Authorization": f"Bearer {groq_api_key}", "Content-Type": "application/json" } async with session.post( "https://api.groq.com/openai/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status != 200: error_text = await response.text() logger.error(f"Groq API error: {error_text}") raise HTTPException(status_code=500, detail=f"Groq API error: {response.status}") data = await response.json() content = data["choices"][0]["message"]["content"] # Parse JSON response try: # Clean up content (remove markdown code blocks if present) content = content.strip() if content.startswith("```"): content = content.split("```")[1] if content.startswith("json"): content = content[4:] content = content.strip() result = json.loads(content) return AiGenerateResponse( messages=result.get("messages", []), ending_text=result.get("ending_text") ) except json.JSONDecodeError as e: logger.error(f"Failed to parse AI response: {content}") raise HTTPException(status_code=500, detail="AI returned invalid JSON") except aiohttp.ClientError as e: logger.error(f"Groq API request failed: {e}") raise HTTPException(status_code=500, detail=f"AI request failed: {str(e)}")