# feat(editor): subtitle-first editor + AI subtitle pipeline
# (originally committed by JakgritB, commit 89e1dc4)
from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from app.core.config import get_settings
from app.models.schemas import (
ChannelProfile,
ClipCandidate,
ClipPatch,
HealthResponse,
JobSnapshot,
PolishSubtitlesRequest,
RegenerateClipRequest,
SubtitleCue,
TranslateSubtitlesRequest,
YoutubeJobRequest,
)
from app.services.highlight import QwenHighlightDetector
from app.services.pipeline import VideoPipeline
from app.services.transcription import WhisperTranscriber
from app.services.video_input import save_upload
from app.storage import JobStore
from app.utils.rocm import detect_accelerator
# Application-wide singletons: configuration, job persistence, the video
# processing pipeline, and the AI services (highlight detection + Whisper).
settings = get_settings()
store = JobStore(settings)
pipeline = VideoPipeline(settings, store)
highlight_detector = QwenHighlightDetector(settings)
transcriber = WhisperTranscriber(settings)
app = FastAPI(title=settings.app_name, version="0.1.0")
# Allow the configured frontend origin plus the Vite dev server defaults.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[settings.frontend_origin, "http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Rendered clips and other artifacts are served straight from storage_dir.
app.mount("/media", StaticFiles(directory=settings.storage_dir), name="media")
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    """Liveness probe: report app identity, demo-mode flag, and accelerator."""
    status = {
        "ok": True,
        "app": settings.app_name,
        "demo_mode": settings.demo_mode,
        "accelerator": detect_accelerator(),
    }
    return HealthResponse(**status)
@app.post("/api/jobs/youtube", response_model=JobSnapshot)
async def create_youtube_job(
    request: YoutubeJobRequest, background_tasks: BackgroundTasks
) -> JobSnapshot:
    """Register a YouTube-sourced job and start processing in the background."""
    url = str(request.youtube_url)
    job = store.create_job(request.profile, {"kind": "youtube", "url": url})
    # Heavy download/processing work runs after the response has been sent.
    background_tasks.add_task(pipeline.process_source, job.id, "youtube", url, request.profile)
    return job
@app.post("/api/jobs/upload", response_model=JobSnapshot)
async def create_upload_job(
    background_tasks: BackgroundTasks,
    profile_json: str = Form(...),
    file: UploadFile = File(...),
) -> JobSnapshot:
    """Accept a direct video upload plus a JSON-encoded channel profile."""
    # Multipart forms can't carry a typed JSON body, so validate by hand.
    try:
        profile = ChannelProfile.model_validate_json(profile_json)
    except Exception as exc:
        raise HTTPException(status_code=422, detail=f"Invalid profile JSON: {exc}") from exc
    job = store.create_job(profile, {"kind": "upload", "filename": file.filename})
    saved_path = await save_upload(file, store.job_dir(job.id))
    background_tasks.add_task(
        pipeline.process_source, job.id, "upload", str(saved_path), profile
    )
    return job
@app.get("/api/jobs/{job_id}", response_model=JobSnapshot)
async def get_job(job_id: str) -> JobSnapshot:
    """Return the current snapshot for a job; 404 if the job is unknown."""
    try:
        snapshot = store.get_job(job_id)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Job not found") from exc
    return snapshot
@app.patch("/api/jobs/{job_id}/clips/{clip_id}", response_model=ClipCandidate)
async def update_clip(job_id: str, clip_id: str, patch: ClipPatch) -> ClipCandidate:
    """Apply a partial update to one clip.

    Only fields the client actually sent are forwarded (PATCH semantics);
    dumping unset defaults as well would clobber clip fields the client
    never intended to touch. `pipeline.patch_clip` already accepts partial
    dicts (the subtitle endpoints pass two-key dicts).
    """
    try:
        return pipeline.patch_clip(job_id, clip_id, patch.model_dump(exclude_unset=True))
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Job not found") from exc
    except KeyError as exc:
        raise HTTPException(status_code=404, detail="Clip not found") from exc
@app.post("/api/jobs/{job_id}/clips/{clip_id}/regenerate", response_model=ClipCandidate)
async def regenerate_clip(
    job_id: str, clip_id: str, request: RegenerateClipRequest
) -> ClipCandidate:
    """Re-render a single clip with new style/length/subtitle settings."""
    try:
        regenerated = pipeline.regenerate_clip(
            job_id,
            clip_id,
            clip_style=request.clip_style,
            clip_length_seconds=request.clip_length_seconds,
            subtitle_text=request.subtitle_text,
        )
    except KeyError as exc:
        raise HTTPException(status_code=404, detail="Clip not found") from exc
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Source video not found") from exc
    return regenerated
@app.get("/api/jobs/{job_id}/clips/{clip_id}/download")
async def download_clip(job_id: str, clip_id: str) -> FileResponse:
    """Stream a rendered clip as an mp4 attachment.

    Returns 404 when the job, the clip, or the rendered file is missing.
    """
    try:
        snapshot = store.get_job(job_id)
    except FileNotFoundError as exc:
        # Consistency fix: every other endpoint maps an unknown job to 404;
        # previously this leaked FileNotFoundError as a 500 here.
        raise HTTPException(status_code=404, detail="Job not found") from exc
    clip = next((item for item in snapshot.clips if item.id == clip_id), None)
    if clip is None or clip.download_url is None:
        raise HTTPException(status_code=404, detail="Clip not found")
    # download_url points into /media/<job>/<file>; recover the file name.
    filename = clip.download_url.rsplit("/", 1)[-1]
    path = store.job_dir(job_id) / filename
    if not path.exists():
        raise HTTPException(status_code=404, detail="Clip file not found")
    return FileResponse(path, media_type="video/mp4", filename=filename)
