Source code for dcase_util.keras.data

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
import numpy
import copy

from dcase_util.ui import FancyStringifier, FancyLogger
from dcase_util.containers import ContainerMixin
from dcase_util.data import DataBuffer


def get_keras_data_sequence_class():
    # Use a getter method to avoid importing Keras when importing dcase_util. This
    # allows the user to decide when to import Keras, so that random seeds can be
    # set before the Keras import.
    from keras.utils import Sequence

    class KerasDataSequence(Sequence, ContainerMixin):
        def __init__(self, item_list=None, batch_size=64,
                     buffer_size=None,
                     data_processing_chain=None, meta_processing_chain=None,
                     data_processing_chain_callback_on_epoch_end=None,
                     meta_processing_chain_callback_on_epoch_end=None,
                     transformer_callbacks=None,
                     refresh_buffer_on_epoch=False,
                     data_format='channels_last',
                     target_format='single_target_per_sequence',
                     **kwargs):
            """Constructor

            Parameters
            ----------
            item_list : list or dict
                Items in the data sequence. List of multi-level dictionaries with
                first-level keys 'data' and 'meta'. The second level should contain
                parameters for the process method of the processing chain.
                Default value None

            batch_size : int
                Batch size (item count).
                Default value 64

            buffer_size : int
                Internal buffer size (item count). When set sufficiently high, the
                data sequence generator can fit all sequence items into the internal
                buffer and fetch them without loading from disk. Set to None to
                disable the internal buffer.
                Default value None

            data_processing_chain : ProcessingChain
                Data processing chain.
                Default value None

            meta_processing_chain : ProcessingChain
                Meta processing chain.
                Default value None

            data_processing_chain_callback_on_epoch_end : list of dict
                Methods (with parameters) to call on the data processing chain at
                the end of each epoch. This can be used to control the processing
                chain's internal state (e.g. roll the data).
                Default value None

            meta_processing_chain_callback_on_epoch_end : list of dict
                Methods (with parameters) to call on the meta processing chain at
                the end of each epoch. This can be used to control the processing
                chain's internal state (e.g. roll the data).
                Default value None

            transformer_callbacks : list of func
                Transformer callbacks to jointly process data and meta. This can be
                used for local data modification and data augmentation.
                Default value None

            refresh_buffer_on_epoch : bool
                When the internal data buffer is used, force a data and meta refresh
                at the end of each epoch. Use this if data is modified/augmented
                differently for each epoch. This parameter is set to True
                automatically when data_processing_chain_callback_on_epoch_end or
                meta_processing_chain_callback_on_epoch_end is used.
                Default value False

            data_format : str
                Keras-like data format; controls where the channel dimension is
                added.
                Possible values ['channels_first', 'channels_last']
                Default value 'channels_last'

            target_format : str
                Meta data interpretation in relation to the data items.
                Possible values ['same', 'single_target_per_sequence']
                Default value 'single_target_per_sequence'

            """
Default value 'single_target_per_segment' """ # Run ContainerMixin init ContainerMixin.__init__(self, **kwargs) self._data_shape = None self._data_axis = None self.item_list = copy.copy(item_list) self.batch_size = batch_size self.buffer_size = buffer_size self.data_refresh_on_epoch = refresh_buffer_on_epoch if data_format is None: data_format = 'channels_last' self.data_format = data_format if self.data_format not in ['channels_first', 'channels_last']: message = '{name}: Unknown data_format [{data_format}].'.format( name=self.__class__.__name__, data_format=self.data_format ) self.logger.exception(message) raise NotImplementedError(message) if target_format is None: target_format = 'single_target_per_sequence' self.target_format = target_format if self.target_format not in ['same', 'single_target_per_sequence']: message = '{name}: Unknown target_format [{target_format}].'.format( name=self.__class__.__name__, target_format=self.target_format ) self.logger.exception(message) raise NotImplementedError(message) if data_processing_chain_callback_on_epoch_end is None: data_processing_chain_callback_on_epoch_end = [] self.data_processing_chain_callback_on_epoch_end = data_processing_chain_callback_on_epoch_end if self.data_processing_chain_callback_on_epoch_end: self.data_refresh_on_epoch = True if meta_processing_chain_callback_on_epoch_end is None: meta_processing_chain_callback_on_epoch_end = [] self.meta_processing_chain_callback_on_epoch_end = meta_processing_chain_callback_on_epoch_end if transformer_callbacks is None: transformer_callbacks = [] self.transformer_callbacks = transformer_callbacks # Processing chains self.data_processing_chain = data_processing_chain self.meta_processing_chain = meta_processing_chain if self.buffer_size is not None: # Initialize data buffer self.data_buffer = DataBuffer( size=self.buffer_size ) else: self.data_buffer = None def to_string(self, ui=None, indent=0): """Get container information in a string Parameters ---------- ui : FancyStringifier or FancyHTMLStringifier Stringifier class Default value FancyStringifier indent : int Amount of indention used Default value 0 Returns ------- str """ if ui is None: ui = FancyStringifier() output = '' output += ui.class_name(self.__class__.__name__, indent=indent) + '\n' output += ui.data( indent=indent, field='Batch size', value=self.batch_size ) + '\n' output += ui.data( indent=indent, field='Epoch size', value=len(self), unit='batches' ) + '\n' shape = self.data_shape axis = self.data_axis output += ui.data(field='Data item shape', value=shape, indent=indent) + '\n' output += ui.data( indent=indent + 2, field='Time', value=shape[axis['time_axis']] ) + '\n' output += ui.data( indent=indent + 2, field='Data', value=shape[axis['data_axis']] ) + '\n' if 'sequence_axis' in axis: output += ui.data( indent=indent + 2, field='Sequence', value=shape[axis['sequence_axis']] ) + '\n' output += ui.data( indent=indent + 2, field='Axis', value=axis ) + '\n' if self.buffer_size is not None: output += ui.line(field='Buffer', indent=indent) + '\n' output += ui.data( indent=indent + 2, field='buffer_size', value=self.buffer_size, unit='items' ) + '\n' output += ui.data( indent=indent + 2, field='buffer usage', value=self.data_buffer.count, unit='items' ) + '\n' output += ui.data( indent=indent + 2, field='buffer usage', value=(self.data_buffer.count / float(self.buffer_size)) * 100, unit='%' ) + '\n' return output def __getitem__(self, index): start_index = index * self.batch_size stop_index = (index + 1) * self.batch_size 
            batch_buffer_data = []
            batch_buffer_meta = []

            for item_index in range(start_index, stop_index):
                if item_index < len(self.item_list):
                    item = self.item_list[item_index]

                    # Load item data
                    data, meta = self.process_item(item=item)

                    if self.transformer_callbacks:
                        # Apply transformer callbacks
                        for callback in self.transformer_callbacks:
                            data, meta = callback(
                                data=data,
                                meta=meta
                            )

                    # Collect data
                    batch_buffer_data.append(data.data)

                    # Collect meta
                    if self.target_format == 'single_target_per_sequence':
                        # Collect single target per sequence
                        for i in range(0, data.shape[data.sequence_axis]):
                            batch_buffer_meta.append(meta.data[:, 0])

                    elif self.target_format == 'same':
                        # Repeat the target for each element (frame)
                        batch_buffer_meta.append(
                            numpy.repeat(
                                a=meta.data,
                                repeats=data.length,
                                axis=1
                            )
                        )

            if len(data.shape) == 2:
                # Prepare 2D data, stack along time_axis
                if data.time_axis == 0:
                    batch_buffer_data = numpy.vstack(batch_buffer_data)

                elif data.time_axis == 1:
                    batch_buffer_data = numpy.hstack(batch_buffer_data)

            elif len(data.shape) == 3:
                # Prepare 3D data, stack along sequence_axis
                if data.sequence_axis == 0:
                    batch_buffer_data = numpy.vstack(batch_buffer_data)

                elif data.sequence_axis == 1:
                    batch_buffer_data = numpy.hstack(batch_buffer_data)

                elif data.sequence_axis == 2:
                    batch_buffer_data = numpy.dstack(batch_buffer_data)

                # Add channel dimension to the data
                if self.data_format == 'channels_first':
                    batch_buffer_data = numpy.expand_dims(
                        batch_buffer_data,
                        axis=0
                    )

                elif self.data_format == 'channels_last':
                    batch_buffer_data = numpy.expand_dims(
                        batch_buffer_data,
                        axis=3
                    )

            # Prepare meta
            if self.target_format == 'single_target_per_sequence':
                batch_buffer_meta = numpy.vstack(batch_buffer_meta)

            elif self.target_format == 'same':
                batch_buffer_meta = numpy.hstack(batch_buffer_meta).T

            return batch_buffer_data, batch_buffer_meta

        def __len__(self):
            num_batches = int(numpy.ceil(len(self.item_list) / float(self.batch_size)))

            if num_batches > 0:
                return num_batches

            else:
                return 1

        @property
        def data_shape(self):
            if self._data_shape is None:
                # Load first item and get data shape
                data = self.process_item(
                    item=self.item_list[0]
                )[0]

                self._data_shape = data.shape

                self._data_axis = {
                    'time_axis': data.time_axis,
                    'data_axis': data.data_axis
                }

                if hasattr(data, 'sequence_axis'):
                    self._data_axis['sequence_axis'] = data.sequence_axis

            return self._data_shape

        @property
        def data_axis(self):
            if self._data_axis is None:
                # Load first item and get data shape
                data = self.process_item(
                    item=self.item_list[0]
                )[0]

                self._data_shape = data.shape

                self._data_axis = {
                    'time_axis': data.time_axis,
                    'data_axis': data.data_axis
                }

                if hasattr(data, 'sequence_axis'):
                    self._data_axis['sequence_axis'] = data.sequence_axis

            return self._data_axis

        @property
        def data_size(self):
            shape = self.data_shape
            axis = self.data_axis

            size = {
                'time': shape[axis['time_axis']],
                'data': shape[axis['data_axis']],
            }

            if 'sequence_axis' in axis:
                size['sequence'] = shape[axis['sequence_axis']]

            return size

        def process_item(self, item):
            if self.data_buffer is not None:
                # Fetch data and meta through internal buffer
                if not self.data_buffer.key_exists(key=item):
                    data = self.data_processing_chain.process(**item['data'])
                    meta = self.meta_processing_chain.process(**item['meta'])

                    self.data_buffer.set(
                        key=item,
                        data=data,
                        meta=meta
                    )

                else:
                    data, meta = self.data_buffer.get(key=item)

            else:
                # Fetch data and meta directly.
                data = self.data_processing_chain.process(**item['data'])
                meta = self.meta_processing_chain.process(**item['meta'])

            return data, meta

        def on_epoch_end(self):
            if self.data_processing_chain_callback_on_epoch_end:
                for callback_parameters in self.data_processing_chain_callback_on_epoch_end:
                    if 'method_name' in callback_parameters:
                        self.data_processing_chain.call_method(
                            method_name=callback_parameters['method_name'],
                            parameters=callback_parameters.get('parameters', {})
                        )

            if self.meta_processing_chain_callback_on_epoch_end:
                for callback_parameters in self.meta_processing_chain_callback_on_epoch_end:
                    if 'method_name' in callback_parameters:
                        self.meta_processing_chain.call_method(
                            method_name=callback_parameters['method_name'],
                            parameters=callback_parameters.get('parameters', {})
                        )

            if self.data_buffer is not None and self.data_refresh_on_epoch:
                # Force reload of data
                self.data_buffer.clear()

    return KerasDataSequence
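
# Usage sketch (illustrative only, not part of the module API): wiring the
# getter and the returned class into Keras training. 'data_chain', 'meta_chain'
# and the filenames are hypothetical placeholders standing in for real
# ProcessingChain instances and files; 'model' is a compiled Keras model.
# fit_generator accepts any keras.utils.Sequence instance.
#
#     KerasDataSequence = get_keras_data_sequence_class()
#
#     def add_noise(data, meta):
#         # Hypothetical transformer callback: jointly receives and returns
#         # data and meta; here it perturbs the data container's matrix.
#         data.data = data.data + 0.01 * numpy.random.randn(*data.data.shape)
#         return data, meta
#
#     data_sequence = KerasDataSequence(
#         item_list=[
#             {'data': {'filename': 'features_1.cpickle'},
#              'meta': {'filename': 'targets_1.cpickle'}}
#         ],
#         batch_size=64,
#         data_processing_chain=data_chain,
#         meta_processing_chain=meta_chain,
#         transformer_callbacks=[add_noise]
#     )
#     model.fit_generator(generator=data_sequence, epochs=10)
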
def data_collector(item_list=None,
                   data_processing_chain=None,
                   meta_processing_chain=None,
                   target_format='single_target_per_sequence',
                   channel_dimension='channels_last',
                   verbose=True,
                   print_indent=2
                   ):
    """Data collector

    Collects data and meta into matrices while processing them through the
    processing chains.

    Parameters
    ----------
    item_list : list or dict
        Items in the data sequence. List of multi-level dictionaries with
        first-level keys 'data' and 'meta'. The second level should contain
        parameters for the process method of the processing chain.
        Default value None

    data_processing_chain : ProcessingChain
        Data processing chain.
        Default value None

    meta_processing_chain : ProcessingChain
        Meta processing chain.
        Default value None

    channel_dimension : str
        Controls where the channel dimension is added. Similar to the Keras
        data format parameter. If None is given, no channel dimension is added.
        Possible values [None, 'channels_first', 'channels_last']
        Default value 'channels_last'

    target_format : str
        Meta data interpretation in relation to the data items.
        Possible values ['same', 'single_target_per_sequence']
        Default value 'single_target_per_sequence'

    verbose : bool
        Print information about the data
        Default value True

    print_indent : int
        Default value 2

    Returns
    -------
    numpy.ndarray
        data

    numpy.ndarray
        meta

    dict
        data size information

    """
    if item_list:
        # Collect all data and meta
        X = []
        Y = []

        for item in item_list:
            data = data_processing_chain.process(**item['data'])
            meta = meta_processing_chain.process(**item['meta'])

            X.append(data.data)

            # Collect meta
            if target_format == 'single_target_per_sequence':
                # Collect single target per sequence
                for i in range(0, data.shape[data.sequence_axis]):
                    Y.append(meta.data[:, 0])

            elif target_format == 'same':
                # Collect same target for each element (frame)
                if data.time_axis != meta.time_axis:
                    Y.append(
                        numpy.repeat(
                            a=meta.data,
                            repeats=data.length,
                            axis=meta.time_axis
                        ).T
                    )

                else:
                    Y.append(
                        numpy.repeat(
                            a=meta.data,
                            repeats=data.length,
                            axis=meta.time_axis
                        )
                    )

        data_size = {}

        if len(data.shape) == 2:
            # Stack collected data and meta the correct way
            if data.time_axis == 0:
                X = numpy.vstack(X)
                Y = numpy.vstack(Y)

            else:
                X = numpy.hstack(X)
                Y = numpy.hstack(Y)

            # Get data item size
            data_size = {
                'data': X.shape[data.data_axis],
                'time': X.shape[data.time_axis],
            }

            data_axis = [None] * 2
            data_axis[data.data_axis] = 'data'
            data_axis[data.time_axis] = 'time'

        elif len(data.shape) == 3:
            # Stack collected data and meta the correct way
            if data.sequence_axis == 0:
                X = numpy.vstack(X)
                Y = numpy.vstack(Y)

            elif data.sequence_axis == 1:
                X = numpy.hstack(X)
                Y = numpy.hstack(Y)

            elif data.sequence_axis == 2:
                X = numpy.dstack(X)
                Y = numpy.dstack(Y)

            # Get data item size
            data_size = {
                'data': X.shape[data.data_axis],
                'time': X.shape[data.time_axis],
                'sequence': X.shape[data.sequence_axis],
            }

            data_axis = [None] * 3
            data_axis[data.data_axis] = 'data'
            data_axis[data.time_axis] = 'time'
            data_axis[data.sequence_axis] = 'sequence'

        if channel_dimension:
            # Add channel dimension to the data
            if channel_dimension == 'channels_first':
                X = numpy.expand_dims(X, axis=1)
                data_axis.insert(1, 'channel')

            elif channel_dimension == 'channels_last':
                X = numpy.expand_dims(X, axis=3)
                data_axis.insert(3, 'channel')

        if verbose:
            data_shape = data.shape
            data_axis_ = {
                'time_axis': data.time_axis,
                'data_axis': data.data_axis
            }

            if hasattr(data, 'sequence_axis'):
                data_axis_['sequence_axis'] = data.sequence_axis

            meta_shape = meta.shape
            meta_axis = {
                'time_axis': meta.time_axis,
                'data_axis': meta.data_axis
            }

            if hasattr(meta, 'sequence_axis'):
                meta_axis['sequence_axis'] = meta.sequence_axis

            logger = FancyLogger()

            # Data information
            logger.line('Data', indent=print_indent)

            # Matrix
            logger.data(
                field='Matrix shape',
                value=X.shape,
                indent=print_indent + 2
            )

            # Item
            logger.data(
                field='Item shape',
                value=data_shape,
                indent=print_indent + 2
            )

            logger.data(
                field='Time',
                value=data_shape[data_axis_['time_axis']],
                indent=print_indent + 4
            )

            logger.data(
                field='Data',
                value=data_shape[data_axis_['data_axis']],
                indent=print_indent + 4
            )

            if 'sequence_axis' in data_axis_:
                logger.data(
                    field='Sequence',
                    value=data_shape[data_axis_['sequence_axis']],
                    indent=print_indent + 4
                )

            # Meta information
            logger.line('Meta', indent=print_indent)

            # Matrix
            logger.data(
                field='Matrix shape',
                value=Y.shape,
                indent=print_indent + 2
            )

            # Item
            logger.data(
                field='Item shape',
                value=meta_shape,
                indent=print_indent + 2
            )

            logger.data(
                field='Time',
                value=meta_shape[meta_axis['time_axis']],
                indent=print_indent + 4
            )

            logger.data(
                field='Data',
                value=meta_shape[meta_axis['data_axis']],
                indent=print_indent + 4
            )

            if 'sequence_axis' in meta_axis:
                logger.data(
                    field='Sequence',
                    value=meta_shape[meta_axis['sequence_axis']],
                    indent=print_indent + 4
                )

        return X, Y, data_size
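
# Usage sketch (illustrative only, not part of the module API): collecting full
# data and target matrices for one-shot training with model.fit. 'data_chain',
# 'meta_chain', the filenames and 'model' are hypothetical placeholders, as in
# the sketch above.
#
#     X, Y, data_size = data_collector(
#         item_list=[
#             {'data': {'filename': 'features_1.cpickle'},
#              'meta': {'filename': 'targets_1.cpickle'}}
#         ],
#         data_processing_chain=data_chain,
#         meta_processing_chain=meta_chain,
#         target_format='single_target_per_sequence'
#     )
#     model.fit(x=X, y=Y, batch_size=64, epochs=10)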