meerschaum.connectors.poll

Poll database and API connections.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Poll database and API connections.
  7"""
  8
  9from meerschaum.utils.typing import InstanceConnector, Union, Optional, Dict, Any
 10
 11def retry_connect(
 12    connector: Union[InstanceConnector, None] = None,
 13    max_retries: int = 50,
 14    retry_wait: int = 3,
 15    workers: int = 1,
 16    warn: bool = True,
 17    print_on_connect: bool = False,
 18    enforce_chaining: bool = True,
 19    enforce_login: bool = True,
 20    debug: bool = False,
 21) -> bool:
 22    """
 23    Keep trying to connect to the database.
 24
 25    Parameters
 26    ----------
 27    connector: Union[InstanceConnector, None], default None
 28        The connector to the instance.
 29
 30    max_retries: int, default 40
 31        How many time to try connecting.
 32
 33    retry_wait: int, default 3
 34        The number of seconds between retries.
 35
 36    workers: int, default 1
 37        How many worker thread connections to make.
 38
 39    warn: bool, default True
 40        If `True`, print a warning in case the connection fails.
 41
 42    print_on_connect: bool, default False
 43        If `True`, print a message when a successful connection is established.
 44
 45    enforce_chaining: bool, default True
 46        If `False`, ignore the configured chaining option.
 47
 48    enforce_login: bool, default True
 49        If `False`, ignore an invalid login.
 50
 51    debug: bool, default False
 52        Verbosity toggle.
 53
 54    Returns
 55    -------
 56    Whether a connection could be made.
 57
 58    """
 59    import json
 60    from meerschaum.utils.venv import venv_exec
 61    from meerschaum.utils.packages import attempt_import
 62
 63    kw = {
 64        'connector_meta': connector.meta,
 65        'max_retries': max_retries,
 66        'retry_wait': retry_wait,
 67        'workers': workers,
 68        'warn': warn,
 69        'print_on_connect': print_on_connect,
 70        'enforce_chaining': enforce_chaining,
 71        'enforce_login': enforce_login,
 72        'debug': debug,
 73    }
 74
 75    dill = attempt_import('dill', lazy=False)
 76    code = (
 77        "import sys, json\n"
 78        + "from meerschaum.utils.typing import Optional, Dict, Any\n\n"
 79        + dill.source.getsource(_wrap_retry_connect) + '\n\n'
 80        + f"kw = json.loads({json.dumps(json.dumps(kw))})\n"
 81        + "success = _wrap_retry_connect(**kw)\n"
 82        + "sys.exit((0 if success else 1))"
 83    )
 84    return venv_exec(code, venv=None, debug=debug)
 85
 86
 87def _wrap_retry_connect(
 88    connector_meta: Dict[str, Any],
 89    max_retries: int = 50,
 90    retry_wait: int = 3,
 91    workers: int = 1,
 92    print_on_connect: bool = False,
 93    warn: bool = True,
 94    enforce_chaining: bool = True,
 95    enforce_login: bool = True,
 96    debug: bool = False,
 97) -> bool:
 98    """
 99    Keep trying to connect to the database.
