import asyncio
from pathlib import Path

from app.core.config import Settings
from app.core.timing import TimingLog
from app.models.schemas import ChannelProfile, ClipCandidate
from app.services.clips import ClipGenerator
from app.services.highlight import QwenHighlightDetector
from app.services.multimodal import QwenVisualAnalyzer
from app.services.transcription import WhisperTranscriber
from app.services.video_input import resolve_youtube_url
from app.storage import JobStore


class VideoPipeline:
    """End-to-end video-to-clips pipeline.

    Orchestrates six stages — input resolution, transcription, highlight
    scoring, visual enrichment, clip rendering, finalization — and mirrors
    each stage's progress/status into the :class:`JobStore` so callers can
    poll the job record. Blocking service calls are pushed off the event
    loop with ``asyncio.to_thread``.
    """

    def __init__(self, settings: Settings, store: JobStore) -> None:
        # Services are constructed once per pipeline instance and reused
        # across jobs; per-job state lives in the store, not on `self`.
        self.settings = settings
        self.store = store
        self.transcriber = WhisperTranscriber(settings)
        self.highlight_detector = QwenHighlightDetector(settings)
        self.visual_analyzer = QwenVisualAnalyzer(settings)
        self.clip_generator = ClipGenerator(settings, store)

    async def process_source(
        self,
        job_id: str,
        source_kind: str,
        source_value: str,
        profile: ChannelProfile,
    ) -> None:
        """Run the full pipeline for one job, recording progress in the store.

        Args:
            job_id: Job record to update throughout processing.
            source_kind: ``"youtube"`` to download via ``resolve_youtube_url``;
                any other value is treated as a local file path.
            source_value: URL or filesystem path, per ``source_kind``.
            profile: Channel profile passed to transcription, highlight
                detection, and clip generation.

        Never raises: any exception is caught at the end and recorded on the
        job as ``status="failed"`` with the stringified error. (CancelledError
        derives from BaseException and is NOT swallowed.)
        """
        timings = TimingLog()
        try:
            self.store.update_job(
                job_id,
                status="running",
                progress=0.04,
                message="Preparing video input",
                current_step="input",
                step_index=1,
                step_total=6,
            )
            # Stage 1/6 — resolve the input video to a local path.
            with timings.measure("input"):
                if source_kind == "youtube":
                    video_path = await resolve_youtube_url(
                        source_value, self.store.job_dir(job_id), self.settings
                    )
                else:
                    # NOTE(review): local paths are used as-is; a missing file
                    # surfaces later as a failed job via the except handler.
                    video_path = Path(source_value)
            self.store.update_job(
                job_id,
                progress=0.18,
                message="Transcribing with Whisper Large V3",
                current_step="transcription",
                step_index=2,
                step_total=6,
            )
            # Stage 2/6 — transcription (blocking; run off the event loop).
            with timings.measure("transcription"):
                transcript = await asyncio.to_thread(
                    self.transcriber.transcribe, str(video_path), profile
                )
            # Persist the raw transcript alongside the job artifacts.
            self.store.write_json(
                job_id,
                "transcript.json",
                [segment.model_dump(mode="json") for segment in transcript],
            )
            self.store.update_job(
                job_id,
                progress=0.42,
                message="Transcript ready",
                transcript=transcript,
                timings=timings.to_dict(),
            )
            self.store.update_job(
                job_id,
                progress=0.48,
                message="Scoring highlights with Qwen",
                current_step="highlight_detection",
                step_index=3,
                step_total=6,
            )
            # Stage 3/6 — text-based highlight scoring produces candidates.
            with timings.measure("highlight_detection"):
                clips = await asyncio.to_thread(self.highlight_detector.detect, transcript, profile)
            self.store.update_job(
                job_id,
                progress=0.62,
                message="Checking visual highlights",
                current_step="multimodal_analysis",
                step_index=4,
                step_total=6,
            )
            # Stage 4/6 — visual pass enriches (replaces) the candidate list.
            with timings.measure("multimodal_analysis"):
                clips = await asyncio.to_thread(self.visual_analyzer.enrich, str(video_path), clips)
            clip_total = len(clips)
            self.store.update_job(
                job_id,
                progress=0.72,
                message=f"Preparing to render {clip_total} clips",
                current_step="clip_generation",
                step_index=5,
                step_total=6,
                active_clip_index=0,
                active_clip_total=clip_total,
            )

            def update_render_progress(index: int, total: int) -> None:
                # Per-clip callback invoked by the generator (index is
                # 1-based). Maps rendering onto the 0.72–0.94 progress band;
                # max(total, 1) guards the zero-clip division.
                progress = 0.72 + (0.22 * ((index - 1) / max(total, 1)))
                self.store.update_job(
                    job_id,
                    progress=min(progress, 0.94),
                    message=f"Rendering clip {index}/{total}",
                    current_step="clip_generation",
                    step_index=5,
                    step_total=6,
                    active_clip_index=index,
                    active_clip_total=total,
                    timings=timings.to_dict(),
                )

            # Stage 5/6 — render all clips (blocking; progress via callback).
            with timings.measure("clip_generation"):
                rendered = await asyncio.to_thread(
                    self.clip_generator.generate,
                    job_id,
                    video_path,
                    clips,
                    transcript,
                    profile,
                    update_render_progress,
                )
            # Stage 6/6 — finalize: persist clip metadata and mark complete.
            self.store.update_job(
                job_id,
                progress=0.97,
                message="Finalizing clips",
                current_step="finalizing",
                step_index=6,
                step_total=6,
                active_clip_index=clip_total,
                active_clip_total=clip_total,
                timings=timings.to_dict(),
            )
            self.store.write_json(
                job_id, "clips.json", [clip.model_dump(mode="json") for clip in rendered]
            )
            self.store.update_job(
                job_id,
                status="completed",
                progress=1,
                message="Clips ready",
                current_step="completed",
                step_index=6,
                step_total=6,
                active_clip_index=clip_total,
                active_clip_total=clip_total,
                transcript=transcript,
                clips=rendered,
                timings=timings.to_dict(),
            )
        except Exception as exc:
            # Pipeline boundary: record the failure on the job rather than
            # propagating (progress=1 marks the job as terminal in the UI).
            self.store.update_job(
                job_id,
                status="failed",
                progress=1,
                message="Processing failed",
                current_step="failed",
                error=str(exc),
                timings=timings.to_dict(),
            )

    def patch_clip(self, job_id: str, clip_id: str, updates: dict[str, object]) -> ClipCandidate:
        """Apply field updates to one clip of a job and persist the new list.

        ``None``-valued entries in *updates* are ignored (treated as "leave
        unchanged"). If the patched clip ends up with ``end_seconds <=
        start_seconds``, the end is clamped to ``start_seconds + 1``.

        Returns the patched clip; raises ``KeyError(clip_id)`` if no clip
        with that id exists on the job.
        """
        snapshot = self.store.get_job(job_id)
        patched: ClipCandidate | None = None
        clips: list[ClipCandidate] = []
        for clip in snapshot.clips:
            if clip.id == clip_id:
                # Drop Nones so absent fields don't overwrite existing values.
                clean_updates = {key: value for key, value in updates.items() if value is not None}
                clip = clip.model_copy(update=clean_updates)
                # Enforce a minimally valid (>= 1s) time range.
                if clip.end_seconds <= clip.start_seconds:
                    clip = clip.model_copy(update={"end_seconds": clip.start_seconds + 1})
                patched = clip
            clips.append(clip)
        if patched is None:
            raise KeyError(clip_id)
        self.store.update_job(job_id, clips=clips)
        return patched

    def regenerate_clip(
        self,
        job_id: str,
        clip_id: str,
        clip_style: str | None = None,
        clip_length_seconds: int | None = None,
        subtitle_text: str | None = None,
    ) -> ClipCandidate:
        """Re-render a single clip of a job, optionally overriding settings.

        Args:
            job_id: Job whose clip list is read and written back.
            clip_id: Id of the clip to re-render.
            clip_style: If given, overrides the profile's clip style for this
                render only.
            clip_length_seconds: If given, overrides both the profile length
                and the clip's ``end_seconds`` (``start + length``).
            subtitle_text: If given, replaces the clip's subtitle text.

        Returns the re-rendered clip (tagged ``metadata["regenerated"]``);
        raises ``KeyError(clip_id)`` if the clip is not found.
        """
        snapshot = self.store.get_job(job_id)
        source_path = self._source_path(job_id)
        clips: list[ClipCandidate] = []
        regenerated: ClipCandidate | None = None
        # start=1: render_one receives a 1-based clip index.
        for index, clip in enumerate(snapshot.clips, start=1):
            if clip.id == clip_id:
                # Per-render profile override; None args leave the profile as-is.
                profile = snapshot.profile.model_copy(
                    update={
                        key: value
                        for key, value in {
                            "clip_style": clip_style,
                            "clip_length_seconds": clip_length_seconds,
                        }.items()
                        if value is not None
                    }
                )
                if clip_length_seconds is not None:
                    clip = clip.model_copy(
                        update={"end_seconds": clip.start_seconds + clip_length_seconds}
                    )
                if subtitle_text is not None:
                    clip = clip.model_copy(update={"subtitle_text": subtitle_text})
                clip = self.clip_generator.render_one(
                    job_id, source_path, clip, snapshot.transcript, profile, index
                )
                clip.metadata["regenerated"] = True
                regenerated = clip
            clips.append(clip)
        if regenerated is None:
            raise KeyError(clip_id)
        self.store.update_job(job_id, clips=clips)
        return regenerated

    def _source_path(self, job_id: str) -> Path:
        """Locate the job's source video file (``source.<ext>`` in the job dir).

        Returns the first match in sorted order (deterministic when multiple
        extensions exist); raises ``FileNotFoundError`` if none is present.
        """
        job_dir = self.store.job_dir(job_id)
        matches = sorted(job_dir.glob("source.*"))
        if not matches:
            raise FileNotFoundError("source video missing")
        return matches[0]