#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
import sys
import os
import soundfile
import tempfile
import numpy
import librosa
from six.moves.http_client import BadStatusLine
from dcase_util.containers import ContainerMixin, FileMixin
from dcase_util.ui.ui import FancyStringifier, FancyHTMLStringifier
from dcase_util.utils import FileFormat, Path, is_int, is_jupyter, get_audio_info
class AudioContainer(ContainerMixin, FileMixin):
    """Audio container class.

    Holds audio data as a numpy array (axis 0 = channels, axis 1 = time for
    multichannel signals) together with its sampling rate, an optional source
    file, and an optional focus segment (start/stop/channel).
    """
    # File formats accepted by the container; validate_format() rejects others.
    valid_formats = [FileFormat.WAV, FileFormat.FLAC,
                     FileFormat.OGG,
                     FileFormat.M4A, FileFormat.WEBM,
                     FileFormat.MP3, FileFormat.MP4,
                     FileFormat.MKV]  #: Valid file formats
def __init__(self,
             data=None, fs=44100,
             focus_start_samples=None, focus_stop_samples=None, focus_channel=None, channel_labels=None,
             **kwargs):
    """Constructor

    Parameters
    ----------
    data : numpy.ndarray or list of numpy.ndarray
        Audio data to initialize the container; a list of per-channel arrays
        is stacked into a single matrix.
        Default value None

    fs : int
        Target sampling frequency, if loaded audio does have different sampling frequency, audio will be re-sampled.
        Default value 44100

    focus_start_samples : int
        Focus segment start, in samples.
        Default value None

    focus_stop_samples : int
        Focus segment stop, in samples.
        Default value None

    focus_channel : int
        Focus segment channel.
        Default value None

    channel_labels : list
        Channel names.
        Default value None

    filename : str, optional
        File path

    """
    # Initialize mixins explicitly first, then run the cooperative super init.
    ContainerMixin.__init__(self, **kwargs)
    FileMixin.__init__(self, **kwargs)
    super(AudioContainer, self).__init__(**kwargs)

    # Data layout: axis 0 = channels, axis 1 = time.
    self.channel_axis = 0
    self.time_axis = 1

    # Audio data
    if data is None:
        # No data given, start with an empty signal.
        data = numpy.ndarray((0, ))
    if isinstance(data, list):
        # Stack per-channel vectors into a channels x time matrix.
        data = numpy.vstack(data)

    self._data = data
    self.data_synced_with_file = False
    self.fs = fs
    self.filetype_info = None

    # Filename given: resolve and validate its format.
    if self.filename:
        self.detect_file_format()
        self.validate_format()

    # Focus segment state; assigned through the properties so values
    # are validated/normalized.
    self._focus_start = None
    self._focus_stop = None
    self._focus_channel = None
    self.focus_start_samples = focus_start_samples
    self.focus_stop_samples = focus_stop_samples
    self.focus_channel = focus_channel

    self.channel_labels = channel_labels
def __getstate__(self):
    """Return picklable state: mixin state extended with the audio fields."""
    state = super(AudioContainer, self).__getstate__()
    for attribute_name in ['channel_axis', 'time_axis', '_data',
                           'data_synced_with_file', 'fs', 'filetype_info',
                           'filename', '_focus_start', '_focus_stop',
                           '_focus_channel']:
        state[attribute_name] = getattr(self, attribute_name)
    return state
def __setstate__(self, d):
    """Restore state produced by ``__getstate__``.

    Parameters
    ----------
    d : dict
        State dictionary.
    """
    super(AudioContainer, self).__setstate__(d)
    self.channel_axis = d['channel_axis']
    self.time_axis = d['time_axis']
    self._data = d['_data']
    self.data_synced_with_file = d['data_synced_with_file']
    self.fs = d['fs']
    self.filetype_info = d['filetype_info']
    self.filename = d['filename']

    # Reset focus first, then restore it through the validating properties.
    self._focus_start = None
    self._focus_stop = None
    self._focus_channel = None
    # BUG FIX: original assigned to `self.focus_start` / `self.focus_stop`,
    # attributes with no corresponding property, so the stored focus segment
    # was silently dropped on unpickling. Use the real property names.
    self.focus_start_samples = d['_focus_start']
    self.focus_stop_samples = d['_focus_stop']
    self.focus_channel = d['_focus_channel']
def to_string(self, ui=None, indent=0):
    """Get container information in a string

    Parameters
    ----------
    ui : FancyStringifier or FancyHTMLStringifier
        Stringifier class
        Default value FancyStringifier

    indent : int
        Amount of indent
        Default value 0

    Returns
    -------
    str

    """
    if ui is None:
        ui = FancyStringifier()

    output = ''
    output += ui.class_name(self.__class__.__name__, indent=indent) + '\n'
    if self.filename:
        output += ui.data(
            field='Filename',
            value=self.filename,
            indent=indent
        ) + '\n'

    # NOTE(review): `self.filetype_info.values` is intentionally not called
    # here; for a plain dict this attribute is always truthy, so the branch
    # effectively tests only that filetype_info is set — confirm intent.
    if self.filetype_info and self.filetype_info.values:
        output += ui.data(
            field='Format',
            value=self.format + ' (' + ', '.join(self.filetype_info.values()) + ')',
            indent=indent
        ) + '\n'
    else:
        output += ui.data(field='Format', value=self.format, indent=indent) + '\n'

    output += ui.data(
        field='Synced',
        value='Yes' if self.data_synced_with_file else 'No',
        indent=indent
    ) + '\n'
    output += ui.data(
        field='Sampling rate',
        value=str(self.fs),
        unit='hz',
        indent=indent
    ) + '\n'
    output += ui.data(
        field='Channels',
        value=str(self.channels),
        indent=indent
    ) + '\n'

    if self.channel_labels:
        if isinstance(self.channel_labels, list):
            output += ui.data(
                field='Labels',
                value='',
                indent=indent + 2
            ) + '\n'
            for channel_id, label in enumerate(self.channel_labels):
                output += ui.data(
                    field='[{channel_id}]'.format(channel_id=channel_id),
                    value=str(label),
                    indent=indent + 3
                ) + '\n'

    output += ui.line(field='Duration', indent=indent) + '\n'
    output += ui.data(
        indent=indent + 2,
        field='Seconds',
        value=self.duration_sec,
        unit='sec'
    ) + '\n'
    output += ui.data(
        indent=indent + 2,
        field='Milliseconds',
        value=self.duration_ms,
        unit='ms'
    ) + '\n'
    output += ui.data(
        indent=indent + 2,
        field='Samples',
        value=self.duration_samples,
        unit='samples'
    ) + '\n'

    if self._focus_channel is not None or self._focus_start is not None or self._focus_stop is not None:
        output += ui.line(field='Focus segment', indent=indent) + '\n'
        if self.focus_channel is not None:
            if self.channels == 2:
                # Stereo signal: show a human readable label with the id.
                if self._focus_channel == 0:
                    output += ui.data(
                        indent=indent + 4,
                        field='Channel',
                        value='{channel} [{label}]'.format(
                            channel=self._focus_channel,
                            label='Left Channel'
                        )
                    ) + '\n'
                elif self._focus_channel == 1:
                    output += ui.data(
                        indent=indent + 4,
                        field='Channel',
                        value='{channel} [{label}]'.format(
                            channel=self._focus_channel,
                            label='Right Channel'
                        )
                    ) + '\n'
            else:
                output += ui.data(
                    indent=indent + 4,
                    field='Channel',
                    value=self._focus_channel
                ) + '\n'

        output += ui.line(
            indent=indent + 2,
            field='Duration'
        ) + '\n'
        output += ui.data(
            indent=indent + 4,
            field='Seconds',
            value=self.focus_stop_seconds - self.focus_start_seconds,
            unit='sec'
        ) + '\n'
        # BUG FIX: this row reports samples but was labeled with unit 'sec'.
        output += ui.data(
            indent=indent + 4,
            field='Samples',
            value=self.focus_stop_samples - self.focus_start_samples,
            unit='samples'
        ) + '\n'

        output += ui.line(
            indent=indent + 2,
            field='Start point'
        ) + '\n'
        output += ui.data(
            indent=indent + 4,
            field='Seconds',
            value=self.focus_start_seconds,
            unit='sec') + '\n'
        output += ui.data(
            indent=indent + 4,
            field='Samples',
            value=self.focus_start_samples,
            unit='samples'
        ) + '\n'

        output += ui.line(
            indent=indent + 2,
            field='Stop point'
        ) + '\n'
        output += ui.data(
            indent=indent + 4,
            field='Seconds',
            value=self.focus_stop_seconds,
            unit='sec'
        ) + '\n'
        output += ui.data(
            indent=indent + 4,
            field='Samples',
            value=self.focus_stop_samples,
            unit='samples'
        ) + '\n'

    return output
