Source code for docarray.document.mixins.audio
import wave
from typing import Union, BinaryIO, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING: # pragma: no cover
from docarray.typing import T
class AudioDataMixin:
    """Provide helper functions for :class:`Document` to support audio data."""

    def save_audio_tensor_to_file(
        self: 'T',
        file: Union[str, BinaryIO],
        sample_rate: int = 44100,
        sample_width: int = 2,
    ) -> 'T':
        """Save :attr:`.tensor` into a wav file. The channel layout is preserved.

        Samples are treated as floats normalised to ``[-1.0, 1.0]``; values
        outside the range representable at the chosen sample width are clipped
        instead of wrapping around. A 1-D tensor is written as mono; for a
        tensor with more dimensions the last axis is the channel axis, i.e. a
        ``(frames, channels)`` tensor is written as interleaved frames.

        :param file: if file is a string, open the file by that name, otherwise treat it as a file-like object.
        :param sample_rate: sampling frequency
        :param sample_width: sample width in bytes; 1, 2 and 4 are supported
        :return: Document itself after processed
        :raises ValueError: if ``sample_width`` is not 1, 2 or 4
        """
        # WAV PCM data is little-endian. 8-bit samples are unsigned with the
        # zero level at 128, wider samples are signed integers.
        pcm_dtypes = {1: '<u1', 2: '<i2', 4: '<i4'}
        # Validate before opening the target so a bad argument never creates
        # or truncates a file.
        if sample_width not in pcm_dtypes:
            raise ValueError(
                f'unsupported sample_width {sample_width!r}: expected one of '
                f'{sorted(pcm_dtypes)} (bytes per sample)'
            )

        full_scale = 2 ** (8 * sample_width - 1)
        # Scale in float64 so that the clipping bounds are exactly
        # representable even for 32-bit output.
        scaled = np.asarray(self.tensor, dtype=np.float64) * full_scale
        # +1.0 maps to `full_scale`, one past the largest signed value; clip
        # so that it saturates instead of overflowing the integer cast.
        samples = np.clip(scaled, -full_scale, full_scale - 1).astype(np.int64)
        if sample_width == 1:
            # Shift signed values into the unsigned 8-bit range.
            samples += full_scale
        frames = samples.astype(pcm_dtypes[sample_width]).tobytes()

        n_channels = scaled.shape[-1] if scaled.ndim > 1 else 1

        with wave.open(file, 'wb') as f:
            f.setnchannels(n_channels)
            f.setsampwidth(sample_width)
            f.setframerate(sample_rate)
            f.writeframes(frames)
        return self

    def load_uri_to_audio_tensor(self: 'T') -> 'T':
        """Convert an audio :attr:`.uri` into :attr:`.tensor` inplace

        The wav file is read from a local path, or downloaded first when the
        uri starts with ``http://`` or ``https://``. Samples are normalised to
        floats in ``[-1.0, 1.0)``. Mono audio yields a 1-D float32 tensor;
        audio with several channels yields a ``(frames, channels)`` float64
        tensor.

        :return: Document itself after processed
        :raises ValueError: if the file's sample width is not 1, 2 or 4 bytes
        """
        # Match full scheme prefixes so that a local file whose name merely
        # begins with "http" is still opened from disk.
        if self.uri.startswith(('http://', 'https://')):
            import io

            import requests

            resp = requests.get(self.uri)
            resp.raise_for_status()
            file = io.BytesIO(resp.content)
        else:
            file = self.uri

        # wave is a Python built-in module: https://docs.python.org/3/library/wave.html
        with wave.open(file, 'rb') as ifile:
            n_channels = ifile.getnchannels()
            sample_width = ifile.getsampwidth()
            raw_frames = ifile.readframes(ifile.getnframes())

        # WAV PCM data is little-endian. 8-bit samples are unsigned with the
        # zero level at 128, wider samples are signed integers.
        pcm_dtypes = {1: '<u1', 2: '<i2', 4: '<i4'}
        if sample_width not in pcm_dtypes:
            raise ValueError(
                f'unsupported sample width of {sample_width} byte(s): expected '
                f'one of {sorted(pcm_dtypes)}'
            )

        full_scale = np.float32(2 ** (8 * sample_width - 1))
        samples = np.frombuffer(raw_frames, dtype=pcm_dtypes[sample_width]).astype(
            np.float32
        )
        if sample_width == 1:
            # Re-centre unsigned 8-bit samples around zero.
            samples -= full_scale
        # Normalise so that values are between -1.0 and +1.0.
        audio_normalised = samples / full_scale

        if n_channels > 1:
            # Frames are stored interleaved (ch0, ch1, ..., ch0, ch1, ...), so
            # a row-major reshape puts one channel per column.
            # NOTE: multi-channel output has historically been float64 while
            # mono output is float32; both are kept for backward compatibility.
            self.tensor = audio_normalised.reshape(-1, n_channels).astype(np.float64)
        else:
            self.tensor = audio_normalised
        return self