# clipforge/src/humeo/clip_assembly.py
"""Hard-cut filler/silence cleanup by assembling multiple kept spans."""
from __future__ import annotations
import json
import logging
import re
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from humeo_core.schemas import Clip, ClipPlan, ClipRenderSpan
from humeo.render_window import effective_export_bounds
logger = logging.getLogger(__name__)

# Silence gap (sec) between consecutive words that forces a render-span break.
_SPAN_BREAK_MIN_GAP_SEC = 0.55
# Padding (sec) kept before a span's first word / after its last word.
_SPAN_EDGE_PAD_SEC = 0.05
# Spans shorter than this (sec) created by a silence break are dropped.
_SPAN_MIN_DURATION_SEC = 0.30
# Spans shorter than this (sec) created by a filler cut are dropped
# (looser than the silence threshold so short phrases around fillers survive).
_FILLER_SPAN_MIN_DURATION_SEC = 0.12
# Gap (sec) that starts a new caption segment when rebuilding the transcript.
_SEGMENT_BREAK_MIN_GAP_SEC = 0.65
# Hard caps on a rebuilt caption segment's duration and word count.
_SEGMENT_MAX_DURATION_SEC = 6.0
_SEGMENT_MAX_WORDS = 18
# Padding (sec) left around a cut filler word so cuts don't clip speech.
_FILLER_CUT_PAD_SEC = 0.02
# Matches elongated hesitation sounds ("uhh", "ummm", "errr", ...).
_FILLER_WORD_RE = re.compile(r"^(u+h+|u+m+|e+h+|e+r+|a+h+|h+m+|m+m+)$", re.IGNORECASE)
# Canonical filler tokens checked before falling back to the regex.
_FILLER_WORDS = {
    "ah",
    "eh",
    "er",
    "hmm",
    "mm",
    "uh",
    "uhh",
    "uhm",
    "um",
    "umm",
}
@dataclass(frozen=True)
class AssembledClip:
    """Result of hard-cut assembling a clip's kept spans into one file."""

    # Path to the assembled mp4 written by ffmpeg (not the original source).
    source_path: Path
    # Clip rebased onto the assembled file's timeline (starts at 0.0).
    clip: Clip
    # Transcript rebuilt in the assembled timeline (fillers removed).
    transcript: dict
    # Source-timeline spans that were concatenated, in order.
    spans: list[ClipRenderSpan]
def _iter_words(transcript: dict) -> list[dict]:
words: list[dict] = []
for seg in transcript.get("segments", []) or []:
for raw in seg.get("words", []) or []:
try:
word = {
"word": str(raw.get("word", "")).strip(),
"start": float(raw["start"]),
"end": float(raw["end"]),
}
except (KeyError, TypeError, ValueError):
continue
if not word["word"] or word["end"] <= word["start"]:
continue
words.append(word)
return words
def _clean_word_token(text: str) -> str:
return re.sub(r"(^[^A-Za-z]+|[^A-Za-z]+$)", "", text or "").lower()
def _looks_like_filler_word(text: str) -> bool:
    """Return True when *text* normalizes to a known or regex-shaped filler."""
    token = _clean_word_token(text)
    # Exact-set lookup first; the regex catches elongated variants.
    return bool(token) and (
        token in _FILLER_WORDS or _FILLER_WORD_RE.fullmatch(token) is not None
    )