def __nonzero__(self):
    """Truth value: True when audio data is loaded (Python 2 protocol name)."""
    return self.loaded

# Python 3 calls __bool__ instead of __nonzero__; alias so truth testing
# behaves the same on both versions (backward-compatible addition).
__bool__ = __nonzero__
def __getitem__(self, i):
    """Get ith sample; for multichannel data the values across channels are returned.

    Parameters
    ----------
    i : int
        Sample index.

    Raises
    ------
    TypeError
        Index is not an integer.
    KeyError
        Index is out of range.

    Returns
    -------
    numpy.ndarray or sample value
    """
    if not isinstance(i, int):
        raise TypeError("Index should be integer")

    # BUG FIX: valid indices are 0 .. length-1; the original condition
    # (i > self.length) accepted i == length and let numpy raise IndexError
    # instead of the intended KeyError.
    if i < 0 or i >= self.length:
        raise KeyError(i)

    if len(self._data.shape) == 1:
        return self._data[i]
    elif len(self._data.shape) > 1:
        return self._data[:, i]
    else:
        return None
def __setitem__(self, i, value):
    """Set ith sample; for multichannel data the value is set across channels.

    Parameters
    ----------
    i : int
        Sample index.
    value
        Sample value to assign.

    Raises
    ------
    TypeError
        Index is not an integer.
    KeyError
        Index is out of range.
    """
    if not isinstance(i, int):
        raise TypeError("Index should be integer")

    # BUG FIX: same off-by-one as __getitem__; i == length is out of range.
    if i < 0 or i >= self.length:
        raise KeyError(i)

    if len(self._data.shape) == 1:
        self._data[i] = value
    elif len(self._data.shape) > 1:
        self._data[:, i] = value
def __iter__(self):
    """Iterate over audio data (channels first for multichannel data)."""
    return iter(self._data)
def __len__(self):
    """Length of the audio data in samples."""
    return self.length
@property
def data(self):
    """Audio data.

    Returns
    -------
    numpy.ndarray
        Audio data

    """
    return self._data

@data.setter
def data(self, value):
    # Replacing the data invalidates the on-disk copy.
    self._data = value
    self.data_synced_with_file = False
@property
def focus_start_samples(self):
    """Focus segment start in samples.

    Returns
    -------
    int
        Focus segment start in samples

    """
    return self._focus_start

@focus_start_samples.setter
def focus_start_samples(self, value):
    if value is None or value <= 0:
        # Undefined or non-positive start collapses to the signal beginning.
        self._focus_start = 0
    else:
        self._focus_start = int(value)
        if self._focus_stop is not None and self._focus_stop < self._focus_start:
            # Focus points were given in reverse order; swap them.
            self._focus_start, self._focus_stop = self._focus_stop, self._focus_start
@property
def focus_start_seconds(self):
    """Focus segment start in seconds.

    Returns
    -------
    float
        Focus segment start in seconds

    """
    return self._sample_to_time(sample=self.focus_start_samples)

@focus_start_seconds.setter
def focus_start_seconds(self, value):
    # Convert to samples and delegate to the sample-based setter.
    self.focus_start_samples = self._time_to_sample(time=value)
@property
def focus_stop_samples(self):
    """Focus segment stop in samples.

    Returns
    -------
    int
        Focus segment stop in samples

    """
    # Unset stop means "end of signal".
    return self.length if self._focus_stop is None else self._focus_stop

@focus_stop_samples.setter
def focus_stop_samples(self, value):
    if value is None:
        self._focus_stop = None
    elif value <= self.duration_samples:
        self._focus_stop = int(value)
        if self._focus_start is not None and self._focus_stop < self._focus_start:
            # Focus points were given in reverse order; swap them.
            self._focus_start, self._focus_stop = self._focus_stop, self._focus_start
    else:
        # Stop beyond the signal end is clamped to the signal length.
        self._focus_stop = self.duration_samples
@property
def focus_stop_seconds(self):
    """Focus segment stop in seconds.

    Returns
    -------
    float
        Focus segment stop in seconds

    """
    return self._sample_to_time(sample=self.focus_stop_samples)

@focus_stop_seconds.setter
def focus_stop_seconds(self, value):
    # Convert to samples and delegate to the sample-based setter.
    self.focus_stop_samples = self._time_to_sample(time=value)
@property
def focus_channel(self):
    """Focus channel

    Returns
    -------
    int or str
        Focus channel

    """
    return self._focus_channel

@focus_channel.setter
def focus_channel(self, value):
    if value is not None and is_int(value):
        # Integer channel id accepted only when it exists in the data.
        self._focus_channel = value if 0 <= value < self.channels else None
    elif value is not None and isinstance(value, str):
        label = value.lower()
        if label == 'mixdown':
            self._focus_channel = 'mixdown'
        elif label in ('left', 'l'):
            self._focus_channel = 0
        elif label in ('right', 'r'):
            self._focus_channel = 1
        else:
            # Unknown channel label given
            message = '{name}: Unknown channel [{channel}]'.format(name=self.__class__.__name__, channel=value)
            self.logger.exception(message)
            raise ValueError(message)
    else:
        self._focus_channel = None
@property
def loaded(self):
    """Audio load status.

    Returns
    -------
    bool
        True when the container holds a non-empty numpy array.

    """
    return isinstance(self._data, numpy.ndarray) and len(self._data) > 0
@property
def shape(self):
    """Audio data shape.

    Returns
    -------
    tuple or None
        Shape of audio data, None when no audio is loaded.

    """
    return self._data.shape if self.loaded else None
