| from fastapi import FastAPI, HTTPException
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from pydantic import BaseModel
|
| from typing import List
|
| import os
|
| import uuid
|
| import aiohttp
|
| import logging
|
| import openai
|
| from pathlib import Path
|
| import subprocess
|
| import shutil
|
| import ssl
|
| import json
|
| from fastapi.staticfiles import StaticFiles
|
| from pydub import AudioSegment
|
| from PIL import Image
|
|
|
|
|
# Module-scoped logger used by every helper in this file.
logger = logging.getLogger(__name__)

# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
# Runtime configuration, read once from the environment at import time.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# Public origin prepended to the /storage/... path when building result URLs.
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")

openai.api_key = OPENAI_API_KEY
if OPENAI_BASE_URL:
    # Legacy (pre-1.0) attribute name, kept so older SDK versions still work.
    openai.api_base = OPENAI_BASE_URL
    # This file calls `openai.audio.speech.create`, which is the 1.x
    # module-level client; that client takes its endpoint from `base_url`.
    openai.base_url = OPENAI_BASE_URL
|
|
|
app = FastAPI()

# StaticFiles validates its directory when it is constructed, so on a fresh
# checkout (no "storage" folder yet) the import itself would fail. Create the
# directory before mounting it.
os.makedirs("storage", exist_ok=True)
app.mount("/storage", StaticFiles(directory="storage"), name="storage")

# NOTE(review): every origin is allowed together with credentials. That is
# convenient for development; confirm it is intended before exposing this
# service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
class ComicData(BaseModel):
    # Request body of POST /api/generate-video. The three lists are consumed
    # positionally: entry i of each list belongs to panel i.

    # Narration text per panel (rendered with the "Caption" subtitle style).
    captions: List[str]
    # Spoken-dialogue text per panel (rendered with the "Speech" subtitle style).
    speeches: List[str]
    # Image URL per panel; each one is downloaded before rendering.
    panels: List[str]
|
|
|
|
|
async def download_image(url, output_path, verify_ssl=False):
    """Download an image to ``output_path`` and report its pixel width.

    Args:
        url: Address of the image to fetch.
        output_path: File path the response body is written to.
        verify_ssl: When True, the TLS certificate and hostname of the server
            are verified. Defaults to False to preserve the historical
            behaviour of this function.

    Returns:
        ``(output_path, width)`` on success, ``(None, 0)`` on any failure
        (non-200 status, network error, or a body PIL cannot open).
    """
    try:
        ssl_context = ssl.create_default_context()
        if not verify_ssl:
            # NOTE(review): certificate checks are switched off here, which
            # allows man-in-the-middle tampering of downloaded panels. Callers
            # that fetch from trusted HTTPS hosts should pass verify_ssl=True.
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

        connector = aiohttp.TCPConnector(ssl=ssl_context)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    logger.error(f"Failed to download image: {response.status}")
                    return None, 0
                payload = await response.read()

        with open(output_path, 'wb') as f:
            f.write(payload)

        # Opening the file with PIL both validates that it is an image and
        # yields the width the subtitle wrapper needs.
        with Image.open(output_path) as img:
            width = img.size[0]
        return output_path, width
    except Exception as e:
        logger.error(f"Error downloading image: {e}")
        return None, 0
|
|
|
|
|
async def generate_speech(text, voice="alloy", output_path=None):
    """Synthesize ``text`` to an MP3 file through the OpenAI speech endpoint.

    Args:
        text: The text to speak.
        voice: Voice identifier forwarded to the API.
        output_path: Destination file; a random ``<uuid>.mp3`` name in the
            current directory is used when omitted.

    Returns:
        The path written, or ``None`` if the request or the write failed
        (the error is logged).
    """
    import asyncio
    import functools

    try:
        if not output_path:
            output_path = f"{uuid.uuid4()}.mp3"

        request = functools.partial(
            openai.audio.speech.create,
            model="tts-1",
            voice=voice,
            input=text,
        )
        # The SDK call is synchronous; running it directly inside this
        # coroutine would stall the event loop (and every other request) for
        # the whole round trip, so it is pushed to the default executor.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, request)

        with open(output_path, "wb") as f:
            f.write(response.content)

        return output_path
    except Exception as e:
        logger.error(f"Error generating speech: {e}")
        return None
|
|
|
|
|
def get_audio_duration(audio_path):
    """Return the length of an audio file in seconds.

    pydub reports segment length in milliseconds, hence the division.
    If the file cannot be loaded the error is logged and 5.0 is returned.
    """
    try:
        clip = AudioSegment.from_file(audio_path)
        seconds = len(clip) / 1000.0
    except Exception as e:
        logger.error(f"Error getting audio duration: {e}")
        return 5.0
    return seconds
