# NCAkit/modules/text_story/router.py
# Author: ismdrobiul489
# feat: Major optimizations - Quiz dynamic fonts, TTS 1.2x speed, Video stream copy (10x faster), Single API call, Fact Image dynamic fonts, Text Story position fix
# Commit: ee36c8e
"""
Text Story Router - FastAPI endpoints for fake iMessage chat video generation.
"""
import asyncio
import logging
import os
import uuid
from typing import Any, Dict, Optional

from fastapi import APIRouter, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse

from .schemas import TextStoryRequest, TextStoryResponse, TextStoryStatus
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/text-story", tags=["Text Story"])
# Job storage (in-memory for now; lost on process restart)
jobs: Dict[str, Dict[str, Any]] = {}


def update_job(job_id: str, status: str, progress: int, step: Optional[str] = None,
               video_url: Optional[str] = None, error: Optional[str] = None) -> None:
    """Update a job's record in the in-memory store.

    Unknown job_ids are silently ignored. Note that every call overwrites
    all five fields: omitted keyword arguments reset their fields to None,
    which callers rely on to clear stale step/error values between updates.

    Args:
        job_id: Key into the module-level ``jobs`` dict.
        status: New job status (e.g. "processing", "ready", "failed").
        progress: Completion percentage (0-100).
        step: Human-readable description of the current pipeline step.
        video_url: URL of the finished video, set only on completion.
        error: Error message, set only on failure.
    """
    if job_id in jobs:
        jobs[job_id].update({
            "status": status,
            "progress": progress,
            "current_step": step,
            "video_url": video_url,
            "error": error
        })
async def generate_text_story_video(job_id: str, request: TextStoryRequest) -> None:
    """
    Main video generation pipeline, run as a FastAPI background task.

    Reports progress through update_job() and never raises: failures are
    recorded on the job record instead ("failed" status + error message).

    Pipeline:
    1. Setup temp directory
    2. Generate TTS for each message
    3. Create chat UI frames
    4. Load gameplay background
    5. Compose final video
    """
    try:
        # Lazy imports keep router import time low; heavy service modules
        # below are imported the same way, per step.
        import tempfile
        import shutil
        temp_dir = tempfile.mkdtemp(prefix="text_story_")
        logger.info(f"TextStory: Starting job {job_id}")
        # ============ STEP 1: TTS Generation ============
        update_job(job_id, "processing", 10, "Generating voices...")
        from .services.tts_handler import TTSHandler
        tts = TTSHandler()
        audio_files = []
        for i, msg in enumerate(request.messages):
            # Each message is voiced with its sender's configured voice
            # (assumes TextStoryRequest carries voice_a/voice_b — defined in .schemas).
            voice = request.voice_a if msg.sender == "A" else request.voice_b
            audio_path = os.path.join(temp_dir, f"msg_{i:03d}.wav")
            duration = await tts.generate_tts(msg.text, voice, audio_path)
            audio_files.append({
                "path": audio_path,
                "duration": duration,
                "sender": msg.sender,
                "text": msg.text
            })
            # TTS phase maps onto progress range 10 -> 40.
            progress = 10 + int((i + 1) / len(request.messages) * 30)
            update_job(job_id, "processing", progress, f"Voice {i+1}/{len(request.messages)}")
        # ============ STEP 2: Create Chat Frames ============
        update_job(job_id, "processing", 45, "Rendering chat UI...")
        from .services.renderer import ChatRenderer
        renderer = ChatRenderer(
            person_a_name=request.person_a_name,
            person_b_name=request.person_b_name,
            person_b_avatar=request.person_b_avatar
        )
        # ============ STEP 3: Load Background ============
        update_job(job_id, "processing", 55, "Loading background...")
        from .services.background import BackgroundHandler
        bg_handler = BackgroundHandler()
        # ============ STEP 4: Compose Video ============
        # Composer drives renderer + background + TTS output into one mp4.
        update_job(job_id, "processing", 65, "Composing video...")
        from .services.video_composer import VideoComposer
        composer = VideoComposer(
            renderer=renderer,
            bg_handler=bg_handler,
            tts_handler=tts
        )
        output_path = await composer.compose(
            messages=audio_files,
            ending_text=request.ending_text,
            output_dir=temp_dir
        )
        # ============ STEP 5: Upload to HF Dataset ============
        update_job(job_id, "processing", 90, "Uploading to cloud...")
        # Create local output directory; the copy survives temp_dir cleanup
        # and serves as a fallback if cloud upload fails.
        videos_dir = os.path.join("videos", "text_story")
        os.makedirs(videos_dir, exist_ok=True)
        final_path = os.path.join(videos_dir, f"text_story_{job_id}.mp4")
        shutil.copy2(output_path, final_path)
        # Upload to HF Dataset using shared module. Best-effort: any failure
        # is logged and the local file is kept instead.
        cloud_url = None
        try:
            from pathlib import Path
            from modules.shared.services.hf_storage import HFStorageClient
            hf_repo = os.getenv("HF_REPO", "")
            hf_token = os.getenv("HF_TOKEN", "")
            if hf_repo and hf_token:
                hf_client = HFStorageClient(repo_id=hf_repo, token=hf_token)
                cloud_url = hf_client.upload_video(
                    local_path=Path(final_path),
                    video_id=f"text_story_{job_id}",
                    folder="text_story"  # Upload to text_story folder
                )
                if cloud_url:
                    logger.info(f"TextStory: Uploaded to HF: {cloud_url}")
                    # Delete local file after successful upload
                    if os.path.exists(final_path):
                        os.remove(final_path)
                        logger.info(f"TextStory: Deleted local file after HF upload")
        except Exception as e:
            logger.warning(f"TextStory: HF upload failed, keeping local file: {e}")
        # Cleanup temp
        shutil.rmtree(temp_dir, ignore_errors=True)
        # Use cloud URL if available, otherwise local endpoint for direct streaming
        video_url = cloud_url or f"/api/text-story/{job_id}/video"
        update_job(job_id, "ready", 100, "Complete!", video_url=video_url)
        logger.info(f"TextStory: Job {job_id} completed successfully")
    except Exception as e:
        # Broad catch is deliberate: this runs detached from any request,
        # so the only way to surface the failure is via the job record.
        logger.error(f"TextStory: Job {job_id} failed - {e}")
        import traceback
        logger.error(traceback.format_exc())
        update_job(job_id, "failed", 0, error=str(e))
        # Cleanup on failure. temp_dir exists only if we got past mkdtemp,
        # in which case shutil was imported just before it and is in scope.
        if 'temp_dir' in locals():
            shutil.rmtree(temp_dir, ignore_errors=True)
