# Source code for aquaduct.utils.clui

# -*- coding: utf-8 -*-

# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2016-2018  Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
# Copyright (C) 2019  Tomasz Magdziarz <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Module comprises convenience functions and definitions for different operations related to the command line user interface.
"""

import logging

logger = logging.getLogger(__name__)

from aquaduct import logger as root_logger
import datetime
import time
from sys import stderr
from os import linesep
from functools import partial
import numpy as np
from aquaduct.utils.helpers import is_number, is_float, is_iterable
import json
from traceback import print_tb

from multiprocessing import Manager

try:
    from termcolor import colored
except ImportError:
    def bold(mess):
        """Return ``mess`` unchanged (fallback used when termcolor is not installed)."""
        return mess
else:
    def bold(mess):
        """Return ``mess`` wrapped by :func:`termcolor.colored` with the ``bold`` attribute."""
        return colored(mess, attrs=['bold'])


# roman emulation
class roman_emulation(object):
    """
    Minimal integer to Roman numeral converter.

    Only the :meth:`toRoman` method is provided. A module level instance
    named ``roman`` is created right after the class definition.
    """

    # Value/symbol pairs in descending order. Subtractive forms (IV, IX, ...)
    # are listed explicitly so a greedy scan yields the canonical numeral.
    _numerals = (
        (1000, 'M'), (900, 'CM'), (500, 'D'), (400, 'CD'),
        (100, 'C'), (90, 'XC'), (50, 'L'), (40, 'XL'),
        (10, 'X'), (9, 'IX'), (5, 'V'), (4, 'IV'), (1, 'I'),
    )

    def toRoman(self, nr):
        """
        Convert a number to its Roman numeral representation.

        :param int nr: Number to convert. Negative numbers are rendered as
            the numeral of the absolute value prefixed with ``-``.
        :return: Roman numeral; an empty string for 0. Values of 4000 and
            above are written with repeated ``M``.
        :rtype: str
        """
        sign = ''
        if nr < 0:
            sign = '-'
            nr = -nr
        chunks = []
        for value, symbol in self._numerals:
            count, nr = divmod(nr, value)
            # int() keeps integral floats (e.g. 4.0) working
            chunks.append(symbol * int(count))
        return sign + ''.join(chunks)


roman = roman_emulation()


def emit_message_to_file_in_root_logger(mess):
    """
    Append ``mess`` to the file used by the file handler of the root logger.

    Nothing happens if the root logger has no handler whose type is exactly
    :class:`logging.FileHandler`. Only the first such handler is used.

    :param str mess: Text appended verbatim to the log file.
    """
    # exact type match on purpose: subclasses of FileHandler are not considered
    file_handler = next((h for h in root_logger.handlers if type(h) is logging.FileHandler), None)
    if file_handler is None:
        return
    # the handler's own lock is held while the file is written
    with file_handler.lock, open(file_handler.baseFilename, 'a') as logfile:
        logfile.write(mess)


def emit_tvtb_to_file_in_root_logger(tvtb):
    """
    Append a formatted traceback to the file used by the file handler of the root logger.

    Nothing happens if the root logger has no handler whose type is exactly
    :class:`logging.FileHandler`. Only the first such handler is used.

    :param tuple tvtb: Triple of (type, value, traceback), i.e. output of ``sys.exc_info()``.
    """
    # exact type match on purpose: subclasses of FileHandler are not considered
    file_handler = next((h for h in root_logger.handlers if type(h) is logging.FileHandler), None)
    if file_handler is None:
        return
    with file_handler.lock, open(file_handler.baseFilename, 'a') as logfile:
        exc_type, exc_value, exc_tb = tvtb
        logfile.write("Traceback (most recent call last):" + linesep)
        print_tb(exc_tb, None, logfile)
        logfile.write("%s: %s%s" % (exc_type.__name__, str(exc_value), linesep))


def message_special(mess):
    """
    Send ``mess`` to the log file of the root logger only.

    Delegates to :func:`emit_message_to_file_in_root_logger`; nothing is
    written to standard error by this function.

    :param str mess: Text passed on unchanged.
    """
    emit_message_to_file_in_root_logger(mess)


def message(mess, cont=False):
    """
    Write a message to standard error and mirror it to the log file.

    The text is first passed to :func:`emit_message_to_file_in_root_logger`
    and then written to standard error.

    :param str mess: Message to print.
    :param bool cont: If true, a single space is appended instead of a line separator.
    """
    ending = ' ' if cont else linesep
    mess = mess + ending
    emit_message_to_file_in_root_logger(mess)
    stderr.write(mess)
class fbm(object):
    """
    Feedback message context manager.

    On creation prints ``info`` followed by ``...``. On a clean exit prints
    ``OK.`` (continuation mode) or ``<info>: DONE.`` otherwise. Nothing is
    printed on exit if an exception is propagating.

    :param str info: Description of the operation.
    :param bool cont: Passed as ``cont`` to :func:`message`.
    """

    def __init__(self, info, cont=True):
        self.__cont = cont
        self.__info = info
        message(info + '...', cont=self.__cont)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, traceback):
        if typ is not None:
            # exception in flight: stay silent
            return
        message("OK." if self.__cont else (self.__info + ": DONE."))

    def __call__(self, info):
        """Print an additional ``info`` message in the same continuation mode."""
        message(info, cont=bool(self.__cont))


class tictoc(object):
    """
    Context manager measuring wall clock time of the enclosed block.

    On a clean exit the elapsed time is reported either with :func:`message`
    (``stdout`` true) or with ``logger.debug``.

    :param str mess: Label used in the report.
    :param bool stdout: Selects the reporting channel.
    """

    def __init__(self, mess, stdout=False):
        self.__tic = 0
        self.__toc = 0
        self.__mess = mess
        self.__stdout = stdout

    def __enter__(self):
        self.__tic = time.time()
        return self

    def __exit__(self, typ, value, traceback):
        self.__toc = time.time()
        if typ is not None:
            return
        elapsed = self.__toc - self.__tic
        if self.__stdout:
            message('Execution time of [%s] %f' % (self.__mess, elapsed))
        else:
            logger.debug('Execution time of [%s] %f', self.__mess, elapsed)

    @property
    def duration(self):
        """Difference between the stored end and start times, in seconds."""
        return self.__toc - self.__tic


gregorian_year_in_days = 365.2425
'''Length of Gregorian year in days. Average value. Source: https://en.wikipedia.org/wiki/Year'''
def smart_time_string(s, rl=0, t=1.1, maximal_length=None, maximal_units=5):
    """
    Transform time in seconds to a nicely formatted string.

    Depending on the number of seconds, time is represented with one or more
    of the following units:

    ========= =================
    Unit name Unit abbreviation
    ========= =================
    seconds   s
    minutes   m
    hours     h
    days      d
    years     y
    ========= =================

    The largest unit is chosen so that ``s`` is below ``t`` times the next
    larger unit (e.g. with the default ``t`` 65 seconds is still ``65 s``).
    The remainder is formatted recursively with smaller units until
    :attr:`maximal_units` units have been used.

    :param int s: Input time in seconds.
    :param int rl: Number of units already used for representing time.
    :param float t: Excess above standard number of current time units.
    :param int maximal_length: Optional length of the output string; the
        result is padded with spaces or truncated to exactly this length.
    :param int maximal_units: Maximal number of units used in the output
        string. Must be greater than 0 and lower than 6.
    :return: String of nicely formatted time.
    :rtype: str
    """
    assert isinstance(maximal_units, int)
    assert maximal_units > 0
    assert maximal_units < 6

    minute = 60
    hour = 60 * minute
    day = 24 * hour

    rl += 1
    if rl > maximal_units:
        # unit budget exhausted
        output = ''
    elif s < t * minute:
        output = "%2.2d s" % s
    else:
        whole = int(s)
        if s < t * hour:
            head, rest = "%2.2d m" % (whole // minute), whole % minute
        elif s < t * day:
            head, rest = "%2.2d h" % (whole // hour), whole % hour
        elif s < t * day * gregorian_year_in_days:
            head, rest = "%d d" % (whole // day), whole % day
        else:
            year = day * gregorian_year_in_days
            head, rest = "%d y" % (whole // year), whole % year
        # maximal_units has to be forwarded, otherwise the recursive call
        # falls back to the default and the limit is never enforced
        tail = smart_time_string(rest, rl, maximal_units=maximal_units).strip()
        output = (head + ' ' + tail) if tail else head

    if maximal_length:
        return (output + " " * maximal_length)[:maximal_length]
    return output
################################### # vis separators
def gsep(sep='-', times=72, length=None):
    """
    Build a generic separator line.

    :param str sep: Element(s) of separator.
    :param int times: Number of times :attr:`sep` is repeated.
    :param int length: Optional maximal length of output; ``None`` means no truncation.
    :return: String separator.
    :rtype: str
    """
    separator = sep * times
    return separator[:length]
def tsep(line):
    """
    Build a dash separator as long as the given line.

    :param str line: Input line.
    :return: Output of :func:`gsep` made of ``-`` repeated ``len(line)`` times.
    :rtype: str
    """
    width = len(line)
    return gsep(sep='-', times=width)
def underline(line):
    """
    Underline a line of text.

    :param str line: Input line.
    :return: :attr:`line`, :mod:`os.linesep` and the output of :func:`tsep`
        called with :attr:`line`, concatenated in that order.
    :rtype: str
    """
    return linesep.join((line, tsep(line)))
def thead(line):
    """
    Format a line as a table header enclosed by separators.

    :param str line: Input line.
    :return: Output of :func:`tsep` for :attr:`line`, then :attr:`line`, then
        the same separator again, joined with :mod:`os.linesep`.
    :rtype: str
    """
    rule = tsep(line)
    return linesep.join((rule, line, rule))
###################################
class SimpleProgressBar(object):
    """
    Simple progress bar displaying progress with percent indicator, progress bar and ETA.
    Progress is measured by iterations.

    :cvar str rotate: String comprising characters with frames of a rotating toy.
    :cvar int barlenght: Length of progress bar.

    :ivar int maxval: maximal number of iterations
    :ivar int current: current number of iterations
    :ivar bool overrun_notice: if True, overrun above :attr:`maxval` iterations causes insert of newline
    :ivar bool overrun: flag of overrun
    :ivar int begin: time in seconds at the initialization of the :class:`SimpleProgressBar` class.
    :ivar int tcurrent: time in seconds of current iteration
    """

    rotate = '\\|/-'
    # rotate = '<^>v'
    # rotate = '.:|:.'
    # rotate = 'x+'

    barlenght = 24

    # Minimal time in seconds between two redraws / rotations of the toy.
    _refresh_interval = 1. / 4
    # Minimal time in seconds between two log file entries once overrun.
    _overrun_log_interval = 60.

    def __init__(self, maxval=None, mess=None, iterable=None):
        """
        :param int maxval: Maximal number of iterations stored to :attr:`maxval`.
            Values lower than 1 are replaced by 1. If ``None`` and
            :attr:`iterable` is iterable, its length is used.
        :param str mess: Optional message displayed at progress bar initialization.
        :param iterable: Optional iterable consumed by :meth:`iter`.
        """
        # self.lock = Manager().Lock()
        if maxval is None and is_iterable(iterable):
            maxval = len(iterable)
        self.iterable = iterable
        assert isinstance(maxval, int), 'Parameter maxval should be of int or long type, %r given instead.' % type(
            maxval)
        if maxval < 1:
            self.maxval = 1
        else:
            self.maxval = maxval

        self.tens = []  # tens of percent already reported to the log file
        self.hundreds = []  # whole percents already seen
        self.current = 0

        self.overrun_notice = True
        self.overrun = False

        self.begin = time.time()
        self.tcurrent = self.begin
        self.tictoclist = []  # times at which a new whole percent was reached
        self.last_eta = None

        self.last_rotate_time = self.begin
        self.last_rotate_idx = 0

        if mess is not None:
            message(mess)
        self.show()

    def __enter__(self):
        return self

    def __exit__(self, typ, value, traceback):
        # finish only on clean exit
        if typ is None:
            self.finish()

    def iter(self, finish=False):
        """
        Yield elements of :attr:`iterable`, advancing the bar by one after each of them.

        :param bool finish: If True, :meth:`finish` is called once the iterable is exhausted.
        """
        for e in self.iterable:
            yield e
            next(self)
        if finish:
            self.finish()

    def bar(self):
        """
        Return the bar string, i.e. ``#`` characters proportional to progress
        followed by the current frame of the rotating toy, in square brackets.

        :rtype: str
        """
        barval = int(self.percent() / 100 * self.barlenght)
        if barval > self.barlenght:
            barval = self.barlenght
        bar = '#' * barval
        if self.current:
            if self.tcurrent - self.last_rotate_time > self._refresh_interval:
                self.last_rotate_idx += 1
                self.last_rotate_time = self.tcurrent
            if self.last_rotate_idx > len(self.rotate) - 1:
                self.last_rotate_idx = 0
            bar += self.rotate[self.last_rotate_idx]
        # pad and cut to constant width
        bar += ' ' * self.barlenght
        return '[%s]' % bar[:self.barlenght]

    def ETA(self):
        """
        Returns ETA calculated on the basis of current number of iterations
        :attr:`current` and current time :attr:`tcurrent`. If number of
        iterations is 0 returns ``?``.
        Time is formatted with :func:`smart_time_string`. If the estimated
        uncertainty exceeds one tenth of ETA it is appended after ``+/-``.

        :return: ETA as string.
        :rtype: str
        """
        if self.current == 0:
            return '?'
        if len(self.tictoclist) > 2:
            # median time needed for one percent, scaled to current iterations
            diff = (np.median(np.diff(self.tictoclist)) / (self.maxval / 100.)) * self.current
        else:
            diff = self.tcurrent - self.begin
        periteration = diff / self.current
        eta = periteration * (self.maxval - self.current)
        percent = 1 - self.percent() / 100.  # fraction of work left
        if self.last_eta is not None:
            if eta >= self.last_eta:
                self.last_eta = eta
                eta += eta * percent
        else:
            self.last_eta = eta
        eta_uncertain = eta * percent ** 2
        if eta_uncertain > eta / 10:
            return smart_time_string(eta) + " +/- " + smart_time_string(eta_uncertain)
        return smart_time_string(eta)

    def percent(self):
        """
        Returns float number of percent progress calculated on the basis of
        current number of iterations :attr:`current`. Exceeds 100 if
        :attr:`current` is greater than :attr:`maxval`.

        :returns: percent progress number
        :rtype: float
        """
        percent = float(self.current) / float(self.maxval) * 100
        return percent

    def show(self):
        """
        Shows current progress.

        If value returned by :meth:`percent` is =< 100 then progress is printed as
        percent indicator followed by the bar and ETA calculated by :meth:`ETA`.

        If value returned by :meth:`percent` is > 100 then progress is printed as
        number of iterations and total time.

        Progress bar is written to standard error. Once per each ten percent
        the same text is also passed to :func:`message_special`.
        """
        percent = self.percent()
        if int(percent) not in self.hundreds:
            self.hundreds.append(int(percent))
            self.tictoclist.append(self.tcurrent)
            while len(self.tictoclist) > 20:
                self.tictoclist.pop(0)
        # TODO: create some unittests for pbar
        mess = ''
        mess_spec = ''
        if percent > 100 or self.overrun:
            if self.overrun_notice:
                stderr.write(linesep)
                self.overrun_notice = False
            self.overrun = True
            mess_spec = "%d iterations out of %d. Total time: %s" % (self.current, self.maxval, self.ttime())
            mess = "\r" + mess_spec
        elif not self.overrun:
            mess_spec = "%s%% %s ETA: %s" % (bold('%3d' % self.percent()), self.bar(), self.ETA())
            mess = "\r" + mess_spec + "\033[K"  # FIXME: magic constant!
        stderr.write(mess)
        # TODO: do not use last_rotate_time here, use separate marker, last_rotate_time can be used in over run notice
        # Floor division is required: with true division every whole percent
        # yields a distinct value and the log file is flooded.
        decade = int(percent) // 10
        if decade not in self.tens:
            if percent > 100:
                if self.tcurrent - self.last_rotate_time > self._overrun_log_interval:
                    message_special(mess_spec + linesep)
                    self.tens.append(decade)
                    self.last_rotate_time = self.tcurrent
            else:
                message_special(mess_spec + linesep)
                self.tens.append(decade)

    def heartbeat(self):
        """Refresh :attr:`tcurrent` and redraw without changing :attr:`current`."""
        # with self.lock:
        self.tcurrent = time.time()
        self.show()

    def next(self, step=None):
        """
        Advance progress by :attr:`step` iterations (by one if ``None``).

        :param int step: Number of iterations to add to :attr:`current`.
        """
        if step is None:
            return self.update(self.current + 1)
        return self.update(self.current + step)

    # Python 3 iterator protocol name; lets builtin next() (used in iter) work.
    __next__ = next

    def update(self, step):
        """
        Sets number of current iterations :obj:`current` to :obj:`step` if it
        is > 0. Otherwise number of current iterations is not changed.
        In both cases time of current iteration :obj:`tcurrent` is updated.
        :meth:`show` is called if :obj:`step` equals :attr:`maxval` or enough
        time elapsed since the last rotation.

        :param int step: new number of current iterations
        """
        # TODO: change logic of step == 1 vs step > 1 - add or set?
        # with self.lock:
        if step > 0:
            self.current = step
        self.tcurrent = time.time()
        if (step == self.maxval) or (self.tcurrent - self.last_rotate_time > self._refresh_interval):
            # TODO: check for last_rotate_time is done twice, SimpleProgressBar code needs revision
            self.show()

    def ttime(self):
        """
        Calculates and returns total time string formatted with :func:`smart_time_string`.

        :return: string of total time
        :rtype: str
        """
        return smart_time_string(self.tcurrent - self.begin)

    def finish(self):
        """
        Finishes progress bar. First, :meth:`update` is called with
        :attr:`maxval` if it was not reached yet, or with 0 otherwise (which
        only refreshes the time). Next the bar is shown and message of total
        time is written to standard error.
        """
        if self.current < self.maxval:
            self.update(self.maxval)
        else:
            self.update(0)
        self.show()
        stderr.write(linesep)
        message("Total time: %s" % self.ttime())
# stderr.write(linesep)


pbar = SimpleProgressBar  # default progress bar


def get_str_timestamp():
    """Return the current local time, with one second resolution, as a string."""
    # returns time stamp as string
    return str(datetime.datetime(*tuple(time.localtime())[:6]))


class SimpleTree(object):
    """
    Simple tree of named nodes carrying lists of messages.

    Every node stores ``name``, ``message`` (list) and ``branches`` (list of
    :class:`SimpleTree` child nodes).
    """

    def __init__(self, name=None, message=None, treestr=None):
        """
        :param name: Name of the node.
        :param message: Initial message or list of messages.
        :param str treestr: JSON string as produced by :meth:`__repr__`; if
            given, the node is rebuilt from it and other parameters are ignored.
        """
        # assert no {}[], are in name or message
        if treestr is not None:
            self._init_str(treestr)
        else:
            self.name = name
            self.message = []
            self.add_message(message)
            self.branches = []

    def _init_str(self, s):
        """Rebuild the node, including all branches, from JSON string ``s``."""
        d = json.loads(s)
        self.name = str(d['name'])
        # names that look like numbers are converted to float or int
        if is_number(self.name):
            if is_float(self.name):
                self.name = float(self.name)
            else:
                self.name = int(self.name)
        self.message = list(map(str, d['message']))
        # each branch is stored as a nested JSON string
        self.branches = [SimpleTree(treestr=treestr) for treestr in list(map(str, d['branches']))]

    def __repr__(self):
        # this can be used to rebuild
        return json.dumps({'name': self.name, 'message': self.message, 'branches': list(map(repr, self.branches))})

    def __str__(self):
        return "%s {%s} %s" % (str(self.name), "; ".join(self.message), str(list(map(str, self.branches))))

    def is_leaf(self):
        """Return True if the node has no branches."""
        return len(self.branches) == 0

    @property
    def leafs_names(self):
        """Names of direct child nodes."""
        return [leaf.name for leaf in self.branches]

    def get_leaf(self, name):
        """Return the first direct child node called ``name``; it has to exist."""
        assert name in self.leafs_names
        return [leaf for leaf in self.branches if name == leaf.name][0]

    def add_message(self, message=None, toleaf=None, replace=False):
        """
        Add a message to this node or, if ``toleaf`` is given, to the node of that name.

        :param message: Single message or list of messages; ``None`` is ignored.
        :param toleaf: Optional name of the target node.
        :param bool replace: If True existing messages are replaced instead of extended.
        """
        if toleaf is not None:
            return self.add_message_to_leaf(message=message, toleaf=toleaf, replace=replace)
        if message is not None:
            if isinstance(message, list):
                if replace:
                    self.message = message
                else:
                    self.message += message
            else:
                if replace:
                    self.message = [message]
                else:
                    self.message += [message]

    def add_message_to_leaf(self, message=None, toleaf=None, replace=False):
        """Add a message to the node ``toleaf``, searching direct children first and then recursively."""
        if toleaf in self.leafs_names:
            leaf = self.get_leaf(toleaf)
            return leaf.add_message(message, replace=replace)
        else:
            for leaf in self.branches:
                leaf.add_message_to_leaf(message=message, toleaf=toleaf, replace=replace)

    def add_leaf(self, name=None, message=None, toleaf=None):
        """
        Append a new child node to this node or, if ``toleaf`` is given, to the node of that name.

        :param name: Name of the new node.
        :param message: Initial message(s) of the new node.
        :param toleaf: Optional name of the parent node.
        """
        if toleaf is not None:
            return self.add_leaf_to_leaf(name=name, message=message, toleaf=toleaf)
        leaf = SimpleTree(name=name, message=message)
        self.branches.append(leaf)

    def add_leaf_to_leaf(self, name=None, message=None, toleaf=None):
        """Append a new child node to the node ``toleaf``, searching direct children first and then recursively."""
        if toleaf in self.leafs_names:
            leaf = self.get_leaf(toleaf)
            return leaf.add_leaf(name=name, message=message)
        else:
            for leaf in self.branches:
                leaf.add_leaf_to_leaf(name=name, message=message, toleaf=toleaf)


def print_simple_tree(st, prefix=None, multiple=False, concise=True):
    """
    Render a :class:`SimpleTree` as multi-line text and return it.

    :param SimpleTree st: Tree (node) to render.
    :param str prefix: Text put in front of each produced line.
    :param bool multiple: If True, the prefix for child lines contains a
        vertical bar; set by recursive calls for all but the last branch.
    :param bool concise: If False additional spacer lines are emitted.
    :return: Rendered tree.
    :rtype: str
    """
    # NOTE(review): the recursive call below does not forward ``concise``,
    # so nested levels always use the default - confirm this is intended.
    _l = '|'
    _t = ' '
    _c = '-+' + _t

    def name_str(name):
        if name is None:
            return ''
        return str(name)

    def name_len(name):
        return len(name_str(name))

    def message_str(message):
        if len(message):
            return '{%s}' % '; '.join(message)
        return ''

    prefix_ = ''
    if prefix is not None:
        prefix_ += prefix

    out = ''
    # name
    out += prefix_
    out += name_str(st.name)
    if not st.is_leaf():
        if name_len(st.name):
            out += _c
        out += message_str(st.message)
        out += linesep
        # child lines are indented by the width of this node's name
        if multiple:
            new_prefix = prefix_ + _l + (_t * (name_len(st.name) - 1))
        else:
            new_prefix = prefix_ + (_t * (name_len(st.name)))
        new_prefix += _t
        if not concise:
            out += new_prefix + _l + linesep
        out_rec = []
        for nr, branch in enumerate(st.branches):
            # multiple is True for every branch but the last one
            out_rec.append(partial(print_simple_tree, prefix=new_prefix, multiple=len(st.branches) - nr > 1)(branch))
        out += ''.join(out_rec)
        if st.branches[-1].is_leaf() and not concise:
            out += new_prefix.rstrip(_t) + linesep
    else:
        out += _t
        out += message_str(st.message)
        out += linesep
    return out