meerschaum.utils.threading
Define a custom Thread class with a callback method.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Define a custom Thread class with a callback method.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional

import threading
import traceback
Lock = threading.Lock
RLock = threading.RLock
Event = threading.Event
Timer = threading.Timer
get_ident = threading.get_ident


class Thread(threading.Thread):
    """
    Wrapper for threading.Thread with optional callback and error_callback functions.

    The target's return value (or the exception it raised) is kept on the
    thread and handed back by `join()`.
    """

    def __init__(self, *args, callback=None, error_callback=None, **kw):
        """
        Create the thread around a keyword-only `target`.

        `target` must be passed as a keyword argument; a `KeyError` is raised
        when it is missing. `callback` is called with the target's return
        value, `error_callback` with the exception the target raised. All
        other arguments are forwarded to `threading.Thread`.
        """
        target = kw.pop('target')
        super().__init__(*args, target=self.wrap_target_with_callback, **kw)
        self.method = target
        self.callback = callback
        self.error_callback = error_callback
        # Filled in by `run()`; stays None until the target has finished.
        self._return = None

    def wrap_target_with_callback(self, *args, **kw):
        """
        Call the target and report the outcome to the matching callback.

        Exceptions derived from `Exception` are not re-raised: the exception
        object becomes the result, is passed to `error_callback` (if any),
        and is returned.
        """
        try:
            outcome = self.method(*args, **kw)
        except Exception as err:
            outcome, handler = err, self.error_callback
        else:
            handler = self.callback

        if handler is not None:
            handler(outcome)
        return outcome

    def join(self, timeout: Optional[float] = None):
        """
        Join the thread with an optional timeout and return the stored result.

        The result is None until `run()` has stored the target's outcome.
        """
        threading.Thread.join(self, timeout=timeout)
        return self._return

    def run(self):
        """Set the return to the result of the target."""
        self._return = self._target(*self._args, **self._kwargs)


class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        """
        Store the queue to consume and the per-item wait in seconds.

        Remaining arguments are forwarded to `threading.Thread`.
        """
        super().__init__(*args, **kw)
        self.queue = queue
        self.timeout = timeout

    def run(self):
        """
        Take items off the queue and mark each one done.

        Returns None once no item arrives within `self.timeout` seconds.
        """
        # Imported here because the module never imports `queue`; the
        # original `except queue.Empty` raised NameError on an empty queue.
        from queue import Empty

        while True:
            try:
                self.queue.get(timeout=self.timeout)
            except Empty:
                return None

            self.queue.task_done()


class RepeatTimer(Timer):
    """
    Fire the timer's target function in a loop, every `interval` seconds.
    """

    def run(self) -> None:
        """
        Fire the target function in a loop until the timer is cancelled.
        """
        while True:
            # `wait` returns true once `finished` is set (i.e. on cancel).
            if self.finished.wait(self.interval):
                break
            self.function(*self.args, **self.kwargs)
allocate_lock() -> lock object (allocate() is an obsolete synonym)
Create a new lock object. See help(type(threading.Lock())) for information about locks.
def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    Only the thread that acquired a reentrant lock may release it. That
    thread may acquire the lock again without blocking, and must release it
    once per acquisition.

    `_CRLock` is used when it is not None; otherwise `_PyRLock` is used.
    All arguments are forwarded unchanged.

    """
    lock_factory = _PyRLock if _CRLock is None else _CRLock
    return lock_factory(*args, **kwargs)
Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.
class Event:
    """A synchronisation primitive built around a single boolean flag.

    The flag starts out false. set() makes it true, clear() makes it false
    again, and wait() blocks the caller until the flag is true.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._flag = False
        # Guards updates to _flag and is what wait() blocks on.
        self._cond = Condition(Lock())

    def _at_fork_reinit(self):
        # Private method called by Thread._reset_internal_locks()
        self._cond._at_fork_reinit()

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    def isSet(self):
        """Deprecated alias of is_set().

        Emits a DeprecationWarning, then returns what is_set() returns.

        """
        from warnings import warn

        warn('isSet() is deprecated, use is_set() instead',
             DeprecationWarning, stacklevel=2)
        return self.is_set()

    def set(self):
        """Make the internal flag true and wake every waiting thread.

        Later calls to wait() return immediately while the flag stays true.

        """
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def clear(self):
        """Make the internal flag false.

        After this, wait() blocks again until set() is called.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true or the timeout expires.

        Returns immediately when the flag is already true. Otherwise the
        caller blocks on the condition until set() notifies it or until
        `timeout` (seconds, may be fractional; None means no limit) elapses.

        The return value is the flag if it was already true, otherwise the
        result of the condition wait, so it is false only after a timeout.

        """
        with self._cond:
            flag = self._flag
            return flag if flag else self._cond.wait(timeout)
Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true. The flag is initially false.
def is_set(self):
    """Return true if and only if the internal flag is true.

    The flag is read directly from ``self._flag``.
    """
    return self._flag
