Source code for dcase_util.utils.files
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
from six import iteritems
import os
import argparse
import itertools
import platform
import logging
def argument_file_exists(filename):
    """Validate that a command-line argument points to an existing path.

    Meant to be used as the ``type`` callable of an ``argparse`` argument.
    The path is only tested with ``os.path.exists``; it is never opened.

    Parameters
    ----------
    filename : str
        Path given on the command line.

    Returns
    -------
    str
        The unchanged ``filename`` when the path exists.

    Raises
    ------
    argparse.ArgumentTypeError
        If the path does not exist.

    """

    if os.path.exists(filename):
        return filename

    # argparse converts ArgumentTypeError into a rejection message like:
    #   error: argument input: x does not exist
    raise argparse.ArgumentTypeError("{0} does not exist".format(filename))
def filelist_exists(filelist):
    """Check that all files in the list exist

    Only string values are checked. Values one dict level down are checked
    as well; anything nested deeper, and any non-string value, is ignored.

    Parameters
    ----------
    filelist : dict of paths
        Dict containing paths to files. Two levels of dict are inspected.

    Returns
    -------
    bool
        True if every inspected path is an existing file (also when there is
        nothing to inspect), False as soon as one of them is missing.

    """

    # dict.values() works on both Python 2 and 3, and the keys are never used.
    for item_value in filelist.values():
        if isinstance(item_value, dict):
            for sub_item_value in item_value.values():
                if isinstance(sub_item_value, str) and not os.path.isfile(sub_item_value):
                    return False

        elif isinstance(item_value, str) and not os.path.isfile(item_value):
            return False

    return True
def posix_path(path):
    """Convert a path to POSIX format

    The path is normalized with ``os.path.normpath`` and every backslash in
    the result is turned into a forward slash.

    Parameters
    ----------
    path : str
        Path

    Returns
    -------
    str

    """

    normalized = os.path.normpath(path)
    return '/'.join(normalized.split('\\'))
