| from fastapi import APIRouter, HTTPException, status, BackgroundTasks, UploadFile, Query |
| from .Schema import EditorRequest, TaskInfo |
| from App.Worker import celery_task, concatenate_videos |
| from celery.result import AsyncResult |
| import aiofiles, os, uuid, aiohttp |
| from App import SERVER_STATE, Task |
|
|
| videditor_router = APIRouter(tags=["vidEditor"]) |
|
|
|
|
| @videditor_router.post("/create-video") |
| async def create_video(videoRequest: EditorRequest, background_task: BackgroundTasks): |
| background_task.add_task(celery_task, videoRequest) |
| return {"task_id": "started"} |
|
|
|
|
| @videditor_router.post("/create-chunks") |
| async def create_chunks(videoRequest: EditorRequest, background_task: BackgroundTasks): |
| video_duration = videoRequest.constants.duration |
| task_id = uuid.uuid4() |
| new_task = Task(TASK_ID=task_id) |
|
|
| active_nodes = [ |
| node |
| for node in SERVER_STATE.NODES |
| if await new_task._check_node_online(node.SPACE_HOST) |
| ] |
| number_of_nodes = len(active_nodes) |
| ranges = [ |
| [i, i + number_of_nodes] for i in range(0, video_duration, number_of_nodes) |
| ] |
| for i, node in enumerate(active_nodes): |
| await new_task.add_node(node, i) |
|
|
| SERVER_STATE.TASKS[task_id] = new_task |
|
|
| async with aiohttp.ClientSession() as session: |
| for i, node in enumerate(active_nodes): |
| videoRequest.constants.frames = ranges[i] |
| if node.SPACE_HOST == SERVER_STATE.SPACE_HOST: |
| background_task.add_task(celery_task, videoRequest) |
| async with session.post( |
| "node.SPACE_HOST/create-video", json=videoRequest |
| ) as response: |
| if response.status != 200: |
| raise HTTPException( |
| status_code=response.status, |
| detail="Failed to post request to node", |
| ) |
|
|
| return {"task_id": "started"} |
|
|
|
|
| @videditor_router.post("/uploadfile/") |
| async def create_file( |
| background_tasks: BackgroundTasks, |
| file: UploadFile, |
| node: str, |
| chunk: int, |
| task: str, |
| ): |
|
|
| chunk_directory = f"/tmp/Video/{task}" |
| file_name = f"{chunk_directory}/{chunk}.mp4" |
| |
| os.makedirs(chunk_directory, exist_ok=True) |
|
|
| try: |
| async with aiofiles.open(file_name, "wb") as f: |
| while contents := await file.read(1024 * 1): |
| await f.write(contents) |
|
|
| except Exception as e: |
| return { |
| "message": f"There was an error uploading the file, error message {str(e)} " |
| } |
| finally: |
| await file.close() |
| running_task = SERVER_STATE.TASKS[task] |
| running_task.mark_node_completed(node) |
| if running_task.is_completed(): |
| background_tasks.add_task(concatenate_videos, chunk_directory) |
|
|
| return {"message": "File uploaded successfully"} |
|
|