Source code for dcase_util.utils.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function, absolute_import
import six

import os
import sys
import locale
import logging
import logging.config
import pkg_resources


def get_class_inheritors(klass):
    """Collect every direct and indirect subclass of a class

    The class hierarchy below ``klass`` is walked with an explicit work
    stack. A class reachable through several parents (multiple inheritance)
    is reported only once.

    Parameters
    ----------
    klass : class
        Root class of the search, not included in the result itself.

    Returns
    -------
    list
        List of classes

    """

    found = []
    pending = [klass]

    while pending:
        current = pending.pop()

        # Only classes not seen so far are recorded and scheduled for a visit.
        unseen = [sub for sub in current.__subclasses__() if sub not in found]
        found.extend(unseen)
        pending.extend(unseen)

    return found
def get_byte_string(num_bytes, show_bytes=True):
    """Output number of bytes according to locale and with IEC binary prefixes

    Parameters
    ----------
    num_bytes : int > 0 [scalar]
        Bytes

    show_bytes : bool, optional
        Show byte count. The locale-formatted byte count is only prepended
        when ``num_bytes`` is strictly larger than 1024.
        Default value "True"

    Returns
    -------
    str
        Human readable byte string

    """

    kilo = 1024.0

    # Units from largest to smallest; the first one not exceeding num_bytes is used.
    units = [
        ('YB', kilo ** 8),
        ('ZB', kilo ** 7),
        ('EB', kilo ** 6),
        ('PB', kilo ** 5),
        ('TB', kilo ** 4),
        ('GB', kilo ** 3),
        ('MB', kilo ** 2),
        ('KB', kilo),
    ]

    try:
        # Activate the user's locale so that digit grouping follows it.
        locale.setlocale(locale.LC_ALL, '')

    except locale.Error:
        # The environment requests a locale the system does not provide;
        # keep the currently active locale instead of failing.
        pass

    for unit_name, unit_size in units:
        if num_bytes >= unit_size:
            human_readable = '%.4g %s' % (num_bytes / unit_size, unit_name)
            break

    else:
        human_readable = '%d bytes' % (num_bytes)

    if show_bytes and num_bytes > kilo:
        # locale.format() was removed in Python 3.12; format_string() is the
        # supported equivalent and is available in older versions too.
        byte_count = locale.format_string("%d", num_bytes, grouping=True)
        return byte_count + ' bytes (' + human_readable + ')'

    return human_readable
def check_pkg_resources(package_requirement, logger=None):
    """Verify that a package requirement is satisfied

    The requirement is resolved with ``pkg_resources.WorkingSet.require``.
    A version conflict or a missing distribution is logged together with a
    pip install hint, and the original exception is re-raised.

    Parameters
    ----------
    package_requirement : str
        Requirement passed as-is to ``WorkingSet.require``

    logger : Logger class
        Logger class
        Default value None

    Raises
    ------
    pkg_resources.VersionConflict
        Installed version does not satisfy the requirement.

    pkg_resources.DistributionNotFound
        Required package is not installed.

    """

    working_set = pkg_resources.WorkingSet()

    if logger is None:
        logger = logging.getLogger(__name__)

    try:
        working_set.require(package_requirement)

    except (pkg_resources.VersionConflict, pkg_resources.DistributionNotFound) as error:
        # Pick the message matching the failure type, then log and propagate.
        if isinstance(error, pkg_resources.VersionConflict):
            template = '{name}: Version conflict, update package [pip install {package_requirement}]'

        else:
            template = '{name}: Package not found, install package [pip install {package_requirement}]'

        logger.exception(
            template.format(
                name=__name__,
                package_requirement=package_requirement
            )
        )
        raise
def is_int(value):
    """Check if given value is integer

    The value counts as an integer when ``int(value)`` succeeds, so numeric
    strings such as ``'12'`` and floats such as ``3.7`` are accepted too.

    Parameters
    ----------
    value : variable

    Returns
    -------
    bool

    """

    if value is None:
        return False

    try:
        int(value)

    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (e.g. list, dict)
        # ValueError: unparsable string or NaN
        # OverflowError: infinite float
        return False

    return True
def is_float(value):
    """Check if given value is float

    The value counts as a float when ``float(value)`` succeeds, so integers
    and numeric strings such as ``'1.5'`` or ``'1e3'`` are accepted too.

    Parameters
    ----------
    value : variable

    Returns
    -------
    bool

    """

    if value is None:
        return False

    try:
        float(value)

    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (e.g. list, dict)
        # ValueError: unparsable string
        # OverflowError: integer too large to be represented as float
        return False

    return True
