Spaces:
Running on Zero
Running on Zero
| # Copyright (c) 2026 Scenema AI | |
| # https://scenema.ai | |
| # SPDX-License-Identifier: MIT | |
| """Audio utility functions for Scenema Audio. | |
| Silence trimming, volume normalization, wav I/O, format conversion. | |
| """ | |
| import logging | |
| import math | |
| import numpy as np | |
| import soundfile as sf | |
| logger = logging.getLogger(__name__) | |
def trim_silence(
    audio_np: np.ndarray,
    sr: int,
    max_silence: float = 0.5,
    threshold_db: float = -40,
) -> np.ndarray:
    """Trim silence exceeding max_silence from start and end of audio.

    The signal is scanned in 20 ms windows. A window counts as voiced when
    its peak absolute amplitude exceeds the threshold. Audio before the first
    and after the last voiced window is dropped, except for up to
    max_silence seconds of padding kept on each side.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz.
        max_silence: Maximum silence to keep at head/tail in seconds.
        threshold_db: Amplitude threshold below which audio is considered silence.

    Returns:
        Trimmed audio array with the same number of dimensions as input.
        The input is returned unchanged when it is shorter than one analysis
        window or when no window is voiced.
    """
    threshold = 10 ** (threshold_db / 20.0)
    max_silent_samples = int(max_silence * sr)
    # 20 ms analysis window; clamp so very low sample rates cannot yield 0.
    window = max(1, int(0.02 * sr))

    # Multi-channel audio is analysed on the channel average.
    mono = audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np
    if len(mono) < window:
        return audio_np

    # Peak amplitude per window. A window starts at every multiple of
    # `window`, so the final (possibly partial) window is analysed too and
    # voiced audio at the very end is never mistaken for silence.
    starts = np.arange(0, len(mono), window)
    energy = np.maximum.reduceat(np.abs(mono), starts)

    voiced = np.flatnonzero(energy > threshold)
    if voiced.size == 0:
        return audio_np

    first_voiced = max(0, int(voiced[0]) * window - max_silent_samples)
    last_voiced = min(
        len(audio_np),
        (int(voiced[-1]) + 1) * window + max_silent_samples,
    )
    return audio_np[first_voiced:last_voiced]
def normalize_volume(
    audio_np: np.ndarray,
    sr: int,
    target_lufs: float = -23.0,
) -> np.ndarray:
    """Normalize audio volume to target LUFS (approximate via RMS).

    Uses a simplified RMS-based LUFS approximation (no K-weighting or
    gating) suitable for per-chunk normalization before concatenation.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz. Not used by the RMS approximation; kept for
            a uniform signature with the other helpers.
        target_lufs: Target loudness in LUFS (default -23, EBU R128).

    Returns:
        Volume-normalized audio array. If the gained signal would peak above
        0.99 it is linearly rescaled so its peak is exactly 0.99. Empty or
        effectively silent input (RMS < 1e-8) is returned unchanged.
    """
    # Nothing to measure; also avoids reducing over a zero-size array below.
    if audio_np.size == 0:
        return audio_np

    mono = audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np

    rms = np.sqrt(np.mean(mono**2))
    if rms < 1e-8:
        # Silence: amplifying it would only raise the noise floor.
        return audio_np

    current_lufs = 20 * math.log10(rms) - 0.691
    gain_db = target_lufs - current_lufs
    gain = 10 ** (gain_db / 20.0)
    # Limit the correction to the range [-20 dB, +20 dB].
    gain = max(0.1, min(gain, 10.0))

    result = audio_np * gain

    # Peak guard: scale the whole signal down rather than clip samples.
    peak = np.abs(result).max()
    if peak > 0.99:
        result = result * (0.99 / peak)
    return result
def extract_wav(audio_obj) -> tuple[np.ndarray, int]:
    """Extract numpy waveform from an LTX Audio object.

    Handles shapes: (B,C,samples) -> (samples,C), (C,samples) -> (samples,C).
    A 1-D waveform is returned as-is.

    Args:
        audio_obj: LTX pipeline Audio object with .waveform and .sampling_rate.

    Returns:
        Tuple of (waveform as float32 numpy, sample_rate).
    """
    # detach() so a tensor that still tracks gradients can be converted;
    # .numpy() raises on tensors that require grad.
    w = audio_obj.waveform.detach().cpu().float().numpy()
    if w.ndim == 3:
        # Drop the batch axis (expects a batch of one).
        w = w.squeeze(0)
    if w.ndim == 2:
        # Channels-first -> samples-first.
        w = w.T
    return w, audio_obj.sampling_rate
