meerschaum.utils.threading
Define a custom Thread class with a callback method.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Define a custom Thread class with a callback method. 7""" 8 9from __future__ import annotations 10from meerschaum.utils.typing import Optional 11 12import threading 13Lock = threading.Lock 14RLock = threading.RLock 15Event = threading.Event 16Timer = threading.Timer 17get_ident = threading.get_ident 18 19class Thread(threading.Thread): 20 """Wrapper for threading.Thread with optional callback and error_callback functions.""" 21 22 def __init__(self, *args, callback=None, error_callback=None, **kw): 23 target = kw.pop('target') 24 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 25 self.callback = callback 26 self.error_callback = error_callback 27 self.method = target 28 self._return = None 29 30 def wrap_target_with_callback(self, *args, **kw): 31 """Wrap the designated target function with a try-except. 32 Captures the output and executes either the callback or error_callback. 33 """ 34 try: 35 result = self.method(*args, **kw) 36 success = True 37 except Exception as e: 38 success = False 39 result = e 40 41 cb = self.callback if success else self.error_callback 42 if cb is not None: 43 cb(result) 44 return result 45 46 def join(self, timeout: Optional[float] = None): 47 """ 48 Join the thread with an optional timeout. 49 """ 50 threading.Thread.join(self, timeout=timeout) 51 return self._return 52 53 def run(self): 54 """Set the return to the result of the target.""" 55 self._return = self._target(*self._args, **self._kwargs) 56 57 58class Worker(threading.Thread): 59 """Wrapper for `threading.Thread` for working with `queue.Queue` objects.""" 60 61 def __init__(self, queue, *args, timeout: int = 3, **kw): 62 self.queue = queue 63 self.timeout = timeout 64 super().__init__(*args, **kw) 65 66 def run(self): 67 while True: 68 try: 69 _ = self.queue.get(timeout=self.timeout) 70 except self.queue.Empty: 71 return None 72 73 self.queue.task_done() 74 75 76class RepeatTimer(Timer): 77 """ 78 Fire the timer's target function in a loop, every `interval` seconds. 79 """ 80 81 def __init__(self, *args, **kwargs): 82 super().__init__(*args, **kwargs) 83 self._is_running = False 84 85 def is_running(self) -> bool: 86 """ 87 Return whether this timer has been started and is running. 88 """ 89 return self._is_running 90 91 def run(self) -> None: 92 """ 93 Fire the target function in a loop. 94 """ 95 self._is_running = True 96 while not self.finished.wait(self.interval): 97 self.function(*self.args, **self.kwargs) 98 self._is_running = False
allocate_lock() -> lock object (allocate() is an obsolete synonym)
Create a new lock object. See help(type(threading.Lock())) for information about locks.
125def RLock(*args, **kwargs): 126 """Factory function that returns a new reentrant lock. 127 128 A reentrant lock must be released by the thread that acquired it. Once a 129 thread has acquired a reentrant lock, the same thread may acquire it again 130 without blocking; the thread must release it once for each time it has 131 acquired it. 132 133 """ 134 if _CRLock is None: 135 return _PyRLock(*args, **kwargs) 136 return _CRLock(*args, **kwargs)
Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.
578class Event: 579 """Class implementing event objects. 580 581 Events manage a flag that can be set to true with the set() method and reset 582 to false with the clear() method. The wait() method blocks until the flag is 583 true. The flag is initially false. 584 585 """ 586 587 # After Tim Peters' event class (without is_posted()) 588 589 def __init__(self): 590 self._cond = Condition(Lock()) 591 self._flag = False 592 593 def __repr__(self): 594 cls = self.__class__ 595 status = 'set' if self._flag else 'unset' 596 return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" 597 598 def _at_fork_reinit(self): 599 # Private method called by Thread._reset_internal_locks() 600 self._cond._at_fork_reinit() 601 602 def is_set(self): 603 """Return true if and only if the internal flag is true.""" 604 return self._flag 605 606 def isSet(self): 607 """Return true if and only if the internal flag is true. 608 609 This method is deprecated, use is_set() instead. 610 611 """ 612 import warnings 613 warnings.warn('isSet() is deprecated, use is_set() instead', 614 DeprecationWarning, stacklevel=2) 615 return self.is_set() 616 617 def set(self): 618 """Set the internal flag to true. 619 620 All threads waiting for it to become true are awakened. Threads 621 that call wait() once the flag is true will not block at all. 622 623 """ 624 with self._cond: 625 self._flag = True 626 self._cond.notify_all() 627 628 def clear(self): 629 """Reset the internal flag to false. 630 631 Subsequently, threads calling wait() will block until set() is called to 632 set the internal flag to true again. 633 634 """ 635 with self._cond: 636 self._flag = False 637 638 def wait(self, timeout=None): 639 """Block until the internal flag is true. 640 641 If the internal flag is true on entry, return immediately. Otherwise, 642 block until another thread calls set() to set the flag to true, or until 643 the optional timeout occurs. 644 645 When the timeout argument is present and not None, it should be a 646 floating point number specifying a timeout for the operation in seconds 647 (or fractions thereof). 648 649 This method returns the internal flag on exit, so it will always return 650 True except if a timeout is given and the operation times out. 651 652 """ 653 with self._cond: 654 signaled = self._flag 655 if not signaled: 656 signaled = self._cond.wait(timeout) 657 return signaled
Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.
602 def is_set(self): 603 """Return true if and only if the internal flag is true.""" 604 return self._flag
Return true if and only if the internal flag is true.
606 def isSet(self): 607 """Return true if and only if the internal flag is true. 608 609 This method is deprecated, use is_set() instead. 610 611 """ 612 import warnings 613 warnings.warn('isSet() is deprecated, use is_set() instead', 614 DeprecationWarning, stacklevel=2) 615 return self.is_set()
Return true if and only if the internal flag is true.
This method is deprecated, use is_set() instead.
617 def set(self): 618 """Set the internal flag to true. 619 620 All threads waiting for it to become true are awakened. Threads 621 that call wait() once the flag is true will not block at all. 622 623 """ 624 with self._cond: 625 self._flag = True 626 self._cond.notify_all()
Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.
628 def clear(self): 629 """Reset the internal flag to false. 630 631 Subsequently, threads calling wait() will block until set() is called to 632 set the internal flag to true again. 633 634 """ 635 with self._cond: 636 self._flag = False
Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.
638 def wait(self, timeout=None): 639 """Block until the internal flag is true. 640 641 If the internal flag is true on entry, return immediately. Otherwise, 642 block until another thread calls set() to set the flag to true, or until 643 the optional timeout occurs. 644 645 When the timeout argument is present and not None, it should be a 646 floating point number specifying a timeout for the operation in seconds 647 (or fractions thereof). 648 649 This method returns the internal flag on exit, so it will always return 650 True except if a timeout is given and the operation times out. 651 652 """ 653 with self._cond: 654 signaled = self._flag 655 if not signaled: 656 signaled = self._cond.wait(timeout) 657 return signaled
Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.
1408class Timer(Thread): 1409 """Call a function after a specified number of seconds: 1410 1411 t = Timer(30.0, f, args=None, kwargs=None) 1412 t.start() 1413 t.cancel() # stop the timer's action if it's still waiting 1414 1415 """ 1416 1417 def __init__(self, interval, function, args=None, kwargs=None): 1418 Thread.__init__(self) 1419 self.interval = interval 1420 self.function = function 1421 self.args = args if args is not None else [] 1422 self.kwargs = kwargs if kwargs is not None else {} 1423 self.finished = Event() 1424 1425 def cancel(self): 1426 """Stop the timer if it hasn't finished yet.""" 1427 self.finished.set() 1428 1429 def run(self): 1430 self.finished.wait(self.interval) 1431 if not self.finished.is_set(): 1432 self.function(*self.args, **self.kwargs) 1433 self.finished.set()
Call a function after a specified number of seconds:
t = Timer(30.0, f, args=None, kwargs=None) t.start() t.cancel() # stop the timer's action if it's still waiting
1417 def __init__(self, interval, function, args=None, kwargs=None): 1418 Thread.__init__(self) 1419 self.interval = interval 1420 self.function = function 1421 self.args = args if args is not None else [] 1422 self.kwargs = kwargs if kwargs is not None else {} 1423 self.finished = Event()
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
1425 def cancel(self): 1426 """Stop the timer if it hasn't finished yet.""" 1427 self.finished.set()
Stop the timer if it hasn't finished yet.
1429 def run(self): 1430 self.finished.wait(self.interval) 1431 if not self.finished.is_set(): 1432 self.function(*self.args, **self.kwargs) 1433 self.finished.set()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
get_ident() -> integer
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
20class Thread(threading.Thread): 21 """Wrapper for threading.Thread with optional callback and error_callback functions.""" 22 23 def __init__(self, *args, callback=None, error_callback=None, **kw): 24 target = kw.pop('target') 25 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 26 self.callback = callback 27 self.error_callback = error_callback 28 self.method = target 29 self._return = None 30 31 def wrap_target_with_callback(self, *args, **kw): 32 """Wrap the designated target function with a try-except. 33 Captures the output and executes either the callback or error_callback. 34 """ 35 try: 36 result = self.method(*args, **kw) 37 success = True 38 except Exception as e: 39 success = False 40 result = e 41 42 cb = self.callback if success else self.error_callback 43 if cb is not None: 44 cb(result) 45 return result 46 47 def join(self, timeout: Optional[float] = None): 48 """ 49 Join the thread with an optional timeout. 50 """ 51 threading.Thread.join(self, timeout=timeout) 52 return self._return 53 54 def run(self): 55 """Set the return to the result of the target.""" 56 self._return = self._target(*self._args, **self._kwargs)
Wrapper for threading.Thread with optional callback and error_callback functions.
23 def __init__(self, *args, callback=None, error_callback=None, **kw): 24 target = kw.pop('target') 25 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 26 self.callback = callback 27 self.error_callback = error_callback 28 self.method = target 29 self._return = None
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
31 def wrap_target_with_callback(self, *args, **kw): 32 """Wrap the designated target function with a try-except. 33 Captures the output and executes either the callback or error_callback. 34 """ 35 try: 36 result = self.method(*args, **kw) 37 success = True 38 except Exception as e: 39 success = False 40 result = e 41 42 cb = self.callback if success else self.error_callback 43 if cb is not None: 44 cb(result) 45 return result
Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.
59class Worker(threading.Thread): 60 """Wrapper for `threading.Thread` for working with `queue.Queue` objects.""" 61 62 def __init__(self, queue, *args, timeout: int = 3, **kw): 63 self.queue = queue 64 self.timeout = timeout 65 super().__init__(*args, **kw) 66 67 def run(self): 68 while True: 69 try: 70 _ = self.queue.get(timeout=self.timeout) 71 except self.queue.Empty: 72 return None 73 74 self.queue.task_done()
Wrapper for threading.Thread
for working with queue.Queue
objects.
62 def __init__(self, queue, *args, timeout: int = 3, **kw): 63 self.queue = queue 64 self.timeout = timeout 65 super().__init__(*args, **kw)
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
67 def run(self): 68 while True: 69 try: 70 _ = self.queue.get(timeout=self.timeout) 71 except self.queue.Empty: 72 return None 73 74 self.queue.task_done()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
77class RepeatTimer(Timer): 78 """ 79 Fire the timer's target function in a loop, every `interval` seconds. 80 """ 81 82 def __init__(self, *args, **kwargs): 83 super().__init__(*args, **kwargs) 84 self._is_running = False 85 86 def is_running(self) -> bool: 87 """ 88 Return whether this timer has been started and is running. 89 """ 90 return self._is_running 91 92 def run(self) -> None: 93 """ 94 Fire the target function in a loop. 95 """ 96 self._is_running = True 97 while not self.finished.wait(self.interval): 98 self.function(*self.args, **self.kwargs) 99 self._is_running = False
Fire the timer's target function in a loop, every interval
seconds.
82 def __init__(self, *args, **kwargs): 83 super().__init__(*args, **kwargs) 84 self._is_running = False
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.