meerschaum.utils.threading
Define a custom Thread class with a callback method.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Define a custom Thread class with a callback method. 7""" 8 9from __future__ import annotations 10from meerschaum.utils.typing import Optional 11 12import threading 13import ctypes 14import signal 15 16Lock = threading.Lock 17RLock = threading.RLock 18Event = threading.Event 19Timer = threading.Timer 20get_ident = threading.get_ident 21 22class Thread(threading.Thread): 23 """Wrapper for threading.Thread with optional callback and error_callback functions.""" 24 25 def __init__(self, *args, callback=None, error_callback=None, **kw): 26 target = kw.pop('target') 27 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 28 self.callback = callback 29 self.error_callback = error_callback 30 self.method = target 31 self._return = None 32 33 def wrap_target_with_callback(self, *args, **kw): 34 """Wrap the designated target function with a try-except. 35 Captures the output and executes either the callback or error_callback. 36 """ 37 try: 38 result = self.method(*args, **kw) 39 success = True 40 except Exception as e: 41 success = False 42 result = e 43 44 cb = self.callback if success else self.error_callback 45 if cb is not None: 46 cb(result) 47 return result 48 49 def join(self, timeout: Optional[float] = None): 50 """ 51 Join the thread with an optional timeout. 52 """ 53 threading.Thread.join(self, timeout=timeout) 54 return self._return 55 56 def run(self): 57 """Set the return to the result of the target.""" 58 self._return = self._target(*self._args, **self._kwargs) 59 60 def send_signal(self, signalnum): 61 """ 62 Send a signal to the thread. 
63 """ 64 if not self.is_alive(): 65 return 66 67 if signalnum == signal.SIGINT: 68 self.raise_exception(KeyboardInterrupt()) 69 elif signalnum == signal.SIGTERM: 70 self.raise_exception(SystemExit()) 71 else: 72 signal.pthread_kill(self.ident, signalnum) 73 74 def raise_exception(self, exc: BaseException): 75 """ 76 Raise an exception in the thread. 77 78 This uses a CPython-specific implementation and is not guaranteed to be stable. 79 It may also be deprecated in future Python versions. 80 """ 81 if not self.is_alive(): 82 return 83 84 if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'): 85 return 86 87 exc_class = exc if isinstance(exc, type) else type(exc) 88 89 ident = self.ident 90 if ident is None: 91 return 92 93 ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( 94 ctypes.c_ulong(ident), 95 ctypes.py_object(exc_class) 96 ) 97 if ret > 1: 98 ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0) 99 100class Worker(threading.Thread): 101 """Wrapper for `threading.Thread` for working with `queue.Queue` objects.""" 102 103 def __init__(self, queue, *args, timeout: int = 3, **kw): 104 self.queue = queue 105 self.timeout = timeout 106 super().__init__(*args, **kw) 107 108 def run(self): 109 while True: 110 try: 111 _ = self.queue.get(timeout=self.timeout) 112 except self.queue.Empty: 113 return None 114 115 self.queue.task_done() 116 117 118class RepeatTimer(Timer): 119 """ 120 Fire the timer's target function in a loop, every `interval` seconds. 121 """ 122 123 def __init__(self, *args, **kwargs): 124 super().__init__(*args, **kwargs) 125 self._is_running = False 126 127 def is_running(self) -> bool: 128 """ 129 Return whether this timer has been started and is running. 130 """ 131 return self._is_running 132 133 def run(self) -> None: 134 """ 135 Fire the target function in a loop. 136 """ 137 self._is_running = True 138 while not self.finished.wait(self.interval): 139 self.function(*self.args, **self.kwargs) 140 self._is_running = False
allocate_lock() -> lock object (allocate() is an obsolete synonym)
Create a new lock object. See help(type(threading.Lock())) for information about locks.
def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    Only the thread that acquired a reentrant lock may release it. The owning
    thread may acquire the lock again without blocking, and has to release it
    once per acquisition.

    """
    # Use the pure-Python implementation only when the C one is unavailable.
    factory = _PyRLock if _CRLock is None else _CRLock
    return factory(*args, **kwargs)
Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.
class Event:
    """Event object built around a boolean flag.

    set() turns the flag on, clear() turns it off, and wait() blocks until
    the flag is on. A new event starts with the flag off.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._flag = False
        self._cond = Condition(Lock())

    def __repr__(self):
        cls = self.__class__
        if self._flag:
            status = 'set'
        else:
            status = 'unset'
        return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"

    def _at_fork_reinit(self):
        # Private method called by Thread._reset_internal_locks()
        self._cond._at_fork_reinit()

    def is_set(self):
        """Report whether the internal flag is currently true."""
        return self._flag

    def isSet(self):
        """Report whether the internal flag is currently true.

        Deprecated alias of is_set().

        """
        import warnings
        warnings.warn('isSet() is deprecated, use is_set() instead',
                      DeprecationWarning, stacklevel=2)
        return self.is_set()

    def set(self):
        """Turn the internal flag on and wake every waiting thread.

        Later wait() calls return immediately while the flag stays on.

        """
        cond = self._cond
        with cond:
            self._flag = True
            cond.notify_all()

    def clear(self):
        """Turn the internal flag off.

        wait() calls made afterwards block until set() is called again.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true or the timeout expires.

        Returns at once when the flag is already true. `timeout`, when not
        None, is a number of seconds (fractions allowed).

        The return value is the flag on exit, i.e. true unless a timeout
        was given and it elapsed first.

        """
        cond = self._cond
        with cond:
            flag = self._flag
            return flag if flag else cond.wait(timeout)
Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.
def is_set(self):
    """Report whether the internal flag is currently true."""
    flag = self._flag
    return flag
