#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
from six import iteritems
import numpy
import copy
import os
import glob
from past.builtins import basestring
from dcase_util.containers import ObjectContainer, RepositoryContainer, OneToOneMappingContainer
from dcase_util.ui import FancyStringifier
from dcase_util.utils import FileFormat, filelist_exists
class DataContainer(ObjectContainer):
    """Container class for data, inherited from ObjectContainer.

    Stores a numpy data matrix together with optional statistics, metadata,
    time resolution, a processing chain describing how the data was produced,
    and an optional focus segment (a [start, stop) frame range).
    """

    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats
[docs] def __init__(self, data=None, stats=None, metadata=None, time_resolution=None, processing_chain=None, **kwargs):
"""Constructor
Parameters
----------
filename : str, optional
File path
Default value None
data : numpy.ndarray, optional
Data to initialize the container
Default value None
stats : dict, optional
Statistics of the data
Default value None
metadata : dict or MetadataContainer, optional
MetadataContainer
Default value None
time_resolution : float, optional
Time resolution
Default value None
processing_chain : ProcessingChain, optional
Processing chain.
Default value None
"""
# Run ObjectContainer init
ObjectContainer.__init__(self, **kwargs)
# Run super init
super(DataContainer, self).__init__(**kwargs)
# Data
self._data = None
if data is None:
data = numpy.ndarray((0, ))
self.data = data
# Stats
if stats is None:
stats = []
self._stats = stats
# Metadata
if metadata is None:
metadata = {}
self.metadata = metadata
# Matrix axis
self.time_axis = 0
# Timing
self.time_resolution = time_resolution
# Processing chain
from dcase_util.processors import ProcessingChain
if processing_chain is None:
processing_chain = ProcessingChain()
# Convert list to ProcessingChain
if isinstance(processing_chain, list):
processing_chain = ProcessingChain(processing_chain)
else:
message = '{name}: Wrong type of processing_chain given to class initializer.'.format(
name=self.__class__.__name__
)
self.logger.exception(message)
raise ValueError(message)
self.processing_chain = processing_chain
# Focus
self._focus_start = None
self._focus_stop = None
def __getstate__(self):
d = super(DataContainer, self).__getstate__()
d.update({
'_data': self._data,
'time_axis': self.time_axis,
'time_resolution': self.time_resolution,
'metadata': self.metadata,
'processing_chain': self.processing_chain,
'_stats': self._stats,
'_focus_start': self._focus_start,
'_focus_stop': self._focus_stop
})
return d
def __setstate__(self, d):
super(DataContainer, self).__setstate__(d)
self._data = d['_data']
self.time_axis = d['time_axis']
self.time_resolution = d['time_resolution']
self.metadata = d['metadata']
self.processing_chain = d['processing_chain']
self._stats = d['_stats']
self._focus_start = None
self._focus_stop = None
self.focus_start = d['_focus_start']
self.focus_stop = d['_focus_stop']
def __add__(self, other):
new = copy.deepcopy(self)
if isinstance(other, DataContainer):
new.data += other.data
elif isinstance(other, numpy.ndarray):
new.data += other
return new
def __iadd__(self, other):
if isinstance(other, DataContainer):
self.data += other.data
elif isinstance(other, numpy.ndarray):
self.data += other
return self
def __sub__(self, other):
new = copy.deepcopy(self)
if isinstance(other, DataContainer):
new.data -= other.data
elif isinstance(other, numpy.ndarray):
new.data -= other
return new
def __isub__(self, other):
if isinstance(other, DataContainer):
self.data -= other.data
elif isinstance(other, numpy.ndarray):
self.data -= other
return self
@property
def data(self):
"""Data matrix
Returns
-------
numpy.ndarray
Data matrix
"""
return self._data
@data.setter
def data(self, value):
self._data = value
# Reset stats
self._stats = None
@property
def shape(self):
"""Shape of data matrix
Returns
-------
tuple
"""
if isinstance(self.data, numpy.ndarray):
return self.data.shape
else:
return None
@property
def length(self):
"""Number of data columns
Returns
-------
int
"""
if isinstance(self.data, numpy.ndarray):
return self.data.shape[self.time_axis]
else:
return 0
@property
def frames(self):
"""Number of data frames
Returns
-------
int
"""
return self.length
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indent
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = super(DataContainer, self).to_string(ui=ui, indent=indent)
output += ui.line(field='Data') + '\n'
output += ui.data(
indent=indent + 4,
field='data',
value=self.data
) + '\n'
output += ui.line(
indent=indent + 4,
field='Dimensions'
) + '\n'
output += ui.data(
indent=indent + 6,
field='time_axis',
value=self.time_axis
) + '\n'
output += ui.line(
indent=indent + 4,
field='Timing information'
) + '\n'
output += ui.data(
indent=indent + 6,
field='time_resolution',
value=self.time_resolution,
unit="sec"
) + '\n'
output += ui.line(field='Meta') + '\n'
output += ui.data(
indent=indent + 4,
field='stats',
value='Calculated' if self._stats is not None else '-'
) + '\n'
output += ui.data(
indent=indent + 4,
field='metadata',
value=self.metadata if self.metadata else '-'
) + '\n'
output += ui.data(
indent=indent + 4,
field='processing_chain',
value=self.processing_chain if self.processing_chain else '-'
) + '\n'
output += ui.line(field='Duration') + '\n'
output += ui.data(
indent=indent + 4,
field='Frames',
value=self.length
) + '\n'
if self.time_resolution:
output += ui.data(
indent=indent + 4,
field='Seconds',
value=self._frame_to_time(frame_id=self.length),
unit='sec'
) + '\n'
if self._focus_start is not None and self._focus_stop is not None:
output += ui.line(field='Focus segment', indent=indent) + '\n'
output += ui.line(
indent=indent + 4,
field='Duration'
) + '\n'
output += ui.data(
indent=indent + 6,
field='Index',
value=self._focus_stop - self._focus_start
) + '\n'
if self.time_resolution:
output += ui.data(
indent=6, field='Seconds',
value=self._frame_to_time(frame_id=self._focus_stop - self._focus_start),
unit='sec'
) + '\n'
output += ui.line(
indent=indent + 4,
field='Start'
) + '\n'
output += ui.data(
indent=indent + 6,
field='Index',
value=self._focus_start
) + '\n'
if self.time_resolution:
output += ui.data(
indent=indent + 6,
field='Seconds',
value=self._frame_to_time(frame_id=self._focus_start),
unit='sec'
) + '\n'
output += ui.line(indent=indent + 4, field='Stop') + '\n'
output += ui.data(
indent=indent + 6,
field='Index',
value=self._focus_stop
) + '\n'
if self.time_resolution:
output += ui.data(
indent=indent + 6,
field='Seconds',
value=self._frame_to_time(frame_id=self._focus_stop),
unit='sec'
) + '\n'
return output
def __nonzero__(self):
return self.length > 0
def __len__(self):
return self.length
    def push_processing_chain_item(self, processor_name, init_parameters=None, process_parameters=None,
                                   preprocessing_callbacks=None,
                                   input_type=None, output_type=None):
        """Push processing chain item

        Thin delegation to ``self.processing_chain.push_processor``; records a
        processing step in the container's chain.

        Parameters
        ----------
        processor_name : str
            Processor name

        init_parameters : dict, optional
            Initialization parameters for the processor. Default value None

        process_parameters : dict, optional
            Parameters for the process method of the Processor. Default value None

        preprocessing_callbacks : list of dicts, optional
            Callbacks used for preprocessing. Default value None

        input_type : ProcessingChainItemType, optional
            Input data type. Default value None

        output_type : ProcessingChainItemType, optional
            Output data type. Default value None

        Returns
        -------
        self
        """
        self.processing_chain.push_processor(
            processor_name=processor_name,
            init_parameters=init_parameters,
            process_parameters=process_parameters,
            preprocessing_callbacks=preprocessing_callbacks,
            input_type=input_type,
            output_type=output_type,
        )

        return self
