Module meerschaum.utils.threading

Define a custom Thread class with a callback method.

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Define a custom Thread class with a callback method.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional

import threading
# Module-level aliases of the standard `threading` primitives.
from threading import Lock, RLock, Event, Timer, get_ident

class Thread(threading.Thread):
    """Wrapper for threading.Thread with optional callback and error_callback functions.

    The value returned by the target is stored on the thread and handed back
    by `join()`. If the target raises an `Exception`, the exception object
    itself becomes the stored result; it is not re-raised.

    Parameters
    ----------
    *args, **kw:
        Forwarded to `threading.Thread.__init__()`.
        `target` is read from the keyword arguments and may be omitted.

    callback: Optional[Callable[[Any], Any]], default None
        Called with the target's return value when the target succeeds.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Called with the raised exception when the target fails.
    """

    def __init__(self, *args, callback=None, error_callback=None, **kw):
        # `target` is optional for `threading.Thread`, so it must not be required here.
        target = kw.pop('target', None)
        # The real target is replaced by the wrapper so that the callbacks can fire.
        super().__init__(*args, target=self.wrap_target_with_callback, **kw)
        self.callback = callback
        self.error_callback = error_callback
        self.method = target
        self._return = None

    def wrap_target_with_callback(self, *args, **kw):
        """Wrap the designated target function with a try-except.
        Captures the output and executes either the callback or error_callback.

        Returns the target's return value, or the caught exception if it raised.
        Without a target nothing is called and `None` is returned.
        """
        # Mirror `threading.Thread`: a missing target means there is nothing to run.
        if self.method is None:
            return None

        try:
            result = self.method(*args, **kw)
        except Exception as e:
            # The exception is captured as the result rather than propagated.
            result, cb = e, self.error_callback
        else:
            cb = self.callback

        if cb is not None:
            cb(result)
        return result

    def join(self, timeout: Optional[float] = None):
        """
        Join the thread with an optional timeout.

        Returns the stored result of the target (`None` until `run()` has set it).
        """
        threading.Thread.join(self, timeout=timeout)
        return self._return

    def run(self):
        """Set the return to the result of the target."""
        self._return = self._target(*self._args, **self._kwargs)