Return true if and only if the internal flag is true.
def isSet(self):
    """Report whether the internal flag is currently true.

    Deprecated alias of is_set(); emits a DeprecationWarning.

    """
    import warnings
    # stacklevel=2 attributes the warning to the caller of isSet().
    warnings.warn(
        'isSet() is deprecated, use is_set() instead',
        DeprecationWarning,
        stacklevel=2,
    )
    return self.is_set()
Return true if and only if the internal flag is true.
This method is deprecated, use is_set() instead.
def set(self):
    """Turn the internal flag on and wake every waiting thread.

    Later wait() calls return immediately while the flag stays on.

    """
    cond = self._cond
    with cond:
        self._flag = True
        cond.notify_all()
Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.
def clear(self):
    """Turn the internal flag off.

    wait() calls made afterwards block until set() turns the flag on again.

    """
    cond = self._cond
    with cond:
        self._flag = False
Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.
def wait(self, timeout=None):
    """Block until the internal flag is true or the timeout expires.

    Returns at once when the flag is already true. Otherwise waits on the
    condition for another thread to call set(), or for `timeout` seconds
    (a float, fractions allowed) when `timeout` is not None.

    The return value is the flag on exit, i.e. true unless a timeout was
    given and it elapsed first.

    """
    cond = self._cond
    with cond:
        flag = self._flag
        return flag if flag else cond.wait(timeout)
Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.
class Timer(Thread):
    """Run a function once, after a delay given in seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # None means "no arguments"; a fresh container is made per instance.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        # Sleep for the interval, waking early if cancel() sets the event.
        self.finished.wait(self.interval)
        cancelled = self.finished.is_set()
        if not cancelled:
            self.function(*self.args, **self.kwargs)
        self.finished.set()
Call a function after a specified number of seconds:
    t = Timer(30.0, f, args=None, kwargs=None)
    t.start()
    t.cancel()  # stop the timer's action if it's still waiting
def __init__(self, interval, function, args=None, kwargs=None):
    """Store the delay, the callable and its arguments, and create the
    `finished` event that run() waits on and cancel() sets."""
    Thread.__init__(self)
    self.interval = interval
    self.function = function
    # None means "no arguments"; a fresh container is made per instance.
    self.args = [] if args is None else args
    self.kwargs = {} if kwargs is None else kwargs
    self.finished = Event()
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def cancel(self):
    """Stop the timer if it hasn't finished yet."""
    finished = self.finished
    finished.set()
Stop the timer if it hasn't finished yet.
def run(self):
    """Wait out the interval, call the function unless cancelled, then mark
    the timer finished."""
    # wait() returns early if cancel() sets the event during the interval.
    self.finished.wait(self.interval)
    cancelled = self.finished.is_set()
    if not cancelled:
        self.function(*self.args, **self.kwargs)
    self.finished.set()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
