| |
| """ |
| Audio Processing Module |
| Handles audio extraction, processing, and integration with FFmpeg operations. |
| |
| Upgrades: |
| - Prefer lossless audio stream-copy for muxing (no generational loss). |
| - Safe fallback to AAC re-encode when needed. |
| - Optional EBU R128 loudness normalization (two-pass loudnorm). |
| - Optional audio/video offset with sample-accurate filters. |
| - Robust ffprobe-based audio detection and metadata. |
| - MoviePy fallback when ffmpeg is unavailable. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import re |
| import json |
| import time |
| import math |
| import shutil |
| import logging |
| import tempfile |
| import subprocess |
| from pathlib import Path |
| from typing import Optional, Dict, Any, List |
|
|
| from core.exceptions import AudioProcessingError |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class AudioProcessor:
    """
    Comprehensive audio processing for video background replacement.

    Wraps the ``ffmpeg`` / ``ffprobe`` executables found on PATH at
    construction time to detect, extract, mux, synchronize and level-adjust
    audio. Muxing falls back to MoviePy when ffmpeg is missing or fails.
    Counters for successful and failed operations are kept in ``self.stats``
    and summarized by :meth:`get_stats`.
    """

    # AAC settings used whenever audio has to be re-encoded because a filter
    # (offset / loudnorm / volume) is applied.
    _AAC_FIXED_ARGS = ("-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000")

    # ``adelay`` leaves every channel without an explicit delay value
    # untouched, and silently ignores surplus values. Repeating the delay for
    # up to 8 channels (7.1) keeps multichannel sources in sync on any ffmpeg
    # version.
    _ADELAY_CHANNELS = 8

    # Keys of the loudnorm analysis JSON required for the second pass.
    _LOUDNORM_KEYS = ("input_i", "input_tp", "input_lra", "input_thresh", "target_offset")

    def __init__(self, temp_dir: Optional[str] = None):
        """
        Locate ffmpeg/ffprobe and initialize the statistics counters.

        Args:
            temp_dir: Directory for intermediate files. Defaults to the
                system temp directory. It is created if it does not exist.
        """
        self.temp_dir = temp_dir or tempfile.gettempdir()
        try:
            os.makedirs(self.temp_dir, exist_ok=True)
        except OSError as e:
            # Best effort: operations that need the directory will report
            # their own error later.
            logger.warning("Could not create temp dir %s: %s", self.temp_dir, e)

        self.ffmpeg_path = shutil.which("ffmpeg")
        self.ffprobe_path = shutil.which("ffprobe")
        self.ffmpeg_available = self.ffmpeg_path is not None
        self.ffprobe_available = self.ffprobe_path is not None

        self.stats = {
            "audio_extractions": 0,
            "audio_merges": 0,
            "total_processing_time": 0.0,
            "failed_operations": 0,
        }

        if not self.ffmpeg_available:
            logger.warning("FFmpeg not available - audio processing will be limited")
        logger.info(
            "AudioProcessor initialized (FFmpeg: %s, FFprobe: %s)",
            self.ffmpeg_available,
            self.ffprobe_available,
        )

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def _run(self, cmd: List[str], tag: str = "") -> subprocess.CompletedProcess:
        """Log *cmd* (optionally labelled with *tag*) and run it, capturing text output."""
        logger.info("ffmpeg%s: %s", f"[{tag}]" if tag else "", " ".join(cmd))
        return subprocess.run(cmd, text=True, capture_output=True)

    @staticmethod
    def _output_ok(proc: subprocess.CompletedProcess, output_path: str) -> bool:
        """Return True when *proc* exited with 0 and left a non-empty *output_path*."""
        return (
            proc.returncode == 0
            and os.path.exists(output_path)
            and os.path.getsize(output_path) > 0
        )

    @staticmethod
    def _copy_unless_same(src: str, dst: str) -> None:
        """Copy *src* to *dst* unless both refer to the same file."""
        if src == dst:
            return
        try:
            shutil.copy2(src, dst)
        except shutil.SameFileError:
            # Same file reached through a different spelling of the path:
            # there is nothing to copy.
            pass

    @staticmethod
    def _remove_quietly(path: Optional[str]) -> None:
        """Delete *path* if given, ignoring filesystem errors."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    @staticmethod
    def _stderr_reports_audio(stderr: str) -> bool:
        """Return True when ffmpeg's input banner in *stderr* lists an audio stream."""
        # Anchoring on "Stream #" avoids matching "Audio:" inside a file name
        # or metadata value.
        return re.search(r"Stream #\S+: Audio:", stderr or "") is not None

    def _has_audio(self, path: str) -> bool:
        """
        Return True when *path* is an existing file containing an audio stream.

        ffprobe is preferred. If it is unavailable or cannot be executed,
        ffmpeg's description of the input is inspected instead.
        """
        if not os.path.isfile(path):
            return False

        if self.ffprobe_available:
            try:
                proc = subprocess.run(
                    [
                        self.ffprobe_path, "-v", "error",
                        "-select_streams", "a:0",
                        "-show_entries", "stream=index",
                        "-of", "csv=p=0",
                        path,
                    ],
                    text=True, capture_output=True, check=False,
                )
                return bool(proc.stdout.strip())
            except Exception as e:
                logger.debug("ffprobe audio detection failed for %s: %s", path, e)

        if self.ffmpeg_available:
            try:
                # With no output file ffmpeg only prints the input description
                # (at the default "info" log level) and exits non-zero. The
                # exit code is irrelevant here. "-loglevel error" would hide
                # the stream lines this check relies on.
                proc = subprocess.run(
                    [self.ffmpeg_path, "-hide_banner", "-i", path],
                    text=True, errors="replace", capture_output=True, check=False,
                )
            except Exception as e:
                logger.debug("ffmpeg audio detection failed for %s: %s", path, e)
                return False
            return self._stderr_reports_audio(proc.stderr or "")

        return False

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------

    @staticmethod
    def _number_or_unknown(value: Any, cast: Any) -> Any:
        """Return ``cast(value)``, or ``"unknown"`` when *value* is missing or unparsable."""
        if not value:
            return "unknown"
        try:
            return cast(value)
        except (TypeError, ValueError):
            return "unknown"

    @classmethod
    def _stream_info(cls, stream: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one ffprobe stream entry into the public audio-info dict."""
        return {
            "has_audio": True,
            "codec": stream.get("codec_name", "unknown"),
            "sample_rate": cls._number_or_unknown(stream.get("sample_rate"), int),
            "channels": cls._number_or_unknown(stream.get("channels"), int),
            "duration": cls._number_or_unknown(stream.get("duration"), float),
            "bit_rate": cls._number_or_unknown(stream.get("bit_rate"), int),
        }

    def get_audio_info(self, video_path: str) -> Dict[str, Any]:
        """
        Get comprehensive audio information from a media file.

        Returns:
            On success a dict with ``has_audio=True`` plus ``codec``,
            ``sample_rate``, ``channels``, ``duration`` and ``bit_rate`` of the
            first audio stream. A field ffprobe does not report, or reports in
            an unparsable form, is ``"unknown"``. Otherwise
            ``{"has_audio": False, "error": <reason>}``.
        """
        if not self.ffprobe_available:
            return {"has_audio": False, "error": "FFprobe not available"}

        try:
            proc = subprocess.run(
                [
                    self.ffprobe_path, "-v", "error",
                    "-select_streams", "a:0",
                    "-show_entries", "stream=codec_name,sample_rate,channels,bit_rate,duration",
                    "-of", "json",
                    video_path,
                ],
                text=True, capture_output=True, check=False,
            )
            if proc.returncode != 0:
                return {"has_audio": False, "error": proc.stderr.strip()}

            streams = json.loads(proc.stdout or "{}").get("streams", [])
            if not streams:
                return {"has_audio": False, "error": "No audio stream found"}
            return self._stream_info(streams[0])
        except Exception as e:
            logger.error("Error getting audio info: %s", e)
            return {"has_audio": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    def extract_audio(
        self,
        video_path: str,
        output_path: Optional[str] = None,
        audio_format: str = "aac",
        quality: str = "high",
    ) -> Optional[str]:
        """
        Extract audio from a media file to a separate file.

        Args:
            video_path: Source media file.
            output_path: Destination file. When omitted, a uniquely named
                ``extracted_audio_*.<audio_format>`` file is created in
                ``temp_dir``.
            audio_format: ``"aac"``, ``"mp3"`` or ``"wav"``. Any other value is
                encoded with the AAC codec.
            quality: ``"low"``, ``"medium"`` or ``"high"``. Selects the bitrate
                for aac/mp3. Unknown values add no bitrate option.

        Returns:
            Path of the extracted audio, or None when the source has no audio.

        Raises:
            AudioProcessingError: ffmpeg is unavailable, fails, or produces no
                output.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("extract", "FFmpeg not available", video_path)

        start = time.time()

        # ffprobe is optional for extraction: without it, fall back to the
        # ffmpeg-based detection instead of reporting "no audio".
        if self.ffprobe_available:
            has_audio = self.get_audio_info(video_path).get("has_audio", False)
        else:
            has_audio = self._has_audio(video_path)
        if not has_audio:
            logger.info("No audio found in %s", video_path)
            return None

        placeholder = None
        if output_path is None:
            # mkstemp guarantees a unique name even for calls within the same
            # second. ffmpeg overwrites the empty placeholder thanks to "-y".
            try:
                fd, output_path = tempfile.mkstemp(
                    prefix="extracted_audio_", suffix=f".{audio_format}", dir=self.temp_dir
                )
            except OSError as e:
                self.stats["failed_operations"] += 1
                raise AudioProcessingError(
                    "extract", f"Could not create temp output file: {e}", video_path
                ) from e
            os.close(fd)
            placeholder = output_path

        quality_map = {
            "low": {"aac": ["-b:a", "96k"], "mp3": ["-b:a", "128k"], "wav": []},
            "medium": {"aac": ["-b:a", "192k"], "mp3": ["-b:a", "192k"], "wav": []},
            "high": {"aac": ["-b:a", "320k"], "mp3": ["-b:a", "320k"], "wav": []},
        }
        codec_map = {"aac": ["-c:a", "aac"], "mp3": ["-c:a", "libmp3lame"], "wav": ["-c:a", "pcm_s16le"]}

        cmd = [self.ffmpeg_path, "-y", "-i", video_path]
        cmd += codec_map.get(audio_format, ["-c:a", "aac"])
        cmd += quality_map.get(quality, {}).get(audio_format, [])
        cmd += ["-vn", output_path]

        proc = self._run(cmd, "extract")
        if proc.returncode != 0:
            self.stats["failed_operations"] += 1
            self._remove_quietly(placeholder)
            raise AudioProcessingError("extract", f"FFmpeg failed: {proc.stderr}", video_path, output_path)

        # The size check matters because the placeholder file always exists.
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            self.stats["failed_operations"] += 1
            self._remove_quietly(placeholder)
            raise AudioProcessingError("extract", "Output audio file was not created", video_path, output_path)

        self.stats["audio_extractions"] += 1
        self.stats["total_processing_time"] += (time.time() - start)
        logger.info("Audio extracted: %s", output_path)
        return output_path

    # ------------------------------------------------------------------
    # Loudness normalization / offset filters
    # ------------------------------------------------------------------

    @classmethod
    def _parse_loudnorm_json(cls, text: str) -> Optional[Dict[str, float]]:
        """
        Pull the loudnorm analysis values out of ffmpeg's textual output.

        Returns None when no JSON object is found, a required key is missing
        or non-numeric, or a value is not finite (silent input is reported as
        ``-inf``, which the second pass cannot use).
        """
        found = re.search(r"\{\s*\"input_i\"[^\}]+\}", text or "", re.MULTILINE | re.DOTALL)
        if not found:
            return None
        try:
            data = json.loads(found.group(0))
            values = {key: float(data[key]) for key in cls._LOUDNORM_KEYS}
        except (KeyError, TypeError, ValueError):
            return None
        if not all(math.isfinite(v) for v in values.values()):
            return None
        return values

    def _measure_loudness(self, src_with_audio: str, stream_selector: str = "1:a:0") -> Optional[Dict[str, float]]:
        """
        First pass loudnorm to measure levels.

        Runs ffmpeg with an ``-af loudnorm=...:print_format=json`` filter on
        the default audio stream of *src_with_audio*, discarding the output,
        and parses the JSON the filter prints.

        Args:
            src_with_audio: Media file whose audio is analysed.
            stream_selector: Accepted for backward compatibility. Currently
                not used by the analysis command.

        Returns:
            Dict with input_i, input_tp, input_lra, input_thresh and
            target_offset, or None when the analysis could not be parsed.
        """
        cmd = [
            # The analysis JSON is only printed at "info" level, so a quieter
            # log level would make this pass useless.
            self.ffmpeg_path, "-hide_banner", "-nostats", "-loglevel", "info",
            "-i", src_with_audio,
            "-vn",
            "-af", "loudnorm=I=-16:TP=-1.5:LRA=11:print_format=json",
            "-f", "null", "-"
        ]
        proc = self._run(cmd, "loudnorm-pass1")
        measured = self._parse_loudnorm_json((proc.stderr or "") + (proc.stdout or ""))
        if measured is None:
            logger.warning("Could not parse loudnorm analysis output.")
        return measured

    def _build_loudnorm_filter(self, measured: Dict[str, float], target_I=-16.0, target_TP=-1.5, target_LRA=11.0) -> str:
        """
        Build the second-pass loudnorm filter string using measured values.

        Args:
            measured: Result of :meth:`_measure_loudness`.
            target_I: Target integrated loudness.
            target_TP: Target true peak.
            target_LRA: Target loudness range.
        """
        return (
            "loudnorm="
            f"I={target_I}:TP={target_TP}:LRA={target_LRA}:"
            f"measured_I={measured['input_i']}:"
            f"measured_TP={measured['input_tp']}:"
            f"measured_LRA={measured['input_lra']}:"
            f"measured_thresh={measured['input_thresh']}:"
            f"offset={measured['target_offset']}:"
            "linear=true:print_format=summary"
        )

    @classmethod
    def _offset_filter(cls, offset_ms: float) -> Optional[str]:
        """
        Return the filter implementing an audio offset, or None for no offset.

        Positive offsets delay the audio (``adelay`` on every channel).
        Negative offsets trim the start of the audio (``atrim``). Offsets
        whose magnitude is at most 0.001 ms are treated as zero.
        """
        if abs(offset_ms) <= 1e-3:
            return None
        if offset_ms > 0:
            ms = int(round(offset_ms))
            return "adelay=" + "|".join([str(ms)] * cls._ADELAY_CHANNELS)
        secs = abs(offset_ms) / 1000.0
        return f"atrim=start={secs},asetpts=PTS-STARTPTS"

    def _build_filter_chain(
        self,
        audio_source: str,
        offset_ms: float,
        normalize: bool,
        normalize_I: float,
        normalize_TP: float,
        normalize_LRA: float,
    ) -> Optional[str]:
        """
        Build the comma-joined audio filter chain for offset and normalization.

        When *normalize* is set, loudness is measured on *audio_source* first.
        If the measurement fails, a single-pass loudnorm filter is used.
        Returns None when neither an offset nor normalization is requested.
        """
        chain = []
        shift = self._offset_filter(offset_ms)
        if shift:
            chain.append(shift)
        if normalize:
            measured = self._measure_loudness(audio_source)
            if measured:
                chain.append(self._build_loudnorm_filter(measured, normalize_I, normalize_TP, normalize_LRA))
            else:
                chain.append(f"loudnorm=I={normalize_I}:TP={normalize_TP}:LRA={normalize_LRA}")
        return ",".join(chain) if chain else None

    def _mux_command(
        self,
        video_src: str,
        audio_src: str,
        output_path: str,
        audio_args: List[str],
        afilter: Optional[str] = None,
    ) -> List[str]:
        """
        Assemble the ffmpeg command muxing video of input 0 with audio of input 1.

        The video stream is always stream-copied. *audio_args* selects the
        audio codec options. *afilter*, when given, is applied to the audio of
        the second input through ``-filter_complex``.
        """
        cmd = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", video_src,
            "-i", audio_src,
            "-map", "0:v:0",
        ]
        if afilter:
            cmd += ["-filter_complex", f"[1:a]{afilter}[aout]", "-map", "[aout]"]
        else:
            cmd += ["-map", "1:a:0"]
        cmd += [
            "-c:v", "copy",
            *audio_args,
            "-shortest",
            "-movflags", "+faststart",
            "-y", output_path,
        ]
        return cmd

    def _record_merge(self, started_at: float) -> None:
        """Count one successful merge and add the time elapsed since *started_at*."""
        self.stats["audio_merges"] += 1
        self.stats["total_processing_time"] += (time.time() - started_at)

    # ------------------------------------------------------------------
    # Muxing
    # ------------------------------------------------------------------

    def add_audio_to_video(
        self,
        original_video: str,
        processed_video: str,
        output_path: Optional[str] = None,
        audio_quality: str = "high",
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
        offset_ms: float = 0.0,
    ) -> str:
        """
        Add/mux the audio from original_video into processed_video.

        Strategy:
        1) If no audio in original → return processed (or copy to desired name).
        2) If ffmpeg present:
           a) If normalize/offset requested → re-encode AAC with filters
              (two-pass loudnorm). If that fails, continue with (b); the
              result then carries the unfiltered audio.
           b) Try stream-copy (lossless): -c:a copy. If that fails, AAC re-encode.
        3) If ffmpeg missing or every ffmpeg attempt failed → MoviePy (re-encode).

        Args:
            original_video: File providing the audio.
            processed_video: File providing the video. Must exist.
            output_path: Destination. Defaults to ``<processed>_with_audio.mp4``
                next to *processed_video*.
            audio_quality: ``"low"``/``"medium"``/``"high"`` bitrate for the
                plain AAC re-encode (2b). Unknown values use ``"high"``. The
                filtered path (2a) always uses 192k.
            normalize: Apply EBU R128 loudness normalization.
            normalize_I, normalize_TP, normalize_LRA: loudnorm targets.
            offset_ms: Positive delays the audio, negative trims its start.

        Returns path to the muxed video (MP4).

        Raises:
            FileNotFoundError: *processed_video* does not exist.
            AudioProcessingError: the MoviePy fallback is needed but MoviePy
                cannot be imported.
        """
        if not os.path.isfile(processed_video):
            raise FileNotFoundError(f"Processed video not found: {processed_video}")

        if output_path is None:
            base = os.path.splitext(os.path.basename(processed_video))[0]
            output_path = os.path.join(os.path.dirname(processed_video), f"{base}_with_audio.mp4")

        if not self._has_audio(original_video):
            logger.info("Original has no audio; returning processed video unchanged.")
            self._copy_unless_same(processed_video, output_path)
            return output_path

        if not self.ffmpeg_available:
            logger.warning("FFmpeg not available – using MoviePy fallback.")
            return self._moviepy_mux(original_video, processed_video, output_path)

        start = time.time()

        # (a) Filters require re-encoding the audio.
        afilter = self._build_filter_chain(
            original_video, offset_ms, normalize, normalize_I, normalize_TP, normalize_LRA
        )
        if afilter:
            cmd = self._mux_command(
                processed_video, original_video, output_path, list(self._AAC_FIXED_ARGS), afilter
            )
            proc = self._run(cmd, "mux-reencode-filters")
            if self._output_ok(proc, output_path):
                self._record_merge(start)
                logger.info("Audio merged with filters (normalize=%s, offset_ms=%.2f): %s", normalize, offset_ms, output_path)
                return output_path
            logger.warning("Filtered mux failed; stderr: %s", proc.stderr)

        # (b) Lossless stream-copy of the original audio.
        cmd_copy = self._mux_command(processed_video, original_video, output_path, ["-c:a", "copy"])
        proc = self._run(cmd_copy, "mux-copy")
        if self._output_ok(proc, output_path):
            self._record_merge(start)
            logger.info("Audio merged (stream-copy): %s", output_path)
            return output_path

        # (b, continued) Stream-copy can fail when the source codec does not
        # fit the MP4 container, so re-encode to AAC instead.
        quality_map = {"low": ["-b:a", "96k"], "medium": ["-b:a", "192k"], "high": ["-b:a", "320k"]}
        aac_args = [
            "-c:a", "aac",
            *quality_map.get(audio_quality, quality_map["high"]),
            "-ac", "2", "-ar", "48000",
        ]
        cmd_aac = self._mux_command(processed_video, original_video, output_path, aac_args)
        proc = self._run(cmd_aac, "mux-aac")
        if self._output_ok(proc, output_path):
            self._record_merge(start)
            logger.info("Audio merged (AAC re-encode): %s", output_path)
            return output_path

        logger.warning("FFmpeg mux failed; using MoviePy fallback.")
        return self._moviepy_mux(original_video, processed_video, output_path)

    def _moviepy_mux(self, original_video: str, processed_video: str, output_path: str) -> str:
        """
        Mux audio of *original_video* into *processed_video* using MoviePy.

        The video is re-encoded with libx264 and the audio with AAC. If
        MoviePy cannot load the audio, the processed video is copied to
        *output_path* unchanged.

        Raises:
            AudioProcessingError: MoviePy cannot be imported.
        """
        try:
            from moviepy.editor import VideoFileClip, AudioFileClip
        except Exception as e:
            self.stats["failed_operations"] += 1
            raise AudioProcessingError("mux", f"MoviePy unavailable and ffmpeg failed: {e}", processed_video)

        # Unique name so concurrent muxes sharing temp_dir do not clobber each
        # other's intermediate audio file.
        temp_audio = os.path.join(
            self.temp_dir, f"temp-audio-{os.getpid()}-{time.time_ns()}.m4a"
        )

        with VideoFileClip(processed_video) as v_clip:
            try:
                a_clip = AudioFileClip(original_video)
            except Exception as e:
                logger.warning("MoviePy could not load audio from %s (%s). Returning processed video.", original_video, e)
                self._copy_unless_same(processed_video, output_path)
                return output_path

            try:
                muxed = v_clip.set_audio(a_clip)
                muxed.write_videofile(
                    output_path,
                    codec="libx264",
                    audio_codec="aac",
                    audio_bitrate="192k",
                    temp_audiofile=temp_audio,
                    remove_temp=True,
                    threads=2,
                    preset="medium",
                )
            finally:
                # The audio clip is not covered by the ``with`` block above.
                a_clip.close()
        return output_path

    # ------------------------------------------------------------------
    # Sync / levels
    # ------------------------------------------------------------------

    def sync_audio_video(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        offset_ms: float = 0.0,
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
    ) -> bool:
        """
        Synchronize a separate audio file with a video (copy video, re-encode audio AAC).
        Positive offset_ms delays audio; negative trims audio start.

        Returns:
            True when ffmpeg exits successfully and *output_path* is a
            non-empty file, False otherwise.

        Raises:
            AudioProcessingError: ffmpeg is unavailable.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("sync", "FFmpeg not available")

        afilter = self._build_filter_chain(
            audio_path, offset_ms, normalize, normalize_I, normalize_TP, normalize_LRA
        )
        cmd = self._mux_command(
            video_path, audio_path, output_path, list(self._AAC_FIXED_ARGS), afilter
        )
        proc = self._run(cmd, "sync")
        return self._output_ok(proc, output_path)

    def adjust_audio_levels(
        self,
        input_path: str,
        output_path: str,
        volume_factor: float = 1.0,
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
    ) -> bool:
        """
        Adjust levels on a single-file video (copy video, re-encode audio AAC).

        When neither a volume change nor normalization is requested, the input
        is simply copied to *output_path*.

        Returns:
            True when *output_path* exists and is non-empty afterwards.

        Raises:
            AudioProcessingError: ffmpeg is unavailable or exits with an error.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("adjust_levels", "FFmpeg not available")

        filters = []
        if volume_factor != 1.0:
            filters.append(f"volume={volume_factor}")
        if normalize:
            measured = self._measure_loudness(input_path)
            if measured:
                filters.append(self._build_loudnorm_filter(measured, normalize_I, normalize_TP, normalize_LRA))
            else:
                filters.append(f"loudnorm=I={normalize_I}:TP={normalize_TP}:LRA={normalize_LRA}")

        if not filters:
            # Nothing to change: avoid a needless generational loss.
            self._copy_unless_same(input_path, output_path)
            return True

        cmd = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", input_path,
            "-c:v", "copy",
            "-af", ",".join(filters),
            *self._AAC_FIXED_ARGS,
            "-movflags", "+faststart",
            "-y", output_path,
        ]
        proc = self._run(cmd, "adjust-levels")
        if proc.returncode != 0:
            raise AudioProcessingError("adjust_levels", proc.stderr, input_path)
        return os.path.exists(output_path) and os.path.getsize(output_path) > 0

    # ------------------------------------------------------------------
    # Stats / housekeeping
    # ------------------------------------------------------------------

    def get_stats(self) -> Dict[str, Any]:
        """
        Return tool availability, the raw counters and a success rate.

        ``success_rate`` is the percentage of extractions plus merges among
        all counted operations (0.0 when nothing has been counted yet).
        """
        successes = self.stats["audio_extractions"] + self.stats["audio_merges"]
        tot_ops = successes + self.stats["failed_operations"]
        success_rate = (successes / max(1, tot_ops)) * 100.0
        return {
            "ffmpeg_available": self.ffmpeg_available,
            "ffprobe_available": self.ffprobe_available,
            "audio_extractions": self.stats["audio_extractions"],
            "audio_merges": self.stats["audio_merges"],
            "total_processing_time": self.stats["total_processing_time"],
            "failed_operations": self.stats["failed_operations"],
            "success_rate": success_rate,
        }

    def cleanup_temp_files(self, max_age_hours: int = 24):
        """
        Clean up temporary audio/video files older than specified age in temp_dir.

        Every file in ``temp_dir`` whose name contains ``audio`` and ends in
        .aac/.mp3/.wav/.mp4/.m4a is a candidate, regardless of what created
        it. Errors are logged and never raised.
        """
        try:
            temp_path = Path(self.temp_dir)
            cutoff = time.time() - (max_age_hours * 3600)
            cleaned = 0

            for ext in (".aac", ".mp3", ".wav", ".mp4", ".m4a"):
                for p in temp_path.glob(f"*audio*{ext}"):
                    try:
                        if p.stat().st_mtime < cutoff:
                            p.unlink()
                            cleaned += 1
                    except Exception as e:
                        logger.warning("Could not delete temp file %s: %s", p, e)
            if cleaned:
                logger.info("Cleaned up %d temporary audio files", cleaned)
        except Exception as e:
            logger.warning("Temp file cleanup error: %s", e)
|
|