|
|
|
|
|
# Header written at the top of every generated .ass subtitle file: script
# info, the two styles referenced by the Dialogue events ("Caption" and
# "Speech"), and the [Events] column layout. Lines starting with ";" are
# comments inside the subtitle file itself and are part of the emitted text.
ASS_STYLE_HEADER = "\n".join((
    "",
    "[Script Info]",
    "WrapStyle: 0",
    "ScaledBorderAndShadow: yes",
    "PlayResX: 1920",
    "PlayResY: 1080",
    "",
    "[V4+ Styles]",
    "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
    "; Caption样式:高亮青蓝,深蓝描边",
    "Style: Caption,Noto Sans CJK SC,46,&H00FFFF44,&H0000FFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,3,2,10,10,39,0",
    "; Speech样式:鲜亮黄,黑色描边(最醒目)",
    "Style: Speech,Noto Sans CJK SC,42,&H00FF77FF,&H00FFFFFF,&H003800BF,&H00000000,0,0,0,0,100,100,0,0,1,2,3,8,10,10,39,0",
    "",
    "[Events]",
    "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    "",
))
|
def smart_wrap(text, image_width, font_size=48):
    r"""Wrap ``text`` for an ASS Dialogue line using estimated glyph widths.

    Characters with a code point above 255 are counted as ``font_size`` wide,
    all others as half of that. A new line is started whenever the next
    character would push the running width past ``image_width - 100``.

    Args:
        text: The text to wrap.
        image_width: Width, in pixels, the text has to fit into.
        font_size: Font size used for the width estimate.

    Returns:
        The wrapped text with lines joined by the ASS hard line break ``\N``.
        An empty input yields an empty string.
    """
    max_line_width = image_width - 100

    lines = []
    current_line = []
    current_width = 0

    for char in text:
        if char == '\r':
            continue
        if char == '\n':
            # A raw newline would end the Dialogue event early in the .ass
            # file; turn it into an explicit hard break instead.
            lines.append(''.join(current_line))
            current_line = []
            current_width = 0
            continue

        char_width = font_size if ord(char) > 255 else font_size // 2
        # Only break when the line already holds something; otherwise a
        # budget smaller than one glyph would emit an empty leading line.
        if current_line and current_width + char_width > max_line_width:
            lines.append(''.join(current_line))
            current_line = [char]
            current_width = char_width
        else:
            current_line.append(char)
            current_width += char_width

    lines.append(''.join(current_line))
    return r'\N'.join(lines)
|
|
|
|
|
def _write_subtitle_file(subtitle_file, style, texts, panel_start_times, panel_durations, image_widths):
    """Write one ASS file holding a Dialogue event per panel in ``style``.

    The four sequences are walked in lockstep; iteration stops at the
    shortest one.
    """
    with open(subtitle_file, "w", encoding="utf-8") as f:
        f.write(ASS_STYLE_HEADER)
        for text, start, duration, width in zip(texts, panel_start_times, panel_durations, image_widths):
            wrapped_text = smart_wrap(text, width)
            f.write(
                f"Dialogue: 0,{format_time(start)},{format_time(start + duration)},"
                f"{style},,0,0,0,,{wrapped_text}\n"
            )
    return subtitle_file


def create_caption_subtitle_file(project_dir, captions, panel_start_times, panel_durations, image_widths):
    """Create ``captions.ass`` in ``project_dir`` using the "Caption" style.

    Args:
        project_dir: Directory the subtitle file is written to.
        captions: Caption text per panel.
        panel_start_times: Start time of each panel, in seconds.
        panel_durations: On-screen duration of each panel, in seconds.
        image_widths: Pixel width per panel, used for line wrapping.

    Returns:
        Path of the written file, or ``None`` if writing failed (logged).
    """
    try:
        return _write_subtitle_file(
            os.path.join(project_dir, "captions.ass"), "Caption",
            captions, panel_start_times, panel_durations, image_widths,
        )
    except Exception as e:
        logger.error(f"Error creating caption subtitle file: {e}")
        return None


def create_speech_subtitle_file(project_dir, speeches, panel_start_times, panel_durations, image_widths):
    """Create ``speeches.ass`` in ``project_dir`` using the "Speech" style.

    Args:
        project_dir: Directory the subtitle file is written to.
        speeches: Dialogue text per panel.
        panel_start_times: Start time of each panel, in seconds.
        panel_durations: On-screen duration of each panel, in seconds.
        image_widths: Pixel width per panel, used for line wrapping.

    Returns:
        Path of the written file, or ``None`` if writing failed (logged).
    """
    try:
        return _write_subtitle_file(
            os.path.join(project_dir, "speeches.ass"), "Speech",
            speeches, panel_start_times, panel_durations, image_widths,
        )
    except Exception as e:
        logger.error(f"Error creating speech subtitle file: {e}")
        return None