@property
def focus_start(self):
"""Focus segment start
Returns
-------
int
"""
return self._focus_start
@focus_start.setter
def focus_start(self, value):
if value is not None and value >= 0:
self._focus_start = value
if self._focus_stop is not None and self._focus_stop < self._focus_start:
# focus points are reversed
start = self._focus_start
self._focus_start = self._focus_stop
self._focus_stop = start
else:
# Keep focus start at zero
self._focus_start = 0
@property
def focus_stop(self):
"""Focus segment stop
Returns
-------
int
"""
return self._focus_stop
@focus_stop.setter
def focus_stop(self, value):
if value is not None and value < self.length:
self._focus_stop = value
if self._focus_start is not None and self._focus_stop < self._focus_start:
# focus points are reversed
start = self._focus_start
self._focus_start = self._focus_stop
self._focus_stop = start
else:
# Keep focus stop at the end
self._focus_stop = self.length
@property
def stats(self):
"""Basic statistics of data matrix.
Returns
-------
dict
"""
if not self.empty():
if not self._stats:
self._stats = self._calculate_stats()
return self._stats
def _calculate_stats(self):
"""Calculate basic statistics of data matrix.
Returns
-------
dict
"""
return {
'mean': numpy.mean(self.data, axis=self.time_axis),
'std': numpy.std(self.data, axis=self.time_axis),
'n': self.data.shape[self.time_axis],
's1': numpy.sum(self.data, axis=self.time_axis),
's2': numpy.sum(self.data ** 2, axis=self.time_axis),
}
def _time_to_frame(self, time, rounding_direction=None):
"""Time to frame index based on time resolution of the data matrix.
Parameters
----------
time : float
Time stamp in seconds
rounding_direction : str, optional
Rounding direction, one of ['ceil', 'floor', None]
Default value None
Returns
-------
int
"""
if rounding_direction is None:
frame = int(time / float(self.time_resolution))
elif rounding_direction == 'ceil':
frame = int(numpy.ceil(time / float(self.time_resolution)))
elif rounding_direction == 'floor':
frame = int(numpy.floor(time / float(self.time_resolution)))
else:
frame = int(time / float(self.time_resolution))
# Handle negative index and index outside matrix
if frame < 0:
frame = 0
elif frame > self.length:
frame = self.length
return frame
def _frame_to_time(self, frame_id):
"""Frame index to time based on time resolution of the data matrix.
Parameters
----------
frame_id : int
Frame index
Returns
-------
float
"""
return frame_id * self.time_resolution
def _length_to_frames(self, time):
return int(numpy.ceil(time * 1.0 / self.time_resolution))
def _onset_to_frames(self, onset):
return int(numpy.floor(onset * 1.0 / self.time_resolution))
def _offset_to_frames(self, offset):
return int(numpy.ceil(offset * 1.0 / self.time_resolution))
def set_focus(self,
start=None, stop=None, duration=None,
start_seconds=None, stop_seconds=None, duration_seconds=None):
"""Set focus segment
Parameters
----------
start : int, optional
Frame index of focus segment start.
Default value None
stop : int, optional
Frame index of focus segment stop.
Default value None
duration : int, optional
Frame count of focus segment.
Default value None
start_seconds : float, optional
Time stamp (in seconds) of focus segment start, will be converted to frame index based on
time resolution of the data matrix.
Default value None
stop_seconds : float, optional
Time stamp (in seconds) of focus segment stop, will be converted to frame index based on
time resolution of the data matrix.
Default value None
duration_seconds : float, optional
Duration (in seconds) of focus segment, will be converted to frame index based on
time resolution of the data matrix.
Default value None
Returns
-------
self
"""
if start is not None and stop is not None:
# Set focus based start and stop given in index.
self.focus_start = start
self.focus_stop = stop
elif start is not None and duration is not None:
# Set focus based start and duration given in index.
self.focus_start = start
self.focus_stop = start + duration
elif start_seconds is not None and stop_seconds is not None:
# Set focus based on start and stop given in seconds
self.focus_start = self._time_to_frame(time=start_seconds)
self.focus_stop = self._time_to_frame(time=stop_seconds)
elif start_seconds is not None and duration_seconds is not None:
# Set focus based on start and duration given in seconds
self.focus_start = self._time_to_frame(time=start_seconds)
self.focus_stop = self._time_to_frame(time=start_seconds + duration_seconds)
else:
# Focus segment not set, reset segment
self._focus_start = None
self._focus_stop = None
return self
[docs] def reset_focus(self):
"""Reset focus segment
Returns
-------
self
"""
self.set_focus()
return self
[docs] def get_focused(self):
"""Get focus segment from data array.
Returns
-------
numpy.array
"""
if self.focus_start is not None and self.focus_stop is not None:
# Focus is set
return self.data[self.focus_start:self.focus_stop]
else:
# Return all features
return self.data
[docs] def freeze(self):
"""Freeze focus segment, copy segment to be container's data.
Returns
-------
self
"""
self._data = self.get_focused()
self.reset_focus()
return self
[docs] def get_frames(self, frame_ids=None, frame_hop=1, **kwargs):
"""Get frames from data array.
Parameters
----------
frame_ids : list of int, optional
Frame ids of frames to be included.
Default value None
frame_hop : int, optional
Frame hopping factor, with one every frame is included.
Default value 1
Returns
-------
numpy.array
"""
data = self.data
# Apply frame_ids
if frame_ids is not None:
data = data[frame_ids]
return data[::frame_hop]
[docs] def plot(self, plot=True, figsize=None):
"""Visualize data array.
Parameters
----------
plot : bool
If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
outside this method.
Default value True
figsize : tuple
Size of the figure. If None given, default size (10,5) is used.
Default value None
Returns
-------
self
"""
if figsize is None:
figsize = (10, 5)
import matplotlib.pyplot as plt
from librosa.core import frames_to_time
from librosa.display import TimeFormatter
if plot:
plt.figure(figsize=figsize)
# Plot feature matrix
if self.time_resolution:
sr = int(1.0 / float(self.time_resolution))
x_axis = 'time'
else:
sr = 1.0
x_axis = None
y = self.get_focused()[0]
locs = frames_to_time(frames=numpy.arange(len(y)), sr=sr, hop_length=1)
plt.plot(locs, y)
axes = plt.gca()
axes.set_xlim([locs.min(), locs.max()])
if x_axis == 'time':
axes.xaxis.set_major_formatter(TimeFormatter(lag=False))
axes.xaxis.set_label_text('Time')
elif x_axis is None or x_axis in ['off', 'none']:
axes.set_xticks([])
# Add filename to first subplot
if self.filename:
plt.title(self.filename)
if plot:
plt.tight_layout()
plt.show()
class DataArrayContainer(DataContainer):
    """Array data container class, inherited from DataContainer."""

    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats

    def __init__(self, data=None, stats=None, metadata=None, time_resolution=None, processing_chain=None, **kwargs):
        """Constructor

        Parameters
        ----------
        filename : str, optional
            File path. Default value None

        data : numpy.ndarray, optional
            Data to initialize the container. Default value None

        stats : dict, optional
            Statistics of the data. Default value None

        metadata : dict or MetadataContainer, optional
            MetadataContainer. Default value None

        time_resolution : float, optional
            Time resolution. Default value None

        processing_chain : ProcessingChain, optional
            Processing chain. Default value None
        """
        kwargs.update({
            'data': data,
            'stats': stats,
            'metadata': metadata,
            'time_resolution': time_resolution,
            'processing_chain': processing_chain
        })

        # Single base-class initialization; the original called
        # DataContainer.__init__ and super().__init__ back to back, running
        # the base initializer twice.
        super(DataArrayContainer, self).__init__(**kwargs)
