Module meerschaum.utils.pool

Global pools that are joined on exit.

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Global pools that are joined on exit.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional, Callable, List, Any
from meerschaum.utils.threading import Lock, RLock
import signal

# Registry of pools keyed by pool class name. Starts as `None` and is replaced by
# a dict on first use (see `get_pool` and `get_pools`).
pools = None
# Locks for module-level state. The 'pools' lock is held when `pools` is
# initialized and when a newly built pool is stored in it.
_locks = {
    'pools': Lock(),
}

def _initializer():
    """Make the calling worker disregard `SIGINT` (keyboard interrupts)."""
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)

def get_pool(
        pool_class_name: str = 'ThreadPool',
        workers: Optional[int] = None,
        initializer: Optional[Callable[[None], None]] = None,
        initargs: Optional[List[Any]] = None,
    ):
    """Return the shared pool cached under `pool_class_name`, building it if needed.

    A cached pool that is no longer in its running state is closed (best effort),
    discarded and rebuilt. This function only creates and caches pools; the module
    docstring states that pools are joined on exit, which is not done here.

    Parameters
    ----------
    pool_class_name: str, default 'ThreadPool'
        Name of a class in `multiprocessing.pool`. If the lookup fails, a warning
        is emitted and `ThreadPool` is used instead (the pool is still cached
        under the requested name).

    workers: Optional[int], default None
        Number of workers for a newly built pool.
        `None` means `multiprocessing.cpu_count()`.
        Ignored when a running cached pool is returned.

    initializer: Optional[Callable], default None
        Callable run by each worker of a newly built pool when it starts.

    initargs: Optional[List[Any]], default None
        Positional arguments for `initializer`. `None` means no arguments.

    Returns
    -------
    The cached pool, or `None` if the pool could not be constructed
    (the error is printed).
    """
    global pools
    with _locks['pools']:
        if pools is None:
            pools = {}

    # Pool workers call `initializer(*initargs)`, so forwarding `None` would raise
    # a `TypeError` in every worker when an initializer is given without arguments.
    pool_initargs = () if initargs is None else initargs

    def build_pool(workers):
        """Construct a pool and cache it under `pool_class_name` (`None` on failure)."""
        import importlib
        pool_module = importlib.import_module('multiprocessing.pool')
        try:
            Pool = getattr(pool_module, pool_class_name)
        except Exception as e:
            # Only needed on the fallback path, so import it lazily.
            from meerschaum.utils.warnings import warn
            warn(e, stacklevel=3)
            Pool = getattr(pool_module, 'ThreadPool')

        if workers is None:
            from multiprocessing import cpu_count
            workers = cpu_count()
        try:
            pool = Pool(workers, initializer=initializer, initargs=pool_initargs)
        except Exception as e:
            print(e)
            pool = None

        with _locks['pools']:
            pools[pool_class_name] = pool

    # A missing key and a cached `None` (an earlier failed build) both mean "build it".
    if pools.get(pool_class_name) is None:
        build_pool(workers)

    cached_pool = pools[pool_class_name]
    # The running state is the string 'RUN' on current CPython and was the
    # integer 0 on older releases; anything else means closed or terminated.
    if cached_pool is not None and cached_pool._state not in ('RUN', 0):
        try:
            cached_pool.close()
        except Exception:
            # Best effort: the stale pool is discarded regardless.
            pass
        del pools[pool_class_name]
        build_pool(workers)

    return pools[pool_class_name]


def get_pools():
    """Return the global pools dictionary, creating an empty one on first use.

    Returns
    -------
    The module-level `pools` dict (the same object on every call).
    """
    global pools
    if pools is None:
        with _locks['pools']:
            # Re-check under the lock so that a caller that lost the race does not
            # replace the dictionary another caller has already been handed.
            if pools is None:
                pools = {}
    return pools


def get_pool_executor(workers: Optional[int] = None):
    """Build a fresh `ThreadPoolExecutor`.

    `workers` is the maximum number of threads; `None` means
    `multiprocessing.cpu_count()`. Returns `None` if the imports or the CPU
    count lookup raise.
    """
    try:
        from multiprocessing import cpu_count
        from concurrent.futures import ThreadPoolExecutor as executor_class
        if workers is None:
            workers = cpu_count()
    except Exception:
        executor_class = None

    if executor_class is None:
        return None
    return executor_class(max_workers=workers)