|
|
|
|
|
def format_time(seconds):
    """Format a duration in seconds as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to the nearest centisecond once, up front, and then
    split with integer arithmetic. Truncating the fractional part separately
    loses a centisecond to float representation (2.3 would render as ``.29``).
    Negative inputs are clamped to zero.
    """
    total_centisecs = max(0, int(round(seconds * 100)))
    total_secs, centisecs = divmod(total_centisecs, 100)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02}:{secs:02}.{centisecs:02}"
|
|
|
|
|
async def create_audio_file(project_dir, captions, speeches):
    """Synthesize narration for every panel and build one combined MP3.

    For panel ``i`` the caption and then the speech are synthesized (each only
    if non-empty) to ``caption_{i}.mp3`` / ``speech_{i}.mp3`` and concatenated.
    A panel that ends up with no audio gets 5 seconds of silence so that the
    audio track stays aligned with the panel timeline.

    Files written to ``project_dir``: the per-clip MP3s,
    ``panel_{i}_combined.mp3`` for every voiced panel, ``combined_audio.mp3``,
    ``audio_durations.json`` and ``panel_times.json``.

    Args:
        project_dir: Working directory for all generated files.
        captions: Caption text per panel.
        speeches: Dialogue text per panel. Panels are paired positionally
            with ``captions``; extra entries in the longer list are ignored.

    Returns:
        ``(combined_audio_path, audio_durations, panel_start_times,
        panel_durations)``. ``audio_durations`` maps ``"caption_{i}"`` /
        ``"speech_{i}"`` to seconds. On failure, or when no clip at all could
        be synthesized, returns ``(None, {}, [], [])``.
    """
    # (file/key label, voice id) of the two narration tracks, in play order.
    voices = (
        ("caption", "f2ed19ca0ea246bf9cbc6382be00e4fc"),
        ("speech", "3b55b3d84d2f453a98d8ca9bb24182d6"),
    )
    silent_panel_ms = 5000

    try:
        audio_durations = {}
        panel_start_times = []
        panel_durations = []
        panel_segments = []
        voiced_panels = 0
        current_time = 0

        for i, texts in enumerate(zip(captions, speeches)):
            panel_audio = AudioSegment.empty()

            for (label, voice), text in zip(voices, texts):
                if not text:
                    continue
                clip_path = os.path.join(project_dir, f"{label}_{i}.mp3")
                if not await generate_speech(text, voice, clip_path):
                    continue
                clip = AudioSegment.from_file(clip_path)
                audio_durations[f"{label}_{i}"] = len(clip) / 1000.0
                panel_audio += clip

            if len(panel_audio) > 0:
                panel_audio.export(
                    os.path.join(project_dir, f"panel_{i}_combined.mp3"), format="mp3"
                )
                voiced_panels += 1
            else:
                # The panel still occupies a slot in the video; without
                # matching silence every later narration would play early.
                panel_audio = AudioSegment.silent(duration=silent_panel_ms)

            # Take the duration from the audio that will actually be played.
            panel_duration = len(panel_audio) / 1000.0
            panel_start_times.append(current_time)
            panel_durations.append(panel_duration)
            panel_segments.append(panel_audio)
            current_time += panel_duration

        if not voiced_panels:
            logger.error("No audio parts generated")
            return None, {}, [], []

        # Concatenate the in-memory segments and encode once, instead of
        # re-reading the per-panel MP3s (a second lossy encode).
        combined_audio = os.path.join(project_dir, "combined_audio.mp3")
        final_combined = AudioSegment.empty()
        for segment in panel_segments:
            final_combined += segment
        final_combined.export(combined_audio, format="mp3")

        durations_file = os.path.join(project_dir, "audio_durations.json")
        with open(durations_file, "w") as f:
            json.dump(audio_durations, f)

        panel_times_file = os.path.join(project_dir, "panel_times.json")
        with open(panel_times_file, "w") as f:
            json.dump({"start_times": panel_start_times, "durations": panel_durations}, f)

        return combined_audio, audio_durations, panel_start_times, panel_durations
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Error creating audio file: {e}")
        return None, {}, [], []
