""" app.py — StudioX AI Reel Cutter — Hugging Face Space entry point =================================================================== Exposes three endpoints: GET /health Returns {"status": "ok"}. Node backend pings this to wake the Space and confirm it is alive before submitting a job. GET /progress/{job_id} Server-Sent Events stream. Emits progress events while the job runs so the Node backend can forward live updates to the browser. POST /generate Accepts a YouTube URL or an uploaded video file plus optional settings. Runs the full pipeline and returns the ZIP as a streaming file download. Environment variables (set as HF Space Secrets): ASSEMBLYAI_API_KEY — required OPENROUTER_API_KEY — required OPENROUTER_MODEL — optional, default: qwen/qwen3-30b-a3b:free """ import asyncio import json import os import shutil import tempfile import uuid from pathlib import Path from typing import Optional import assemblyai as aai from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, StreamingResponse from audio import extract_audio from downloader import YouTubeDownloadError, download_youtube_video from highlights import HighlightModelError, detect_highlights from packager import package_reels from processor import process_all_segments from transcriber import transcribe_audio from utils import log, probe_video # ── Inject API keys from env into libraries that need them ─────────────────── aai.settings.api_key = os.environ.get("ASSEMBLYAI_API_KEY", "") # ── App ────────────────────────────────────────────────────────────────────── app = FastAPI( title = "StudioX AI Reel Cutter", description = "Converts long videos into 5 vertical 9:16 reels", version = "1.0.0", ) app.add_middleware( CORSMiddleware, allow_origins = ["*"], # tighten to your domain in production allow_methods = ["*"], allow_headers = ["*"], ) # ── In-memory job progress store 
# ─────────────────────────────────────────────
# Structure: { job_id: { "stage": str, "pct": int, "done": bool, "error": str|None } }
# NOTE(review): entries are never evicted, so this grows for the lifetime of
# the process. Acceptable for a frequently-restarted Space; a long-lived
# deployment should prune finished jobs.
_jobs: dict = {}


# ─────────────────────────────────────────────
# HEALTH CHECK
# ─────────────────────────────────────────────
@app.get("/health")
async def health():
    """Node backend calls this to wake the Space and confirm readiness."""
    return {"status": "ok", "service": "studiox-reel-cutter"}


# ─────────────────────────────────────────────
# PROGRESS STREAM
# ─────────────────────────────────────────────
@app.get("/progress/{job_id}")
async def progress_stream(job_id: str):
    """
    Server-Sent Events endpoint. Node backend subscribes here and forwards
    events to the browser.

    Event format:
        data: {"stage": "transcribing", "pct": 45, "done": false}

    Raises:
        HTTPException 404 — when the job_id has never been registered.
    """
    if job_id not in _jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    async def event_generator():
        # Poll the shared job dict once a second; emit only on change so the
        # stream stays quiet during long stages.
        last_pct = -1
        while True:
            job = _jobs.get(job_id, {})
            pct = job.get("pct", 0)

            # Only emit when something changed (or on terminal states, so the
            # final done/error event is always delivered).
            if pct != last_pct or job.get("done") or job.get("error"):
                payload = json.dumps({
                    "stage": job.get("stage", "starting"),
                    "pct"  : pct,
                    "done" : job.get("done", False),
                    "error": job.get("error", None),
                })
                yield f"data: {payload}\n\n"
                last_pct = pct

            if job.get("done") or job.get("error"):
                break

            await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            # Disable proxy/nginx buffering so events reach the client live.
            "Cache-Control"              : "no-cache",
            "X-Accel-Buffering"          : "no",
            "Access-Control-Allow-Origin": "*",
        },
    )