# ─────────────────────────────────────────────────────────────────
# AI subtitle endpoints β€” work in demo mode immediately, switch to
# real Qwen / Whisper output once DEMO_MODE=false on AMD GPU cloud.
# ─────────────────────────────────────────────────────────────────
def _resolve_clip_cues(snapshot: JobSnapshot, clip: ClipCandidate) -> list[SubtitleCue]:
    """Return the cue list to operate on.

    Prefer explicit subtitle_cues (copied so callers may mutate freely);
    fall back to splitting subtitle_text into evenly-spaced cues.
    `snapshot` is accepted for interface stability but not currently read.
    """
    if clip.subtitle_cues:
        return [SubtitleCue(**cue.model_dump()) for cue in clip.subtitle_cues]
    duration = max(0.5, clip.end_seconds - clip.start_seconds)
    # subtitle_text can be None/empty (the auto-time endpoint guards it with
    # `or`); guard here too so .strip() cannot raise AttributeError.
    text = (clip.subtitle_text or "").strip()
    if not text:
        return [SubtitleCue(start_seconds=0.0, end_seconds=duration, text="")]
    # Reuse Whisper aligner's deterministic chunking for fallback.
    # NOTE(review): relies on a private helper; consider exposing it publicly.
    return transcriber._demo_align_words(text, 0.0, duration)
@app.post(
    "/api/jobs/{job_id}/clips/{clip_id}/subtitle/polish",
    response_model=ClipCandidate,
)
async def polish_clip_subtitles(
    job_id: str, clip_id: str, request: PolishSubtitlesRequest
) -> ClipCandidate:
    """Rewrite a clip's subtitles in the requested style and persist them."""
    try:
        snapshot = store.get_job(job_id)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Job not found") from exc
    clip = next((candidate for candidate in snapshot.clips if candidate.id == clip_id), None)
    if clip is None:
        raise HTTPException(status_code=404, detail="Clip not found")
    polished = highlight_detector.polish_subtitles(
        _resolve_clip_cues(snapshot, clip), style=request.style
    )
    # Keep the flat subtitle_text mirror in sync with the cue list.
    updates = {
        "subtitle_cues": [cue.model_dump() for cue in polished],
        "subtitle_text": " ".join(cue.text for cue in polished if cue.text),
    }
    return pipeline.patch_clip(job_id, clip_id, updates)
@app.post(
    "/api/jobs/{job_id}/clips/{clip_id}/subtitle/translate",
    response_model=ClipCandidate,
)
async def translate_clip_subtitles(
    job_id: str, clip_id: str, request: TranslateSubtitlesRequest
) -> ClipCandidate:
    """Translate a clip's subtitles to the target language and persist them."""
    try:
        snapshot = store.get_job(job_id)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Job not found") from exc
    clip = next((candidate for candidate in snapshot.clips if candidate.id == clip_id), None)
    if clip is None:
        raise HTTPException(status_code=404, detail="Clip not found")
    translated = highlight_detector.translate_subtitles(
        _resolve_clip_cues(snapshot, clip), request.target_language
    )
    # Keep the flat subtitle_text mirror in sync with the cue list.
    updates = {
        "subtitle_cues": [cue.model_dump() for cue in translated],
        "subtitle_text": " ".join(cue.text for cue in translated if cue.text),
    }
    return pipeline.patch_clip(job_id, clip_id, updates)
@app.post(
    "/api/jobs/{job_id}/clips/{clip_id}/subtitle/auto-time",
    response_model=ClipCandidate,
)
async def auto_time_clip_subtitles(job_id: str, clip_id: str) -> ClipCandidate:
    """Re-time a clip's subtitle text against the source media and persist cues."""
    try:
        snapshot = store.get_job(job_id)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Job not found") from exc
    clip = next((candidate for candidate in snapshot.clips if candidate.id == clip_id), None)
    if clip is None:
        raise HTTPException(status_code=404, detail="Clip not found")
    if clip.subtitle_text:
        text = clip.subtitle_text
    else:
        text = " ".join(cue.text for cue in (clip.subtitle_cues or []) if cue.text)
    # Best-effort: production mode uses the actual source video on disk; demo
    # mode uses synthetic chunking that doesn't require the file at all.
    source_path = ""
    video_extensions = {".mp4", ".mkv", ".mov", ".webm"}
    try:
        for entry in store.job_dir(job_id).iterdir():
            if entry.suffix.lower() in video_extensions:
                source_path = str(entry)
                break
    except Exception:
        source_path = ""
    timed = transcriber.align_words(source_path, text, clip.start_seconds, clip.end_seconds)
    # Keep the flat subtitle_text mirror in sync with the cue list.
    updates = {
        "subtitle_cues": [cue.model_dump() for cue in timed],
        "subtitle_text": " ".join(cue.text for cue in timed if cue.text),
    }
    return pipeline.patch_clip(job_id, clip_id, updates)