class Path(object):
    """Utility class for paths"""

    def __init__(self, path=None):
        """Constructor

        Parameters
        ----------
        path : str
            Path stored on the instance (in POSIX format) and used by the
            methods whenever they are called without an explicit path.
            Default value None

        """

        # Store the raw value first: posix() falls back to self.path when it
        # is called without an argument.
        self.path = path
        self.path = self.posix()

    @property
    def logger(self):
        """Logger instance"""

        logger = logging.getLogger(__name__)
        if not logger.handlers:
            # Imported lazily, only when no handler has been attached yet.
            from dcase_util.utils import setup_logging
            setup_logging()

        return logger

    def posix(self, path=None):
        """Converts path to POSIX format

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        Returns
        -------
        str
            Normalized path with forward slashes, or None if no path is
            available.

        """

        if path is None:
            path = self.path

        if path is not None:
            return os.path.normpath(path).replace('\\', '/')

        else:
            return None

    def posix_to_nt(self, path=None):
        """Converts posix formatted path to use the separator of the running platform

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        Returns
        -------
        str
            Path with every '/' replaced by ``os.path.sep``, or None if no
            path is available.

        """

        if path is None:
            path = self.path

        if path is None:
            # Same "no path available" contract as posix() and shorten().
            return None

        return path.replace('/', os.path.sep)

    def shorten(self, path=None, part_count=3):
        """Shorten path into given parts length

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        part_count : int
            Count of path parts
            Default value 3

        Returns
        -------
        str
            The last ``part_count`` parts prefixed with '.....' when the path
            has more parts than that, otherwise the path unchanged.

        """

        if path is None:
            path = self.path

        if path is not None:
            parts = path.split(os.sep)
            if len(parts) > part_count:
                return '.....' + os.path.join(*parts[-part_count:])

            else:
                return path

        else:
            return path

    def file_list(self, path=None, recursive=True, extensions=None,
                  case_sensitive=False, absolute_paths=False, offset=0, limit=None):
        """Get file list

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        recursive : bool
            Do recursive search to sub-directories
            Default value True

        extensions : str or list
            List valid file extensions (without the leading dot) or
            comma-separated string. The given object is never modified.
            Default value None

        case_sensitive : bool
            Use case sensitive file extension matching.
            Default value False

        absolute_paths : bool
            Return absolute paths instead of relative ones.
            Default value False

        offset : int
            Offset of files to be included. Ignored unless it is inside
            the range of the sorted list.
            Default value 0

        limit : int
            Amount of files to be included. Ignored unless it is smaller than
            the amount of files left after applying the offset.
            Default value None

        Returns
        -------
        list
            Sorted list of paths.

        """

        if path is None:
            path = self.path

        accepted = None
        if extensions is not None:
            if isinstance(extensions, str):
                # Tolerate spaces around the commas, e.g. "wav, flac".
                extensions = [ext.strip() for ext in extensions.split(',')]

            # Build a private lookup set instead of appending case variants
            # to the caller's list.
            if case_sensitive:
                accepted = set(extensions)

            else:
                accepted = set(ext.lower() for ext in extensions)

        def is_accepted(filename):
            """Tell whether the file extension passes the extension filter."""
            if accepted is None:
                return True

            extension = os.path.splitext(filename)[1][1:]
            if not case_sensitive:
                # Comparing lower-cased values also matches mixed case such as ".Wav".
                extension = extension.lower()

            return extension in accepted

        if recursive:
            candidates = (
                (dir_path, filename)
                for dir_path, dir_names, filenames in os.walk(path)
                for filename in filenames
            )

        else:
            # os.listdir returns every directory entry, not only regular files.
            candidates = ((path, filename) for filename in os.listdir(path))

        files = []
        for dir_path, filename in candidates:
            if is_accepted(filename):
                current_path = os.path.join(dir_path, filename)
                if absolute_paths:
                    current_path = os.path.abspath(current_path)

                files.append(current_path)

        files.sort()

        if offset and 0 <= offset < len(files):
            files = files[offset:]

        if limit is not None and 0 <= limit < len(files):
            files = files[:limit]

        return files

    def exists(self, path=None):
        """Checks that path exists

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        Returns
        -------
        bool
            True if the path is an existing directory.

        """

        if path is None:
            path = self.path

        return os.path.isdir(path)

    def file_count(self, path=None):
        """File count under given path including sub directories.

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        Returns
        -------
        int

        """

        if path is None:
            path = self.path

        return sum(len(files) for root, dirs, files in os.walk(path))

    def size_bytes(self, path=None):
        """Total byte count of all files under given path.

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        Returns
        -------
        int

        """

        if path is None:
            path = self.path

        return sum(os.path.getsize(f) for f in self.file_list(path=path))

    def size_string(self, path=None, show_bytes=False):
        """Total data size of all files under given path returned in human readable form.

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        show_bytes : bool
            Show exact byte count
            Default value False

        Returns
        -------
        str

        """

        if path is None:
            path = self.path

        # get_byte_string is not imported at module level; import it lazily,
        # the same way the logger property imports setup_logging.
        # NOTE(review): assumes dcase_util.utils exports get_byte_string - confirm.
        from dcase_util.utils import get_byte_string

        return get_byte_string(self.size_bytes(path=path), show_bytes=show_bytes)

    def makedirs(self, path=None):
        """Create given path.

        Nothing is done when the path is not a string or when the directory
        already exists.

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        Raises
        ------
        OSError
            If the directory could not be created.

        Returns
        -------
        nothing

        """

        if path is None:
            path = self.path

        if isinstance(path, str) and not os.path.isdir(path):
            try:
                os.makedirs(path)

            except OSError:
                # Losing a creation race against another process is fine;
                # any other failure must not be hidden from the caller.
                if not os.path.isdir(path):
                    raise

    def create(self, paths=None):
        """Create given paths.

        Parameters
        ----------
        paths : str, dict or list
            Paths. For a dict, the values are used as paths.
            If None given, path given to initializer is used instead.
            Default value None

        Raises
        ------
        ValueError
            If paths is not a str, dict or list.

        Returns
        -------
        nothing

        """

        if paths is None:
            paths = self.path

        if isinstance(paths, str):
            self.makedirs(paths)

        elif isinstance(paths, dict):
            for value in paths.values():
                self.makedirs(value)

        elif isinstance(paths, list):
            for value in paths:
                self.makedirs(value)

        else:
            message = '{name}: Unknown data type for paths.'.format(name=self.__class__.__name__)
            # No exception is being handled here, so log a plain error
            # record rather than one with (empty) traceback information.
            self.logger.error(message)
            raise ValueError(message)

    def modify(self, path=None, path_base=None, filename_extension=None, filename_prefix=None, filename_postfix=None):
        """Modify path

        Parameters
        ----------
        path : str
            Path, if none given one given to class constructor is used.
            Default value None

        path_base : str
            Replacement path base, e.g. path base for "/test/audio/audio.wav" is "/test/audio".
            Default value None

        filename_extension : str
            Replacement file extension, used as given (include the leading dot).
            Default value None

        filename_prefix : str
            Prefix to be added to the filename body
            Default value None

        filename_postfix : str
            Postfix to be added to the filename body
            Default value None

        Returns
        -------
        str

        """

        if path is None:
            path = self.path

        current_path_base, current_last_level_path = os.path.split(path)
        current_filename_base, current_extension = os.path.splitext(current_last_level_path)

        if path_base:
            current_path_base = path_base

        if filename_extension:
            current_extension = filename_extension

        if filename_prefix:
            current_filename_base = filename_prefix + current_filename_base

        if filename_postfix:
            current_filename_base = current_filename_base + filename_postfix

        return os.path.join(current_path_base, current_filename_base + current_extension)