@router.post("/generate", response_model=TextStoryResponse)
async def generate_text_story(
    request: TextStoryRequest,
    background_tasks: BackgroundTasks
):
    """
    Start text story video generation.
    Returns job_id for status polling.
    """
    job_id = uuid.uuid4().hex[:12]
    # Seed the in-memory job record *before* scheduling the worker so a
    # status poll arriving immediately still finds the job.
    record = {
        "status": "processing",
        "progress": 0,
        "current_step": "Starting...",
        "video_url": None,
        "error": None,
        "request": request.model_dump(),
    }
    jobs[job_id] = record
    # Fire-and-forget: the pipeline reports back through update_job().
    background_tasks.add_task(generate_text_story_video, job_id, request)
    logger.info(f"TextStory: Started job {job_id} with {len(request.messages)} messages")
    return TextStoryResponse(
        job_id=job_id,
        status="processing",
        message=f"Started generating text story with {len(request.messages)} messages"
    )
@router.get("/{job_id}/status", response_model=TextStoryStatus)
async def get_text_story_status(job_id: str):
    """Get status of a text story generation job."""
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    # video_url / error / current_step may be absent or None depending on
    # the job's phase, hence .get() access for those fields.
    return TextStoryStatus(
        job_id=job_id,
        status=job["status"],
        progress=job["progress"],
        current_step=job.get("current_step"),
        video_url=job.get("video_url"),
        error=job.get("error")
    )
@router.get("/{job_id}/video")
async def download_text_story_video(job_id: str):
    """Download the generated text story video."""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    if jobs[job_id]["status"] != "ready":
        raise HTTPException(status_code=400, detail="Video not ready yet")
    filename = f"text_story_{job_id}.mp4"
    # Persistent volume takes precedence over the local fallback directory.
    candidates = (
        f"/data/videos/text_story/{filename}",
        f"videos/text_story/{filename}",
    )
    for video_path in candidates:
        if os.path.exists(video_path):
            return FileResponse(
                video_path,
                media_type="video/mp4",
                filename=filename
            )
    raise HTTPException(status_code=404, detail="Video file not found")
# ============================================
# AI CONVERSATION GENERATION
# ============================================
from pydantic import BaseModel
from typing import List, Optional
class AiGenerateRequest(BaseModel):
    """Request for AI-generated conversation."""
    # Topic/premise the conversation should be about.
    prompt: str
    # Right-side (blue bubble) participant, sender "A" (see system prompt below).
    person_a_name: str = "You"
    # Left-side (gray bubble) participant, sender "B".
    person_b_name: str = "My Ex"
    # Approximate number of messages to generate (the model may vary slightly).
    message_count: int = 7
    # One of the tone_prompts keys in ai_generate_conversation;
    # unknown values fall back to "emotional".
    tone: str = "emotional"
