# ElevenClip-AI — backend/app/services/pipeline.py
# Author: JakgritB
# feat(backend): enrich clipping pipeline metadata (commit 5dadf47)
import asyncio
from pathlib import Path
from app.core.config import Settings
from app.core.timing import TimingLog
from app.models.schemas import ChannelProfile, ClipCandidate
from app.services.clips import ClipGenerator
from app.services.highlight import QwenHighlightDetector
from app.services.multimodal import QwenVisualAnalyzer
from app.services.transcription import WhisperTranscriber
from app.services.video_input import resolve_youtube_url
from app.storage import JobStore
class VideoPipeline:
    """End-to-end video clipping pipeline.

    Coordinates input resolution, Whisper transcription, Qwen highlight
    scoring, visual enrichment, and clip rendering, reporting progress and
    artifacts through a ``JobStore``.
    """

    def __init__(self, settings: Settings, store: JobStore) -> None:
        """Build the per-stage services from application settings.

        Args:
            settings: Application configuration shared by all stage services.
            store: Job persistence layer used for progress updates and
                artifact storage.
        """
        self.settings = settings
        self.store = store
        # Stage services, in pipeline order.
        self.transcriber = WhisperTranscriber(settings)
        self.highlight_detector = QwenHighlightDetector(settings)
        self.visual_analyzer = QwenVisualAnalyzer(settings)
        self.clip_generator = ClipGenerator(settings, store)
    async def process_source(
        self,
        job_id: str,
        source_kind: str,
        source_value: str,
        profile: ChannelProfile,
    ) -> None:
        """Run the full clipping pipeline for one job.

        Six externally visible steps, each reported to the job store with a
        progress fraction, message, and step counters: input ->
        transcription -> highlight_detection -> multimodal_analysis ->
        clip_generation -> finalizing/completed.

        Args:
            job_id: Identifier of the job being processed.
            source_kind: ``"youtube"`` to download via ``resolve_youtube_url``;
                any other value treats ``source_value`` as a local file path.
            source_value: URL or filesystem path of the input video.
            profile: Channel settings forwarded to transcription, highlight
                detection, and clip rendering.

        Any exception is caught at this boundary, recorded on the job as
        ``status="failed"`` with the error string, and NOT re-raised.
        """
        # Per-stage wall-clock timings, serialized into job updates below.
        timings = TimingLog()
        try:
            # Step 1/6: resolve the input to a local video file.
            self.store.update_job(
                job_id,
                status="running",
                progress=0.04,
                message="Preparing video input",
                current_step="input",
                step_index=1,
                step_total=6,
            )
            with timings.measure("input"):
                if source_kind == "youtube":
                    video_path = await resolve_youtube_url(
                        source_value, self.store.job_dir(job_id), self.settings
                    )
                else:
                    # Assumes source_value is an existing local path; no
                    # existence check here — downstream stages will fail if
                    # it is missing.
                    video_path = Path(source_value)
            # Step 2/6: speech-to-text. Runs in a worker thread because the
            # transcriber call is blocking.
            self.store.update_job(
                job_id,
                progress=0.18,
                message="Transcribing with Whisper Large V3",
                current_step="transcription",
                step_index=2,
                step_total=6,
            )
            with timings.measure("transcription"):
                transcript = await asyncio.to_thread(
                    self.transcriber.transcribe, str(video_path), profile
                )
            # Persist the transcript both as a JSON artifact and on the job.
            self.store.write_json(
                job_id,
                "transcript.json",
                [segment.model_dump(mode="json") for segment in transcript],
            )
            self.store.update_job(
                job_id,
                progress=0.42,
                message="Transcript ready",
                transcript=transcript,
                timings=timings.to_dict(),
            )
            # Step 3/6: score transcript segments for highlight-worthiness.
            self.store.update_job(
                job_id,
                progress=0.48,
                message="Scoring highlights with Qwen",
                current_step="highlight_detection",
                step_index=3,
                step_total=6,
            )
            with timings.measure("highlight_detection"):
                clips = await asyncio.to_thread(self.highlight_detector.detect, transcript, profile)
            # Step 4/6: enrich the candidates with visual analysis.
            self.store.update_job(
                job_id,
                progress=0.62,
                message="Checking visual highlights",
                current_step="multimodal_analysis",
                step_index=4,
                step_total=6,
            )
            with timings.measure("multimodal_analysis"):
                clips = await asyncio.to_thread(self.visual_analyzer.enrich, str(video_path), clips)
            # Step 5/6: render each candidate clip to a file.
            clip_total = len(clips)
            self.store.update_job(
                job_id,
                progress=0.72,
                message=f"Preparing to render {clip_total} clips",
                current_step="clip_generation",
                step_index=5,
                step_total=6,
                active_clip_index=0,
                active_clip_total=clip_total,
            )

            def update_render_progress(index: int, total: int) -> None:
                # Maps per-clip render progress (1-based index) onto the
                # 0.72..0.94 band of overall progress; max(total, 1) guards
                # against division by zero when there are no clips.
                progress = 0.72 + (0.22 * ((index - 1) / max(total, 1)))
                self.store.update_job(
                    job_id,
                    progress=min(progress, 0.94),
                    message=f"Rendering clip {index}/{total}",
                    current_step="clip_generation",
                    step_index=5,
                    step_total=6,
                    active_clip_index=index,
                    active_clip_total=total,
                    timings=timings.to_dict(),
                )

            with timings.measure("clip_generation"):
                rendered = await asyncio.to_thread(
                    self.clip_generator.generate,
                    job_id,
                    video_path,
                    clips,
                    transcript,
                    profile,
                    update_render_progress,
                )
            # Step 6/6: finalize — persist clip metadata and mark complete.
            self.store.update_job(
                job_id,
                progress=0.97,
                message="Finalizing clips",
                current_step="finalizing",
                step_index=6,
                step_total=6,
                active_clip_index=clip_total,
                active_clip_total=clip_total,
                timings=timings.to_dict(),
            )
            self.store.write_json(
                job_id, "clips.json", [clip.model_dump(mode="json") for clip in rendered]
            )
            self.store.update_job(
                job_id,
                status="completed",
                progress=1,
                message="Clips ready",
                current_step="completed",
                step_index=6,
                step_total=6,
                active_clip_index=clip_total,
                active_clip_total=clip_total,
                transcript=transcript,
                clips=rendered,
                timings=timings.to_dict(),
            )
        except Exception as exc:
            # Top-level boundary: record the failure on the job instead of
            # propagating. Progress is forced to 1 so UI progress bars
            # terminate even on failure.
            self.store.update_job(
                job_id,
                status="failed",
                progress=1,
                message="Processing failed",
                current_step="failed",
                error=str(exc),
                timings=timings.to_dict(),
            )