class ApplicationPaths(Path):
    """Utility class for application paths, paths are automatically generated based on parameters through parameter hash."""

    def __init__(self, parameter_container=None):
        """Constructor

        Parameters
        ----------
        parameter_container : ParameterContainer
            Application parameter container
            Default value None

        """

        # Run the base initializer so that self.path exists (as None) and the
        # inherited methods that fall back to it do not fail on a missing
        # attribute.
        super(ApplicationPaths, self).__init__()

        self.parameter_container = parameter_container

    def _directory_names(self, part, param_hash):
        """Build the directory name(s) for one structure part.

        Parameters
        ----------
        part : str
            Dotted parameter path; the text before the first dot is used as
            the directory name prefix.

        param_hash : str or list of str
            Parameter hash, or list of hashes.

        Returns
        -------
        str or list of str
            One name per hash: a list for a list of hashes, a str otherwise.

        """

        prefix = part.split('.')[0]

        if isinstance(param_hash, list):
            # Lists of hashes always use the "<prefix>_<hash>" form.
            return [prefix + '_' + h for h in param_hash]

        return self.directory_name(prefix=prefix, param_hash=param_hash)

    def generate(self, path_base, structure):
        """Generate application paths and include parameter hashes to the paths

        Parameters
        ----------
        path_base : str
            Path base, this is used as base of all paths

        structure : iterable of str
            Parameter paths. For each one, the value stored under
            "<part>._hash" in the parameter container adds one directory
            level; parts without a hash are skipped. A part containing '*'
            switches the return value to a dict.

        Returns
        -------
        str or dict
            Single path when no part contains '*' and exactly one path was
            generated, otherwise a dict pairing the keys found for the
            wild card part with the generated paths.

        """

        path_parts = [path_base]
        keys = []
        wild_card_found = False

        for part in structure:
            if '*' in part:
                wild_card_found = True

                # Look up the container holding the wild-carded items: the
                # part text up to (excluding) the character before the '*'.
                path_ = self.parameter_container.get_path(
                    path=part[:part.find('*') - 1]
                )
                if path_:
                    keys = list(path_.keys())

            param_hash = self.parameter_container.get_path(
                path=part + '._hash'
            )

            if param_hash is not None:
                path_parts.append(self._directory_names(part, param_hash))

        paths = self.construct_path(path_parts)

        if not wild_card_found and len(paths) == 1:
            return paths[0]

        else:
            return dict(zip(keys, paths))

    @staticmethod
    def directory_name(prefix, param_hash):
        """Generate directory name.

        Parameters
        ----------
        prefix : str
            Prefix

        param_hash : str
            Parameter hash

        Returns
        -------
        str
            First 20 characters of the hash on Windows, otherwise
            "<prefix>_<hash>".

        """

        if platform.system() == 'Windows':
            # Use short directory names and truncated hash for Windows, as it has path length limit (260)
            return param_hash[0:20]

        else:
            return prefix + '_' + param_hash

    def save_parameters_to_path(self, path_base, structure, parameter_filename='parameters.yaml'):
        """Save parameters to each application sub-directory.

        Parameters
        ----------
        path_base : str
            Base path

        structure : iterable of str
            Parameter paths, handled in order. Every part with a hash adds
            one directory level below the levels of the parts before it.

        parameter_filename : str
            Default value "parameters.yaml"

        Returns
        -------
        nothing

        """

        from dcase_util.containers import ParameterContainer

        path_parts = [path_base]
        for part in structure:
            param_hash = self.parameter_container.get_path(path=part + '._hash')

            if param_hash is None:
                # No hash means no directory level and nothing to save for
                # this part.
                continue

            parameters = self.parameter_container.get_path(path=part)
            path_parts.append(self._directory_names(part, param_hash))

            # construct_path always returns a list of paths here.
            current_path = self.construct_path(path_parts)

            if isinstance(parameters, dict):
                ParameterContainer(parameters).save(
                    filename=os.path.join(current_path[0], parameter_filename)
                )

            else:
                # Parameters are indexed by the position of the generated path.
                for path_id, path in enumerate(current_path):
                    if parameters[path_id]:
                        ParameterContainer(parameters[path_id]).save(
                            filename=os.path.join(path, parameter_filename)
                        )

    @staticmethod
    def construct_path(path_parts):
        """Generate all path combinations based on path parts

        Parameters
        ----------
        path_parts : list
            Path parts. Each part is either a str or a list of alternative
            strings for that directory level. The list is not modified.

        Returns
        -------
        list
            One joined path per combination of alternatives, in
            ``itertools.product`` order. A list with fewer than two parts is
            returned as it is.

        """

        if len(path_parts) > 1:
            # Wrap plain strings so every level is a list of alternatives.
            alternatives = [
                [value] if isinstance(value, str) else value
                for value in path_parts
            ]

            # Unpacking handles any number of levels.
            return [
                os.path.join(*combination)
                for combination in itertools.product(*alternatives)
            ]

        else:
            return path_parts
