File size: 7,386 Bytes
cdc4405
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# Copyright (c) 2026 Scenema AI
# https://scenema.ai
# SPDX-License-Identifier: MIT

"""Audio utility functions for Scenema Audio.

Silence trimming, volume normalization, wav I/O, format conversion.
"""

import logging
import math

import numpy as np
import soundfile as sf

logger = logging.getLogger(__name__)


def trim_silence(
    audio_np: np.ndarray,
    sr: int,
    max_silence: float = 0.5,
    threshold_db: float = -40,
) -> np.ndarray:
    """Trim silence exceeding max_silence from start and end of audio.

    The signal is scanned in 20ms windows; a window counts as voiced when
    its peak absolute amplitude exceeds the threshold. Everything before the
    first and after the last voiced window is dropped, except for up to
    max_silence seconds of padding kept on each side.

    Multi-channel input is averaged across channels for detection only; the
    returned slice is taken from the original array.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz.
        max_silence: Maximum silence to keep at head/tail in seconds.
        threshold_db: Amplitude threshold below which audio is considered silence.

    Returns:
        Trimmed audio array with the same number of dimensions as input.
        The input is returned unchanged when it is shorter than one analysis
        window or when no window exceeds the threshold.
    """
    threshold = 10 ** (threshold_db / 20.0)
    max_silent_samples = int(max_silence * sr)
    # 20ms analysis window; clamp to one sample so very low sample rates
    # cannot produce a zero-length window.
    window = max(1, int(0.02 * sr))

    mono = audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np

    if len(mono) < window:
        return audio_np

    # Peak amplitude per window. The window starts cover the whole signal,
    # so the final (possibly partial) window is analysed too; skipping it
    # would let voiced audio at the very end be trimmed away.
    window_starts = np.arange(0, len(mono), window)
    energy = np.maximum.reduceat(np.abs(mono), window_starts)

    voiced = np.where(energy > threshold)[0]
    if len(voiced) == 0:
        return audio_np

    first_voiced = max(0, int(voiced[0]) * window - max_silent_samples)
    last_voiced = min(
        len(audio_np),
        (int(voiced[-1]) + 1) * window + max_silent_samples,
    )

    return audio_np[first_voiced:last_voiced]


def normalize_volume(
    audio_np: np.ndarray,
    sr: int,
    target_lufs: float = -23.0,
) -> np.ndarray:
    """Normalize audio volume to target LUFS (approximate via RMS).

    Uses a simplified RMS-based LUFS approximation suitable for
    per-chunk normalization before concatenation. Multi-channel input is
    averaged across channels to measure loudness; the gain is applied to
    the original array.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz. Not used by the RMS approximation; kept in
            the signature for callers.
        target_lufs: Target loudness in LUFS (default -23, EBU R128).

    Returns:
        Volume-normalized audio array. The gain is limited to the range
        [0.1, 10.0], and if the scaled peak exceeds 0.99 the whole signal is
        scaled down linearly so the peak equals 0.99. Empty or near-silent
        input (RMS below 1e-8) is returned unchanged.
    """
    # Nothing to measure; also avoids NaN RMS and a failing max() below.
    if audio_np.size == 0:
        return audio_np

    mono = audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np

    rms = np.sqrt(np.mean(mono**2))
    if rms < 1e-8:
        return audio_np

    current_lufs = 20 * math.log10(rms) - 0.691
    gain_db = target_lufs - current_lufs
    gain = 10 ** (gain_db / 20.0)
    # Limit the correction to +/-20 dB so near-silent chunks are not
    # amplified without bound.
    gain = max(0.1, min(gain, 10.0))

    result = audio_np * gain

    # Peak limiting by uniform rescale (not a soft clipper).
    peak = np.abs(result).max()
    if peak > 0.99:
        result = result * (0.99 / peak)

    return result


def extract_wav(audio_obj) -> tuple[np.ndarray, int]:
    """Pull the waveform out of an LTX Audio object as a numpy array.

    Layout conversion:
        (B, C, samples) -> (samples, C)   (leading axis must have size 1)
        (C, samples)    -> (samples, C)
    Arrays of any other rank are returned as-is.

    Args:
        audio_obj: LTX pipeline Audio object with .waveform and .sampling_rate.

    Returns:
        Tuple of (waveform as float32 numpy, sample_rate).
    """
    samples = audio_obj.waveform.cpu().float().numpy()

    # Drop the leading batch axis first, so the transpose below sees 2-D data.
    if samples.ndim == 3:
        samples = np.squeeze(samples, axis=0)

    # Channels-first -> samples-first.
    if samples.ndim == 2:
        samples = np.transpose(samples)

    return samples, audio_obj.sampling_rate


def save_wav(audio_np: np.ndarray, sr: int, path: str) -> None:
    """Write audio samples to disk via soundfile.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz.
        path: Output file path.
    """
    sf.write(file=path, data=audio_np, samplerate=sr)


