Source code for mltk.utils.signal_handler
import os
import signal
import threading
[docs]class SignalHandler(object):
"""Class to detect OS signals
e.g. detect when CTRL+C is pressed and issue a callback
See the source code on Github: `mltk/utils/signal_handler.py <https://github.com/siliconlabs/mltk/blob/master/mltk/utils/signal_handler.py>`_
"""
[docs] def __init__(
self,
sig=signal.SIGINT,
callback=None,
resignal_on_exit=False,
raise_exception_if_not_main_thread=True
):
self.sig = sig
self.interrupted = False
self.released = False
self.original_handler = None
self.resignal_on_exit = resignal_on_exit
self.raise_exception_if_not_main_thread = raise_exception_if_not_main_thread
self.callback = callback
def __enter__(self):
self.interrupted = False
self.released = False
is_main_thread = threading.current_thread() is threading.main_thread()
if self.raise_exception_if_not_main_thread and not is_main_thread:
raise RuntimeError('SignalHandler may only be used in the "main" thread')
if is_main_thread:
self.original_handler = signal.getsignal(self.sig)
def _handler(signum, frame):
forward_signal = False
if not self.interrupted:
if self.callback:
try:
forward_signal = self.callback() == 'forward-signal'
except:
pass
self.interrupted = True
self.release()
if forward_signal:
self.original_handler()
if is_main_thread:
signal.signal(self.sig, _handler)
return self
def __exit__(self, t, value, tb):
self.release()
def release(self):
if self.released:
return False
if self.original_handler is not None:
signal.signal(self.sig, self.original_handler)
self.released = True
if self.interrupted and self.resignal_on_exit:
os.kill(os.getpid(), self.sig)
return True