Module meerschaum.utils.threading
Define a custom Thread class with a callback method.
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Define a custom Thread class with a callback method.
"""
from __future__ import annotations
from meerschaum.utils.typing import Optional
import threading
Lock = threading.Lock
RLock = threading.RLock
Event = threading.Event
Timer = threading.Timer
get_ident = threading.get_ident
class Thread(threading.Thread):
"""Wrapper for threading.Thread with optional callback and error_callback functions."""
def __init__(self, *args, callback=None, error_callback=None, **kw):
target = kw.pop('target')
super().__init__(target=self.wrap_target_with_callback, *args, **kw)
self.callback = callback
self.error_callback = error_callback
self.method = target
self._return = None
def wrap_target_with_callback(self, *args, **kw):
"""Wrap the designated target function with a try-except.
Captures the output and executes either the callback or error_callback.
"""
try:
result = self.method(*args, **kw)
success = True
except Exception as e:
success = False
result = e
cb = self.callback if success else self.error_callback
if cb is not None:
cb(result)
return result
def join(self, timeout: Optional[float] = None):
"""
Join the thread with an optional timeout.
"""
threading.Thread.join(self, timeout=timeout)
return self._return
def run(self):
"""Set the return to the result of the target."""
self._return = self._target(*self._args, **self._kwargs)
class Worker(threading.Thread):
"""Wrapper for `threading.Thread` for working with `queue.Queue` objects."""
def __init__(self, queue, *args, timeout: int = 3, **kw):
self.queue = queue
self.timeout = timeout
super().__init__(*args, **kw)
def run(self):
while True:
try:
item = self.queue.get(timeout=self.timeout)
except queue.Empty:
return None
self.queue.task_done()
Classes
class Thread (*args, callback=None, error_callback=None, **kw)
-
Wrapper for threading.Thread with optional callback and error_callback functions.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.
Expand source code
class Thread(threading.Thread): """Wrapper for threading.Thread with optional callback and error_callback functions.""" def __init__(self, *args, callback=None, error_callback=None, **kw): target = kw.pop('target') super().__init__(target=self.wrap_target_with_callback, *args, **kw) self.callback = callback self.error_callback = error_callback self.method = target self._return = None def wrap_target_with_callback(self, *args, **kw): """Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback. """ try: result = self.method(*args, **kw) success = True except Exception as e: success = False result = e cb = self.callback if success else self.error_callback if cb is not None: cb(result) return result def join(self, timeout: Optional[float] = None): """ Join the thread with an optional timeout. """ threading.Thread.join(self, timeout=timeout) return self._return def run(self): """Set the return to the result of the target.""" self._return = self._target(*self._args, **self._kwargs)
Ancestors
- threading.Thread
Methods
def join(self, timeout: Optional[float] = None)
-
Join the thread with an optional timeout.
Expand source code
def join(self, timeout: Optional[float] = None): """ Join the thread with an optional timeout. """ threading.Thread.join(self, timeout=timeout) return self._return
def run(self)
-
Set the return to the result of the target.
Expand source code
def run(self): """Set the return to the result of the target.""" self._return = self._target(*self._args, **self._kwargs)
def wrap_target_with_callback(self, *args, **kw)
-
Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.
Expand source code
def wrap_target_with_callback(self, *args, **kw): """Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback. """ try: result = self.method(*args, **kw) success = True except Exception as e: success = False result = e cb = self.callback if success else self.error_callback if cb is not None: cb(result) return result
class Worker (queue, *args, timeout: int = 3, **kw)
-
Wrapper for
threading.Thread
for working withqueue.Queue
objects.This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.
Expand source code
class Worker(threading.Thread): """Wrapper for `threading.Thread` for working with `queue.Queue` objects.""" def __init__(self, queue, *args, timeout: int = 3, **kw): self.queue = queue self.timeout = timeout super().__init__(*args, **kw) def run(self): while True: try: item = self.queue.get(timeout=self.timeout) except queue.Empty: return None self.queue.task_done()
Ancestors
- threading.Thread
Methods
def run(self)
-
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Expand source code
def run(self): while True: try: item = self.queue.get(timeout=self.timeout) except queue.Empty: return None self.queue.task_done()