Functions

def get_pool(pool_class_name: str = 'ThreadPool', workers: Optional[int] = None, initializer: Optional[Callable[[None], None]] = None, initargs: Optional[List[Any]] = None)

If the requested pool does not exist, instantiate it here. Pools are joined and closed on exit.

Expand source code
def get_pool(
        pool_class_name: str = 'ThreadPool',
        workers: Optional[int] = None,
        initializer: Optional[Callable[[None], None]] = None,
        initargs: Optional[List[Any]] = None,
    ):
    """Return the shared pool cached under `pool_class_name`, building it if needed.

    A cached pool that is no longer in its running state is closed (best effort),
    discarded and rebuilt. This function only creates and caches pools; the module
    docstring states that pools are joined on exit, which is not done here.

    Parameters
    ----------
    pool_class_name: str, default 'ThreadPool'
        Name of a class in `multiprocessing.pool`. If the lookup fails, a warning
        is emitted and `ThreadPool` is used instead (the pool is still cached
        under the requested name).

    workers: Optional[int], default None
        Number of workers for a newly built pool.
        `None` means `multiprocessing.cpu_count()`.
        Ignored when a running cached pool is returned.

    initializer: Optional[Callable], default None
        Callable run by each worker of a newly built pool when it starts.

    initargs: Optional[List[Any]], default None
        Positional arguments for `initializer`. `None` means no arguments.

    Returns
    -------
    The cached pool, or `None` if the pool could not be constructed
    (the error is printed).
    """
    global pools
    with _locks['pools']:
        if pools is None:
            pools = {}

    # Pool workers call `initializer(*initargs)`, so forwarding `None` would raise
    # a `TypeError` in every worker when an initializer is given without arguments.
    pool_initargs = () if initargs is None else initargs

    def build_pool(workers):
        """Construct a pool and cache it under `pool_class_name` (`None` on failure)."""
        import importlib
        pool_module = importlib.import_module('multiprocessing.pool')
        try:
            Pool = getattr(pool_module, pool_class_name)
        except Exception as e:
            # Only needed on the fallback path, so import it lazily.
            from meerschaum.utils.warnings import warn
            warn(e, stacklevel=3)
            Pool = getattr(pool_module, 'ThreadPool')

        if workers is None:
            from multiprocessing import cpu_count
            workers = cpu_count()
        try:
            pool = Pool(workers, initializer=initializer, initargs=pool_initargs)
        except Exception as e:
            print(e)
            pool = None

        with _locks['pools']:
            pools[pool_class_name] = pool

    # A missing key and a cached `None` (an earlier failed build) both mean "build it".
    if pools.get(pool_class_name) is None:
        build_pool(workers)

    cached_pool = pools[pool_class_name]
    # The running state is the string 'RUN' on current CPython and was the
    # integer 0 on older releases; anything else means closed or terminated.
    if cached_pool is not None and cached_pool._state not in ('RUN', 0):
        try:
            cached_pool.close()
        except Exception:
            # Best effort: the stale pool is discarded regardless.
            pass
        del pools[pool_class_name]
        build_pool(workers)

    return pools[pool_class_name]
def get_pool_executor(workers: Optional[int] = None)

Return a new ThreadPoolExecutor.

Expand source code
def get_pool_executor(workers: Optional[int] = None):
    """Build a fresh `ThreadPoolExecutor`.

    `workers` is the maximum number of threads; `None` means
    `multiprocessing.cpu_count()`. Returns `None` if the imports or the CPU
    count lookup raise.
    """
    try:
        from multiprocessing import cpu_count
        from concurrent.futures import ThreadPoolExecutor as executor_class
        if workers is None:
            workers = cpu_count()
    except Exception:
        executor_class = None

    if executor_class is None:
        return None
    return executor_class(max_workers=workers)
def get_pools()

Return the global pools dictionary.

Expand source code
def get_pools():
    """Return the global pools dictionary, creating an empty one on first use.

    Returns
    -------
    The module-level `pools` dict (the same object on every call).
    """
    global pools
    if pools is None:
        with _locks['pools']:
            # Re-check under the lock so that a caller that lost the race does not
            # replace the dictionary another caller has already been handed.
            if pools is None:
                pools = {}
    return pools