meerschaum.connectors.poll
Poll database and API connections.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Poll database and API connections. 7""" 8 9from meerschaum.utils.typing import InstanceConnector, Union, Optional, Dict, Any 10 11def retry_connect( 12 connector: Union[InstanceConnector, None] = None, 13 max_retries: int = 50, 14 retry_wait: int = 3, 15 workers: int = 1, 16 warn: bool = True, 17 print_on_connect: bool = False, 18 enforce_chaining: bool = True, 19 enforce_login: bool = True, 20 debug: bool = False, 21) -> bool: 22 """ 23 Keep trying to connect to the database. 24 25 Parameters 26 ---------- 27 connector: Union[InstanceConnector, None], default None 28 The connector to the instance. 29 30 max_retries: int, default 40 31 How many time to try connecting. 32 33 retry_wait: int, default 3 34 The number of seconds between retries. 35 36 workers: int, default 1 37 How many worker thread connections to make. 38 39 warn: bool, default True 40 If `True`, print a warning in case the connection fails. 41 42 print_on_connect: bool, default False 43 If `True`, print a message when a successful connection is established. 44 45 enforce_chaining: bool, default True 46 If `False`, ignore the configured chaining option. 47 48 enforce_login: bool, default True 49 If `False`, ignore an invalid login. 50 51 debug: bool, default False 52 Verbosity toggle. 53 54 Returns 55 ------- 56 Whether a connection could be made. 57 58 """ 59 import json 60 from meerschaum.utils.venv import venv_exec 61 from meerschaum.utils.packages import attempt_import 62 63 kw = { 64 'connector_meta': connector.meta, 65 'max_retries': max_retries, 66 'retry_wait': retry_wait, 67 'workers': workers, 68 'warn': warn, 69 'print_on_connect': print_on_connect, 70 'enforce_chaining': enforce_chaining, 71 'enforce_login': enforce_login, 72 'debug': debug, 73 } 74 75 dill = attempt_import('dill', lazy=False) 76 code = ( 77 "import sys, json\n" 78 + "from meerschaum.utils.typing import Optional, Dict, Any\n\n" 79 + dill.source.getsource(_wrap_retry_connect) + '\n\n' 80 + f"kw = json.loads({json.dumps(json.dumps(kw))})\n" 81 + "success = _wrap_retry_connect(**kw)\n" 82 + "sys.exit((0 if success else 1))" 83 ) 84 return venv_exec(code, venv=None, debug=debug) 85 86 87def _wrap_retry_connect( 88 connector_meta: Dict[str, Any], 89 max_retries: int = 50, 90 retry_wait: int = 3, 91 workers: int = 1, 92 print_on_connect: bool = False, 93 warn: bool = True, 94 enforce_chaining: bool = True, 95 enforce_login: bool = True, 96 debug: bool = False, 97) -> bool: 98 """ 99 Keep trying to connect to the database. 100 101 Parameters 102 ---------- 103 connector_keys: Optional[str], default None 104 The keys of the connector to the instance. 105 106 max_retries: int, default 40 107 How many time to try connecting. 108 109 retry_wait: int, default 3 110 The number of seconds between retries. 111 112 workers: int, default 1 113 How many worker thread connections to make. 114 115 warn: bool, default True 116 If `True`, print a warning in case the connection fails. 117 118 print_on_connect: bool, default False 119 If `True`, print a message when a successful connection is established. 120 121 enforce_chaining: bool, default True 122 If `False`, ignore the configured chaining option. 123 124 enforce_login: bool, default True 125 If `False`, ignore an invalid login. 126 127 debug: bool, default False 128 Verbosity toggle. 129 130 Returns 131 ------- 132 Whether a connection could be made. 133 134 """ 135 from meerschaum.utils.warnings import warn as _warn, error, info 136 from meerschaum.utils.debug import dprint 137 from meerschaum.connectors import instance_types, get_connector 138 from meerschaum.connectors.parse import parse_instance_keys 139 from meerschaum.utils.packages import attempt_import 140 from meerschaum.utils.sql import test_queries 141 from meerschaum.utils.misc import timed_input 142 from functools import partial 143 import platform 144 import time 145 146 connector = get_connector(**connector_meta) 147 148 if not hasattr(connector, 'test_connection'): 149 return True 150 151 retries = 0 152 connected, chaining_status = False, None 153 while retries < max_retries: 154 if debug: 155 dprint(f"Trying to connect to '{connector}'...") 156 dprint(f"Attempt ({retries + 1} / {max_retries})") 157 158 if connector.type not in ('sql', 'api'): 159 try: 160 connected = connector.test_connection() 161 except Exception as e: 162 if warn: 163 print(e) 164 connected = False 165 166 elif connector.type == 'sql': 167 168 def _connect(_connector): 169 ### Test queries like `SELECT 1`. 170 connect_query = test_queries.get(connector.flavor, test_queries['default']) 171 if _connector.exec(connect_query) is None: 172 raise Exception("Failed to connect.") 173 174 try: 175 _connect(connector) 176 connected = True 177 except Exception as e: 178 if warn: 179 print(e) 180 connected = False 181 182 elif connector.type == 'api': 183 ### If the remote instance does not allow chaining, don't even try logging in. 184 if not isinstance(chaining_status, bool): 185 chaining_status = connector.get_chaining_status(debug=debug) 186 if chaining_status is None: 187 connected = None 188 elif chaining_status is False: 189 if enforce_chaining: 190 if warn: 191 _warn( 192 f"Meerschaum instance '{connector}' does not allow chaining " + 193 "and cannot be used as the parent for this instance.", 194 stack=False, 195 ) 196 return False 197 198 ### Allow is the option to ignore chaining status. 199 chaining_status = True 200 201 if chaining_status: 202 connected = ( 203 connector.login(warn=warn, debug=debug)[0] 204 ) if enforce_login else True 205 206 if not connected and warn: 207 _warn(f"Unable to login to '{connector}'!", stack=False) 208 209 if connected: 210 if print_on_connect: 211 info(f"Connected to '{connector}'.") 212 return True 213 214 if warn: 215 _warn( 216 f"Connection to '{connector}' failed.\n " 217 + f"Press [Enter] to retry or wait {retry_wait} seconds.", 218 stack = False 219 ) 220 if workers and workers > 1: 221 info( 222 f"To quit, press CTRL-C, then 'q' + [Enter] for each worker" 223 + f" ({workers})." 224 ) 225 info(f"Failed connection attempt ({retries + 1} / {max_retries})") 226 227 try: 228 if retry_wait > 0: 229 if platform.system() != 'Windows': 230 text = timed_input(retry_wait) 231 if text in ('q', 'quit', 'pass', 'exit', 'stop'): 232 return None 233 else: 234 time.sleep(retry_wait) 235 except KeyboardInterrupt: 236 return None 237 retries += 1 238 239 return False
def
retry_connect( connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, max_retries: int = 50, retry_wait: int = 3, workers: int = 1, warn: bool = True, print_on_connect: bool = False, enforce_chaining: bool = True, enforce_login: bool = True, debug: bool = False) -> bool:
12def retry_connect( 13 connector: Union[InstanceConnector, None] = None, 14 max_retries: int = 50, 15 retry_wait: int = 3, 16 workers: int = 1, 17 warn: bool = True, 18 print_on_connect: bool = False, 19 enforce_chaining: bool = True, 20 enforce_login: bool = True, 21 debug: bool = False, 22) -> bool: 23 """ 24 Keep trying to connect to the database. 25 26 Parameters 27 ---------- 28 connector: Union[InstanceConnector, None], default None 29 The connector to the instance. 30 31 max_retries: int, default 40 32 How many time to try connecting. 33 34 retry_wait: int, default 3 35 The number of seconds between retries. 36 37 workers: int, default 1 38 How many worker thread connections to make. 39 40 warn: bool, default True 41 If `True`, print a warning in case the connection fails. 42 43 print_on_connect: bool, default False 44 If `True`, print a message when a successful connection is established. 45 46 enforce_chaining: bool, default True 47 If `False`, ignore the configured chaining option. 48 49 enforce_login: bool, default True 50 If `False`, ignore an invalid login. 51 52 debug: bool, default False 53 Verbosity toggle. 54 55 Returns 56 ------- 57 Whether a connection could be made. 58 59 """ 60 import json 61 from meerschaum.utils.venv import venv_exec 62 from meerschaum.utils.packages import attempt_import 63 64 kw = { 65 'connector_meta': connector.meta, 66 'max_retries': max_retries, 67 'retry_wait': retry_wait, 68 'workers': workers, 69 'warn': warn, 70 'print_on_connect': print_on_connect, 71 'enforce_chaining': enforce_chaining, 72 'enforce_login': enforce_login, 73 'debug': debug, 74 } 75 76 dill = attempt_import('dill', lazy=False) 77 code = ( 78 "import sys, json\n" 79 + "from meerschaum.utils.typing import Optional, Dict, Any\n\n" 80 + dill.source.getsource(_wrap_retry_connect) + '\n\n' 81 + f"kw = json.loads({json.dumps(json.dumps(kw))})\n" 82 + "success = _wrap_retry_connect(**kw)\n" 83 + "sys.exit((0 if success else 1))" 84 ) 85 return venv_exec(code, venv=None, debug=debug)
Keep trying to connect to the database.
Parameters
- connector (Union[InstanceConnector, None], default None): The connector to the instance.
- max_retries (int, default 40): How many time to try connecting.
- retry_wait (int, default 3): The number of seconds between retries.
- workers (int, default 1): How many worker thread connections to make.
- warn (bool, default True):
If
True
, print a warning in case the connection fails. - print_on_connect (bool, default False):
If
True
, print a message when a successful connection is established. - enforce_chaining (bool, default True):
If
False
, ignore the configured chaining option. - enforce_login (bool, default True):
If
False
, ignore an invalid login. - debug (bool, default False): Verbosity toggle.
Returns
- Whether a connection could be made.