meerschaum.utils.threading

Define a custom Thread class with a callback method.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Define a custom Thread class with a callback method.
 7"""
 8
 9from __future__ import annotations
10from meerschaum.utils.typing import Optional
11
12import threading
13Lock = threading.Lock
14RLock = threading.RLock
15Event = threading.Event
16Timer = threading.Timer
17get_ident = threading.get_ident
18
19class Thread(threading.Thread):
20    """Wrapper for threading.Thread with optional callback and error_callback functions."""
21
22    def __init__(self, *args, callback=None, error_callback=None, **kw):
23        target = kw.pop('target')
24        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
25        self.callback = callback
26        self.error_callback = error_callback
27        self.method = target
28        self._return = None
29
30    def wrap_target_with_callback(self, *args, **kw):
31        """Wrap the designated target function with a try-except.
32        Captures the output and executes either the callback or error_callback.
33        """
34        try:
35            result = self.method(*args, **kw)
36            success = True
37        except Exception as e:
38            success = False
39            result = e
40
41        cb = self.callback if success else self.error_callback
42        if cb is not None:
43            cb(result)
44        return result
45
46    def join(self, timeout: Optional[float] = None):
47        """
48        Join the thread with an optional timeout.
49        """
50        threading.Thread.join(self, timeout=timeout)
51        return self._return
52
53    def run(self):
54        """Set the return to the result of the target."""
55        self._return = self._target(*self._args, **self._kwargs)
56
57
58class Worker(threading.Thread):
59    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""
60
61    def __init__(self, queue, *args, timeout: int = 3, **kw):
62        self.queue = queue
63        self.timeout = timeout
64        super().__init__(*args, **kw)
65
66    def run(self):
67        while True:
68            try:
69                _ = self.queue.get(timeout=self.timeout)
70            except self.queue.Empty:
71                return None
72
73            self.queue.task_done()
74
75
76class RepeatTimer(Timer):
77    """
78    Fire the timer's target function in a loop, every `interval` seconds.
79    """
80
81    def __init__(self, *args, **kwargs):
82        super().__init__(*args, **kwargs)
83        self._is_running = False
84
85    def is_running(self) -> bool:
86        """
87        Return whether this timer has been started and is running.
88        """
89        return self._is_running
90
91    def run(self) -> None:
92        """
93        Fire the target function in a loop.
94        """
95        self._is_running = True
96        while not self.finished.wait(self.interval):
97            self.function(*self.args, **self.kwargs)
98        self._is_running = False
def Lock(unknown):

allocate_lock() -> lock object (allocate() is an obsolete synonym)

Create a new lock object. See help(type(threading.Lock())) for information about locks.

def RLock(*args, **kwargs):
125def RLock(*args, **kwargs):
126    """Factory function that returns a new reentrant lock.
127
128    A reentrant lock must be released by the thread that acquired it. Once a
129    thread has acquired a reentrant lock, the same thread may acquire it again
130    without blocking; the thread must release it once for each time it has
131    acquired it.
132
133    """
134    if _CRLock is None:
135        return _PyRLock(*args, **kwargs)
136    return _CRLock(*args, **kwargs)

Factory function that returns a new reentrant lock.

A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.

class Event:
578class Event:
579    """Class implementing event objects.
580
581    Events manage a flag that can be set to true with the set() method and reset
582    to false with the clear() method. The wait() method blocks until the flag is
583    true.  The flag is initially false.
584
585    """
586
587    # After Tim Peters' event class (without is_posted())
588
589    def __init__(self):
590        self._cond = Condition(Lock())
591        self._flag = False
592
593    def __repr__(self):
594        cls = self.__class__
595        status = 'set' if self._flag else 'unset'
596        return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"
597
598    def _at_fork_reinit(self):
599        # Private method called by Thread._reset_internal_locks()
600        self._cond._at_fork_reinit()
601
602    def is_set(self):
603        """Return true if and only if the internal flag is true."""
604        return self._flag
605
606    def isSet(self):
607        """Return true if and only if the internal flag is true.
608
609        This method is deprecated, use is_set() instead.
610
611        """
612        import warnings
613        warnings.warn('isSet() is deprecated, use is_set() instead',
614                      DeprecationWarning, stacklevel=2)
615        return self.is_set()
616
617    def set(self):
618        """Set the internal flag to true.
619
620        All threads waiting for it to become true are awakened. Threads
621        that call wait() once the flag is true will not block at all.
622
623        """
624        with self._cond:
625            self._flag = True
626            self._cond.notify_all()
627
628    def clear(self):
629        """Reset the internal flag to false.
630
631        Subsequently, threads calling wait() will block until set() is called to
632        set the internal flag to true again.
633
634        """
635        with self._cond:
636            self._flag = False
637
638    def wait(self, timeout=None):
639        """Block until the internal flag is true.
640
641        If the internal flag is true on entry, return immediately. Otherwise,
642        block until another thread calls set() to set the flag to true, or until
643        the optional timeout occurs.
644
645        When the timeout argument is present and not None, it should be a
646        floating point number specifying a timeout for the operation in seconds
647        (or fractions thereof).
648
649        This method returns the internal flag on exit, so it will always return
650        True except if a timeout is given and the operation times out.
651
652        """
653        with self._cond:
654            signaled = self._flag
655            if not signaled:
656                signaled = self._cond.wait(timeout)
657            return signaled

Class implementing event objects.

Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.

def is_set(self):
602    def is_set(self):
603        """Return true if and only if the internal flag is true."""
604        return self._flag

Return true if and only if the internal flag is true.

def isSet(self):
606    def isSet(self):
607        """Return true if and only if the internal flag is true.
608
609        This method is deprecated, use is_set() instead.
610
611        """
612        import warnings
613        warnings.warn('isSet() is deprecated, use is_set() instead',
614                      DeprecationWarning, stacklevel=2)
615        return self.is_set()

Return true if and only if the internal flag is true.

This method is deprecated, use is_set() instead.

def set(self):
617    def set(self):
618        """Set the internal flag to true.
619
620        All threads waiting for it to become true are awakened. Threads
621        that call wait() once the flag is true will not block at all.
622
623        """
624        with self._cond:
625            self._flag = True
626            self._cond.notify_all()

Set the internal flag to true.

All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.

def clear(self):
628    def clear(self):
629        """Reset the internal flag to false.
630
631        Subsequently, threads calling wait() will block until set() is called to
632        set the internal flag to true again.
633
634        """
635        with self._cond:
636            self._flag = False

Reset the internal flag to false.

Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.

def wait(self, timeout=None):
638    def wait(self, timeout=None):
639        """Block until the internal flag is true.
640
641        If the internal flag is true on entry, return immediately. Otherwise,
642        block until another thread calls set() to set the flag to true, or until
643        the optional timeout occurs.
644
645        When the timeout argument is present and not None, it should be a
646        floating point number specifying a timeout for the operation in seconds
647        (or fractions thereof).
648
649        This method returns the internal flag on exit, so it will always return
650        True except if a timeout is given and the operation times out.
651
652        """
653        with self._cond:
654            signaled = self._flag
655            if not signaled:
656                signaled = self._cond.wait(timeout)
657            return signaled

Block until the internal flag is true.

If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.

class Timer(threading.Thread):
1408class Timer(Thread):
1409    """Call a function after a specified number of seconds:
1410
1411            t = Timer(30.0, f, args=None, kwargs=None)
1412            t.start()
1413            t.cancel()     # stop the timer's action if it's still waiting
1414
1415    """
1416
1417    def __init__(self, interval, function, args=None, kwargs=None):
1418        Thread.__init__(self)
1419        self.interval = interval
1420        self.function = function
1421        self.args = args if args is not None else []
1422        self.kwargs = kwargs if kwargs is not None else {}
1423        self.finished = Event()
1424
1425    def cancel(self):
1426        """Stop the timer if it hasn't finished yet."""
1427        self.finished.set()
1428
1429    def run(self):
1430        self.finished.wait(self.interval)
1431        if not self.finished.is_set():
1432            self.function(*self.args, **self.kwargs)
1433        self.finished.set()

