from __future__ import annotations import html import json import logging import os import queue import re import shutil import subprocess import sys import tempfile import threading import time import traceback import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Annotated def _bootstrap_local_paths() -> None: repo_root = Path(__file__).resolve().parent for candidate in (repo_root / "src", repo_root / "humeo-core" / "src"): candidate_str = str(candidate) if candidate.is_dir() and candidate_str not in sys.path: sys.path.insert(0, candidate_str) _bootstrap_local_paths() if not (os.environ.get("HUMEO_TRANSCRIBE_PROVIDER") or "").strip(): os.environ["HUMEO_TRANSCRIBE_PROVIDER"] = ( "elevenlabs" if (os.environ.get("ELEVENLABS_API_KEY") or "").strip() else "openai" ) from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from humeo.config import PipelineConfig from humeo.pipeline import run_pipeline APP_TITLE = "Humeo - long to shorts" LOG_FORMAT = "%(asctime)s | %(levelname)-7s | %(name)s | %(message)s" MAX_LOG_LINES = 700 LLM_KEY_NAMES = ("GOOGLE_API_KEY", "GEMINI_API_KEY", "OPENROUTER_API_KEY") class QueueLogHandler(logging.Handler): def __init__(self, sink: queue.Queue[str]): super().__init__() self._sink = sink def emit(self, record: logging.LogRecord) -> None: try: self._sink.put_nowait(self.format(record)) except Exception: pass @dataclass class ClipFile: name: str url: str duration: str poster_url: str | None = None @dataclass class Job: id: str run_root: Path output_dir: Path work_dir: Path source: str source_path: Path | None = None steering_note: str | None = None status: str = "Queued" nav_status: str = "Processing..." error: str | None = None done: bool = False created_at: float = field(default_factory=time.time) logs: list[str] = field(default_factory=list) clips: dict[str, ClipFile] = field(default_factory=dict) steps: list[dict[str, object]] = field( default_factory=lambda: [ {"name": "Uploading video", "pct": 100, "state": "done"}, {"name": "Generating transcript", "pct": 5, "state": "active"}, {"name": "Choosing short clips", "pct": 0, "state": "pending"}, {"name": "Producing clips", "pct": 0, "state": "pending"}, {"name": "Adding subtitles & light edits", "pct": 0, "state": "pending"}, ] ) JOBS: dict[str, Job] = {} JOBS_LOCK = threading.Lock() def _append_log(job: Job, line: str) -> None: job.logs.append(line) if len(job.logs) > MAX_LOG_LINES: job.logs = job.logs[-MAX_LOG_LINES:] def _set_step(job: Job, idx: int, pct: int, state: str = "active") -> None: for step_idx, step in enumerate(job.steps): if step_idx < idx: step["pct"] = 100 step["state"] = "done" elif step_idx == idx: step["pct"] = max(int(step.get("pct", 0)), min(100, pct)) step["state"] = state elif step.get("state") != "done": step["state"] = "pending" def _update_stage_from_log(job: Job, line: str) -> None: if "STAGE 1: INGESTION" in line: job.status = "Generating transcript" _set_step(job, 1, 15) elif "Transcribing" in line: job.status = "Generating transcript" _set_step(job, 1, 45) elif "Transcript already exists" in line or "Transcription complete" in line: _set_step(job, 1, 90) elif "STAGE 2: CLIP SELECTION" in line: job.status = "Choosing short clips" _set_step(job, 2, 20) elif "STAGE 2.25: HOOK DETECTION" in line: job.status = "Finding hooks" _set_step(job, 2, 55) elif "STAGE 2.5: CONTENT PRUNING" in line: job.status = "Tightening clip windows" _set_step(job, 2, 78) elif "STAGE 2.75: CLIP ASSEMBLY" in line: job.status = "Assembling clips" _set_step(job, 3, 18) elif "STAGE 3: CLIP LAYOUTS" in line: job.status = "Choosing layout" _set_step(job, 3, 38) elif "STAGE 4: RENDER" in line: job.status = "Producing clips" _set_step(job, 3, 62) elif "reframe_clip_ffmpeg" in line: _set_step(job, 4, min(90, 20 + len(job.clips) * 12)) elif "RENDER QA" in line or "Render QA summary" in line: job.status = "Checking clips" _set_step(job, 4, 82) elif "PIPELINE COMPLETE" in line: job.status = "Complete" job.nav_status = "Done" for step in job.steps: step["pct"] = 100 step["state"] = "done" def _install_log_handler(message_queue: queue.Queue[str]) -> tuple[logging.Handler, int, dict[str, int]]: handler = QueueLogHandler(message_queue) handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt="%H:%M:%S")) root_logger = logging.getLogger() previous_level = root_logger.level root_logger.addHandler(handler) root_logger.setLevel(logging.INFO) previous_logger_levels: dict[str, int] = {} for logger_name in ("urllib3", "httpx", "httpcore"): logger = logging.getLogger(logger_name) previous_logger_levels[logger_name] = logger.level logger.setLevel(logging.WARNING) return handler, previous_level, previous_logger_levels def _remove_log_handler( handler: logging.Handler, previous_root_level: int, previous_logger_levels: dict[str, int], ) -> None: root_logger = logging.getLogger() root_logger.removeHandler(handler) root_logger.setLevel(previous_root_level) for logger_name, level in previous_logger_levels.items(): logging.getLogger(logger_name).setLevel(level) def _duration_label(path: Path) -> str: try: result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(path), ], check=True, capture_output=True, text=True, timeout=15, ) total = max(0, int(round(float(result.stdout.strip())))) except Exception: total = 0 return f"{total // 60}:{total % 60:02d}" if total else "0:00" def _poster_path_for_video(path: Path) -> Path: return path.with_name(f"{path.stem}.poster.jpg") def _ensure_poster(path: Path) -> Path | None: poster_path = _poster_path_for_video(path) if poster_path.is_file() and poster_path.stat().st_size > 0: return poster_path try: subprocess.run( [ "ffmpeg", "-y", "-loglevel", "error", "-ss", "0.45", "-i", str(path), "-frames:v", "1", "-q:v", "3", str(poster_path), ], check=True, capture_output=True, timeout=20, ) except Exception: return None return poster_path if poster_path.is_file() and poster_path.stat().st_size > 0 else None def _clip_file(job: Job, path: Path, duration: str | None = None) -> ClipFile: poster = _ensure_poster(path) return ClipFile( name=path.name, url=f"/api/jobs/{job.id}/files/{path.name}", duration=duration or _duration_label(path), poster_url=f"/api/jobs/{job.id}/files/{poster.name}" if poster else None, ) def _publish_files(job: Job) -> None: for path in sorted(job.output_dir.glob("short_*.mp4")): if not path.is_file(): continue duration = _duration_label(path) poster = _ensure_poster(path) poster_url = f"/api/jobs/{job.id}/files/{poster.name}" if poster else None existing = job.clips.get(path.name) if existing is None: job.clips[path.name] = _clip_file(job, path, duration=duration) else: if existing.duration == "0:00" and duration != "0:00": existing.duration = duration if existing.poster_url is None and poster_url: existing.poster_url = poster_url def _validate_credentials() -> None: if not any((os.environ.get(name) or "").strip() for name in LLM_KEY_NAMES): raise HTTPException( status_code=400, detail="Missing LLM secret. Set GOOGLE_API_KEY, GEMINI_API_KEY, or OPENROUTER_API_KEY in the Space secrets.", ) provider = (os.environ.get("HUMEO_TRANSCRIBE_PROVIDER") or "").strip().lower() if provider in {"", "auto"}: provider = "elevenlabs" if (os.environ.get("ELEVENLABS_API_KEY") or "").strip() else "openai" if provider == "elevenlabs" and not (os.environ.get("ELEVENLABS_API_KEY") or "").strip(): raise HTTPException(status_code=400, detail="Missing ELEVENLABS_API_KEY Space secret.") if provider in {"openai", "api"} and not (os.environ.get("OPENAI_API_KEY") or "").strip(): raise HTTPException(status_code=400, detail="Missing OPENAI_API_KEY Space secret.") def _safe_url(value: str | None) -> str | None: value = (value or "").strip() if not value: return None if not re.match(r"^https?://", value, flags=re.I): raise HTTPException(status_code=400, detail="Paste a valid http(s) video URL.") return value def _snapshot(job: Job) -> dict[str, object]: return { "id": job.id, "status": job.status, "nav_status": job.nav_status, "done": job.done, "error": job.error, "created_at": job.created_at, "age_sec": max(0, int(time.time() - job.created_at)), "logs": "\n".join(job.logs[-MAX_LOG_LINES:]), "steps": job.steps, "clips": [clip.__dict__ for clip in job.clips.values()], } def _run_job(job_id: str) -> None: with JOBS_LOCK: job = JOBS[job_id] message_queue: queue.Queue[str] = queue.Queue() handler, previous_root_level, previous_logger_levels = _install_log_handler(message_queue) def drain_queue() -> None: with JOBS_LOCK: local_job = JOBS[job_id] while True: try: line = message_queue.get_nowait() except queue.Empty: break _append_log(local_job, line) _update_stage_from_log(local_job, line) _publish_files(local_job) try: with JOBS_LOCK: _append_log(job, f"Prepared source: {job.source}") _append_log(job, f"Run id: {job.id}") _set_step(job, 1, 8) config = PipelineConfig( source=job.source, youtube_url=job.source, output_dir=job.output_dir, work_dir=job.work_dir, use_video_cache=False, clean_run=True, interactive=False, prune_level="balanced", overwrite_outputs=True, steering_notes=[job.steering_note] if job.steering_note else [], ) worker_error: str | None = None outputs: list[Path] = [] def pipeline_worker() -> None: nonlocal outputs, worker_error try: outputs = run_pipeline(config) except Exception as exc: worker_error = str(exc) for line in traceback.format_exc().splitlines(): if line.strip(): message_queue.put_nowait(line) thread = threading.Thread(target=pipeline_worker, daemon=True) thread.start() while thread.is_alive(): drain_queue() time.sleep(0.35) drain_queue() with JOBS_LOCK: local_job = JOBS[job_id] for output in outputs: if Path(output).exists(): local_job.clips[Path(output).name] = _clip_file(local_job, Path(output)) if worker_error: local_job.error = worker_error local_job.status = f"Failed: {worker_error}" local_job.nav_status = "Failed" else: local_job.status = "Complete" if local_job.clips else "Complete - no clips generated" local_job.nav_status = "Done" for step in local_job.steps: step["pct"] = 100 step["state"] = "done" local_job.done = True finally: _remove_log_handler(handler, previous_root_level, previous_logger_levels) async def _stage_upload(uploaded_file: UploadFile, run_root: Path) -> Path: suffix = Path(uploaded_file.filename or "input.mp4").suffix or ".mp4" staged_path = run_root / f"input{suffix}" with staged_path.open("wb") as handle: while chunk := await uploaded_file.read(1024 * 1024): handle.write(chunk) return staged_path app = FastAPI(title=APP_TITLE) @app.get("/", response_class=HTMLResponse) def index() -> str: return INDEX_HTML @app.post("/api/jobs") async def create_job( video_url: Annotated[str | None, Form()] = None, regen_prompt: Annotated[str | None, Form()] = None, source_job_id: Annotated[str | None, Form()] = None, file: Annotated[UploadFile | None, File()] = None, ) -> JSONResponse: _validate_credentials() job_id = uuid.uuid4().hex[:12] run_root = Path(tempfile.mkdtemp(prefix=f"clipforge-{job_id}-")) work_dir = run_root / "work" output_dir = run_root / "output" work_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True) source_path: Path | None = None source = _safe_url(video_url) source_job_id = (source_job_id or "").strip() if source_job_id: with JOBS_LOCK: previous = JOBS.get(source_job_id) if previous is None: raise HTTPException(status_code=404, detail="Previous job not found for regeneration.") if previous.source_path and previous.source_path.exists(): source_path = run_root / previous.source_path.name shutil.copy2(previous.source_path, source_path) source = str(source_path) else: source = previous.source elif file is not None: source_path = await _stage_upload(file, run_root) source = str(source_path) if not source: raise HTTPException(status_code=400, detail="Upload a video file or paste a video URL first.") job = Job( id=job_id, run_root=run_root, output_dir=output_dir, work_dir=work_dir, source=source, source_path=source_path, steering_note=(regen_prompt or "").strip() or None, ) with JOBS_LOCK: JOBS[job_id] = job threading.Thread(target=_run_job, args=(job_id,), daemon=True).start() return JSONResponse(_snapshot(job)) @app.get("/api/jobs") def list_jobs() -> JSONResponse: with JOBS_LOCK: jobs = sorted(JOBS.values(), key=lambda item: item.created_at, reverse=True)[:10] for job in jobs: _publish_files(job) return JSONResponse( { "jobs": [ { "id": job.id, "status": job.status, "nav_status": job.nav_status, "done": job.done, "error": job.error, "created_at": job.created_at, "age_sec": max(0, int(time.time() - job.created_at)), "clip_count": len(job.clips), } for job in jobs ] } ) @app.post("/api/probe-upload") async def probe_upload(file: Annotated[UploadFile | None, File()] = None) -> JSONResponse: if file is None: raise HTTPException(status_code=400, detail="Choose a video file first.") total = 0 while chunk := await file.read(1024 * 1024): total += len(chunk) return JSONResponse( { "ok": True, "filename": file.filename or "upload.mp4", "content_type": file.content_type or "", "bytes": total, } ) @app.get("/api/jobs/{job_id}") def get_job(job_id: str) -> JSONResponse: with JOBS_LOCK: job = JOBS.get(job_id) if job is None: raise HTTPException(status_code=404, detail="Job not found.") _publish_files(job) return JSONResponse(_snapshot(job)) @app.get("/api/jobs/{job_id}/files/{filename}") def get_job_file(job_id: str, filename: str) -> FileResponse: with JOBS_LOCK: job = JOBS.get(job_id) if job is None: raise HTTPException(status_code=404, detail="Job not found.") path = (job.output_dir / Path(filename).name).resolve(strict=False) if job.output_dir.resolve(strict=False) not in path.parents or not path.is_file(): raise HTTPException(status_code=404, detail="File not found.") media_type = "image/jpeg" if path.suffix.lower() in {".jpg", ".jpeg"} else "video/mp4" return FileResponse(path, media_type=media_type, filename=path.name) @app.get("/health") def health() -> dict[str, str]: return {"ok": "true"} INDEX_HTML = r"""
Upload a file - we handle the rest
Sit back - long videos can take a little while
Describe what you're looking for and we'll re-cut your video