@property
def length(self):
    """Length of audio data in samples.

    Returns
    -------
    int
        Audio length

    """
    if not self.loaded:
        return 0

    dimensions = len(self._data.shape)
    if dimensions == 1:
        return self._data.shape[0]
    elif dimensions > 1:
        # Multichannel data: time is the last axis.
        return self._data.shape[-1]
    return 0
@property
def duration_samples(self):
    """Duration of audio data in samples.

    Returns
    -------
    int
        Audio duration

    """
    return self.length
@property
def duration_ms(self):
    """Duration of audio data in milliseconds.

    Returns
    -------
    float
        Audio duration

    """
    # float() guards against integer division under Python 2.
    return (self.length / float(self.fs)) * 1000
@property
def duration_sec(self):
    """Duration of audio data in seconds.

    Returns
    -------
    float
        Audio duration

    """
    # float() guards against integer division under Python 2.
    return self.length / float(self.fs)
@property
def channels(self):
    """Number of audio channels.

    Returns
    -------
    int
        Number of audio channels

    """
    if not self.loaded:
        return 0

    dimensions = len(self.data.shape)
    if dimensions == 2:
        # Multichannel data: channels along channel_axis.
        return self._data.shape[self.channel_axis]
    elif dimensions == 1:
        return 1
    return 0
@property
def streams(self):
    """Alias for channels, kept for API compatibility.

    Returns
    -------
    int
        Number of streams

    """
    return self.channels
@property
def empty(self):
    """Check if audio data is empty.

    In case audio is not yet loaded it is first loaded into container from disk.

    Returns
    -------
    bool

    """
    if self.loaded:
        return self.length == 0

    if self.filename and self.exists():
        # Data not in memory yet but the file exists; load it and re-check.
        self.load()
        return self.length == 0

    # Nothing in memory and nothing to load from.
    return True
def load(self, filename=None, fs='native', mono=False, res_type='kaiser_best', start=None, stop=None, auto_trimming=False):
    """Load file

    Parameters
    ----------
    filename : str, optional
        File path, if None given filename parameter given to class constructor is used.

    fs : int or str
        Target sampling frequency, if loaded audio does have different sampling frequency, audio will
        be re-sampled. If None given, value given to class constructor is used. If 'native' is given then
        native sampling frequency defined by audio file is used.
        Default value 'native'

    mono : bool
        Monophonic target, multi-channel audio will be down-mixed.
        Default value False

    res_type : str
        Resample type, defined by Librosa.
        Default value 'kaiser_best'

    start : float, optional
        Segment start time in seconds.
        Default value None

    stop : float, optional
        Segment stop time in seconds.
        Default value None

    auto_trimming : bool
        In case using segment stop parameter, the parameter is adjusted automatically if it exceeds the file duration.
        Default value False

    Raises
    ------
    IOError:
        File does not exists or has unknown file format

    Returns
    -------
    self

    """
    if filename is not None:
        # New filename given, resolve and validate its format.
        self.filename = filename
        self.detect_file_format()
        self.validate_format()

    if self.exists():
        if fs is None:
            # Use sampling frequency defined in class construction.
            fs = self.fs

        # Probe file metadata (duration, native fs, subtype) before reading.
        info = get_audio_info(filename=self.filename)

        # Check start and stop parameters against file duration
        if start is not None and start < 0:
            message = '{name}: Start parameter is negative [{file}]'.format(
                name=self.__class__.__name__,
                file=self.filename
            )
            self.logger.exception(message)
            raise IOError(message)

        elif info['duration_sec'] and start is not None and start > info['duration_sec']:
            message = '{name}: Start parameter exceeds file length [{file}]'.format(
                name=self.__class__.__name__,
                file=self.filename
            )
            self.logger.exception(message)
            raise IOError(message)

        if stop is not None and stop < 0:
            message = '{name}: Stop parameter is negative [{file}]'.format(
                name=self.__class__.__name__,
                file=self.filename
            )
            self.logger.exception(message)
            raise IOError(message)

        elif info['duration_sec'] and stop is not None and stop > info['duration_sec'] and not auto_trimming:
            # auto_trimming suppresses this error; the segment is clamped below.
            message = '{name}: Stop parameter exceeds file length [{file}]'.format(
                name=self.__class__.__name__,
                file=self.filename
            )
            self.logger.exception(message)
            raise IOError(message)

        if self.format == FileFormat.WAV:
            # WAV path: read with soundfile, resample with librosa if needed.
            self.filetype_info = {
                'subtype': info['subtype']['name'],
                'subtype_info': info['subtype']['info']
            }

            # Handle segment start and stop
            # NOTE(review): only a start+stop pair is honored here; a start
            # given without stop is ignored for WAV files — confirm intent.
            if start is not None and stop is not None:
                start_sample = int(start * info['fs'])
                stop_sample = int(stop * info['fs'])
                if stop_sample > info['duration_samples']:
                    # Clamp segment to the end of the file.
                    stop_sample = info['duration_samples']
            else:
                start_sample = None
                stop_sample = None

            self._data, source_fs = soundfile.read(
                file=self.filename,
                start=start_sample,
                stop=stop_sample
            )
            # soundfile returns (time, channels); container stores (channels, time).
            self._data = self._data.T

            # Down-mix audio
            if mono and len(self._data.shape) > 1:
                self._data = numpy.mean(self._data, axis=self.channel_axis)

            if fs == 'native':
                # Use native sampling frequency.
                self.fs = source_fs
            else:
                # Target sampling frequency defined, possibly re-sample signal.
                if fs != source_fs:
                    self._data = librosa.core.resample(
                        self._data,
                        orig_sr=source_fs,
                        target_sr=fs,
                        res_type=res_type
                    )

                # Store sampling frequency
                self.fs = fs

        elif self.format in [FileFormat.FLAC, FileFormat.OGG,
                             FileFormat.MP3,
                             FileFormat.M4A, FileFormat.MP4, FileFormat.WEBM, FileFormat.MKV]:
            # Compressed / container formats: delegate decoding to librosa.

            # Handle segment start and stop
            if start is not None and stop is not None:
                offset = start
                duration = stop - start
            elif start is not None:
                offset = start
                duration = None
            else:
                offset = 0.0
                duration = None

            if fs == 'native':
                # Use native sampling frequency (librosa sr=None).
                sr = None
            else:
                # Use target sampling frequency
                sr = fs

            self._data, self.fs = librosa.load(
                self.filename,
                sr=sr,
                mono=mono,
                res_type=res_type,
                offset=offset,
                duration=duration
            )

            # Requested duration not delivered means the segment ran past the
            # end of the file; raise unless auto_trimming allows it.
            if not auto_trimming and duration is not None and round(duration, 6) != self.duration_sec:
                message = '{name}: Check start and stop parameter, requested duration exceeds the file length [{file}]'.format(
                    name=self.__class__.__name__,
                    file=self.filename
                )
                self.logger.exception(message)
                raise IOError(message)

        else:
            message = '{name}: Unknown format [{format}]'.format(
                name=self.__class__.__name__,
                format=self.filename
            )
            self.logger.exception(message)
            raise IOError(message)

    else:
        message = '{name}: File does not exists [{file}]'.format(
            name=self.__class__.__name__,
            file=self.filename
        )
        self.logger.exception(message)
        raise IOError(message)

    # Check if after load function is defined, call if found
    if hasattr(self, '_after_load'):
        self._after_load()

    # Internal data is synced with the file, until it is edited.
    self.data_synced_with_file = True

    return self
