| import os, uuid |
| import tempfile |
| import requests |
| from mimetypes import guess_extension |
| from PIL import Image |
| from io import BytesIO |
| import subprocess |
| from fastapi import ( |
| FastAPI, |
| UploadFile, |
| File, |
| HTTPException, |
| Response, |
| Request, |
| BackgroundTasks, |
| ) |
| from typing import List, Optional |
| import asyncio, aiofiles |
| from fastapi.responses import StreamingResponse, FileResponse |
|
|
| app = FastAPI() |
|
|
|
|
| def download_image(image_url: str): |
| |
| temp_dir = tempfile.mkdtemp() |
|
|
| |
| response = requests.get(image_url) |
| response.raise_for_status() |
|
|
| |
| image = Image.open(BytesIO(response.content)) |
| image_format = image.format.lower() |
| image_extension = guess_extension(f"image/{image_format}") |
|
|
| if image_extension is None: |
| raise ValueError("Cannot detect image file type.") |
|
|
| |
| image_path = os.path.join(temp_dir, f"image{image_extension}") |
|
|
| |
| with open(image_path, "wb") as image_file: |
| image_file.write(response.content) |
|
|
| |
| return image_path, image.size |
|
|
|
|
| def make_effect( |
| image_link: str, |
| filename: str, |
| frame_rate: int, |
| duration: int, |
| quality: int, |
| ssaa: float, |
| raw: bool, |
| ): |
| |
| image_path, (width, height) = download_image(image_url=image_link) |
| print(f"Image path: {image_path}, Width: {width}, Height: {height}", "#" * 100) |
|
|
| |
| destination = os.path.join("/tmp/Video", filename) |
|
|
| |
| os.makedirs(os.path.dirname(destination), exist_ok=True) |
|
|
| |
| command = [ |
| "depthflow", |
| "input", |
| "-i", |
| image_path, |
| "main", |
| "-f", |
| str(frame_rate), |
| "-t", |
| str(duration), |
| "--width", |
| str(width), |
| "--height", |
| str(height), |
| "--quality", |
| str(quality), |
| "--ssaa", |
| str(ssaa), |
| "--benchmark", |
| ] |
|
|
| if raw: |
| command.append("--raw") |
|
|
| command.extend(["--output", destination]) |
|
|
| |
| subprocess.run(command, check=True) |
|
|
| return destination |
|
|
|
|
| @app.post("/generate_video") |
| async def generate_video( |
| background_task: BackgroundTasks, |
| image_link: str = None, |
| frame_rate: int = 30, |
| duration: int = 3, |
| quality: int = 10, |
| ssaa: float = 0.75, |
| raw: bool = True, |
| ): |
| filename = f"{str(uuid.uuid4())}.mp4" |
| try: |
| background_task.add_task( |
| make_effect, image_link, filename, frame_rate, duration, quality, ssaa, raw |
| ) |
| return {"output_file": filename} |
| except Exception as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
|
| @app.get("/download/{filename}") |
| async def download_video(filename: str, request: Request): |
| video_directory = "/tmp/Video" |
| video_path = os.path.join(video_directory, filename) |
| if not os.path.isfile(video_path): |
| raise HTTPException(status_code=404, detail="Video not found") |
|
|
| range_header = request.headers.get("Range", None) |
| video_size = os.path.getsize(video_path) |
|
|
| if range_header: |
| start, end = range_header.strip().split("=")[1].split("-") |
| start = int(start) |
| end = video_size if end == "" else int(end) |
|
|
| headers = { |
| "Content-Range": f"bytes {start}-{end}/{video_size}", |
| "Accept-Ranges": "bytes", |
| } |
|
|
| content = read_file_range(video_path, start, end) |
| return StreamingResponse(content, media_type="video/mp4", headers=headers) |
|
|
| return FileResponse(video_path, media_type="video/mp4") |
|
|
|
|
| async def read_file_range(path, start, end): |
| async with aiofiles.open(path, "rb") as file: |
| await file.seek(start) |
| while True: |
| data = await file.read(1024 * 1024) |
| if not data or await file.tell() > end: |
| break |
| yield data |
|
|