class DataMatrix2DContainer(DataContainer):
    """Two-dimensional data matrix container class, inherited from DataContainer.

    By default the data (feature) vectors lie along axis 0 and time along
    axis 1; the roles can be changed with ``change_axis``.
    """

    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats
[docs] def __init__(self, data=None, stats=None, metadata=None, time_resolution=None, processing_chain=None, **kwargs):
"""Constructor
Parameters
----------
filename : str, optional
File path
Default value None
data : numpy.ndarray, optional
Data to initialize the container
Default value None
stats : dict, optional
Statistics of the data
Default value None
metadata : dict or MetadataContainer, optional
MetadataContainer
Default value None
time_resolution : float, optional
Time resolution
Default value None
processing_chain : ProcessingChain, optional
Processing chain.
Default value None
"""
if data is None:
# Initialize with 2D-matrix
data = numpy.ndarray((0, 0))
kwargs.update({
'data': data,
'stats': stats,
'metadata': metadata,
'time_resolution': time_resolution,
'processing_chain': processing_chain
})
# Run DataContainer init
DataContainer.__init__(self, **kwargs)
# Run super init
super(DataMatrix2DContainer, self).__init__(**kwargs)
# Matrix axis
self.data_axis = 0
self.time_axis = 1
def __getstate__(self):
d = super(DataMatrix2DContainer, self).__getstate__()
d.update({
'data_axis': self.data_axis,
'time_axis': self.time_axis,
})
return d
def __setstate__(self, d):
super(DataMatrix2DContainer, self).__setstate__(d)
self.data_axis = d['data_axis']
self.time_axis = d['time_axis']
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indent
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = super(DataMatrix2DContainer, self).to_string(ui=ui, indent=indent)
output += ui.line(field='Data', indent=indent) + '\n'
output += ui.line(indent=indent + 2, field='Dimensions') + '\n'
output += ui.data(indent=indent + 4, field='time_axis', value=self.time_axis) + '\n'
output += ui.data(indent=indent + 4, field='data_axis', value=self.data_axis) + '\n'
return output
@property
def vector_length(self):
"""Data vector length
Returns
-------
int
"""
if isinstance(self.data, numpy.ndarray):
return self.data.shape[self.data_axis]
else:
return 0
@property
def T(self):
"""Transposed data in a data container
Returns
-------
DataMatrix2DContainer
"""
transposed = copy.deepcopy(self)
transposed.data = self.data.T
# Flip axis
transposed.time_axis = self.data_axis
transposed.data_axis = self.time_axis
return transposed
[docs] def get_focused(self):
"""Get focus segment from data matrix.
Returns
-------
numpy.ndarray
"""
if self.focus_start is not None and self.focus_stop is not None:
# Focus is set
if self.time_axis == 1:
return self.data[:, self.focus_start:self.focus_stop]
else:
return self.data[self.focus_start:self.focus_stop, :]
else:
# Return all features
return self.data
[docs] def get_frames(self, frame_ids=None, vector_ids=None, frame_hop=1):
"""Get frames from data matrix.
Parameters
----------
frame_ids : list of int, optional
Frame ids of frames to be included.
Default value None
vector_ids : list of int, optional
Data ids of frame's data vector to be included.
Default value None
frame_hop : int, optional
Frame hopping factor, with one every frame is included.
Default value 1
Returns
-------
numpy.ndarray
"""
data = self.data
if self.time_axis == 1:
# Apply frame_ids
if frame_ids is not None:
data = data[:, frame_ids]
# Apply vector_ids
if vector_ids is not None:
data = data[vector_ids, :]
# Apply the frame hop
if len(data.shape) > 1:
return data[:, ::frame_hop]
else:
return data[::frame_hop]
else:
# Apply frame_ids
if frame_ids is not None:
data = data[frame_ids, :]
# Apply vector_ids
if vector_ids is not None:
data = data[:, vector_ids]
if len(data.shape) > 1:
return data[::frame_hop, :]
else:
return data[::frame_hop]
def change_axis(self, time_axis=None, data_axis=None):
"""Set axis
Parameters
----------
time_axis : int, optional
New data axis for time. Current axis and new axis are swapped.
Default value None
data_axis : int, optional
New data axis for data. Current axis and new axis are swapped.
Default value None
Returns
-------
self
"""
# Get not None values
axis_list = [time_axis, data_axis]
axis_list = [x for x in axis_list if x is not None]
# Get unique values
axis_set = set(axis_list)
if len(axis_list) != len(axis_set):
message = '{name}: Give unique axis indexes [{axis_list}].'.format(
name=self.__class__.__name__,
axis_list=axis_list
)
self.logger.exception(message)
raise ValueError(message)
if time_axis > 1:
message = '{name}: Given time_axis too large [{time_axis}].'.format(
name=self.__class__.__name__,
time_axis=time_axis
)
self.logger.exception(message)
raise ValueError(message)
if data_axis > 1:
message = '{name}: Given data_axis too large [{data_axis}].'.format(
name=self.__class__.__name__,
data_axis=data_axis
)
self.logger.exception(message)
raise ValueError(message)
# Get axis map
axis_map = OneToOneMappingContainer({
'time_axis': self.time_axis,
'data_axis': self.data_axis
})
if time_axis is not None and time_axis != self.time_axis:
# Modify time axis
# Get axis names
target_axis = axis_map.flipped.map(time_axis)
source_axis = axis_map.flipped.map(self.time_axis)
# Modify data
self.data = numpy.swapaxes(
a=self.data,
axis1=self.time_axis,
axis2=time_axis
)
# Store new axes
axis_map[target_axis] = self.time_axis
axis_map[source_axis] = time_axis
setattr(self, str(target_axis), self.time_axis)
setattr(self, str(source_axis), time_axis)
if data_axis is not None and data_axis != self.data_axis:
# Modify data axis
# Get axis names
target_axis = axis_map.flipped.map(data_axis)
source_axis = axis_map.flipped.map(self.data_axis)
# Modify data
self.data = numpy.swapaxes(
a=self.data,
axis1=self.data_axis,
axis2=data_axis
)
# Store new axes
axis_map[target_axis] = self.data_axis
axis_map[source_axis] = data_axis
setattr(self, str(target_axis), self.data_axis)
setattr(self, str(source_axis), data_axis)
return self
[docs] def plot(self, plot=True, show_color_bar=False, figsize=None, xlabel=None, ylabel=None):
"""Visualize data matrix.
Parameters
----------
plot : bool
If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
outside this method.
Default value True
show_color_bar : bool
Show color bar next to plot.
Default value False
figsize : tuple
Size of the figure. If None given, default size (10,5) is used.
Default value None
xlabel : str
Label for X axis
Default value None
ylabel : str
Label for Y axis
Default value None
Returns
-------
self
"""
if figsize is None:
figsize = (10, 5)
from librosa.display import specshow
import matplotlib.pyplot as plt
if plot:
plt.figure(figsize=figsize)
data = self.get_focused()
if self.time_axis == 0:
# Make sure time is on x-axis
data = data.T
# Plot feature matrix
if self.time_resolution:
sr = int(1.0 / float(self.time_resolution))
x_axis = 'time'
else:
sr = 1.0
x_axis = None
specshow(
data,
x_axis=x_axis,
sr=sr,
hop_length=1
)
if show_color_bar:
# Add color bar
plt.colorbar()
# Add filename to first subplot
if hasattr(self, 'filename') and self.filename:
plt.title(self.filename)
if ylabel:
plt.ylabel(ylabel, fontsize=16)
if xlabel:
plt.xlabel(xlabel, fontsize=16)
if plot:
plt.tight_layout()
plt.show()
class DataMatrix3DContainer(DataMatrix2DContainer):
    """Three-dimensional data matrix container class, inherited from DataMatrix2DContainer.

    By default data vectors lie along axis 0, time along axis 1, and the
    sequence dimension along axis 2.
    """

    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats
