"""
app.py β€” StudioX AI Reel Cutter β€” Hugging Face Space entry point
===================================================================
Exposes three endpoints:
GET /health
Returns {"status": "ok"}. Node backend pings this to wake the
Space and confirm it is alive before submitting a job.
GET /progress/{job_id}
Server-Sent Events stream. Emits progress events while the job
runs so the Node backend can forward live updates to the browser.
POST /generate
Accepts a YouTube URL or an uploaded video file plus optional
settings. Runs the full pipeline and returns the ZIP as a
streaming file download.
Environment variables (set as HF Space Secrets):
ASSEMBLYAI_API_KEY β€” required
OPENROUTER_API_KEY β€” required
OPENROUTER_MODEL β€” optional, default: qwen/qwen3-30b-a3b:free
"""
import asyncio
import json
import os
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Optional
import assemblyai as aai
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from audio import extract_audio
from downloader import YouTubeDownloadError, download_youtube_video
from highlights import HighlightModelError, detect_highlights
from packager import package_reels
from processor import process_all_segments
from transcriber import transcribe_audio
from utils import log, probe_video
# ── Inject API keys from env into libraries that need them ───────────────────
# AssemblyAI reads its key from module-level settings. An empty fallback means
# a missing secret fails at transcription time, not at import time.
aai.settings.api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
# ── App ──────────────────────────────────────────────────────────────────────
app = FastAPI(
title = "StudioX AI Reel Cutter",
description = "Converts long videos into 5 vertical 9:16 reels",
version = "1.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins = ["*"], # tighten to your domain in production
allow_methods = ["*"],
allow_headers = ["*"],
)
# ── In-memory job progress store ─────────────────────────────────────────────
# Structure: { job_id: { "stage": str, "pct": int, "done": bool, "error": str|None } }
# NOTE(review): entries are never purged after completion, so this grows for
# the lifetime of the process — acceptable for a single-user Space, but worth
# confirming for long-running deployments.
_jobs: dict[str, dict] = {}
# ─────────────────────────────────────────────
# HEALTH CHECK
# ─────────────────────────────────────────────
@app.get("/health")
async def health():
    """Liveness probe — the Node backend hits this to wake the Space before submitting work."""
    payload = {"status": "ok", "service": "studiox-reel-cutter"}
    return payload
# ─────────────────────────────────────────────
# PROGRESS STREAM
# ─────────────────────────────────────────────
@app.get("/progress/{job_id}")
async def progress_stream(job_id: str):
    """
    Server-Sent Events endpoint.
    Node backend subscribes here and forwards events to the browser.
    Event format:
        data: {"stage": "transcribing", "pct": 45, "done": false}

    Raises:
        HTTPException: 404 if job_id was never registered by /generate.
    """
    if job_id not in _jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    async def event_generator():
        # Track (stage, pct) together: comparing pct alone missed stage
        # transitions that happen to land on the same percentage value.
        last_emitted = None
        while True:
            job = _jobs.get(job_id, {})
            current = (job.get("stage", "starting"), job.get("pct", 0))
            # Emit only on change, or unconditionally on terminal states so
            # the subscriber always sees done/error at least once.
            if current != last_emitted or job.get("done") or job.get("error"):
                payload = json.dumps({
                    "stage": current[0],
                    "pct"  : current[1],
                    "done" : job.get("done", False),
                    "error": job.get("error", None),
                })
                yield f"data: {payload}\n\n"
                last_emitted = current
            if job.get("done") or job.get("error"):
                break
            await asyncio.sleep(1)  # 1 s poll keeps CPU idle between updates

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control" : "no-cache",
            # Disable reverse-proxy buffering so events flush immediately.
            "X-Accel-Buffering" : "no",
            "Access-Control-Allow-Origin": "*",
        },
    )