def load_wav(path: str) -> tuple[np.ndarray, int]:
    """Read an audio file from disk via soundfile.

    Args:
        path: Input file path.

    Returns:
        Tuple of (audio samples as float64 numpy, sample_rate).
    """
    samples, sample_rate = sf.read(path)
    return (samples, sample_rate)


def to_mono(audio_np: np.ndarray) -> np.ndarray:
    """Convert multi-channel audio to mono by averaging channels.

    Any 2-D input is downmixed, including single-channel (samples, 1)
    arrays and layouts with more than two channels, so the result is
    always 1-D for 1-D or 2-D input.

    Args:
        audio_np: Audio samples, shape (samples, channels) or (samples,).

    Returns:
        Mono audio array, shape (samples,). 1-D input is returned unchanged.
    """
    if audio_np.ndim == 2:
        return audio_np.mean(axis=1)
    return audio_np


def shorten_long_silence(
    audio_np: np.ndarray,
    sr: int,
    max_duration: float = 1.0,
    target_duration: float = 0.3,
    threshold_db: float = -35,
) -> np.ndarray:
    """Shorten silence regions longer than max_duration to target_duration.

    Unlike silenceremove which deletes silence entirely, this preserves
    a natural pause of target_duration seconds. Prevents chunk boundary
    artifacts while keeping the audio flow natural.

    Silence is detected in 20ms windows: a window is silent when its peak
    absolute amplitude is below the threshold. Multi-channel input is
    averaged across channels for detection only. For each long silent run,
    the first target_duration seconds of that run are kept and the rest of
    the run is removed.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, channels).
        sr: Sample rate in Hz.
        max_duration: Silence longer than this is shortened.
        target_duration: Silence is shortened to this duration.
        threshold_db: Amplitude threshold below which audio is silence.

    Returns:
        Audio with long silence regions shortened. The input is returned
        unchanged when it is shorter than one window or has no long silence.
    """
    threshold = 10 ** (threshold_db / 20.0)
    # 20ms analysis window; clamp to one sample so very low sample rates
    # cannot produce a zero-length window.
    window = max(1, int(0.02 * sr))
    max_samples = int(max_duration * sr)
    target_samples = max(0, int(target_duration * sr))

    mono = audio_np.mean(axis=1) if audio_np.ndim == 2 else audio_np
    total = len(mono)

    if total < window:
        return audio_np

    # Peak amplitude per window, covering the whole signal. The final
    # (possibly partial) window must be analysed: a trailing silent run is
    # extended to the end of the signal below, so an unanalysed tail that
    # contains voiced audio would otherwise be deleted.
    window_starts = np.arange(0, total, window)
    energy = np.maximum.reduceat(np.abs(mono), window_starts)
    is_silent = energy < threshold

    # Collect (start_sample, end_sample) for silent runs longer than max_samples
    silence_regions = []
    run_start = None
    for i, silent in enumerate(is_silent):
        if silent:
            if run_start is None:
                run_start = i * window
        elif run_start is not None:
            run_end = i * window
            if run_end - run_start > max_samples:
                silence_regions.append((run_start, run_end))
            run_start = None
    # A run still open after the last window reaches the end of the signal
    if run_start is not None and total - run_start > max_samples:
        silence_regions.append((run_start, total))

    if not silence_regions:
        return audio_np

    # Build output by keeping non-silence and shortening long silence
    parts = []
    prev_end = 0
    for s_start, s_end in silence_regions:
        # Keep audio before this silence
        parts.append(audio_np[prev_end:s_start])
        # Keep the head of the silent run, never reading past the run's end
        # (which would duplicate the audio that follows it).
        keep = min(target_samples, s_end - s_start)
        parts.append(audio_np[s_start : s_start + keep])
        prev_end = s_end

    # Keep remaining audio after last silence
    parts.append(audio_np[prev_end:])

    result = np.concatenate(parts, axis=0)
    shortened = (len(audio_np) - len(result)) / sr
    if shortened > 0:
        logger.info(
            "Shortened %d silence regions, removed %.1fs",
            len(silence_regions),
            shortened,
        )
    return result


def ensure_stereo(audio_np: np.ndarray) -> np.ndarray:
    """Convert mono to stereo by duplicating the channel.

    Both mono layouts are handled: a 1-D (samples,) array and a 2-D
    single-channel (samples, 1) array.

    Args:
        audio_np: Audio samples, shape (samples,) or (samples, 1) for mono,
            or (samples, 2) for stereo.

    Returns:
        Stereo audio array, shape (samples, 2). Input that is already
        stereo (or has any other layout) is returned unchanged.
    """
    if audio_np.ndim == 1:
        return np.stack([audio_np, audio_np], axis=-1)
    if audio_np.ndim == 2 and audio_np.shape[1] == 1:
        # Single-channel column: duplicate it into left and right
        channel = audio_np[:, 0]
        return np.stack([channel, channel], axis=-1)
    return audio_np