import asyncio
import logging
from pathlib import Path

from app.core.config import Settings
from app.core.timing import TimingLog
from app.models.schemas import ChannelProfile, ClipCandidate
from app.services.clips import ClipGenerator
from app.services.highlight import QwenHighlightDetector
from app.services.multimodal import QwenVisualAnalyzer
from app.services.transcription import WhisperTranscriber
from app.services.video_input import resolve_youtube_url
from app.storage import JobStore
|
|
|
|
class VideoPipeline:
    """End-to-end clip-generation pipeline for a single job.

    Stages (each reported to clients via ``JobStore.update_job`` so the UI
    can poll progress): input resolution -> transcription -> highlight
    detection -> multimodal analysis -> clip rendering -> finalization.
    """

    def __init__(self, settings: Settings, store: JobStore) -> None:
        self.settings = settings
        self.store = store
        self.transcriber = WhisperTranscriber(settings)
        self.highlight_detector = QwenHighlightDetector(settings)
        self.visual_analyzer = QwenVisualAnalyzer(settings)
        self.clip_generator = ClipGenerator(settings, store)

    async def process_source(
        self,
        job_id: str,
        source_kind: str,
        source_value: str,
        profile: ChannelProfile,
    ) -> None:
        """Run the full pipeline for ``job_id``, persisting progress throughout.

        Args:
            job_id: Identifier of the job being processed.
            source_kind: ``"youtube"`` downloads ``source_value`` as a URL;
                any other kind treats ``source_value`` as a local file path.
            source_value: URL or filesystem path, per ``source_kind``.
            profile: Channel profile steering transcription/highlights/rendering.

        Exceptions never propagate: the job is marked failed with the error
        message recorded on it, and the full traceback is logged.
        """
        timings = TimingLog()
        try:
            self.store.update_job(
                job_id,
                status="running",
                progress=0.04,
                message="Preparing video input",
                current_step="input",
                step_index=1,
                step_total=6,
            )
            with timings.measure("input"):
                if source_kind == "youtube":
                    video_path = await resolve_youtube_url(
                        source_value, self.store.job_dir(job_id), self.settings
                    )
                else:
                    video_path = Path(source_value)

            self.store.update_job(
                job_id,
                progress=0.18,
                message="Transcribing with Whisper Large V3",
                current_step="transcription",
                step_index=2,
                step_total=6,
            )
            with timings.measure("transcription"):
                # Whisper is compute-bound; keep it off the event loop.
                transcript = await asyncio.to_thread(
                    self.transcriber.transcribe, str(video_path), profile
                )
            self.store.write_json(
                job_id,
                "transcript.json",
                [segment.model_dump(mode="json") for segment in transcript],
            )
            self.store.update_job(
                job_id,
                progress=0.42,
                message="Transcript ready",
                transcript=transcript,
                timings=timings.to_dict(),
            )

            self.store.update_job(
                job_id,
                progress=0.48,
                message="Scoring highlights with Qwen",
                current_step="highlight_detection",
                step_index=3,
                step_total=6,
            )
            with timings.measure("highlight_detection"):
                clips = await asyncio.to_thread(self.highlight_detector.detect, transcript, profile)

            self.store.update_job(
                job_id,
                progress=0.62,
                message="Checking visual highlights",
                current_step="multimodal_analysis",
                step_index=4,
                step_total=6,
            )
            with timings.measure("multimodal_analysis"):
                clips = await asyncio.to_thread(self.visual_analyzer.enrich, str(video_path), clips)

            clip_total = len(clips)
            self.store.update_job(
                job_id,
                progress=0.72,
                message=f"Preparing to render {clip_total} clips",
                current_step="clip_generation",
                step_index=5,
                step_total=6,
                active_clip_index=0,
                active_clip_total=clip_total,
            )

            def update_render_progress(index: int, total: int) -> None:
                # Map render progress (1-based clip index) onto the 0.72-0.94
                # band; max(total, 1) guards against a zero-clip division.
                progress = 0.72 + (0.22 * ((index - 1) / max(total, 1)))
                self.store.update_job(
                    job_id,
                    progress=min(progress, 0.94),
                    message=f"Rendering clip {index}/{total}",
                    current_step="clip_generation",
                    step_index=5,
                    step_total=6,
                    active_clip_index=index,
                    active_clip_total=total,
                    timings=timings.to_dict(),
                )

            with timings.measure("clip_generation"):
                rendered = await asyncio.to_thread(
                    self.clip_generator.generate,
                    job_id,
                    video_path,
                    clips,
                    transcript,
                    profile,
                    update_render_progress,
                )

            self.store.update_job(
                job_id,
                progress=0.97,
                message="Finalizing clips",
                current_step="finalizing",
                step_index=6,
                step_total=6,
                active_clip_index=clip_total,
                active_clip_total=clip_total,
                timings=timings.to_dict(),
            )
            self.store.write_json(
                job_id, "clips.json", [clip.model_dump(mode="json") for clip in rendered]
            )
            self.store.update_job(
                job_id,
                status="completed",
                progress=1,
                message="Clips ready",
                current_step="completed",
                step_index=6,
                step_total=6,
                active_clip_index=clip_total,
                active_clip_total=clip_total,
                transcript=transcript,
                clips=rendered,
                timings=timings.to_dict(),
            )
        except Exception as exc:
            # The job record only keeps the message string; log here so the
            # traceback is not lost (this is a fire-and-forget background task).
            logging.getLogger(__name__).exception("Job %s failed", job_id)
            self.store.update_job(
                job_id,
                status="failed",
                progress=1,
                message="Processing failed",
                current_step="failed",
                error=str(exc),
                timings=timings.to_dict(),
            )

    def patch_clip(self, job_id: str, clip_id: str, updates: dict) -> ClipCandidate:
        """Apply field ``updates`` to one clip and persist the new clip list.

        ``None`` values in ``updates`` are ignored (fields cannot be cleared).
        If the patch would make the clip end at or before its start, the end
        is clamped to one second after the start.

        Raises:
            KeyError: If no clip with ``clip_id`` exists on the job.
        """
        snapshot = self.store.get_job(job_id)
        patched: ClipCandidate | None = None
        clips: list[ClipCandidate] = []
        for clip in snapshot.clips:
            if clip.id == clip_id:
                clean_updates = {key: value for key, value in updates.items() if value is not None}
                clip = clip.model_copy(update=clean_updates)
                if clip.end_seconds <= clip.start_seconds:
                    # Guarantee a non-empty time range.
                    clip = clip.model_copy(update={"end_seconds": clip.start_seconds + 1})
                patched = clip
            clips.append(clip)
        if patched is None:
            raise KeyError(clip_id)
        self.store.update_job(job_id, clips=clips)
        return patched

    def regenerate_clip(
        self,
        job_id: str,
        clip_id: str,
        clip_style: str | None = None,
        clip_length_seconds: int | None = None,
        subtitle_text: str | None = None,
    ) -> ClipCandidate:
        """Re-render one clip, optionally overriding style/length/subtitles.

        Style and length overrides are applied to a copy of the job profile;
        a length override also moves the clip's end to ``start + length``.
        The re-rendered clip is flagged ``metadata["regenerated"] = True``
        and the updated clip list is persisted.

        Raises:
            KeyError: If no clip with ``clip_id`` exists on the job.
            FileNotFoundError: If the job's source video is missing on disk.
        """
        snapshot = self.store.get_job(job_id)
        source_path = self._source_path(job_id)
        clips: list[ClipCandidate] = []
        regenerated: ClipCandidate | None = None
        for index, clip in enumerate(snapshot.clips, start=1):
            if clip.id == clip_id:
                profile = snapshot.profile.model_copy(
                    update={
                        key: value
                        for key, value in {
                            "clip_style": clip_style,
                            "clip_length_seconds": clip_length_seconds,
                        }.items()
                        if value is not None
                    }
                )
                if clip_length_seconds is not None:
                    clip = clip.model_copy(
                        update={"end_seconds": clip.start_seconds + clip_length_seconds}
                    )
                if subtitle_text is not None:
                    clip = clip.model_copy(update={"subtitle_text": subtitle_text})
                clip = self.clip_generator.render_one(
                    job_id, source_path, clip, snapshot.transcript, profile, index
                )
                clip.metadata["regenerated"] = True
                regenerated = clip
            clips.append(clip)
        if regenerated is None:
            raise KeyError(clip_id)
        self.store.update_job(job_id, clips=clips)
        return regenerated

    def _source_path(self, job_id: str) -> Path:
        """Return the job's downloaded/ingested source video (``source.*``).

        Raises:
            FileNotFoundError: If no source file exists in the job directory.
        """
        job_dir = self.store.job_dir(job_id)
        matches = sorted(job_dir.glob("source.*"))
        if not matches:
            # Include the job id so the failure is traceable from the message.
            raise FileNotFoundError(f"source video missing for job {job_id}")
        return matches[0]
|
|