[docs] def __init__(self, data=None, stats=None, metadata=None, time_resolution=None, processing_chain=None, **kwargs):
"""Constructor
Parameters
----------
filename : str, optional
File path
Default value None
data : numpy.ndarray, optional
Data to initialize the container
Default value None
stats : dict, optional
Statistics of the data
Default value None
metadata : dict or MetadataContainer, optional
MetadataContainer
Default value None
time_resolution : float, optional
Time resolution
Default value None
processing_chain : ProcessingChain, optional
Processing chain.
Default value None
"""
if data is None:
# Initialize with 3D-matrix
data = numpy.ndarray((0, 0, 0))
kwargs.update({
'data': data,
'stats': stats,
'metadata': metadata,
'time_resolution': time_resolution,
'processing_chain': processing_chain
})
# Run DataMatrix2DContainer init
DataMatrix2DContainer.__init__(self, **kwargs)
# Run super init
super(DataMatrix3DContainer, self).__init__(**kwargs)
# Matrix axis
self.data_axis = 0
self.time_axis = 1
self.sequence_axis = 2
def __getstate__(self):
d = super(DataMatrix3DContainer, self).__getstate__()
d.update({
'sequence_axis': self.sequence_axis
})
return d
def __setstate__(self, d):
super(DataMatrix3DContainer, self).__setstate__(d)
self.sequence_axis = d['sequence_axis']
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indent
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = super(DataMatrix3DContainer, self).to_string(ui=ui, indent=indent)
output += ui.line(field='Data', indent=indent) + '\n'
output += ui.line(indent=indent + 2, field='Dimensions') + '\n'
output += ui.data(indent=indent + 4, field='time_axis', value=self.time_axis) + '\n'
output += ui.data(indent=indent + 4, field='data_axis', value=self.data_axis) + '\n'
output += ui.data(indent=indent + 4, field='sequence_axis', value=self.sequence_axis) + '\n'
return output
def change_axis(self, time_axis=None, data_axis=None, sequence_axis=None):
"""Set axis
Parameters
----------
time_axis : int, optional
New data axis for time. Current axis and new axis are swapped.
Default value None
data_axis : int, optional
New data axis for data. Current axis and new axis are swapped.
Default value None
sequence_axis : int, optional
New data axis for data sequence. Current axis and new axis are swapped.
Default value None
Returns
-------
self
"""
# Get not None values
axis_list = [time_axis, data_axis, sequence_axis]
axis_list = [x for x in axis_list if x is not None]
# Get unique values
axis_set = set(axis_list)
if len(axis_list) != len(axis_set):
message = '{name}: Give unique axis indexes [{axis_list}].'.format(
name=self.__class__.__name__,
axis_list=axis_list
)
self.logger.exception(message)
raise ValueError(message)
if time_axis > 2:
message = '{name}: Given time_axis too large [{time_axis}].'.format(
name=self.__class__.__name__,
time_axis=time_axis
)
self.logger.exception(message)
raise ValueError(message)
if data_axis > 2:
message = '{name}: Given data_axis too large [{data_axis}].'.format(
name=self.__class__.__name__,
data_axis=data_axis
)
self.logger.exception(message)
raise ValueError(message)
if sequence_axis > 2:
message = '{name}: Given sequence_axis too large [{sequence_axis}].'.format(
name=self.__class__.__name__,
sequence_axis=sequence_axis
)
self.logger.exception(message)
raise ValueError(message)
# Get axis map
axis_map = OneToOneMappingContainer({
'time_axis': self.time_axis,
'data_axis': self.data_axis,
'sequence_axis': self.sequence_axis,
})
if time_axis is not None and time_axis != self.time_axis:
# Modify time axis
# Get axis names
target_axis = axis_map.flipped.map(time_axis)
source_axis = axis_map.flipped.map(self.time_axis)
# Modify data
self.data = numpy.swapaxes(
a=self.data,
axis1=self.time_axis,
axis2=time_axis
)
# Store new axes
axis_map[target_axis] = self.time_axis
axis_map[source_axis] = time_axis
setattr(self, str(target_axis), self.time_axis)
setattr(self, str(source_axis), time_axis)
if data_axis is not None and data_axis != self.data_axis:
# Modify data axis
# Get axis names
target_axis = axis_map.flipped.map(data_axis)
source_axis = axis_map.flipped.map(self.data_axis)
# Modify data
self.data = numpy.swapaxes(
a=self.data,
axis1=self.data_axis,
axis2=data_axis
)
# Store new axes
axis_map[target_axis] = self.data_axis
axis_map[source_axis] = data_axis
setattr(self, str(target_axis), self.data_axis)
setattr(self, str(source_axis), data_axis)
if sequence_axis is not None and sequence_axis != self.sequence_axis:
# Modify sequence axis
# Get axis names
target_axis = axis_map.flipped.map(sequence_axis)
source_axis = axis_map.flipped.map(self.sequence_axis)
# Modify data
self.data = numpy.swapaxes(
a=self.data,
axis1=self.sequence_axis,
axis2=sequence_axis
)
# Store new axes
axis_map[target_axis] = self.sequence_axis
axis_map[source_axis] = sequence_axis
setattr(self, str(target_axis), self.sequence_axis)
setattr(self, str(source_axis), sequence_axis)
return self
def plot(self, show_color_bar=False, show_filename=True, plot=True, figsize=None):
"""Plot data
Parameters
----------
show_color_bar : bool
Show color bar next to plot.
Default value False
show_filename : bool
Show filename as figure title
Default value True
plot : bool
If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
outside this method.
Default value True
figsize : tuple
Size of the figure. If None given, default size (10,10) is used.
Default value None
Returns
-------
self
"""
if figsize is None:
figsize = (10, 10)
data = self.get_focused()
if data.shape[self.sequence_axis] < 20:
from librosa.display import specshow
import matplotlib.pyplot as plt
if plot:
plt.figure(figsize=figsize)
for sequence_id in range(data.shape[self.sequence_axis]):
ax = plt.subplot(data.shape[self.sequence_axis], 1, sequence_id + 1)
current_data = data[:, :, sequence_id]
if self.time_axis == 0:
# Make sure time is on x-axis
current_data = current_data.T
# Plot data matrix
if self.time_resolution:
sr = int(1.0 / float(self.time_resolution))
x_axis = 'time'
else:
sr = 1.0
x_axis = None
specshow(
current_data,
x_axis=x_axis,
sr=sr,
hop_length=1
)
plt.ylabel(str(sequence_id))
if show_color_bar:
# Add color bar
plt.colorbar()
if sequence_id < data.shape[self.sequence_axis]-1:
ax.axes.get_xaxis().set_visible(False)
# Add filename to first subplot
if show_filename and hasattr(self, 'filename') and self.filename:
plt.title(self.filename)
show_filename = False
if plot:
plt.tight_layout()
plt.show()
else:
# TODO find method to visualize deep matrices.
message = '{name}: Matrix is too deep, plot-method not yet implemented.'.format(
name=self.__class__.__name__
)
self.logger.exception(message)
raise NotImplementedError(message)
class DataMatrix4DContainer(DataMatrix3DContainer):
    """Four-dimensional data matrix container class, inherited from DataMatrix3DContainer.

    By default data vectors lie along axis 0, time along axis 1, sequence
    along axis 2, and channel along axis 3.
    """

    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats
