meerschaum.utils.threading

Define a custom Thread class with a callback method.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Define a custom Thread class with a callback method.
  7"""
  8
  9from __future__ import annotations
 10from meerschaum.utils.typing import Optional
 11
 12import threading
 13import ctypes
 14import signal
 15
 16Lock = threading.Lock
 17RLock = threading.RLock
 18Event = threading.Event
 19Timer = threading.Timer
 20get_ident = threading.get_ident
 21
 22class Thread(threading.Thread):
 23    """Wrapper for threading.Thread with optional callback and error_callback functions."""
 24
 25    def __init__(self, *args, callback=None, error_callback=None, **kw):
 26        target = kw.pop('target')
 27        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
 28        self.callback = callback
 29        self.error_callback = error_callback
 30        self.method = target
 31        self._return = None
 32
 33    def wrap_target_with_callback(self, *args, **kw):
 34        """Wrap the designated target function with a try-except.
 35        Captures the output and executes either the callback or error_callback.
 36        """
 37        try:
 38            result = self.method(*args, **kw)
 39            success = True
 40        except Exception as e:
 41            success = False
 42            result = e
 43
 44        cb = self.callback if success else self.error_callback
 45        if cb is not None:
 46            cb(result)
 47        return result
 48
 49    def join(self, timeout: Optional[float] = None):
 50        """
 51        Join the thread with an optional timeout.
 52        """
 53        threading.Thread.join(self, timeout=timeout)
 54        return self._return
 55
 56    def run(self):
 57        """Set the return to the result of the target."""
 58        self._return = self._target(*self._args, **self._kwargs)
 59
 60    def send_signal(self, signalnum):
 61        """
 62        Send a signal to the thread.
 63        """
 64        if not self.is_alive():
 65            return
 66
 67        if signalnum == signal.SIGINT:
 68            self.raise_exception(KeyboardInterrupt())
 69        elif signalnum == signal.SIGTERM:
 70            self.raise_exception(SystemExit())
 71        else:
 72            signal.pthread_kill(self.ident, signalnum)
 73
 74    def raise_exception(self, exc: BaseException):
 75        """
 76        Raise an exception in the thread.
 77
 78        This uses a CPython-specific implementation and is not guaranteed to be stable.
 79        It may also be deprecated in future Python versions.
 80        """
 81        if not self.is_alive():
 82            return
 83
 84        if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'):
 85            return
 86
 87        exc_class = exc if isinstance(exc, type) else type(exc)
 88
 89        ident = self.ident
 90        if ident is None:
 91            return
 92
 93        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
 94            ctypes.c_ulong(ident),
 95            ctypes.py_object(exc_class)
 96        )
 97        if ret > 1:
 98            ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0)
 99
class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        """
        Store the queue to consume and the per-`get()` timeout (in seconds).

        The remaining positional and keyword arguments are forwarded to
        `threading.Thread.__init__`.
        """
        self.queue = queue
        self.timeout = timeout
        super().__init__(*args, **kw)

    def run(self):
        """
        Take items off the queue, marking each one done, until a `get()`
        times out on an empty queue; then return `None`.
        """
        # `Empty` is defined on the `queue` module, not on `Queue` instances,
        # so `self.queue.Empty` would raise `AttributeError` on timeout.
        # Imported locally because the constructor parameter is also named `queue`.
        from queue import Empty

        while True:
            try:
                _ = self.queue.get(timeout=self.timeout)
            except Empty:
                return None

            self.queue.task_done()
116
117
class RepeatTimer(Timer):
    """
    Fire the timer's target function in a loop, every `interval` seconds.
    """

    def __init__(self, *args, **kwargs):
        """
        Forward all arguments unchanged to `Timer` and mark the timer as not running.
        """
        super().__init__(*args, **kwargs)
        self._is_running = False

    def is_running(self) -> bool:
        """
        Return whether this timer has been started and is running.
        """
        return self._is_running

    def run(self) -> None:
        """
        Fire the target function in a loop.

        The loop waits `interval` seconds between calls and stops once the
        `finished` event is set (e.g. by `cancel()`). An exception raised by
        the target function ends the loop and propagates.
        """
        self._is_running = True
        try:
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)
        finally:
            # Reset the flag on every exit path so `is_running()` does not
            # keep reporting True after the target function has raised.
            self._is_running = False
def Lock(unknown):

allocate_lock() -> lock object (allocate() is an obsolete synonym)

Create a new lock object. See help(type(threading.Lock())) for information about locks.

def RLock(*args, **kwargs):
125def RLock(*args, **kwargs):
126    """Factory function that returns a new reentrant lock.
127
128    A reentrant lock must be released by the thread that acquired it. Once a
129    thread has acquired a reentrant lock, the same thread may acquire it again
130    without blocking; the thread must release it once for each time it has
131    acquired it.
132
133    """
134    if _CRLock is None:
135        return _PyRLock(*args, **kwargs)
136    return _CRLock(*args, **kwargs)

Factory function that returns a new reentrant lock.

A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.

class Event:
578class Event:
579    """Class implementing event objects.
580
581    Events manage a flag that can be set to true with the set() method and reset
582    to false with the clear() method. The wait() method blocks until the flag is
583    true.  The flag is initially false.
584
585    """
586
587    # After Tim Peters' event class (without is_posted())
588
589    def __init__(self):
590        self._cond = Condition(Lock())
591        self._flag = False
592
593    def __repr__(self):
594        cls = self.__class__
595        status = 'set' if self._flag else 'unset'
596        return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"
597
598    def _at_fork_reinit(self):
599        # Private method called by Thread._reset_internal_locks()
600        self._cond._at_fork_reinit()
601
602    def is_set(self):
603        """Return true if and only if the internal flag is true."""
604        return self._flag
605
606    def isSet(self):
607        """Return true if and only if the internal flag is true.
608
609        This method is deprecated, use is_set() instead.
610
611        """
612        import warnings
613        warnings.warn('isSet() is deprecated, use is_set() instead',
614                      DeprecationWarning, stacklevel=2)
615        return self.is_set()
616
617    def set(self):
618        """Set the internal flag to true.
619
620        All threads waiting for it to become true are awakened. Threads
621        that call wait() once the flag is true will not block at all.
622
623        """
624        with self._cond:
625            self._flag = True
626            self._cond.notify_all()
627
628    def clear(self):
629        """Reset the internal flag to false.
630
631        Subsequently, threads calling wait() will block until set() is called to
632        set the internal flag to true again.
633
634        """
635        with self._cond:
636            self._flag = False
637
638    def wait(self, timeout=None):
639        """Block until the internal flag is true.
640
641        If the internal flag is true on entry, return immediately. Otherwise,
642        block until another thread calls set() to set the flag to true, or until
643        the optional timeout occurs.
644
645        When the timeout argument is present and not None, it should be a
646        floating point number specifying a timeout for the operation in seconds
647        (or fractions thereof).
648
649        This method returns the internal flag on exit, so it will always return
650        True except if a timeout is given and the operation times out.
651
652        """
653        with self._cond:
654            signaled = self._flag
655            if not signaled:
656                signaled = self._cond.wait(timeout)
657            return signaled

Class implementing event objects.

Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.

def is_set(self):
602    def is_set(self):
603        """Return true if and only if the internal flag is true."""
604        return self._flag

Return true if and only if the internal flag is true.

def isSet(self):
606    def isSet(self):
607        """Return true if and only if the internal flag is true.
608
609        This method is deprecated, use is_set() instead.
610
611        """
612        import warnings
613        warnings.warn('isSet() is deprecated, use is_set() instead',
614                      DeprecationWarning, stacklevel=2)
615        return self.is_set()

Return true if and only if the internal flag is true.

This method is deprecated, use is_set() instead.

def set(self):
617    def set(self):
618        """Set the internal flag to true.
619
620        All threads waiting for it to become true are awakened. Threads
621        that call wait() once the flag is true will not block at all.
622
623        """
624        with self._cond:
625            self._flag = True
626            self._cond.notify_all()

Set the internal flag to true.

All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.

def clear(self):
628    def clear(self):
629        """Reset the internal flag to false.
630
631        Subsequently, threads calling wait() will block until set() is called to
632        set the internal flag to true again.
633
634        """
635        with self._cond:
636            self._flag = False

Reset the internal flag to false.

Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.

def wait(self, timeout=None):
638    def wait(self, timeout=None):
639        """Block until the internal flag is true.
640
641        If the internal flag is true on entry, return immediately. Otherwise,
642        block until another thread calls set() to set the flag to true, or until
643        the optional timeout occurs.
644
645        When the timeout argument is present and not None, it should be a
646        floating point number specifying a timeout for the operation in seconds
647        (or fractions thereof).
648
649        This method returns the internal flag on exit, so it will always return
650        True except if a timeout is given and the operation times out.
651
652        """
653        with self._cond:
654            signaled = self._flag
655            if not signaled:
656                signaled = self._cond.wait(timeout)
657            return signaled

Block until the internal flag is true.

If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.

class Timer(threading.Thread):
1408class Timer(Thread):
1409    """Call a function after a specified number of seconds:
1410
1411            t = Timer(30.0, f, args=None, kwargs=None)
1412            t.start()
1413            t.cancel()     # stop the timer's action if it's still waiting
1414
1415    """
1416
1417    def __init__(self, interval, function, args=None, kwargs=None):
1418        Thread.__init__(self)
1419        self.interval = interval
1420        self.function = function
1421        self.args = args if args is not None else []
1422        self.kwargs = kwargs if kwargs is not None else {}
1423        self.finished = Event()
1424
1425    def cancel(self):
1426        """Stop the timer if it hasn't finished yet."""
1427        self.finished.set()
1428
1429    def run(self):
1430        self.finished.wait(self.interval)
1431        if not self.finished.is_set():
1432            self.function(*self.args, **self.kwargs)
1433        self.finished.set()

Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=None, kwargs=None)
    t.start()
    t.cancel()     # stop the timer's action if it's still waiting

Timer(interval, function, args=None, kwargs=None)
1417    def __init__(self, interval, function, args=None, kwargs=None):
1418        Thread.__init__(self)
1419        self.interval = interval
1420        self.function = function
1421        self.args = args if args is not None else []
1422        self.kwargs = kwargs if kwargs is not None else {}
1423        self.finished = Event()

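Note: the description below is inherited from threading.Thread.__init__(). Timer's own constructor takes interval, function, args=None and kwargs=None, and calls Thread.__init__() with no arguments.

This constructor should always be called with keyword arguments. Arguments are: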

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

interval
function
args
kwargs
finished
def cancel(self):
1425    def cancel(self):
1426        """Stop the timer if it hasn't finished yet."""
1427        self.finished.set()

Stop the timer if it hasn't finished yet.

def run(self):
1429    def run(self):
1430        self.finished.wait(self.interval)
1431        if not self.finished.is_set():
1432            self.function(*self.args, **self.kwargs)
1433        self.finished.set()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def get_ident(unknown):

get_ident() -> integer

Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.

class Thread(threading.Thread):
23class Thread(threading.Thread):
24    """Wrapper for threading.Thread with optional callback and error_callback functions."""
25
26    def __init__(self, *args, callback=None, error_callback=None, **kw):
27        target = kw.pop('target')
28        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
29        self.callback = callback
30        self.error_callback = error_callback
31        self.method = target
32        self._return = None
33
34    def wrap_target_with_callback(self, *args, **kw):
35        """Wrap the designated target function with a try-except.
36        Captures the output and executes either the callback or error_callback.
37        """
38        try:
39            result = self.method(*args, **kw)
40            success = True
41        except Exception as e:
42            success = False
43            result = e
44
45        cb = self.callback if success else self.error_callback
46        if cb is not None:
47            cb(result)
48        return result
49
50    def join(self, timeout: Optional[float] = None):
51        """
52        Join the thread with an optional timeout.
53        """
54        threading.Thread.join(self, timeout=timeout)
55        return self._return
56
57    def run(self):
58        """Set the return to the result of the target."""
59        self._return = self._target(*self._args, **self._kwargs)
60
61    def send_signal(self, signalnum):
62        """
63        Send a signal to the thread.
64        """
65        if not self.is_alive():
66            return
67
68        if signalnum == signal.SIGINT:
69            self.raise_exception(KeyboardInterrupt())
70        elif signalnum == signal.SIGTERM:
71            self.raise_exception(SystemExit())
72        else:
73            signal.pthread_kill(self.ident, signalnum)
74
75    def raise_exception(self, exc: BaseException):
76        """
77        Raise an exception in the thread.
78
79        This uses a CPython-specific implementation and is not guaranteed to be stable.
80        It may also be deprecated in future Python versions.
81        """
82        if not self.is_alive():
83            return
84
85        if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'):
86            return
87
88        exc_class = exc if isinstance(exc, type) else type(exc)
89
90        ident = self.ident
91        if ident is None:
92            return
93
94        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
95            ctypes.c_ulong(ident),
96            ctypes.py_object(exc_class)
97        )
98        if ret > 1:
99            ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0)

Wrapper for threading.Thread with optional callback and error_callback functions.

Thread(*args, callback=None, error_callback=None, **kw)
26    def __init__(self, *args, callback=None, error_callback=None, **kw):
27        target = kw.pop('target')
28        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
29        self.callback = callback
30        self.error_callback = error_callback
31        self.method = target
32        self._return = None

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

callback
error_callback
method
def wrap_target_with_callback(self, *args, **kw):
34    def wrap_target_with_callback(self, *args, **kw):
35        """Wrap the designated target function with a try-except.
36        Captures the output and executes either the callback or error_callback.
37        """
38        try:
39            result = self.method(*args, **kw)
40            success = True
41        except Exception as e:
42            success = False
43            result = e
44
45        cb = self.callback if success else self.error_callback
46        if cb is not None:
47            cb(result)
48        return result

Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.

def join(self, timeout: Optional[float] = None):
50    def join(self, timeout: Optional[float] = None):
51        """
52        Join the thread with an optional timeout.
53        """
54        threading.Thread.join(self, timeout=timeout)
55        return self._return

Join the thread with an optional timeout.

def run(self):
57    def run(self):
58        """Set the return to the result of the target."""
59        self._return = self._target(*self._args, **self._kwargs)

Set the return to the result of the target.

def send_signal(self, signalnum):
61    def send_signal(self, signalnum):
62        """
63        Send a signal to the thread.
64        """
65        if not self.is_alive():
66            return
67
68        if signalnum == signal.SIGINT:
69            self.raise_exception(KeyboardInterrupt())
70        elif signalnum == signal.SIGTERM:
71            self.raise_exception(SystemExit())
72        else:
73            signal.pthread_kill(self.ident, signalnum)

Send a signal to the thread.

def raise_exception(self, exc: BaseException):
75    def raise_exception(self, exc: BaseException):
76        """
77        Raise an exception in the thread.
78
79        This uses a CPython-specific implementation and is not guaranteed to be stable.
80        It may also be deprecated in future Python versions.
81        """
82        if not self.is_alive():
83            return
84
85        if not hasattr(ctypes.pythonapi, 'PyThreadState_SetAsyncExc'):
86            return
87
88        exc_class = exc if isinstance(exc, type) else type(exc)
89
90        ident = self.ident
91        if ident is None:
92            return
93
94        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
95            ctypes.c_ulong(ident),
96            ctypes.py_object(exc_class)
97        )
98        if ret > 1:
99            ctypes.pythonapi.PyThreadState_SetAsyncExc(ident, 0)

Raise an exception in the thread.

This uses a CPython-specific implementation and is not guaranteed to be stable. It may also be deprecated in future Python versions.

class Worker(threading.Thread):
101class Worker(threading.Thread):
102    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""
103
104    def __init__(self, queue, *args, timeout: int = 3, **kw):
105        self.queue = queue
106        self.timeout = timeout
107        super().__init__(*args, **kw)
108
109    def run(self):
110        while True:
111            try:
112                _ = self.queue.get(timeout=self.timeout)
113            except self.queue.Empty:
114                return None
115
116            self.queue.task_done()

Wrapper for threading.Thread for working with queue.Queue objects.

Worker(queue, *args, timeout: int = 3, **kw)
104    def __init__(self, queue, *args, timeout: int = 3, **kw):
105        self.queue = queue
106        self.timeout = timeout
107        super().__init__(*args, **kw)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

queue
timeout
def run(self):
109    def run(self):
110        while True:
111            try:
112                _ = self.queue.get(timeout=self.timeout)
113            except self.queue.Empty:
114                return None
115
116            self.queue.task_done()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class RepeatTimer(meerschaum.utils.threading.Timer):
119class RepeatTimer(Timer):
120    """
121    Fire the timer's target function in a loop, every `interval` seconds.
122    """
123
124    def __init__(self, *args, **kwargs):
125        super().__init__(*args, **kwargs)
126        self._is_running = False
127
128    def is_running(self) -> bool:
129        """
130        Return whether this timer has been started and is running.
131        """
132        return self._is_running
133
134    def run(self) -> None:
135        """
136        Fire the target function in a loop.
137        """
138        self._is_running = True
139        while not self.finished.wait(self.interval):
140            self.function(*self.args, **self.kwargs)
141        self._is_running = False

Fire the timer's target function in a loop, every interval seconds.

RepeatTimer(*args, **kwargs)
124    def __init__(self, *args, **kwargs):
125        super().__init__(*args, **kwargs)
126        self._is_running = False

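Note: the description below is inherited from threading.Thread.__init__(). RepeatTimer forwards all of its arguments unchanged to Timer, whose constructor takes interval, function, args=None and kwargs=None.

This constructor should always be called with keyword arguments. Arguments are: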

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

def is_running(self) -> bool:
128    def is_running(self) -> bool:
129        """
130        Return whether this timer has been started and is running.
131        """
132        return self._is_running

Return whether this timer has been started and is running.

def run(self) -> None:
134    def run(self) -> None:
135        """
136        Fire the target function in a loop.
137        """
138        self._is_running = True
139        while not self.finished.wait(self.interval):
140            self.function(*self.args, **self.kwargs)
141        self._is_running = False

Fire the target function in a loop.