get_ident() -> integer
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
class Thread(threading.Thread):
    """Wrapper for threading.Thread with optional callback and error_callback functions."""

    def __init__(self, *args, callback=None, error_callback=None, **kw):
        """
        Build the thread around the `target` keyword argument.

        `target` must be passed by keyword (a `KeyError` is raised otherwise).
        `callback` receives the target's return value on success and
        `error_callback` receives the raised `Exception` on failure.
        Remaining arguments are forwarded to `threading.Thread.__init__`.
        """
        target = kw.pop('target')
        # The real thread target is the wrapper, so the callbacks always fire.
        super().__init__(*args, target=self.wrap_target_with_callback, **kw)
        self.callback = callback
        self.error_callback = error_callback
        self.method = target
        self._return = None

    def wrap_target_with_callback(self, *args, **kw):
        """Wrap the designated target function with a try-except.
        Captures the output and executes either the callback or error_callback.

        Returns the target's result, or the caught `Exception` instance.
        """
        try:
            result = self.method(*args, **kw)
            success = True
        except Exception as e:
            success = False
            result = e

        cb = self.callback if success else self.error_callback
        if cb is not None:
            cb(result)
        return result

    def join(self, timeout: Optional[float] = None):
        """
        Join the thread with an optional timeout.

        Returns whatever `run()` stored (`None` if the thread has not finished).
        """
        threading.Thread.join(self, timeout=timeout)
        return self._return

    def run(self):
        """Set the return to the result of the target."""
        self._return = self._target(*self._args, **self._kwargs)

    def send_signal(self, signalnum):
        """
        Send a signal to the thread.

        `SIGINT` and `SIGTERM` are emulated by raising `KeyboardInterrupt` and
        `SystemExit` in the thread; any other signal goes through
        `signal.pthread_kill`. Does nothing if the thread is not alive.
        """
        if not self.is_alive():
            return

        if signalnum == signal.SIGINT:
            self.raise_exception(KeyboardInterrupt())
        elif signalnum == signal.SIGTERM:
            self.raise_exception(SystemExit())
        else:
            signal.pthread_kill(self.ident, signalnum)

    def raise_exception(self, exc: BaseException):
        """
        Raise an exception in the thread.

        This uses a CPython-specific implementation and is not guaranteed to be stable.
        It may also be deprecated in future Python versions.

        `exc` may be an exception instance or class; only its class is injected.
        Silently does nothing when the thread is not alive or the C API is missing.
        """
        if not self.is_alive():
            return

        pythonapi = getattr(ctypes, 'pythonapi', None)
        if pythonapi is None or not hasattr(pythonapi, 'PyThreadState_SetAsyncExc'):
            return

        exc_class = exc if isinstance(exc, type) else type(exc)

        ident = self.ident
        if ident is None:
            return

        thread_id = ctypes.c_ulong(ident)
        ret = pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(exc_class))
        if ret > 1:
            # More than one thread state was affected: undo the request.
            # Reuse the c_ulong id (a bare int would be passed as a C int)
            # and pass None so that a NULL exception clears the pending one.
            pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
Wrapper for threading.Thread with optional callback and error_callback functions.
def __init__(self, *args, callback=None, error_callback=None, **kw):
    """
    Build the thread around the `target` keyword argument.

    `target` must be passed by keyword (a `KeyError` is raised otherwise).
    The remaining arguments are forwarded to the base constructor, with
    `wrap_target_with_callback` installed as the actual thread target.
    """
    wrapped = kw.pop('target')
    super().__init__(*args, target=self.wrap_target_with_callback, **kw)
    self.method = wrapped
    self.callback = callback
    self.error_callback = error_callback
    self._return = None
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def wrap_target_with_callback(self, *args, **kw):
    """Run `self.method` and report its outcome.

    On success the result goes to `self.callback`; if an `Exception` is
    raised, the exception goes to `self.error_callback`. A handler of
    `None` is skipped. The result (or the exception) is returned either way.
    """
    try:
        outcome = self.method(*args, **kw)
    except Exception as exc:
        outcome = exc
        handler = self.error_callback
    else:
        handler = self.callback

    if handler is not None:
        handler(outcome)
    return outcome
Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.
def join(self, timeout: Optional[float] = None):
    """
    Wait for the thread to finish (up to `timeout` seconds, if given)
    and return the value stored in `self._return`.
    """
    threading.Thread.join(self, timeout)
    outcome = self._return
    return outcome
Join the thread with an optional timeout.
def run(self):
    """Call the thread's target with its stored arguments and keep the
    result in `self._return`."""
    target, args, kwargs = self._target, self._args, self._kwargs
    self._return = target(*args, **kwargs)
