File size: 2,158 Bytes
ca78147
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""Audio I/O and validation utilities."""
from __future__ import annotations

import io
from dataclasses import dataclass

import numpy as np
import soundfile as sf


# Limits enforced by validate_reference_clip() on incoming reference clips.
# Durations are computed there as frames / sample_rate and compared directly
# against these bounds (a clip exactly at a bound is accepted).
MIN_DURATION_S = 0.5
MAX_DURATION_S = 60.0
# Lowest sample rate accepted; compared against the rate reported by the file.
MIN_SAMPLE_RATE = 16000


class AudioValidationError(ValueError):
    """Raised when a reference clip fails validation.

    ``validate_reference_clip`` raises this when the bytes cannot be opened
    as audio, when the sample rate is below ``MIN_SAMPLE_RATE``, or when the
    duration falls outside ``MIN_DURATION_S``..``MAX_DURATION_S``. Because it
    subclasses ``ValueError``, handlers catching ``ValueError`` see it too.
    """


@dataclass(frozen=True)
class ClipInfo:
    """Immutable summary of a clip's metadata.

    Returned by ``validate_reference_clip`` for clips that pass every check.
    """

    # Frame count divided by the sample rate.
    duration_s: float
    # Sample rate as reported by the audio file.
    sample_rate: int
    # Channel count as reported by the audio file.
    channels: int


def validate_reference_clip(wav_bytes: bytes) -> ClipInfo:
    """Check an encoded reference clip against the module's limits.

    The bytes are opened with ``soundfile`` and only the metadata (sample
    rate, channel count, frame count) is inspected; this function does not
    read any samples.

    Args:
        wav_bytes: Encoded audio file contents.

    Returns:
        A ``ClipInfo`` describing the clip when every check passes.

    Raises:
        AudioValidationError: If the bytes cannot be opened as audio, the
            sample rate is below ``MIN_SAMPLE_RATE``, or the duration is
            outside ``MIN_DURATION_S``..``MAX_DURATION_S``.
    """
    try:
        with sf.SoundFile(io.BytesIO(wav_bytes)) as f:
            sample_rate, channels, frames = f.samplerate, f.channels, f.frames
    except Exception as exc:
        # Any failure to open/parse is reported as a validation problem.
        raise AudioValidationError(f"invalid audio format: {exc}") from exc

    # A zero sample rate would divide by zero; treat such a clip as empty so
    # the sample-rate check below rejects it.
    if sample_rate:
        duration_s = frames / float(sample_rate)
    else:
        duration_s = 0.0

    # Checks run in a fixed order: sample rate first, then the two duration
    # bounds. The first failing check decides the error message.
    if sample_rate < MIN_SAMPLE_RATE:
        message = f"sample rate {sample_rate} below minimum {MIN_SAMPLE_RATE}"
    elif duration_s < MIN_DURATION_S:
        message = f"clip too short ({duration_s:.2f}s)"
    elif duration_s > MAX_DURATION_S:
        message = f"clip too long ({duration_s:.2f}s)"
    else:
        return ClipInfo(
            duration_s=duration_s, sample_rate=sample_rate, channels=channels
        )
    raise AudioValidationError(message)


def write_wav_bytes(samples: np.ndarray, sample_rate: int) -> bytes:
    """Encode ``samples`` as an in-memory WAV file with 16-bit PCM samples.

    Args:
        samples: Audio data, passed unchanged to ``soundfile.write``.
        sample_rate: Sample rate passed to ``soundfile.write``.

    Returns:
        The full contents of the encoded WAV file.
    """
    with io.BytesIO() as sink:
        sf.write(sink, samples, sample_rate, format="WAV", subtype="PCM_16")
        # Copy the bytes out before the buffer is closed on exit.
        return sink.getvalue()


def normalize_to_mono_16k(
    samples: np.ndarray, original_sr: int, target_sr: int = 16000
) -> tuple[np.ndarray, int]:
    """Downmix to mono and naively resample to ``target_sr``.

    Resampling is plain linear interpolation between neighbouring samples;
    no anti-aliasing filter is applied before downsampling.

    Args:
        samples: A 1-D signal, or a 2-D array whose second axis is averaged
            away (assumes a ``(frames, channels)`` layout -- confirm against
            callers).
        original_sr: Sample rate of ``samples``.
        target_sr: Desired output sample rate; defaults to 16000.

    Returns:
        A ``(samples, sample_rate)`` tuple where ``samples`` is ``float32``
        and ``sample_rate`` is always ``target_sr``. An empty input yields
        an empty ``float32`` array.
    """
    if samples.ndim == 2:
        samples = samples.mean(axis=1)
    if original_sr == target_sr:
        # Nothing to resample; only the dtype is normalized.
        return samples.astype(np.float32), target_sr

    n_in = samples.shape[0]
    if n_in == 0:
        # np.interp raises ValueError when its sample-point array is empty,
        # so an empty clip is returned as an empty result instead.
        return np.zeros(0, dtype=np.float32), target_sr

    duration = n_in / float(original_sr)
    target_len = int(round(duration * target_sr))
    # Both grids cover the same normalized [0, 1) span, so position j / M on
    # the new grid maps to the matching instant on the old one. Points past
    # the last input sample take that sample's value (np.interp clamps).
    x_old = np.linspace(0.0, 1.0, n_in, endpoint=False)
    x_new = np.linspace(0.0, 1.0, target_len, endpoint=False)
    out = np.interp(x_new, x_old, samples).astype(np.float32)
    return out, target_sr