Call a function after a specified number of seconds:

t = Timer(30.0, f, args=None, kwargs=None) t.start() t.cancel() # stop the timer's action if it's still waiting

Timer(interval, function, args=None, kwargs=None)
1417    def __init__(self, interval, function, args=None, kwargs=None):
1418        Thread.__init__(self)
1419        self.interval = interval
1420        self.function = function
1421        self.args = args if args is not None else []
1422        self.kwargs = kwargs if kwargs is not None else {}
1423        self.finished = Event()

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

interval
function
args
kwargs
finished
def cancel(self):
1425    def cancel(self):
1426        """Stop the timer if it hasn't finished yet."""
1427        self.finished.set()

Stop the timer if it hasn't finished yet.

def run(self):
1429    def run(self):
1430        self.finished.wait(self.interval)
1431        if not self.finished.is_set():
1432            self.function(*self.args, **self.kwargs)
1433        self.finished.set()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def get_ident(unknown):

get_ident() -> integer

Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.

class Thread(threading.Thread):
20class Thread(threading.Thread):
21    """Wrapper for threading.Thread with optional callback and error_callback functions."""
22
23    def __init__(self, *args, callback=None, error_callback=None, **kw):
24        target = kw.pop('target')
25        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
26        self.callback = callback
27        self.error_callback = error_callback
28        self.method = target
29        self._return = None
30
31    def wrap_target_with_callback(self, *args, **kw):
32        """Wrap the designated target function with a try-except.
33        Captures the output and executes either the callback or error_callback.
34        """
35        try:
36            result = self.method(*args, **kw)
37            success = True
38        except Exception as e:
39            success = False
40            result = e
41
42        cb = self.callback if success else self.error_callback
43        if cb is not None:
44            cb(result)
45        return result
46
47    def join(self, timeout: Optional[float] = None):
48        """
49        Join the thread with an optional timeout.
50        """
51        threading.Thread.join(self, timeout=timeout)
52        return self._return
53
54    def run(self):
55        """Set the return to the result of the target."""
56        self._return = self._target(*self._args, **self._kwargs)

