# -*- coding: utf-8 -*-
# Copyright (C) 2014 Johannes Baiter <johannes.baiter@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Various utility functions and classes.
"""
from __future__ import division, unicode_literals, print_function
import abc
import glob
import json
import logging
import os
import pkg_resources
import platform
import re
import subprocess
from unicodedata import normalize
import blinker
import colorama
import psutil
import roman
from colorama import Fore, Back, Style
from pathlib import Path
[docs]class SpreadsException(Exception):
""" General exception """
pass
[docs]class DeviceException(SpreadsException):
""" Raised when a device-related error occured. """
pass
[docs]class MissingDependencyException(SpreadsException):
""" Raised when a dependency for a plugin is missing. """
pass
[docs]def get_version():
""" Get installed version via pkg_resources. """
return pkg_resources.require('spreads')[0].version
[docs]def find_in_path(name):
""" Find executable in $PATH.
:param name: name of the executable
:type name: unicode
:returns: Path to executable or None if not found
:rtype: unicode or None
"""
candidates = None
if is_os('windows'):
import _winreg
if name.startswith('scantailor'):
try:
cmd = _winreg.QueryValue(
_winreg.HKEY_CLASSES_ROOT,
'Scan Tailor Project\\shell\\open\\command')
bin_path = cmd.split('" "')[0][1:]
if name.endswith('-cli'):
bin_path = bin_path[:-4] + "-cli.exe"
return bin_path if os.path.exists(bin_path) else None
except OSError:
return None
else:
path_dirs = os.environ.get('PATH').split(';')
path_dirs.append(os.getcwd())
path_exts = os.environ.get('PATHEXT').split(';')
candidates = (os.path.join(p, name + e)
for p in path_dirs
for e in path_exts)
else:
candidates = (os.path.join(p, name)
for p in os.environ.get('PATH').split(':'))
return next((c for c in candidates if os.path.exists(c)), None)
[docs]def is_os(osname):
""" Check if the current operating system matches the expected.
:param osname: Operating system name as returned by
:py:func:`platform.system`
:returns: Whether the OS matches or not
:rtype: bool
"""
return platform.system().lower() == osname
[docs]def check_futures_exceptions(futures):
"""" Go through passed :py:class:`concurrent.futures._base.Future` objects
and re-raise the first Exception raised by any one of them.
:param futures: Iterable that contains the futures to be checked
:type futures: iterable with :py:class:`concurrent.futures._base.Future`
instances
"""
if any(x.exception() for x in futures):
raise next(x for x in futures if x.exception()).exception()
[docs]def get_free_space(path):
""" Return free space on file-system underlying the passed path.
:param path: Path on file-system the free space of which is desired.
:type path; unicode
:return: Free space in bytes.
:rtype: int
"""
return psutil.disk_usage(unicode(path)).free
[docs]def get_subprocess(cmdline, **kwargs):
""" Get a :py:class:`subprocess.Popen` instance.
On Windows systems, the process will be ran in the background and won't
open a cmd-window or appear in the taskbar.
The function signature matches that of the :py:class:`subprocess.Popen`
initialization method.
"""
if subprocess.mswindows and 'startupinfo' not in kwargs:
su = subprocess.STARTUPINFO()
su.dwFlags |= subprocess.STARTF_USESHOWWINDOW
su.wShowWindow = subprocess.SW_HIDE
kwargs['startupinfo'] = su
return subprocess.Popen(cmdline, **kwargs)
[docs]def wildcardify(pathnames):
""" Try to generate a single path with wildcards that matches all
`pathnames`.
:param pathnames: List of pathnames to find a wildcard string for
:type pathanmes: List of str/unicode
:return: The wildcard string or None if none was found
:rtype: unicode or None
"""
wildcard_str = ""
for idx, char in enumerate(pathnames[0]):
if all(p[idx] == char for p in pathnames[1:]):
wildcard_str += char
elif not wildcard_str or wildcard_str[-1] != "*":
wildcard_str += "*"
matched_paths = glob.glob(wildcard_str)
if not sorted(pathnames) == sorted(matched_paths):
return None
return wildcard_str
[docs]def diff_dicts(old, new):
""" Get the difference between two dictionaries.
:param old: Dictionary to base comparison on
:type old: dict
:param new: Dictionary to compare with
:type new: dict
:return: A (possibly nested) dictionary containing all items from `new`
that differ from the ones in `old`
:rtype: dict
"""
out = {}
for key, value in old.iteritems():
if new[key] != value:
out[key] = new[key]
elif isinstance(value, dict):
diff = diff_dicts(value, new[key])
if diff:
out[key] = diff
return out
[docs]def slugify(text, delimiter=u'-'):
"""Generates an ASCII-only slug.
Code adapted from Flask snipped by Armin Ronacher:
http://flask.pocoo.org/snippets/5/
:param text: Text to create slug for
:type text: unicode
:param delimiter: Delimiter to use in slug
:type delimiter: unicode
:return: The generated slug
:rtype: unicode
"""
punctuation_re = r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+'
result = []
for word in re.split(punctuation_re, text.lower()):
word = normalize('NFKD', word).encode('ascii', 'ignore')
if word:
result.append(word)
return unicode(delimiter.join(result))
class _instancemethodwrapper(object): # noqa
def __init__(self, callable):
self.callable = callable
self.__dontcall__ = False
def __getattr__(self, key):
return getattr(self.callable, key)
def __call__(self, *args, **kwargs):
if self.__dontcall__:
raise TypeError('Attempted to call abstract method.')
return self.callable(*args, **kwargs)
class _classmethod(classmethod): # noqa
def __init__(self, func):
super(_classmethod, self).__init__(func)
isabstractmethod = getattr(func, '__isabstractmethod__', False)
if isabstractmethod:
self.__isabstractmethod__ = isabstractmethod
def __get__(self, instance, owner):
result = _instancemethodwrapper(super(_classmethod, self)
.__get__(instance, owner))
isabstractmethod = getattr(self, '__isabstractmethod__', False)
if isabstractmethod:
result.__isabstractmethod__ = isabstractmethod
abstractmethods = getattr(owner, '__abstractmethods__', None)
if abstractmethods and result.__name__ in abstractmethods:
result.__dontcall__ = True
return result
[docs]class abstractclassmethod(_classmethod): # noqa
""" New decorator class that implements the @abstractclassmethod decorator
added in Python 3.3 for Python 2.7.
Kudos to http://stackoverflow.com/a/13640018/487903
"""
[docs] def __init__(self, func):
func = abc.abstractmethod(func)
super(abstractclassmethod, self).__init__(func)
[docs]class ColourStreamHandler(logging.StreamHandler):
""" A colorized output StreamHandler
Kudos to Leigh MacDonald: http://goo.gl/Lpr6C5
"""
# Some basic colour scheme defaults
colours = {
'DEBUG': Fore.CYAN,
'INFO': Fore.GREEN,
'WARN': Fore.YELLOW,
'WARNING': Fore.YELLOW,
'ERROR': Fore.RED,
'CRIT': Back.RED + Fore.WHITE,
'CRITICAL': Back.RED + Fore.WHITE
}
@property
def is_tty(self):
""" Check if we are using a "real" TTY. If we are not using a TTY it
means that the colour output should be disabled.
:return: Using a TTY status
:rtype: bool
"""
try:
return getattr(self.stream, 'isatty', None)()
except:
return False
def emit(self, record):
try:
message = self.format(record)
if not self.is_tty:
self.stream.write(message)
else:
self.stream.write(self.colours[record.levelname] +
message + Style.RESET_ALL)
self.stream.write(getattr(self, 'terminator', '\n'))
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
[docs]class EventHandler(logging.Handler):
""" Subclass of :py:class:`logging.Handler` that emits a
:py:class:`blinker.base.Signal` whenever a new record is emitted.
"""
signals = blinker.Namespace()
on_log_emit = signals.signal('logrecord', doc="""\
Sent when a log record was emitted.
:keyword :class:`logging.LogRecord` record: the LogRecord
""")
def emit(self, record):
self.on_log_emit.send(record=record)
[docs]def get_data_dir(create=False):
""" Return (and optionally create) the user's default data directory.
:param create: Create the data directory if it doesn't exist
:type create: bool
:return: Path to the default data directory
:rtype: unicode
"""
unix_dir_var = 'XDG_DATA_HOME'
unix_dir_fallback = '~/.config'
windows_dir_var = 'APPDATA'
windows_dir_fallback = '~\\AppData\\Roaming'
mac_dir = '~/Library/Application Support'
base_dir = None
if is_os('darwin'):
if Path(unix_dir_fallback).exists:
base_dir = unix_dir_fallback
else:
base_dir = mac_dir
elif is_os('windows'):
if windows_dir_var in os.environ:
base_dir = os.environ[windows_dir_var]
else:
base_dir = windows_dir_fallback
else:
if unix_dir_var in os.environ:
base_dir = os.environ[unix_dir_var]
else:
base_dir = unix_dir_fallback
app_path = Path(base_dir)/'spreads'
if create and not app_path.exists():
app_path.mkdir()
return unicode(app_path)
[docs]def colorize(text, color):
""" Return text with a new ANSI foreground color.
:param text: Text to be wrapped
:param color: ANSI color to wrap text in
:type color: str (from `colorama.ansi <http://git.io/9qnt0Q>`)
:return: Colorized text
"""
return color + text + colorama.Fore.RESET
[docs]class RomanNumeral(object):
""" Number type that represents integers as Roman numerals and that
can be used in all arithmetic operations applicable to integers.
"""
@staticmethod
[docs] def is_roman(value):
""" Check if `value` is a valid Roman numeral.
:param value: Value to be checked
:type value: unicode
:returns: Whether the value is valid or not
:rtype: bool
"""
return bool(roman.romanNumeralPattern.match(value))
[docs] def __init__(self, value, case='upper'):
""" Create a new instance.
:param value: Value of the instance
:type value: int, unicode containing valid Roman numeral or
:py:class:`RomanNumeral`
"""
self._val = self._to_int(value)
self._case = case
if isinstance(value, basestring) and not self.is_roman(value):
self._case = 'lower'
elif isinstance(value, RomanNumeral):
self._case = value._case
def _to_int(self, value):
if isinstance(value, int):
return value
elif isinstance(value, basestring) and self.is_roman(value.upper()):
return roman.fromRoman(value.upper())
elif isinstance(value, RomanNumeral):
return value._val
else:
raise ValueError("Value must be a valid roman numeral, a string"
" representing one or an integer: '{0}'"
.format(value))
def __cmp__(self, other):
if self._val > self._to_int(other):
return 1
elif self._val == self._to_int(other):
return 0
elif self._val < self._to_int(other):
return -1
def __add__(self, other):
return RomanNumeral(self._val + self._to_int(other), self._case)
def __sub__(self, other):
return RomanNumeral(self._val - self._to_int(other), self._case)
def __int__(self):
return self._val
def __str__(self):
strval = roman.toRoman(self._val)
if self._case == 'lower':
return strval.lower()
else:
return strval
def __unicode__(self):
return unicode(str(self))
def __repr__(self):
return str(self)
[docs]class CustomJSONEncoder(json.JSONEncoder):
""" Custom :py:class:`json.JSONEncoder`.
Uses an object's `to_dict` method if present for serialization.
Serializes :py:class:`pathlib.Path` instances to the string
representation of their relative path to a BagIt-compliant directory or
their absolute path if not applicable.
"""
def default(self, obj):
if hasattr(obj, 'to_dict'):
return obj.to_dict()
if isinstance(obj, Path):
# Serialize paths that belong to a workflow as paths relative to
# its base directory
base = next((p for p in obj.parents if (p/'bagit.txt').exists()),
None)
if base:
return unicode(obj.relative_to(base))
else:
return unicode(obj.absolute())
return json.JSONEncoder.default(self, obj)