def save(self, filename=None, bit_depth=16, bit_rate=None):
    """Save audio

    Parameters
    ----------
    filename : str, optional
        File path, if None given filename parameter given to class constructor is used.
        Default value None

    bit_depth : int, optional
        Bit depth for audio.
        Default value 16

    bit_rate : int, optional
        Bit rate for compressed audio formats.
        Default value None

    Raises
    ------
    ImportError:
        Error if file format specific module cannot be imported

    IOError:
        File has unknown file format

    Returns
    -------
    self

    """
    if filename:
        self.filename = filename
        self.detect_file_format()
        self.validate_format()

    if self.filename is None or self.filename == '':
        # BUG FIX: original format string had no {filename} placeholder even
        # though the keyword argument was supplied.
        message = '{name}: Filename is empty [{filename}]'.format(
            name=self.__class__.__name__,
            filename=self.filename
        )
        self.logger.exception(message)
        raise IOError(message)

    # Check if before save function is defined, call if found
    if hasattr(self, '_before_save'):
        self._before_save()

    if self.format == FileFormat.WAV:
        soundfile.write(
            file=self.filename,
            data=self._data.T,
            samplerate=self.fs,
            subtype=self._pcm_subtype(bit_depth)
        )

    elif self.format == FileFormat.FLAC:
        soundfile.write(
            file=self.filename,
            data=self._data.T,
            samplerate=self.fs,
            format='flac',
            subtype=self._pcm_subtype(bit_depth)
        )

    elif self.format == FileFormat.OGG:
        soundfile.write(
            file=self.filename,
            data=self._data.T,
            samplerate=self.fs,
            format='OGG',
            subtype='VORBIS'
        )

    elif self.format == FileFormat.MP3:
        self._save_mp3(bit_rate)

    else:
        # NOTE(review): message labels the value "format" but passes the
        # filename — kept as-is to preserve the emitted message.
        message = '{name}: Unknown format for saving [{format}]'.format(
            name=self.__class__.__name__,
            format=self.filename
        )
        self.logger.exception(message)
        raise IOError(message)

    # Check if after save function is defined, call if found
    if hasattr(self, '_after_save'):
        self._after_save()

    # Internal data is synced with the file, until it is edited.
    self.data_synced_with_file = True

    return self

def _pcm_subtype(self, bit_depth):
    """Map a PCM bit depth (16/24/32) to a libsndfile subtype string.

    Raises
    ------
    IOError:
        Unexpected bit depth.
    """
    subtype_map = {16: 'PCM_16', 24: 'PCM_24', 32: 'PCM_32'}
    if bit_depth not in subtype_map:
        message = '{name}: Unexpected bit depth [{bitdepth}]'.format(
            name=self.__class__.__name__,
            bitdepth=bit_depth
        )
        self.logger.exception(message)
        raise IOError(message)
    return subtype_map[bit_depth]

def _save_mp3(self, bit_rate):
    """Encode current audio data as MP3 through an ffmpeg pipe.

    Notice: Saving with MP3 format results in slightly longer signal than
    original. Difference is due to padding in the compression algorithm,
    and is usually around 200 - 1000 samples.

    Raises
    ------
    IOError:
        Unsupported bit rate, or ffmpeg reported an error.
    """
    import subprocess
    import platform

    ffmpeg_binary = "ffmpeg.exe" if platform.system() == 'Windows' else "ffmpeg"

    # Only standard MPEG-1 Layer III bit rates are accepted.
    if bit_rate not in [8, 16, 24, 32, 40, 48, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320]:
        message = '{name}: Unsupported bit rate [{bitrate}]'.format(
            name=self.__class__.__name__,
            bitrate=bit_rate
        )
        self.logger.exception(message)
        raise IOError(message)

    command = [
        ffmpeg_binary,
        '-y',                       # enable overwrite file
        '-f', 's16le',              # input format
        '-acodec', 'pcm_s16le',     # input bit depth
        '-r', str(self.fs),         # sampling rate
        '-ac', str(self.channels),  # amount of channels
        '-i', '-',                  # input from pipe
        '-vn',                      # no video input
        '-acodec', 'libmp3lame',    # output audio codec
        '-b:a', "{bitrate:d}k".format(bitrate=bit_rate),  # bit rate
        self.filename               # output filename
    ]
    pipe = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    # Convert signal data from float [-1,1] to signed 16-bit
    audio_signal = numpy.asarray(self.data).T
    signal_max_value = 2 ** (16 - 1)
    audio_signal = (audio_signal * signal_max_value).clip(
        -signal_max_value,
        signal_max_value - 1
    ).astype('int16')

    try:
        try:
            pipe.stdin.write(
                audio_signal.tobytes()
            )
        except AttributeError:
            # BUG FIX: a missing ndarray method raises AttributeError, not
            # NameError, so the legacy tostring() fallback was unreachable.
            pipe.stdin.write(
                audio_signal.tostring()
            )
    except IOError as error:
        pipe_error = pipe.stderr.read()
        error = str(error)
        # BUG FIX: original message had no {filename} placeholder though
        # the keyword argument was supplied.
        error += "\n\nFFMPEG encountered the following error [{filename}]:".format(filename=self.filename)
        error += "\n\n" + str(pipe_error)
        raise IOError(error)

    pipe.stdin.close()
    if pipe.stderr is not None:
        pipe.stderr.close()
    pipe.wait()