def patch_clip(self, job_id: str, clip_id: str, updates: dict) -> ClipCandidate:
snapshot = self.store.get_job(job_id)
patched: ClipCandidate | None = None
clips: list[ClipCandidate] = []
for clip in snapshot.clips:
if clip.id == clip_id:
clean_updates = {key: value for key, value in updates.items() if value is not None}
clip = clip.model_copy(update=clean_updates)
if clip.end_seconds <= clip.start_seconds:
clip = clip.model_copy(update={"end_seconds": clip.start_seconds + 1})
patched = clip
clips.append(clip)
if patched is None:
raise KeyError(clip_id)
self.store.update_job(job_id, clips=clips)
return patched
def regenerate_clip(
self,
job_id: str,
clip_id: str,
clip_style: str | None = None,
clip_length_seconds: int | None = None,
subtitle_text: str | None = None,
) -> ClipCandidate:
snapshot = self.store.get_job(job_id)
source_path = self._source_path(job_id)
clips: list[ClipCandidate] = []
regenerated: ClipCandidate | None = None
for index, clip in enumerate(snapshot.clips, start=1):
if clip.id == clip_id:
profile = snapshot.profile.model_copy(
update={
key: value
for key, value in {
"clip_style": clip_style,
"clip_length_seconds": clip_length_seconds,
}.items()
if value is not None
}
)
if clip_length_seconds is not None:
clip = clip.model_copy(
update={"end_seconds": clip.start_seconds + clip_length_seconds}
)
if subtitle_text is not None:
clip = clip.model_copy(update={"subtitle_text": subtitle_text})
clip = self.clip_generator.render_one(
job_id, source_path, clip, snapshot.transcript, profile, index
)
clip.metadata["regenerated"] = True
regenerated = clip
clips.append(clip)
if regenerated is None:
raise KeyError(clip_id)
self.store.update_job(job_id, clips=clips)
return regenerated
def _source_path(self, job_id: str) -> Path:
job_dir = self.store.job_dir(job_id)
matches = sorted(job_dir.glob("source.*"))
if not matches:
raise FileNotFoundError("source video missing")
return matches[0]