def save_wav(audio_np: np.ndarray, sr: int, path: str) -> None:
    """Write audio samples to a file via soundfile.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz.
        path: Output file path.
    """
    sf.write(file=path, data=audio_np, samplerate=sr)
def load_wav(path: str) -> tuple[np.ndarray, int]:
    """Read audio samples and sample rate from a file via soundfile.

    Args:
        path: Input file path.

    Returns:
        Tuple of (audio samples as float64 numpy, sample_rate).
    """
    # sf.read already yields the (data, samplerate) pair.
    return sf.read(path)
def to_mono(audio_np: np.ndarray) -> np.ndarray:
    """Convert multi-channel audio to mono by averaging channels.

    Args:
        audio_np: Audio samples, shape (samples, channels) or (samples,)
            for audio that is already mono.

    Returns:
        Mono audio array, shape (samples,).
    """
    # Any 2-D input is downmixed, not only exactly two channels, so that
    # (samples, 1) and surround layouts also come back one-dimensional.
    if audio_np.ndim == 2:
        return audio_np.mean(axis=1)
    return audio_np
def shorten_long_silence(
    audio_np: np.ndarray,
    sr: int,
    max_duration: float = 1.0,
    target_duration: float = 0.3,
    threshold_db: float = -35,
) -> np.ndarray:
    """Shorten silence regions longer than max_duration to target_duration.

    Unlike silenceremove which deletes silence entirely, this preserves
    a natural pause of target_duration seconds. Prevents chunk boundary
    artifacts while keeping the audio flow natural.

    Silence is detected in 20 ms windows: a window is silent when its peak
    absolute amplitude is below the threshold. Consecutive silent windows
    form a region, and only regions longer than max_duration are shortened.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz.
        max_duration: Silence longer than this is shortened.
        target_duration: Silence is shortened to this duration.
        threshold_db: Amplitude threshold below which audio is silence.

    Returns:
        Audio with long silence regions shortened. The input is returned
        unchanged when it is shorter than one analysis window or has no
        silence region longer than max_duration.
    """
    threshold = 10 ** (threshold_db / 20.0)
    # 20 ms analysis window; clamp so very low sample rates cannot yield 0.
    window = max(1, int(0.02 * sr))
    max_samples = int(max_duration * sr)
    target_samples = int(target_duration * sr)

    # Multi-channel audio is analysed on the channel average.
    mono = audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np
    if len(mono) < window:
        return audio_np

    # Peak amplitude per window, including the final (possibly partial) one.
    # Every sample must be analysed: otherwise voiced audio at the very end
    # would be folded into a trailing silence region and deleted.
    starts = np.arange(0, len(mono), window)
    energy = np.maximum.reduceat(np.abs(mono), starts)
    is_silent = energy < threshold

    # Collect (start_sample, end_sample) for each long silence region.
    silence_regions = []
    region_start = None
    for i, silent in enumerate(is_silent):
        if silent:
            if region_start is None:
                region_start = i * window
        elif region_start is not None:
            region_end = i * window
            if region_end - region_start > max_samples:
                silence_regions.append((region_start, region_end))
            region_start = None
    # A region still open here runs to the end of the signal.
    if region_start is not None and len(mono) - region_start > max_samples:
        silence_regions.append((region_start, len(mono)))

    if not silence_regions:
        return audio_np

    # Rebuild the signal: audio between regions is kept verbatim, and each
    # long region is replaced by its own leading target_samples samples.
    parts = []
    prev_end = 0
    for s_start, s_end in silence_regions:
        parts.append(audio_np[prev_end:s_start])
        # Never keep more than the region holds, so the slice cannot reach
        # into (and duplicate) the audio that follows the silence.
        keep = max(0, min(target_samples, s_end - s_start))
        parts.append(audio_np[s_start : s_start + keep])
        prev_end = s_end
    parts.append(audio_np[prev_end:])

    result = np.concatenate(parts, axis=0)
    shortened = (len(audio_np) - len(result)) / sr
    if shortened > 0:
        logger.info(
            "Shortened %d silence regions, removed %.1fs",
            len(silence_regions),
            shortened,
        )
    return result
def ensure_stereo(audio_np: np.ndarray) -> np.ndarray:
    """Convert mono to stereo by duplicating the channel.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, 1) for mono,
            or (samples, 2) for stereo.

    Returns:
        Stereo audio array, shape (samples, 2). Input that already has two
        or more channels is returned unchanged.
    """
    if audio_np.ndim == 1:
        return np.stack([audio_np, audio_np], axis=-1)
    # Column-shaped mono, (samples, 1), is mono too: duplicate its channel.
    if audio_np.ndim == 2 and audio_np.shape[1] == 1:
        return np.repeat(audio_np, 2, axis=1)
    return audio_np