meerschaum.utils.pool

Global pools that are joined on exit.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Global pools that are joined on exit.
 7"""
 8
 9from __future__ import annotations
10from meerschaum.utils.typing import Optional, Callable, List, Any
11from meerschaum.utils.threading import Lock, RLock
12import signal
13
# Cache of instantiated pools; `get_pool` stores each one under '<pool class name>-<workers>'.
pools = {}
# Lock held while writing to `pools`.
_locks = {'pools': Lock()}
18
def _initializer():
    """Set the calling process's `SIGINT` handler to "ignore".

    Presumably meant to be passed as a pool `initializer` so workers do not
    react to keyboard interrupts; it is not wired in by default here.
    """
    interrupt_signal = signal.SIGINT
    ignore_handler = signal.SIG_IGN
    signal.signal(interrupt_signal, ignore_handler)
22
def get_pool(
        pool_class_name: str = 'ThreadPool',
        workers: Optional[int] = None,
        initializer: Optional[Callable[..., Any]] = None,
        initargs: Optional[List[Any]] = None,
    ):
    """
    Return the cached pool for the given class name and worker count,
    instantiating it if it does not exist yet.

    Pools are cached in the module-level `pools` dictionary under the key
    `'<pool_class_name>-<workers>'`. A cached pool which is no longer in the
    running state is terminated and replaced with a fresh one.
    Per the module docstring, pools are joined on exit (that happens outside
    of this function).

    Parameters
    ----------
    pool_class_name: str, default 'ThreadPool'
        Name of a class in `multiprocessing.pool`. If the lookup fails,
        a warning is emitted and `ThreadPool` is used instead.

    workers: Optional[int], default None
        Number of workers. Defaults to `multiprocessing.cpu_count()`.

    initializer: Optional[Callable[..., Any]], default None
        Callable which every worker runs on start-up (forwarded to the pool).

    initargs: Optional[List[Any]], default None
        Positional arguments for `initializer`. `None` means no arguments.

    Returns
    -------
    The pool instance, or `None` if the pool could not be constructed.
    """
    from multiprocessing import cpu_count
    if workers is None:
        workers = cpu_count()
    # Workers call `initializer(*initargs)`, so forwarding `None` would make
    # every worker fail as soon as an initializer is supplied.
    if initargs is None:
        initargs = ()
    pool_key = f'{pool_class_name}-{workers}'

    def build_pool(workers):
        """Instantiate the pool, store it under `pool_key` and return it."""
        from meerschaum.utils.warnings import warn
        import importlib
        pool_module = importlib.import_module('multiprocessing.pool')
        try:
            Pool = getattr(pool_module, pool_class_name)
        except Exception as e:
            ### Unknown class name: fall back to a thread pool.
            warn(e, stacklevel=3)
            Pool = getattr(pool_module, 'ThreadPool')

        try:
            pool = Pool(workers, initializer=initializer, initargs=initargs)
        except Exception as e:
            warn(e, stacklevel=3)
            pool = None

        with _locks['pools']:
            pools[pool_key] = pool
        return pool

    pool = pools.get(pool_key, None)
    if pool is None:
        pool = build_pool(workers)

    ### `_state` is 'RUN' on newer Python versions and 0 on older ones.
    if pool is not None and pool._state not in ('RUN', 0):
        try:
            pool.close()
            pool.terminate()
        except Exception:
            ### Best-effort cleanup: the stale pool is replaced regardless.
            pass
        pool = build_pool(workers)

    return pool
77
78
def get_pools():
    """
    Return the module-level `pools` dictionary.

    If `pools` has been set to `None`, it is first reset to an empty
    dictionary while holding the `'pools'` lock.
    """
    global pools
    if pools is not None:
        return pools
    with _locks['pools']:
        pools = {}
    return pools
86
87
def get_pool_executor(workers: Optional[int] = None):
    """
    Return a new `ThreadPoolExecutor`.

    Parameters
    ----------
    workers: Optional[int], default None
        Maximum number of worker threads.
        When `None`, `multiprocessing.cpu_count()` is used.

    Returns
    -------
    A new `concurrent.futures.ThreadPoolExecutor`, or `None` if the imports
    or the CPU count lookup raise an exception.
    """
    try:
        from multiprocessing import cpu_count
        from concurrent.futures import ThreadPoolExecutor
        if workers is None:
            workers = cpu_count()
    except Exception:
        return None

    ### The import succeeded if we got this far, so the class is always available.
    return ThreadPoolExecutor(max_workers=workers)
pools = {}
def get_pool( pool_class_name: str = 'ThreadPool', workers: Optional[int] = None, initializer: Optional[Callable[[NoneType], NoneType]] = None, initargs: Optional[List[Any]] = None):
def get_pool(
        pool_class_name: str = 'ThreadPool',
        workers: Optional[int] = None,
        initializer: Optional[Callable[..., Any]] = None,
        initargs: Optional[List[Any]] = None,
    ):
    """
    Return the cached pool for the given class name and worker count,
    instantiating it if it does not exist yet.

    Pools are cached in the module-level `pools` dictionary under the key
    `'<pool_class_name>-<workers>'`. A cached pool which is no longer in the
    running state is terminated and replaced with a fresh one.
    Per the module docstring, pools are joined on exit (that happens outside
    of this function).

    Parameters
    ----------
    pool_class_name: str, default 'ThreadPool'
        Name of a class in `multiprocessing.pool`. If the lookup fails,
        a warning is emitted and `ThreadPool` is used instead.

    workers: Optional[int], default None
        Number of workers. Defaults to `multiprocessing.cpu_count()`.

    initializer: Optional[Callable[..., Any]], default None
        Callable which every worker runs on start-up (forwarded to the pool).

    initargs: Optional[List[Any]], default None
        Positional arguments for `initializer`. `None` means no arguments.

    Returns
    -------
    The pool instance, or `None` if the pool could not be constructed.
    """
    from multiprocessing import cpu_count
    if workers is None:
        workers = cpu_count()
    # Workers call `initializer(*initargs)`, so forwarding `None` would make
    # every worker fail as soon as an initializer is supplied.
    if initargs is None:
        initargs = ()
    pool_key = f'{pool_class_name}-{workers}'

    def build_pool(workers):
        """Instantiate the pool, store it under `pool_key` and return it."""
        from meerschaum.utils.warnings import warn
        import importlib
        pool_module = importlib.import_module('multiprocessing.pool')
        try:
            Pool = getattr(pool_module, pool_class_name)
        except Exception as e:
            ### Unknown class name: fall back to a thread pool.
            warn(e, stacklevel=3)
            Pool = getattr(pool_module, 'ThreadPool')

        try:
            pool = Pool(workers, initializer=initializer, initargs=initargs)
        except Exception as e:
            warn(e, stacklevel=3)
            pool = None

        with _locks['pools']:
            pools[pool_key] = pool
        return pool

    pool = pools.get(pool_key, None)
    if pool is None:
        pool = build_pool(workers)

    ### `_state` is 'RUN' on newer Python versions and 0 on older ones.
    if pool is not None and pool._state not in ('RUN', 0):
        try:
            pool.close()
            pool.terminate()
        except Exception:
            ### Best-effort cleanup: the stale pool is replaced regardless.
            pass
        pool = build_pool(workers)

    return pool

If the requested pool does not exist, instantiate it here. Pools are joined and closed on exit.

def get_pools():
def get_pools():
    """
    Return the module-level `pools` dictionary.

    If `pools` has been set to `None`, it is first reset to an empty
    dictionary while holding the `'pools'` lock.
    """
    global pools
    if pools is not None:
        return pools
    with _locks['pools']:
        pools = {}
    return pools

Return the global pools dictionary.

def get_pool_executor(workers: Optional[int] = None):
def get_pool_executor(workers: Optional[int] = None):
    """
    Return a new `ThreadPoolExecutor`.

    Parameters
    ----------
    workers: Optional[int], default None
        Maximum number of worker threads.
        When `None`, `multiprocessing.cpu_count()` is used.

    Returns
    -------
    A new `concurrent.futures.ThreadPoolExecutor`, or `None` if the imports
    or the CPU count lookup raise an exception.
    """
    try:
        from multiprocessing import cpu_count
        from concurrent.futures import ThreadPoolExecutor
        if workers is None:
            workers = cpu_count()
    except Exception:
        return None

    ### The import succeeded if we got this far, so the class is always available.
    return ThreadPoolExecutor(max_workers=workers)

Return a new ThreadPoolExecutor.