meerschaum.utils.threading

Define a custom Thread class with a callback method.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Define a custom Thread class with a callback method.
 7"""
 8
 9from __future__ import annotations
10from meerschaum.utils.typing import Optional
11
12import threading
13import traceback
14Lock = threading.Lock
15RLock = threading.RLock
16Event = threading.Event
17Timer = threading.Timer
18get_ident = threading.get_ident
19
class Thread(threading.Thread):
    """Wrapper for threading.Thread with optional callback and error_callback functions.

    The target's return value is captured and handed back by `join()`.
    If the target raises an `Exception`, the exception object is captured
    (not re-raised), passed to `error_callback`, and returned by `join()`.
    """

    def __init__(self, *args, callback=None, error_callback=None, **kw):
        """
        Build the thread, routing the target through `wrap_target_with_callback`.

        Parameters
        ----------
        *args, **kw:
            Forwarded to `threading.Thread.__init__`.
            The target may be given as the `target` keyword
            or positionally as the second argument; it may also be omitted.

        callback: default None
            If provided, called with the target's return value on success.

        error_callback: default None
            If provided, called with the exception raised by the target.
        """
        positional = list(args)
        if len(positional) > 1:
            ### `threading.Thread` accepts `(group, target, ...)` positionally.
            target = positional[1]
            if target is not None:
                positional[1] = self.wrap_target_with_callback
        else:
            ### A missing target is allowed, so don't raise a `KeyError` here.
            target = kw.pop('target', None)
            if target is not None:
                kw['target'] = self.wrap_target_with_callback

        super().__init__(*positional, **kw)
        self.callback = callback
        self.error_callback = error_callback
        self.method = target
        self._return = None

    def wrap_target_with_callback(self, *args, **kw):
        """Wrap the designated target function with a try-except.
        Captures the output and executes either the callback or error_callback.

        Returns
        -------
        The target's return value, or the `Exception` instance it raised.
        """
        try:
            result = self.method(*args, **kw)
            success = True
        except Exception as e:
            ### The exception is handed to `error_callback` instead of propagating.
            success = False
            result = e

        cb = self.callback if success else self.error_callback
        if cb is not None:
            cb(result)
        return result

    def join(self, timeout: Optional[float] = None):
        """
        Join the thread with an optional timeout.

        Returns
        -------
        The value captured by `run()`, or `None` if nothing has been captured yet
        (e.g. the timeout elapsed before the target finished).
        """
        threading.Thread.join(self, timeout=timeout)
        return self._return

    def run(self):
        """Set the return to the result of the target."""
        ### Without a target there is nothing to call; `_return` stays `None`.
        if self._target is None:
            return
        self._return = self._target(*self._args, **self._kwargs)
57
58
class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects.

    The worker repeatedly takes items off the queue and marks each one as done
    until no item arrives within `timeout` seconds.
    """

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        """
        Parameters
        ----------
        queue:
            The queue to consume; `run()` calls its `get()` and `task_done()` methods.

        timeout: int, default 3
            Passed as the `timeout` of every `queue.get()` call.

        *args, **kw:
            Forwarded to `threading.Thread.__init__`.
        """
        self.queue = queue
        self.timeout = timeout
        super().__init__(*args, **kw)

    def run(self):
        """
        Drain the queue, returning `None` once `get()` times out.
        """
        ### Imported locally: the module never imports `queue`,
        ### so the bare name `queue.Empty` would raise a `NameError` here.
        from queue import Empty

        while True:
            try:
                self.queue.get(timeout=self.timeout)
            except Empty:
                return None

            self.queue.task_done()
75
76
class RepeatTimer(Timer):
    """
    A timer which calls its function again and again, waiting `interval` seconds before each call.
    """

    def run(self) -> None:
        """
        Keep calling the function until the `finished` event is set.
        """
        while True:
            # `wait()` returns True as soon as the event is set (e.g. by `cancel()`).
            stopped = self.finished.wait(self.interval)
            if stopped:
                break
            self.function(*self.args, **self.kwargs)
def Lock(unknown):

allocate_lock() -> lock object (allocate() is an obsolete synonym)

Create a new lock object. See help(type(threading.Lock())) for information about locks.

def RLock(*args, **kwargs):
 91def RLock(*args, **kwargs):
 92    """Factory function that returns a new reentrant lock.
 93
 94    A reentrant lock must be released by the thread that acquired it. Once a
 95    thread has acquired a reentrant lock, the same thread may acquire it again
 96    without blocking; the thread must release it once for each time it has
 97    acquired it.
 98
 99    """
100    if _CRLock is None:
101        return _PyRLock(*args, **kwargs)
102    return _CRLock(*args, **kwargs)

Factory function that returns a new reentrant lock.

A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.