def derive_render_spans(clip: Clip, transcript: dict) -> list[ClipRenderSpan]:
    """Compute the kept spans for *clip*, cutting filler words and long pauses.

    Precomputed spans on the clip win unconditionally; otherwise a single pass
    over the transcript words builds spans, breaking on fillers and on gaps of
    at least _SPAN_BREAK_MIN_GAP_SEC. Always returns at least one span.
    """
    if clip.render_spans:
        # The clip already carries an explicit edit; trust it as-is.
        return list(clip.render_spans)
    start_sec, end_sec = effective_export_bounds(clip)
    # Only words overlapping the export window matter.
    words = [
        word
        for word in _iter_words(transcript)
        if word["end"] > start_sec and word["start"] < end_sec
    ]
    if not words:
        # No usable word timing: keep the whole window untouched.
        return [ClipRenderSpan(start_time_sec=start_sec, end_time_sec=end_sec)]
    spans: list[ClipRenderSpan] = []
    span_start: float | None = None  # start of the span currently being built
    prev_end: float | None = None  # end time of the last kept (non-filler) word
    resume_after = start_sec  # earliest allowed span start after a filler cut
    for word in words:
        word_start = float(word["start"])
        word_end = float(word["end"])
        if _looks_like_filler_word(str(word["word"])):
            # Close any open span just before the filler, then skip past it.
            if span_start is not None and prev_end is not None:
                span_end = min(end_sec, max(span_start, word_start - _FILLER_CUT_PAD_SEC))
                # Filler cuts tolerate shorter fragments than silence breaks.
                if span_end - span_start >= _FILLER_SPAN_MIN_DURATION_SEC:
                    spans.append(ClipRenderSpan(start_time_sec=span_start, end_time_sec=span_end))
            span_start = None
            prev_end = None
            resume_after = min(end_sec, word_end + _FILLER_CUT_PAD_SEC)
            continue
        if span_start is None:
            # Open a new span slightly before the word, but never inside a
            # filler cut (resume_after) or before the export window.
            span_start = max(start_sec, word_start - _SPAN_EDGE_PAD_SEC, resume_after)
            prev_end = word_end
            continue
        if prev_end is not None and word_start - prev_end >= _SPAN_BREAK_MIN_GAP_SEC:
            # Long silence: close the current span and start a fresh one.
            span_end = min(end_sec, prev_end + _SPAN_EDGE_PAD_SEC)
            if span_end - span_start >= _SPAN_MIN_DURATION_SEC:
                spans.append(ClipRenderSpan(start_time_sec=span_start, end_time_sec=span_end))
            span_start = max(start_sec, word_start - _SPAN_EDGE_PAD_SEC)
        prev_end = word_end
    if span_start is None or prev_end is None:
        # Iteration ended on a filler; nothing open to flush. Fall back to the
        # whole window only if no span survived at all.
        if not spans:
            spans.append(ClipRenderSpan(start_time_sec=start_sec, end_time_sec=end_sec))
        return spans
    final_end = min(end_sec, prev_end + _SPAN_EDGE_PAD_SEC)
    if final_end - span_start >= _SPAN_MIN_DURATION_SEC:
        spans.append(ClipRenderSpan(start_time_sec=span_start, end_time_sec=final_end))
    if not spans:
        spans.append(ClipRenderSpan(start_time_sec=start_sec, end_time_sec=end_sec))
    return spans
def apply_render_spans(clips: list[Clip], transcript: dict) -> list[Clip]:
    """Return copies of *clips* with render_spans derived from *transcript*."""
    return [
        clip.model_copy(update={"render_spans": derive_render_spans(clip, transcript)})
        for clip in clips
    ]
def _segment_local_words(words: list[dict], *, language: str) -> dict:
    """Group word dicts into transcript segments, breaking on long gaps or
    when a segment would exceed the duration / word-count caps."""
    groups: list[list[dict]] = []
    for word in words:
        if groups:
            current = groups[-1]
            gap = float(word["start"]) - float(current[-1]["end"])
            # Would-be duration if this word joined the current group.
            duration = float(word["end"]) - float(current[0]["start"])
            if (
                gap < _SEGMENT_BREAK_MIN_GAP_SEC
                and duration < _SEGMENT_MAX_DURATION_SEC
                and len(current) < _SEGMENT_MAX_WORDS
            ):
                current.append(word)
                continue
        groups.append([word])
    segments = [
        {
            "start": group[0]["start"],
            "end": group[-1]["end"],
            "text": " ".join(str(item["word"]) for item in group).strip(),
            "words": list(group),
        }
        for group in groups
    ]
    return {"segments": segments, "language": language}
def build_assembled_transcript(clip: Clip, transcript: dict) -> dict:
    """Project transcript words into the assembled clip's local timeline.

    Words overlapping each kept span are clamped to the span, shifted so that
    spans abut starting at 0.0, fillers are dropped, and the result is
    re-segmented for caption use.
    """
    words = _iter_words(transcript)
    local_words: list[dict] = []
    current_offset = 0.0  # start of the current span on the output timeline
    for span in derive_render_spans(clip, transcript):
        for word in words:
            # Skip words with no overlap with this span.
            if word["end"] <= span.start_time_sec or word["start"] >= span.end_time_sec:
                continue
            if _looks_like_filler_word(str(word["word"])):
                continue
            local_words.append(
                {
                    "word": word["word"],
                    # Clamp to the span, then rebase onto the output timeline.
                    "start": max(word["start"], span.start_time_sec) - span.start_time_sec + current_offset,
                    "end": min(word["end"], span.end_time_sec) - span.start_time_sec + current_offset,
                }
            )
        current_offset += span.duration_sec
    language = str(transcript.get("language") or "en")
    return _segment_local_words(local_words, language=language)