def is_jupyter():
    """Check if code is run in Jupyter (Jupyter notebook, Jupyter console, or ipython qtconsole).

    Returns
    -------
    bool

    """

    try:
        from IPython import get_ipython

        shell_class = get_ipython().__class__
        shell_name = shell_class.__name__
        shell_module = getattr(shell_class, '__module__', None) or ''

    except (ImportError, NameError):
        # Normal python interpreter. ImportError also covers
        # ModuleNotFoundError and exists in older Python versions as well.
        return False

    if shell_name == 'ZMQInteractiveShell':
        # Jupyter notebook, Jupyter console, or qtconsole
        return True

    if shell_module.startswith('google.colab') or shell_name == 'google.colab._shell':
        # Google Colab. 'google.colab._shell' is a module path, so it has to be
        # matched against the class module; a class name never equals it.
        return True

    # Normal terminal console with IPython ('TerminalInteractiveShell'),
    # or get_ipython() returned None because no IPython shell is active.
    return False
def get_audio_info(filename, logger=None):
    """Get information about audio file without opening it.

    WAV files are inspected with ``soundfile.info``. The other supported
    formats are inspected by running ``ffprobe`` and reading the first audio
    stream from its JSON output.

    Parameters
    ----------
    filename : str
        filename

    logger : Logger class
        Logger class
        Default value None

    Raises
    ------
    IOError
        File does not exist, file format cannot be detected, or
        ffprobe cannot be executed.

    Returns
    -------
    DictContainer
        Dict with audio file information

    """

    from dcase_util.utils.files import FileFormat
    from dcase_util.files import File
    from dcase_util.containers import DictContainer

    if logger is None:
        logger = logging.getLogger(__name__)

    audio_file = File(
        filename=filename,
        valid_formats=[
            FileFormat.WAV,
            FileFormat.FLAC,
            FileFormat.OGG,
            FileFormat.MP3,
            FileFormat.M4A,
            FileFormat.MP4,
            FileFormat.WEBM,
            FileFormat.MKV,
        ]
    )

    if not audio_file.exists():
        # File does not exists
        message = '{name}: File does not exists [{filename}] '.format(
            name=__name__,
            filename=filename
        )

        # No exception is being handled here, so use error() instead of exception().
        logger.error(message)
        raise IOError(message)

    audio_file.detect_file_format()

    if audio_file.format is None:
        # Unknown format
        message = '{name}: File format cannot be detected for file [{filename}] '.format(
            name=__name__,
            filename=filename
        )

        logger.error(message)
        raise IOError(message)

    info = DictContainer({
        'filename': audio_file.filename,
        'bytes': audio_file.bytes,
        'format': audio_file.format
    })

    if audio_file.format == FileFormat.WAV:
        import soundfile

        wav_info = soundfile.info(file=audio_file.filename)

        info['fs'] = wav_info.samplerate
        info['channels'] = wav_info.channels
        info['duration_sec'] = wav_info.duration
        info['duration_ms'] = (wav_info.frames / float(wav_info.samplerate)) * 1000
        info['duration_samples'] = wav_info.frames
        info['subtype'] = {
            'name': wav_info.subtype,
            'info': wav_info.subtype_info
        }

        # Map sub type to bit depth. The sub type name has to be compared,
        # info['subtype'] itself is a dict and never equals a string.
        bit_depths = {
            'PCM_16': 16,
            'PCM_24': 24,
            'PCM_32': 32,
        }
        if wav_info.subtype in bit_depths:
            info['bit_depth'] = bit_depths[wav_info.subtype]

    elif audio_file.format in [FileFormat.FLAC, FileFormat.OGG, FileFormat.MP3, FileFormat.M4A,
                               FileFormat.MP4, FileFormat.WEBM, FileFormat.MKV]:
        # Use ffprobe to get file info from other formats
        import subprocess
        import json
        import shlex

        cmd = "ffprobe -v quiet -print_format json -show_streams"
        args = shlex.split(cmd)

        # Filename is appended after splitting so that paths with spaces stay one argument.
        args.append(audio_file.filename)

        # Run command line command, fetch and parse json output
        try:
            output = subprocess.check_output(args).decode('utf-8')

        except OSError:
            # Error while running the command
            message = '{name}: It seems that ffmpeg (ffprobe) is not installed.'.format(
                name=__name__
            )

            logger.exception(message)
            raise IOError(message)

        ffmpeg_meta = json.loads(output)

        for stream in ffmpeg_meta.get('streams', []):
            if stream['codec_type'] == 'audio':
                # Fetch audio info from first audio stream
                info['fs'] = int(stream['sample_rate'])

                # Get duration
                if 'duration' not in stream:
                    info['duration_sec'] = None

                elif is_float(stream['duration']):
                    info['duration_sec'] = float(stream['duration'])

                else:
                    info['duration_sec'] = None

                # Get bit rate
                if 'bit_rate' not in stream:
                    info['bit_rate'] = None

                elif is_int(stream['bit_rate']):
                    info['bit_rate'] = int(stream['bit_rate'])

                else:
                    info['bit_rate'] = None

                # Get codec info
                info['codec'] = {}

                if 'codec_name' in stream:
                    info['codec']['name'] = stream['codec_name']

                if 'codec_long_name' in stream:
                    info['codec']['name_long'] = stream['codec_long_name']

                if 'codec_type' in stream:
                    info['codec']['type'] = stream['codec_type']

                break

    return info
