Spaces:
Sleeping
Sleeping
| """Audio I/O and validation utilities.""" | |
| from __future__ import annotations | |
| import io | |
| from dataclasses import dataclass | |
| import numpy as np | |
| import soundfile as sf | |
| MIN_DURATION_S = 0.5 | |
| MAX_DURATION_S = 60.0 | |
| MIN_SAMPLE_RATE = 16000 | |
| class AudioValidationError(ValueError): | |
| """Raised when a reference clip fails validation.""" | |
| class ClipInfo: | |
| duration_s: float | |
| sample_rate: int | |
| channels: int | |
| def validate_reference_clip(wav_bytes: bytes) -> ClipInfo: | |
| try: | |
| with sf.SoundFile(io.BytesIO(wav_bytes)) as f: | |
| sample_rate = f.samplerate | |
| channels = f.channels | |
| frames = f.frames | |
| except Exception as exc: | |
| raise AudioValidationError(f"invalid audio format: {exc}") from exc | |
| duration_s = frames / float(sample_rate) if sample_rate else 0.0 | |
| if sample_rate < MIN_SAMPLE_RATE: | |
| raise AudioValidationError( | |
| f"sample rate {sample_rate} below minimum {MIN_SAMPLE_RATE}" | |
| ) | |
| if duration_s < MIN_DURATION_S: | |
| raise AudioValidationError(f"clip too short ({duration_s:.2f}s)") | |
| if duration_s > MAX_DURATION_S: | |
| raise AudioValidationError(f"clip too long ({duration_s:.2f}s)") | |
| return ClipInfo(duration_s=duration_s, sample_rate=sample_rate, channels=channels) | |
| def write_wav_bytes(samples: np.ndarray, sample_rate: int) -> bytes: | |
| buf = io.BytesIO() | |
| sf.write(buf, samples, sample_rate, format="WAV", subtype="PCM_16") | |
| return buf.getvalue() | |
| def normalize_to_mono_16k( | |
| samples: np.ndarray, original_sr: int, target_sr: int = 16000 | |
| ) -> tuple[np.ndarray, int]: | |
| """Downmix to mono and naive linear resample to target_sr.""" | |
| if samples.ndim == 2: | |
| samples = samples.mean(axis=1) | |
| if original_sr == target_sr: | |
| return samples.astype(np.float32), target_sr | |
| duration = samples.shape[0] / float(original_sr) | |
| target_len = int(round(duration * target_sr)) | |
| x_old = np.linspace(0.0, 1.0, samples.shape[0], endpoint=False) | |
| x_new = np.linspace(0.0, 1.0, target_len, endpoint=False) | |
| out = np.interp(x_new, x_old, samples).astype(np.float32) | |
| return out, target_sr | |