Wrapper for threading.Thread with optional callback and error_callback functions.

Thread(*args, callback=None, error_callback=None, **kw)
23    def __init__(self, *args, callback=None, error_callback=None, **kw):
24        target = kw.pop('target')
25        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
26        self.callback = callback
27        self.error_callback = error_callback
28        self.method = target
29        self._return = None

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

callback
error_callback
method
def wrap_target_with_callback(self, *args, **kw):
31    def wrap_target_with_callback(self, *args, **kw):
32        """Wrap the designated target function with a try-except.
33        Captures the output and executes either the callback or error_callback.
34        """
35        try:
36            result = self.method(*args, **kw)
37            success = True
38        except Exception as e:
39            success = False
40            result = e
41
42        cb = self.callback if success else self.error_callback
43        if cb is not None:
44            cb(result)
45        return result

Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.

def join(self, timeout: Optional[float] = None):
47    def join(self, timeout: Optional[float] = None):
48        """
49        Join the thread with an optional timeout.
50        """
51        threading.Thread.join(self, timeout=timeout)
52        return self._return

Join the thread with an optional timeout.

def run(self):
54    def run(self):
55        """Set the return to the result of the target."""
56        self._return = self._target(*self._args, **self._kwargs)

Set the return to the result of the target.

class Worker(threading.Thread):
59class Worker(threading.Thread):
60    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""
61
62    def __init__(self, queue, *args, timeout: int = 3, **kw):
63        self.queue = queue
64        self.timeout = timeout
65        super().__init__(*args, **kw)
66
67    def run(self):
68        while True:
69            try:
70                _ = self.queue.get(timeout=self.timeout)
71            except self.queue.Empty:
72                return None
73
74            self.queue.task_done()

Wrapper for threading.Thread for working with queue.Queue objects.

Worker(queue, *args, timeout: int = 3, **kw)
62    def __init__(self, queue, *args, timeout: int = 3, **kw):
63        self.queue = queue
64        self.timeout = timeout
65        super().__init__(*args, **kw)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

queue
timeout
def run(self):
67    def run(self):
68        while True:
69            try:
70                _ = self.queue.get(timeout=self.timeout)
71            except self.queue.Empty:
72                return None
73
74            self.queue.task_done()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class RepeatTimer(meerschaum.utils.threading.Timer):
77class RepeatTimer(Timer):
78    """
79    Fire the timer's target function in a loop, every `interval` seconds.
80    """
81
82    def __init__(self, *args, **kwargs):
83        super().__init__(*args, **kwargs)
84        self._is_running = False
85
86    def is_running(self) -> bool:
87        """
88        Return whether this timer has been started and is running.
89        """
90        return self._is_running
91
92    def run(self) -> None:
93        """
94        Fire the target function in a loop.
95        """
96        self._is_running = True
97        while not self.finished.wait(self.interval):
98            self.function(*self.args, **self.kwargs)
99        self._is_running = False

Fire the timer's target function in a loop, every interval seconds.

RepeatTimer(*args, **kwargs)
82    def __init__(self, *args, **kwargs):
83        super().__init__(*args, **kwargs)
84        self._is_running = False

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

def is_running(self) -> bool:
86    def is_running(self) -> bool:
87        """
88        Return whether this timer has been started and is running.
89        """
90        return self._is_running

Return whether this timer has been started and is running.

def run(self) -> None:
92    def run(self) -> None:
93        """
94        Fire the target function in a loop.
95        """
96        self._is_running = True
97        while not self.finished.wait(self.interval):
98            self.function(*self.args, **self.kwargs)
99        self._is_running = False

Fire the target function in a loop.