Return true if and only if the internal flag is true.
def isSet(self):
    """Deprecated alias of is_set().

    Emits a DeprecationWarning, then returns what is_set() returns.

    """
    from warnings import warn

    warn('isSet() is deprecated, use is_set() instead',
         DeprecationWarning, stacklevel=2)
    return self.is_set()
Return true if and only if the internal flag is true.
This method is deprecated, use is_set() instead.
def set(self):
    """Set the internal flag to true.

    All threads waiting for it to become true are awakened. Threads
    that call wait() once the flag is true will not block at all.

    """
    # Update the flag and notify waiters while holding the condition.
    with self._cond:
        self._flag = True
        self._cond.notify_all()
Set the internal flag to true.
All threads waiting for it to become true are awakened. Threads that call wait() once the flag is true will not block at all.
def clear(self):
    """Reset the internal flag to false.

    Subsequently, threads calling wait() will block until set() is called to
    set the internal flag to true again.

    """
    # The flag is only changed while holding the condition.
    with self._cond:
        self._flag = False
Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.
def wait(self, timeout=None):
    """Block until the internal flag is true or the timeout expires.

    Returns immediately when the flag is already true. Otherwise the
    caller blocks on the condition until it is notified or until `timeout`
    (seconds, may be fractional; None means no limit) elapses.

    The return value is the flag if it was already true, otherwise the
    result of the condition wait.

    """
    with self._cond:
        flag = self._flag
        return flag if flag else self._cond.wait(timeout)
Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.
class Timer(Thread):
    """Run a function once, after a delay given in seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay, the callable and its arguments.

        `args` defaults to a new empty list and `kwargs` to a new empty
        dict when they are None.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        # Set by cancel(), and by run() once it is done.
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        """Wait up to `interval` seconds, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        cancelled = self.finished.is_set()
        if not cancelled:
            self.function(*self.args, **self.kwargs)
        self.finished.set()
Call a function after a specified number of seconds:
t = Timer(30.0, f, args=None, kwargs=None)
t.start()
t.cancel()  # stop the timer's action if it's still waiting
1364 def __init__(self, interval, function, args=None, kwargs=None): 1365 Thread.__init__(self) 1366 self.interval = interval 1367 self.function = function 1368 self.args = args if args is not None else [] 1369 self.kwargs = kwargs if kwargs is not None else {} 1370 self.finished = Event()
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def cancel(self):
    """Stop the timer if it hasn't finished yet.

    Sets the ``finished`` event.
    """
    self.finished.set()
Stop the timer if it hasn't finished yet.
def run(self):
    """Wait up to `interval` seconds, then call the function unless cancelled.

    The `finished` event is set on the way out in either case.
    """
    self.finished.wait(self.interval)
    cancelled = self.finished.is_set()
    if not cancelled:
        self.function(*self.args, **self.kwargs)
    self.finished.set()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
