Module meerschaum.connectors.poll
Poll database and API connections.
Expand source code
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Poll database and API connections.
"""
from meerschaum.utils.typing import InstanceConnector, Union, Optional, Dict, Any
def retry_connect(
connector: Union[InstanceConnector, None] = None,
max_retries: int = 50,
retry_wait: int = 3,
workers: int = 1,
warn: bool = True,
print_on_connect: bool = False,
enforce_chaining: bool = True,
enforce_login: bool = True,
debug: bool = False,
) -> bool:
"""
Keep trying to connect to the database.
Parameters
----------
connector: Union[InstanceConnector, None], default None
The connector to the instance.
max_retries: int, default 40
How many time to try connecting.
retry_wait: int, default 3
The number of seconds between retries.
workers: int, default 1
How many worker thread connections to make.
warn: bool, default True
If `True`, print a warning in case the connection fails.
print_on_connect: bool, default False
If `True`, print a message when a successful connection is established.
enforce_chaining: bool, default True
If `False`, ignore the configured chaining option.
enforce_login: bool, default True
If `False`, ignore an invalid login.
debug: bool, default False
Verbosity toggle.
Returns
-------
Whether a connection could be made.
"""
import json
from meerschaum.utils.venv import venv_exec
from meerschaum.utils.packages import attempt_import
kw = {
'connector_meta': connector.meta,
'max_retries': max_retries,
'retry_wait': retry_wait,
'workers': workers,
'warn': warn,
'print_on_connect': print_on_connect,
'enforce_chaining': enforce_chaining,
'enforce_login': enforce_login,
'debug': debug,
}
dill = attempt_import('dill', lazy=False)
code = (
"import sys, json\n"
+ "from meerschaum.utils.typing import Optional, Dict, Any\n\n"
+ dill.source.getsource(_wrap_retry_connect) + '\n\n'
+ f"kw = json.loads({json.dumps(json.dumps(kw))})\n"
+ "success = _wrap_retry_connect(**kw)\n"
+ "sys.exit((0 if success else 1))"
)
return venv_exec(code, venv=None, debug=debug)
def _wrap_retry_connect(
connector_meta: Dict[str, Any],
max_retries: int = 50,
retry_wait: int = 3,
workers: int = 1,
print_on_connect: bool = False,
warn: bool = True,
enforce_chaining: bool = True,
enforce_login: bool = True,
debug: bool = False,
) -> bool:
"""
Keep trying to connect to the database.
Parameters
----------
connector_keys: Optional[str], default None
The keys of the connector to the instance.
max_retries: int, default 40
How many time to try connecting.
retry_wait: int, default 3
The number of seconds between retries.
workers: int, default 1
How many worker thread connections to make.
warn: bool, default True
If `True`, print a warning in case the connection fails.
print_on_connect: bool, default False
If `True`, print a message when a successful connection is established.
enforce_chaining: bool, default True
If `False`, ignore the configured chaining option.
enforce_login: bool, default True
If `False`, ignore an invalid login.
debug: bool, default False
Verbosity toggle.
Returns
-------
Whether a connection could be made.
"""
from meerschaum.utils.warnings import warn as _warn, error, info
from meerschaum.utils.debug import dprint
from meerschaum.connectors import instance_types, get_connector
from meerschaum.connectors.parse import parse_instance_keys
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.sql import test_queries
from meerschaum.utils.misc import timed_input
from functools import partial
import time
connector = get_connector(**connector_meta)
if connector.type not in instance_types:
return None
if not hasattr(connector, 'test_connection'):
return True
retries = 0
connected, chaining_status = False, None
while retries < max_retries:
if debug:
dprint(f"Trying to connect to '{connector}'...")
dprint(f"Attempt ({retries + 1} / {max_retries})")
if connector.type == 'sql':
def _connect(_connector):
### Test queries like `SELECT 1`.
connect_query = test_queries.get(connector.flavor, test_queries['default'])
if _connector.exec(connect_query) is None:
raise Exception("Failed to connect.")
try:
_connect(connector)
connected = True
except Exception as e:
if warn:
print(e)
connected = False
elif connector.type == 'api':
### If the remote instance does not allow chaining, don't even try logging in.
if not isinstance(chaining_status, bool):
chaining_status = connector.get_chaining_status(debug=debug)
if chaining_status is None:
connected = None
elif chaining_status is False:
if enforce_chaining:
if warn:
_warn(
f"Meerschaum instance '{connector}' does not allow chaining " +
"and cannot be used as the parent for this instance.",
stack = False
)
return False
### Allow is the option to ignore chaining status.
chaining_status = True
if chaining_status:
connected = (
connector.login(warn=warn, debug=debug)[0]
) if enforce_login else True
if not connected and warn:
_warn(f"Unable to login to '{connector}'!", stack=False)
if connected:
if print_on_connect:
info(f"Connected to '{connector}'.")
return True
if warn:
_warn(
f"Connection to '{connector}' failed.\n "
+ f"Press [Enter] to retry or wait {retry_wait} seconds.",
stack = False
)
if workers and workers > 1:
info(
f"To quit, press CTRL-C, then 'q' + [Enter] for each worker"
+ f" ({workers})."
)
info(f"Failed connection attempt ({retries + 1} / {max_retries})")
try:
if retry_wait > 0:
text = timed_input(retry_wait)
if text in ('q', 'quit', 'pass', 'exit', 'stop'):
return None
except KeyboardInterrupt:
return None
retries += 1
return False
Functions
def retry_connect(connector: Union[ForwardRef('meerschaum.connectors.sql.SQLConnector'), ForwardRef('meerschaum.connectors.api.APIConnector'), ForwardRef(None)] = None, max_retries: int = 50, retry_wait: int = 3, workers: int = 1, warn: bool = True, print_on_connect: bool = False, enforce_chaining: bool = True, enforce_login: bool = True, debug: bool = False) ‑> bool
-
Keep trying to connect to the database.
Parameters
connector
:Union[InstanceConnector, None]
, defaultNone
- The connector to the instance.
max_retries
:int
, default40
- How many time to try connecting.
retry_wait
:int
, default3
- The number of seconds between retries.
workers
:int
, default1
- How many worker thread connections to make.
warn
:bool
, defaultTrue
- If
True
, print a warning in case the connection fails. print_on_connect
:bool
, defaultFalse
- If
True
, print a message when a successful connection is established. enforce_chaining
:bool
, defaultTrue
- If
False
, ignore the configured chaining option. enforce_login
:bool
, defaultTrue
- If
False
, ignore an invalid login. debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
Whether a connection could be made.
Expand source code
def retry_connect( connector: Union[InstanceConnector, None] = None, max_retries: int = 50, retry_wait: int = 3, workers: int = 1, warn: bool = True, print_on_connect: bool = False, enforce_chaining: bool = True, enforce_login: bool = True, debug: bool = False, ) -> bool: """ Keep trying to connect to the database. Parameters ---------- connector: Union[InstanceConnector, None], default None The connector to the instance. max_retries: int, default 40 How many time to try connecting. retry_wait: int, default 3 The number of seconds between retries. workers: int, default 1 How many worker thread connections to make. warn: bool, default True If `True`, print a warning in case the connection fails. print_on_connect: bool, default False If `True`, print a message when a successful connection is established. enforce_chaining: bool, default True If `False`, ignore the configured chaining option. enforce_login: bool, default True If `False`, ignore an invalid login. debug: bool, default False Verbosity toggle. Returns ------- Whether a connection could be made. """ import json from meerschaum.utils.venv import venv_exec from meerschaum.utils.packages import attempt_import kw = { 'connector_meta': connector.meta, 'max_retries': max_retries, 'retry_wait': retry_wait, 'workers': workers, 'warn': warn, 'print_on_connect': print_on_connect, 'enforce_chaining': enforce_chaining, 'enforce_login': enforce_login, 'debug': debug, } dill = attempt_import('dill', lazy=False) code = ( "import sys, json\n" + "from meerschaum.utils.typing import Optional, Dict, Any\n\n" + dill.source.getsource(_wrap_retry_connect) + '\n\n' + f"kw = json.loads({json.dumps(json.dumps(kw))})\n" + "success = _wrap_retry_connect(**kw)\n" + "sys.exit((0 if success else 1))" ) return venv_exec(code, venv=None, debug=debug)