def load_from_youtube(self, query_id, start=None, stop=None, mono=False, silent=True):
    """Load audio data from youtube

    Parameters
    ----------
    query_id : str
        Youtube query id.

    start : float, optional
        Segment start time in seconds.
        Default value None

    stop : float, optional
        Segment stop time in seconds.
        Default value None

    mono : bool
        Monophonic target, multi-channel audio will be down-mixed.
        Default value False

    silent : bool
        Switch to show progress bar.
        Default value True

    Raises
    ------
    IOError:
        Youtube video does not exists or cannot be downloaded

    Returns
    -------
    self

    """
    if is_jupyter():
        from tqdm import tqdm_notebook as tqdm
    else:
        from tqdm import tqdm

    def progress_hook(t):
        """Wraps tqdm instance. Don't forget to close() or __exit__()
        the tqdm instance once you're done with it (easiest using `with` syntax).
        """
        def inner(total, recvd, ratio, rate, eta):
            t.total = int(total / 1024.0)
            t.update(int(recvd / 1024.0))
        return inner

    try:
        import pafy
    except ImportError:
        message = '{name}: Unable to import pafy module. You can install it with `pip install pafy`.'.format(
            name=self.__class__.__name__
        )
        # BUG FIX: `self.logger` is accessed as an attribute everywhere else
        # in this class; calling it (`self.logger()`) raised TypeError here.
        self.logger.exception(message)
        raise ImportError(message)

    try:
        from youtube_dl.utils import ExtractorError
    except ImportError:
        message = '{name}: Unable to import youtube_dl module. You can install it with `pip install youtube-dl`.'.format(
            name=self.__class__.__name__
        )
        self.logger.exception(message)
        raise ImportError(message)

    try:
        # Access youtube video and get best quality audio stream
        youtube_audio = pafy.new(
            url='https://www.youtube.com/watch?v={query_id}'.format(query_id=query_id),
            basic=False,
            gdata=False,
            size=False
        ).getbestaudio()

        # Get temp file
        tmp_file = tempfile.NamedTemporaryFile(suffix='.' + youtube_audio.extension)

        # Get temporary filename
        tmp_filename = tmp_file.name

        # Remove temporary file (avoid FileExistsError on Windows)
        tmp_file.close()

        download_progress_bar = None
        if not silent:
            # Create download progress bar
            download_progress_bar = tqdm(
                desc="{0: <25s}".format('Download youtube item '),
                file=sys.stdout,
                unit='B',
                unit_scale=True,
                leave=False,
                disable=self.disable_progress_bar,
                ascii=self.use_ascii_progress_bar
            )
            callback = progress_hook(download_progress_bar)
        else:
            callback = None

        # Download audio
        youtube_audio.download(
            filepath=tmp_filename,
            quiet=True,
            callback=callback
        )

        if not silent:
            # Close progress bar
            download_progress_bar.close()

            # Create audio processing progress bar
            audio_processing_progress_bar = tqdm(
                desc="{0: <25s}".format('Processing '),
                initial=0,
                total=4,
                file=sys.stdout,
                leave=False,
                disable=self.disable_progress_bar,
                ascii=self.use_ascii_progress_bar
            )

        # Store current filename
        filename = self.filename

        # Load audio segment
        self.load(
            filename=tmp_filename,
            mono=mono,
            fs=self.fs,
            res_type='kaiser_best',
            start=float(start) if start is not None else None,
            stop=float(stop) if stop is not None else None
        )

        # Restore filename
        if filename:
            self.filename = filename
            self.detect_file_format()

        if not silent:
            audio_processing_progress_bar.update(1)
            audio_processing_progress_bar.update(3)
            audio_processing_progress_bar.close()

    except (IOError, BadStatusLine, ExtractorError) as e:
        # BUG FIX: Python 3 exceptions have no `.message` attribute; use str().
        raise IOError(str(e))

    except (KeyboardInterrupt, SystemExit):
        # Remove temporal file and current audio file.
        os.remove(self.filename)
        raise

    return self
def normalize(self, headroom=0.005):
    """Normalize audio data.

    Data is normalized between -(1.0 - headroom) and +(1.0 - headroom)

    Parameters
    ----------
    headroom : float
        How much headroom there should be left under 1.0.
        Default value 0.005

    Returns
    -------
    self

    """
    if self.channels > 1:
        # Normalize each channel independently; operations mutate the
        # channel rows of self._data in place.
        for channel_data in self._data:
            channel_data -= numpy.mean(channel_data)
            # PERF: numpy reductions instead of builtin max(abs(...)), which
            # iterated the sample array at Python level.
            channel_data /= numpy.max(numpy.abs(channel_data)) + headroom

    else:
        self._data -= numpy.mean(self._data)

        if len(self._data.shape) == 2:
            # Single channel stored as a 2D (1 x time) matrix.
            max_value = numpy.max(numpy.abs(self._data[self.channel_axis, :])) + headroom
        else:
            max_value = numpy.max(numpy.abs(self._data)) + headroom

        self._data /= max_value

    return self
def resample(self, target_fs, scale=True, res_type='kaiser_best'):
    """Resample audio data.

    Parameters
    ----------
    target_fs : int
        Target sampling rate

    scale : bool
        Scale the resampled signal to have approximately equal total energy (see `librosa.core.resample`).
        Default value True

    res_type : str
        Resample type (see `librosa.core.resample`)
        Default value 'kaiser_best'

    Returns
    -------
    self

    """
    if target_fs == self.fs:
        # Already at the requested rate; nothing to do.
        return self

    # librosa expects Fortran-ordered data for resampling.
    self._data = librosa.resample(
        y=numpy.asfortranarray(self._data),
        orig_sr=self.fs,
        target_sr=target_fs,
        scale=scale,
        res_type=res_type
    )
    self.fs = target_fs

    return self
def mixdown(self):
    """Mix all audio channels into single channel.

    Returns
    -------
    self

    """
    # Focus on the mixed-down signal, then make it the container's data.
    self.reset_focus()
    self.set_focus(channel='mixdown')
    return self.freeze()
def reset_focus(self):
    """Reset focus segment.

    Returns
    -------
    self

    """
    # Clear start, stop, and channel so the full signal is addressed again.
    self._focus_start = None
    self._focus_stop = None
    self._focus_channel = None
    return self
def set_focus(self,
              start=None, stop=None, duration=None,
              start_seconds=None, stop_seconds=None, duration_seconds=None,
              channel=None):
    """Set focus segment

    Parameters
    ----------
    start : int
        Sample index of focus segment start.
        Default value None

    stop : int
        Sample index of focus segment stop.
        Default value None

    duration : int
        Sample count of focus segment.
        Default value None

    start_seconds : float
        Time stamp (in seconds) of focus segment start.
        Default value None

    stop_seconds : float
        Time stamp (in seconds) of focus segment stop.
        Default value None

    duration_seconds : float
        Duration (in seconds) of focus segment.
        Default value None

    channel : int or str
        Audio channel id or name to focus. In case of stereo signal, valid channel labels to select
        single channel are 'L', 'R', 'left', and 'right' or 0, 1, and to get mixed down
        version of all channels 'mixdown'.
        Default value None

    Returns
    -------
    self

    """
    # Sample-based parameters take precedence over time-based ones; in
    # either mode a start is only applied together with a stop or duration.
    if start is not None or stop is not None or duration is not None:
        if start is not None and stop is not None:
            self.reset_focus()
            self.focus_start_samples = start
            self.focus_stop_samples = stop
        elif start is not None and duration is not None:
            self.reset_focus()
            self.focus_start_samples = start
            self.focus_stop_samples = start + duration

    elif start_seconds is not None or stop_seconds is not None or duration_seconds is not None:
        if start_seconds is not None and stop_seconds is not None:
            self.reset_focus()
            self.focus_start_samples = self._time_to_sample(time=start_seconds)
            self.focus_stop_samples = self._time_to_sample(time=stop_seconds)
        elif start_seconds is not None and duration_seconds is not None:
            self.reset_focus()
            self.focus_start_samples = self._time_to_sample(time=start_seconds)
            self.focus_stop_samples = self._time_to_sample(time=start_seconds + duration_seconds)

    else:
        # No segment parameters given: clear the segment boundaries.
        self._focus_start = None
        self._focus_stop = None

    self.focus_channel = channel
    return self
