meerschaum.utils.threading
Define a custom Thread class with a callback method.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Define a custom Thread class with a callback method. 7""" 8 9from __future__ import annotations 10from meerschaum.utils.typing import Optional 11 12import threading 13import ctypes 14import signal 15import weakref 16 17Lock = threading.Lock 18RLock = threading.RLock 19Event = threading.Event 20Timer = threading.Timer 21get_ident = threading.get_ident 22 23### Process-wide cooperative stop signal. 24### Long-running loops (e.g. `sync pipes --loop`) and worker threads should poll 25### `stop_requested()` and exit promptly when it is set. It is set by the daemon's 26### signal handler so that `stop jobs` propagates to background worker threads. 27STOP_EVENT = threading.Event() 28 29### Registry of live `Thread` instances so a signal handler can interrupt them 30### even when they are blocked outside of a `stop_requested()` check. 31_threads_registry: "weakref.WeakSet[Thread]" = weakref.WeakSet() 32_registry_lock = threading.Lock() 33 34 35def request_stop() -> None: 36 """Signal all cooperative loops to stop at their next opportunity.""" 37 STOP_EVENT.set() 38 39 40def stop_requested() -> bool: 41 """Return whether a process-wide stop has been requested.""" 42 return STOP_EVENT.is_set() 43 44 45def clear_stop() -> None: 46 """Clear the process-wide stop signal.""" 47 STOP_EVENT.clear() 48 49 50def interrupt_threads(exc: type = SystemExit) -> None: 51 """ 52 Raise `exc` in every registered, still-alive `Thread`. 53 54 This actively unwinds worker threads that are not polling `stop_requested()` 55 so that the process can exit instead of becoming a zombie. 56 """ 57 with _registry_lock: 58 threads = list(_threads_registry) 59 current_ident = threading.get_ident() 60 for thread in threads: 61 if thread.ident == current_ident: 62 continue 63 try: 64 thread.raise_exception(exc) 65 except Exception: 66 pass 67 68class Thread(threading.Thread): 69 """Wrapper for threading.Thread with optional callback and error_callback functions.""" 70 71 def __init__(self, *args, callback=None, error_callback=None, **kw): 72 target = kw.pop('target') 73 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 74 self.callback = callback 75 self.error_callback = error_callback 76 self.method = target 77 self._return = None 78 79 def wrap_target_with_callback(self, *args, **kw): 80 """Wrap the designated target function with a try-except. 81 Captures the output and executes either the callback or error_callback. 82 """ 83 try: 84 result = self.method(*args, **kw) 85 success = True 86 except Exception as e: 87 success = False 88 result = e 89 90 cb = self.callback if success else self.error_callback 91 if cb is not None: 92 cb(result) 93 return result 94 95 def start(self): 96 """Register the thread before starting so it may be interrupted on shutdown.""" 97 with _registry_lock: 98 _threads_registry.add(self) 99 return super().start() 100 101 def join(self, timeout: Optional[float] = None): 102 """ 103 Join the thread with an optional timeout. 104 """ 105 threading.Thread.join(self, timeout=timeout) 106 return self._return 107 108 def run(self): 109 """Set the return to the result of the target.""" 110 self._return = self._target(*self._args, **self._kwargs) 111 112 def send_signal(self, signalnum): 113 """ 114 Send a signal to the thread. 115 """ 116 if not self.is_alive(): 117 return 118 119 if signalnum == signal.SIGINT: 120 self.raise_exception(KeyboardInterrupt()) 121 elif signalnum == signal.SIGTERM: 122 self.raise_exception(SystemExit()) 123 else: 124 signal.pthread_kill(self.ident, signalnum) 125 126 def raise_exception(self, exc: BaseException): 127 """ 128 Raise an exception in the thread. 129 130 This uses a CPython-specific implementation and is not guaranteed to be stable. 131 It may also be deprecated in future Python versions. 132 """ 133 if not self.is_alive(): 134 return 135 136 if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'): 137 return 138 139 exc_class = exc if isinstance(exc, type) else type(exc) 140 141 ident = self.ident 142 if ident is None: 143 return 144 145 ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( 146 ctypes.c_ulong(ident), 147 ctypes.py_object(exc_class) 148 ) 149 if ret > 1: 150 ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0) 151 152class Worker(threading.Thread): 153 """Wrapper for `threading.Thread` for working with `queue.Queue` objects.""" 154 155 def __init__(self, queue, *args, timeout: int = 3, **kw): 156 self.queue = queue 157 self.timeout = timeout 158 super().__init__(*args, **kw) 159 160 def run(self): 161 while True: 162 try: 163 _ = self.queue.get(timeout=self.timeout) 164 except self.queue.Empty: 165 return None 166 167 self.queue.task_done() 168 169 170class RepeatTimer(Timer): 171 """ 172 Fire the timer's target function in a loop, every `interval` seconds. 173 """ 174 175 def __init__(self, *args, **kwargs): 176 super().__init__(*args, **kwargs) 177 self._is_running = False 178 179 def is_running(self) -> bool: 180 """ 181 Return whether this timer has been started and is running. 182 """ 183 return self._is_running 184 185 def run(self) -> None: 186 """ 187 Fire the target function in a loop. 188 """ 189 self._is_running = True 190 while not self.finished.wait(self.interval): 191 self.function(*self.args, **self.kwargs) 192 self._is_running = False
allocate_lock() -> lock object (allocate() is an obsolete synonym)
Create a new lock object. See help(type(threading.Lock())) for information about locks.
125def RLock(*args, **kwargs): 126 """Factory function that returns a new reentrant lock. 127 128 A reentrant lock must be released by the thread that acquired it. Once a 129 thread has acquired a reentrant lock, the same thread may acquire it again 130 without blocking; the thread must release it once for each time it has 131 acquired it. 132 133 """ 134 if _CRLock is None: 135 return _PyRLock(*args, **kwargs) 136 return _CRLock(*args, **kwargs)
Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.
578class Event: 579 """Class implementing event objects. 580 581 Events manage a flag that can be set to true with the set() method and reset 582 to false with the clear() method. The wait() method blocks until the flag is 583 true. The flag is initially false. 584 585 """ 586 587 # After Tim Peters' event class (without is_posted()) 588 589 def __init__(self): 590 self._cond = Condition(Lock()) 591 self._flag = False 592 593 def __repr__(self): 594 cls = self.__class__ 595 status = 'set' if self._flag else 'unset' 596 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" 597 598 def _at_fork_reinit(self): 599 # Private method called by Thread._reset_internal_locks() 600 self._cond._at_fork_reinit() 601 602 def is_set(self): 603 """Return true if and only if the internal flag is true.""" 604 return self._flag 605 606 def isSet(self): 607 """Return true if and only if the internal flag is true. 608 609 This method is deprecated, use is_set() instead. 610 611 """ 612 import warnings 613 warnings.warn('isSet() is deprecated, use is_set() instead', 614 DeprecationWarning, stacklevel=2) 615 return self.is_set() 616 617 def set(self): 618 """Set the internal flag to true. 619 620 All threads waiting for it to become true are awakened. Threads 621 that call wait() once the flag is true will not block at all. 622 623 """ 624 with self._cond: 625 self._flag = True 626 self._cond.notify_all() 627 628 def clear(self): 629 """Reset the internal flag to false. 630 631 Subsequently, threads calling wait() will block until set() is called to 632 set the internal flag to true again. 633 634 """ 635 with self._cond: 636 self._flag = False 637 638 def wait(self, timeout=None): 639 """Block until the internal flag is true. 640 641 If the internal flag is true on entry, return immediately. Otherwise, 642 block until another thread calls set() to set the flag to true, or until 643 the optional timeout occurs. 644 645 When the timeout argument is present and not None, it should be a 646 floating point number specifying a timeout for the operation in seconds 647 (or fractions thereof). 648 649 This method returns the internal flag on exit, so it will always return 650 True except if a timeout is given and the operation times out. 651 652 """ 653 with self._cond: 654 signaled = self._flag 655 if not signaled: 656 signaled = self._cond.wait(timeout) 657 return signaled
Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.
602 def is_set(self): 603 """Return true if and only if the internal flag is true.""" 604 return self._flag
Return true if and only if the internal flag is true.
606 def isSet(self): 607 """Return true if and only if the internal flag is true. 608 609 This method is deprecated, use is_set() instead. 610 611 """ 612 import warnings 613 warnings.warn('isSet() is deprecated, use is_set() instead', 614 DeprecationWarning, stacklevel=2) 615 return self.is_set()
Return true if and only if the internal flag is true.
This method is deprecated, use is_set() instead.
617 def set(self): 618 """Set the internal flag to true. 619 620 All threads waiting for it to become true are awakened. Threads 621 that call wait() once the flag is true will not block at all. 622 623 """ 624 with self._cond: 625 self._flag = True 626 self._cond.notify_all()
Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.
628 def clear(self): 629 """Reset the internal flag to false. 630 631 Subsequently, threads calling wait() will block until set() is called to 632 set the internal flag to true again. 633 634 """ 635 with self._cond: 636 self._flag = False
Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.
638 def wait(self, timeout=None): 639 """Block until the internal flag is true. 640 641 If the internal flag is true on entry, return immediately. Otherwise, 642 block until another thread calls set() to set the flag to true, or until 643 the optional timeout occurs. 644 645 When the timeout argument is present and not None, it should be a 646 floating point number specifying a timeout for the operation in seconds 647 (or fractions thereof). 648 649 This method returns the internal flag on exit, so it will always return 650 True except if a timeout is given and the operation times out. 651 652 """ 653 with self._cond: 654 signaled = self._flag 655 if not signaled: 656 signaled = self._cond.wait(timeout) 657 return signaled
Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.
1408class Timer(Thread): 1409 """Call a function after a specified number of seconds: 1410 1411 t = Timer(30.0, f, args=None, kwargs=None) 1412 t.start() 1413 t.cancel() # stop the timer's action if it's still waiting 1414 1415 """ 1416 1417 def __init__(self, interval, function, args=None, kwargs=None): 1418 Thread.__init__(self) 1419 self.interval = interval 1420 self.function = function 1421 self.args = args if args is not None else [] 1422 self.kwargs = kwargs if kwargs is not None else {} 1423 self.finished = Event() 1424 1425 def cancel(self): 1426 """Stop the timer if it hasn't finished yet.""" 1427 self.finished.set() 1428 1429 def run(self): 1430 self.finished.wait(self.interval) 1431 if not self.finished.is_set(): 1432 self.function(*self.args, **self.kwargs) 1433 self.finished.set()
Call a function after a specified number of seconds:
t = Timer(30.0, f, args=None, kwargs=None) t.start() t.cancel() # stop the timer's action if it's still waiting
1417 def __init__(self, interval, function, args=None, kwargs=None): 1418 Thread.__init__(self) 1419 self.interval = interval 1420 self.function = function 1421 self.args = args if args is not None else [] 1422 self.kwargs = kwargs if kwargs is not None else {} 1423 self.finished = Event()
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
1425 def cancel(self): 1426 """Stop the timer if it hasn't finished yet.""" 1427 self.finished.set()
Stop the timer if it hasn't finished yet.
1429 def run(self): 1430 self.finished.wait(self.interval) 1431 if not self.finished.is_set(): 1432 self.function(*self.args, **self.kwargs) 1433 self.finished.set()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
get_ident() -> integer
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
36def request_stop() -> None: 37 """Signal all cooperative loops to stop at their next opportunity.""" 38 STOP_EVENT.set()
Signal all cooperative loops to stop at their next opportunity.
41def stop_requested() -> bool: 42 """Return whether a process-wide stop has been requested.""" 43 return STOP_EVENT.is_set()
Return whether a process-wide stop has been requested.
Clear the process-wide stop signal.
51def interrupt_threads(exc: type = SystemExit) -> None: 52 """ 53 Raise `exc` in every registered, still-alive `Thread`. 54 55 This actively unwinds worker threads that are not polling `stop_requested()` 56 so that the process can exit instead of becoming a zombie. 57 """ 58 with _registry_lock: 59 threads = list(_threads_registry) 60 current_ident = threading.get_ident() 61 for thread in threads: 62 if thread.ident == current_ident: 63 continue 64 try: 65 thread.raise_exception(exc) 66 except Exception: 67 pass
Raise exc in every registered, still-alive Thread.
This actively unwinds worker threads that are not polling stop_requested()
so that the process can exit instead of becoming a zombie.
69class Thread(threading.Thread): 70 """Wrapper for threading.Thread with optional callback and error_callback functions.""" 71 72 def __init__(self, *args, callback=None, error_callback=None, **kw): 73 target = kw.pop('target') 74 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 75 self.callback = callback 76 self.error_callback = error_callback 77 self.method = target 78 self._return = None 79 80 def wrap_target_with_callback(self, *args, **kw): 81 """Wrap the designated target function with a try-except. 82 Captures the output and executes either the callback or error_callback. 83 """ 84 try: 85 result = self.method(*args, **kw) 86 success = True 87 except Exception as e: 88 success = False 89 result = e 90 91 cb = self.callback if success else self.error_callback 92 if cb is not None: 93 cb(result) 94 return result 95 96 def start(self): 97 """Register the thread before starting so it may be interrupted on shutdown.""" 98 with _registry_lock: 99 _threads_registry.add(self) 100 return super().start() 101 102 def join(self, timeout: Optional[float] = None): 103 """ 104 Join the thread with an optional timeout. 105 """ 106 threading.Thread.join(self, timeout=timeout) 107 return self._return 108 109 def run(self): 110 """Set the return to the result of the target.""" 111 self._return = self._target(*self._args, **self._kwargs) 112 113 def send_signal(self, signalnum): 114 """ 115 Send a signal to the thread. 116 """ 117 if not self.is_alive(): 118 return 119 120 if signalnum == signal.SIGINT: 121 self.raise_exception(KeyboardInterrupt()) 122 elif signalnum == signal.SIGTERM: 123 self.raise_exception(SystemExit()) 124 else: 125 signal.pthread_kill(self.ident, signalnum) 126 127 def raise_exception(self, exc: BaseException): 128 """ 129 Raise an exception in the thread. 130 131 This uses a CPython-specific implementation and is not guaranteed to be stable. 132 It may also be deprecated in future Python versions. 133 """ 134 if not self.is_alive(): 135 return 136 137 if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'): 138 return 139 140 exc_class = exc if isinstance(exc, type) else type(exc) 141 142 ident = self.ident 143 if ident is None: 144 return 145 146 ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( 147 ctypes.c_ulong(ident), 148 ctypes.py_object(exc_class) 149 ) 150 if ret > 1: 151 ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0)
Wrapper for threading.Thread with optional callback and error_callback functions.
72 def __init__(self, *args, callback=None, error_callback=None, **kw): 73 target = kw.pop('target') 74 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 75 self.callback = callback 76 self.error_callback = error_callback 77 self.method = target 78 self._return = None
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
80 def wrap_target_with_callback(self, *args, **kw): 81 """Wrap the designated target function with a try-except. 82 Captures the output and executes either the callback or error_callback. 83 """ 84 try: 85 result = self.method(*args, **kw) 86 success = True 87 except Exception as e: 88 success = False 89 result = e 90 91 cb = self.callback if success else self.error_callback 92 if cb is not None: 93 cb(result) 94 return result
Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.
96 def start(self): 97 """Register the thread before starting so it may be interrupted on shutdown.""" 98 with _registry_lock: 99 _threads_registry.add(self) 100 return super().start()
Register the thread before starting so it may be interrupted on shutdown.
102 def join(self, timeout: Optional[float] = None): 103 """ 104 Join the thread with an optional timeout. 105 """ 106 threading.Thread.join(self, timeout=timeout) 107 return self._return
Join the thread with an optional timeout.
109 def run(self): 110 """Set the return to the result of the target.""" 111 self._return = self._target(*self._args, **self._kwargs)
Set the return to the result of the target.
113 def send_signal(self, signalnum): 114 """ 115 Send a signal to the thread. 116 """ 117 if not self.is_alive(): 118 return 119 120 if signalnum == signal.SIGINT: 121 self.raise_exception(KeyboardInterrupt()) 122 elif signalnum == signal.SIGTERM: 123 self.raise_exception(SystemExit()) 124 else: 125 signal.pthread_kill(self.ident, signalnum)
Send a signal to the thread.
127 def raise_exception(self, exc: BaseException): 128 """ 129 Raise an exception in the thread. 130 131 This uses a CPython-specific implementation and is not guaranteed to be stable. 132 It may also be deprecated in future Python versions. 133 """ 134 if not self.is_alive(): 135 return 136 137 if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'): 138 return 139 140 exc_class = exc if isinstance(exc, type) else type(exc) 141 142 ident = self.ident 143 if ident is None: 144 return 145 146 ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( 147 ctypes.c_ulong(ident), 148 ctypes.py_object(exc_class) 149 ) 150 if ret > 1: 151 ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0)
Raise an exception in the thread.
This uses a CPython-specific implementation and is not guaranteed to be stable. It may also be deprecated in future Python versions.
153class Worker(threading.Thread): 154 """Wrapper for `threading.Thread` for working with `queue.Queue` objects.""" 155 156 def __init__(self, queue, *args, timeout: int = 3, **kw): 157 self.queue = queue 158 self.timeout = timeout 159 super().__init__(*args, **kw) 160 161 def run(self): 162 while True: 163 try: 164 _ = self.queue.get(timeout=self.timeout) 165 except self.queue.Empty: 166 return None 167 168 self.queue.task_done()
Wrapper for threading.Thread for working with queue.Queue objects.
156 def __init__(self, queue, *args, timeout: int = 3, **kw): 157 self.queue = queue 158 self.timeout = timeout 159 super().__init__(*args, **kw)
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
161 def run(self): 162 while True: 163 try: 164 _ = self.queue.get(timeout=self.timeout) 165 except self.queue.Empty: 166 return None 167 168 self.queue.task_done()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
171class RepeatTimer(Timer): 172 """ 173 Fire the timer's target function in a loop, every `interval` seconds. 174 """ 175 176 def __init__(self, *args, **kwargs): 177 super().__init__(*args, **kwargs) 178 self._is_running = False 179 180 def is_running(self) -> bool: 181 """ 182 Return whether this timer has been started and is running. 183 """ 184 return self._is_running 185 186 def run(self) -> None: 187 """ 188 Fire the target function in a loop. 189 """ 190 self._is_running = True 191 while not self.finished.wait(self.interval): 192 self.function(*self.args, **self.kwargs) 193 self._is_running = False
Fire the timer's target function in a loop, every interval seconds.
176 def __init__(self, *args, **kwargs): 177 super().__init__(*args, **kwargs) 178 self._is_running = False
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.