class AiGenerateResponse(BaseModel):
    """Response with generated messages."""
    # List of {"sender": "A"|"B", "text": ...} dicts as produced by the model.
    messages: List[dict]
    # Optional closing text — presumably shown at the end of the video; may be empty.
    ending_text: Optional[str] = None
@router.post("/ai-generate", response_model=AiGenerateResponse)
async def ai_generate_conversation(request: AiGenerateRequest):
    """
    Generate a fake conversation using the Groq API
    (model: meta-llama/llama-4-scout-17b-16e-instruct).
    Returns a list of messages for the text story.

    Raises:
        HTTPException(500): if GROQ_API is not configured, the Groq call
            fails or times out, or the model returns unparseable JSON.
    """
    import aiohttp
    import json
    groq_api_key = os.getenv("GROQ_API", "")
    if not groq_api_key:
        raise HTTPException(status_code=500, detail="GROQ_API not configured")
    # Tone descriptions injected into the system prompt.
    tone_prompts = {
        "emotional": "Make it emotional and dramatic with deep feelings.",
        "funny": "Make it funny and comedic with witty responses.",
        "shocking": "Include a shocking plot twist at the end.",
        "romantic": "Make it romantic with heartfelt messages.",
        "angry": "Make it an angry argument with heated exchanges."
    }
    # Unknown tones silently fall back to "emotional".
    tone_instruction = tone_prompts.get(request.tone, tone_prompts["emotional"])
    system_prompt = f"""You are a viral content script writer. Generate a NATURAL fake text message conversation.
RULES:
1. Create approximately {request.message_count} messages (can be slightly more or less)
2. Person A is "{request.person_a_name}" (sender: "A") - shown on right side (blue bubbles)
3. Person B is "{request.person_b_name}" (sender: "B") - shown on left side (gray bubbles)
4. {tone_instruction}
5. NATURAL CHAT PATTERN - same person CAN send multiple messages in a row! Examples:
- Person sends a message, then follows up with another right after
- "Wait" ... "I need to tell you something"
- "Are you there?" ... "Hello??"
- Breaking long thoughts into 2-3 short messages
6. Keep messages SHORT (1-2 sentences max per message)
7. Usually B starts first (the "other" person initiates)
8. End with impact (twist, cliffhanger, or emotional ending)
9. Make it viral-worthy and engaging
OUTPUT FORMAT (strict JSON):
{{
"messages": [
{{"sender": "B", "text": "Hey..."}},
{{"sender": "B", "text": "We need to talk"}},
{{"sender": "A", "text": "What's up?"}},
...
],
"ending_text": ""
}}
Only output valid JSON, nothing else."""
    user_prompt = f"Create a conversation about: {request.prompt}"
    try:
        async with aiohttp.ClientSession() as session:
            payload = {
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                "temperature": 0.8,
                "max_tokens": 2000
            }
            headers = {
                "Authorization": f"Bearer {groq_api_key}",
                "Content-Type": "application/json"
            }
            async with session.post(
                "https://api.groq.com/openai/v1/chat/completions",
                json=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Groq API error: {error_text}")
                    raise HTTPException(status_code=500, detail=f"Groq API error: {response.status}")
                data = await response.json()
                content = data["choices"][0]["message"]["content"]
                # Parse JSON response
                try:
                    # Clean up content (remove markdown code blocks if present)
                    content = content.strip()
                    if content.startswith("```"):
                        content = content.split("```")[1]
                        if content.startswith("json"):
                            content = content[4:]
                    content = content.strip()
                    result = json.loads(content)
                    return AiGenerateResponse(
                        messages=result.get("messages", []),
                        ending_text=result.get("ending_text")
                    )
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to parse AI response: {content}")
                    raise HTTPException(status_code=500, detail="AI returned invalid JSON") from e
    except asyncio.TimeoutError as e:
        # aiohttp's total timeout surfaces as asyncio.TimeoutError, which is
        # NOT an aiohttp.ClientError — without this clause it escaped as an
        # unlogged generic 500.
        logger.error(f"Groq API request timed out: {e}")
        raise HTTPException(status_code=500, detail="AI request timed out") from e
    except aiohttp.ClientError as e:
        logger.error(f"Groq API request failed: {e}")
        raise HTTPException(status_code=500, detail=f"AI request failed: {str(e)}") from e