from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from app.core.config import get_settings from app.models.schemas import ( ChannelProfile, ClipCandidate, ClipPatch, HealthResponse, JobSnapshot, PolishSubtitlesRequest, RegenerateClipRequest, SubtitleCue, TranslateSubtitlesRequest, YoutubeJobRequest, ) from app.services.highlight import QwenHighlightDetector from app.services.pipeline import VideoPipeline from app.services.transcription import WhisperTranscriber from app.services.video_input import save_upload from app.storage import JobStore from app.utils.rocm import detect_accelerator settings = get_settings() store = JobStore(settings) pipeline = VideoPipeline(settings, store) highlight_detector = QwenHighlightDetector(settings) transcriber = WhisperTranscriber(settings) app = FastAPI(title=settings.app_name, version="0.1.0") app.add_middleware( CORSMiddleware, allow_origins=[settings.frontend_origin, "http://localhost:5173", "http://127.0.0.1:5173"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/media", StaticFiles(directory=settings.storage_dir), name="media") @app.get("/health", response_model=HealthResponse) async def health() -> HealthResponse: return HealthResponse( ok=True, app=settings.app_name, demo_mode=settings.demo_mode, accelerator=detect_accelerator(), ) @app.post("/api/jobs/youtube", response_model=JobSnapshot) async def create_youtube_job( request: YoutubeJobRequest, background_tasks: BackgroundTasks ) -> JobSnapshot: snapshot = store.create_job( request.profile, {"kind": "youtube", "url": str(request.youtube_url)} ) background_tasks.add_task( pipeline.process_source, snapshot.id, "youtube", str(request.youtube_url), request.profile ) return snapshot @app.post("/api/jobs/upload", response_model=JobSnapshot) async def create_upload_job( background_tasks: BackgroundTasks, profile_json: str = Form(...), file: UploadFile = File(...), ) -> JobSnapshot: try: profile = ChannelProfile.model_validate_json(profile_json) except Exception as exc: raise HTTPException(status_code=422, detail=f"Invalid profile JSON: {exc}") from exc snapshot = store.create_job(profile, {"kind": "upload", "filename": file.filename}) source_path = await save_upload(file, store.job_dir(snapshot.id)) background_tasks.add_task(pipeline.process_source, snapshot.id, "upload", str(source_path), profile) return snapshot @app.get("/api/jobs/{job_id}", response_model=JobSnapshot) async def get_job(job_id: str) -> JobSnapshot: try: return store.get_job(job_id) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail="Job not found") from exc @app.patch("/api/jobs/{job_id}/clips/{clip_id}", response_model=ClipCandidate) async def update_clip(job_id: str, clip_id: str, patch: ClipPatch) -> ClipCandidate: try: return pipeline.patch_clip(job_id, clip_id, patch.model_dump()) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail="Job not found") from exc except KeyError as exc: raise HTTPException(status_code=404, detail="Clip not found") from exc @app.post("/api/jobs/{job_id}/clips/{clip_id}/regenerate", response_model=ClipCandidate) async def regenerate_clip( job_id: str, clip_id: str, request: RegenerateClipRequest ) -> ClipCandidate: try: return pipeline.regenerate_clip( job_id, clip_id, clip_style=request.clip_style, clip_length_seconds=request.clip_length_seconds, subtitle_text=request.subtitle_text, ) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail="Source video not found") from exc except KeyError as exc: raise HTTPException(status_code=404, detail="Clip not found") from exc @app.get("/api/jobs/{job_id}/clips/{clip_id}/download") async def download_clip(job_id: str, clip_id: str) -> FileResponse: snapshot = store.get_job(job_id) clip = next((item for item in snapshot.clips if item.id == clip_id), None) if clip is None or clip.download_url is None: raise HTTPException(status_code=404, detail="Clip not found") filename = clip.download_url.rsplit("/", 1)[-1] path = store.job_dir(job_id) / filename if not path.exists(): raise HTTPException(status_code=404, detail="Clip file not found") return FileResponse(path, media_type="video/mp4", filename=filename) # ───────────────────────────────────────────────────────────────── # AI subtitle endpoints — work in demo mode immediately, switch to # real Qwen / Whisper output once DEMO_MODE=false on AMD GPU cloud. # ───────────────────────────────────────────────────────────────── def _resolve_clip_cues(snapshot: JobSnapshot, clip: ClipCandidate) -> list[SubtitleCue]: """Return the cue list to operate on. Prefer explicit subtitle_cues; fall back to splitting subtitle_text into evenly-spaced cues.""" if clip.subtitle_cues: return [SubtitleCue(**cue.model_dump()) for cue in clip.subtitle_cues] duration = max(0.5, clip.end_seconds - clip.start_seconds) text = clip.subtitle_text.strip() if not text: return [SubtitleCue(start_seconds=0.0, end_seconds=duration, text="")] # Reuse Whisper aligner's deterministic chunking for fallback return transcriber._demo_align_words(text, 0.0, duration) @app.post( "/api/jobs/{job_id}/clips/{clip_id}/subtitle/polish", response_model=ClipCandidate, ) async def polish_clip_subtitles( job_id: str, clip_id: str, request: PolishSubtitlesRequest ) -> ClipCandidate: try: snapshot = store.get_job(job_id) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail="Job not found") from exc clip = next((c for c in snapshot.clips if c.id == clip_id), None) if clip is None: raise HTTPException(status_code=404, detail="Clip not found") cues_in = _resolve_clip_cues(snapshot, clip) polished = highlight_detector.polish_subtitles(cues_in, style=request.style) return pipeline.patch_clip( job_id, clip_id, { "subtitle_cues": [cue.model_dump() for cue in polished], "subtitle_text": " ".join(cue.text for cue in polished if cue.text), }, ) @app.post( "/api/jobs/{job_id}/clips/{clip_id}/subtitle/translate", response_model=ClipCandidate, ) async def translate_clip_subtitles( job_id: str, clip_id: str, request: TranslateSubtitlesRequest ) -> ClipCandidate: try: snapshot = store.get_job(job_id) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail="Job not found") from exc clip = next((c for c in snapshot.clips if c.id == clip_id), None) if clip is None: raise HTTPException(status_code=404, detail="Clip not found") cues_in = _resolve_clip_cues(snapshot, clip) translated = highlight_detector.translate_subtitles(cues_in, request.target_language) return pipeline.patch_clip( job_id, clip_id, { "subtitle_cues": [cue.model_dump() for cue in translated], "subtitle_text": " ".join(cue.text for cue in translated if cue.text), }, ) @app.post( "/api/jobs/{job_id}/clips/{clip_id}/subtitle/auto-time", response_model=ClipCandidate, ) async def auto_time_clip_subtitles(job_id: str, clip_id: str) -> ClipCandidate: try: snapshot = store.get_job(job_id) except FileNotFoundError as exc: raise HTTPException(status_code=404, detail="Job not found") from exc clip = next((c for c in snapshot.clips if c.id == clip_id), None) if clip is None: raise HTTPException(status_code=404, detail="Clip not found") text = clip.subtitle_text or " ".join( (cue.text for cue in (clip.subtitle_cues or []) if cue.text) ) # Best-effort: production mode uses the actual source video on disk; demo # mode uses synthetic chunking that doesn't require the file at all. source_path = "" try: for entry in store.job_dir(job_id).iterdir(): if entry.suffix.lower() in {".mp4", ".mkv", ".mov", ".webm"}: source_path = str(entry) break except Exception: source_path = "" timed = transcriber.align_words(source_path, text, clip.start_seconds, clip.end_seconds) return pipeline.patch_clip( job_id, clip_id, { "subtitle_cues": [cue.model_dump() for cue in timed], "subtitle_text": " ".join(cue.text for cue in timed if cue.text), }, )