# ─────────────────────────────────────────────
# GENERATE REELS
# ─────────────────────────────────────────────
@app.post("/generate")
def generate_reels(
    # ── Input — exactly one must be provided ──────────────────────────────
    yt_url : Optional[str] = Form(None),
    video_file : Optional[UploadFile] = File(None),
    # ── Options ────────────────────────────────────────────────────────────
    num_reels : int = Form(5),
    min_duration : int = Form(10),
    max_duration : int = Form(30),
    resolution : str = Form("720p"), # "720p" | "1080p"
    add_captions : bool = Form(True),
    caption_font_size: int = Form(48),
    caption_color : str = Form("white"),
    # ── Job tracking ────────────────────────────────────────────────────────
    # Node backend passes a job_id it generated so it can also call /progress
    job_id : Optional[str] = Form(None),
):
    """
    Main endpoint. Accepts a YouTube URL **or** an uploaded video file,
    runs the full pipeline, and returns a ZIP of reels as a file download.

    Declared as a sync ``def`` (not ``async def``) on purpose: the pipeline
    below is entirely blocking (download, ffmpeg, transcription), and FastAPI
    runs sync endpoints in its threadpool, so the event loop stays free to
    serve the concurrent /progress SSE stream.

    The ZIP contains:
        reel_01_12s-40s.mp4
        reel_02_55s-82s.mp4
        …
        metadata.json

    Raises:
        HTTPException: 422 on invalid input combination, 500 on any
            pipeline failure (detail carries job_id for upstream failures).
    """
    # ── Validate input ───────────────────────────────────────────────────────
    if not yt_url and not video_file:
        raise HTTPException(status_code=422, detail="Provide yt_url or video_file")
    if yt_url and video_file:
        raise HTTPException(status_code=422, detail="Provide only one of yt_url or video_file")
    # ── Resolution lookup (unknown values fall back to 720p) ─────────────────
    resolution_map = {
        "720p" : (720, 1280),
        "1080p": (1080, 1920),
    }
    out_w, out_h = resolution_map.get(resolution, (720, 1280))
    # ── Job setup ────────────────────────────────────────────────────────────
    jid = job_id or str(uuid.uuid4())
    _jobs[jid] = {"stage": "starting", "pct": 0, "done": False, "error": None}
    log("🚀", f"[{jid[:8]}] job started")

    def _progress(stage: str, pct: int):
        # Overwrite the job record so /progress subscribers see the update.
        _jobs[jid] = {"stage": stage, "pct": pct, "done": False, "error": None}
        log("📊", f"[{jid[:8]}] {stage} {pct}%")

    def _job_log(msg: str):
        log("🧭", f"[{jid[:8]}] {msg}")

    # ── Working directory (cleaned up after streaming or on error) ───────────
    work_dir = Path(tempfile.mkdtemp(prefix="studiox_"))
    audio_dir = work_dir / "audio"
    reels_dir = work_dir / "reels"
    temp_dir = work_dir / "temp"
    for d in [audio_dir, reels_dir, temp_dir]:
        d.mkdir(parents=True, exist_ok=True)
    try:
        # ── Step 1: Acquire video ────────────────────────────────────────────
        _progress("downloading", 0)
        if yt_url:
            _job_log("starting YouTube fetch")
            video_path = download_youtube_video(
                url = yt_url,
                output_dir = work_dir,
                progress_cb = lambda stage, pct: _progress("downloading", pct),
                log_cb = _job_log,
            )
            _job_log("YouTube fetch succeeded")
        else:
            # Save uploaded file to disk
            suffix = Path(video_file.filename).suffix or ".mp4"
            video_path = work_dir / f"input{suffix}"
            with video_path.open("wb") as f:
                shutil.copyfileobj(video_file.file, f)
            log("✅", f"Received uploaded file: {video_path.name} "
                      f"({video_path.stat().st_size / 1_048_576:.1f} MB)")
        _progress("downloading", 100)
        video_meta = probe_video(video_path)
        # ── Step 2: Extract audio ────────────────────────────────────────────
        _progress("extracting_audio", 0)
        audio_path = extract_audio(video_path, audio_dir)
        _progress("extracting_audio", 100)
        # ── Step 3: Transcribe ───────────────────────────────────────────────
        _progress("transcribing", 0)
        transcript_data = transcribe_audio(
            audio_path = audio_path,
            progress_cb = _progress,
        )
        _progress("transcribing", 100)
        # ── Step 4: Detect highlights ────────────────────────────────────────
        _progress("detecting_highlights", 0)
        segments = detect_highlights(
            transcript_data = transcript_data,
            video_duration = video_meta["duration"],
            num_reels = num_reels,
            min_duration = float(min_duration),
            max_duration = float(max_duration),
            progress_cb = _progress,
        )
        _progress("detecting_highlights", 100)
        if not segments:
            raise RuntimeError(
                "No highlight segments detected. "
                "Try a longer video or reduce min_duration."
            )
        # ── Step 5: Cut + convert ────────────────────────────────────────────
        _progress("cutting_reels", 0)
        reel_outputs = process_all_segments(
            video_path = video_path,
            segments = segments,
            reels_dir = reels_dir,
            temp_dir = temp_dir,
            src_w = video_meta["width"],
            src_h = video_meta["height"],
            out_w = out_w,
            out_h = out_h,
            words = transcript_data["words"],
            add_captions = add_captions,
            caption_font_size = caption_font_size,
            caption_color = caption_color,
            progress_cb = _progress,
        )
        _progress("cutting_reels", 100)
        # ── Step 6: Package ──────────────────────────────────────────────────
        _progress("packaging", 0)
        zip_path = package_reels(reel_outputs, work_dir)
        _progress("packaging", 100)
        # ── Mark job done ────────────────────────────────────────────────────
        _jobs[jid] = {"stage": "done", "pct": 100, "done": True, "error": None}
        log("✨", f"Job {jid[:8]} complete → {zip_path.name}")
        # ── Stream ZIP back ──────────────────────────────────────────────────
        # Generator so work_dir is removed only after the ZIP has streamed.
        def _iter_file_then_cleanup(path: Path, cleanup: Path):
            try:
                with path.open("rb") as f:
                    while chunk := f.read(1024 * 64): # 64 KB chunks
                        yield chunk
            finally:
                shutil.rmtree(cleanup, ignore_errors=True)
        return StreamingResponse(
            _iter_file_then_cleanup(zip_path, work_dir),
            media_type = "application/zip",
            headers = {
                "Content-Disposition": f'attachment; filename="{zip_path.name}"',
                "X-Job-Id" : jid,
            },
        )
    except (YouTubeDownloadError, HighlightModelError) as exc:
        # Known upstream failures: report a structured detail so the Node
        # backend can distinguish them from generic pipeline errors.
        error_msg = str(exc)
        _jobs[jid] = {"stage": "error", "pct": 0, "done": True, "error": error_msg}
        shutil.rmtree(work_dir, ignore_errors=True)
        phase = ("yt fetch" if isinstance(exc, YouTubeDownloadError)
                 else "highlight detection")
        log("❌", f"Job {jid[:8]} failed during {phase}: {error_msg}")
        raise HTTPException(
            status_code=500,
            detail={
                "error": "HF_UPSTREAM_FAILURE",
                "message": error_msg,
                "job_id": jid,
            },
        ) from exc
    except Exception as exc:
        # Catch-all boundary: record the error for /progress, clean up, re-raise.
        _jobs[jid] = {"stage": "error", "pct": 0, "done": True, "error": str(exc)}
        shutil.rmtree(work_dir, ignore_errors=True)
        log("❌", f"Job {jid[:8]} failed: {exc}")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ─────────────────────────────────────────────
# DEV SERVER
# ─────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    # Local development entry point; HF Spaces route traffic to port 7860.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)