def get_focused(self):
    """Get focus segment from audio data.

    Returns
    -------
    numpy.ndarray

    """
    if self.focus_start_samples is None and self.focus_stop_samples is None:
        # No focus segment set; use the full signal.
        focused_data = self._data
    else:
        segment_start = self.focus_start_samples if self.focus_start_samples is not None else 0
        segment_stop = self.focus_stop_samples if self.focus_stop_samples is not None else self.length

        if self.channels == 1:
            # Single channel signal.
            focused_data = self._data[segment_start:segment_stop]
        elif self.channels > 1:
            # Multichannel signal: slice every channel and stack.
            focused_data = numpy.vstack(
                [channel_data[segment_start:segment_stop] for channel_data in self._data]
            )
        else:
            # No loaded data to slice.
            focused_data = None

    if self.focus_channel is not None and is_int(self.focus_channel) and 0 <= self.focus_channel < self.channels:
        # Single channel requested by id.
        return focused_data[self.focus_channel, :]
    elif self.focus_channel == 'mixdown' and self.channels > 1:
        # Average all channels into one.
        return numpy.mean(focused_data, axis=self.channel_axis)
    else:
        return focused_data
def freeze(self):
    """Freeze focus segment, copy segment to be container's data.

    Returns
    -------
    self

    """
    # Promote the focused view to the container's data, then clear the focus.
    self._data = self.get_focused()
    self.reset_focus()
    return self
def frames(self,
           frame_length=None, hop_length=None,
           frame_length_seconds=None, hop_length_seconds=None):
    """Slice audio into overlapping frames.

    Parameters
    ----------
    frame_length : int, optional
        Frame length in samples. Set either frame_length or frame_length_seconds.
        Default value None

    hop_length : int, optional
        Frame hop length in samples. Set either hop_length or hop_length_seconds.
        Default value None

    frame_length_seconds : float, optional
        Frame length in seconds, converted into samples based on sampling rate.
        Default value None

    hop_length_seconds: float, optional
        Frame hop length in seconds, converted into samples based on sampling rate.
        Default value None

    Raises
    ------
    ValueError:
        No frame_length and no frame_length_seconds given.
        No hop_length and no hop_length_seconds given.

    Returns
    -------
    numpy.ndarray

    """
    # Convert second-based parameters to samples unless sample counts given.
    if not frame_length and frame_length_seconds:
        frame_length = int(self.fs * frame_length_seconds)
    if not hop_length and hop_length_seconds:
        hop_length = int(self.fs * hop_length_seconds)

    # Both parameters must be resolved to a non-zero sample count.
    for parameter_name, parameter_value in [('frame_length', frame_length),
                                            ('hop_length', hop_length)]:
        if not parameter_value:
            message = '{name}: Specify {parameter} parameter for frame splitting.'.format(
                name=self.__class__.__name__,
                parameter=parameter_name
            )
            self.logger.exception(message)
            raise ValueError(message)

    if self.channels == 1:
        return librosa.util.frame(
            x=self.get_focused(),
            frame_length=frame_length,
            hop_length=hop_length
        )

    # Multichannel data: frame each channel and stack the results.
    return numpy.array([
        librosa.util.frame(
            x=channel_data,
            frame_length=frame_length,
            hop_length=hop_length
        )
        for channel_data in self.get_focused()
    ])
def segments(self,
             segment_length=None, segment_length_seconds=None,
             segments=None,
             active_segments=None,
             skip_segments=None):
    """Slice audio into segments.

    Parameters
    ----------
    segment_length : int, optional
        Segment length in samples. Set either segment_length or segment_length_seconds. Used to produce
        consecutive non-overlapping segments.
        Default value None

    segment_length_seconds : float, optional
        Segment length in seconds, converted into samples based on sampling rate. Used to produce consecutive
        non-overlapping segments.
        Set either segment_length or segment_length_seconds.
        Default value None

    segments : list of dict or MetaDataContainer, optional
        List of time segments (onset and offset). If none given, segment length is used to produce consecutive
        non-overlapping segments.
        Default value None

    active_segments : list of dict or MetaDataContainer, optional
        List of time segments (onset and offset) to be used when creating segments.
        Only used when segment_length or segment_length_seconds are given and segments are generated
        within this method.
        Default value None

    skip_segments : list of dict or MetaDataContainer, optional
        List of time segments (onset and offset) to be skipped when creating segments.
        Only used when segment_length or segment_length_seconds are given and segments are generated
        within this method.
        Default value None

    Raises
    ------
    ValueError:
        No segments and no segment_length given.

    Returns
    -------
    list, MetaDataContainer
        Audio data per segment, and segment metadata (onset/offset in seconds).

    """

    # Imported locally to avoid a circular import at module load time.
    from dcase_util.containers import MetaDataContainer

    if not segment_length and segment_length_seconds:
        # Get segment_length from segment_length_seconds
        segment_length = int(self.fs * segment_length_seconds)

    if segments is None and segment_length is not None:
        # No explicit segments given; generate consecutive non-overlapping segments
        # of segment_length samples.
        if skip_segments is not None:
            # Make sure skip segments is MetaDataContainer
            skip_segments = MetaDataContainer(skip_segments)

        if active_segments is not None:
            # Make sure active segments is MetaDataContainer
            active_segments = MetaDataContainer(active_segments)

            segments = MetaDataContainer()
            for active_seg in active_segments:
                # Generate segments only inside the current active segment.
                segment_start = int(self.fs * active_seg.onset)

                while segment_start + segment_length < int(self.fs * active_seg.offset):
                    # Segment stop
                    segment_stop = segment_start + segment_length

                    if skip_segments is not None:
                        # Go through skip segments and adjust segment start and stop to avoid segments.
                        # NOTE(review): only a single pass is made over skip_segments, so a
                        # re-positioned segment is not re-checked against earlier skip segments.
                        for item in skip_segments:
                            if item.active_within_segment(
                                    start=segment_start / float(self.fs),
                                    stop=segment_stop / float(self.fs)
                            ):
                                # Adjust segment start to avoid current skip segment
                                segment_start = int(self.fs * item.offset)

                                # Adjust segment stop accordingly
                                segment_stop = segment_start + segment_length

                    if segment_stop < self.length:
                        # Valid segment found, store it (times stored in seconds)
                        segments.append(
                            {
                                'onset': segment_start / float(self.fs),
                                'offset': segment_stop / float(self.fs),
                            }
                        )

                    # Set next segment start
                    segment_start = segment_stop

                    # Stop loop if segment_start is out of signal
                    if segment_start > self.length:
                        break

        else:
            # No segments given, get segments based on segment_length
            segment_start = 0
            segments = MetaDataContainer()
            while True:
                # Segment stop
                segment_stop = segment_start + segment_length

                if skip_segments is not None:
                    # Go through skip segments and adjust segment start and stop to avoid segments
                    for item in skip_segments:
                        if item.active_within_segment(
                                start=segment_start/float(self.fs),
                                stop=segment_stop/float(self.fs)
                        ):
                            # Adjust segment start to avoid current skip segment
                            segment_start = int(self.fs * item.offset)

                            # Adjust segment stop accordingly
                            segment_stop = segment_start + segment_length

                if segment_stop < self.length:
                    # Valid segment found, store it
                    segments.append(
                        {
                            'onset': segment_start/float(self.fs),
                            'offset': segment_stop/float(self.fs),
                        }
                    )

                # Set next segment start
                segment_start = segment_stop

                # Stop loop if segment_start is out of signal
                if segment_start > self.length:
                    break

    elif segments is not None:
        # Make sure segments is MetadataContainer
        segments = MetaDataContainer(segments)

    else:
        message = '{name}: Specify segments parameter or segment_length for segment creation.'.format(
            name=self.__class__.__name__
        )
        self.logger.exception(message)
        raise ValueError(message)

    # Get audio segments
    data = []
    for segment in segments:
        # Convert segment times (seconds) back into sample indices.
        segment_start_samples = int(self.fs * segment.onset)
        segment_stop_samples = int(self.fs * segment.offset)

        if self.channels > 1:
            # Multi-channel audio, slice along the time axis.
            data.append(
                self._data[:, segment_start_samples:segment_stop_samples]
            )

        else:
            data.append(
                self._data[segment_start_samples:segment_stop_samples]
            )

    return data, segments