get_ident() -> integer
Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously. This may be used to identify per-thread resources. Even though on some platforms threads identities may appear to be allocated consecutive numbers starting at 1, this behavior should not be relied upon, and the number should be seen purely as a magic cookie. A thread's identity may be reused for another thread after it exits.
class Thread(threading.Thread):
    """
    Wrapper for threading.Thread with optional callback and error_callback functions.

    The target's return value (or the exception it raised) is kept on the
    thread and handed back by `join()`.
    """

    def __init__(self, *args, callback=None, error_callback=None, **kw):
        """
        Create the thread around a keyword-only `target`.

        `target` must be passed as a keyword argument; a `KeyError` is raised
        when it is missing. `callback` is called with the target's return
        value, `error_callback` with the exception the target raised. All
        other arguments are forwarded to `threading.Thread`.
        """
        target = kw.pop('target')
        super().__init__(*args, target=self.wrap_target_with_callback, **kw)
        self.method = target
        self.callback = callback
        self.error_callback = error_callback
        # Filled in by `run()`; stays None until the target has finished.
        self._return = None

    def wrap_target_with_callback(self, *args, **kw):
        """
        Call the target and report the outcome to the matching callback.

        Exceptions derived from `Exception` are not re-raised: the exception
        object becomes the result, is passed to `error_callback` (if any),
        and is returned.
        """
        try:
            outcome = self.method(*args, **kw)
        except Exception as err:
            outcome, handler = err, self.error_callback
        else:
            handler = self.callback

        if handler is not None:
            handler(outcome)
        return outcome

    def join(self, timeout: Optional[float] = None):
        """
        Join the thread with an optional timeout and return the stored result.

        The result is None until `run()` has stored the target's outcome.
        """
        threading.Thread.join(self, timeout=timeout)
        return self._return

    def run(self):
        """Set the return to the result of the target."""
        self._return = self._target(*self._args, **self._kwargs)
Wrapper for threading.Thread with optional callback and error_callback functions.
24 def __init__(self, *args, callback=None, error_callback=None, **kw): 25 target = kw.pop('target') 26 super().__init__(target=self.wrap_target_with_callback, *args, **kw) 27 self.callback = callback 28 self.error_callback = error_callback 29 self.method = target 30 self._return = None
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def wrap_target_with_callback(self, *args, **kw):
    """
    Call the target and report the outcome to the matching callback.

    On success, `self.callback` receives the return value. If the target
    raises an `Exception`, it is not re-raised: the exception object becomes
    the result and is passed to `self.error_callback`. A callback that is
    None is skipped. The result is returned in both cases.
    """
    try:
        outcome = self.method(*args, **kw)
    except Exception as err:
        outcome, handler = err, self.error_callback
    else:
        handler = self.callback

    if handler is not None:
        handler(outcome)
    return outcome
Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.
def join(self, timeout: Optional[float] = None):
    """
    Join the thread with an optional timeout.

    `timeout` is forwarded to `threading.Thread.join`.
    Returns the value currently stored in `self._return`.
    """
    threading.Thread.join(self, timeout=timeout)
    return self._return
Join the thread with an optional timeout.
def run(self):
    """Set the return to the result of the target.

    Calls `self._target` with `self._args` and `self._kwargs` and stores
    what it returns in `self._return`.
    """
    self._return = self._target(*self._args, **self._kwargs)
Set the return to the result of the target.
Inherited Members
- threading.Thread
- start
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects."""

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        """
        Store the queue to consume and the per-item wait in seconds.

        Remaining arguments are forwarded to `threading.Thread`.
        """
        super().__init__(*args, **kw)
        self.queue = queue
        self.timeout = timeout

    def run(self):
        """
        Take items off the queue and mark each one done.

        Returns None once no item arrives within `self.timeout` seconds.
        """
        # Imported here because the module never imports `queue`; the
        # original `except queue.Empty` raised NameError on an empty queue.
        from queue import Empty

        while True:
            try:
                self.queue.get(timeout=self.timeout)
            except Empty:
                return None

            self.queue.task_done()
Wrapper for `threading.Thread` for working with `queue.Queue` objects.
63 def __init__(self, queue, *args, timeout: int = 3, **kw): 64 self.queue = queue 65 self.timeout = timeout 66 super().__init__(*args, **kw)
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
def run(self):
    """
    Take items off `self.queue` and mark each one done.

    Returns None once no item arrives within `self.timeout` seconds.
    """
    # Imported here because the module never imports `queue`; the
    # original `except queue.Empty` raised NameError on an empty queue.
    from queue import Empty

    while True:
        try:
            self.queue.get(timeout=self.timeout)
        except Empty:
            return None

        self.queue.task_done()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
class RepeatTimer(Timer):
    """
    Fire the timer's target function in a loop, every `interval` seconds.
    """

    def run(self) -> None:
        """
        Fire the target function in a loop until the timer is cancelled.
        """
        while True:
            # `wait` returns true once `finished` is set (i.e. on cancel).
            if self.finished.wait(self.interval):
                break
            self.function(*self.args, **self.kwargs)
Fire the timer's target function in a loop, every `interval` seconds.