class FileFormat(object):
    """File format labels and helpers to detect the format of a file."""

    YAML = 'YAML'  #: YAML file
    CPICKLE = 'CPICKLE'  #: pickled Python object
    NUMPY = 'NPY'  #: Numpy data object
    NUMPYZ = 'NPZ'  #: Numpy zip data object
    XML = 'XML'  #: Extensible Markup Language (XML) file
    JSON = 'JSON'  #: JavaScript Object Notation (JSON) file
    MARSHAL = 'MARSHAL'  #: Marshal Data Migration Model File
    MSGPACK = 'MSGPACK'  #: MessagePack
    TXT = 'TXT'  #: TXT file
    CSV = 'CSV'  #: Comma-separated values (CSV) file
    ANN = 'ANN'  #: Annotation file
    META = 'META'  #: Annotation file
    WAV = 'WAV'  #: Audio file, Waveform Audio File Format (WAVE) file
    FLAC = 'FLAC'  #: Audio file, Free Lossless Audio Codec (FLAC) file
    MP3 = 'MP3'  #: Audio file (compressed), MPEG-2 Audio Layer III file
    AAC = 'AAC'  #: Audio file (compressed), Advanced Audio Coding file
    AC3 = 'AC3'  #: Audio file (compressed), Audio Codec 3 file
    M4A = 'M4A'  #: Audio file (compressed), MPEG-4 codec audio file
    AIFF = 'AIFF'  #: Audio file, Audio Interchange File Format file
    AMR = 'AMR'  #: Audio file, Adaptive Multi-Rate audio codec file
    AU = 'AU'  #: Audio file, AU file format
    OGG = 'OGG'  #: Audio file (compressed)
    RA = 'RA'  #: Audio file (compressed), RealAudio files
    VOC = 'VOC'  #: Audio file, Creative voice file
    WMA = 'WMA'  #: Audio file, Windows Media Audio File
    MKA = 'MKA'  #: Audio file, Matroska audio
    FLV = 'FLV'  #: Video file, Flash video
    WEBM = 'WEBM'  #: Video file
    MKV = 'MKV'  #: Video file, Matroska video
    MOV = 'MOV'  #: Video file, Apple QuickTime Movie
    MP4 = 'MP4'  #: Video file, MPEG-4 Video File
    MPG = 'MPG'  #: Video file, MPEG Video File
    AVI = 'AVI'  #: Video file, Audio Video Interleave File
    WMV = 'WMV'  #: Video file, Windows Media Video File
    TAR = 'TAR'  #: Consolidated Unix File Archive
    GZ = 'GZ'  #: Compressed file, Gnu Zipped Archive
    ZIP = 'ZIP'  #: Compressed file, Zipped File
    RAR = 'RAR'  #: Compressed file, WinRAR Compressed Archive
    PDF = 'PDF'  #: Document file, Portable Document Format File
    GIF = 'GIF'  #: Image file, Graphical Interchange Format File
    JPG = 'JPG'  #: Image file, JPEG Image
    PNG = 'PNG'  #: Image file, Portable Network Graphic
    SVG = 'SVG'  #: Image file, Scalable Vector Graphics File
    RAW = 'RAW'  #: Raw binary file
    UNKNOWN = 'UNKNOWN'  #: Unknown format

    # The lookup tables below store constant NAMES (not label values); they are
    # resolved with getattr(cls, name) so that a subclass overriding a
    # constant is honoured.

    # Lower-case file extension -> constant name. '.gz' is handled separately.
    _EXTENSION_LABELS = {
        '.yaml': 'YAML',
        '.xml': 'XML',
        '.json': 'JSON',
        '.cpickle': 'CPICKLE',
        '.pickle': 'CPICKLE',
        '.pkl': 'CPICKLE',
        '.npy': 'NUMPY',
        '.npz': 'NUMPYZ',
        '.marshal': 'MARSHAL',
        '.msgpack': 'MSGPACK',
        '.txt': 'TXT',
        '.hash': 'TXT',
        '.csv': 'CSV',
        '.ann': 'ANN',
        '.meta': 'META',
        '.wav': 'WAV',
        '.flac': 'FLAC',
        '.mp3': 'MP3',
        '.ogg': 'OGG',
        '.aac': 'AAC',
        '.ac3': 'AC3',
        '.aiff': 'AIFF',
        '.amr': 'AMR',
        '.au': 'AU',
        '.ra': 'RA',
        '.voc': 'VOC',
        '.m4a': 'M4A',
        '.wma': 'WMA',
        '.wmv': 'WMV',
        '.webm': 'WEBM',
        '.avi': 'AVI',
        '.flv': 'FLV',
        '.mka': 'MKA',
        '.mkv': 'MKV',
        '.mov': 'MOV',
        '.mp4': 'MP4',
        '.mpg': 'MPG',
        '.tar': 'TAR',
        '.zip': 'ZIP',
        '.rar': 'RAR',
        '.pdf': 'PDF',
        '.png': 'PNG',
        '.jpg': 'JPG',
        '.gif': 'GIF',
        '.svg': 'SVG',
        '.raw': 'RAW',
    }

    # Exact first field of the python-magic description -> constant name.
    _MAGIC_EXACT = {
        'FLAC audio bitstream data': 'FLAC',
        'Ogg data': 'OGG',
        'ATSC A/52 aka AC-3 aka Dolby Digital stream': 'AC3',
        'Adaptive Multi-Rate Codec (GSM telephony)': 'AMR',
        'RealMedia file': 'RA',
        'Macromedia Flash Video': 'FLV',
        'WebM': 'WEBM',
        'MPEG sequence': 'MPG',
        'Microsoft ASF': 'WMA',  # cls.WMA or cls.WMV
        'Zip archive data': 'ZIP',
        'PDF document': 'PDF',
        'SVG Scalable Vector Graphics image': 'SVG',
        'GIF image data': 'GIF',
        'JPEG image data': 'JPG',
        'PNG image data': 'PNG',
        'RAR archive data': 'RAR',
        'POSIX tar archive': 'TAR',
    }

    # (first field, stripped second field) of the description -> constant name.
    _MAGIC_PAIRS = {
        ('RIFF (little-endian) data', 'WAVE audio'): 'WAV',
        ('RIFF (little-endian) data', 'AVI'): 'AVI',
        ('MPEG ADTS', 'AAC'): 'AAC',
        ('IFF data', 'AIFF audio'): 'AIFF',
        ('EBML file', 'creator matroska'): 'MKA',  # cls.MKA or cls.MKV
        ('ISO Media', 'Apple QuickTime movie'): 'MOV',
        ('ISO Media', 'MPEG v4 system'): 'MP4',
    }

    # Prefix of the first description field -> constant name.
    _MAGIC_PREFIXES = (
        ('Creative Labs voice data', 'VOC'),
        ('Sun/NeXT audio data', 'AU'),
    )

    @classmethod
    def detect_based_on_filename(cls, filename):
        """Detect file format based on filename.

        Only the last extension is used, compared case-insensitively. A
        ".gz" file is labelled TAR when ".tar" occurs anywhere in the
        lower-cased filename, and GZ otherwise.

        Parameters
        ----------
        filename : str
            Path to the file

        Returns
        -------
        str
            File format label, ``UNKNOWN`` for an unrecognized extension.

        """

        lowered = filename.lower()
        extension = os.path.splitext(lowered)[1]

        if extension == '.gz':
            if '.tar' in lowered:
                return cls.TAR

            else:
                return cls.GZ

        return getattr(cls, cls._EXTENSION_LABELS.get(extension, 'UNKNOWN'))

    @classmethod
    def detect_based_on_content(cls, filename):
        """Detect file format based on content by using python-magic.

        Parameters
        ----------
        filename : str
            Path to the file

        Returns
        -------
        str
            File format label, or None when the file does not exist, the
            format is not recognized, or python-magic (or PyYAML, for text
            files) cannot be imported.

        """

        if not os.path.isfile(filename):
            return None

        try:
            import magic

            file_description = magic.from_file(filename).split(',')
            first = file_description[0]

            # Some descriptions consist of a single field; an empty second
            # field simply fails the two-field comparisons below.
            raw_second = file_description[1] if len(file_description) > 1 else ''
            second = raw_second.strip()

            if first in cls._MAGIC_EXACT:
                return getattr(cls, cls._MAGIC_EXACT[first])

            if (first, second) in cls._MAGIC_PAIRS:
                return getattr(cls, cls._MAGIC_PAIRS[(first, second)])

            for prefix, name in cls._MAGIC_PREFIXES:
                if first.startswith(prefix):
                    return getattr(cls, name)

            if first.startswith('Audio file with ID3') and raw_second == ' layer III':
                return cls.MP3

            if first == 'gzip compressed data':
                # Look inside the compressed stream to tell .tar.gz from plain .gz
                f = magic.Magic(uncompress=True)
                inner_description = f.from_file(filename).split(',')

                if inner_description[0].startswith('POSIX tar archive (GNU)'):
                    return cls.TAR

                else:
                    return cls.GZ

            if first == '8086 relocatable (Microsoft)':
                try:
                    import cPickle as pickle  # Python 2

                except ImportError:
                    import pickle  # Python 3

                # SECURITY NOTE(review): unpickling can execute arbitrary
                # code, so this probe is only safe for trusted files.
                try:
                    with open(filename, 'rb') as in_fh:
                        pickle.load(in_fh)

                    return cls.CPICKLE

                except Exception:
                    # Not a loadable pickle; the format stays unrecognized.
                    pass

            elif first == 'ASCII text':
                with open(filename, 'r') as in_fh:
                    # Read the file into memory for parsing
                    data = in_fh.read()

                # Try the text formats from the strictest parser to the most
                # permissive one.
                import json
                try:
                    json.loads(data)
                    return cls.JSON

                except ValueError:
                    pass

                from xml.etree import ElementTree
                try:
                    ElementTree.parse(filename).getroot()
                    return cls.XML

                except ElementTree.ParseError:
                    pass

                import yaml
                try:
                    yaml.safe_load(data)
                    return cls.YAML

                except (TypeError, yaml.YAMLError):
                    # YAMLError is the base class of the scanner, parser and
                    # constructor errors.
                    pass

                # Fallback for text that none of the parsers accepted; the
                # content is not validated as CSV.
                return cls.CSV

            return None

        except ImportError:
            # python-magic (or PyYAML) is not installed
            return None

    @classmethod
    def detect(cls, filename, use_content_for_unknown=True):
        """Detect file format. First the file extension is used, if the format is not recognized based on filename alone then content is checked given that file exists.

        Parameters
        ----------
        filename : str
            Path to the file

        use_content_for_unknown : bool
            Use file content to detect the file format if file exists.
            Default value True

        Returns
        -------
        str
            File format label

        """

        # Detect first from the filename
        label = cls.detect_based_on_filename(filename=filename)

        # If format is unknown still, try recover it from the content header
        if use_content_for_unknown and label == cls.UNKNOWN:
            content_label = cls.detect_based_on_content(filename=filename)
            if content_label is not None:
                return content_label

        return label

    @classmethod
    def validate_label(cls, label):
        """Validate file format label against labels known by this class

        A label is valid when it equals the value of one of the format
        constants (e.g. 'NPY') or the name of one (e.g. 'NUMPY'). Methods and
        other class attributes are not labels.

        Parameters
        ----------
        label : str
            file format label

        Returns
        -------
        bool

        """

        # dir() also sees constants inherited from a parent class.
        for name in dir(cls):
            if name.startswith('_') or not name.isupper():
                continue

            value = getattr(cls, name)
            if isinstance(value, str) and label in (name, value):
                return True

        return False