def __init__(self, data=None, stats=None, metadata=None, time_resolution=None, processing_chain=None, **kwargs):
"""Constructor
Parameters
----------
filename : str, optional
File path
Default value None
data : numpy.ndarray, optional
Data to initialize the container
Default value None
stats : dict, optional
Statistics of the data
Default value None
metadata : dict or MetadataContainer, optional
MetadataContainer
Default value None
time_resolution : float, optional
Time resolution
Default value None
processing_chain : ProcessingChain, optional
Processing chain.
Default value None
"""
if data is None:
# Initialize with 4D-matrix
data = numpy.ndarray((0, 0, 0, 0))
kwargs.update({
'data': data,
'stats': stats,
'metadata': metadata,
'time_resolution': time_resolution,
'processing_chain': processing_chain
})
# Run DataMatrix3DContainer init
DataMatrix3DContainer.__init__(self, **kwargs)
# Run super init
super(DataMatrix4DContainer, self).__init__(**kwargs)
# Matrix axis
self.data_axis = 0
self.time_axis = 1
self.sequence_axis = 2
self.channel_axis = 3
def __getstate__(self):
d = super(DataMatrix4DContainer, self).__getstate__()
d.update({
'channel_axis': self.channel_axis
})
return d
def __setstate__(self, d):
super(DataMatrix4DContainer, self).__setstate__(d)
self.channel_axis = d['channel_axis']
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indent
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = super(DataMatrix4DContainer, self).to_string(ui=ui, indent=indent)
output += ui.line(field='Data', indent=indent) + '\n'
output += ui.line(indent=indent + 2, field='Dimensions') + '\n'
output += ui.data(indent=indent + 4, field='time_axis', value=self.time_axis) + '\n'
output += ui.data(indent=indent + 4, field='data_axis', value=self.data_axis) + '\n'
output += ui.data(indent=indent + 4, field='sequence_axis', value=self.sequence_axis) + '\n'
output += ui.data(indent=indent + 4, field='channel_axis', value=self.channel_axis) + '\n'
return output
def change_axis(self, time_axis=None, data_axis=None, sequence_axis=None, channel_axis=None):
    """Set axis

    Parameters
    ----------
    time_axis : int, optional
        New data axis for time. Current axis and new axis are swapped.
        Default value None

    data_axis : int, optional
        New data axis for data. Current axis and new axis are swapped.
        Default value None

    sequence_axis : int, optional
        New data axis for data sequence. Current axis and new axis are swapped.
        Default value None

    channel_axis : int, optional
        New data axis for data channel. Current axis and new axis are swapped.
        Default value None

    Raises
    ------
    ValueError
        If given axis indexes are not unique, or an axis index is larger than 3.

    Returns
    -------
    self

    """
    # Keep an explicit order so swaps are applied exactly as before:
    # time, data, sequence, channel.
    requested = [
        ('time_axis', time_axis),
        ('data_axis', data_axis),
        ('sequence_axis', sequence_axis),
        ('channel_axis', channel_axis),
    ]

    # Get not None values
    axis_list = [value for _, value in requested if value is not None]

    # Get unique values
    if len(axis_list) != len(set(axis_list)):
        message = '{name}: Give unique axis indexes [{axis_list}].'.format(
            name=self.__class__.__name__,
            axis_list=axis_list
        )
        self.logger.exception(message)
        raise ValueError(message)

    for axis_name, axis_value in requested:
        # Guard with "is not None" before comparing: the parameters default to
        # None and "None > 3" raises TypeError on Python 3.
        if axis_value is not None and axis_value > 3:
            message = '{name}: Given {axis_name} too large [{axis_value}].'.format(
                name=self.__class__.__name__,
                axis_name=axis_name,
                axis_value=axis_value
            )
            self.logger.exception(message)
            raise ValueError(message)

    # Get axis map (axis name <-> current axis position)
    axis_map = OneToOneMappingContainer({
        'time_axis': self.time_axis,
        'data_axis': self.data_axis,
        'sequence_axis': self.sequence_axis,
        'channel_axis': self.channel_axis,
    })

    for axis_name, new_position in requested:
        # Re-read the current position each round; an earlier swap may have
        # already moved this axis.
        current_position = getattr(self, axis_name)
        if new_position is not None and new_position != current_position:
            self._swap_single_axis(
                axis_map=axis_map,
                current_position=current_position,
                new_position=new_position
            )

    return self

def _swap_single_axis(self, axis_map, current_position, new_position):
    """Swap two data axes and keep axis bookkeeping in sync.

    Parameters
    ----------
    axis_map : OneToOneMappingContainer
        Mapping axis name <-> current axis position; updated in place.

    current_position : int
        Current position of the axis being moved.

    new_position : int
        Target position; whichever axis currently occupies it is moved to
        current_position.

    Returns
    -------
    None

    """
    # Names of the axes currently at the target and source positions
    target_axis = axis_map.flipped.map(new_position)
    source_axis = axis_map.flipped.map(current_position)

    # Swap the underlying data axes
    self.data = numpy.swapaxes(
        a=self.data,
        axis1=current_position,
        axis2=new_position
    )

    # Store new axes both in the map and as instance attributes
    axis_map[target_axis] = current_position
    axis_map[source_axis] = new_position

    setattr(self, str(target_axis), current_position)
    setattr(self, str(source_axis), new_position)
def plot(self, show_color_bar=False, show_filename=True, plot=True, figsize=None):
    """Plot data

    Plots each (sequence, channel) slice of the 4D matrix as its own subplot.
    Only matrices with at most 10 sequences are supported.

    Parameters
    ----------
    show_color_bar : bool
        Show color bar next to plot.
        Default value False

    show_filename : bool
        Show filename as figure title
        Default value True

    plot : bool
        If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
        outside this method.
        Default value True

    figsize : tuple
        Size of the figure. If None given, default size (10,5) is used.
        Default value None

    Raises
    ------
    ValueError
        If the current axis assignment is not one of the handled layouts.

    NotImplementedError
        If the matrix has more than 10 sequences.

    Returns
    -------
    self

    """
    if figsize is None:
        figsize = (10, 5)

    # Data within the currently focused segment (see parent container focus API)
    data = self.get_focused()

    if data.shape[self.sequence_axis] <= 10:
        from librosa.display import specshow
        import matplotlib.pyplot as plt

        if plot:
            plt.figure(figsize=figsize)

        # One subplot row per channel (unless there is only one channel)
        rows_count = data.shape[self.channel_axis]

        for sequence_id in range(data.shape[self.sequence_axis]):
            for channel_id in range(data.shape[self.channel_axis]):
                if rows_count == 1:
                    # Special case when only one stream, transpose presentation
                    index = 1 + sequence_id
                    plt.subplot(
                        data.shape[self.sequence_axis],
                        rows_count,
                        index
                    )

                else:
                    # Grid layout: channels as rows, sequences as columns
                    index = 1 + (sequence_id + channel_id * data.shape[self.sequence_axis])
                    plt.subplot(
                        rows_count,
                        data.shape[self.sequence_axis],
                        index
                    )

                # Slice out the 2D (time x data) matrix for this
                # sequence/channel pair; only these axis layouts are handled.
                if self.sequence_axis == 0 and self.channel_axis == 1:
                    current_data = data[sequence_id, channel_id, :, :]

                elif self.sequence_axis == 0 and self.channel_axis == 2:
                    current_data = data[sequence_id, :, channel_id, :]

                elif self.sequence_axis == 0 and self.channel_axis == 3:
                    current_data = data[sequence_id, :, :, channel_id]

                elif self.sequence_axis == 1 and self.channel_axis == 3:
                    current_data = data[:, sequence_id, :, channel_id]

                elif self.sequence_axis == 2 and self.channel_axis == 3:
                    current_data = data[:, :, sequence_id, channel_id]

                else:
                    message = '{name}: Unknown data axes'.format(
                        name=self.__class__.__name__
                    )
                    self.logger.exception(message)
                    raise ValueError(message)

                # Plot feature matrix
                ax = specshow(
                    data=current_data,
                    x_axis='time',
                    sr=1,
                    hop_length=1
                )

                # Hide x tick labels on all but the bottom-most subplot of
                # each column to reduce clutter.
                if rows_count == 1:
                    if channel_id != data.shape[self.channel_axis] - 1:
                        ax.tick_params(
                            axis='x',
                            which='both',
                            bottom=False,
                            top=False,
                            labelbottom=False
                        )
                        plt.xlabel('')

                else:
                    if channel_id+1 != data.shape[self.channel_axis]:
                        ax.tick_params(
                            axis='x',
                            which='both',
                            bottom=False,
                            top=False,
                            labelbottom=False
                        )
                        plt.xlabel('')

                plt.ylabel('seq['+str(sequence_id)+'] chan['+str(channel_id)+']')

        # Add filename to first subplot
        # NOTE(review): plt.title applies to the current (last-drawn) axes at
        # this point, not necessarily the first subplot — confirm intent.
        if show_filename and hasattr(self, 'filename') and self.filename:
            plt.title(self.filename)

        if plot:
            plt.tight_layout()
            plt.show()

    else:
        # TODO find method to visualize deep matrices.
        message = '{name}: Matrix is too deep, plot-method not yet implemented.'.format(
            name=self.__class__.__name__
        )
        self.logger.exception(message)
        raise NotImplementedError(message)