100
101    Parameters
102    ----------
103    connector_keys: Optional[str], default None
104        The keys of the connector to the instance.
105
106    max_retries: int, default 40
107        How many time to try connecting.
108
109    retry_wait: int, default 3
110        The number of seconds between retries.
111
112    workers: int, default 1
113        How many worker thread connections to make.
114
115    warn: bool, default True
116        If `True`, print a warning in case the connection fails.
117
118    print_on_connect: bool, default False
119        If `True`, print a message when a successful connection is established.
120
121    enforce_chaining: bool, default True
122        If `False`, ignore the configured chaining option.
123
124    enforce_login: bool, default True
125        If `False`, ignore an invalid login.
126
127    debug: bool, default False
128        Verbosity toggle.
129
130    Returns
131    -------
132    Whether a connection could be made.
133
134    """
135    from meerschaum.utils.warnings import warn as _warn, error, info
136    from meerschaum.utils.debug import dprint
137    from meerschaum.connectors import instance_types, get_connector
138    from meerschaum.connectors.parse import parse_instance_keys
139    from meerschaum.utils.packages import attempt_import
140    from meerschaum.utils.sql import test_queries
141    from meerschaum.utils.misc import timed_input
142    from functools import partial
143    import platform
144    import time
145
146    connector = get_connector(**connector_meta)
147
148    if not hasattr(connector, 'test_connection'):
149        return True
150
151    retries = 0
152    connected, chaining_status = False, None
153    while retries < max_retries:
154        if debug:
155            dprint(f"Trying to connect to '{connector}'...")
156            dprint(f"Attempt ({retries + 1} / {max_retries})")
157
158        if connector.type not in ('sql', 'api'):
159            try:
160                connected = connector.test_connection()
161            except Exception as e:
162                if warn:
163                    print(e)
164                connected = False
165
166        elif connector.type == 'sql':
167
168            def _connect(_connector):
169                ### Test queries like `SELECT 1`.
170                connect_query = test_queries.get(connector.flavor, test_queries['default'])
171                if _connector.exec(connect_query) is None:
172                    raise Exception("Failed to connect.")
173
174            try:
175                _connect(connector)
176                connected = True
177            except Exception as e:
178                if warn:
179                    print(e)
180                connected = False
181
182        elif connector.type == 'api':
183            ### If the remote instance does not allow chaining, don't even try logging in.
184            if not isinstance(chaining_status, bool):
185                chaining_status = connector.get_chaining_status(debug=debug)
186                if chaining_status is None:
187                    connected = None
188                elif chaining_status is False:
189                    if enforce_chaining:
190                        if warn:
191                            _warn(
192                                f"Meerschaum instance '{connector}' does not allow chaining " +
193                                "and cannot be used as the parent for this instance.",
194                                stack=False,
195                            )
196                        return False
197
198                    ### Allow is the option to ignore chaining status.
199                    chaining_status = True
200
201            if chaining_status:
202                connected = (
203                    connector.login(warn=warn, debug=debug)[0]
204                ) if enforce_login else True
205
206                if not connected and warn:
207                    _warn(f"Unable to login to '{connector}'!", stack=False)
208
209        if connected:
210            if print_on_connect:
211                info(f"Connected to '{connector}'.")
212            return True
213
214        if warn:
215            _warn(
216                f"Connection to '{connector}' failed.\n    "
217                + f"Press [Enter] to retry or wait {retry_wait} seconds.",
218                stack = False
219            )
220            if workers and workers > 1:
221                info(
222                    f"To quit, press CTRL-C, then 'q' + [Enter] for each worker"
223                    + f" ({workers})."
224                )
225            info(f"Failed connection attempt ({retries + 1} / {max_retries})")
226
227        try:
228            if retry_wait > 0:
229                if platform.system() != 'Windows':
230                    text = timed_input(retry_wait)
231                    if text in ('q', 'quit', 'pass', 'exit', 'stop'):
232                        return None
233                else:
234                    time.sleep(retry_wait)
235        except KeyboardInterrupt:
236            return None
237        retries += 1
238
239    return False
def retry_connect( connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, max_retries: int = 50, retry_wait: int = 3, workers: int = 1, warn: bool = True, print_on_connect: bool = False, enforce_chaining: bool = True, enforce_login: bool = True, debug: bool = False) -> bool:
12def retry_connect(
13    connector: Union[InstanceConnector, None] = None,
14    max_retries: int = 50,
15    retry_wait: int = 3,
16    workers: int = 1,
17    warn: bool = True,
18    print_on_connect: bool = False,
19    enforce_chaining: bool = True,
20    enforce_login: bool = True,
21    debug: bool = False,
22) -> bool:
23    """
24    Keep trying to connect to the database.
25
26    Parameters
27    ----------
28    connector: Union[InstanceConnector, None], default None
29        The connector to the instance.
30
31    max_retries: int, default 40
32        How many time to try connecting.
33
34    retry_wait: int, default 3
35        The number of seconds between retries.
36
37    workers: int, default 1
38        How many worker thread connections to make.
39
40    warn: bool, default True
41        If `True`, print a warning in case the connection fails.
42
43    print_on_connect: bool, default False
44        If `True`, print a message when a successful connection is established.
45
46    enforce_chaining: bool, default True
47        If `False`, ignore the configured chaining option.
48
49    enforce_login: bool, default True
50        If `False`, ignore an invalid login.
51
52    debug: bool, default False
53        Verbosity toggle.
54
55    Returns
56    -------
57    Whether a connection could be made.
58
59    """
60    import json
61    from meerschaum.utils.venv import venv_exec
62    from meerschaum.utils.packages import attempt_import
63
64    kw = {
65        'connector_meta': connector.meta,
66        'max_retries': max_retries,
67        'retry_wait': retry_wait,
68        'workers': workers,
69        'warn': warn,
70        'print_on_connect': print_on_connect,
71        'enforce_chaining': enforce_chaining,
72        'enforce_login': enforce_login,
73        'debug': debug,
74    }
75
76    dill = attempt_import('dill', lazy=False)
77    code = (
78        "import sys, json\n"
79        + "from meerschaum.utils.typing import Optional, Dict, Any\n\n"
80        + dill.source.getsource(_wrap_retry_connect) + '\n\n'
81        + f"kw = json.loads({json.dumps(json.dumps(kw))})\n"
82        + "success = _wrap_retry_connect(**kw)\n"
83        + "sys.exit((0 if success else 1))"
84    )
85    return venv_exec(code, venv=None, debug=debug)

Keep trying to connect to the database.

Parameters
  • connector (Union[InstanceConnector, None], default None): The connector to the instance.
  • max_retries (int, default 40): How many time to try connecting.
  • retry_wait (int, default 3): The number of seconds between retries.
  • workers (int, default 1): How many worker thread connections to make.
  • warn (bool, default True): If True, print a warning in case the connection fails.
  • print_on_connect (bool, default False): If True, print a message when a successful connection is established.
  • enforce_chaining (bool, default True): If False, ignore the configured chaining option.
  • enforce_login (bool, default True): If False, ignore an invalid login.
  • debug (bool, default False): Verbosity toggle.
Returns
  • Whether a connection could be made.