class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects.

    When run, the worker repeatedly takes an item off the queue and marks it
    as done. The items themselves are discarded. It stops once no item
    arrives within `timeout`.

    Parameters
    ----------
    queue:
        The queue to consume. It must provide `get(timeout=...)` and `task_done()`.

    timeout: int, default 3
        Passed as the `timeout` of each `queue.get()` call
        (seconds for a `queue.Queue`).

    *args, **kw:
        Forwarded to `threading.Thread.__init__()`.
    """

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        self.queue = queue
        self.timeout = timeout
        super().__init__(*args, **kw)

    def run(self):
        """Drain the queue until `get()` times out, then return `None`."""
        # Imported here because the module itself never imports `queue`;
        # without it, the `except` clause below raised a `NameError`.
        from queue import Empty

        while True:
            try:
                self.queue.get(timeout=self.timeout)
            except Empty:
                return None

            self.queue.task_done()

Classes

class Thread (*args, callback=None, error_callback=None, **kw)

Wrapper for threading.Thread with optional callback and error_callback functions.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Expand source code
class Thread(threading.Thread):
    """Wrapper for threading.Thread with optional callback and error_callback functions.

    The value returned by the target is stored on the thread and handed back
    by `join()`. If the target raises an `Exception`, the exception object
    itself becomes the stored result; it is not re-raised.

    Parameters
    ----------
    *args, **kw:
        Forwarded to `threading.Thread.__init__()`.
        `target` is read from the keyword arguments and may be omitted.

    callback: Optional[Callable[[Any], Any]], default None
        Called with the target's return value when the target succeeds.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Called with the raised exception when the target fails.
    """

    def __init__(self, *args, callback=None, error_callback=None, **kw):
        # `target` is optional for `threading.Thread`, so it must not be required here.
        target = kw.pop('target', None)
        # The real target is replaced by the wrapper so that the callbacks can fire.
        super().__init__(*args, target=self.wrap_target_with_callback, **kw)
        self.callback = callback
        self.error_callback = error_callback
        self.method = target
        self._return = None

    def wrap_target_with_callback(self, *args, **kw):
        """Wrap the designated target function with a try-except.
        Captures the output and executes either the callback or error_callback.

        Returns the target's return value, or the caught exception if it raised.
        Without a target nothing is called and `None` is returned.
        """
        # Mirror `threading.Thread`: a missing target means there is nothing to run.
        if self.method is None:
            return None

        try:
            result = self.method(*args, **kw)
        except Exception as e:
            # The exception is captured as the result rather than propagated.
            result, cb = e, self.error_callback
        else:
            cb = self.callback

        if cb is not None:
            cb(result)
        return result

    def join(self, timeout: Optional[float] = None):
        """
        Join the thread with an optional timeout.

        Returns the stored result of the target (`None` until `run()` has set it).
        """
        threading.Thread.join(self, timeout=timeout)
        return self._return

    def run(self):
        """Set the return to the result of the target."""
        self._return = self._target(*self._args, **self._kwargs)

Ancestors

  • threading.Thread

Methods

def join(self, timeout: Optional[float] = None)

Join the thread with an optional timeout.

Expand source code
def join(self, timeout: Optional[float] = None):
    """
    Wait for the thread via `threading.Thread.join()`, then hand back the stored result.

    Returns whatever `self._return` holds once the wait is over.
    """
    threading.Thread.join(self, timeout)
    captured = self._return
    return captured
def run(self)

Set the return to the result of the target.

Expand source code
def run(self):
    """Invoke the target with the stored arguments and keep its result in `self._return`."""
    target, positional, keyword = self._target, self._args, self._kwargs
    self._return = target(*positional, **keyword)
def wrap_target_with_callback(self, *args, **kw)

Wrap the designated target function with a try-except. Captures the output and executes either the callback or error_callback.

Expand source code
def wrap_target_with_callback(self, *args, **kw):
    """Wrap the designated target function with a try-except.
    Captures the output and executes either the callback or error_callback.

    Returns the target's return value, or the caught exception if it raised.
    Without a target (`self.method` is `None`) nothing is called and `None`
    is returned.
    """
    # Mirror `threading.Thread`: a missing target means there is nothing to run.
    # Previously `None` was called, and the resulting `TypeError` was reported
    # as if the target itself had failed.
    if self.method is None:
        return None

    try:
        result = self.method(*args, **kw)
    except Exception as e:
        # The exception is captured as the result rather than propagated.
        result, cb = e, self.error_callback
    else:
        cb = self.callback

    if cb is not None:
        cb(result)
    return result
class Worker (queue, *args, timeout: int = 3, **kw)

Wrapper for threading.Thread for working with queue.Queue objects.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Expand source code
class Worker(threading.Thread):
    """Wrapper for `threading.Thread` for working with `queue.Queue` objects.

    When run, the worker repeatedly takes an item off the queue and marks it
    as done. The items themselves are discarded. It stops once no item
    arrives within `timeout`.

    Parameters
    ----------
    queue:
        The queue to consume. It must provide `get(timeout=...)` and `task_done()`.

    timeout: int, default 3
        Passed as the `timeout` of each `queue.get()` call
        (seconds for a `queue.Queue`).

    *args, **kw:
        Forwarded to `threading.Thread.__init__()`.
    """

    def __init__(self, queue, *args, timeout: int = 3, **kw):
        self.queue = queue
        self.timeout = timeout
        super().__init__(*args, **kw)

    def run(self):
        """Drain the queue until `get()` times out, then return `None`."""
        # Imported here because the module itself never imports `queue`;
        # without it, the `except` clause below raised a `NameError`.
        from queue import Empty

        while True:
            try:
                self.queue.get(timeout=self.timeout)
            except Empty:
                return None

            self.queue.task_done()

Ancestors

  • threading.Thread

Methods

def run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Expand source code
def run(self):
    """Drain `self.queue` until `get()` times out, then return `None`.

    Each fetched item is discarded and immediately marked as done.
    """
    # Imported here because the module itself never imports `queue`;
    # without it, the `except` clause below raised a `NameError`.
    from queue import Empty

    while True:
        try:
            self.queue.get(timeout=self.timeout)
        except Empty:
            return None

        self.queue.task_done()