Source code for dcase_util.containers.metadata
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
import six
import sys
import os
import copy
import numpy
import csv
import logging
import io
from past.builtins import basestring
from dcase_util.containers import ListDictContainer
from dcase_util.utils import posix_path, get_parameter_hash, FieldValidator, \
setup_logging, is_float, is_int, is_jupyter, FileFormat, get_audio_info
from dcase_util.ui import FancyStringifier, FancyHTMLStringifier
class MetaDataItem(dict):
"""Meta data item class, inherited from standard dict class."""
def __init__(self, *args, **kwargs):
"""Constructor
Parameters
----------
dict
"""
dict.__init__(self, *args)
# Compatibility with old field names used in DCASE baseline system implementations 2016 and 2017
if 'file' in self and 'filename' not in self:
self['filename'] = self['file']
if 'event_onset' in self and 'onset' not in self:
self['onset'] = self['event_onset']
if 'event_offset' in self and 'offset' not in self:
self['offset'] = self['event_offset']
# Process meta data fields
# File target for the meta data item
if 'filename' in self and isinstance(self['filename'], six.string_types):
if not os.path.isabs(self['filename']):
# Force relative file paths into unix format even under Windows
self['filename'] = posix_path(self['filename'])
if 'filename_original' in self and isinstance(self['filename_original'], six.string_types):
# Keep file paths in unix format even under Windows
self['filename_original'] = posix_path(self['filename_original'])
# Meta data item timestamps: onset and offset
if 'onset' in self:
if is_float(self['onset']):
self['onset'] = float(self['onset'])
else:
self['onset'] = None
if 'offset' in self:
if is_float(self['offset']):
self['offset'] = float(self['offset'])
else:
self['offset'] = None
# Event label assigned to the meta data item
if 'event_label' in self:
if isinstance(self['event_label'], basestring):
self['event_label'] = self['event_label'].strip()
if self['event_label'].lower() == 'none' or self['event_label'] == '':
self['event_label'] = None
# Acoustic scene label assigned to the meta data item
if 'scene_label' in self and self.scene_label:
self['scene_label'] = self['scene_label'].strip()
if self['scene_label'].lower() == 'none':
self['scene_label'] = None
# Tag labels
if 'tags' in self and self.tags:
if isinstance(self['tags'], str):
self['tags'] = self['tags'].strip()
if self['tags'].lower() == 'none':
self['tags'] = None
if self['tags'] and '#' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split('#')]
elif self['tags'] and ',' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split(',')]
elif self['tags'] and ';' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split(';')]
elif self['tags'] and ':' in self['tags']:
self['tags'] = [x.strip() for x in self['tags'].split(':')]
else:
self['tags'] = [self['tags']]
# Remove empty tags
self['tags'] = list(filter(None, self['tags']))
# Sort tags
self['tags'].sort()
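# A minimal constructor sketch illustrating the field handling above (hypothetical values):
# legacy field names are mirrored to the new ones, paths are normalized, labels are stripped,
# and tag strings are split and sorted.
#
#     >>> from dcase_util.containers import MetaDataItem
#     >>> item = MetaDataItem({
#     ...     'file': 'audio/a001.wav',     # legacy name, copied to 'filename'
#     ...     'event_onset': 1.0,           # legacy name, copied to 'onset'
#     ...     'event_offset': 2.5,          # legacy name, copied to 'offset'
#     ...     'event_label': ' speech ',    # stripped to 'speech'
#     ...     'tags': 'dog;cat'             # split on ';' and sorted -> ['cat', 'dog']
#     ... })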
def __str__(self):
return self.to_string()
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indent
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = ''
output += ui.class_name(self.__class__.__name__, indent=indent) + '\n'
output += ui.line(field='Target', indent=indent) + '\n'
if self.filename:
output += ui.data(indent=indent + 2, field='filename', value=self.filename) + '\n'
if self.filename_original:
output += ui.data(indent=indent + 2, field='filename_original', value=self.filename_original) + '\n'
if self.filename_audio:
output += ui.data(indent=indent + 2, field='filename_audio', value=self.filename_audio) + '\n'
if self.filename_video:
output += ui.data(indent=indent + 2, field='filename_video', value=self.filename_video) + '\n'
if self.identifier:
output += ui.data(indent=indent + 2, field='identifier', value=self.identifier) + '\n'
if self.dataset:
output += ui.data(indent=indent + 2, field='dataset', value=self.dataset) + '\n'
if self.source_label:
output += ui.data(indent=indent + 2, field='source_label', value=self.source_label) + '\n'
if self.set_label:
output += ui.data(indent=indent + 2, field='set_label', value=self.set_label) + '\n'
if self.onset is not None:
output += ui.data(indent=indent + 2, field='onset', value=self.onset, unit='sec') + '\n'
if self.offset is not None:
output += ui.data(indent=indent + 2, field='offset', value=self.offset, unit='sec') + '\n'
if self.scene_label is not None or self.event_label is not None or self.tags is not None:
output += ui.line(field='Meta data', indent=indent) + '\n'
if self.scene_label:
output += ui.data(indent=indent + 2, field='scene_label', value=self.scene_label) + '\n'
if self.event_label:
output += ui.data(indent=indent + 2, field='event_label', value=self.event_label) + '\n'
if self.tags:
output += ui.data(indent=indent + 2, field='tags', value=self.tags) + '\n'
output += ui.line(field='Item', indent=indent) + '\n'
output += ui.data(indent=indent + 2, field='id', value=self.id) + '\n'
return output
def to_html(self, indent=0):
"""Get container information in a HTML formatted string
Parameters
----------
indent : int
Amount of indent
Default value 0
Returns
-------
str
"""
return self.to_string(ui=FancyHTMLStringifier(), indent=indent)
def show(self, mode='auto', indent=0):
"""Print container content
If called inside Jupyter notebook, HTML formatted version is shown.
Parameters
----------
mode : str
Output type, possible values ['auto', 'print', 'html']. 'html' will work in Jupyter notebook only.
Default value 'auto'
indent : int
Amount of indent
Default value 0
Returns
-------
Nothing
"""
if mode == 'auto':
if is_jupyter():
mode = 'html'
else:
mode = 'print'
if mode not in ['html', 'print']:
# Unknown mode given
message = '{name}: Unknown mode [{mode}]'.format(name=self.__class__.__name__, mode=mode)
self.logger.exception(message)
raise ValueError(message)
if mode == 'html':
from IPython.core.display import display, HTML
display(
HTML(
self.to_html(indent=indent)
)
)
elif mode == 'print':
print(self.to_string(indent=indent))
def log(self, level='info'):
"""Log container content
Parameters
----------
level : str
Logging level, possible values [info, debug, warn, warning, error, critical]
Default value "info"
Returns
-------
Nothing
"""
from dcase_util.ui import FancyLogger
FancyLogger().line(self.__str__(), level=level)
@property
def logger(self):
logger = logging.getLogger(__name__)
if not logger.handlers:
setup_logging()
return logger
@property
def id(self):
"""Unique item identifier
ID is formed by taking MD5 hash of the item data.
Returns
-------
str
Unique item id
"""
string = ''
if self.filename:
string += self.filename
if self.scene_label:
string += self.scene_label
if self.event_label:
string += self.event_label
if self.identifier:
string += self.identifier
if self.source_label:
string += self.source_label
if self.set_label:
string += self.set_label
if self.tags:
string += ','.join(self.tags)
if self.onset:
string += '{:8.4f}'.format(self.onset)
if self.offset:
string += '{:8.4f}'.format(self.offset)
return get_parameter_hash(string)
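# Small sketch of the id property: the hash is computed from the concatenated item fields,
# so two items with identical content get the same id (hypothetical values).
#
#     >>> a = MetaDataItem({'filename': 'audio/a001.wav', 'scene_label': 'street'})
#     >>> b = MetaDataItem({'filename': 'audio/a001.wav', 'scene_label': 'street'})
#     >>> a.id == b.id
#     True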
def get_list(self):
"""Return item values in a list with specified order.
Returns
-------
list
"""
fields = list(self.keys())
# Select only valid fields
valid_fields = ['event_label', 'filename', 'offset', 'onset',
'scene_label', 'identifier', 'source_label', 'tags']
fields = list(set(fields).intersection(valid_fields))
fields.sort()
if fields == ['filename']:
return [self.filename]
elif fields == ['event_label', 'filename', 'offset', 'onset', 'scene_label']:
return [self.filename, self.scene_label, self.onset, self.offset, self.event_label]
elif fields == ['offset', 'onset']:
return [self.onset, self.offset]
elif fields == ['event_label', 'offset', 'onset']:
return [self.onset, self.offset, self.event_label]
elif fields == ['filename', 'scene_label']:
return [self.filename, self.scene_label]
elif fields == ['filename', 'identifier', 'scene_label']:
return [self.filename, self.scene_label, self.identifier]
elif fields == ['event_label', 'filename']:
return [self.filename, self.event_label]
elif fields == ['event_label', 'filename', 'offset', 'onset']:
return [self.filename, self.onset, self.offset, self.event_label]
elif fields == ['event_label', 'filename', 'identifier', 'offset', 'onset', 'scene_label']:
return [self.filename, self.scene_label, self.onset, self.offset, self.event_label, self.identifier]
elif fields == ['event_label', 'filename', 'offset', 'onset', 'scene_label', 'source_label']:
return [self.filename, self.scene_label, self.onset, self.offset, self.event_label, self.source_label]
elif fields == ['event_label', 'filename', 'identifier', 'offset', 'onset', 'scene_label', 'source_label']:
return [self.filename, self.scene_label, self.onset, self.offset, self.event_label,
self.source_label, self.identifier]
elif fields == ['filename', 'tags']:
return [self.filename, ";".join(self.tags)+";"]
elif fields == ['filename', 'identifier', 'tags']:
return [self.filename, ";".join(self.tags)+";", self.identifier]
elif fields == ['filename', 'scene_label', 'tags']:
return [self.filename, self.scene_label, ";".join(self.tags)+";"]
elif fields == ['filename', 'identifier', 'scene_label', 'tags']:
return [self.filename, self.scene_label, ";".join(self.tags)+";", self.identifier]
elif fields == ['filename', 'offset', 'onset', 'scene_label', 'tags']:
return [self.filename, self.scene_label, self.onset, self.offset, ";".join(self.tags)+";"]
else:
message = '{name}: Invalid meta data format [{format}]'.format(
name=self.__class__.__name__,
format=str(fields)
)
self.logger.exception(message)
raise ValueError(message)
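# Example of the fixed field ordering used by get_list (hypothetical values): for an item
# with filename, onset, offset and event_label the values are returned in
# [filename, onset, offset, event_label] order.
#
#     >>> item = MetaDataItem({'filename': 'audio/a001.wav', 'onset': 1.0, 'offset': 2.5, 'event_label': 'speech'})
#     >>> item.get_list()
#     ['audio/a001.wav', 1.0, 2.5, 'speech']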
@property
def filename(self):
"""Filename
Returns
-------
str or None
filename
"""
if 'filename' in self:
return self['filename']
elif 'filename_audio' in self:
return self['filename_audio']
elif 'filename_video' in self:
return self['filename_video']
else:
return None
@property
def filename_audio(self):
"""Audio filename
Returns
-------
str or None
filename
"""
if 'filename_audio' in self:
return self['filename_audio']
elif 'filename' in self:
return self['filename']
else:
return None
@property
def filename_video(self):
"""Video filename
Returns
-------
str or None
filename
"""
if 'filename_video' in self:
return self['filename_video']
elif 'filename' in self:
return self['filename']
else:
return None
@filename.setter
def filename(self, value):
if not os.path.isabs(value):
# Force relative file paths into unix format even under Windows
value = posix_path(value)
self['filename'] = value
@property
def filename_original(self):
"""Filename
Returns
-------
str or None
filename
"""
if 'filename_original' in self:
return self['filename_original']
else:
return None
@filename_original.setter
def filename_original(self, value):
# Keep paths in unix format even under Windows
self['filename_original'] = posix_path(value)
@filename_audio.setter
def filename_audio(self, value):
if not os.path.isabs(value):
# Force relative file paths into unix format even under Windows
value = posix_path(value)
self['filename_audio'] = value
@filename_video.setter
def filename_video(self, value):
if not os.path.isabs(value):
# Force relative file paths into unix format even under Windows
value = posix_path(value)
self['filename_video'] = value
@property
def scene_label(self):
"""Scene label
Returns
-------
str or None
scene label
"""
if 'scene_label' in self:
return self['scene_label']
else:
return None
@scene_label.setter
def scene_label(self, value):
self['scene_label'] = value
@property
def event_label(self):
"""Event label
Returns
-------
str or None
event label
"""
if 'event_label' in self:
return self['event_label']
else:
return None
@event_label.setter
def event_label(self, value):
self['event_label'] = value
@property
def onset(self):
"""Onset
Returns
-------
float or None
onset
"""
if 'onset' in self:
return self['onset']
else:
return None
@onset.setter
def onset(self, value):
self['onset'] = float(value)
if 'event_onset' in self:
# Mirror onset to event_onset
self['event_onset'] = self['onset']
@property
def offset(self):
"""Offset
Returns
-------
float or None
offset
"""
if 'offset' in self:
return self['offset']
else:
return None
@offset.setter
def offset(self, value):
self['offset'] = float(value)
if 'event_offset' in self:
# Mirror offset to event_offset
self['event_offset'] = self['offset']
@property
def identifier(self):
"""Identifier
Returns
-------
str or None
location identifier
"""
if 'identifier' in self:
return self['identifier']
else:
return None
@identifier.setter
def identifier(self, value):
self['identifier'] = value
@property
def dataset(self):
"""Dataset
Returns
-------
str or None
dataset identifier
"""
if 'dataset' in self:
return self['dataset']
else:
return None
@dataset.setter
def dataset(self, value):
self['dataset'] = value
@property
def source_label(self):
"""Source label
Returns
-------
str or None
source label
"""
if 'source_label' in self:
return self['source_label']
else:
return None
@source_label.setter
def source_label(self, value):
self['source_label'] = value
@property
def set_label(self):
"""Set label
Returns
-------
str or None
set label
"""
if 'set_label' in self:
return self['set_label']
else:
return None
@set_label.setter
def set_label(self, value):
self['set_label'] = value
@property
def tags(self):
"""Tags
Returns
-------
list or None
tags
"""
if 'tags' in self:
return self['tags']
else:
return None
@tags.setter
def tags(self, value):
if isinstance(value, str):
value = value.strip()
if value.lower() == 'none':
value = None
if value and '#' in value:
value = [x.strip() for x in value.split('#')]
elif value and ',' in value:
value = [x.strip() for x in value.split(',')]
elif value and ':' in value:
value = [x.strip() for x in value.split(':')]
elif value and ';' in value:
value = [x.strip() for x in value.split(';')]
self['tags'] = value
# Remove empty tags
self['tags'] = list(filter(None, self['tags']))
# Sort tags
self['tags'].sort()
def active_within_segment(self, start, stop):
"""Item active withing given segment.
Parameters
----------
start : float
Segment start time
stop : float
Segment stop time
Returns
-------
bool
item activity
"""
if self.onset is not None and start <= self.onset <= stop:
# item has onset within segment
return True
elif self.offset is not None and start <= self.offset <= stop:
# item has offset within segment
return True
elif self.onset is not None and self.offset is not None and self.onset <= start and self.offset >= stop:
# item starts before and ends after the segment (spans it)
return True
else:
return False
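# Short sketch of the segment-activity test above (hypothetical times): an item is considered
# active if its onset or offset falls inside the segment, or if it spans the whole segment.
#
#     >>> item = MetaDataItem({'onset': 1.0, 'offset': 10.0, 'event_label': 'speech'})
#     >>> item.active_within_segment(start=2.0, stop=4.0)    # item spans the segment
#     True
#     >>> item.active_within_segment(start=11.0, stop=12.0)  # item ends before the segment
#     False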
class MetaDataContainer(ListDictContainer):
"""Meta data container class, inherited from ListDictContainer."""
valid_formats = [FileFormat.CSV, FileFormat.TXT, FileFormat.ANN, FileFormat.CPICKLE] #: Valid file formats
def __init__(self, *args, **kwargs):
super(MetaDataContainer, self).__init__(*args, **kwargs)
self.item_class = MetaDataItem
# Convert all items in the list to MetaDataItems
for item_id in range(0, len(self)):
if not isinstance(self[item_id], self.item_class):
self[item_id] = self.item_class(self[item_id])
from dcase_util.processors import ProcessingChain
self.processing_chain = ProcessingChain()
def __str__(self):
return self.to_string()
def to_string(self, ui=None, indent=0, show_info=True, show_data=True, show_stats=True):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indent
Default value 0
show_info : bool
Include basic info about the container
Default value True
show_data : bool
Include data
Default value True
show_stats : bool
Include scene and event statistics
Default value True
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = ''
if show_info:
output += ui.class_name(self.__class__.__name__, indent=indent) + '\n'
if hasattr(self, 'filename') and self.filename:
output += ui.data(
field='Filename',
value=self.filename,
indent=indent
) + '\n'
output += ui.data(field='Items', value=len(self), indent=indent) + '\n'
output += ui.line(field='Unique', indent=indent) + '\n'
output += ui.data(indent=indent + 2, field='Files', value=len(self.unique_files)) + '\n'
output += ui.data(indent=indent + 2, field='Scene labels', value=len(self.unique_scene_labels)) + '\n'
output += ui.data(indent=indent + 2, field='Event labels', value=len(self.unique_event_labels)) + '\n'
output += ui.data(indent=indent + 2, field='Tags', value=len(self.unique_tags)) + '\n'
output += ui.data(indent=indent + 2, field='Identifiers', value=len(self.unique_identifiers)) + '\n'
output += ui.data(indent=indent + 2, field='Datasets', value=len(self.unique_datasets)) + '\n'
output += ui.data(indent=indent + 2, field='Source labels', value=len(self.unique_source_labels)) + '\n'
output += '\n'
if show_data:
output += ui.line('Meta data', indent=indent) + '\n'
cell_data = [[], [], [], [], [], [], []]
for row_id, item in enumerate(self):
cell_data[0].append(item.filename)
cell_data[1].append(item.onset)
cell_data[2].append(item.offset)
cell_data[3].append(item.scene_label)
cell_data[4].append(item.event_label)
cell_data[5].append(','.join(item.tags) if item.tags else '-')
cell_data[6].append(item.identifier if item.identifier else '-')
output += ui.table(
cell_data=cell_data,
column_headers=['Source', 'Onset', 'Offset', 'Scene', 'Event', 'Tags', 'Identifier'],
column_types=['str20', 'float2', 'float2', 'str15', 'str15', 'str15', 'str5'],
indent=indent + 2
)
output += '\n'
if show_stats:
stats = self.stats()
if 'scenes' in stats and 'scene_label_list' in stats['scenes'] and stats['scenes']['scene_label_list']:
output += ui.line('Scene statistics', indent=indent) + '\n'
cell_data = [[], [], []]
for scene_id, scene_label in enumerate(stats['scenes']['scene_label_list']):
cell_data[0].append(scene_label)
cell_data[1].append(int(stats['scenes']['count'][scene_id]))
cell_data[2].append(int(stats['scenes']['identifiers'][scene_id]))
output += ui.table(
cell_data=cell_data,
column_headers=['Scene label', 'Count', 'Identifiers'],
column_types=['str20', 'int', 'int'],
indent=indent + 2
)
output += '\n'
if 'events' in stats and 'event_label_list' in stats['events'] and stats['events']['event_label_list']:
output += ui.line('Event statistics', indent=indent) + '\n'
cell_data = [[], [], [], [], [], [], []]
for event_id, event_label in enumerate(stats['events']['event_label_list']):
cell_data[0].append(event_label)
cell_data[1].append(int(stats['events']['count'][event_id]))
cell_data[2].append(stats['events']['avg_length'][event_id])
cell_data[3].append(stats['events']['length'][event_id])
cell_data[4].append(stats['events']['flatten_active_length'][event_id])
cell_data[5].append(stats['events']['flatten_inactive_length'][event_id])
cell_data[6].append(stats['events']['activity_percentage'][event_id])
cell_data[0].append('OVERALL')
cell_data[1].append(stats['events']['overall_event_count'])
cell_data[2].append(stats['events']['overall_avg_length'])
cell_data[3].append(stats['events']['overall_length'])
cell_data[4].append(stats['events']['overall_event_flatten_active_length'])
cell_data[5].append(stats['events']['overall_event_flatten_inactive_length'])
cell_data[6].append(stats['events']['overall_activity_percentage'])
output += ui.table(
cell_data=cell_data,
column_headers=['Event label', 'Count', 'Avg (sec)', 'Total (sec)', 'Active (sec)', 'Inactive (sec)', 'Activity %'],
column_types=['str20', 'int', 'float2', 'float2', 'float1', 'float1', 'float1'],
indent=indent + 2
) + '\n'
if 'tags' in stats and 'tag_list' in stats['tags'] and stats['tags']['tag_list']:
output += ui.line('Tag statistics', indent=indent) + '\n'
cell_data = [[], [], []]
for tag_id, tag in enumerate(stats['tags']['tag_list']):
cell_data[0].append(tag)
cell_data[1].append(int(stats['tags']['count'][tag_id]))
cell_data[2].append(int(stats['tags']['identifiers'][tag_id]))
output += ui.table(
cell_data=cell_data,
column_headers=['Tag', 'Count', 'Identifiers'],
column_types=['str20', 'int', 'int'],
indent=indent + 2
) + '\n'
return output
def to_html(self, indent=0, show_info=True, show_data=True, show_stats=True):
"""Get container information in a HTML formatted string
Parameters
----------
indent : int
Amount of indent
Default value 0
show_info : bool
Include basic info about the container
Default value True
show_data : bool
Include data
Default value True
show_stats : bool
Include scene and event statistics
Default value True
Returns
-------
str
"""
return self.to_string(ui=FancyHTMLStringifier(), indent=indent, show_info=show_info, show_data=show_data, show_stats=show_stats)
def __add__(self, other):
return self.update(super(MetaDataContainer, self).__add__(other))
def append(self, item):
"""Append item to the meta data list
Parameters
----------
item : MetaDataItem or dict
Item to be appended.
Raises
------
ValueError
Item not correct type.
"""
if not isinstance(item, MetaDataItem) and not isinstance(item, dict):
message = '{name}: Appending only MetaDataItem or dict allowed.'.format(
name=self.__class__.__name__
)
self.logger.exception(message)
raise ValueError(message)
if isinstance(item, dict):
item = MetaDataItem(item)
super(MetaDataContainer, self).append(item)
@property
def file_count(self):
"""Number of files
Returns
-------
file_count: int > 0
"""
return len(self.unique_files)
@property
def event_count(self):
"""Number of events
Returns
-------
event_count: int > 0
"""
return len(self)
@property
def scene_label_count(self):
"""Number of unique scene labels
Returns
-------
scene_label_count: int >= 0
"""
return len(self.unique_scene_labels)
@property
def event_label_count(self):
"""Number of unique event labels
Returns
-------
event_label_count: int >= 0
"""
return len(self.unique_event_labels)
@property
def identifier_count(self):
"""Number of unique identifiers
Returns
-------
identifier_count: int >= 0
"""
return len(self.unique_identifiers)
@property
def dataset_count(self):
"""Number of unique dataset identifiers
Returns
-------
dataset_count: int >= 0
"""
return len(self.unique_datasets)
@property
def tag_count(self):
"""Number of unique tags
Returns
-------
tag_count: int >= 0
"""
return len(self.unique_tags)
@property
def unique_files(self):
"""Unique files
Returns
-------
files: list, shape=(n,)
Unique filenames in alphabetical order
"""
files = []
for item in self:
files.append(str(item.filename))
files = list(set(files))
files.sort()
return files
@property
def unique_event_labels(self):
"""Unique event labels
Returns
-------
labels: list, shape=(n,)
Unique labels in alphabetical order
"""
labels = []
for item in self:
if item.event_label and item.event_label not in labels:
labels.append(item.event_label)
labels.sort()
return labels
@property
def unique_scene_labels(self):
"""Unique scene labels
Returns
-------
labels: list, shape=(n,)
Unique labels in alphabetical order
"""
labels = []
for item in self:
if item.scene_label and item.scene_label not in labels:
labels.append(item.scene_label)
labels.sort()
return labels
@property
def unique_tags(self):
"""Unique tags
Returns
-------
tags: list, shape=(n,)
Unique tags in alphabetical order
"""
tags = []
for item in self:
if item.tags:
for tag in item.tags:
if tag not in tags:
tags.append(tag)
tags.sort()
return tags
@property
def unique_identifiers(self):
"""Unique identifiers
Returns
-------
labels: list, shape=(n,)
Unique identifier labels in alphabetical order
"""
labels = []
for item in self:
if item.identifier and item.identifier not in labels:
labels.append(item.identifier)
labels.sort()
return labels
@property
def unique_datasets(self):
"""Unique datasets
Returns
-------
labels: list, shape=(n,)
Unique dataset identifier labels in alphabetical order
"""
labels = []
for item in self:
if item.dataset and item.dataset not in labels:
labels.append(item.dataset)
labels.sort()
return labels
@property
def unique_source_labels(self):
"""Unique source labels
Returns
-------
labels: list, shape=(n,)
Unique labels in alphabetical order
"""
labels = []
for item in self:
if item.source_label and item.source_label not in labels:
labels.append(item.source_label)
labels.sort()
return labels
@property
def max_offset(self):
"""Find the offset (end-time) of last event
Returns
-------
max_offset: float > 0
maximum offset
"""
max_offset = 0
for item in self:
if item.offset is not None and item.offset > max_offset:
max_offset = item.offset
return max_offset
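# Small sketch of the container-level views above (hypothetical content): the unique_* properties
# return sorted unique values and max_offset gives the end time of the last event.
#
#     >>> from dcase_util.containers import MetaDataContainer
#     >>> meta = MetaDataContainer([
#     ...     {'filename': 'a.wav', 'event_label': 'dog', 'onset': 0.0, 'offset': 2.0},
#     ...     {'filename': 'a.wav', 'event_label': 'cat', 'onset': 5.0, 'offset': 7.5},
#     ... ])
#     >>> meta.unique_event_labels
#     ['cat', 'dog']
#     >>> meta.max_offset
#     7.5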
def update(self, data):
"""Replace content with given list
Parameters
----------
data : list
New content
Returns
-------
self
"""
super(MetaDataContainer, self).update(data=data)
# Convert all items in the list to MetaDataItems
for item_id in range(0, len(self)):
if not isinstance(self[item_id], self.item_class):
self[item_id] = self.item_class(self[item_id])
return self
def log(self, level='info', show_data=False, show_stats=True):
"""Log container content
Parameters
----------
level : str
Logging level, possible values [info, debug, warn, warning, error, critical]
show_data : bool
Include data
show_stats : bool
Include scene and event statistics
Returns
-------
None
"""
self.ui.line(self.to_string(show_data=show_data, show_stats=show_stats), level=level)
def log_all(self, level='info'):
"""Log container content with all meta data items.
"""
self.log(level=level, show_data=True, show_stats=True)
def show(self, mode='auto', indent=0, show_info=True, show_data=False, show_stats=True):
"""Print container content
If called inside Jupyter notebook, HTML formatted version is shown.
Parameters
----------
mode : str
Output type, possible values ['auto', 'print', 'html']. 'html' will work in Jupyter notebook only.
Default value 'auto'
indent : int
Amount of indent
Default value 0
show_info : bool
Include basic info about the container
Default value True
show_data : bool
Include data
Default value False
show_stats : bool
Include scene and event statistics
Default value True
Returns
-------
Nothing
"""
if mode == 'auto':
if is_jupyter():
mode = 'html'
else:
mode = 'print'
if mode not in ['html', 'print']:
# Unknown mode given
message = '{name}: Unknown mode [{mode}]'.format(name=self.__class__.__name__, mode=mode)
self.logger.exception(message)
raise ValueError(message)
if mode == 'html':
from IPython.core.display import display, HTML
display(
HTML(
self.to_html(indent=indent, show_info=show_info, show_data=show_data, show_stats=show_stats)
)
)
elif mode == 'print':
print(self.to_string(indent=indent, show_info=show_info, show_data=show_data, show_stats=show_stats))
def show_all(self, mode='auto', indent=0):
"""Print container content with all meta data items.
Parameters
----------
mode : str
Output type, possible values ['auto', 'print', 'html']. 'html' will work in Jupyter notebook only.
Default value 'auto'
indent : int
Amount of indent
Default value 0
Returns
-------
Nothing
"""
self.show(mode=mode, indent=indent, show_data=True, show_stats=True)
def load(self, filename=None, fields=None, csv_header=True, file_format=None, delimiter=None, decimal='point'):
"""Load event list from delimited text file (csv-formatted)
Preferred delimiter is tab; other delimiters are detected automatically with a delimiter sniffer.
Supported input formats:
- [file(string)]
- [file(string)][scene_label(string)]
- [file(string)][scene_label(string)][identifier(string)]
- [event_onset (float)][tab][event_offset (float)]
- [event_onset (float)][tab][event_offset (float)][tab][event_label (string)]
- [file(string)][tab][onset (float)][tab][offset (float)][tab][event_label (string)]
- [file(string)][tab][scene_label(string)][tab][onset (float)][tab][offset (float)]
- [file(string)][tab][scene_label(string)][tab][onset (float)][tab][offset (float)][tab][event_label (string)]
- [file(string)][tab][scene_label(string)][tab][onset (float)][tab][offset (float)][tab][event_label (string)][tab][source(single character)]
- [file(string)][tab][scene_label(string)][tab][onset (float)][tab][offset (float)][tab][event_label (string)][tab][source(string)]
- [file(string)][tab][tags (list of strings, delimited with ;)]
- [file(string)][tab][scene_label(string)][tab][tags (list of strings, delimited with ;)]
- [file(string)][tab][scene_label(string)][tab][tags (list of strings, delimited with ;)][tab][event_onset (float)][tab][event_offset (float)]
Parameters
----------
filename : str
Path to the meta data in text format (csv). If none given, one given for class constructor is used.
Default value None
fields : list of str, optional
List of column names. Used only for CSV formatted files.
Default value None
csv_header : bool, optional
Read field names from first line (header). Used only for CSV formatted files.
Default value True
file_format : FileFormat, optional
Forced file format, use this when there is a mismatch between file extension and file format.
Default value None
delimiter : str, optional
Forced data delimiter for csv format. If None given, the automatic delimiter sniffer is used. Use this when the sniffer does not work.
Default value None
decimal : str
Decimal 'point' or 'comma'
Default value 'point'
Returns
-------
data : list of event dicts
List containing event dicts
"""
def validate(row_format, valid_formats):
for valid_format in valid_formats:
if row_format == valid_format:
return True
return False
if filename:
self.filename = filename
if not file_format:
self.detect_file_format()
self.validate_format()
if file_format and FileFormat.validate_label(label=file_format):
self.format = file_format
if self.exists():
if self.format in [FileFormat.TXT, FileFormat.ANN]:
if delimiter is None:
if decimal == 'comma':
delimiter = self.delimiter(exclude_delimiters=[','])
else:
delimiter = self.delimiter()
data = []
field_validator = FieldValidator()
f = io.open(self.filename, 'rt')
try:
for row in csv.reader(f, delimiter=delimiter):
if row:
row_format = []
for item in row:
row_format.append(field_validator.process(item))
for item_id, item in enumerate(row):
if row_format[item_id] == FieldValidator.NUMBER:
# Translate decimal comma into decimal point
row[item_id] = float(row[item_id].replace(',', '.'))
elif row_format[item_id] in [FieldValidator.AUDIOFILE,
FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.ALPHA1,
FieldValidator.ALPHA2,
FieldValidator.LIST]:
row[item_id] = row[item_id].strip()
if validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE],
[FieldValidator.DATAFILE],
[FieldValidator.AUDIOFILE,
FieldValidator.EMPTY],
[FieldValidator.DATAFILE,
FieldValidator.EMPTY],
[FieldValidator.AUDIOFILE,
FieldValidator.EMPTY,
FieldValidator.EMPTY],
[FieldValidator.DATAFILE,
FieldValidator.EMPTY,
FieldValidator.EMPTY],
[FieldValidator.AUDIOFILE,
FieldValidator.EMPTY,
FieldValidator.EMPTY,
FieldValidator.EMPTY],
[FieldValidator.DATAFILE,
FieldValidator.EMPTY,
FieldValidator.EMPTY,
FieldValidator.EMPTY]
]):
# Format: [file]
data.append(
self.item_class({
'filename': row[0]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.NUMBER,
FieldValidator.NUMBER]
]):
# Format: [onset offset]
data.append(
self.item_class({
'onset': row[0],
'offset': row[1]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.NUMBER,
FieldValidator.NUMBER],
[FieldValidator.DATAFILE,
FieldValidator.NUMBER,
FieldValidator.NUMBER]
]):
# Format: [file onset offset]
data.append(
self.item_class({
'filename': row[0],
'onset': row[1],
'offset': row[2]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING],
]):
# Format: [file scene_label]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.AUDIOFILE],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.DATAFILE],
]):
# Format: [file scene_label file], filename mapping included
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'filename_original': row[2]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.STRING],
]):
# Format: [file scene_label identifier]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'identifier': row[2]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING],
[FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.ALPHA2],
]):
# Format: [onset offset event_label]
data.append(
self.item_class({
'onset': row[0],
'offset': row[1],
'event_label': row[2]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING],
[FieldValidator.AUDIOFILE,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING]
]):
# Format: [file onset offset event_label]
data.append(
self.item_class({
'filename': row[0],
'onset': row[1],
'offset': row[2],
'event_label': row[3]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER]
]):
# Format: [file scene_label onset offset]
data.append(
self.item_class({
'filename': row[0],
'onset': row[2],
'offset': row[3],
'scene_label': row[1]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.STRING]
]):
# Format: [file onset offset event_label identifier]
data.append(
self.item_class({
'filename': row[0],
'onset': row[1],
'offset': row[2],
'event_label': row[3],
'identifier': row[4]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER]
]):
# Format: [file scene_label onset offset]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING]
]):
# Format: [file scene_label onset offset event_label]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3],
'event_label': row[4]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.ALPHA1],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.ALPHA1]
]):
# Format: [file scene_label onset offset event_label source_label]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3],
'event_label': row[4],
'source_label': row[5]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.STRING]
]):
# Format: [file scene_label onset offset event_label source_label]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3],
'event_label': row[4],
'source_label': row[5]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.ALPHA1,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.ALPHA1,
FieldValidator.STRING]
]):
# Format: [file scene_label onset offset event_label source_label identifier]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3],
'event_label': row[4],
'source_label': row[5],
'identifier': row[6]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.STRING,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.STRING,
FieldValidator.STRING,
FieldValidator.STRING]
]):
# Format: [file scene_label onset offset event_label source_label identifier]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3],
'event_label': row[4],
'source_label': row[5],
'identifier': row[6]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.LIST],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.LIST]
]):
# Format: [file scene_label tags]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'tags': row[2]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.LIST,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.LIST,
FieldValidator.STRING]
]):
# Format: [file scene_label tags identifier]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'tags': row[2],
'identifier': row[3]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.LIST],
[FieldValidator.DATAFILE,
FieldValidator.LIST]
]):
# Format: [file tags]
data.append(
self.item_class({
'filename': row[0],
'tags': row[1]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.LIST,
FieldValidator.STRING],
[FieldValidator.DATAFILE,
FieldValidator.LIST,
FieldValidator.STRING]
]):
# Format: [file tags identifier]
data.append(
self.item_class({
'filename': row[0],
'tags': row[1],
'identifier': row[2]
})
)
elif validate(row_format=row_format,
valid_formats=[
[FieldValidator.AUDIOFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.LIST],
[FieldValidator.DATAFILE,
FieldValidator.STRING,
FieldValidator.NUMBER,
FieldValidator.NUMBER,
FieldValidator.LIST]
]):
# Format: [file scene_label onset offset tags]
data.append(
self.item_class({
'filename': row[0],
'scene_label': row[1],
'onset': row[2],
'offset': row[3],
'tags': row[4]
})
)
else:
message = '{name}: Unknown row format [{format}], row [{row}]'.format(
name=self.__class__.__name__,
format=row_format,
row=row
)
self.logger.exception(message)
raise IOError(message)
finally:
f.close()
self.update(data=data)
elif self.format == FileFormat.CSV:
if fields is None and csv_header is None:
message = '{name}: Either the fields or the csv_header parameter has to be set for CSV files.'.format(
name=self.__class__.__name__
)
self.logger.exception(message)
raise ValueError(message)
if not delimiter:
if decimal == 'comma':
delimiter = self.delimiter(exclude_delimiters=[','])
else:
delimiter = self.delimiter()
data = []
with open(self.filename, 'r') as f:
csv_reader = csv.reader(f, delimiter=delimiter)
if csv_header:
csv_fields = next(csv_reader)
if fields is None:
fields = csv_fields
for row in csv_reader:
if row:
for cell_id, cell_data in enumerate(row):
if decimal == 'comma':
# Translate decimal comma into decimal point
cell_data = float(cell_data.replace(',', '.'))
if is_int(cell_data):
row[cell_id] = int(cell_data)
elif is_float(cell_data):
row[cell_id] = float(cell_data)
data.append(dict(zip(fields, row)))
self.update(data=data)
elif self.format == FileFormat.CPICKLE:
from dcase_util.files import Serializer
self.update(
data=Serializer.load_cpickle(filename=self.filename)
)
else:
message = '{name}: Unknown format [{format}]'.format(name=self.__class__.__name__, format=self.filename)
self.logger.exception(message)
raise IOError(message)
else:
message = '{name}: File not found [{filename}]'.format(
name=self.__class__.__name__,
filename=self.filename
)
self.logger.exception(message)
raise IOError(message)
return self
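# A minimal load sketch (hypothetical file 'event_list.txt' with tab-delimited rows in the
# [file][onset][offset][event_label] layout, one of the supported formats listed above):
#
#     >>> meta = MetaDataContainer().load(filename='event_list.txt')
#     >>> meta.event_count
#
# The file format is detected from the extension unless file_format is given, and the
# delimiter is sniffed automatically unless the delimiter parameter is set.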
def save(self, filename=None, fields=None, csv_header=True, file_format=None, delimiter='\t', **kwargs):
"""Save content to csv file
Parameters
----------
filename : str
Filename. If none given, one given for class constructor is used.
Default value None
fields : list of str
Fields in the desired order; if none given, all fields are written in alphabetical order.
Used only for CSV formatted files.
Default value None
csv_header : bool
In case of CSV formatted file, first line will contain field names. Names are taken from fields parameter.
Default value True
file_format : FileFormat, optional
Forced file format, use this when there is a mismatch between file extension and file format.
Default value None
delimiter : str
Delimiter to be used when saving data.
Default value '\t'
Returns
-------
self
"""
if filename:
self.filename = filename
if not file_format:
self.detect_file_format()
self.validate_format()
if file_format and FileFormat.validate_label(label=file_format):
self.format = file_format
if self.format in [FileFormat.TXT, FileFormat.ANN]:
# Make sure writing is using correct line endings to avoid extra empty lines
if sys.version_info[0] == 2:
f = open(self.filename, 'wbt')
elif sys.version_info[0] >= 3:
f = open(self.filename, 'wt', newline='')
try:
writer = csv.writer(f, delimiter=delimiter)
for item in self:
writer.writerow(item.get_list())
finally:
f.close()
elif self.format == FileFormat.CSV:
if fields is None:
fields = set()
for item in self:
fields.update(list(item.keys()))
fields = sorted(list(fields))
# Make sure writing is using correct line endings to avoid extra empty lines
if sys.version_info[0] == 2:
csv_file = open(self.filename, 'wb')
elif sys.version_info[0] >= 3:
csv_file = open(self.filename, 'w', newline='')
try:
csv_writer = csv.writer(csv_file, delimiter=delimiter)
if csv_header:
csv_writer.writerow(fields)
for item in self:
item_values = []
for field in fields:
value = item[field]
if isinstance(value, list):
value = ";".join(value)+";"
item_values.append(value)
csv_writer.writerow(item_values)
finally:
csv_file.close()
elif self.format == FileFormat.CPICKLE:
from dcase_util.files import Serializer
Serializer.save_cpickle(filename=self.filename, data=self)
else:
message = '{name}: Unknown format [{format}]'.format(name=self.__class__.__name__, format=self.filename)
self.logger.exception(message)
raise IOError(message)
return self
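# A minimal save sketch (hypothetical output files): for CSV output the written fields and the
# header row are controlled with the fields and csv_header parameters, while txt/ann output
# uses the per-row field order from MetaDataItem.get_list().
#
#     >>> meta.save(filename='meta.csv', fields=['filename', 'scene_label'], csv_header=True)
#     >>> meta.save(filename='event_list.txt')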
def filter(self,
filename=None,
file_list=None,
scene_label=None,
scene_list=None,
event_label=None,
event_list=None,
tag=None,
tag_list=None,
identifier=None,
identifier_list=None,
dataset=None,
dataset_list=None,
source_label=None,
source_label_list=None,
**kwargs
):
"""Filter content
Parameters
----------
filename : str, optional
Filename to be matched
Default value None
file_list : list, optional
List of filenames to be matched
Default value None
scene_label : str, optional
Scene label to be matched
Default value None
scene_list : list of str, optional
List of scene labels to be matched
Default value None
event_label : str, optional
Event label to be matched
Default value None
event_list : list of str, optional
List of event labels to be matched
Default value None
tag : str, optional
Tag to be matched
Default value None
tag_list : list of str, optional
List of tags to be matched
Default value None
identifier : str, optional
Identifier to be matched
Default value None
identifier_list : list of str, optional
List of identifiers to be matched
Default value None
dataset : str, optional
Dataset identifier to be matched
Default value None
dataset_list : list of str, optional
List of dataset identifiers to be matched
Default value None
source_label : str, optional
Source label to be matched
Default value None
source_label_list : list of str, optional
List of source labels to be matched
Default value None
Returns
-------
MetaDataContainer
"""
# Inject parameters back to kwargs, and use parent filter method
if filename is not None:
kwargs['filename'] = filename
if scene_label is not None:
kwargs['scene_label'] = scene_label
if event_label is not None:
kwargs['event_label'] = event_label
if identifier is not None:
kwargs['identifier'] = identifier
if dataset is not None:
kwargs['dataset'] = dataset
if source_label is not None:
kwargs['source_label'] = source_label
if file_list is not None:
kwargs['filename'] = list(file_list)
if scene_list is not None:
kwargs['scene_label'] = list(scene_list)
if event_list is not None:
kwargs['event_label'] = list(event_list)
if identifier_list is not None:
kwargs['identifier'] = list(identifier_list)
if dataset_list is not None:
kwargs['dataset'] = list(dataset_list)
if source_label_list is not None:
kwargs['source_label'] = list(source_label_list)
result = MetaDataContainer(super(MetaDataContainer, self).filter(**kwargs))
# Handle tags separately
if tag is not None or tag_list is not None:
data = []
if tag_list:
tag_list = set(tag_list)
for item in result:
matched = []
if tag:
if item.tags and tag in item.tags:
matched.append(True)
else:
matched.append(False)
if tag_list:
if item.tags and tag_list.intersection(item.tags):
matched.append(True)
else:
matched.append(False)
if all(matched):
data.append(copy.deepcopy(item))
return MetaDataContainer(data)
else:
return result
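# A short filtering sketch (hypothetical labels): single values and lists can be combined,
# and tag matching is handled separately from the other fields.
#
#     >>> street_items = meta.filter(scene_label='street')
#     >>> animals = meta.filter(event_list=['dog', 'cat'])
#     >>> outdoor = meta.filter(tag='outdoor')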
def process_events(self, minimum_event_length=None, minimum_event_gap=None):
"""Process event content
Makes sure that minimum event length and minimum event gap conditions are met per event label class.
Parameters
----------
minimum_event_length : float > 0.0
Minimum event length in seconds; events shorter than this are filtered out from the output.
Default value None
minimum_event_gap : float > 0.0
Minimum allowed gap in seconds between events from the same event label class.
Default value None
Returns
-------
MetaDataContainer
"""
processed_events = []
files = self.unique_files
if not files:
files = [None]
for filename in files:
for event_label in self.unique_event_labels:
current_events_items = self.filter(filename=filename, event_label=event_label)
# Sort events
current_events_items = sorted(current_events_items, key=lambda k: k.onset)
# 1. remove short events
event_results_1 = []
for event in current_events_items:
if minimum_event_length is not None:
if event.offset - event.onset >= minimum_event_length:
event_results_1.append(event)
else:
event_results_1.append(event)
if len(event_results_1) and minimum_event_gap is not None:
# 2. remove small gaps between events
event_results_2 = []
# Load first event into event buffer
buffered_event_onset = event_results_1[0].onset
buffered_event_offset = event_results_1[0].offset
for i in range(1, len(event_results_1)):
if event_results_1[i].onset - buffered_event_offset > minimum_event_gap:
# The gap between the current event and the buffered one is larger than the minimum event gap:
# store the buffered event and start buffering the current one
current_event = copy.deepcopy(event_results_1[i])
current_event.onset = buffered_event_onset
current_event.offset = buffered_event_offset
event_results_2.append(current_event)
buffered_event_onset = event_results_1[i].onset
buffered_event_offset = event_results_1[i].offset
else:
# The gap between the current event and the buffered one is smaller than the minimum event gap:
# extend the buffered event until the current offset
buffered_event_offset = event_results_1[i].offset
# Store last event from buffer
current_event = copy.copy(event_results_1[len(event_results_1) - 1])
current_event.onset = buffered_event_onset
current_event.offset = buffered_event_offset
event_results_2.append(current_event)
processed_events += event_results_2
else:
processed_events += event_results_1
return MetaDataContainer(processed_events)
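# A small post-processing sketch (hypothetical thresholds): drop events shorter than 0.1 sec
# and merge same-class events separated by less than 0.5 sec.
#
#     >>> processed = meta.process_events(minimum_event_length=0.1, minimum_event_gap=0.5)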
def map_events(self, target_event_label, source_event_labels=None):
"""Map events with varying event labels into single target event label
Parameters
----------
target_event_label : str
Target event label
source_event_labels : list of str
Event labels to be processed. If none given, all events are merged
Default value None
Returns
-------
MetaDataContainer
"""
processed_events = MetaDataContainer()
files = self.unique_files
if not files:
files = [None]
if source_event_labels is None:
source_event_labels = self.unique_event_labels
for event_label in source_event_labels:
current_events_items = self.filter(event_label=event_label)
for item in current_events_items:
item.event_label = target_event_label
processed_events += current_events_items
return processed_events
def map_tags(self, target_tag, source_tags=None):
"""Map tags with varying tags into single target tag
Parameters
----------
target_tag : str
Target tag
source_tags : list of str
Tags to be processed. If none given, all tags are merged
Default value None
Returns
-------
MetaDataContainer
"""
processed_tags = MetaDataContainer()
files = self.unique_files
if not files:
files = [None]
if source_tags is None:
source_tags = self.unique_tags
for item in self:
i = copy.deepcopy(item)
for tag in source_tags:
if i.tags and tag in i.tags:
i.tags[i.tags.index(tag)] = target_tag
processed_tags.append(i)
return processed_tags
def event_inactivity(self, event_label='inactivity', source_event_labels=None, duration_list=None):
"""Get inactivity segments between events as event list
Parameters
----------
event_label : str
Event label used for inactivity
source_event_labels : list of str
Event labels to be taken into account. If none given, all events are considered.
Default value None
duration_list : dict
Dictionary where filename is a key and value is the total duration of the file.
If none given, max event offset is used to get file length.
Default value None
Returns
-------
MetaDataContainer
"""
meta_flatten = self.map_events(target_event_label='activity', source_event_labels=source_event_labels)
meta_flatten = meta_flatten.process_events(
minimum_event_gap=numpy.spacing(1),
minimum_event_length=numpy.spacing(1)
)
inactivity_events = MetaDataContainer()
files = meta_flatten.unique_files
if not files:
files = [None]
if duration_list is None:
duration_list = {}
for filename in files:
current_events_items = meta_flatten.filter(filename=filename)
current_inactivity_events = MetaDataContainer()
onset = 0.0
for item in current_events_items:
current_onset = onset
current_offset = item.onset
current_inactivity_events.append(
{
'filename': filename,
'onset': current_onset,
'offset': current_offset,
'event_label': event_label
}
)
onset = item.offset
if filename in duration_list:
file_duration = duration_list[filename]
else:
file_duration = current_events_items.max_offset
current_inactivity_events.append(
{
'filename': filename,
'onset': onset,
'offset': file_duration,
'event_label': event_label
}
)
current_inactivity_events = current_inactivity_events.process_events(
minimum_event_gap=numpy.spacing(1),
minimum_event_length=numpy.spacing(1)
)
current_inactivity_events = sorted(current_inactivity_events, key=lambda k: k.onset)
inactivity_events += current_inactivity_events
return inactivity_events
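# A small inactivity sketch (hypothetical duration): the gaps between the flattened events are
# returned as 'inactivity' events, using the given file durations or, when none are given, the
# maximum event offset per file.
#
#     >>> gaps = meta.event_inactivity(duration_list={'a.wav': 30.0})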
def add_time(self, time):
"""Add time offset to event onset and offset timestamps
Parameters
----------
time : float
Offset to be added to the onset and offsets
Returns
-------
self
"""
for item in self:
if item.onset is not None:
item.onset += time
if item.offset is not None:
item.offset += time
return self
def filter_time_segment(self, start=None, stop=None, duration=None, filename=None, zero_time=True, trim=True):
"""Filter time segment
Parameters
----------
start : float > 0.0
Segment start, seconds
Default value None
stop : float > 0.0
Segment end, seconds
Default value None
duration : float
Segment duration, seconds
Default value None
filename : str
Filename to filter
Default value None
zero_time : bool
Convert timestamps to be relative to the segment start
Default value True
trim : bool
Trim event onsets and offsets according to segment start and stop times.
Default value True
Returns
-------
MetaDataContainer
"""
if len(self.unique_files) > 1 and filename is None:
message = '{name}: Meta data contains items for multiple files. Please specify filename parameter.'.format(
name=self.__class__.__name__
)
self.logger.exception(message)
raise ValueError(message)
elif filename is not None and filename not in self.unique_files:
message = '{name}: Filename is not used in meta data items.'.format(
name=self.__class__.__name__
)
self.logger.exception(message)
raise ValueError(message)
if filename is not None and filename in self.unique_files:
data = self.filter(filename=filename)
else:
data = copy.deepcopy(self)
if stop is None and duration is not None:
stop = start + duration
filtered_data = MetaDataContainer()
for item in data:
if item.active_within_segment(start=start, stop=stop):
item_ = copy.deepcopy(item)
if zero_time:
# Slice start time is new zero time
item_.onset -= start
item_.offset -= start
if trim:
# Trim negative onsets to 0 and trim offsets going over slice stop to slice stop.
if item_.onset < 0:
item_.onset = 0
if item_.offset > stop-start:
item_.offset = stop - start
elif trim:
if item_.onset < start:
item_.onset = start
if item_.offset > stop:
item_.offset = stop
if item_.onset != item_.offset:
filtered_data.append(item_)
return filtered_data
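# A short segment-filtering sketch (hypothetical times): with zero_time=True the returned onsets
# and offsets are expressed relative to the segment start; the filename parameter is required
# when the container covers several files.
#
#     >>> segment = meta.filter_time_segment(start=10.0, stop=20.0, filename='a.wav')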
def stats(self, event_label_list=None, scene_label_list=None, tag_list=None, calculate_event_activity=False, duration_list=None):
"""Statistics of the container content
Parameters
----------
event_label_list : list of str
List of event labels to be included in the statistics. If none given, all unique labels used
Default value None
scene_label_list : list of str
List of scene labels to be included in the statistics. If none given, all unique labels used
Default value None
tag_list : list of str
List of tags to be included in the statistics. If none given, all unique tags used
Default value None
calculate_event_activity : bool
Calculate event activity and inactivity; this might be slow as the length of the corresponding audio recordings has to be read.
Default value False
duration_list : dict
Dictionary where filename is a key and value is the total duration of the file.
Default value None
Returns
-------
dict
"""
if event_label_list is None:
event_label_list = self.unique_event_labels
if scene_label_list is None:
scene_label_list = self.unique_scene_labels
if tag_list is None:
tag_list = self.unique_tags
scene_counts = numpy.zeros(len(scene_label_list))
scene_unique_identifiers = numpy.zeros(len(scene_label_list))
for scene_id, scene_label in enumerate(scene_label_list):
scene_data = self.filter(scene_label=scene_label)
scene_counts[scene_id] = len(scene_data)
scene_unique_identifiers[scene_id] = len(scene_data.unique_identifiers)
event_lengths = numpy.zeros(len(event_label_list))
event_counts = numpy.zeros(len(event_label_list))
if calculate_event_activity:
event_flatten_active_lengths = numpy.zeros(len(event_label_list))
event_flatten_inactive_lengths = numpy.zeros(len(event_label_list))
if duration_list is None:
duration_list = {}
for filename in self.unique_files:
info = get_audio_info(filename)
duration_list[filename] = info['duration_sec']
file_map = {}
for item in self:
if item.filename not in file_map:
file_map[item.filename] = MetaDataContainer()
file_map[item.filename].append(item)
else:
event_flatten_active_lengths = [None] * len(event_label_list)
event_flatten_inactive_lengths = [None] * len(event_label_list)
for event_id, event_label in enumerate(event_label_list):
for item in self:
if item.onset is not None and item.offset is not None and item.event_label == event_label:
event_lengths[event_id] += item.offset - item.onset
if item.event_label == event_label:
event_counts[event_id] += 1
if calculate_event_activity:
for filename in self.unique_files:
meta_flatten = file_map[filename].filter(event_label=event_label).map_events(
target_event_label='activity'
).process_events(
minimum_event_gap=numpy.spacing(1),
minimum_event_length=numpy.spacing(1)
)
for e in meta_flatten:
event_flatten_active_lengths[event_id] += e.offset - e.onset
for e in meta_flatten.event_inactivity(duration_list=duration_list):
event_flatten_inactive_lengths[event_id] += e.offset - e.onset
if calculate_event_activity:
overall_event_flatten_active_length = 0
overall_event_flatten_inactive_length = 0
for filename in self.unique_files:
meta_flatten = file_map[filename].map_events(
target_event_label='activity'
).process_events(
minimum_event_gap=numpy.spacing(1),
minimum_event_length=numpy.spacing(1)
)
current_event_inactivity = meta_flatten.event_inactivity(duration_list=duration_list)
for item in meta_flatten:
overall_event_flatten_active_length += item['offset'] - item['onset']
for item in current_event_inactivity:
overall_event_flatten_inactive_length += item['offset'] - item['onset']
else:
overall_event_flatten_active_length = None
overall_event_flatten_inactive_length = None
tag_counts = numpy.zeros(len(tag_list))
tag_identifiers = numpy.zeros(len(tag_list))
for tag_id, tag in enumerate(tag_list):
for item in self:
if item.tags and tag in item.tags:
tag_counts[tag_id] += 1
tag_identifiers[tag_id] = len(self.filter(tag=tag).unique_identifiers)
return {
'scenes': {
'scene_label_list': scene_label_list,
'count': scene_counts,
'identifiers': scene_unique_identifiers
},
'events': {
'event_label_list': event_label_list,
'length': event_lengths,
'flatten_active_length': event_flatten_active_lengths,
'flatten_inactive_length': event_flatten_inactive_lengths,
'activity_percentage': event_flatten_active_lengths / (event_flatten_active_lengths + event_flatten_inactive_lengths)*100.0 if calculate_event_activity else event_flatten_active_lengths,
'count': event_counts,
'avg_length': event_lengths/(event_counts + numpy.spacing(1)),
'overall_event_count': numpy.sum(event_counts),
'overall_length': numpy.sum(event_lengths),
'overall_avg_length': numpy.mean(event_lengths/(event_counts + numpy.spacing(1))) if calculate_event_activity else None,
'overall_event_flatten_active_length': overall_event_flatten_active_length,
'overall_event_flatten_inactive_length': overall_event_flatten_inactive_length,
'overall_activity_percentage': overall_event_flatten_active_length / (overall_event_flatten_active_length + overall_event_flatten_inactive_length) * 100.0 if calculate_event_activity else None,
},
'tags': {
'tag_list': tag_list,
'count': tag_counts,
'identifiers': tag_identifiers
}
}
[docs] def scene_stat_counts(self):
"""Scene count statistics
Returns
-------
dict
"""
stats = {}
for scene_label in self.unique_scene_labels:
stats[scene_label] = len(self.filter(scene_label=scene_label))
return stats
[docs] def event_stat_counts(self):
"""Event count statistics
Returns
-------
dict
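Examples
--------
A minimal sketch with hypothetical items; scene_stat_counts and tag_stat_counts work the same way for scene labels and tags:
>>> meta = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'event_label': 'speech'},
...     {'filename': 'audio_01.wav', 'event_label': 'speech'},
...     {'filename': 'audio_02.wav', 'event_label': 'car'}
... ])
>>> counts = meta.event_stat_counts()  # e.g. {'car': 1, 'speech': 2}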
"""
stats = {}
for event_label in self.unique_event_labels:
stats[event_label] = len(self.filter(event_label=event_label))
return stats
[docs] def tag_stat_counts(self):
"""Tag count statistics
Returns
-------
dict
"""
stats = {}
for tag in self.unique_tags:
stats[tag] = len(self.filter(tag=tag))
return stats
[docs] def to_event_roll(self, label_list=None, time_resolution=0.01, label='event_label', length_seconds=None):
"""Event roll
Event roll is a binary matrix indicating event activity within time segments defined by time_resolution.
Parameters
----------
label_list : list
List of labels in the desired order; this defines the class (column) order of the event roll
Default value None
time_resolution : float > 0.0
Time resolution (in seconds) used when converting events into the event roll.
Default value 0.01
label : str
Meta data field used to create event roll
Default value 'event_label'
length_seconds : float
Event roll length in seconds
Default value None
Returns
-------
numpy.ndarray [shape=(math.ceil(data_length * 1 / time_resolution), number of classes)]
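Examples
--------
A minimal sketch (illustrative only; the filename, labels and timestamps are hypothetical). Note that the container may only contain items for a single file:
>>> meta = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'event_label': 'speech', 'onset': 0.5, 'offset': 1.2},
...     {'filename': 'audio_01.wav', 'event_label': 'car', 'onset': 0.0, 'offset': 2.0}
... ])
>>> roll = meta.to_event_roll(time_resolution=0.02, length_seconds=3.0)
>>> # roll is a binary activity matrix with one column per label in label_list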
"""
if label_list is None:
label_list = self.unique_event_labels
if len(self.unique_files) <= 1:
from dcase_util.data import EventRollEncoder
event_roll = EventRollEncoder(
label_list=label_list,
time_resolution=time_resolution,
).encode(
metadata_container=self,
label=label,
length_seconds=length_seconds
)
return event_roll
else:
message = '{name}: Meta data contains items for multiple files.'.format(name=self.__class__.__name__)
self.logger.exception(message)
raise ValueError(message)
[docs] def intersection(self, second_metadata):
"""Intersection of two meta containers
Parameters
----------
second_metadata : MetaDataContainer
Second meta data container
Returns
-------
MetaDataContainer
Container with intersecting items
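Examples
--------
A minimal sketch with hypothetical items; items are matched through their id property:
>>> meta_a = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'scene_label': 'street'},
...     {'filename': 'audio_02.wav', 'scene_label': 'home'}
... ])
>>> meta_b = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'scene_label': 'street'}
... ])
>>> common = meta_a.intersection(second_metadata=meta_b)  # contains the audio_01.wav item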
"""
# Get unique IDs for current meta data container
id1 = []
for item1 in self:
id1.append(item1.id)
# Get unique IDs for second meta data container
id2 = []
for item2 in second_metadata:
id2.append(item2.id)
# Find intersection of IDs
id_intersect = list(set(id1).intersection(set(id2)))
# Collect intersecting items
intersection = MetaDataContainer()
for id in id_intersect:
intersection.append(self[id1.index(id)])
return intersection
[docs] def intersection_report(self, second_metadata):
"""Intersection report for two meta containers
Parameters
----------
second_metadata : MetaDataContainer
Second meta data container
Returns
-------
dict
Dict with intersection data ['items', 'files', 'identifiers', 'scene_labels', 'event_labels', 'tags']
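Examples
--------
A minimal sketch with hypothetical items:
>>> meta_a = MetaDataContainer([{'filename': 'audio_01.wav', 'scene_label': 'street'}])
>>> meta_b = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'scene_label': 'street'},
...     {'filename': 'audio_02.wav', 'scene_label': 'home'}
... ])
>>> report = meta_a.intersection_report(second_metadata=meta_b)
>>> shared_files = report['files']           # ['audio_01.wav']
>>> shared_scenes = report['scene_labels']   # ['street']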
"""
return {
'items': self.intersection(second_metadata=second_metadata),
'files': list(set(self.unique_files).intersection(set(second_metadata.unique_files))),
'identifiers': list(set(self.unique_identifiers).intersection(set(second_metadata.unique_identifiers))),
'scene_labels': list(set(self.unique_scene_labels).intersection(set(second_metadata.unique_scene_labels))),
'event_labels': list(set(self.unique_event_labels).intersection(set(second_metadata.unique_event_labels))),
'tags': list(set(self.unique_tags).intersection(set(second_metadata.unique_tags)))
}
[docs] def difference(self, second_metadata):
"""Difference of two meta containers
Parameters
----------
second_metadata : MetaDataContainer
Second meta data container
Returns
-------
MetaDataContainer
Container with the differing items
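Examples
--------
A minimal sketch with hypothetical items:
>>> meta_a = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'scene_label': 'street'},
...     {'filename': 'audio_02.wav', 'scene_label': 'home'}
... ])
>>> meta_b = MetaDataContainer([
...     {'filename': 'audio_01.wav', 'scene_label': 'street'}
... ])
>>> diff = meta_a.difference(second_metadata=meta_b)  # contains the audio_02.wav item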
"""
# Get unique IDs for current meta data container
id1 = []
for item1 in self:
id1.append(item1.id)
# Get unique IDs for second meta data container
id2 = []
for item2 in second_metadata:
id2.append(item2.id)
# Find difference of IDs
id_difference = list(set(id1).symmetric_difference(set(id2)))
# Collect difference items
difference = MetaDataContainer()
for id in id_difference:
difference.append(self[id1.index(id)])
return difference
def push_processing_chain_item(self, processor_name, init_parameters=None, process_parameters=None,
preprocessing_callbacks=None,
input_type=None, output_type=None):
"""Push processing chain item
Parameters
----------
processor_name : str
Processor name
init_parameters : dict, optional
Initialization parameters for the processor
Default value None
process_parameters : dict, optional
Parameters for the process method of the Processor
Default value None
preprocessing_callbacks : list of dicts
Callbacks used for preprocessing
Default value None
input_type : ProcessingChainItemType
Input data type
Default value None
output_type : ProcessingChainItemType
Output data type
Default value None
Returns
-------
self
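Examples
--------
Illustrative only; this method is normally called by the processor framework, the processor name below is hypothetical, and the container is assumed to already carry a processing_chain set up by that framework:
>>> meta = MetaDataContainer()
>>> meta = meta.push_processing_chain_item(
...     processor_name='my_package.MyMetadataProcessor'  # hypothetical processor name
... )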
"""
self.processing_chain.push_processor(
processor_name=processor_name,
init_parameters=init_parameters,
process_parameters=process_parameters,
preprocessing_callbacks=preprocessing_callbacks,
input_type=input_type,
output_type=output_type,
)
return self