Source code for dcase_util.data.manipulators

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function, absolute_import
import six
from six import iteritems
import numpy
import scipy.stats
import copy
import os
import glob
from past.builtins import basestring

from dcase_util.containers import ObjectContainer
from dcase_util.ui import FancyStringifier
from dcase_util.utils import VectorRecipeParser, filelist_exists


class Normalizer(ObjectContainer):
    """Data normalizer to accumulate data statistics"""

    def __init__(self, n=None, s1=None, s2=None, mean=None, std=None, **kwargs):
        """__init__ method.

        Parameters
        ----------
        n : int
            Item count used to calculate statistics
            Default value None

        s1 : np.ndarray [shape=(vector_length,)]
            Vector-wise sum of the data seen by the Normalizer
            Default value None

        s2 : np.ndarray [shape=(vector_length,)]
            Vector-wise sum^2 of the data seen by the Normalizer
            Default value None

        mean : np.ndarray [shape=(vector_length, 1)]
            Mean of the data
            Default value None

        std : np.ndarray [shape=(vector_length, 1)]
            Standard deviation of the data
            Default value None

        """

        # Run super init to call init of mixins too
        super(Normalizer, self).__init__(**kwargs)

        self.n = n
        self.s1 = s1
        self.s2 = s2
        self._mean = mean
        self._std = std

        # Make sure mean and std are numpy.ndarray
        if isinstance(self._mean, list):
            self._mean = numpy.array(self._mean)

        if isinstance(self._std, list):
            self._std = numpy.array(self._std)

        # Make sure mean and std have the correct shape
        if isinstance(self._mean, numpy.ndarray) and len(self._mean.shape) == 1:
            self._mean = self._mean.reshape((-1, 1))

        if isinstance(self._std, numpy.ndarray) and len(self._std.shape) == 1:
            self._std = self._std.reshape((-1, 1))

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indention used
            Default value 0

        Returns
        -------
        str

        """

        if ui is None:
            ui = FancyStringifier()

        output = super(Normalizer, self).to_string(ui=ui, indent=indent)
        output += ui.data(field='Mean', value=self.mean, indent=indent) + '\n'
        output += ui.data(field='Std', value=self.std, indent=indent) + '\n'
        output += ui.data(field='n', value=self.n, indent=indent) + '\n'

        return output

    def __getstate__(self):
        d = super(ObjectContainer, self).__getstate__()
        d.update(
            {
                'n': self.n,
                's1': self.s1,
                's2': self.s2,
                '_mean': self._mean,
                '_std': self._std,
            }
        )

        return d

    def __setstate__(self, d):
        super(ObjectContainer, self).__setstate__(d)
        self.n = d['n']
        self.s1 = d['s1']
        self.s2 = d['s2']
        self._mean = d['_mean']
        self._std = d['_std']

    @property
    def mean(self):
        """Mean vector

        Returns
        -------
        numpy.array(vector_length, 1)

        """

        if self._mean is None:
            if self.s1 is not None and self.n is not None:
                self._mean = (self.s1 / self.n).reshape(-1, 1)
            else:
                self._mean = None

        return self._mean

    @mean.setter
    def mean(self, value):
        self._mean = value

    @property
    def std(self):
        """Standard deviation vector

        Returns
        -------
        numpy.array(vector_length, 1)

        """

        if self._std is None:
            if self.s1 is not None and self.s2 is not None and self.n is not None:
                self._std = numpy.sqrt((self.n * self.s2 - (self.s1 * self.s1)) / (self.n * (self.n - 1))).reshape(-1, 1)
            else:
                self._std = None

        return self._std

    @std.setter
    def std(self, value):
        self._std = value

    def __enter__(self):
        self.reset()
        return self

    def __exit__(self, type, value, traceback):
        # Finalize accumulated calculation
        self.finalize()

    def __call__(self, *args, **kwargs):
        return self.normalize(*args, **kwargs)

    def reset(self):
        """Reset internal variables."""

        self.n = None
        self.s1 = None
        self.s2 = None
        self.mean = None
        self.std = None

    def accumulate(self, data, time_axis=1):
        """Accumulate statistics

        Parameters
        ----------
        data : FeatureContainer or np.ndarray
            Data in FeatureContainer or in np.ndarray

        time_axis : int
            Time axis, used when data is given as np.ndarray
            Default value 1

        Returns
        -------
        self

        """

        from dcase_util.containers import FeatureContainer

        stats = None
        if isinstance(data, FeatureContainer):
            stats = data.stats

        elif isinstance(data, numpy.ndarray):
            stats = {
                'mean': numpy.mean(data, axis=time_axis),
                'std': numpy.std(data, axis=time_axis),
                'n': data.shape[time_axis],
                's1': numpy.sum(data, axis=time_axis),
                's2': numpy.sum(data ** 2, axis=time_axis),
            }

        if stats:
            if self.n is None:
                self.n = stats['n']
            else:
                self.n += stats['n']

            if self.s1 is None:
                self.s1 = stats['s1']
            else:
                self.s1 += stats['s1']

            if self.s2 is None:
                self.s2 = stats['s2']
            else:
                self.s2 += stats['s2']

        return self

    def finalize(self):
        """Finalize statistics calculation

        Accumulated values are used to get mean and std for the seen data.

        Returns
        -------
        self

        """

        self._mean = (self.s1 / self.n).reshape(-1, 1)
        self._std = numpy.sqrt((self.n * self.s2 - (self.s1 * self.s1)) / (self.n * (self.n - 1))).reshape(-1, 1)

        return self
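
    # For reference: finalize() recovers the sample standard deviation from the
    # accumulated sums with the identity
    #     std = sqrt((n * s2 - s1**2) / (n * (n - 1))),
    # which matches numpy.std(..., ddof=1) computed over all accumulated frames.
    # A minimal check of the identity (illustrative sketch, not used by the class):
    #
    #   import numpy
    #   data = numpy.random.rand(5, 100)          # 5 features, 100 frames
    #   n = data.shape[1]
    #   s1 = data.sum(axis=1)
    #   s2 = (data ** 2).sum(axis=1)
    #   std = numpy.sqrt((n * s2 - s1 ** 2) / (n * (n - 1)))
    #   assert numpy.allclose(std, data.std(axis=1, ddof=1))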

    def normalize(self, data, **kwargs):
        """Normalize data matrix with internal statistics of the class.

        Parameters
        ----------
        data : DataContainer or np.ndarray
            DataContainer or np.ndarray to be normalized

        Returns
        -------
        DataContainer or numpy.ndarray [shape=(frames, number of data values)]
            Normalized data matrix

        """

        from dcase_util.containers import DataContainer

        # Make copy of data to prevent data contamination
        data = copy.deepcopy(data)

        if isinstance(data, DataContainer):
            data.data = (data.data - self.mean) / self.std
            return data

        elif isinstance(data, numpy.ndarray):
            return (data - self.mean) / self.std

    def plot(self, plot=True, figsize=None):
        """Visualize normalization factors.

        Parameters
        ----------
        plot : bool
            If True, the figure is shown automatically. Set to False when collecting multiple plots into
            the same figure outside this method.
            Default value True

        figsize : tuple
            Size of the figure. If None given, default size (10, 5) is used.
            Default value None

        Returns
        -------
        self

        """

        if figsize is None:
            figsize = (10, 5)

        import matplotlib.pyplot as plt

        if plot:
            plt.figure(figsize=figsize)

        # Add filename to first subplot
        if hasattr(self, 'filename') and self.filename:
            plt.title(self.filename)

        upper = (self.mean + self.std).reshape(-1)
        lower = (self.mean - self.std).reshape(-1)

        plt.fill_between(
            range(self.mean.shape[0]),
            lower,
            upper,
            facecolor='grey',
            alpha=0.1,
            step='mid'
        )
        plt.errorbar(
            x=range(self.mean.shape[0]),
            y=self.mean.reshape(-1),
            yerr=self.std.reshape(-1),
            linestyle='None',
            marker='s',
            elinewidth=0.5,
            capsize=4,
            capthick=1,
            color='black',
        )
        plt.ylabel('Value')
        plt.xlabel('Index')
        plt.tight_layout()

        if plot:
            plt.show()
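
# Usage sketch (illustrative): accumulate statistics over several data matrices
# and then normalize new data. The class can also be used as a context manager,
# which calls reset() on enter and finalize() on exit. Assumes matrices shaped
# (features, frames) with time on axis 1.
#
#   import numpy
#   from dcase_util.data import Normalizer
#
#   normalizer = Normalizer()
#   with normalizer:
#       for matrix in [numpy.random.rand(10, 500) for _ in range(3)]:
#           normalizer.accumulate(data=matrix, time_axis=1)
#
#   normalized = normalizer.normalize(numpy.random.rand(10, 100))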

class RepositoryNormalizer(ObjectContainer):
    """Data repository normalizer"""

    def __init__(self, normalizers=None, filename=None, **kwargs):
        """__init__ method.

        Parameters
        ----------
        normalizers : dict
            Normalizers in a dict to initialize the repository; keys in the dictionary should be labels
            Default value None

        filename : str or dict
            Either one filename (str) or multiple filenames in a dictionary. The dictionary-based parameter
            is used to construct the repository from separate Normalizers, format: label as key, and
            filename as value.
            Default value None

        """

        # Run super init to call init of mixins too
        super(RepositoryNormalizer, self).__init__(**kwargs)

        self.normalizers = {}

        if normalizers:
            if isinstance(normalizers, dict):
                self.normalizers = normalizers

            else:
                message = '{name}: Invalid type for normalizers'.format(
                    name=self.__class__.__name__
                )
                self.logger.exception(message)
                raise ValueError(message)

        self.filename = filename

    def __enter__(self):
        self.reset()
        return self

    def __exit__(self, type, value, traceback):
        # Finalize accumulated calculation
        self.finalize()

    def __getitem__(self, label):
        return self.normalizers[label]

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indention used
            Default value 0

        Returns
        -------
        str

        """

        if ui is None:
            ui = FancyStringifier()

        output = super(RepositoryNormalizer, self).to_string(ui=ui, indent=indent)
        output += ui.data(
            indent=indent + 2,
            field='Labels',
            value=list(self.normalizers.keys())
        ) + '\n'

        output += ui.line(field='Content') + '\n'
        for label, label_data in iteritems(self.normalizers):
            if label_data:
                output += ui.data(
                    indent=indent + 2,
                    field='[' + str(label) + ']',
                    value=label_data
                ) + '\n'

        output += '\n'

        return output

    def __call__(self, *args, **kwargs):
        return self.normalize(*args, **kwargs)

    def reset(self):
        """Reset normalizers."""

        for label in self.normalizers:
            self.normalizers[label].reset()

    def load(self, filename=None, collect_from_containers=True):
        """Load normalizers from disk.

        Parameters
        ----------
        filename : str or dict
            Either one filename (str) or multiple filenames in a dictionary. The dictionary-based parameter
            is used to construct the repository from separate Normalizers, format: label as key, and
            filename as value. If None given, the parameter given to the class initializer is used instead.
            Default value None

        collect_from_containers : bool
            Collect data to the repository from separate containers.
            Default value True

        Returns
        -------
        self

        """

        if filename:
            self.filename = filename

        if isinstance(self.filename, basestring):
            # String filename given, use load method from parent class
            if os.path.exists(self.filename):
                # File exists, load it
                self.detect_file_format()
                self.validate_format()

                super(RepositoryNormalizer, self).load(
                    filename=self.filename
                )

            if collect_from_containers:
                # Collect data to the repository from separate normalizer containers
                filename_base, file_extension = os.path.splitext(self.filename)
                normalizers = glob.glob(filename_base + '.*' + file_extension)
                for filename in normalizers:
                    label = os.path.splitext(filename)[0].split('.')[-1]
                    self.normalizers[label] = Normalizer().load(
                        filename=filename
                    )

        elif isinstance(self.filename, dict):
            # Dictionary-based filename given
            if filelist_exists(self.filename):
                self.normalizers = {}

                for label, data in iteritems(self.filename):
                    if not label.startswith('_'):
                        # Skip labels starting with '_', those are just for extra info
                        if isinstance(data, basestring):
                            # Filename given directly, only one data stream per method inputted.
                            self.normalizers[label] = Normalizer().load(
                                filename=data
                            )

            else:
                # Not all files exist; find which ones are missing and raise an error.
                for label, data in iteritems(self.filename):
                    if isinstance(data, basestring) and not os.path.isfile(data):
                        message = '{name}: RepositoryNormalizer cannot be loaded, file does not exist for method [{method}], file [{filename}]'.format(
                            name=self.__class__.__name__,
                            method=label,
                            filename=data
                        )
                        self.logger.exception(message)
                        raise IOError(message)

        else:
            message = '{name}: RepositoryNormalizer cannot be loaded, no valid filename set.'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise IOError(message)

        return self

    def save(self, filename=None, split_into_containers=False):
        """Save file

        Parameters
        ----------
        filename : str or dict
            File path
            Default value filename given to class constructor

        split_into_containers : bool
            Split data from the repository into separate normalizer containers and save them individually.
            Default value False

        Raises
        ------
        ImportError:
            Error if file format specific module cannot be imported

        IOError:
            File has unknown file format

        Returns
        -------
        self

        """

        if filename:
            self.filename = filename

        if split_into_containers and isinstance(self.filename, basestring):
            # Automatic filename generation for saving data into separate containers
            filename_base, file_extension = os.path.splitext(self.filename)

            filename_dictionary = {}
            for label in self.normalizers:
                if label not in filename_dictionary:
                    filename_dictionary[label] = filename_base + '.' + label + file_extension

            self.filename = filename_dictionary

        if isinstance(self.filename, basestring):
            # Single output file as target
            self.detect_file_format()
            self.validate_format()

            # String filename given, use save method from parent class
            super(RepositoryNormalizer, self).save(
                filename=self.filename
            )

        elif isinstance(self.filename, dict):
            # Custom naming and splitting into separate normalizer containers;
            # save each data normalizer in the repository separately
            for label in self.normalizers:
                if label in self.filename:
                    current_normalizer = self.normalizers[label]
                    current_normalizer.save(
                        filename=self.filename[label]
                    )

        return self

    def accumulate(self, data):
        """Accumulate statistics

        Parameters
        ----------
        data : DataRepository
            Data in DataRepository

        Returns
        -------
        self

        """

        for label_id, label in enumerate(data.labels):
            if label not in self.normalizers:
                # Label not yet encountered, initialize new normalizer for it
                self.normalizers[label] = Normalizer()

            # Loop through streams
            for stream_id in data.stream_ids(label=label):
                self.normalizers[label].accumulate(
                    data=data.get_container(
                        label=label,
                        stream_id=stream_id
                    )
                )

        return self

    def finalize(self):
        """Finalize statistics calculation

        Accumulated values are used to get mean and std for the seen data.

        Returns
        -------
        self

        """

        for label in self.normalizers:
            self.normalizers[label].finalize()

        return self

    def normalize(self, data, **kwargs):
        """Normalize data repository

        Parameters
        ----------
        data : DataRepository
            DataRepository to be normalized

        Returns
        -------
        DataRepository

        """

        from dcase_util.containers import DataRepository

        # Make copy of data to prevent data contamination
        data = copy.deepcopy(data)

        if isinstance(data, DataRepository):
            for label_id, label in enumerate(data.labels):
                if label in self.normalizers:
                    for stream_id in data.stream_ids(label=label):
                        data.set_container(
                            label=label,
                            stream_id=stream_id,
                            container=self.normalizers[label].normalize(
                                data=data.get_container(
                                    label=label,
                                    stream_id=stream_id
                                )
                            )
                        )

        return data
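
# Usage sketch (illustrative): collect per-label statistics from a data
# repository and normalize repositories carrying the same labels. The
# data_repository variable and the filenames below are hypothetical.
#
#   from dcase_util.data import RepositoryNormalizer
#
#   repo_normalizer = RepositoryNormalizer()
#   repo_normalizer.accumulate(data=data_repository)   # data_repository: DataRepository
#   repo_normalizer.finalize()
#   data_repository = repo_normalizer.normalize(data_repository)
#
#   # Or load previously stored normalizers, one per label:
#   repo_normalizer = RepositoryNormalizer(
#       filename={'mel': 'normalizer.mel.cpickle', 'mfcc': 'normalizer.mfcc.cpickle'}
#   ).load()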

class Aggregator(ObjectContainer):
    """Data aggregator"""
    valid_method = ['mean', 'std', 'cov', 'kurtosis', 'skew', 'flatten']

    def __init__(self, win_length_frames=10, hop_length_frames=1, recipe=None, center=True, padding=True, **kwargs):
        """Constructor

        Parameters
        ----------
        win_length_frames : int
            Window length in data frames
            Default value 10

        hop_length_frames : int
            Hop length in data frames
            Default value 1

        recipe : list of dict or list of str
            Aggregation recipe, supported methods [mean, std, cov, kurtosis, skew, flatten].
            Default value None

        center : bool
            Centering of the window
            Default value True

        padding : bool
            Padding of the first window with the first frame and the last window with the last frame, to
            get equal length data in all windows.
            Default value True

        """

        # Run super init to call init of mixins too
        super(Aggregator, self).__init__(**kwargs)

        self.win_length_frames = win_length_frames
        self.hop_length_frames = hop_length_frames
        self.center = center
        self.padding = padding

        if recipe is None and kwargs.get('aggregation_recipe', None) is not None:
            recipe = kwargs.get('aggregation_recipe', None)

        if isinstance(recipe, dict):
            # Single recipe item given as a dict
            self.recipe = [recipe['label']]

        elif isinstance(recipe, list):
            if isinstance(recipe[0], dict):
                self.recipe = [d['label'] for d in recipe]
            else:
                self.recipe = recipe

        elif isinstance(recipe, six.string_types):
            recipe = VectorRecipeParser().parse(recipe=recipe)
            self.recipe = [d['label'] for d in recipe]

        else:
            message = '{name}: No valid recipe set'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise ValueError(message)

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indention used
            Default value 0

        Returns
        -------
        str

        """

        if ui is None:
            ui = FancyStringifier()

        output = super(Aggregator, self).to_string(ui=ui, indent=indent)
        output += ui.data(field='win_length_frames', value=self.win_length_frames, indent=indent) + '\n'
        output += ui.data(field='hop_length_frames', value=self.hop_length_frames, indent=indent) + '\n'
        output += ui.data(field='recipe', value=self.recipe, indent=indent) + '\n'

        return output

    def __getstate__(self):
        # Return only needed data for pickle
        return {
            'recipe': self.recipe,
            'win_length_frames': self.win_length_frames,
            'hop_length_frames': self.hop_length_frames,
        }

    def __setstate__(self, d):
        self.recipe = d['recipe']
        self.win_length_frames = d['win_length_frames']
        self.hop_length_frames = d['hop_length_frames']

    def __call__(self, *args, **kwargs):
        return self.aggregate(*args, **kwargs)

    def aggregate(self, data=None, **kwargs):
        """Aggregate data

        Parameters
        ----------
        data : DataContainer
            Features to be aggregated
            Default value None

        Returns
        -------
        DataContainer

        """

        from dcase_util.containers import DataContainer

        # Make copy of the data to prevent modifications to the original data
        data = copy.deepcopy(data)

        if isinstance(data, DataContainer):
            aggregated_data = []

            # Not the most efficient way, as numpy stride_tricks would produce
            # faster code; however, opted for cleaner presentation this time.
            for frame in range(0, data.data.shape[data.time_axis], self.hop_length_frames):
                # Get start and end of the window
                if self.center:
                    # Keep frame at the middle (approximately)
                    start_frame = int(frame - numpy.floor(self.win_length_frames / 2.0))
                    end_frame = int(frame + numpy.ceil(self.win_length_frames / 2.0))

                else:
                    start_frame = frame
                    end_frame = frame + self.win_length_frames

                frame_ids = numpy.array(range(start_frame, end_frame))

                valid_frame = True
                if self.padding:
                    # If start of data matrix, pad with first frame
                    frame_ids[frame_ids < 0] = 0

                    # If end of the data matrix, pad with last frame
                    frame_ids[frame_ids > data.data.shape[data.time_axis] - 1] = data.data.shape[data.time_axis] - 1

                else:
                    # Mark non-full windows invalid
                    if numpy.any(frame_ids < 0) or numpy.any(frame_ids > data.data.shape[data.time_axis] - 1):
                        valid_frame = False

                if valid_frame:
                    current_frame = data.get_frames(frame_ids=frame_ids)

                    aggregated_frame = []

                    if 'mean' in self.recipe:
                        aggregated_frame.append(current_frame.mean(axis=data.time_axis))

                    if 'std' in self.recipe:
                        aggregated_frame.append(current_frame.std(axis=data.time_axis))

                    if 'cov' in self.recipe:
                        aggregated_frame.append(numpy.cov(current_frame).flatten())

                    if 'kurtosis' in self.recipe:
                        aggregated_frame.append(scipy.stats.kurtosis(current_frame, axis=data.time_axis))

                    if 'skew' in self.recipe:
                        aggregated_frame.append(scipy.stats.skew(current_frame, axis=data.time_axis))

                    if 'flatten' in self.recipe:
                        if data.time_axis == 0:
                            aggregated_frame.append(current_frame.flatten())

                        elif data.time_axis == 1:
                            aggregated_frame.append(current_frame.T.flatten())

                    if aggregated_frame:
                        aggregated_data.append(numpy.concatenate(aggregated_frame))

            if aggregated_data:
                # Update data
                data.data = numpy.vstack(aggregated_data).T

            else:
                message = '{name}: No aggregated data, check your aggregation recipe.'.format(
                    name=self.__class__.__name__,
                )
                self.logger.exception(message)
                raise ValueError(message)

            # Update meta data
            if hasattr(data, 'hop_length_seconds') and data.hop_length_seconds is not None:
                data.hop_length_seconds = self.hop_length_frames * data.hop_length_seconds

            return data

        else:
            message = '{name}: Unknown data container type.'.format(
                name=self.__class__.__name__,
            )
            self.logger.exception(message)
            raise ValueError(message)
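
# Usage sketch (illustrative): aggregate a feature matrix over a sliding
# window, stacking window-wise mean and std. With a (10, 100) input and a
# window/hop of 10 frames, the output holds 20 values per window.
#
#   import numpy
#   from dcase_util.containers import FeatureContainer
#   from dcase_util.data import Aggregator
#
#   features = FeatureContainer(data=numpy.random.rand(10, 100), time_resolution=0.02)
#   aggregator = Aggregator(recipe=['mean', 'std'], win_length_frames=10, hop_length_frames=10)
#   aggregated = aggregator.aggregate(features)   # aggregated.data.shape == (20, 10)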

class Sequencer(ObjectContainer):
    """Data sequencer"""

    def __init__(self, sequence_length=10, hop_length=None, padding=None, shift_border='roll', shift=0,
                 required_data_amount_per_segment=0.9, **kwargs):
        """__init__ method.

        Parameters
        ----------
        sequence_length : int
            Sequence length
            Default value 10

        hop_length : int
            Hop length used when forming the sequences; if None, hop length equals sequence_length
            (non-overlapping sequences).
            Default value None

        padding : str
            How data is treated at the boundaries [None, 'zero', 'repeat']
            Default value None

        shift_border : string, ['roll', 'shift']
            Sequence border handling when doing temporal shifting.
            Default value 'roll'

        shift : int
            Sequencing grid shift.
            Default value 0

        required_data_amount_per_segment : float [0,1]
            Fraction of valid data items required per segment for the segment to be considered valid. Use
            this parameter to filter out partially filled segments.
            Default value 0.9

        """

        # Run super init to call init of mixins too
        super(Sequencer, self).__init__(**kwargs)

        self.sequence_length = sequence_length

        if hop_length is None:
            self.hop_length = self.sequence_length
        else:
            self.hop_length = hop_length

        # Padding
        if padding in [None, False, 'zero', 'repeat']:
            self.padding = padding
        else:
            message = '{name}: Unknown padding mode [{padding_mode}]'.format(
                name=self.__class__.__name__,
                padding_mode=padding
            )
            self.logger.exception(message)
            raise ValueError(message)

        # Shifting
        self.shift = shift

        if shift_border in ['roll', 'shift']:
            self.shift_border = shift_border
        else:
            message = '{name}: Unknown temporal shifting border handling [{border_mode}]'.format(
                name=self.__class__.__name__,
                border_mode=shift_border
            )
            self.logger.exception(message)
            raise ValueError(message)

        self.required_data_amount_per_segment = required_data_amount_per_segment

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indention used
            Default value 0

        Returns
        -------
        str

        """

        if ui is None:
            ui = FancyStringifier()

        output = super(Sequencer, self).to_string(ui=ui, indent=indent)
        output += ui.data(field='frames', value=self.sequence_length, indent=indent) + '\n'
        output += ui.data(field='hop_length_frames', value=self.hop_length, indent=indent) + '\n'
        output += ui.data(field='padding', value=self.padding, indent=indent) + '\n'
        output += ui.data(field='required_data_amount_per_segment', value=self.required_data_amount_per_segment, indent=indent) + '\n'
        output += ui.line(field='Shifting', indent=indent) + '\n'
        output += ui.data(indent=indent + 2, field='shift', value=self.shift) + '\n'
        output += ui.data(indent=indent + 2, field='shift_border', value=self.shift_border) + '\n'

        return output

    def __getstate__(self):
        # Return only needed data for pickle
        return {
            'frames': self.sequence_length,
            'hop_length_frames': self.hop_length,
            'padding': self.padding,
            'shift': self.shift,
            'shift_border': self.shift_border,
            'required_data_amount_per_segment': self.required_data_amount_per_segment,
        }

    def __setstate__(self, d):
        self.sequence_length = d['frames']
        self.hop_length = d['hop_length_frames']
        self.padding = d['padding']
        self.shift = d['shift']
        self.shift_border = d['shift_border']
        self.required_data_amount_per_segment = d['required_data_amount_per_segment']

    def __call__(self, *args, **kwargs):
        return self.sequence(*args, **kwargs)

    def sequence(self, data, shift=None, **kwargs):
        """Convert a 2D data matrix into a sequence of fixed-length 2D matrices

        Parameters
        ----------
        data : DataContainer or numpy.ndarray
            Data

        shift : int
            Sequencing grid shift in frames. If None given, the value given to the class initializer is
            used. The value is wrapped to stay within the data length and is stored as the new class value.
            Default value None

        Returns
        -------
        DataMatrix3DContainer

        """

        if shift:
            self.shift = shift

        from dcase_util.containers import DataContainer, DataMatrix2DContainer, DataMatrix3DContainer

        # Make copy of the data to prevent modifications to the original data
        data = copy.deepcopy(data)

        if isinstance(data, numpy.ndarray):
            if len(data.shape) == 2:
                data = DataMatrix2DContainer(data)

        if isinstance(data, DataContainer):
            # Make sure shift index is within the data
            self.shift = self.shift % data.length

            # Not the most efficient way, as numpy stride_tricks would produce
            # faster code; however, opted for cleaner presentation this time.
            processed_data = []

            if self.shift_border == 'shift':
                segment_indexes = numpy.arange(self.shift, data.length, self.hop_length)

            elif self.shift_border == 'roll':
                segment_indexes = numpy.arange(0, data.length, self.hop_length)

                if self.shift != 0:
                    # Roll data
                    data.data = numpy.roll(
                        data.data,
                        shift=-self.shift,
                        axis=data.time_axis
                    )

            else:
                message = '{name}: Unknown type for sequence border handling when doing temporal shifting ' \
                          '[{shift_border}].'.format(
                    name=self.__class__.__name__,
                    shift_border=self.shift_border
                )
                self.logger.exception(message)
                raise ValueError(message)

            if self.padding:
                if len(segment_indexes) == 0:
                    # Have at least one segment
                    segment_indexes = numpy.array([0])

            else:
                # Remove segments which are not full
                segment_indexes = segment_indexes[(segment_indexes + self.sequence_length - 1) < data.length]

            for segment_start_frame in segment_indexes:
                segment_end_frame = segment_start_frame + self.sequence_length

                frame_ids = numpy.array(range(segment_start_frame, segment_end_frame))

                valid_frames = numpy.where(numpy.logical_and(frame_ids >= 0, frame_ids < data.length))[0]

                if len(valid_frames) / float(self.sequence_length) > self.required_data_amount_per_segment:
                    # Process segment only if it has the minimum amount of valid frames

                    if self.padding == 'repeat':
                        # Handle boundaries with repeated boundary vectors

                        # If start of matrix, pad with first frame
                        frame_ids[frame_ids < 0] = 0

                        # If end of the matrix, pad with last frame
                        frame_ids[frame_ids > data.length - 1] = data.length - 1

                        # Append the segment
                        processed_data.append(
                            data.get_frames(
                                frame_ids=frame_ids
                            )
                        )

                    elif self.padding == 'zero':
                        # Handle boundaries with zero padding

                        # Initialize current segment with zero content
                        current_segment = numpy.zeros((data.vector_length, self.sequence_length))

                        # Copy data into correct position within the segment
                        current_segment[:, valid_frames] = data.get_frames(
                            frame_ids=frame_ids[valid_frames]
                        )

                        # Append the segment
                        processed_data.append(current_segment)

                    else:
                        # Append the segment
                        processed_data.append(data.get_frames(frame_ids=frame_ids))

            if len(processed_data) == 0:
                message = '{name}: Cannot create valid segment, adjust segment length and hop size, or use ' \
                          'padding flag. (Data length was {length})'.format(
                    name=self.__class__.__name__,
                    length=data.length
                )
                self.logger.exception(message)
                raise IOError(message)

            return DataMatrix3DContainer(
                data=numpy.moveaxis(numpy.array(processed_data), 0, 2),
                time_resolution=None,
                processing_chain=data.processing_chain
            )

        else:
            message = '{name}: Unknown data container type.'.format(
                name=self.__class__.__name__,
            )
            self.logger.exception(message)
            raise ValueError(message)

    def increase_shifting(self, shift_step=1):
        """Increase temporal shifting

        Parameters
        ----------
        shift_step : int
            Amount to be added to the sequencing grid
            Default value 1

        Returns
        -------
        self

        """

        if shift_step:
            self.shift += shift_step

        return self
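
# Usage sketch (illustrative): slice a 2D data matrix into fixed-length
# sequences; without padding, trailing non-full segments are dropped.
#
#   import numpy
#   from dcase_util.data import Sequencer
#
#   sequencer = Sequencer(sequence_length=10, hop_length=10)
#   sequenced = sequencer.sequence(numpy.random.rand(10, 95))
#   # sequenced.data.shape == (10, 10, 9): (vector_length, sequence_length, sequences)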

class Stacker(ObjectContainer):
    """Data stacker"""

    def __init__(self, recipe=None, hop=1, **kwargs):
        """Constructor

        Parameters
        ----------
        recipe : dict or str
            Stacking recipe
            Default value None

        hop : int, optional
            Data item hopping
            Default value 1

        """

        # Run super init to call init of mixins too
        super(Stacker, self).__init__(**kwargs)

        if isinstance(recipe, str):
            self.recipe = VectorRecipeParser().parse(recipe=recipe)

        elif isinstance(recipe, list):
            self.recipe = recipe

        else:
            message = '{name}: No recipe set'.format(
                name=self.__class__.__name__
            )
            self.logger.exception(message)
            raise ValueError(message)

        self.hop = hop

    def to_string(self, ui=None, indent=0):
        """Get container information in a string

        Parameters
        ----------
        ui : FancyStringifier or FancyHTMLStringifier
            Stringifier class
            Default value FancyStringifier

        indent : int
            Amount of indention used
            Default value 0

        Returns
        -------
        str

        """

        if ui is None:
            ui = FancyStringifier()

        output = super(Stacker, self).to_string(ui=ui, indent=indent)
        output += ui.data(field='recipe', value=self.recipe, indent=indent) + '\n'
        output += ui.data(field='hop', value=self.hop, indent=indent) + '\n'

        return output

    def __getstate__(self):
        return {
            'recipe': self.recipe,
            'hop': self.hop,
        }

    def __setstate__(self, d):
        self.recipe = d['recipe']
        self.hop = d['hop']

    def __call__(self, *args, **kwargs):
        return self.stack(*args, **kwargs)

    def stack(self, repository, **kwargs):
        """Vector creation based on recipe

        Parameters
        ----------
        repository : RepositoryContainer
            Repository with the needed data

        Returns
        -------
        FeatureContainer

        """

        # Check that all data matrices have the same number of frames
        frame_count = []
        time_resolution = []
        for recipe_part in self.recipe:
            label = recipe_part['label']

            # Default value
            stream_id = 0
            if 'vector-index' in recipe_part:
                stream_id = recipe_part['vector-index']['stream']

            if repository.get_container(label=label, stream_id=stream_id).time_resolution:
                time_resolution.append(
                    repository.get_container(
                        label=label,
                        stream_id=stream_id
                    ).time_resolution
                )

            frame_count.append(
                repository.get_container(
                    label=label,
                    stream_id=stream_id
                ).length
            )

        if len(set(frame_count)) != 1:
            message = '{name}: Data matrices should have the same number of frames {frame_count}'.format(
                name=self.__class__.__name__,
                frame_count=frame_count,
            )
            self.logger.exception(message)
            raise AssertionError(message)

        if len(set(time_resolution)) != 1:
            message = '{name}: Data matrices should have the same time resolution {time_resolution}'.format(
                name=self.__class__.__name__,
                time_resolution=time_resolution,
            )
            self.logger.exception(message)
            raise AssertionError(message)

        # Stack data
        data_matrix = []
        for recipe_part in self.recipe:
            label = recipe_part['label']

            # Default value
            stream_id = 0
            if 'vector-index' in recipe_part:
                stream_id = recipe_part['vector-index']['stream']

            if ('vector-index' not in recipe_part or
               ('vector-index' in recipe_part and
                'full' in recipe_part['vector-index'] and
                recipe_part['vector-index']['full'])):

                # Full matrix
                data_matrix.append(
                    repository.get_container(
                        label=label,
                        stream_id=stream_id
                    ).get_frames(
                        frame_hop=self.hop
                    )
                )

            elif ('vector-index' in recipe_part and
                  'vector' in recipe_part['vector-index'] and
                  'selection' in recipe_part['vector-index'] and
                  recipe_part['vector-index']['selection']):

                # Selector vector
                index = numpy.array(recipe_part['vector-index']['vector'])
                data_matrix.append(
                    repository.get_container(
                        label=label,
                        stream_id=stream_id
                    ).get_frames(
                        vector_ids=index,
                        frame_hop=self.hop
                    )
                )

            elif ('vector-index' in recipe_part and
                  'start' in recipe_part['vector-index'] and
                  'stop' in recipe_part['vector-index']):

                # Start and stop index
                index = numpy.arange(recipe_part['vector-index']['start'], recipe_part['vector-index']['stop'])
                data_matrix.append(
                    repository.get_container(
                        label=label,
                        stream_id=stream_id
                    ).get_frames(
                        vector_ids=index,
                        frame_hop=self.hop
                    )
                )

        from dcase_util.containers import FeatureContainer

        return FeatureContainer(
            data=numpy.vstack(data_matrix),
            time_resolution=time_resolution[0],
            processing_chain=repository.processing_chain
        )
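
# Usage sketch (illustrative): stack feature vectors from a repository
# according to a recipe. The repository variable is hypothetical, and
# 'mfcc=1-19;mel' is an example recipe string picking MFCC vector indices
# 1-19 plus the full mel matrix.
#
#   from dcase_util.data import Stacker
#
#   stacker = Stacker(recipe='mfcc=1-19;mel')
#   features = stacker.stack(repository)   # repository: FeatureRepository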

class Selector(ObjectContainer):
    """Data selector"""

    def __init__(self, **kwargs):
        """Constructor

        Parameters
        ----------
        time_resolution : float
            Hop length in seconds

        """

        super(Selector, self).__init__(**kwargs)

        # Initialize selection mask events
        from dcase_util.containers import MetaDataContainer
        self.selection_events = MetaDataContainer()

    def __getstate__(self):
        # Return only needed data for pickle
        return {}

    def __setstate__(self, d):
        from dcase_util.containers import MetaDataContainer
        self.selection_events = MetaDataContainer()

    def __call__(self, *args, **kwargs):
        return self.select(*args, **kwargs)

    def set_mask(self, mask_events):
        """Set masking events

        Parameters
        ----------
        mask_events : list of MetaItems or MetaDataContainer
            Event list used for selecting

        Returns
        -------
        self

        """

        self.selection_events = mask_events
        return self

    def select(self, data, selection_events=None):
        """Select data from repository with given events

        Parameters
        ----------
        data : DataRepository
            Data repository to be selected from.

        selection_events : list of MetaItems or MetaDataContainer
            Event list used for selecting
            Default value None

        Returns
        -------
        DataRepository

        """

        if selection_events is None:
            selection_events = self.selection_events

        for label in data.labels:
            for stream_id in data.stream_ids(label):
                current_container = data.get_container(label=label, stream_id=stream_id)

                selection_mask = numpy.zeros((current_container.length), dtype=bool)

                for select_event in selection_events:
                    onset_frame = current_container._time_to_frame(select_event.onset, rounding_direction='floor')
                    offset_frame = current_container._time_to_frame(select_event.offset, rounding_direction='ceil')

                    if offset_frame > current_container.length:
                        offset_frame = current_container.length

                    selection_mask[onset_frame:offset_frame] = True

                current_container.data = current_container.get_frames(frame_ids=numpy.where(selection_mask)[0])

        return data
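
# Usage sketch (illustrative): keep only data within the given event segments.
# The data_repository variable is hypothetical.
#
#   from dcase_util.containers import MetaDataContainer
#   from dcase_util.data import Selector
#
#   selector = Selector()
#   selector.set_mask(MetaDataContainer([
#       {'onset': 1.0, 'offset': 2.0},
#       {'onset': 4.0, 'offset': 5.0},
#   ]))
#   data_repository = selector.select(data_repository)   # data_repository: DataRepository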

class Masker(ObjectContainer):
    """Data masker"""

    def __init__(self, **kwargs):
        """Constructor

        Parameters
        ----------
        time_resolution : float
            Hop length in seconds

        """

        super(Masker, self).__init__(**kwargs)

        # Initialize mask events
        from dcase_util.containers import MetaDataContainer
        self.mask_events = MetaDataContainer()

    def __getstate__(self):
        # Return only needed data for pickle
        return {}

    def __setstate__(self, d):
        from dcase_util.containers import MetaDataContainer
        self.mask_events = MetaDataContainer()

    def __call__(self, *args, **kwargs):
        return self.mask(*args, **kwargs)

    def set_mask(self, mask_events):
        """Set masking events

        Parameters
        ----------
        mask_events : list of MetaItems or MetaDataContainer
            Event list used for masking

        Returns
        -------
        self

        """

        self.mask_events = mask_events
        return self

    def mask(self, data, mask_events=None):
        """Mask data repository with given events

        Parameters
        ----------
        data : DataRepository
            Data repository to be masked.

        mask_events : list of MetaItems or MetaDataContainer
            Event list used for masking
            Default value None

        Returns
        -------
        DataRepository

        """

        if mask_events is None:
            mask_events = self.mask_events

        for label in data.labels:
            for stream_id in data.stream_ids(label):
                current_container = data.get_container(label=label, stream_id=stream_id)

                removal_mask = numpy.ones((current_container.length), dtype=bool)

                for mask_event in mask_events:
                    onset_frame = current_container._time_to_frame(mask_event.onset, rounding_direction='floor')
                    offset_frame = current_container._time_to_frame(mask_event.offset, rounding_direction='ceil')

                    if offset_frame > current_container.length:
                        offset_frame = current_container.length

                    removal_mask[onset_frame:offset_frame] = False

                current_container.data = current_container.get_frames(frame_ids=numpy.where(removal_mask)[0])

        return data
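
# Usage sketch (illustrative): the inverse of Selector, dropping data inside
# the given event segments and keeping the rest. The data_repository variable
# is hypothetical.
#
#   from dcase_util.containers import MetaDataContainer
#   from dcase_util.data import Masker
#
#   masker = Masker()
#   masker.set_mask(MetaDataContainer([{'onset': 1.0, 'offset': 2.0}]))
#   data_repository = masker.mask(data_repository)   # data_repository: DataRepository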