Source code for mltk.utils.logger

"""Data logging utilities

See the source code on Github: `mltk/utils/logger.py <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/logger.py>`_
"""
import logging
import os
import re
import io
import sys
import types
import time
import atexit
import threading
from typing import Callable, Union, TextIO, List


[docs]def get_logger( name='mltk', level='INFO', console=False, log_file=None, log_file_mode='w', parent:logging.Logger=None, base_level='DEBUG', file_level='DEBUG' ): """Get or create a logger, optionally adding a console and/or file handler""" logger = logging.getLogger(name) if len(logger.handlers) == 0: if parent is None: logger.propagate = False else: logger.parent = parent logger.propagate = True logger.setLevel(base_level) if console: add_console_logger(logger, level=level) if log_file: log_dir = os.path.dirname(log_file) if log_dir: os.makedirs(log_dir, exist_ok=True) fh = logging.FileHandler(log_file, mode=log_file_mode) fh.setLevel(file_level) logger.addHandler(fh) if not hasattr(logger, 'close'): def _close(cls): for handler in cls.handlers: if isinstance(handler, logging.FileHandler): handler.close() logger.close = types.MethodType(_close, logger) return logger
[docs]def add_console_logger(logger: logging.Logger, level='INFO'): """Add a console logger to the given logger""" for handler in logger.handlers: if isinstance(handler, _ConsoleStreamLogger): return ch = _ConsoleStreamLogger(sys.stdout) ch.setLevel(get_level(level)) logger.addHandler(ch)
[docs]def make_filelike(logger: logging.Logger, level=logging.INFO): """Make the given logger 'file-like'""" # pylint: disable=protected-access if logger is None: return # Convert the level to an int level = logging._nameToLevel[get_level(level)] logger._buffer = '' def _isatty(cls:logging.Logger): return False def _write(cls:logging.Logger, data): cls._buffer += data def _flush(cls:logging.Logger): if cls._buffer: cls.log(level, cls._buffer) cls._buffer = '' for h in cls.handlers: h.flush() def _get_terminator(cls:logging.Logger): terminator_list = [] for h in cls.handlers: terminator_list.append(h.terminator) if getattr(cls, 'parent', None) and getattr(cls, 'propagate', None): return [terminator_list, _get_terminator(cls.parent)] else: return [terminator_list, []] def _set_terminator(cls:logging.Logger, terminator:Union[str,List[str]]): if isinstance(terminator, list): if not terminator: return None if len(terminator) < 2: raise ValueError('Invalid terminator arg, must contain at least 2 lists') current_terminators = terminator[0] if not isinstance(current_terminators, list) or not isinstance(terminator[1], list): raise ValueError('Invalid terminator arg, must be list of lists') if len(current_terminators) != len(cls.handlers): raise ValueError('Invalid terminator arg, list lenght must be same as logger handlers') for i, h in enumerate(cls.handlers): h.terminator = current_terminators[i] if getattr(cls, 'parent', None) and getattr(cls, 'propagate', None): _set_terminator(cls.parent, terminator[1]) return None else: previous_terminators = _get_terminator(cls) for i, h in enumerate(cls.handlers): h.terminator = terminator if getattr(cls, 'parent', None) and getattr(cls, 'propagate', None): _set_terminator(cls.parent, terminator) return previous_terminators if not hasattr(logger, 'write'): logger.write = types.MethodType(_write, logger) if not hasattr(logger, 'flush'): logger.flush = types.MethodType(_flush, logger) if not hasattr(logger, 'isatty'): logger.isatty = types.MethodType(_isatty, logger) if not hasattr(logger, 'set_terminator'): logger.set_terminator = types.MethodType(_set_terminator, logger) if not hasattr(logger, 'get_terminator'): logger.get_terminator = types.MethodType(_get_terminator, logger)
[docs]def redirect_stream( logger:logging.Logger, stream:Union[TextIO,str]='stderr', close_atexit=True ) -> Callable: """Redirect std logs to the given logger NOTE: This redirects ALL logs from the stream """ saved_sys_std_stream_name = None if isinstance(stream, str): saved_sys_std_stream_name = stream stream = getattr(sys, saved_sys_std_stream_name) stream_fd = stream.fileno() saved_stream_fd = os.dup(stream_fd) read_stream, write_stream = os.pipe() os.dup2(write_stream, stream_fd) os.close(write_stream) if saved_sys_std_stream_name is not None: if saved_sys_std_stream_name == 'stderr': setattr(sys, saved_sys_std_stream_name, sys.stdout) else: setattr(sys, saved_sys_std_stream_name, io.TextIOWrapper(os.fdopen(stream_fd, 'wb'), encoding='utf-8' )) # Map tensorflow logs to the Python logger's corresponding level tf_err_re = re.compile(r'.*:\s([DIWE])\s(.*)') def _process_line(line:str): match = tf_err_re.match(line) if match: level_str = match.group(1) # We only want to print TF errors at the error level # Everything else to too verbose so we just map them to debug if level_str == 'E': level = logging.ERROR else: level = logging.DEBUG logger.log(level, match.group(2)) else: logger.debug(line) def _drain_pipe(): line = '' while True: data = os.read(read_stream, 256).decode('utf-8') if not data: break for c in data: line += c if c == '\n': _process_line(line.strip()) line = '' pipe_redirect_thread = threading.Thread( target=_drain_pipe, name='stream_redirect', daemon=True ) def _close_pipe(): try: os.close(stream_fd) except: pass pipe_redirect_thread.join(timeout=1) try: os.close(read_stream) except: pass try: os.dup2(saved_stream_fd, stream_fd) except: pass try: os.close(saved_stream_fd) except: pass if close_atexit: atexit.register(_close_pipe) pipe_redirect_thread.start() return _close_pipe
[docs]def timing_decorator(f, level='INFO'): """Print the run-time of the decorated function to the logger If a logger is found in the args then that is used, else if a logger is found in the 'self' argument, then that is used """ def wrap(*args, **kwargs): logger = None if logger is None: for a in args: if isinstance(a, logging.Logger): logger = a break if logger is None: for a in kwargs.values(): if isinstance(a, logging.Logger): logger = a break if logger is None and len(args) > 0: self = args[0] for key in dir(self): value = getattr(self, key) if isinstance(value, logging.Logger): logger = value break if logger is None: logger = get_logger() ts = time.time() result = f(*args, **kwargs) te = time.time() elapsed = te - ts logger.log(get_level(level), f'{f.__name__} took: {elapsed}s') return result return wrap
[docs]def set_console_level(logger:logging.Logger, level:str) -> str: """Set the logger's console level and return the previous level""" if level is None: return None prev_console_level = None if hasattr(logger, 'console_level'): prev_console_level = logger.console_level logger.console_level = level return prev_console_level
[docs]def get_level(level:Union[str,int]) -> str: """Return the logging level as a string""" if isinstance(level, str): return level.upper() return logging.getLevelName(level)
[docs]class ConsoleLoggerLevelContext:
[docs] def __init__(self, logger:logging.Logger, level:str): self.logger = logger self.level = level
[docs] def __enter__(self): self.saved_console_level = self.logger.console_level self.logger.console_level = self.level
[docs] def __exit__(self ,type, value, traceback): self.logger.console_level = self.saved_console_level
[docs]class DummyLogger():
[docs] def __init__(self): self.handlers = []
[docs] def debug(self, *args, **kwargs): pass
[docs] def info(self, *args, **kwargs): pass
[docs] def warning(self, *args, **kwargs): pass
[docs] def error(self, *args, **kwargs): pass
[docs] def exception(self, *args, **kwargs): pass
[docs] def write(self, *args, **kwargs): pass
[docs] def flush(self, *args, **kwargs): pass
# This is needed to distinguish between a FileHandler and console StreamHandler class _ConsoleStreamLogger(logging.StreamHandler): pass def _get_verbose(self) -> bool: for h in self.handlers: if isinstance(h, _ConsoleStreamLogger): return h.level == logging.DEBUG return False def _set_verbose(self, value : bool): level = 'DEBUG' if value else 'INFO' for h in self.handlers: if isinstance(h, _ConsoleStreamLogger): h.setLevel(level) def _set_console_log_level(self, level:str): for h in self.handlers: if isinstance(h, _ConsoleStreamLogger): h.setLevel(get_level(level)) def _get_console_log_level(self) -> int: for h in self.handlers: if isinstance(h, _ConsoleStreamLogger): return h.level return None def _get_file_handler(self) -> logging.FileHandler: for h in self.handlers: if isinstance(h, logging.FileHandler): return h return None def _get_file_handler_path(self) -> str: for h in self.handlers: if isinstance(h, logging.FileHandler): return getattr(h, 'baseFilename', None) return None logging.Logger.verbose = property(_get_verbose, _set_verbose, doc='Enable/disable verbose logging to the console') logging.Logger.console_level = property(_get_console_log_level, _set_console_log_level, doc='Get/set the logger console logging level') logging.Logger.file_handler = property(_get_file_handler, doc='Get the logger\'s file handler') logging.Logger.file_handler_path = property(_get_file_handler_path, doc='Get the log file path to the logger\'s file handler')