class Event:
535class Event:
536    """Class implementing event objects.
537
538    Events manage a flag that can be set to true with the set() method and reset
539    to false with the clear() method. The wait() method blocks until the flag is
540    true.  The flag is initially false.
541
542    """
543
544    # After Tim Peters' event class (without is_posted())
545
546    def __init__(self):
547        self._cond = Condition(Lock())
548        self._flag = False
549
550    def _at_fork_reinit(self):
551        # Private method called by Thread._reset_internal_locks()
552        self._cond._at_fork_reinit()
553
554    def is_set(self):
555        """Return true if and only if the internal flag is true."""
556        return self._flag
557
558    def isSet(self):
559        """Return true if and only if the internal flag is true.
560
561        This method is deprecated, use is_set() instead.
562
563        """
564        import warnings
565        warnings.warn('isSet() is deprecated, use is_set() instead',
566                      DeprecationWarning, stacklevel=2)
567        return self.is_set()
568
569    def set(self):
570        """Set the internal flag to true.
571
572        All threads waiting for it to become true are awakened. Threads
573        that call wait() once the flag is true will not block at all.
574
575        """
576        with self._cond:
577            self._flag = True
578            self._cond.notify_all()
579
580    def clear(self):
581        """Reset the internal flag to false.
582
583        Subsequently, threads calling wait() will block until set() is called to
584        set the internal flag to true again.
585
586        """
587        with self._cond:
588            self._flag = False
589
590    def wait(self, timeout=None):
591        """Block until the internal flag is true.
592
593        If the internal flag is true on entry, return immediately. Otherwise,
594        block until another thread calls set() to set the flag to true, or until
595        the optional timeout occurs.
596
597        When the timeout argument is present and not None, it should be a
598        floating point number specifying a timeout for the operation in seconds
599        (or fractions thereof).
600
601        This method returns the internal flag on exit, so it will always return
602        True except if a timeout is given and the operation times out.
603
604        """
605        with self._cond:
606            signaled = self._flag
607            if not signaled:
608                signaled = self._cond.wait(timeout)
609            return signaled

Class implementing event objects.

Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.

def is_set(self):
554    def is_set(self):
555        """Return true if and only if the internal flag is true."""
556        return self._flag

Return true if and only if the internal flag is true.

def isSet(self):
558    def isSet(self):
559        """Return true if and only if the internal flag is true.
560
561        This method is deprecated, use is_set() instead.
562
563        """
564        import warnings
565        warnings.warn('isSet() is deprecated, use is_set() instead',
566                      DeprecationWarning, stacklevel=2)
567        return self.is_set()

Return true if and only if the internal flag is true.

This method is deprecated, use is_set() instead.

def set(self):
569    def set(self):
570        """Set the internal flag to true.
571
572        All threads waiting for it to become true are awakened. Threads
573        that call wait() once the flag is true will not block at all.
574
575        """
576        with self._cond:
577            self._flag = True
578            self._cond.notify_all()

Set the internal flag to true.

All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.

def clear(self):
580    def clear(self):
581        """Reset the internal flag to false.
582
583        Subsequently, threads calling wait() will block until set() is called to
584        set the internal flag to true again.
585
586        """
587        with self._cond:
588            self._flag = False

Reset the internal flag to false.

Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.

def wait(self, timeout=None):
590    def wait(self, timeout=None):
591        """Block until the internal flag is true.
592
593        If the internal flag is true on entry, return immediately. Otherwise,
594        block until another thread calls set() to set the flag to true, or until
595        the optional timeout occurs.
596
597        When the timeout argument is present and not None, it should be a
598        floating point number specifying a timeout for the operation in seconds
599        (or fractions thereof).
600
601        This method returns the internal flag on exit, so it will always return
602        True except if a timeout is given and the operation times out.
603
604        """
605        with self._cond:
606            signaled = self._flag
607            if not signaled:
608                signaled = self._cond.wait(timeout)
609            return signaled

Block until the internal flag is true.

If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.

class Timer(threading.Thread):
1355class Timer(Thread):
1356    """Call a function after a specified number of seconds:
1357
1358            t = Timer(30.0, f, args=None, kwargs=None)
1359            t.start()
1360            t.cancel()     # stop the timer's action if it's still waiting
1361
1362    """
1363
1364    def __init__(self, interval, function, args=None, kwargs=None):
1365        Thread.__init__(self)
1366        self.interval = interval
1367        self.function = function
1368        self.args = args if args is not None else []
1369        self.kwargs = kwargs if kwargs is not None else {}
1370        self.finished = Event()
1371
1372    def cancel(self):
1373        """Stop the timer if it hasn't finished yet."""
1374        self.finished.set()
1375
1376    def run(self):
1377        self.finished.wait(self.interval)
1378        if not self.finished.is_set():
1379            self.function(*self.args, **self.kwargs)
1380        self.finished.set()

Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=None, kwargs=None)
    t.start()
    t.cancel()  # stop the timer's action if it's still waiting