def get_media_duration(filename, logger=None):
    """Get media file duration using ffprobe.

    Parameters
    ----------
    filename : str
        filename

    logger : Logger class
        Logger class
        Default value None

    Raises
    ------
    IOError
        File does not exist.

    RuntimeError
        ffprobe exited with a non-zero return code.

    ValueError
        ffprobe output cannot be converted into a float.

    Returns
    -------
    float
        Media duration in seconds

    """

    import subprocess

    if logger is None:
        logger = logging.getLogger(__name__)

    if not os.path.exists(filename):
        # File does not exists
        message = '{name}: File does not exists [{filename}] '.format(
            name=__name__,
            filename=filename
        )

        # No exception is being handled here, so use error() instead of exception().
        logger.error(message)
        raise IOError(message)

    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        filename
    ]

    try:
        # check=True is required; without it a failing ffprobe never raises
        # CalledProcessError and the handler below would be dead code.
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=True
        )

    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))

    return float(result.stdout)
def merge_media_files(source_a_filename, source_b_filename, target_filename, overwrite=False, logger=None):
    """Merge media files using ffmpeg.

    Parameters
    ----------
    source_a_filename : str
        source file A, filename

    source_b_filename : str
        source file B, filename

    target_filename : str
        target filename

    overwrite : bool
        overwrite existing target file

    logger : Logger class
        Logger class
        Default value None

    Raises
    ------
    IOError
        Source file is missing, or target file exists and overwrite is disabled.

    RuntimeError
        ffmpeg exited with a non-zero return code.

    """

    import subprocess

    if logger is None:
        logger = logging.getLogger(__name__)

    if not os.path.exists(source_a_filename):
        # Source file A does not exists
        message = '{name}: Source file A does not exists [{filename}] '.format(
            name=__name__,
            filename=source_a_filename
        )

        # No exception is being handled here, so use error() instead of exception().
        logger.error(message)
        raise IOError(message)

    if not os.path.exists(source_b_filename):
        # Source file B does not exists
        message = '{name}: Source File B does not exists [{filename}] '.format(
            name=__name__,
            filename=source_b_filename
        )

        logger.error(message)
        raise IOError(message)

    if not overwrite and os.path.exists(target_filename):
        # Target file does exists
        message = '{name}: Target file exists already [{filename}]. Enable overwrite flag or change target filename.'.format(
            name=__name__,
            filename=target_filename
        )

        logger.error(message)
        raise IOError(message)

    cmd = ['ffmpeg']
    if overwrite:
        # Global options are given before the inputs, as the ffmpeg
        # documentation recommends, rather than after the output file.
        cmd.append('-y')

    cmd += [
        '-i', source_a_filename,
        '-i', source_b_filename,
        '-shortest',
        '-strict', '-2',
        target_filename
    ]

    try:
        # check=True is required; without it a failing ffmpeg never raises
        # CalledProcessError and the failure would pass unnoticed.
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=True
        )

    except subprocess.CalledProcessError as e:
        raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))