def _ffmpeg_concat_filter(spans: list[ClipRenderSpan]) -> str:
parts: list[str] = []
for idx, span in enumerate(spans):
parts.append(
f"[0:v]trim=start={span.start_time_sec:.3f}:end={span.end_time_sec:.3f},setpts=PTS-STARTPTS[v{idx}]"
)
parts.append(
f"[0:a]atrim=start={span.start_time_sec:.3f}:end={span.end_time_sec:.3f},asetpts=PTS-STARTPTS[a{idx}]"
)
concat_inputs = "".join(f"[v{idx}][a{idx}]" for idx in range(len(spans)))
parts.append(f"{concat_inputs}concat=n={len(spans)}:v=1:a=1[vout][aout]")
return ";".join(parts)
def assemble_clip(
    source_path: Path,
    clip: Clip,
    transcript: dict,
    output_dir: Path,
) -> AssembledClip:
    """Render *clip*'s kept spans from *source_path* into one hard-cut mp4.

    Writes the assembled video, a rebased transcript JSON, and a timeline JSON
    into *output_dir*, and returns an AssembledClip whose clip/transcript live
    on the new 0-based timeline.

    Raises:
        RuntimeError: if ffmpeg is not on PATH.
        subprocess.CalledProcessError: if ffmpeg exits non-zero (stderr is
            captured on the exception because of capture_output=True).
    """
    spans = derive_render_spans(clip, transcript)
    output_dir.mkdir(parents=True, exist_ok=True)
    assembled_path = output_dir / f"clip_{clip.clip_id}.mp4"
    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        raise RuntimeError("ffmpeg not found on PATH")
    # Trim + concat in one filter graph, then re-encode (spans rarely start on
    # keyframes, so stream copy is not an option).
    cmd = [
        ffmpeg,
        "-y",
        "-i",
        str(source_path),
        "-filter_complex",
        _ffmpeg_concat_filter(spans),
        "-map",
        "[vout]",
        "-map",
        "[aout]",
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-crf",
        "20",
        "-c:a",
        "aac",
        "-b:a",
        "160k",
        "-movflags",
        "+faststart",
        str(assembled_path),
    ]
    subprocess.run(cmd, check=True, capture_output=True)
    # Transcript rebased onto the assembled (concatenated) timeline.
    assembled_transcript = build_assembled_transcript(clip, transcript)
    assembled_transcript_path = output_dir / f"clip_{clip.clip_id}.transcript.json"
    assembled_transcript_path.write_text(
        json.dumps(assembled_transcript, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    # Timeline sidecar: which source spans were kept, for later traceability.
    timeline_path = output_dir / f"clip_{clip.clip_id}.timeline.json"
    timeline_path.write_text(
        json.dumps(
            {
                "clip_id": clip.clip_id,
                "source_spans": [span.model_dump() for span in spans],
                "assembled_duration_sec": sum(span.duration_sec for span in spans),
            },
            indent=2,
        )
        + "\n",
        encoding="utf-8",
    )
    assembled_duration = sum(span.duration_sec for span in spans)
    # Rebase the clip onto the new file: it now starts at 0 and carries no
    # trims, hooks, or render spans of its own.
    assembled_clip = clip.model_copy(
        update={
            "start_time_sec": 0.0,
            "end_time_sec": assembled_duration,
            "trim_start_sec": 0.0,
            "trim_end_sec": 0.0,
            "hook_start_sec": None,
            "hook_end_sec": None,
            "render_spans": [],
        }
    )
    logger.info(
        "Assembled clip %s into %d span(s): %.1fs -> %.1fs",
        clip.clip_id,
        len(spans),
        clip.duration_sec,
        assembled_duration,
    )
    return AssembledClip(
        source_path=assembled_path,
        clip=assembled_clip,
        transcript=assembled_transcript,
        spans=spans,
    )
def write_clip_plan(path: Path, clips: list[Clip]) -> Path:
    """Serialize *clips* as a ClipPlan JSON document at *path*; return *path*."""
    plan = ClipPlan(source_path="", clips=clips)
    payload = plan.model_dump_json(indent=2) + "\n"
    path.write_text(payload, encoding="utf-8")
    return path