|
|
|
def process_sub_path(path):
    """Escape a subtitle path for use as an FFmpeg filter argument.

    The path is converted to forward-slash form, every character listed in
    the table below is replaced by its escaped form, and the result is wrapped
    in single quotes.
    """
    escapes = {
        ':': r'\:',
        "'": r"\\\'",
        ',': r'\\,',
        '[': r'\\[',
        ']': r'\\]',
        ' ': r'\ ',
    }
    posix_path = Path(path).as_posix()
    escaped = ''.join(escapes.get(ch, ch) for ch in posix_path)
    return "'" + escaped + "'"
|
|
|
|
|
def _build_slideshow_filter(panel_durations):
    """Return the filter_complex string that fades each panel and concatenates them.

    Input ``i`` of the FFmpeg command is expected to be the image of panel
    ``i``; the resulting video stream is labelled ``[outv]``.
    """
    chains = []
    labels = []
    for i, duration in enumerate(panel_durations):
        # Fade length is about 1/6 of the panel (at least 0.2 s), but never
        # more than half the panel on the way in or what remains on the way out.
        anim_duration = max(duration * 0.166, 0.2)
        fade_in = min(anim_duration, duration * 0.5)
        fade_out = min(anim_duration, duration - fade_in)
        chains.append(
            f"[{i}:v]loop=loop=-1:size=1,trim=duration={duration}[base{i}];"
            f"[base{i}]format=yuva420p,"
            f"fade=in:st=0:d={fade_in}:alpha=1,"
            f"fade=out:st={duration-fade_out}:d={fade_out}:alpha=1[anim{i}];"
        )
        labels.append(f"[anim{i}]")
    return f"{''.join(chains)}{''.join(labels)}concat=n={len(labels)}:v=1:a=0[outv]"


