Source code for dcase_util.processors.audio

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function, absolute_import
import copy
import numpy
from dcase_util.containers import AudioContainer
from dcase_util.processors import Processor, ProcessingChainItemType, ProcessingChain, SequencingProcessor
from dcase_util.data import Sequencer

class AudioReadingProcessor(Processor):
    input_type = ProcessingChainItemType.NONE  #: Input data type
    output_type = ProcessingChainItemType.AUDIO  #: Output data type
    def __init__(self, data=None, fs=44100,
                 focus_start_samples=None, focus_stop_samples=None, focus_channel=None, mono=False,
                 **kwargs):
        """Constructor

        Parameters
        ----------
        data : DataContainer
            Data to initialize the container

        fs : int
            Target sampling rate when reading audio

        focus_start_samples : int
            Sample index of the focus segment start

        focus_stop_samples : int
            Sample index of the focus segment stop

        focus_channel : int or str
            Focus segment channel

        mono : bool
            Mix down multi-channel audio during the reading stage.

        """

        # Inject initialization parameters back to kwargs
        kwargs.update(
            {
                'data': data,
                'fs': fs,
                'focus_start_samples': focus_start_samples,
                'focus_stop_samples': focus_stop_samples,
                'focus_channel': focus_channel,
                'mono': mono
            }
        )

        # Run super init to call init of mixins too
        super(AudioReadingProcessor, self).__init__(**kwargs)
    def process(self, data=None, filename=None,
                focus_start_samples=None, focus_stop_samples=None, focus_duration_samples=None,
                focus_start_seconds=None, focus_stop_seconds=None, focus_duration_seconds=None,
                focus_channel=None,
                store_processing_chain=False,
                **kwargs):
        """Audio reading

        Parameters
        ----------
        data :
            Not used; audio is read from the file given in filename.

        filename : str
            Filename

        focus_start_samples : int
            Sample index of focus segment start

        focus_stop_samples : int
            Sample index of focus segment stop

        focus_duration_samples : int
            Sample count of focus segment

        focus_start_seconds : float
            Time stamp (in seconds) of focus segment start

        focus_stop_seconds : float
            Time stamp (in seconds) of focus segment stop

        focus_duration_seconds : float
            Duration (in seconds) of focus segment

        focus_channel : int or str
            Audio channel id or name to focus on. For a stereo signal, valid labels to select a
            single channel are 'L', 'R', 'left', and 'right', or 0 and 1; use 'mixdown' to get a
            mixed down version of all channels.

        store_processing_chain : bool
            Store processing chain to the returned data container
            Default value False

        Returns
        -------
        AudioContainer

        """

        if data is None and self.input_type == ProcessingChainItemType.NONE:
            audio_container = AudioContainer(**self.init_parameters)

            if filename:
                audio_container.load(
                    filename=filename,
                    mono=self.init_parameters.get('mono')
                )

            # Set focus segment and channel
            audio_container.set_focus(
                start=focus_start_samples,
                stop=focus_stop_samples,
                duration=focus_duration_samples,
                start_seconds=focus_start_seconds,
                stop_seconds=focus_stop_seconds,
                duration_seconds=focus_duration_seconds,
                channel=focus_channel
            )

            if store_processing_chain:
                processing_chain_item = self.get_processing_chain_item()

                if 'process_parameters' not in processing_chain_item:
                    processing_chain_item['process_parameters'] = {}

                processing_chain_item['process_parameters']['filename'] = filename
                processing_chain_item['process_parameters']['focus_start_samples'] = focus_start_samples
                processing_chain_item['process_parameters']['focus_stop_samples'] = focus_stop_samples
                processing_chain_item['process_parameters']['focus_duration_samples'] = focus_duration_samples
                processing_chain_item['process_parameters']['focus_start_seconds'] = focus_start_seconds
                processing_chain_item['process_parameters']['focus_stop_seconds'] = focus_stop_seconds
                processing_chain_item['process_parameters']['focus_duration_seconds'] = focus_duration_seconds
                processing_chain_item['process_parameters']['focus_channel'] = focus_channel

                if hasattr(audio_container, 'processing_chain'):
                    # Push chain item into processing chain stored in the container
                    audio_container.processing_chain.push_processor(**processing_chain_item)
                else:
                    # Create processing chain to be stored in the container, and push chain item into it
                    audio_container.processing_chain = ProcessingChain().push_processor(**processing_chain_item)

            return audio_container

        else:
            message = '{name}: Wrong input data type, type required [{input_type}].'.format(
                name=self.__class__.__name__,
                input_type=self.input_type
            )
            self.logger.exception(message)
            raise ValueError(message)
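
# Usage sketch (illustration only, not part of the original module). It shows how
# AudioReadingProcessor can be called directly to load a file into an AudioContainer;
# the helper name and the path 'audio/example.wav' are hypothetical placeholders.
def _example_audio_reading():
    processor = AudioReadingProcessor(fs=44100, mono=False)

    # Read the file and focus on the first second of the left channel
    audio = processor.process(
        filename='audio/example.wav',
        focus_start_seconds=0.0,
        focus_duration_seconds=1.0,
        focus_channel='left',
        store_processing_chain=True
    )

    return audio
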

class MonoAudioReadingProcessor(AudioReadingProcessor):
    input_type = ProcessingChainItemType.NONE  #: Input data type
    output_type = ProcessingChainItemType.AUDIO  #: Output data type
    def __init__(self, data=None, fs=44100,
                 focus_start_samples=None, focus_stop_samples=None, focus_channel=None,
                 **kwargs):
        """Constructor

        Parameters
        ----------
        data : DataContainer
            Data to initialize the container

        fs : int
            Target sampling rate when reading audio

        focus_start_samples : int
            Sample index of the focus segment start

        focus_stop_samples : int
            Sample index of the focus segment stop

        focus_channel : int or str
            Focus segment channel

        """

        # Inject initialization parameters back to kwargs, forcing multi-channel audio
        # to be mixed down to mono during the reading stage.
        kwargs.update(
            {
                'data': data,
                'fs': fs,
                'focus_start_samples': focus_start_samples,
                'focus_stop_samples': focus_stop_samples,
                'focus_channel': focus_channel,
                'mono': True
            }
        )

        # Run super init to call init of mixins too
        super(MonoAudioReadingProcessor, self).__init__(**kwargs)
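
# Usage sketch (illustration only, not part of the original module). MonoAudioReadingProcessor
# behaves like AudioReadingProcessor but forces mono=True, so the returned AudioContainer holds
# a single-channel mixdown; the helper name and 'audio/example.wav' are hypothetical placeholders.
def _example_mono_audio_reading():
    processor = MonoAudioReadingProcessor(fs=44100)
    audio = processor.process(filename='audio/example.wav')
    return audio
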

class AudioWritingProcessor(Processor):
    input_type = ProcessingChainItemType.AUDIO  #: Input data type
    output_type = ProcessingChainItemType.NONE  #: Output data type
    def __init__(self, *args, **kwargs):
        """Constructor"""

        # Run super init to call init of mixins too
        super(AudioWritingProcessor, self).__init__(*args, **kwargs)
    def process(self, data=None, output_filename=None, bit_depth=16, bit_rate=None, **kwargs):
        """Audio writing

        Parameters
        ----------
        data : AudioContainer
            Audio to be written

        output_filename : str
            Filename

        bit_depth : int, optional
            Bit depth for audio.
            Default value 16

        bit_rate : int, optional
            Bit rate for compressed audio formats.
            Default value None

        Returns
        -------
        AudioContainer

        """

        if data and isinstance(data, AudioContainer):
            audio_container = copy.deepcopy(data)

            if output_filename:
                audio_container.save(
                    filename=output_filename,
                    bit_depth=bit_depth,
                    bit_rate=bit_rate
                )

            return audio_container

        else:
            message = '{name}: Wrong input data type, type required [{input_type}].'.format(
                name=self.__class__.__name__,
                input_type=self.input_type
            )
            self.logger.exception(message)
            raise ValueError(message)
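
# Usage sketch (illustration only, not part of the original module). The writing processor
# expects an AudioContainer as input; here one is created from a synthetic signal so the
# example does not depend on files existing on disk. The helper name and 'output.wav'
# are hypothetical placeholders.
def _example_audio_writing():
    fs = 44100
    audio = AudioContainer(data=numpy.zeros(fs), fs=fs)  # one second of silence

    processor = AudioWritingProcessor()
    processor.process(data=audio, output_filename='output.wav', bit_depth=16)
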

class MonoAudioWritingProcessor(Processor):
    input_type = ProcessingChainItemType.AUDIO  #: Input data type
    output_type = ProcessingChainItemType.NONE  #: Output data type
    def __init__(self, *args, **kwargs):
        """Constructor"""

        # Run super init to call init of mixins too
        super(MonoAudioWritingProcessor, self).__init__(*args, **kwargs)
    def process(self, data=None, output_filename=None, bit_depth=16, bit_rate=None, **kwargs):
        """Audio writing

        Parameters
        ----------
        data : AudioContainer
            Audio to be mixed down to mono and written

        output_filename : str
            Filename

        bit_depth : int, optional
            Bit depth for audio.
            Default value 16

        bit_rate : int, optional
            Bit rate for compressed audio formats.
            Default value None

        Returns
        -------
        AudioContainer

        """

        if data and isinstance(data, AudioContainer):
            audio_container = copy.deepcopy(data)
            audio_container.mixdown()

            if output_filename:
                audio_container.save(
                    filename=output_filename,
                    bit_depth=bit_depth,
                    bit_rate=bit_rate
                )

            return audio_container

        else:
            message = '{name}: Wrong input data type, type required [{input_type}].'.format(
                name=self.__class__.__name__,
                input_type=self.input_type
            )
            self.logger.exception(message)
            raise ValueError(message)
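
# Usage sketch (illustration only, not part of the original module). MonoAudioWritingProcessor
# mixes the container down to a single channel before saving; the two-channel silence signal,
# the helper name, and the output path 'output_mono.wav' are hypothetical placeholders.
def _example_mono_audio_writing():
    fs = 44100
    stereo = AudioContainer(data=numpy.zeros((2, fs)), fs=fs)  # one second of two-channel silence

    processor = MonoAudioWritingProcessor()
    processor.process(data=stereo, output_filename='output_mono.wav', bit_depth=16)
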

class AudioSequencingProcessor(SequencingProcessor):
    """Frame blocking processor"""

    input_type = ProcessingChainItemType.AUDIO  #: Input data type
    output_type = ProcessingChainItemType.DATA_CONTAINER  #: Output data type

    def __init__(self, sequence_length=44100, hop_length=None,
                 padding=None, shift_border='roll', shift=0,
                 required_data_amount_per_segment=0.9,
                 **kwargs):
        """__init__ method.

        Parameters
        ----------
        sequence_length : int
            Sequence length
            Default value 44100

        hop_length : int
            Hop length used when forming sequences; if None, the hop length equals
            sequence_length (non-overlapping sequences).
            Default value None

        padding : str
            How data is treated at the boundaries [None, 'zero', 'repeat']
            Default value None

        shift_border : string, ['roll', 'shift']
            Sequence border handling when doing temporal shifting.
            Default value roll

        shift : int
            Sequencing grid shift.
            Default value 0

        required_data_amount_per_segment : float [0,1]
            Ratio of valid data items required per segment for the segment to be considered valid.
            Use this parameter to filter out partially filled segments.
            Default value 0.9

        """

        # Inject initialization parameters back to kwargs
        kwargs.update(
            {
                'sequence_length': sequence_length,
                'hop_length': hop_length,
                'padding': padding,
                'shift': shift,
                'shift_border': shift_border,
                'required_data_amount_per_segment': required_data_amount_per_segment
            }
        )

        # Run super init to call init of mixins too
        super(AudioSequencingProcessor, self).__init__(**kwargs)

        self.sequencer = Sequencer(**self.init_parameters)

    def process(self, data=None, store_processing_chain=False, **kwargs):
        """Process

        Parameters
        ----------
        data : DataContainer
            Data

        store_processing_chain : bool
            Store processing chain to the returned data container
            Default value False

        Returns
        -------
        DataMatrix3DContainer

        """

        from dcase_util.containers import AudioContainer, DataMatrix2DContainer

        if isinstance(data, AudioContainer):
            audio_data = data.data
            if data.channels == 1:
                audio_data = audio_data[numpy.newaxis, :]

            # Do processing
            container = self.sequencer.sequence(
                data=DataMatrix2DContainer(audio_data, time_resolution=1 / float(data.fs)),
                **kwargs
            )

            if store_processing_chain:
                # Get processing chain item
                processing_chain_item = self.get_processing_chain_item()

                # Update current processing parameters into chain item
                processing_chain_item.update(
                    {
                        'process_parameters': kwargs
                    }
                )

                # Push chain item into processing chain stored in the container
                container.processing_chain.push_processor(**processing_chain_item)

            return container

        else:
            message = '{name}: Wrong input data type, type required [{input_type}].'.format(
                name=self.__class__.__name__,
                input_type=self.input_type
            )
            self.logger.exception(message)
            raise ValueError(message)
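
# Usage sketch (illustration only, not part of the original module). It splits a synthetic
# single-channel signal into non-overlapping one-second sequences; the helper name and the
# silence signal are hypothetical placeholders.
def _example_audio_sequencing():
    fs = 44100
    audio = AudioContainer(data=numpy.zeros(2 * fs), fs=fs)  # two seconds of silence

    processor = AudioSequencingProcessor(sequence_length=fs)
    sequences = processor.process(data=audio)
    return sequences
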