| from celery import Celery, chain |
| import os, shutil, subprocess |
| import uuid |
| from urllib.parse import urlparse |
| from subprocess import run |
| from App import celery_config, bot, SERVER_STATE |
| from typing import List |
| from App.Editor.Schema import EditorRequest, LinkInfo, Assets, Constants |
| from celery.signals import worker_process_init |
| from asgiref.sync import async_to_sync |
| import json |
| import os |
| from pydantic import BaseModel, Field |
| from App.Generate.utils.GroqInstruct import tagger |
| from App.utilis import upload_file |
|
|
| import subprocess |
|
|
|
|
| def concatenate_videos(input_dir): |
| |
| files = sorted([f for f in os.listdir(input_dir) if f.endswith(".mp4")]) |
|
|
| |
| input_files = "|".join([f"file '{os.path.join(input_dir, f)}'" for f in files]) |
|
|
| output_file = f"{input_dir}/final.mp4" |
| |
| subprocess.run( |
| [ |
| "ffmpeg", |
| "-f", |
| "concat", |
| "-safe", |
| "0", |
| "-i", |
| f"concat:{input_files}", |
| "-c", |
| "copy", |
| output_file, |
| ] |
| ) |
| bot.start() |
| bot.send_file(-1002069945904, file=output_file, caption="finally done!") |
| return output_file |
|
|
|
|
| class YouTubeUploadTask(BaseModel): |
| filename: str |
| title: str = Field( |
| ..., |
| min_length=100, |
| max_length=500, |
| description="A good title for the video", |
| ) |
| description: str = Field( |
| ..., |
| min_length=100, |
| max_length=500, |
| description="A brief summary of the video's content", |
| ) |
| category_id: str = "22" |
| privacy: str = "private" |
| tags: str = Field( |
| ..., |
| min_length=100, |
| max_length=500, |
| description="Best seo tags for youtube based on the story", |
| ) |
| thumbnail: str = Field( |
| ..., |
| min_length=100, |
| max_length=500, |
| description="""Best image prompt based on the image description: here is an """, |
| ) |
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| def create_json_file(assets: List[Assets], asset_dir: str): |
| for asset in assets: |
| filename = f"{asset.type.capitalize()}Sequences.json" |
| |
| json_string = json.dumps(asset.sequence) |
|
|
| |
| os.makedirs(asset_dir, exist_ok=True) |
| print(os.path.join(asset_dir, filename)) |
| |
| with open(os.path.join(asset_dir, filename), "w") as f: |
| f.write(json_string) |
|
|
|
|
| |
| def create_constants_json_file(constants: Constants, asset_dir: str): |
| temp_dir = asset_dir.replace("src/HelloWorld/Assets", "") |
| instrunction_file = os.path.join(temp_dir, "ServerInstructions.json") |
| filename = "Constants.json" |
| if constants: |
| json_string = json.dumps(constants.dict()) |
| else: |
| json_string = json.dumps({}) |
| os.makedirs(asset_dir, exist_ok=True) |
| with open(instrunction_file, "w") as f: |
| if constants.instructions: |
| f.write(json.dumps({"frames": constants.frames})) |
| else: |
| f.write(json.dumps({"frames": [0, constants.duration]})) |
|
|
| with open(os.path.join(asset_dir, filename), "w") as f: |
| f.write(json_string) |
|
|
|
|
| def create_symlink(source_dir, target_dir, symlink_name): |
| source_path = os.path.join(source_dir, symlink_name) |
| target_path = os.path.join(target_dir, symlink_name) |
|
|
| try: |
| os.symlink(source_path, target_path) |
| print(f"Symlink '{symlink_name}' created successfully.") |
| except FileExistsError: |
| print(f"Symlink '{symlink_name}' already exists.") |
|
|
|
|
| def change_playback_speed(input_path, speed_factor): |
| """ |
| Change the playback speed of a video and overwrite the original file. |
| |
| :param input_path: Path to the input video file. |
| :param speed_factor: Factor by which to increase the speed. (e.g., 2.0 for double speed) |
| """ |
| |
| temp_output_path = input_path + ".temp.mp4" |
|
|
| |
| command = [ |
| "ffmpeg", |
| "-i", |
| input_path, |
| "-filter_complex", |
| f"[0:v]setpts={1/speed_factor}*PTS[v];[0:a]atempo={speed_factor}[a]", |
| "-map", |
| "[v]", |
| "-map", |
| "[a]", |
| "-y", |
| temp_output_path, |
| ] |
|
|
| |
| subprocess.run(command, check=True) |
|
|
| |
| os.replace(temp_output_path, input_path) |
|
|
|
|
| def download_with_wget( |
| link=None, |
| download_dir=None, |
| filename=None, |
| links_file_path=None, |
| ): |
| headers = [ |
| "--header", |
| "Cookie: __Host-session=63EQahvTpHuoFSkEW75hC", |
| "--header", |
| "Cookie: __cf_bm=CDGicP5OErYjDI85UmQSRKlppJLlbcgCXlWcODoIQAI-1716296320-1.0.1.1-4Rm5_wdxupmrDWgddOQjEV01TMFC4UJ479GRIAKKGHNgXu3N8ZkASEZXGwCWaRyUYazsUaLMALk.4frWWJzHQ", |
| ] |
|
|
| |
| if links_file_path: |
| command = ["aria2c", "-i", links_file_path, "--dir", download_dir] |
| else: |
| command = ["aria2c"] + headers + [link, "-d", download_dir, "-o", filename] |
|
|
| |
| subprocess.run(command) |
|
|
|
|
| |
| def copy_remotion_app(src: str, dest: str): |
| shutil.copytree(src, dest) |
|
|
|
|
| |
| def unsilence(directory: str): |
| output_dir = os.path.join(directory, "out/video.mp4") |
| shortered_dir = os.path.join(directory, "out/temp.mp4") |
| os.system(f"pipx run unsilence {output_dir} {shortered_dir} -y") |
| os.remove(output_dir) |
| os.rename(shortered_dir, output_dir) |
|
|
|
|
| |
| def install_dependencies(directory: str): |
| os.chdir(directory) |
| os.system("npm install") |
|
|
|
|
| |
| def upload_video_to_youtube(task: YouTubeUploadTask): |
| |
|
|
| |
| command = [ |
| "/srv/youtube/youtubeuploader", |
| "-filename", |
| task.filename, |
| "-title", |
| task.title, |
| "-description", |
| task.description, |
| "-categoryId", |
| task.category_id, |
| "-privacy", |
| task.privacy, |
| "-tags", |
| task.tags, |
| ] |
|
|
| if task.thumbnail: |
| command.extend(["-thumbnail", task.thumbnail]) |
|
|
| |
| result = run(command, capture_output=True, text=True) |
|
|
| return result.stdout |
|
|
|
|
| |
| def download_assets(links: List[LinkInfo], temp_dir: str): |
| public_dir = f"{temp_dir}/public" |
| os.makedirs(public_dir, exist_ok=True) |
|
|
| links_file_path = os.path.join(temp_dir, "download_links.txt") |
|
|
| with open(links_file_path, "w") as links_file: |
| for link in links: |
| file_link = link.link |
| file_name = link.file_name |
| |
| links_file.write(f"{file_link}\n out={file_name}\n") |
| download_with_wget(links_file_path=links_file_path, download_dir=public_dir) |
|
|
|
|
| |
| def render_video(directory: str, output_directory: str): |
| os.chdir(directory) |
| os.system(f"npm run build --output {output_directory}") |
| print("complete") |
|
|
|
|
| |
| async def cleanup_temp_directory( |
| temp_dir: str, |
| output_dir: str, |
| video_task: EditorRequest, |
| chat_id: int = -1002069945904, |
| ): |
| video_folder_dir = f"/tmp/Video/{video_task.constants.task}" |
|
|
| try: |
| print("sending...") |
| |
| await bot.send_file(chat_id, file=output_dir, caption="Your video caption") |
|
|
| finally: |
| |
| |
| |
| |
| |
| |
| |
| shutil.rmtree(temp_dir, ignore_errors=True) |
|
|
|
|
| |
| async def celery_task(video_task: EditorRequest): |
| remotion_app_dir = os.path.join("./", "Remotion-app") |
| project_id = str(uuid.uuid4()) |
| temp_dir = f"/tmp/{project_id}" |
| output_dir = f"/tmp/{project_id}/out/video.mp4" |
| assets_dir = os.path.join(temp_dir, "src/HelloWorld/Assets") |
|
|
| copy_remotion_app(remotion_app_dir, temp_dir) |
|
|
| |
| |
| |
| install_dependencies(temp_dir) |
| create_constants_json_file(video_task.constants, assets_dir) |
| create_json_file(video_task.assets, assets_dir) |
| download_assets(video_task.links, temp_dir) |
| render_video(temp_dir, output_dir) |
| change_playback_speed(output_dir, 1.2) |
| |
| |
|
|
| |
| |
| await cleanup_temp_directory(temp_dir, output_dir, video_task) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| def handle_error(task_id, err, *args, **kwargs): |
| print(f"Error in task {task_id}: {err}") |
| |
|
|