meerschaum.utils.pool
Global pools that are joined on exit.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Global pools that are joined on exit.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional, Callable, List, Any
from meerschaum.utils.threading import Lock, RLock
import signal

# Cache of shared pools, keyed by '<pool class name>-<worker count>'.
pools = {}
# Guards writes to `pools`.
_locks = {
    'pools': Lock(),
}


def _initializer():
    """Ignore keyboard interrupt in workers."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def get_pool(
    pool_class_name: str = 'ThreadPool',
    workers: Optional[int] = None,
    initializer: Optional[Callable[..., None]] = None,
    initargs: Optional[List[Any]] = None,
):
    """
    Return the shared pool for `pool_class_name` and `workers`, building it if needed.

    Pools are cached in the module-level `pools` dictionary under the key
    `'<pool_class_name>-<workers>'`. Because the key ignores `initializer` and
    `initargs`, those only take effect on the call that actually builds the pool.
    A cached pool that is no longer running is shut down and rebuilt.

    Parameters
    ----------
    pool_class_name:
        Name of a class in `multiprocessing.pool`. An unknown name emits a
        warning and falls back to `ThreadPool`.
    workers:
        Number of workers. Defaults to `multiprocessing.cpu_count()`.
    initializer:
        Optional callable run in each worker as `initializer(*initargs)`.
    initargs:
        Arguments for `initializer`. `None` means no arguments.

    Returns
    -------
    The cached pool, or `None` if the pool could not be constructed
    (the construction error is printed).
    """
    from multiprocessing import cpu_count
    if workers is None:
        workers = cpu_count()
    pool_key = pool_class_name + f'-{workers}'

    # Workers unpack `initargs` with `*`, so passing `None` through would make
    # every worker raise as soon as an initializer is supplied.
    init_args = () if initargs is None else tuple(initargs)

    def build_pool(workers):
        """Construct a pool and store it (or `None` on failure) under `pool_key`."""
        import importlib
        pool_module = importlib.import_module('multiprocessing.pool')
        try:
            Pool = getattr(pool_module, pool_class_name)
        except AttributeError as e:
            # Imported lazily: only needed when the requested class is missing.
            from meerschaum.utils.warnings import warn
            warn(e, stacklevel=3)
            Pool = pool_module.ThreadPool

        try:
            pool = Pool(workers, initializer=initializer, initargs=init_args)
        except Exception as e:
            print(e)
            pool = None

        with _locks['pools']:
            pools[pool_key] = pool

    if pools.get(pool_key, None) is None:
        build_pool(workers)

    pool = pools.get(pool_key)
    # `_state` is a private pool attribute; a running pool reports 'RUN'
    # (or 0 on older Python versions).
    if pool is not None and pool._state not in ('RUN', 0):
        try:
            pool.close()
            pool.terminate()
        except Exception:
            # Best-effort cleanup: the stale pool is replaced regardless.
            pass
        with _locks['pools']:
            pools.pop(pool_key, None)
        build_pool(workers)

    return pools[pool_key]


def get_pools():
    """Return the global pools dictionary, recreating it if it was set to `None`."""
    global pools
    if pools is not None:
        return pools
    with _locks['pools']:
        pools = {}
    return pools
def get_pool_executor(workers: Optional[int] = None):
    """
    Return a new `ThreadPoolExecutor`.

    Parameters
    ----------
    workers:
        Maximum number of worker threads. Defaults to
        `multiprocessing.cpu_count()`.

    Returns
    -------
    A new `ThreadPoolExecutor`, or `None` if the imports or the CPU count
    lookup fail.
    """
    try:
        from multiprocessing import cpu_count
        from concurrent.futures import ThreadPoolExecutor
        max_workers = cpu_count() if workers is None else workers
    except Exception:
        return None

    # Reaching this point means the import succeeded, so no `None` check is needed.
    return ThreadPoolExecutor(max_workers=max_workers)
pools =
{}
def
get_pool( pool_class_name: str = 'ThreadPool', workers: Optional[int] = None, initializer: Optional[Callable[[NoneType], NoneType]] = None, initargs: Optional[List[Any]] = None):
def get_pool(
    pool_class_name: str = 'ThreadPool',
    workers: Optional[int] = None,
    initializer: Optional[Callable[..., None]] = None,
    initargs: Optional[List[Any]] = None,
):
    """
    Return the shared pool for `pool_class_name` and `workers`, building it if needed.

    Pools are cached in the module-level `pools` dictionary under the key
    `'<pool_class_name>-<workers>'`. Because the key ignores `initializer` and
    `initargs`, those only take effect on the call that actually builds the pool.
    A cached pool that is no longer running is shut down and rebuilt.

    Parameters
    ----------
    pool_class_name:
        Name of a class in `multiprocessing.pool`. An unknown name emits a
        warning and falls back to `ThreadPool`.
    workers:
        Number of workers. Defaults to `multiprocessing.cpu_count()`.
    initializer:
        Optional callable run in each worker as `initializer(*initargs)`.
    initargs:
        Arguments for `initializer`. `None` means no arguments.

    Returns
    -------
    The cached pool, or `None` if the pool could not be constructed
    (the construction error is printed).
    """
    from multiprocessing import cpu_count
    if workers is None:
        workers = cpu_count()
    pool_key = pool_class_name + f'-{workers}'

    # Workers unpack `initargs` with `*`, so passing `None` through would make
    # every worker raise as soon as an initializer is supplied.
    init_args = () if initargs is None else tuple(initargs)

    def build_pool(workers):
        """Construct a pool and store it (or `None` on failure) under `pool_key`."""
        import importlib
        pool_module = importlib.import_module('multiprocessing.pool')
        try:
            Pool = getattr(pool_module, pool_class_name)
        except AttributeError as e:
            # Imported lazily: only needed when the requested class is missing.
            from meerschaum.utils.warnings import warn
            warn(e, stacklevel=3)
            Pool = pool_module.ThreadPool

        try:
            pool = Pool(workers, initializer=initializer, initargs=init_args)
        except Exception as e:
            print(e)
            pool = None

        with _locks['pools']:
            pools[pool_key] = pool

    if pools.get(pool_key, None) is None:
        build_pool(workers)

    pool = pools.get(pool_key)
    # `_state` is a private pool attribute; a running pool reports 'RUN'
    # (or 0 on older Python versions).
    if pool is not None and pool._state not in ('RUN', 0):
        try:
            pool.close()
            pool.terminate()
        except Exception:
            # Best-effort cleanup: the stale pool is replaced regardless.
            pass
        with _locks['pools']:
            pools.pop(pool_key, None)
        build_pool(workers)

    return pools[pool_key]
If the requested pool does not exist, instantiate it here. Pools are joined and closed on exit.
def
get_pools():
def get_pools():
    """
    Return the global pools dictionary.

    If the module-level `pools` has been set to `None`, it is replaced with a
    fresh empty dictionary (under the `'pools'` lock) before being returned.
    """
    global pools
    if pools is not None:
        return pools
    with _locks['pools']:
        pools = {}
    return pools
Return the global pools dictionary.
def
get_pool_executor(workers: Optional[int] = None):
def get_pool_executor(workers: Optional[int] = None):
    """
    Return a new `ThreadPoolExecutor`.

    Parameters
    ----------
    workers:
        Maximum number of worker threads. Defaults to
        `multiprocessing.cpu_count()`.

    Returns
    -------
    A new `ThreadPoolExecutor`, or `None` if the imports or the CPU count
    lookup fail.
    """
    try:
        from multiprocessing import cpu_count
        from concurrent.futures import ThreadPoolExecutor
        max_workers = cpu_count() if workers is None else workers
    except Exception:
        return None

    # Reaching this point means the import succeeded, so no `None` check is needed.
    return ThreadPoolExecutor(max_workers=max_workers)
Return a new `ThreadPoolExecutor`.