def pad(self, type='silence', length=None, length_seconds=None):
"""Generate signal
Parameters
----------
type : str
Default value 'silence'
length : int, optional
Default value None
length_seconds : float, optional
Default value None
Returns
-------
list, MetaDataContainer
"""
if not length and length_seconds is not None:
# Get length from length_seconds
length = int(self.fs * length_seconds)
if self.length < length:
if type == 'silence':
if len(self.data.shape) == 1:
self._data = numpy.pad(
array=self._data,
pad_width=(0, length-self.length),
mode='constant'
)
else:
self._data = numpy.pad(
array=self._data,
pad_width=((0, 0), (0, length-self.length)),
mode='constant'
)
return self
def plot(self, plot_type='wave', **kwargs):
    """Visualize audio data

    Parameters
    ----------
    plot_type : str
        Visualization type, 'wave' for waveform plot, 'spec' for spectrogram, 'dual' for showing
        both at the same time.
        Default value 'wave'

    Raises
    ------
    ValueError:
        Unknown plot_type given.

    NotImplementedError:
        Dual plotting requested for multi-channel audio.

    Returns
    -------
    self

    """

    if plot_type == 'wave':
        self.plot_wave(**kwargs)

    elif plot_type == 'spec':
        self.plot_spec(**kwargs)

    elif plot_type == 'dual':
        if kwargs.get('figsize') is None:
            figsize = (10, 8)
        else:
            figsize = kwargs.get('figsize')

        if self.channels == 1:
            import matplotlib.pyplot as plt
            plt.figure(figsize=figsize)

            # Waveform on top; its x-axis is hidden since the spectrogram below shares it.
            plt.subplot(2, 1, 1)
            self.plot_wave(
                x_axis=kwargs.get('x_axis', 'time'),
                max_points=kwargs.get('max_points', 50000.0),
                max_sr=kwargs.get('max_sr', 1000),
                offset=kwargs.get('offset', 0.0),
                color=kwargs.get('color', '#333333'),
                alpha=kwargs.get('alpha', 1.0),
                show_filename=kwargs.get('show_filename', True),
                show_xaxis=False,
                plot=False,
                figsize=kwargs.get('figsize', None),
                channel_labels=kwargs.get('channel_labels', None)
            )

            # Spectrogram below the waveform.
            plt.subplot(2, 1, 2)
            self.plot_spec(
                spec_type=kwargs.get('spec_type', 'log'),
                hop_length=kwargs.get('hop_length', 512),
                cmap=kwargs.get('cmap', 'magma'),
                show_filename=False,
                show_xaxis=kwargs.get('show_xaxis', True),
                show_colorbar=False,
                plot=False,
                figsize=kwargs.get('figsize', None),
                channel_labels=kwargs.get('channel_labels', None)
            )

            plt.show()

        else:
            # TODO dual plotting for multichannel audio.
            message = '{name}: Dual plotting of multi-channel audio is not yet implemented.'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise NotImplementedError(message)

    else:
        # BUGFIX: unknown plot types were previously silently ignored; fail loudly instead.
        message = '{name}: Unknown plot_type [{plot_type}].'.format(
            name=self.__class__.__name__,
            plot_type=plot_type
        )
        self.logger.exception(message)
        raise ValueError(message)

    # Return self as documented, to allow call chaining.
    return self
def plot_wave(self, x_axis='time', max_points=50000.0, max_sr=1000, offset=0.0, color='#333333', alpha=1.0,
              show_filename=True, show_xaxis=True, plot=True, figsize=None, channel_labels=None):
    """Visualize audio data as waveform.

    Parameters
    ----------
    x_axis : str
        X-axis type.
        Default value 'time'

    max_points : float
        Maximum number of time-points to plot (see `librosa.display.waveplot`).
        Default value 50000

    max_sr : number
        Maximum sampling rate for the visualization
        Default value 1000

    offset : float
        Horizontal offset (in time) to start the waveform plot (see `librosa.display.waveplot`).
        Default value 0.0

    color : str or list of str
        Waveform fill color in hex-code. Per channel colors can be given as list of str.
        Default value '#333333'

    alpha : float
        Alpha of the waveform fill color.
        Default value 1.0

    show_filename : bool
        Show filename as figure title.
        Default value True

    show_xaxis : bool
        Show X-axis.
        Default value True

    plot : bool
        If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
        outside this method.
        Default value True

    figsize : tuple
        Size of the figure. If None given, default size (10,5) is used.
        Default value None

    channel_labels : list
        Channel names
        Default value None

    Returns
    -------
    None
        NOTE(review): docstring previously claimed ``self`` but the method returns nothing.

    """

    if channel_labels is None:
        # Fall back to channel labels stored on the container.
        channel_labels = self.channel_labels

    if figsize is None:
        figsize = (10, 5)

    import matplotlib.pyplot as plt
    from librosa.display import waveplot

    if plot:
        # Create a new figure only when this method owns the whole plot.
        plt.figure(figsize=figsize)

    title = Path(self.filename).shorten()

    if self.channels > 1 and len(self.get_focused().shape) > 1:
        # Plotting for multi-channel audio, one stacked subplot per channel.
        for channel_id, channel_data in enumerate(self.get_focused()):
            ax = plt.subplot(self.channels, 1, channel_id + 1)

            # Show the shared x-axis labels only on the bottom-most subplot.
            if channel_id + 1 != self.channels:
                current_x_axis = None
            else:
                current_x_axis = x_axis

            # Per-channel color when a list of colors is given; otherwise shared color.
            if isinstance(color, list) and channel_id < len(color):
                current_color = color[channel_id]
            else:
                current_color = color

            waveplot(
                y=channel_data.ravel(),
                sr=self.fs,
                x_axis=current_x_axis,
                max_points=max_points,
                max_sr=max_sr,
                offset=offset,
                color=current_color,
                alpha=alpha
            )

            # Y-label: custom channel label when available, generic otherwise.
            if isinstance(channel_labels, list) and channel_id < len(channel_labels):
                plt.ylabel('{channel_label} / Ch{channel:d}'.format(
                    channel_label=channel_labels[channel_id],
                    channel=channel_id)
                )

            else:
                plt.ylabel('Channel {channel:d}'.format(channel=channel_id))

            # Title only on the top-most subplot, and only when a filename exists.
            if channel_id == 0 and show_filename:
                if self.filename:
                    plt.title(title)

            # Hide x-axis on all but the last subplot (or everywhere when disabled).
            if channel_id+1 != self.channels or not show_xaxis:
                ax.axes.get_xaxis().set_visible(False)

    else:
        # Plotting for single channel audio
        if isinstance(color, list) and len(color):
            # Only the first color from the list is used for mono audio.
            current_color = color[0]
        else:
            current_color = color

        ax = waveplot(
            y=self.get_focused().ravel(),
            sr=self.fs,
            x_axis=x_axis,
            max_points=max_points,
            max_sr=max_sr,
            offset=offset,
            color=current_color,
            alpha=alpha
        )

        if isinstance(channel_labels, list) and len(channel_labels):
            plt.ylabel('{channel_label}'.format(channel_label=channel_labels[0]))
        else:
            plt.ylabel('Channel {channel:d}'.format(channel=0))

        if self.filename and show_filename:
            plt.title(title)

        if not show_xaxis:
            ax.axes.get_xaxis().set_visible(False)

    if plot:
        plt.show()