class BinaryMatrix2DContainer(DataMatrix2DContainer):
    """Two-dimensional binary data matrix container class, inherited from DataMatrix2DContainer.

    Stores a binary activity matrix (e.g. event roll) together with the label
    list describing its class dimension.

    """
    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats

    def __init__(self, data=None, time_resolution=None, label_list=None, processing_chain=None, **kwargs):
        """Constructor

        Parameters
        ----------
        filename : str, optional
            File path
            Default value None

        data : numpy.ndarray, optional
            Binary data matrix
            Default value None

        time_resolution : float, optional
            Time resolution of the time axis
            Default value None

        label_list : list of str, optional
            Labels associated with the class dimension of the matrix
            Default value None

        processing_chain : ProcessingChain, optional
            Processing chain
            Default value None

        """
        kwargs.update({
            'data': data,
            'time_resolution': time_resolution,
            'label_list': label_list,
            'processing_chain': processing_chain
        })

        # Run DataMatrix2DContainer init
        DataMatrix2DContainer.__init__(self, **kwargs)

        # Run super init
        # NOTE(review): together with the explicit call above the parent
        # initializer runs twice; pattern kept as it is used throughout the file.
        super(BinaryMatrix2DContainer, self).__init__(**kwargs)

        # Labels for the class dimension of the binary matrix
        self.label_list = label_list

    def __getstate__(self):
        # Pickle support: parent state plus the label list.
        d = super(BinaryMatrix2DContainer, self).__getstate__()
        d.update({
            'label_list': self.label_list
        })
        return d

    def __setstate__(self, d):
        # Pickle support: restore parent state, then the label list.
        super(BinaryMatrix2DContainer, self).__setstate__(d)
        self.label_list = d['label_list']

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indent
            Default value 0

        Returns
        -------
        str

        """
        if ui is None:
            ui = FancyStringifier()

        output = super(BinaryMatrix2DContainer, self).to_string(ui=ui, indent=indent)

        # Append label information to the parent output
        output += ui.line(field='Labels', indent=indent) + '\n'
        output += ui.data(indent=indent + 2, field='label_list', value=self.label_list) + '\n'

        return output

    def pad(self, length, binary_matrix=None):
        """Pad binary matrix along time axis

        Pads with zeros when the matrix is shorter than requested, truncates
        when it is longer; a matrix of exactly the requested length is kept.

        Parameters
        ----------
        length : int
            Length to be padded

        binary_matrix : np.ndarray, shape=(time steps, amount of classes)
            Binary matrix, if None given internal data used.
            Default value None

        Returns
        -------
        self

        """
        if binary_matrix is None:
            binary_matrix = self.data

        if length > binary_matrix.shape[self.time_axis]:
            # Matrix too short: pad with zeros along the time axis
            if self.time_axis == 0:
                padding = numpy.zeros((length - binary_matrix.shape[0], binary_matrix.shape[1]))
                self.data = numpy.vstack((binary_matrix, padding))

            else:
                padding = numpy.zeros((binary_matrix.shape[0], length - binary_matrix.shape[1]))
                self.data = numpy.hstack((binary_matrix, padding))

        elif length < binary_matrix.shape[self.time_axis]:
            # Matrix too long: truncate along the time axis
            if self.time_axis == 0:
                self.data = binary_matrix[0:length, :]

            else:
                self.data = binary_matrix[:, 0:length]

        return self

    def plot(self, plot=True, binary_matrix=None, data_container=None, figsize=None, panel_title=None,
             binary_panel_title='Binary matrix', data_panel_title='Data', panel_title_position='right',
             color='binary'):
        """Visualize binary matrix, and optionally synced data matrix.

        For example, this can be used to visualize sound event activity along with the acoustic features.

        Parameters
        ----------
        plot : bool
            If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
            outside this method.
            Default value True

        binary_matrix : numpy.ndarray
            Binary matrix, if None given internal data used.
            Default value None

        data_container : DataContainer
            Extra data matrix to be shown along with binary matrix.
            Default value None

        figsize : tuple
            Size of the figure. If None given, default size (10,5) is used.
            Default value None

        panel_title : str
            Panel title (ylabel for first subplot)
            Default value None

        binary_panel_title : str
            Binary panel title (ylabel for first subplot)
            Default value "Binary matrix"

        data_panel_title : str
            Data panel title (ylabel for second subplot)
            Default value "Data"

        panel_title_position : str
            Panel title position ['left', 'right']
            Default value "right"

        color : str
            Color scheme used ['binary', 'gray', 'purple', 'blue', 'green', 'orange', 'red']
            Default value 'binary'

        Returns
        -------
        None

        """
        if figsize is None:
            figsize = (10, 5)

        import matplotlib.pyplot as plt
        from librosa.display import specshow

        if binary_matrix is None:
            binary_matrix = self.data

        if self.time_axis == 0:
            # specshow expects (classes, time); transpose when stored time-major
            binary_matrix = binary_matrix.T

        # Map color scheme name to a matplotlib colormap; unknown names fall
        # back to binary.
        if color:
            if color == 'binary':
                cmap = plt.cm.binary
            elif color == 'gray':
                cmap = plt.cm.gray_r
            elif color == 'purple':
                cmap = plt.cm.Purples
            elif color == 'blue':
                cmap = plt.cm.Blues
            elif color == 'green':
                cmap = plt.cm.Greens
            elif color == 'orange':
                cmap = plt.cm.Oranges
            elif color == 'red':
                cmap = plt.cm.Reds
            else:
                cmap = plt.cm.binary
        else:
            cmap = plt.cm.binary

        if binary_matrix is not None and data_container is not None:
            # Two stacked panels: binary matrix on top, data matrix below
            fig, axes = plt.subplots(2, 1, figsize=figsize)
            fig.subplots_adjust(top=1.0, bottom=0.0, right=1.0, hspace=0.05, wspace=0.00)

            # Binary matrix
            ax1 = plt.subplot(2, 1, 1)
            img = specshow(
                binary_matrix,
                x_axis='time',
                sr=int(1 / float(self.time_resolution)),
                hop_length=1,
                cmap=cmap
            )
            y_ticks = numpy.arange(0, len(self.label_list)) + 0.5
            ax1.set_yticks(y_ticks)
            ax1.set_yticklabels(self.label_list, fontsize=20)
            ax1.get_xaxis().set_visible(False)
            ax1.yaxis.set_label_position(panel_title_position)
            plt.ylabel(binary_panel_title, fontsize=20)

            # Data matrix
            ax2 = plt.subplot(2, 1, 2)
            img = specshow(
                data_container.data,
                x_axis='time',
                # NOTE(review): data_container is documented as a DataContainer,
                # which defines time_resolution, not hop_length_seconds —
                # confirm which attribute the intended containers expose.
                sr=int(1 / float(data_container.hop_length_seconds)),
                hop_length=1
            )
            ax2.yaxis.set_label_position(panel_title_position)
            plt.ylabel(data_panel_title, fontsize=20)
            plt.xlabel('Time', fontsize=20)

        elif binary_matrix is not None and data_container is None:
            # Single panel: binary matrix only
            if plot:
                fig = plt.figure(figsize=figsize)

            ax = plt.gca()

            # Binary matrix
            if self.time_resolution:
                sr = int(1.0 / float(self.time_resolution))
                x_axis = 'time'
            else:
                sr = 1.0
                x_axis = None

            img = specshow(
                binary_matrix,
                x_axis=x_axis,
                sr=sr,
                hop_length=1,
                cmap=cmap
            )

            if plot:
                if panel_title:
                    ax.yaxis.set_label_position(panel_title_position)
                    plt.ylabel(panel_title, fontsize=20)

                if self.time_resolution:
                    plt.xlabel('Time', fontsize=20)

            if self.label_list:
                ax.yaxis.set_label_position("right")
                y_ticks = numpy.arange(0, len(self.label_list)) + 0.5
                ax.set_yticks(y_ticks)
                ax.set_yticklabels(self.label_list, fontsize=20)

        if plot:
            plt.show()
class DataRepository(RepositoryContainer):
    """Data repository container class to store multiple DataContainers together.

    Containers are stored in a dict, label is used as dictionary key and value is associated data container.

    """
    valid_formats = [FileFormat.CPICKLE]  #: Valid file formats

    def __init__(self, data=None, filename=None, default_stream_id=0, processing_chain=None, **kwargs):
        """Constructor

        Parameters
        ----------
        data : dict, optional
            Initial content of the repository, label as key and container as value.
            Default value None

        filename : str or dict
            Either one filename (str) or multiple filenames in a dictionary. Dictionary based parameter is used to
            construct the repository from separate FeatureContainers, two formats for the dictionary is supported:
            1) label as key, and filename as value, and 2) two-level dictionary label as key1, stream as
            key2 and filename as value.

        default_stream_id : str or int
            Default stream id used when accessing data
            Default value 0

        processing_chain : ProcessingChain
            Processing chain to be included into repository
            Default value None

        """
        kwargs['filename'] = filename
        super(DataRepository, self).__init__(**kwargs)

        self.default_stream_id = default_stream_id

        from dcase_util.processors import ProcessingChain
        if processing_chain is None:
            processing_chain = ProcessingChain()

        self.processing_chain = processing_chain

        # Container class used to load stored items from disk
        self.item_class = DataMatrix2DContainer

        if data is not None and isinstance(data, dict):
            # Use dict.update directly to bypass possible overridden
            # update-methods in container subclasses.
            dict.update(self, data)

    def __getstate__(self):
        # Pickle support: parent state plus repository bookkeeping.
        d = super(DataRepository, self).__getstate__()
        d.update({
            'default_stream_id': self.default_stream_id,
            'processing_chain': self.processing_chain,
            'item_class': self.item_class
        })
        return d

    def __setstate__(self, d):
        # Pickle support: restore parent state, then repository bookkeeping.
        super(DataRepository, self).__setstate__(d)
        self.default_stream_id = d['default_stream_id']
        self.processing_chain = d['processing_chain']
        self.item_class = d['item_class']

        # Remove internal variables from dict
        # NOTE(review): this mutates the caller's dict after the values have
        # been consumed; it does not affect self.
        del d['default_stream_id']
        del d['processing_chain']
        del d['item_class']

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indent
            Default value 0

        Returns
        -------
        str

        """
        if ui is None:
            ui = FancyStringifier()

        output = ''
        output += ui.class_name(self.__class__.__name__, indent=indent) + '\n'

        if hasattr(self, 'filename') and self.filename:
            output += ui.data(
                field='filename',
                value=self.filename,
                indent=indent
            ) + '\n'

        output += ui.line(field='Repository info', indent=indent) + '\n'

        if hasattr(self, 'item_class') and self.item_class:
            output += ui.data(
                indent=indent + 2,
                field='Item class',
                value=self.item_class.__name__
            ) + '\n'

        output += ui.data(
            indent=indent + 2,
            field='Item count',
            value=len(self)
        ) + '\n'

        output += ui.data(
            indent=indent + 2,
            field='Labels',
            value=list(self.keys())
        ) + '\n'

        output += ui.line(field='Content', indent=indent) + '\n'

        # One line per (label, stream) pair stored in the repository
        for label, label_data in iteritems(self):
            if label_data:
                if isinstance(label_data, dict):
                    for stream_id, stream_data in iteritems(label_data):
                        if hasattr(stream_data, 'to_string'):
                            output += ui.data(
                                indent=indent + 2,
                                field='['+str(label)+']' + '[' + str(stream_id) + ']',
                                value=stream_data.to_string(ui=ui)
                            ) + '\n'

                        else:
                            output += ui.data(
                                indent=indent + 2,
                                field='[' + str(label) + ']' + '[' + str(stream_id) + ']',
                                value=stream_data
                            ) + '\n'

        output += '\n'

        return output

    @property
    def labels(self):
        """Item labels stores in the repository.

        Returns
        -------
        list of str

        """
        return sorted(list(self.keys()))

    def stream_ids(self, label):
        """Stream ids stores for the label in the repository.

        Parameters
        ----------
        label : str
            Item label

        Returns
        -------
        list of str
            Sorted stream ids, or None if the label is not in the repository.

        """
        if label in self:
            return sorted(list(self[label].keys()))

        else:
            return None

    def load(self, filename=None, collect_from_containers=True):
        """Load file list

        Parameters
        ----------
        filename : str or dict
            Either one filename (str) or multiple filenames in a dictionary. Dictionary based parameter is used to
            construct the repository from separate FeatureContainers, two formats for the dictionary is supported: 1)
            label as key, and filename as value, and 2) two-level dictionary label as key1, stream as key2 and
            filename as value. If None given, parameter given to class initializer is used instead.
            Default value None

        collect_from_containers : bool
            Collect data to the repository from separate containers.
            Default value True

        Raises
        ------
        IOError
            If a listed file does not exist, or no valid filename is set.

        Returns
        -------
        self

        """
        if filename:
            self.filename = filename

        if isinstance(self.filename, basestring):
            # String filename given use load method from parent class
            if os.path.exists(self.filename):
                # If file exist load it
                self.detect_file_format()
                self.validate_format()
                super(DataRepository, self).load(filename=self.filename)

            if collect_from_containers:
                # Collect data to the repository from separate containers,
                # named "<base>.<label>-<stream_id><ext>".
                filename_base, file_extension = os.path.splitext(self.filename)
                containers = glob.glob(filename_base + '.*-*' + file_extension)
                for filename in containers:
                    label, stream_id = os.path.splitext(filename)[0].split('.')[-1].split('-')

                    if label not in self:
                        self[label] = {}

                    self[label][int(stream_id)] = self.item_class().load(filename=filename)

        elif isinstance(self.filename, dict):
            # Dictionary based filename given
            if filelist_exists(self.filename):
                dict.clear(self)

                for label, data in iteritems(self.filename):
                    # Skip labels starting with '_', those are just for extra info;
                    # create the repository entry only for real labels.
                    if not label.startswith('_'):
                        self[label] = {}

                        if isinstance(data, basestring):
                            # filename given directly, only one feature stream per method inputted.
                            self[label][self.default_stream_id] = self.item_class().load(filename=data)

                        elif isinstance(data, dict):
                            for stream, filename in iteritems(data):
                                self[label][stream] = self.item_class().load(filename=filename)

            else:
                # All filenames did not exists, find which ones is missing and raise error.
                for label, data in iteritems(self.filename):
                    if isinstance(data, basestring) and not os.path.isfile(data):
                        message = '{name}: Repository cannot be loaded, file does not exists for method [{method}], file [{filename}]'.format(
                            name=self.__class__.__name__,
                            method=label,
                            filename=data
                        )
                        self.logger.exception(message)
                        raise IOError(message)

                    elif isinstance(data, dict):
                        for stream, filename in iteritems(data):
                            if not os.path.isfile(filename):
                                message = '{name}: Repository cannot be loaded, file does not exists for method [{method}], stream [{stream}], file [{filename}]'.format(
                                    name=self.__class__.__name__,
                                    method=label,
                                    stream=stream,
                                    filename=filename
                                )
                                self.logger.exception(message)
                                raise IOError(message)

        else:
            message = '{name}: Repository cannot be loaded, no valid filename set.'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise IOError(message)

        return self

    def save(self, filename=None, split_into_containers=False):
        """Save file

        Parameters
        ----------
        filename : str or dict
            File path
            Default value filename given to class constructor

        split_into_containers : bool
            Split data from repository separate containers and save them individually.
            Default value False

        Raises
        ------
        ImportError:
            Error if file format specific module cannot be imported

        IOError:
            File has unknown file format

        Returns
        -------
        self

        """
        if filename:
            self.filename = filename

        if split_into_containers and isinstance(self.filename, basestring):
            # Automatic filename generation for saving data into separate containers
            filename_base, file_extension = os.path.splitext(self.filename)

            filename_dictionary = {}
            for label in self.labels:
                if label not in filename_dictionary:
                    filename_dictionary[label] = {}

                for stream_id in self.stream_ids(label=label):
                    if stream_id not in filename_dictionary[label]:
                        filename_dictionary[label][stream_id] = filename_base + '.' + label + '-' + str(stream_id) + file_extension

            self.filename = filename_dictionary

        if isinstance(self.filename, basestring):
            # Single output file as target
            self.detect_file_format()
            self.validate_format()

            # String filename given use save method from parent class
            super(DataRepository, self).save(filename=self.filename)

        elif isinstance(self.filename, dict):
            # Custom naming and splitting into separate containers;
            # save each data container in the repository separately.
            for label in self.labels:
                if label in self.filename:
                    for stream_id in self.stream_ids(label=label):
                        if stream_id in self.filename[label]:
                            current_container = self.get_container(label=label, stream_id=stream_id)
                            current_container.save(filename=self.filename[label][stream_id])

        return self

    def get_container(self, label, stream_id=None):
        """Get container from repository

        Parameters
        ----------
        label : str
            Label

        stream_id : str or int
            Stream id, if None, default_stream is used.
            Default value None

        Returns
        -------
        DataContainer
            Stored container, or None if the label or stream is not found.

        """
        if stream_id is None:
            stream_id = self.default_stream_id

        # Return None gracefully for an unknown label, consistent with
        # stream_ids(); previously this raised AttributeError (None.get).
        if label not in self:
            return None

        return self[label].get(stream_id)

    def set_container(self, container, label, stream_id=None):
        """Store container to repository

        Parameters
        ----------
        container : DataContainer or dict or list
            Data container

        label : str
            Label assigned to the container

        stream_id : str or int
            Stream id, if None, default_stream is used.
            Default value None

        Returns
        -------
        self

        """
        if stream_id is None:
            stream_id = self.default_stream_id

        if label not in self:
            self[label] = {}

        self[label][stream_id] = container

        return self

    def push_processing_chain_item(self, processor_name, init_parameters=None, process_parameters=None,
                                   preprocessing_callbacks=None,
                                   input_type=None, output_type=None):
        """Push processing chain item

        Parameters
        ----------
        processor_name : str
            Processor name

        init_parameters : dict, optional
            Initialization parameters for the processors
            Default value None

        process_parameters : dict, optional
            Parameters for the process method of the Processor
            Default value None

        preprocessing_callbacks : list of dicts
            Callbacks used for preprocessing
            Default value None

        input_type : ProcessingChainItemType
            Input data type
            Default value None

        output_type : ProcessingChainItemType
            Output data type
            Default value None

        Returns
        -------
        self

        """
        self.processing_chain.push_processor(
            processor_name=processor_name,
            init_parameters=init_parameters,
            process_parameters=process_parameters,
            preprocessing_callbacks=preprocessing_callbacks,
            input_type=input_type,
            output_type=output_type
        )

        return self

    def plot(self, plot=True, figsize=None):
        """Visualize data stored in the repository.

        Parameters
        ----------
        plot : bool
            If true, figure is shown automatically. Set to False if collecting multiple plots into same figure
            outside this method.
            Default value True

        figsize : tuple
            Size of the figure. If None given, default size (10,10) is used.
            Default value None

        Returns
        -------
        self

        """
        if figsize is None:
            figsize = (10, 10)

        from librosa.display import specshow
        import matplotlib.pyplot as plt

        # Number of subplot rows = maximum stream count over all labels
        rows_count = 0
        for label_id, label in enumerate(self.labels):
            if rows_count < len(self.stream_ids(label)):
                rows_count = len(self.stream_ids(label))

        if plot:
            plt.figure(figsize=figsize)

        for label_id, label in enumerate(self.labels):
            for stream_id in self.stream_ids(label):
                if rows_count == 1:
                    # Special case when only one stream, transpose presentation
                    index = 1 + label_id
                    plt.subplot(
                        len(self.labels),
                        rows_count,
                        index
                    )

                else:
                    # Grid layout: streams as rows, labels as columns
                    index = 1 + (label_id + stream_id * len(self.labels))
                    plt.subplot(
                        rows_count,
                        len(self.labels),
                        index
                    )

                current_container = self.get_container(
                    label=label,
                    stream_id=stream_id
                )

                # Plot feature matrix
                ax = specshow(
                    data=current_container.data,
                    x_axis='time',
                    sr=int(1 / float(current_container.time_resolution)),
                    hop_length=1
                )

                # Hide x tick labels on all but the last subplot of each
                # column; use booleans as in DataMatrix4DContainer.plot
                # (the string 'off' form is deprecated in matplotlib).
                if rows_count == 1:
                    if label_id != len(self.labels) - 1:
                        ax.tick_params(
                            axis='x',
                            which='both',
                            bottom=False,
                            top=False,
                            labelbottom=False
                        )
                        plt.xlabel('')

                else:
                    if stream_id+1 != len(self.stream_ids(label)):
                        ax.tick_params(
                            axis='x',
                            which='both',
                            bottom=False,
                            top=False,
                            labelbottom=False
                        )
                        plt.xlabel('')

                plt.ylabel('['+str(label)+']['+str(stream_id)+']')

        if plot:
            plt.show()

        return self