# ─────────────────────────────────────────────
# GENERATE REELS
# ─────────────────────────────────────────────
@app.post("/generate")
def generate_reels(
    # ── Input — exactly one must be provided ─────────────────────────────
    yt_url    : Optional[str]        = Form(None),
    video_file: Optional[UploadFile] = File(None),
    # ── Options ──────────────────────────────────────────────────────────
    num_reels        : int  = Form(5),
    min_duration     : int  = Form(10),
    max_duration     : int  = Form(30),
    resolution       : str  = Form("720p"),  # "720p" | "1080p"
    add_captions     : bool = Form(True),
    caption_font_size: int  = Form(48),
    caption_color    : str  = Form("white"),
    # ── Job tracking ─────────────────────────────────────────────────────
    # Node backend passes a job_id it generated so it can also call /progress
    job_id: Optional[str] = Form(None),
):
    """
    Main endpoint. Accepts a YouTube URL **or** an uploaded video file, runs
    the full pipeline, and returns a ZIP of reels as a file download.

    The ZIP contains:
        reel_01_12s-40s.mp4
        reel_02_55s-82s.mp4
        …
        metadata.json

    NOTE: this is deliberately a plain ``def`` (not ``async def``). The whole
    pipeline is blocking CPU/subprocess work; as a sync path function FastAPI
    runs it in a worker thread, which keeps the event loop free to serve the
    /progress/{job_id} SSE stream concurrently. As ``async def`` it would
    block the loop and no progress event could be delivered until the job
    finished.

    Raises:
        HTTPException 422 — when zero or both inputs are supplied.
        HTTPException 500 — on any pipeline failure (detail carries job_id).
    """
    # ── Validate input ───────────────────────────────────────────────────
    if not yt_url and not video_file:
        raise HTTPException(status_code=422, detail="Provide yt_url or video_file")
    if yt_url and video_file:
        raise HTTPException(status_code=422, detail="Provide only one of yt_url or video_file")

    # ── Resolution lookup (width, height for vertical 9:16 output) ───────
    resolution_map = {
        "720p" : (720, 1280),
        "1080p": (1080, 1920),
    }
    # Unknown resolution strings silently fall back to 720p.
    out_w, out_h = resolution_map.get(resolution, (720, 1280))

    # ── Job setup ────────────────────────────────────────────────────────
    jid = job_id or str(uuid.uuid4())
    _jobs[jid] = {"stage": "starting", "pct": 0, "done": False, "error": None}
    log("🚀", f"[{jid[:8]}] job started")

    def _progress(stage: str, pct: int):
        # Overwrite the whole record so stale error/done flags cannot linger.
        _jobs[jid] = {"stage": stage, "pct": pct, "done": False, "error": None}
        log("📊", f"[{jid[:8]}] {stage} {pct}%")

    def _job_log(msg: str):
        log("🧭", f"[{jid[:8]}] {msg}")

    def _fail(context: str, error_msg: str):
        # Shared failure path: record error for the SSE stream, drop the
        # workspace, and log. `context` is a log-message suffix such as
        # " during yt fetch" (may be empty).
        _jobs[jid] = {"stage": "error", "pct": 0, "done": True, "error": error_msg}
        shutil.rmtree(work_dir, ignore_errors=True)
        log("❌", f"Job {jid[:8]} failed{context}: {error_msg}")

    # ── Working directory (cleaned up after streaming or on error) ───────
    work_dir  = Path(tempfile.mkdtemp(prefix="studiox_"))
    audio_dir = work_dir / "audio"
    reels_dir = work_dir / "reels"
    temp_dir  = work_dir / "temp"
    for d in [audio_dir, reels_dir, temp_dir]:
        d.mkdir(parents=True, exist_ok=True)

    try:
        # ── Step 1: Acquire video ────────────────────────────────────────
        _progress("downloading", 0)

        if yt_url:
            _job_log("starting YouTube fetch")
            video_path = download_youtube_video(
                url         = yt_url,
                output_dir  = work_dir,
                progress_cb = lambda stage, pct: _progress("downloading", pct),
                log_cb      = _job_log,
            )
            _job_log("YouTube fetch succeeded")
        else:
            # Save uploaded file to disk. `filename` may be None for some
            # clients, so guard before taking the suffix.
            suffix = Path(video_file.filename or "").suffix or ".mp4"
            video_path = work_dir / f"input{suffix}"
            with video_path.open("wb") as f:
                shutil.copyfileobj(video_file.file, f)
            log("✅", f"Received uploaded file: {video_path.name} "
                      f"({video_path.stat().st_size / 1_048_576:.1f} MB)")
            _progress("downloading", 100)

        video_meta = probe_video(video_path)

        # ── Step 2: Extract audio ────────────────────────────────────────
        _progress("extracting_audio", 0)
        audio_path = extract_audio(video_path, audio_dir)
        _progress("extracting_audio", 100)

        # ── Step 3: Transcribe ───────────────────────────────────────────
        _progress("transcribing", 0)
        transcript_data = transcribe_audio(
            audio_path  = audio_path,
            progress_cb = _progress,
        )
        _progress("transcribing", 100)

        # ── Step 4: Detect highlights ────────────────────────────────────
        _progress("detecting_highlights", 0)
        segments = detect_highlights(
            transcript_data = transcript_data,
            video_duration  = video_meta["duration"],
            num_reels       = num_reels,
            min_duration    = float(min_duration),
            max_duration    = float(max_duration),
            progress_cb     = _progress,
        )
        _progress("detecting_highlights", 100)

        if not segments:
            raise RuntimeError(
                "No highlight segments detected. "
                "Try a longer video or reduce min_duration."
            )

        # ── Step 5: Cut + convert ────────────────────────────────────────
        _progress("cutting_reels", 0)
        reel_outputs = process_all_segments(
            video_path        = video_path,
            segments          = segments,
            reels_dir         = reels_dir,
            temp_dir          = temp_dir,
            src_w             = video_meta["width"],
            src_h             = video_meta["height"],
            out_w             = out_w,
            out_h             = out_h,
            words             = transcript_data["words"],
            add_captions      = add_captions,
            caption_font_size = caption_font_size,
            caption_color     = caption_color,
            progress_cb       = _progress,
        )
        _progress("cutting_reels", 100)

        # ── Step 6: Package ──────────────────────────────────────────────
        _progress("packaging", 0)
        zip_path = package_reels(reel_outputs, work_dir)
        _progress("packaging", 100)

        # ── Mark job done ────────────────────────────────────────────────
        _jobs[jid] = {"stage": "done", "pct": 100, "done": True, "error": None}
        log("✨", f"Job {jid[:8]} complete → {zip_path.name}")

        # ── Stream ZIP back ──────────────────────────────────────────────
        # We use a generator so the workspace is deleted from disk only
        # after the last chunk has been streamed (or the client disconnects).
        def _iter_file_then_cleanup(path: Path, cleanup: Path):
            try:
                with path.open("rb") as f:
                    while chunk := f.read(1024 * 64):  # 64 KB chunks
                        yield chunk
            finally:
                shutil.rmtree(cleanup, ignore_errors=True)

        return StreamingResponse(
            _iter_file_then_cleanup(zip_path, work_dir),
            media_type = "application/zip",
            headers    = {
                "Content-Disposition": f'attachment; filename="{zip_path.name}"',
                "X-Job-Id"           : jid,
            },
        )

    except YouTubeDownloadError as exc:
        error_msg = str(exc)
        _fail(" during yt fetch", error_msg)
        raise HTTPException(
            status_code=500,
            detail={
                "error": "HF_UPSTREAM_FAILURE",
                "message": error_msg,
                "job_id": jid,
            },
        )
    except HighlightModelError as exc:
        error_msg = str(exc)
        _fail(" during highlight detection", error_msg)
        raise HTTPException(
            status_code=500,
            detail={
                "error": "HF_UPSTREAM_FAILURE",
                "message": error_msg,
                "job_id": jid,
            },
        )
    except Exception as exc:
        # Top-level boundary: anything unexpected becomes a 500 with the
        # message surfaced to the Node backend.
        _fail("", str(exc))
        raise HTTPException(status_code=500, detail=str(exc))


# ─────────────────────────────────────────────
# DEV SERVER
# ─────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)