class SuppressStdoutAndStderr(object):
    """Context manager to suppress STDOUT and STDERR

    Performs a deep suppression of stdout and stderr: both the Python level
    stream objects and the underlying file descriptors are pointed to the
    null device, so output printed from compiled C/Fortran sub-functions is
    silenced as well. Nothing is redirected when running inside Jupyter.

    After:
    http://stackoverflow.com/questions/11130156/suppress-stdout-stderr-print-from-python-functions

    """

    def __enter__(self):
        if is_jupyter():
            # Redirection is done only in console
            return self

        # Sinks receiving everything written while the context is active
        self.stdout_null_file = open(os.devnull, 'w')
        self.stderr_null_file = open(os.devnull, 'w')

        # Descriptors currently used by stdout and stderr
        self.stdout_fileno_undup_original = sys.stdout.fileno()
        self.stderr_fileno_undup_original = sys.stderr.fileno()

        # Duplicates keep the original targets reachable for the restore step
        self.stdout_fileno_original = os.dup(sys.stdout.fileno())
        self.stderr_fileno_original = os.dup(sys.stderr.fileno())

        # Python level stream objects to be put back on exit
        self.stdout_original = sys.stdout
        self.stderr_original = sys.stderr

        # Point the descriptors to the null device
        os.dup2(self.stdout_null_file.fileno(), self.stdout_fileno_undup_original)
        os.dup2(self.stderr_null_file.fileno(), self.stderr_fileno_undup_original)

        # Replace the Python level streams
        sys.stdout = self.stdout_null_file
        sys.stderr = self.stderr_null_file

        return self

    def __exit__(self, *_):
        if is_jupyter():
            # Nothing was redirected in __enter__
            return

        # Put back the Python level streams
        sys.stdout = self.stdout_original
        sys.stderr = self.stderr_original

        # Point the descriptors back to their original targets
        os.dup2(self.stdout_fileno_original, self.stdout_fileno_undup_original)
        os.dup2(self.stderr_fileno_original, self.stderr_fileno_undup_original)

        # Release the duplicated descriptors
        os.close(self.stdout_fileno_original)
        os.close(self.stderr_fileno_original)

        # Release the null device handles
        self.stdout_null_file.close()
        self.stderr_null_file.close()
class VectorRecipeParser(object):
    """Parser for feature vector recipe strings"""

    def __init__(self, delimiters=None, default_stream=0, **kwargs):
        """Constructor

        Parameters
        ----------
        delimiters : dict, optional
            Delimiter overrides, merged on top of the built-in delimiters
            (keys: 'block', 'detail', 'dimension', 'segment', 'vector').
            Default value None

        default_stream : int
            Stream index used when a recipe block does not specify one.
            Default value 0

        """

        # Define delimiters
        default_delimiters = {
            'block': ';',
            'detail': '=',
            'dimension': ':',
            'segment': '-',
            'vector': ','
        }

        if delimiters:
            default_delimiters.update(delimiters)

        self.delimiters = default_delimiters
        self.default_stream = default_stream

    def parse(self, recipe):
        """Parse feature vector recipe

        Overall format: [block #1];[block #2];[block #3];...

        Block formats:
         - [label (string)] => label only, no vector index information
         - [label (string)]=[start index (int)]-[end index (int)] => default stream and vector [start:end]
         - [label (string)]=[stream (int)]:[start index (int)]-[end index (int)] => specified stream and vector [start:end]
         - [label (string)]=1,2,3,4,5 => vector [1,2,3,4,5]
         - [label (string)]=0 => specified stream and full vector

        Parameters
        ----------
        recipe : str
            Feature recipe

        Returns
        -------
        data : ListDictContainer
            Feature recipe structure. Input which is not a string is
            returned as it is.

        """

        if not isinstance(recipe, six.string_types):
            # Nothing to parse, hand the input back untouched
            return recipe

        parsed_blocks = []
        for raw_block in recipe.split(self.delimiters['block']):
            raw_block = raw_block.strip()
            if not raw_block:
                # Skip empty blocks, e.g. after a trailing block delimiter
                continue

            fields = raw_block.split(self.delimiters['detail'])
            entry = {
                'label': fields[0].strip(),
            }

            if len(fields) == 2:
                # Start from default values and refine them based on the index definition
                index = {
                    'stream': self.default_stream,
                    'selection': False,
                    'full': True,
                }

                index_definition = fields[1]
                stream_split = index_definition.split(self.delimiters['dimension'])
                if len(stream_split) > 1:
                    # Stream has been defined, e.g. [label]=[stream]:[start]-[end]
                    index['stream'] = int(stream_split[0])
                    index_definition = stream_split[1]

                segment = index_definition.split(self.delimiters['segment'])
                if len(segment) > 1:
                    # Index range; the end index of the recipe is inclusive
                    index.update({
                        'start': int(segment[0].strip()),
                        'stop': int(segment[1].strip()) + 1,
                        'full': False,
                        'selection': False,
                    })

                else:
                    values = index_definition.split(self.delimiters['vector'])
                    if len(values) > 1:
                        # Explicit list of vector indices
                        index.update({
                            'full': False,
                            'selection': True,
                            'vector': [int(value) for value in values],
                        })

                    else:
                        # Single number selects the stream, full vector is used
                        index.update({
                            'stream': int(values[0]),
                            'full': True,
                            'selection': False,
                        })

                entry['vector-index'] = index

            parsed_blocks.append(entry)

        from dcase_util.containers import ListDictContainer
        return ListDictContainer(parsed_blocks)