def create_video(project_dir, image_paths, caption_subtitle_file, speech_subtitle_file,
                 audio_file, output_video, audio_durations, panel_start_times, panel_durations):
    """Render the final video with two FFmpeg passes.

    Pass 1 builds ``temp_video.mp4`` in ``project_dir``: each image is shown
    for its panel duration with a fade in/out, the panels are concatenated and
    muxed with ``audio_file``. Pass 2 burns both subtitle files into the
    picture and writes ``output_video``; the temporary file is then removed.

    Args:
        project_dir: Working directory for the intermediate file.
        image_paths: Panel image files, in display order.
        caption_subtitle_file: ASS file with the caption events.
        speech_subtitle_file: ASS file with the speech events.
        audio_file: Combined narration track.
        output_video: Destination path of the finished video.
        audio_durations: Not used by this function.
        panel_start_times: Not used by this function.
        panel_durations: Seconds each panel stays on screen.

    Returns:
        ``output_video`` on success, ``None`` on failure (logged).
    """
    try:
        # Images and durations are paired positionally; use only the panels
        # that have both, so the concat count and the audio input index
        # always match the number of image inputs actually passed.
        panel_count = min(len(image_paths), len(panel_durations))
        if panel_count == 0:
            logger.error("No panels to render")
            return None
        images = list(image_paths)[:panel_count]
        filter_complex = _build_slideshow_filter(list(panel_durations)[:panel_count])

        temp_video = os.path.join(project_dir, "temp_video.mp4")
        cmd1 = ["ffmpeg", "-y"]
        for img in images:
            cmd1.extend(["-i", img])
        cmd1.extend([
            "-i", audio_file,
            "-filter_complex", filter_complex,
            "-map", "[outv]",
            "-map", f"{panel_count}:a",
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            "-c:a", "aac", "-strict", "experimental",
            "-vsync", "vfr",
            "-async", "1",
            "-movflags", "+faststart",
            temp_video
        ])
        # stderr is captured so the CalledProcessError handler below has
        # FFmpeg's diagnostics to log; undecodable bytes are replaced.
        subprocess.run(cmd1, check=True, stderr=subprocess.PIPE, text=True, errors="replace")

        filter_chain = ",".join((
            f"subtitles={process_sub_path(caption_subtitle_file)}",
            f"subtitles={process_sub_path(speech_subtitle_file)}",
        ))

        cmd_combined = [
            "ffmpeg", "-y",
            "-i", temp_video,
            "-vf", filter_chain,
            "-c:a", "copy",
            "-c:v", "libx264",
            "-preset", "fast",
            "-movflags", "+faststart",
            output_video
        ]
        subprocess.run(cmd_combined, check=True, stderr=subprocess.PIPE, text=True, errors="replace")

        os.remove(temp_video)
        return output_video
    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg failed with cmd: {' '.join(e.cmd)}")
        logger.error(f"FFmpeg stderr: {e.stderr}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return None
|
|
|
|
|
def upload_to_local_storage(local_path, relative_path):
    """Copy a file into the local ``storage`` directory and return its URL.

    Args:
        local_path: File to copy.
        relative_path: Destination path below ``storage``; missing parent
            directories are created.

    Returns:
        ``BASE_URL`` followed by ``/storage/<relative_path>`` (with OS path
        separators turned into ``/``), or ``None`` if the copy failed.
    """
    try:
        storage_root = os.path.abspath("storage")
        os.makedirs(storage_root, exist_ok=True)

        destination = os.path.join(storage_root, relative_path)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        shutil.copy2(local_path, destination)

        url_path = relative_path.replace(os.sep, '/')
        return f"{BASE_URL}/storage/{url_path}"
    except Exception as e:
        import traceback
        logger.error(f"Error copying to local storage: {e}")
        logger.error(traceback.format_exc())
        return None
|
|
|
@app.post("/api/generate-video")
async def generate_video(comic_data: ComicData):
    # Pipeline: download panel images -> synthesize narration -> write the two
    # subtitle files -> render with FFmpeg -> copy the result into /storage.
    # Everything intermediate lives in temp/<project_id>, which is always
    # removed before the request finishes.
    project_id = str(uuid.uuid4())
    project_dir = os.path.abspath(os.path.join("temp", project_id))
    os.makedirs(project_dir, exist_ok=True)
    logger.info(f"Created project directory: {project_dir}")

    try:
        image_paths = []
        image_widths = []
        kept_indices = []
        for i, panel_url in enumerate(comic_data.panels):
            output_path = os.path.join(project_dir, f"panel_{i}.jpg")
            path_result, img_width = await download_image(panel_url, output_path)
            if not path_result:
                logger.warning(f"Skipping panel {i}: image download failed")
                continue
            image_paths.append(path_result)
            image_widths.append(img_width)
            kept_indices.append(i)

        if not image_paths:
            raise HTTPException(status_code=500, detail="Failed to download images")

        logger.info(f"Downloaded {len(image_paths)} images")

        # Keep text only for panels whose image was downloaded. Otherwise one
        # failed download would shift every later image against its
        # narration and subtitles. Only indices covered by both text lists
        # are narrated, matching the positional pairing done downstream.
        narrated = min(len(comic_data.captions), len(comic_data.speeches))
        captions = [comic_data.captions[i] for i in kept_indices if i < narrated]
        speeches = [comic_data.speeches[i] for i in kept_indices if i < narrated]

        audio_file, audio_durations, panel_start_times, panel_durations = await create_audio_file(
            project_dir, captions, speeches
        )
        if not audio_file:
            raise HTTPException(status_code=500, detail="Failed to create audio file")

        logger.info(f"Created audio file: {audio_file}")

        caption_subtitle_file = create_caption_subtitle_file(
            project_dir, captions, panel_start_times, panel_durations, image_widths
        )
        if not caption_subtitle_file:
            raise HTTPException(status_code=500, detail="Failed to create caption subtitle file")

        speech_subtitle_file = create_speech_subtitle_file(
            project_dir, speeches, panel_start_times, panel_durations, image_widths
        )
        if not speech_subtitle_file:
            raise HTTPException(status_code=500, detail="Failed to create speech subtitle file")

        logger.info(f"Created subtitle files: {caption_subtitle_file}, {speech_subtitle_file}")

        output_video = os.path.join(project_dir, "output.mp4")
        result = create_video(
            project_dir, image_paths, caption_subtitle_file, speech_subtitle_file,
            audio_file, output_video, audio_durations, panel_start_times, panel_durations
        )
        if not result:
            raise HTTPException(status_code=500, detail="Failed to create video")

        logger.info(f"Created video: {output_video}")

        video_url = upload_to_local_storage(output_video, f"{project_id}/video.mp4")
        if not video_url:
            # Do not report success with a null URL.
            raise HTTPException(status_code=500, detail="Failed to store video")

        return {
            "videoUrl": video_url,
            "projectId": project_id
        }
    except HTTPException:
        # Already carries the intended status and detail; re-wrapping it in
        # the generic handler below would garble the message.
        raise
    except Exception as e:
        logger.exception(f"Error generating video: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        shutil.rmtree(project_dir, ignore_errors=True)
|
|
|
|
|
@app.get("/")
async def health_check():
    # Liveness probe: responds with a constant payload and touches no state.
    payload = {"status": "ok"}
    return payload
|
|
|
if __name__ == "__main__":
    # Started as a script: serve the app on all interfaces, port 8000.
    # uvicorn is imported here so that importing this module does not need it.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)