Set the return to the result of the target.
def send_signal(self, signalnum):
    """
    Deliver `signalnum` to the thread; a no-op if the thread is not alive.

    `SIGINT` and `SIGTERM` are emulated by raising `KeyboardInterrupt` and
    `SystemExit` in the thread. Any other signal is passed to
    `signal.pthread_kill` with the thread's ident.
    """
    if not self.is_alive():
        return

    if signalnum == signal.SIGINT:
        self.raise_exception(KeyboardInterrupt())
        return

    if signalnum == signal.SIGTERM:
        self.raise_exception(SystemExit())
        return

    signal.pthread_kill(self.ident, signalnum)
Send a signal to the thread.
def raise_exception(self, exc: BaseException):
    """
    Raise an exception in the thread.

    This uses a CPython-specific implementation and is not guaranteed to be stable.
    It may also be deprecated in future Python versions.

    `exc` may be an exception instance or class; only its class is injected.
    Silently does nothing when the thread is not alive, has no ident, or the
    C API is missing.
    """
    if not self.is_alive():
        return

    pythonapi = getattr(ctypes, 'pythonapi', None)
    if pythonapi is None or not hasattr(pythonapi, 'PyThreadState_SetAsyncExc'):
        return

    exc_class = exc if isinstance(exc, type) else type(exc)

    ident = self.ident
    if ident is None:
        return

    thread_id = ctypes.c_ulong(ident)
    ret = pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(exc_class))
    if ret > 1:
        # More than one thread state was affected: undo the request.
        # Reuse the c_ulong id (a bare int would be passed as a C int)
        # and pass None so that a NULL exception clears the pending one.
        pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
Raise an exception in the thread.
This uses a CPython-specific implementation and is not guaranteed to be stable. It may also be deprecated in future Python versions.
class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        """
        Store the queue to drain and the per-`get` timeout in seconds.
        Remaining arguments are forwarded to `threading.Thread.__init__`.
        """
        self.queue = queue
        self.timeout = timeout
        super().__init__(*args, **kw)

    def run(self):
        """
        Pull items off the queue, marking each one done, until a `get`
        times out on an empty queue.
        """
        from queue import Empty as stdlib_empty

        # `queue.Queue` instances have no `Empty` attribute; the exception
        # lives on the `queue` module. Honor a queue-provided one if present.
        empty_exc = getattr(self.queue, 'Empty', stdlib_empty)
        while True:
            try:
                _ = self.queue.get(timeout=self.timeout)
            except empty_exc:
                return None

            self.queue.task_done()
Wrapper for `threading.Thread` for working with `queue.Queue` objects.
def __init__(self, queue, *args, timeout: int = 3, **kw):
    """
    Remember the queue to drain and the per-`get` timeout in seconds,
    then forward the remaining arguments to the base constructor.
    """
    self.timeout = timeout
    self.queue = queue
    super().__init__(*args, **kw)
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def run(self):
    """
    Pull items off `self.queue`, marking each one done, until a `get`
    times out (after `self.timeout` seconds) on an empty queue.
    """
    from queue import Empty as stdlib_empty

    # `queue.Queue` instances have no `Empty` attribute; the exception
    # lives on the `queue` module. Honor a queue-provided one if present.
    empty_exc = getattr(self.queue, 'Empty', stdlib_empty)
    while True:
        try:
            _ = self.queue.get(timeout=self.timeout)
        except empty_exc:
            return None

        self.queue.task_done()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
class RepeatTimer(Timer):
    """
    Fire the timer's target function in a loop, every `interval` seconds.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to `Timer` and mark the timer as not running."""
        super().__init__(*args, **kwargs)
        self._is_running = False

    def is_running(self) -> bool:
        """
        Return whether this timer has been started and is running.
        """
        return self._is_running

    def run(self) -> None:
        """
        Fire the target function in a loop.

        The loop ends when `finished` is set (e.g. by `cancel()`).
        """
        self._is_running = True
        try:
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)
        finally:
            # Reset the flag even when the function raises.
            self._is_running = False
Fire the timer's target function in a loop, every `interval` seconds.
def __init__(self, *args, **kwargs):
    """Forward every argument to the base timer and start out not running."""
    super().__init__(*args, **kwargs)
    # Only run() changes this flag after construction.
    self._is_running = False
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.