#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
import os
import csv
import time
import zipfile
import tarfile
from dcase_util.containers import FileMixin, ObjectContainer, PackageMixin
from dcase_util.utils import get_byte_string, FileFormat
from dcase_util.ui import FancyStringifier
[docs]class File(FileMixin):
"""Generic file class"""
valid_formats = [FileFormat.YAML, FileFormat.JSON, FileFormat.CPICKLE, FileFormat.MARSHAL, FileFormat.MSGPACK,
FileFormat.TXT, FileFormat.CSV, FileFormat.ZIP, FileFormat.TAR]
[docs] def __init__(self, *args, **kwargs):
"""Constructor
Parameters
----------
filename : str, optional
File path
valid_formats : list of FileFormat items
List of valid formats (FileFormat)
Default [YAML,JSON,CPICKLE,MARSHAL,MSGPACK,TXT,CSV,ZIP,TAR]
"""
# Run FileMixin init
FileMixin.__init__(self, *args, **kwargs)
# Run super init to call init of mixins too
super(File, self).__init__(*args, **kwargs)
[docs] def load(self, filename=None):
"""Load file
Parameters
----------
filename : str, optional
File path
Default value filename given to class constructor
Raises
------
ImportError:
Error if file format specific module cannot be imported
IOError:
File does not exists or has unknown file format
Returns
-------
self
"""
if filename:
self.filename = filename
self.detect_file_format()
self.validate_format()
if self.exists():
from dcase_util.files import Serializer
# File exits
if self.format == FileFormat.YAML:
return Serializer.load_yaml(filename=self.filename)
elif self.format == FileFormat.CPICKLE:
return Serializer.load_cpickle(filename=self.filename)
elif self.format == FileFormat.MARSHAL:
return Serializer.load_marshal(filename=self.filename)
elif self.format == FileFormat.MSGPACK:
return Serializer.load_msgpack(filename=self.filename)
elif self.format == FileFormat.JSON:
return Serializer.load_json(filename=self.filename)
elif self.format == FileFormat.TXT:
with open(self.filename, 'r') as f:
lines = f.readlines()
return dict(zip(range(0, len(lines)), lines))
elif self.format == FileFormat.CSV:
data = {}
delimiter = self.delimiter()
with open(self.filename, 'rb') as f:
csv_reader = csv.reader(f, delimiter=delimiter)
for row in csv_reader:
if len(row) == 2:
data[row[0]] = row[1]
return data
else:
message = '{name}: Unknown format [{format}]'.format(name=self.__class__.__name__, format=self.filename)
self.logger.exception(message)
raise IOError(message)
else:
message = '{name}: File does not exists [{file}]'.format(name=self.__class__.__name__, file=self.filename)
self.logger.exception(message)
raise IOError(message)
[docs] def save(self, data, filename=None):
"""Save file
Parameters
----------
data
Data to be saved
filename : str, optional
File path
Default value filename given to class constructor
Raises
------
ImportError:
Error if file format specific module cannot be imported
IOError:
File has unknown file format
Returns
-------
self
"""
if filename:
self.filename = filename
self.detect_file_format()
self.validate_format()
if self.filename is None or self.filename == '':
message = '{name}: Filename is empty [{filename}]'.format(
name=self.__class__.__name__,
filename=self.filename
)
self.logger.exception(message)
raise IOError(message)
try:
from dcase_util.files import Serializer
if self.format == FileFormat.YAML:
Serializer.save_yaml(filename=self.filename, data=data)
elif self.format == FileFormat.CPICKLE:
Serializer.save_cpickle(filename=self.filename, data=data)
elif self.format == FileFormat.MARSHAL:
Serializer.save_marshal(filename=self.filename, data=data)
elif self.format == FileFormat.MSGPACK:
Serializer.save_msgpack(filename=self.filename, data=data)
elif self.format == FileFormat.JSON:
Serializer.save_json(filename=self.filename, data=data)
elif self.format == FileFormat.TXT:
with open(self.filename, "w") as text_file:
for line_id in data:
text_file.write(data[line_id])
else:
message = '{name}: Unknown format [{format}]'.format(
name=self.__class__.__name__,
format=self.filename
)
self.logger.exception(message)
raise IOError(message)
except KeyboardInterrupt:
os.remove(self.filename) # Delete the file, since most likely it was not saved fully
raise
return self
[docs]class FileLock(ObjectContainer):
"""Simple file-based locking class.
Usual solution for file locking is to use `fcntl` module. This class provides a bit more flexible solution
as it does not require file to be open to get a lock. This locking system should also work
with NFS mounts (also prior v3).
"""
[docs] def __init__(self, filename, timeout=60*1, lock_file_extension='lock', **kwargs):
"""Constructor
Parameters
----------
filename : str
File path
timeout : int
Timeout in seconds
lock_file_extension : str
File extension to be used for locking files
"""
# Run super init to call init of mixins too
super(FileLock, self).__init__(**kwargs)
self.timeout = timeout
self.main_filename = filename
self.lock_filename = filename + '.' + lock_file_extension
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indention used
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = super(FileLock, self).to_string(ui=ui, indent=indent)
output += ui.data(field='main_filename', value=self.main_filename, indent=indent) + '\n'
output += ui.data(field='lock_filename', value=self.lock_filename, indent=indent) + '\n'
output += ui.data(field='timeout', value=self.timeout, unit='sec', indent=indent) + '\n'
return output
@property
def expired(self):
"""Check is the locking file older than specified timeout.
Returns
-------
bool
"""
if self.is_locked:
if time.time() - os.path.getctime(self.lock_filename) > self.timeout:
return True
else:
return False
else:
return False
@property
def is_locked(self):
"""Check does the locking file exists.
Returns
-------
bool
"""
if os.path.isfile(self.lock_filename):
return True
else:
return False
[docs] def touch(self):
"""Create locking file with current time stamp.
Returns
-------
self
"""
with open(self.lock_filename, 'a'):
os.utime(self.lock_filename, None)
return self
def __enter__(self):
self.lock()
return self
def __exit__(self, type, value, traceback):
self.release()
[docs] def lock(self):
"""Lock file.
If lock is already set, method will wait until lock is released or timeout has reached.
Returns
-------
self
"""
while True:
if not self.is_locked:
# File is not locked we can proceed to the locking phase.
break
else:
if self.expired:
# Lock has been expired, suspected deadlock case, go on and steal the lock.
break
# File is locked by other, wait until lock is released or timeout has expired.
time.sleep(1) # pool only once per second
# Lock
self.touch()
return self
[docs] def release(self):
"""Release file lock.
Returns
-------
self
"""
if self.is_locked:
try:
os.remove(self.lock_filename)
except OSError as exception:
pass
return self
[docs]class Package(ObjectContainer, PackageMixin):
"""Generic package class"""
valid_formats = [FileFormat.ZIP, FileFormat.TAR]
[docs] def __init__(self, *args, **kwargs):
# Run ContainerMixin init
PackageMixin.__init__(self, *args, **kwargs)
self._file_info = None
self._size_compressed = None
self._size_uncompressed = None
super(Package, self).__init__(*args, **kwargs)
def to_string(self, ui=None, indent=0):
"""Get container information in a string
Parameters
----------
ui : FancyStringifier or FancyHTMLStringifier
Stringifier class
Default value FancyStringifier
indent : int
Amount of indention used
Default value 0
Returns
-------
str
"""
if ui is None:
ui = FancyStringifier()
output = ''
output += ui.class_name(self.__class__.__name__, indent=indent) + '\n'
if hasattr(self, 'filename') and self.filename:
output += ui.data(field='filename', value=self.filename, indent=indent) + '\n'
if self._file_info is None:
self.get_info()
output += ui.line('Size', indent=indent) + '\n'
output += ui.data(
field='Uncompressed',
value=get_byte_string(self._size_uncompressed),
indent=indent + 2
) + '\n'
if self.format == FileFormat.ZIP:
output += ui.data(
field='Compressed',
value=get_byte_string(self._size_compressed),
indent=indent + 2
) + '\n'
output += ui.data(
field='Ratio',
value=self._size_compressed/float(self._size_uncompressed) * 100,
unit='%',
indent=indent + 2
) + '\n'
output += ui.line('Files', indent=indent) + '\n'
output += ui.data(
field='Count',
value=len(self._file_info),
indent=indent + 2
) + '\n'
return output
def get_info(self):
"""Get package info
Returns
-------
self
"""
if self.format == FileFormat.ZIP:
zip = zipfile.ZipFile(
file=self.filename,
mode='r'
)
self._file_info = zip.infolist()
zip.close()
self._size_compressed = 0
self._size_uncompressed = 0
# Go through files and accumulate uncompressed and compressed file sizes
for file_info in self._file_info:
self._size_compressed += file_info.compress_size
self._size_uncompressed += file_info.file_size
elif self.format == FileFormat.TAR:
tar = tarfile.open(
name=self.filename,
mode='r:gz'
)
self._file_info = tar.getmembers()
tar.close()
self._size_uncompressed = 0
# Only uncompressed file size is available
for file_info in self._file_info:
self._size_uncompressed += file_info.size
self._size_compressed = None
return self