Timer(interval, function, args=None, kwargs=None)
1364    def __init__(self, interval, function, args=None, kwargs=None):
1365        Thread.__init__(self)
1366        self.interval = interval
1367        self.function = function
1368        self.args = args if args is not None else []
1369        self.kwargs = kwargs if kwargs is not None else {}
1370        self.finished = Event()

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

interval
function
args
kwargs
finished
def cancel(self):
1372    def cancel(self):
1373        """Stop the timer if it hasn't finished yet."""
1374        self.finished.set()

Stop the timer if it hasn't finished yet.

def run(self):
1376    def run(self):
1377        self.finished.wait(self.interval)
1378        if not self.finished.is_set():
1379            self.function(*self.args, **self.kwargs)
1380        self.finished.set()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id
def get_ident(unknown):

get_ident() -> integer

Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms thread identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.

class Thread(threading.Thread):
21class Thread(threading.Thread):
22    """Wrapper for threading.Thread with optional callback and error_callback functions."""
23
24    def __init__(self, *args, callback=None, error_callback=None, **kw):
25        target = kw.pop('target')
26        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
27        self.callback = callback
28        self.error_callback = error_callback
29        self.method = target
30        self._return = None
31
32    def wrap_target_with_callback(self, *args, **kw):
33        """Wrap the designated target function with a try-except.
34        Captures the output and executes either the callback or error_callback.
35        """
36        try:
37            result = self.method(*args, **kw)
38            success = True
39        except Exception as e:
40            success = False
41            result = e
42
43        cb = self.callback if success else self.error_callback
44        if cb is not None:
45            cb(result)
46        return result
47
48    def join(self, timeout: Optional[float] = None):
49        """
50        Join the thread with an optional timeout.
51        """
52        threading.Thread.join(self, timeout=timeout)
53        return self._return
54
55    def run(self):
56        """Set the return to the result of the target."""
57        self._return = self._target(*self._args, **self._kwargs)

Wrapper for threading.Thread with optional callback and error_callback functions.

Thread(*args, callback=None, error_callback=None, **kw)
24    def __init__(self, *args, callback=None, error_callback=None, **kw):
25        target = kw.pop('target')
26        super().__init__(target=self.wrap_target_with_callback, *args, **kw)
27        self.callback = callback
28        self.error_callback = error_callback
29        self.method = target
30        self._return = None

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

callback
error_callback
method
def wrap_target_with_callback(self, *args, **kw):
32    def wrap_target_with_callback(self, *args, **kw):
33        """Wrap the designated target function with a try-except.
34        Captures the output and executes either the callback or error_callback.
35        """
36        try:
37            result = self.method(*args, **kw)
38            success = True
39        except Exception as e:
40            success = False
41            result = e
42
43        cb = self.callback if success else self.error_callback
44        if cb is not None:
45            cb(result)
46        return result

Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.

def join(self, timeout: Optional[float] = None):
48    def join(self, timeout: Optional[float] = None):
49        """
50        Join the thread with an optional timeout.
51        """
52        threading.Thread.join(self, timeout=timeout)
53        return self._return

Join the thread with an optional timeout.

def run(self):
55    def run(self):
56        """Set the return to the result of the target."""
57        self._return = self._target(*self._args, **self._kwargs)

Set the return to the result of the target.

Inherited Members
threading.Thread
start
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id
class Worker(threading.Thread):
60class Worker(threading.Thread):
61    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""
62
63    def __init__(self, queue, *args, timeout: int = 3, **kw):
64        self.queue = queue
65        self.timeout = timeout
66        super().__init__(*args, **kw)
67
68    def run(self):
69        while True:
70            try:
71                item = self.queue.get(timeout=self.timeout)
72            except queue.Empty:
73                return None
74
75            self.queue.task_done()

Wrapper for threading.Thread for working with queue.Queue objects.

Worker(queue, *args, timeout: int = 3, **kw)
63    def __init__(self, queue, *args, timeout: int = 3, **kw):
64        self.queue = queue
65        self.timeout = timeout
66        super().__init__(*args, **kw)

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

queue
timeout
def run(self):
68    def run(self):
69        while True:
70            try:
71                item = self.queue.get(timeout=self.timeout)
72            except queue.Empty:
73                return None
74
75            self.queue.task_done()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id
class RepeatTimer(meerschaum.utils.threading.Timer):
78class RepeatTimer(Timer):
79    """
80    Fire the timer's target function in a loop, every `interval` seconds.
81    """
82
83    def run(self) -> None:
84        """
85        Fire the target function in a loop.
86        """
87        while not self.finished.wait(self.interval):
88            self.function(*self.args, **self.kwargs)

Fire the timer's target function in a loop, every interval seconds.

def run(self) -> None:
83    def run(self) -> None:
84        """
85        Fire the target function in a loop.
86        """
87        while not self.finished.wait(self.interval):
88            self.function(*self.args, **self.kwargs)

Fire the target function in a loop.

Inherited Members
Timer
Timer
interval
function
args
kwargs
finished
cancel
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id