def plot_spec(self, spec_type='log', hop_length=512, cmap='magma',
              show_filename=True, show_xaxis=True, show_colorbar=False, plot=True, figsize=None,
              channel_labels=None):
    """Visualize audio data as spectrogram.

    Parameters
    ----------
    spec_type : str
        Spectrogram type, use 'linear', 'log', 'cqt', 'cqt_hz', and 'cqt_note'.
        Default value 'log'

    hop_length : float
        Hop length, also used to determine time scale in x-axis (see `librosa.display.specshow`).
        Default value 512

    cmap : float
        Color map (see `librosa.display.specshow`).
        Default value 'magma'

    show_filename : bool
        Show filename as figure title.
        Default value True

    show_xaxis : bool
        Show X-axis.
        Default value True

    show_colorbar : bool
        Show color bar next to plot.
        Default value False

    plot : bool
        If true, figure is shown automatically. Set to False if collecting multiple plots into same
        figure outside this method.
        Default value True

    figsize : tuple
        Size of the figure. If None given, default size (10,5) is used.
        Default value None

    channel_labels : list
        Channel names
        Default value None

    Raises
    ------
    ValueError:
        Unknown spec_type given.

    Returns
    -------
    self

    """

    if channel_labels is None:
        channel_labels = self.channel_labels

    if figsize is None:
        figsize = (10, 5)

    from librosa.display import specshow
    import matplotlib.pyplot as plt

    if plot:
        # Create a new figure only when this method owns the whole plot.
        plt.figure(figsize=figsize)

    title = Path(self.filename).shorten()

    # Map spec_type to the specshow y-axis scale.
    # BUGFIX: original code used `spec_type == 'cqt_hz' or 'cqt'` which is always
    # truthy, so the 'cqt_note' branch was unreachable.
    y_axis_map = {
        'linear': 'linear',
        'log': 'log',
        'cqt': 'cqt_hz',
        'cqt_hz': 'cqt_hz',
        'cqt_note': 'cqt_note',
    }

    if self.channels > 1:
        # Plotting for multi-channel audio, one stacked subplot per channel.
        for channel_id, channel_data in enumerate(self.get_focused()):
            ax = plt.subplot(self.channels, 1, channel_id + 1)

            if spec_type in ['linear', 'log']:
                # Power spectrogram in dB from STFT magnitude.
                D = librosa.core.amplitude_to_db(
                    numpy.abs(librosa.stft(channel_data.ravel())) ** 2,
                    ref=numpy.max
                )

            elif spec_type.startswith('cqt'):
                # Constant-Q transform spectrogram in dB.
                D = librosa.core.amplitude_to_db(
                    librosa.cqt(channel_data.ravel(), sr=self.fs),
                    ref=numpy.max
                )

            else:
                message = '{name}: Unknown spec_type given for plot_spec'.format(
                    name=self.__class__.__name__
                )
                self.logger.exception(message)
                raise ValueError(message)

            specshow(
                data=D,
                sr=self.fs,
                y_axis=y_axis_map[spec_type],
                x_axis='time',
                hop_length=hop_length,
                cmap=cmap
            )

            if show_colorbar:
                plt.colorbar(format='%+2.0f dB')

            # Y-label: custom channel label when available, generic otherwise.
            if isinstance(channel_labels, list) and channel_id < len(channel_labels):
                plt.ylabel('{channel_label} / Ch{channel:d}'.format(
                    channel_label=channel_labels[channel_id],
                    channel=channel_id)
                )

            else:
                plt.ylabel('Channel {channel:d}'.format(channel=channel_id))

            # Title only on the top-most subplot.
            # BUGFIX: show_filename flag was previously ignored in the multi-channel branch.
            if channel_id == 0 and show_filename and self.filename:
                plt.title(title)

            # Hide x-axis on all but the last subplot (or everywhere when disabled).
            if channel_id + 1 != self.channels or not show_xaxis:
                ax.axes.get_xaxis().set_visible(False)

    else:
        # Plotting for single channel audio
        if spec_type in ['linear', 'log']:
            D = librosa.core.amplitude_to_db(
                numpy.abs(librosa.stft(self.get_focused().ravel())) ** 2,
                ref=numpy.max
            )

        elif spec_type.startswith('cqt'):
            D = librosa.core.amplitude_to_db(
                librosa.cqt(self.get_focused().ravel(), sr=self.fs),
                ref=numpy.max
            )

        else:
            message = '{name}: Unknown spec_type given'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise ValueError(message)

        ax = specshow(
            data=D,
            sr=self.fs,
            y_axis=y_axis_map[spec_type],
            x_axis='time',
            hop_length=hop_length,
            cmap=cmap
        )

        if show_colorbar:
            plt.colorbar(format='%+2.0f dB')

        if isinstance(channel_labels, list) and len(channel_labels):
            plt.ylabel('{channel_label}'.format(
                channel_label=channel_labels[0])
            )

        else:
            plt.ylabel('Channel {channel:d}'.format(channel=0))

        if not show_xaxis:
            ax.axes.get_xaxis().set_visible(False)

        # BUGFIX: guard on self.filename to match plot_wave and avoid an empty title.
        if show_filename and self.filename:
            plt.title(title)

    if plot:
        plt.show()

    # Return self as documented, to allow call chaining.
    return self
def _time_to_sample(self, time):
"""Time to sample index.
Parameters
----------
time : float
Time stamp in seconds.
Returns
-------
int
"""
return int(time * self.fs)
def _sample_to_time(self, sample):
"""Sample index to time.
Parameters
----------
sample : int
Sample index.
Returns
-------
float
"""
return sample / float(self.fs)
[docs] def overlay(self, audio, start_seconds=0, multiplier=0):
"""Simple sample overlay method
Parameters
----------
audio : AudioContainer
Audio to be mixed
start_seconds : float
Time stamp (in seconds) of segment start.
Default value 0
multiplier : float
Audio data multiplier
Default value 0
Returns
-------
self
"""
start_samples = int(start_seconds * self.fs)
audio_data = audio.get_focused()
segment_length = len(audio_data)
self._data[start_samples:start_samples+segment_length] += audio_data * multiplier
return self