meerschaum.connectors

Create connectors with meerschaum.connectors.get_connector(). For ease of use, you can also import from the root meerschaum module:

>>> from meerschaum import get_connector
>>> conn = get_connector()
  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create connectors with `meerschaum.connectors.get_connector()`.
  7For ease of use, you can also import from the root `meerschaum` module:
  8```
  9>>> from meerschaum import get_connector
 10>>> conn = get_connector()
 11```
 12"""
 13
 14from __future__ import annotations
 15
 16import meerschaum as mrsm
 17from meerschaum.utils.typing import Any, Union, List, Dict
 18from meerschaum.utils.threading import RLock
 19from meerschaum.utils.warnings import warn
 20
 21from meerschaum.connectors._Connector import Connector, InvalidAttributesError
 22from meerschaum.connectors.sql._SQLConnector import SQLConnector
 23from meerschaum.connectors.api._APIConnector import APIConnector
 24from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs
 25
__all__ = (
    "make_connector",
    "Connector",
    "SQLConnector",
    "APIConnector",
    "get_connector",
    "is_connected",
    "poll",
    "api",
    "sql",
    "valkey",
)

### store connectors partitioned by
### type, label for reuse
connectors: Dict[str, Dict[str, Connector]] = {
    'api'    : {},
    'sql'    : {},
    'plugin' : {},
    'valkey' : {},
}
### Connector types which may serve as Meerschaum instances
### (extended at runtime by `make_connector` for classes with `IS_INSTANCE = True`).
instance_types: List[str] = ['sql', 'api']
### One lock per piece of mutable module state above/below,
### so registration and lookup are safe across threads.
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
### Per-type attribute requirements and defaults consulted by connector constructors.
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password',
        ],
        'optional': [
            'port',
        ],
        'default': {
            'protocol': 'http',
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
### Maps a type string (e.g. 'sql') to its Connector class.
types: Dict[str, Any] = {}
### Types registered via `make_connector` (i.e. not built in).
custom_types: set = set()
### Guarded by _locks['_loaded_plugin_connectors']; flipped once plugins are imported.
_loaded_plugin_connectors: bool = False
 77
 78
 79def get_connector(
 80    type: str = None,
 81    label: str = None,
 82    refresh: bool = False,
 83    debug: bool = False,
 84    **kw: Any
 85) -> Connector:
 86    """
 87    Return existing connector or create new connection and store for reuse.
 88    
 89    You can create new connectors if enough parameters are provided for the given type and flavor.
 90    
 91
 92    Parameters
 93    ----------
 94    type: Optional[str], default None
 95        Connector type (sql, api, etc.).
 96        Defaults to the type of the configured `instance_connector`.
 97
 98    label: Optional[str], default None
 99        Connector label (e.g. main). Defaults to `'main'`.
100
101    refresh: bool, default False
102        Refresh the Connector instance / construct new object. Defaults to `False`.
103
104    kw: Any
105        Other arguments to pass to the Connector constructor.
106        If the Connector has already been constructed and new arguments are provided,
107        `refresh` is set to `True` and the old Connector is replaced.
108
109    Returns
110    -------
111    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
112    `meerschaum.connectors.sql.SQLConnector`).
113    
114    Examples
115    --------
116    The following parameters would create a new
117    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
118
119    ```
120    >>> conn = get_connector(
121    ...     type = 'sql',
122    ...     label = 'newlabel',
123    ...     flavor = 'sqlite',
124    ...     database = '/file/path/to/database.db'
125    ... )
126    >>>
127    ```
128
129    """
130    from meerschaum.connectors.parse import parse_instance_keys
131    from meerschaum.config import get_config
132    from meerschaum.config.static import STATIC_CONFIG
133    from meerschaum.utils.warnings import warn
134    global _loaded_plugin_connectors
135    if isinstance(type, str) and not label and ':' in type:
136        type, label = type.split(':', maxsplit=1)
137
138    with _locks['_loaded_plugin_connectors']:
139        if not _loaded_plugin_connectors:
140            load_plugin_connectors()
141            _load_builtin_custom_connectors()
142            _loaded_plugin_connectors = True
143
144    if type is None and label is None:
145        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
146        ### recursive call to get_connector
147        return parse_instance_keys(default_instance_keys)
148
149    ### NOTE: the default instance connector may not be main.
150    ### Only fall back to 'main' if the type is provided by the label is omitted.
151    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
152
153    ### type might actually be a label. Check if so and raise a warning.
154    if type not in connectors:
155        possibilities, poss_msg = [], ""
156        for _type in get_config('meerschaum', 'connectors'):
157            if type in get_config('meerschaum', 'connectors', _type):
158                possibilities.append(f"{_type}:{type}")
159        if len(possibilities) > 0:
160            poss_msg = " Did you mean"
161            for poss in possibilities[:-1]:
162                poss_msg += f" '{poss}',"
163            if poss_msg.endswith(','):
164                poss_msg = poss_msg[:-1]
165            if len(possibilities) > 1:
166                poss_msg += " or"
167            poss_msg += f" '{possibilities[-1]}'?"
168
169        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
170        return None
171
172    if 'sql' not in types:
173        from meerschaum.connectors.plugin import PluginConnector
174        from meerschaum.connectors.valkey import ValkeyConnector
175        with _locks['types']:
176            types.update({
177                'api': APIConnector,
178                'sql': SQLConnector,
179                'plugin': PluginConnector,
180                'valkey': ValkeyConnector,
181            })
182
183    ### determine if we need to call the constructor
184    if not refresh:
185        ### see if any user-supplied arguments differ from the existing instance
186        if label in connectors[type]:
187            warning_message = None
188            for attribute, value in kw.items():
189                if attribute not in connectors[type][label].meta:
190                    import inspect
191                    cls = connectors[type][label].__class__
192                    cls_init_signature = inspect.signature(cls)
193                    cls_init_params = cls_init_signature.parameters
194                    if attribute not in cls_init_params:
195                        warning_message = (
196                            f"Received new attribute '{attribute}' not present in connector " +
197                            f"{connectors[type][label]}.\n"
198                        )
199                elif connectors[type][label].__dict__[attribute] != value:
200                    warning_message = (
201                        f"Mismatched values for attribute '{attribute}' in connector "
202                        + f"'{connectors[type][label]}'.\n" +
203                        f"  - Keyword value: '{value}'\n" +
204                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
205                    )
206            if warning_message is not None:
207                warning_message += (
208                    "\nSetting `refresh` to True and recreating connector with type:"
209                    + f" '{type}' and label '{label}'."
210                )
211                refresh = True
212                warn(warning_message)
213        else: ### connector doesn't yet exist
214            refresh = True
215
216    ### only create an object if refresh is True
217    ### (can be manually specified, otherwise determined above)
218    if refresh:
219        with _locks['connectors']:
220            try:
221                ### will raise an error if configuration is incorrect / missing
222                conn = types[type](label=label, **kw)
223                connectors[type][label] = conn
224            except InvalidAttributesError as ie:
225                warn(
226                    f"Incorrect attributes for connector '{type}:{label}'.\n"
227                    + str(ie),
228                    stack = False,
229                )
230                conn = None
231            except Exception as e:
232                from meerschaum.utils.formatting import get_console
233                console = get_console()
234                if console:
235                    console.print_exception()
236                warn(
237                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
238                    stack = False,
239                )
240                conn = None
241        if conn is None:
242            return None
243
244    return connectors[type][label]
245
246
def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")
        return False

    ### Parse with maxsplit=1 for consistency with `get_connector()`'s key handling.
    typ, label = keys.split(':', maxsplit=1)
    if typ not in instance_types:
        return False
    ### Only already-constructed connectors count as potentially connected.
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress connection warnings; the boolean result is the only signal we need.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False
287
288
def make_connector(cls, _is_executor: bool = False):
    """
    Class decorator to register a custom `Connector` subclass.

    The connector `type` is the lower-cased class name with the suffix
    `connector` removed (or `executor` when `_is_executor` is `True`).
    If the class defines `IS_INSTANCE = True`, the type is additionally
    registered as an instance connector type.

    Parameters
    ----------
    cls:
        The `Connector` subclass to register.

    _is_executor: bool, default False
        Internal flag: strip the suffix `executor` instead of `connector`.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>> 
    >>> @make_connector
    >>> class FooConnector(Connector):
    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
    ... 
    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
    >>> print(conn.username, conn.password)
    dog cat
    >>> 
    """
    import re
    suffix = r'executor$' if _is_executor else r'connector$'
    typ = re.sub(suffix, '', cls.__name__.lower())

    ### Register the class and remember that it is a custom type.
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        connectors.setdefault(typ, {})

    ### Opt-in instance connectors declare `IS_INSTANCE = True` on the class.
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls
334
335
def load_plugin_connectors():
    """
    If a plugin makes use of the `make_connector` decorator,
    load its module.
    """
    from meerschaum.plugins import get_plugins, import_plugins

    ### Cheap textual scan: only import plugins whose source mentions connectors.
    plugin_names = []
    for plugin in get_plugins():
        if plugin is None:
            continue
        with open(plugin.__file__, encoding='utf-8') as src:
            source_text = src.read()
        if 'make_connector' not in source_text and 'Connector' not in source_text:
            continue
        plugin_names.append(plugin.name)

    if plugin_names:
        import_plugins(*plugin_names)
353
354
def get_connector_plugin(
    connector: Connector,
) -> Union[str, None, mrsm.Plugin]:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None

    ### Custom types live in a plugin module; 'plugin' connectors are named
    ### after their plugin; everything else is built in ('mrsm').
    if connector.type in custom_types:
        plugin_name = connector.__module__.replace('plugins.', '').split('.')[0]
    elif connector.type == 'plugin':
        plugin_name = connector.label
    else:
        plugin_name = 'mrsm'

    plugin = mrsm.Plugin(plugin_name)
    return plugin if plugin.is_installed() else None
383
384
def _load_builtin_custom_connectors():
    """
    Import custom connectors decorated with `@make_connector` or `@make_executor`.
    """
    # Importing these modules runs their module-level decorators,
    # which register the connector / executor types as a side effect.
    import meerschaum.jobs.systemd
    import meerschaum.connectors.valkey
def make_connector(cls, _is_executor: bool = False):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    Parameters
    ----------
    cls:
        The `Connector` subclass to register.

    _is_executor: bool, default False
        If `True`, strip the suffix `executor` (instead of `connector`)
        when deriving the type. Internal flag.

    NOTE: To make a connector type an instance connector, set `IS_INSTANCE = True`
    on the class. This requires implementing the various pipes functions
    and lots of testing.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>> 
    >>> @make_connector
    >>> class FooConnector(Connector):
    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
    ... 
    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
    >>> print(conn.username, conn.password)
    dog cat
    >>> 
    """
    import re
    suffix_regex = (
        r'connector$'
        if not _is_executor
        else r'executor$'
    )
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls

Register a class as a Connector. The type will be the lower case of the class name, without the suffix connector.

Parameters
  • _is_executor (bool, default False): If True, strip the suffix executor (instead of connector) when deriving the type; internal flag. To make a connector type an instance connector, set IS_INSTANCE = True on the class — this requires implementing the various pipes functions and lots of testing.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>> 
>>> @make_connector
>>> class FooConnector(Connector):
...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
... 
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the given keyword arguments as attributes.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.


        Examples
        --------
        Run `mrsm edit config` to edit connectors in the YAML file:

        ```yaml
        meerschaum:
            connections:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot the pristine __dict__ so `_reset_attributes()` can restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)

        ### NOTE: Override `REQUIRED_ATTRIBUTES` if `uri` is set.
        self.verify_attributes(
            ['uri']
            if 'uri' in self.__dict__
            else getattr(self, 'REQUIRED_ATTRIBUTES', None)
        )

    def _reset_attributes(self):
        ### Restore the attributes snapshotted at the start of `__init__()`.
        self.__dict__ = self._original_dict

    def _set_attributes(
        self,
        *args,
        inherit_default: bool = True,
        **kw: Any
    ):
        ### Build `self._attributes` in increasing order of precedence:
        ### the 'default' connector config, the labeled config, then `kw` overrides.
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label] or {})

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)

    def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False,
    ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An `InvalidAttributesError` if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']

        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent=True,
                stack=False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Skip private (underscore-prefixed) attributes.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            ### Derive the type from the class name, e.g. `FooConnector` -> 'foo'.
            is_executor = self.__class__.__name__.lower().endswith('executor')
            suffix_regex = (
                r'connector$'
                if not is_executor
                else r'executor$'
            )
            _type = re.sub(suffix_regex, '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label

The base connector class to hold connection attributes.

Connector(type: Optional[str] = None, label: Optional[str] = None, **kw: Any)
24    def __init__(
25        self,
26        type: Optional[str] = None,
27        label: Optional[str] = None,
28        **kw: Any
29    ):
30        """
31        Set the given keyword arguments as attributes.
32
33        Parameters
34        ----------
35        type: str
36            The `type` of the connector (e.g. `sql`, `api`, `plugin`).
37
38        label: str
39            The `label` for the connector.
40
41
42        Examples
43        --------
44        Run `mrsm edit config` and to edit connectors in the YAML file:
45
46        ```yaml
47        meerschaum:
48            connections:
49                {type}:
50                    {label}:
51                        ### attributes go here
52        ```
53
54        """
55        self._original_dict = copy.deepcopy(self.__dict__)
56        self._set_attributes(type=type, label=label, **kw)
57
58        ### NOTE: Override `REQUIRED_ATTRIBUTES` if `uri` is set.
59        self.verify_attributes(
60            ['uri']
61            if 'uri' in self.__dict__
62            else getattr(self, 'REQUIRED_ATTRIBUTES', None)
63        )

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
def verify_attributes( self, required_attributes: Optional[List[str]] = None, debug: bool = False) -> None:
124    def verify_attributes(
125        self,
126        required_attributes: Optional[List[str]] = None,
127        debug: bool = False,
128    ) -> None:
129        """
130        Ensure that the required attributes have been met.
131        
132        The Connector base class checks the minimum requirements.
133        Child classes may enforce additional requirements.
134
135        Parameters
136        ----------
137        required_attributes: Optional[List[str]], default None
138            Attributes to be verified. If `None`, default to `['label']`.
139
140        debug: bool, default False
141            Verbosity toggle.
142
143        Returns
144        -------
145        Don't return anything.
146
147        Raises
148        ------
149        An error if any of the required attributes are missing.
150        """
151        from meerschaum.utils.warnings import error, warn
152        from meerschaum.utils.debug import dprint
153        from meerschaum.utils.misc import items_str
154        if required_attributes is None:
155            required_attributes = ['label']
156
157        missing_attributes = set()
158        for a in required_attributes:
159            if a not in self.__dict__:
160                missing_attributes.add(a)
161        if len(missing_attributes) > 0:
162            error(
163                (
164                    f"Missing {items_str(list(missing_attributes))} "
165                    + f"for connector '{self.type}:{self.label}'."
166                ),
167                InvalidAttributesError,
168                silent=True,
169                stack=False
170            )

Ensure that the required attributes have been met.

The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.

Parameters
  • required_attributes (Optional[List[str]], default None): Attributes to be verified. If None, default to ['label'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • Don't return anything.
Raises
  • An error if any of the required attributes are missing.
meta: Dict[str, Any]
185    @property
186    def meta(self) -> Dict[str, Any]:
187        """
188        Return the keys needed to reconstruct this Connector.
189        """
190        _meta = {
191            key: value
192            for key, value in self.__dict__.items()
193            if not str(key).startswith('_')
194        }
195        _meta.update({
196            'type': self.type,
197            'label': self.label,
198        })
199        return _meta

Return the keys needed to reconstruct this Connector.

type: str
202    @property
203    def type(self) -> str:
204        """
205        Return the type for this connector.
206        """
207        _type = self.__dict__.get('type', None)
208        if _type is None:
209            import re
210            is_executor = self.__class__.__name__.lower().endswith('executor')
211            suffix_regex = (
212                r'connector$'
213                if not is_executor
214                else r'executor$'
215            )
216            _type = re.sub(suffix_regex, '', self.__class__.__name__.lower())
217            self.__dict__['type'] = _type
218        return _type

Return the type for this connector.

label: str
221    @property
222    def label(self) -> str:
223        """
224        Return the label for this connector.
225        """
226        _label = self.__dict__.get('label', None)
227        if _label is None:
228            from meerschaum.config.static import STATIC_CONFIG
229            _label = STATIC_CONFIG['connectors']['default_label']
230            self.__dict__['label'] = _label
231        return _label

Return the label for this connector.

class SQLConnector(meerschaum.connectors.Connector):
 18class SQLConnector(Connector):
 19    """
 20    Connect to SQL databases via `sqlalchemy`.
 21    
 22    SQLConnectors may be used as Meerschaum instance connectors.
 23    Read more about connectors and instances at
 24    https://meerschaum.io/reference/connectors/
 25
 26    """
 27
 28    IS_INSTANCE: bool = True
 29
 30    from ._create_engine import flavor_configs, create_engine
 31    from ._sql import (
 32        read,
 33        value,
 34        exec,
 35        execute,
 36        to_sql,
 37        exec_queries,
 38        get_connection,
 39        _cleanup_connections,
 40    )
 41    from meerschaum.utils.sql import test_connection
 42    from ._fetch import fetch, get_pipe_metadef
 43    from ._cli import cli, _cli_exit
 44    from ._pipes import (
 45        fetch_pipes_keys,
 46        create_indices,
 47        drop_indices,
 48        get_create_index_queries,
 49        get_drop_index_queries,
 50        get_add_columns_queries,
 51        get_alter_columns_queries,
 52        delete_pipe,
 53        get_pipe_data,
 54        get_pipe_data_query,
 55        register_pipe,
 56        edit_pipe,
 57        get_pipe_id,
 58        get_pipe_attributes,
 59        sync_pipe,
 60        sync_pipe_inplace,
 61        get_sync_time,
 62        pipe_exists,
 63        get_pipe_rowcount,
 64        drop_pipe,
 65        clear_pipe,
 66        deduplicate_pipe,
 67        get_pipe_table,
 68        get_pipe_columns_types,
 69        get_to_sql_dtype,
 70        get_pipe_schema,
 71    )
 72    from ._plugins import (
 73        register_plugin,
 74        delete_plugin,
 75        get_plugin_id,
 76        get_plugin_version,
 77        get_plugins,
 78        get_plugin_user_id,
 79        get_plugin_username,
 80        get_plugin_attributes,
 81    )
 82    from ._users import (
 83        register_user,
 84        get_user_id,
 85        get_users,
 86        edit_user,
 87        delete_user,
 88        get_user_password_hash,
 89        get_user_type,
 90        get_user_attributes,
 91    )
 92    from ._uri import from_uri, parse_uri
 93    from ._instance import (
 94        _log_temporary_tables_creation,
 95        _drop_temporary_table,
 96        _drop_temporary_tables,
 97        _drop_old_temporary_tables,
 98    )
 99
100    def __init__(
101        self,
102        label: Optional[str] = None,
103        flavor: Optional[str] = None,
104        wait: bool = False,
105        connect: bool = False,
106        debug: bool = False,
107        **kw: Any
108    ):
109        """
110        Parameters
111        ----------
112        label: str, default 'main'
113            The identifying label for the connector.
114            E.g. for `sql:main`, 'main' is the label.
115            Defaults to 'main'.
116
117        flavor: Optional[str], default None
118            The database flavor, e.g.
119            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
120            To see supported flavors, run the `bootstrap connectors` command.
121
122        wait: bool, default False
123            If `True`, block until a database connection has been made.
124            Defaults to `False`.
125
126        connect: bool, default False
127            If `True`, immediately attempt to connect the database and raise
128            a warning if the connection fails.
129            Defaults to `False`.
130
131        debug: bool, default False
132            Verbosity toggle.
133            Defaults to `False`.
134
135        kw: Any
136            All other arguments will be passed to the connector's attributes.
137            Therefore, a connector may be made without being registered,
138            as long as enough parameters are supplied to the constructor.
139        """
140        if 'uri' in kw:
141            uri = kw['uri']
142            if uri.startswith('postgres') and not uri.startswith('postgresql'):
143                uri = uri.replace('postgres', 'postgresql', 1)
144            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
145                uri = uri.replace('postgresql://', 'postgresql+psycopg', 1)
146            if uri.startswith('timescaledb://'):
147                uri = uri.replace('timescaledb://', 'postgresql://', 1)
148                flavor = 'timescaledb'
149            kw['uri'] = uri
150            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
151            label = label or from_uri_params.get('label', None)
152            _ = from_uri_params.pop('label', None)
153
154            ### Sometimes the flavor may be provided with a URI.
155            kw.update(from_uri_params)
156            if flavor:
157                kw['flavor'] = flavor
158
159
160        ### set __dict__ in base class
161        super().__init__(
162            'sql',
163            label = label or self.__dict__.get('label', None),
164            **kw
165        )
166
167        if self.__dict__.get('flavor', None) == 'sqlite':
168            self._reset_attributes()
169            self._set_attributes(
170                'sql',
171                label = label,
172                inherit_default = False,
173                **kw
174            )
175            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
176            if self.label == 'local' and not self.__dict__.get('database', None):
177                from meerschaum.config._paths import SQLITE_DB_PATH
178                self.database = str(SQLITE_DB_PATH)
179
180        ### ensure flavor and label are set accordingly
181        if 'flavor' not in self.__dict__:
182            if flavor is None and 'uri' not in self.__dict__:
183                raise Exception(
184                    f"    Missing flavor. Provide flavor as a key for '{self}'."
185                )
186            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
187
188        if self.flavor == 'postgres':
189            self.flavor = 'postgresql'
190
191        self._debug = debug
192        ### Store the PID and thread at initialization
193        ### so we can dispose of the Pool in child processes or threads.
194        import os, threading
195        self._pid = os.getpid()
196        self._thread_ident = threading.current_thread().ident
197        self._sessions = {}
198        self._locks = {'_sessions': threading.RLock(), }
199
200        ### verify the flavor's requirements are met
201        if self.flavor not in self.flavor_configs:
202            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
203        if not self.__dict__.get('uri'):
204            self.verify_attributes(
205                self.flavor_configs[self.flavor].get('requirements', set()),
206                debug=debug,
207            )
208
209        if wait:
210            from meerschaum.connectors.poll import retry_connect
211            retry_connect(connector=self, debug=debug)
212
213        if connect:
214            if not self.test_connection(debug=debug):
215                from meerschaum.utils.warnings import warn
216                warn(f"Failed to connect with connector '{self}'!", stack=False)
217
218    @property
219    def Session(self):
220        if '_Session' not in self.__dict__:
221            if self.engine is None:
222                return None
223
224            from meerschaum.utils.packages import attempt_import
225            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
226            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
227            self._Session = sqlalchemy_orm.scoped_session(session_factory)
228
229        return self._Session
230
231    @property
232    def engine(self):
233        import os, threading
234        ### build the sqlalchemy engine
235        if '_engine' not in self.__dict__:
236            self._engine, self._engine_str = self.create_engine(include_uri=True)
237
238        same_process = os.getpid() == self._pid
239        same_thread = threading.current_thread().ident == self._thread_ident
240
241        ### handle child processes
242        if not same_process:
243            self._pid = os.getpid()
244            self._thread = threading.current_thread()
245            from meerschaum.utils.warnings import warn
246            warn(f"Different PID detected. Disposing of connections...")
247            self._engine.dispose()
248
249        ### handle different threads
250        if not same_thread:
251            pass
252
253        return self._engine
254
255    @property
256    def DATABASE_URL(self) -> str:
257        """
258        Return the URI connection string (alias for `SQLConnector.URI`).
259        """
260        _ = self.engine
261        return str(self._engine_str)
262
263    @property
264    def URI(self) -> str:
265        """
266        Return the URI connection string.
267        """
268        _ = self.engine
269        return str(self._engine_str)
270
271    @property
272    def IS_THREAD_SAFE(self) -> str:
273        """
274        Return whether this connector may be multithreaded.
275        """
276        if self.flavor in ('duckdb', 'oracle'):
277            return False
278        if self.flavor == 'sqlite':
279            return ':memory:' not in self.URI
280        return True
281
282
283    @property
284    def metadata(self):
285        """
286        Return the metadata bound to this configured schema.
287        """
288        from meerschaum.utils.packages import attempt_import
289        sqlalchemy = attempt_import('sqlalchemy')
290        if '_metadata' not in self.__dict__:
291            self._metadata = sqlalchemy.MetaData(schema=self.schema)
292        return self._metadata
293
294
295    @property
296    def instance_schema(self):
297        """
298        Return the schema name for Meerschaum tables. 
299        """
300        return self.schema
301
302
303    @property
304    def internal_schema(self):
305        """
306        Return the schema name for internal tables. 
307        """
308        from meerschaum.config.static import STATIC_CONFIG
309        from meerschaum.utils.packages import attempt_import
310        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
311        schema_name = self.__dict__.get('internal_schema', None) or (
312            STATIC_CONFIG['sql']['internal_schema']
313            if self.flavor not in NO_SCHEMA_FLAVORS
314            else self.schema
315        )
316
317        if '_internal_schema' not in self.__dict__:
318            self._internal_schema = schema_name
319        return self._internal_schema
320
321
322    @property
323    def db(self) -> Optional[databases.Database]:
324        from meerschaum.utils.packages import attempt_import
325        databases = attempt_import('databases', lazy=False, install=True)
326        url = self.DATABASE_URL
327        if 'mysql' in url:
328            url = url.replace('+pymysql', '')
329        if '_db' not in self.__dict__:
330            try:
331                self._db = databases.Database(url)
332            except KeyError:
333                ### Likely encountered an unsupported flavor.
334                from meerschaum.utils.warnings import warn
335                self._db = None
336        return self._db
337
338
339    @property
340    def db_version(self) -> Union[str, None]:
341        """
342        Return the database version.
343        """
344        _db_version = self.__dict__.get('_db_version', None)
345        if _db_version is not None:
346            return _db_version
347
348        from meerschaum.utils.sql import get_db_version
349        self._db_version = get_db_version(self)
350        return self._db_version
351
352
353    @property
354    def schema(self) -> Union[str, None]:
355        """
356        Return the default schema to use.
357        A value of `None` will not prepend a schema.
358        """
359        if 'schema' in self.__dict__:
360            return self.__dict__['schema']
361
362        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
363        if self.flavor in NO_SCHEMA_FLAVORS:
364            self.__dict__['schema'] = None
365            return None
366
367        sqlalchemy = mrsm.attempt_import('sqlalchemy')
368        _schema = sqlalchemy.inspect(self.engine).default_schema_name
369        self.__dict__['schema'] = _schema
370        return _schema
371
372
373    def __getstate__(self):
374        return self.__dict__
375
376    def __setstate__(self, d):
377        self.__dict__.update(d)
378
379    def __call__(self):
380        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
100    def __init__(
101        self,
102        label: Optional[str] = None,
103        flavor: Optional[str] = None,
104        wait: bool = False,
105        connect: bool = False,
106        debug: bool = False,
107        **kw: Any
108    ):
109        """
110        Parameters
111        ----------
112        label: str, default 'main'
113            The identifying label for the connector.
114            E.g. for `sql:main`, 'main' is the label.
115            Defaults to 'main'.
116
117        flavor: Optional[str], default None
118            The database flavor, e.g.
119            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
120            To see supported flavors, run the `bootstrap connectors` command.
121
122        wait: bool, default False
123            If `True`, block until a database connection has been made.
124            Defaults to `False`.
125
126        connect: bool, default False
127            If `True`, immediately attempt to connect the database and raise
128            a warning if the connection fails.
129            Defaults to `False`.
130
131        debug: bool, default False
132            Verbosity toggle.
133            Defaults to `False`.
134
135        kw: Any
136            All other arguments will be passed to the connector's attributes.
137            Therefore, a connector may be made without being registered,
138            as long as enough parameters are supplied to the constructor.
139        """
140        if 'uri' in kw:
141            uri = kw['uri']
142            if uri.startswith('postgres') and not uri.startswith('postgresql'):
143                uri = uri.replace('postgres', 'postgresql', 1)
144            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
145                uri = uri.replace('postgresql://', 'postgresql+psycopg', 1)
146            if uri.startswith('timescaledb://'):
147                uri = uri.replace('timescaledb://', 'postgresql://', 1)
148                flavor = 'timescaledb'
149            kw['uri'] = uri
150            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
151            label = label or from_uri_params.get('label', None)
152            _ = from_uri_params.pop('label', None)
153
154            ### Sometimes the flavor may be provided with a URI.
155            kw.update(from_uri_params)
156            if flavor:
157                kw['flavor'] = flavor
158
159
160        ### set __dict__ in base class
161        super().__init__(
162            'sql',
163            label = label or self.__dict__.get('label', None),
164            **kw
165        )
166
167        if self.__dict__.get('flavor', None) == 'sqlite':
168            self._reset_attributes()
169            self._set_attributes(
170                'sql',
171                label = label,
172                inherit_default = False,
173                **kw
174            )
175            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
176            if self.label == 'local' and not self.__dict__.get('database', None):
177                from meerschaum.config._paths import SQLITE_DB_PATH
178                self.database = str(SQLITE_DB_PATH)
179
180        ### ensure flavor and label are set accordingly
181        if 'flavor' not in self.__dict__:
182            if flavor is None and 'uri' not in self.__dict__:
183                raise Exception(
184                    f"    Missing flavor. Provide flavor as a key for '{self}'."
185                )
186            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
187
188        if self.flavor == 'postgres':
189            self.flavor = 'postgresql'
190
191        self._debug = debug
192        ### Store the PID and thread at initialization
193        ### so we can dispose of the Pool in child processes or threads.
194        import os, threading
195        self._pid = os.getpid()
196        self._thread_ident = threading.current_thread().ident
197        self._sessions = {}
198        self._locks = {'_sessions': threading.RLock(), }
199
200        ### verify the flavor's requirements are met
201        if self.flavor not in self.flavor_configs:
202            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
203        if not self.__dict__.get('uri'):
204            self.verify_attributes(
205                self.flavor_configs[self.flavor].get('requirements', set()),
206                debug=debug,
207            )
208
209        if wait:
210            from meerschaum.connectors.poll import retry_connect
211            retry_connect(connector=self, debug=debug)
212
213        if connect:
214            if not self.test_connection(debug=debug):
215                from meerschaum.utils.warnings import warn
216                warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long enough parameters are supplied to the constructor.
IS_INSTANCE: bool = True
Session
218    @property
219    def Session(self):
220        if '_Session' not in self.__dict__:
221            if self.engine is None:
222                return None
223
224            from meerschaum.utils.packages import attempt_import
225            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
226            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
227            self._Session = sqlalchemy_orm.scoped_session(session_factory)
228
229        return self._Session
engine
231    @property
232    def engine(self):
233        import os, threading
234        ### build the sqlalchemy engine
235        if '_engine' not in self.__dict__:
236            self._engine, self._engine_str = self.create_engine(include_uri=True)
237
238        same_process = os.getpid() == self._pid
239        same_thread = threading.current_thread().ident == self._thread_ident
240
241        ### handle child processes
242        if not same_process:
243            self._pid = os.getpid()
244            self._thread = threading.current_thread()
245            from meerschaum.utils.warnings import warn
246            warn(f"Different PID detected. Disposing of connections...")
247            self._engine.dispose()
248
249        ### handle different threads
250        if not same_thread:
251            pass
252
253        return self._engine
DATABASE_URL: str
255    @property
256    def DATABASE_URL(self) -> str:
257        """
258        Return the URI connection string (alias for `SQLConnector.URI`).
259        """
260        _ = self.engine
261        return str(self._engine_str)

Return the URI connection string (alias for SQLConnector.URI).

URI: str
263    @property
264    def URI(self) -> str:
265        """
266        Return the URI connection string.
267        """
268        _ = self.engine
269        return str(self._engine_str)

Return the URI connection string.

IS_THREAD_SAFE: str
271    @property
272    def IS_THREAD_SAFE(self) -> str:
273        """
274        Return whether this connector may be multithreaded.
275        """
276        if self.flavor in ('duckdb', 'oracle'):
277            return False
278        if self.flavor == 'sqlite':
279            return ':memory:' not in self.URI
280        return True

Return whether this connector may be multithreaded.

metadata
283    @property
284    def metadata(self):
285        """
286        Return the metadata bound to this configured schema.
287        """
288        from meerschaum.utils.packages import attempt_import
289        sqlalchemy = attempt_import('sqlalchemy')
290        if '_metadata' not in self.__dict__:
291            self._metadata = sqlalchemy.MetaData(schema=self.schema)
292        return self._metadata

Return the metadata bound to this configured schema.

instance_schema
295    @property
296    def instance_schema(self):
297        """
298        Return the schema name for Meerschaum tables. 
299        """
300        return self.schema

Return the schema name for Meerschaum tables.

internal_schema
303    @property
304    def internal_schema(self):
305        """
306        Return the schema name for internal tables. 
307        """
308        from meerschaum.config.static import STATIC_CONFIG
309        from meerschaum.utils.packages import attempt_import
310        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
311        schema_name = self.__dict__.get('internal_schema', None) or (
312            STATIC_CONFIG['sql']['internal_schema']
313            if self.flavor not in NO_SCHEMA_FLAVORS
314            else self.schema
315        )
316
317        if '_internal_schema' not in self.__dict__:
318            self._internal_schema = schema_name
319        return self._internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
322    @property
323    def db(self) -> Optional[databases.Database]:
324        from meerschaum.utils.packages import attempt_import
325        databases = attempt_import('databases', lazy=False, install=True)
326        url = self.DATABASE_URL
327        if 'mysql' in url:
328            url = url.replace('+pymysql', '')
329        if '_db' not in self.__dict__:
330            try:
331                self._db = databases.Database(url)
332            except KeyError:
333                ### Likely encountered an unsupported flavor.
334                from meerschaum.utils.warnings import warn
335                self._db = None
336        return self._db
db_version: Optional[str]
339    @property
340    def db_version(self) -> Union[str, None]:
341        """
342        Return the database version.
343        """
344        _db_version = self.__dict__.get('_db_version', None)
345        if _db_version is not None:
346            return _db_version
347
348        from meerschaum.utils.sql import get_db_version
349        self._db_version = get_db_version(self)
350        return self._db_version

Return the database version.

schema: Optional[str]
353    @property
354    def schema(self) -> Union[str, None]:
355        """
356        Return the default schema to use.
357        A value of `None` will not prepend a schema.
358        """
359        if 'schema' in self.__dict__:
360            return self.__dict__['schema']
361
362        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
363        if self.flavor in NO_SCHEMA_FLAVORS:
364            self.__dict__['schema'] = None
365            return None
366
367        sqlalchemy = mrsm.attempt_import('sqlalchemy')
368        _schema = sqlalchemy.inspect(self.engine).default_schema_name
369        self.__dict__['schema'] = _schema
370        return _schema

Return the default schema to use. A value of None will not prepend a schema.

flavor_configs = {'timescaledb': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 5432}}, 'postgresql': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 5432}}, 'citus': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 5432}}, 'mssql': {'engine': 'mssql+pyodbc', 'create_engine': {'fast_executemany': True, 'isolation_level': 'AUTOCOMMIT', 'use_setinputsizes': False, 'pool_pre_ping': True, 'ignore_no_transaction_on_rollback': True}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 1433, 'options': 'driver=ODBC Driver 18 for SQL Server&UseFMTONLY=Yes&TrustServerCertificate=yes&Encrypt=no&MARS_Connection=yes'}}, 'mysql': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 3306}}, 'mariadb': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 3306}}, 'oracle': {'engine': 'oracle+cx_oracle', 'create_engine': 
{'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'username', 'database', 'host', 'password'}, 'defaults': {'port': 1521}}, 'sqlite': {'engine': 'sqlite', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'database'}, 'defaults': {}}, 'duckdb': {'engine': 'duckdb', 'create_engine': {}, 'omit_create_engine': {'ALL'}, 'to_sql': {'method': 'multi'}, 'requirements': '', 'defaults': {}}, 'cockroachdb': {'engine': 'cockroachdb', 'omit_create_engine': {'method'}, 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'to_sql': {'method': 'multi'}, 'requirements': {'host'}, 'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'}}}
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
180def create_engine(
181    self,
182    include_uri: bool = False,
183    debug: bool = False,
184    **kw
185) -> 'sqlalchemy.engine.Engine':
186    """Create a sqlalchemy engine by building the engine string."""
187    from meerschaum.utils.packages import attempt_import
188    from meerschaum.utils.warnings import error, warn
189    sqlalchemy = attempt_import('sqlalchemy')
190    import urllib
191    import copy
192    ### Install and patch required drivers.
193    if self.flavor in install_flavor_drivers:
194        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
195        if self.flavor == 'mssql':
196            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
197            pyodbc.pooling = False
198    if self.flavor in require_patching_flavors:
199        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
200        import pathlib
201        for install_name, import_name in require_patching_flavors[self.flavor]:
202            pkg = attempt_import(
203                import_name,
204                debug=debug,
205                lazy=False,
206                warn=False
207            )
208            _monkey_patch_get_distribution(
209                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
210            )
211
212    ### supplement missing values with defaults (e.g. port number)
213    for a, value in flavor_configs[self.flavor]['defaults'].items():
214        if a not in self.__dict__:
215            self.__dict__[a] = value
216
217    ### Verify that everything is in order.
218    if self.flavor not in flavor_configs:
219        error(f"Cannot create a connector with the flavor '{self.flavor}'.")
220
221    _engine = flavor_configs[self.flavor].get('engine', None)
222    _username = self.__dict__.get('username', None)
223    _password = self.__dict__.get('password', None)
224    _host = self.__dict__.get('host', None)
225    _port = self.__dict__.get('port', None)
226    _database = self.__dict__.get('database', None)
227    _options = self.__dict__.get('options', {})
228    if isinstance(_options, str):
229        _options = dict(urllib.parse.parse_qsl(_options))
230    _uri = self.__dict__.get('uri', None)
231
232    ### Handle registering specific dialects (due to installing in virtual environments).
233    if self.flavor in flavor_dialects:
234        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])
235
236    ### self._sys_config was deepcopied and can be updated safely
237    if self.flavor in ("sqlite", "duckdb"):
238        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
239        if 'create_engine' not in self._sys_config:
240            self._sys_config['create_engine'] = {}
241        if 'connect_args' not in self._sys_config['create_engine']:
242            self._sys_config['create_engine']['connect_args'] = {}
243        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
244    else:
245        engine_str = (
246            _engine + "://" + (_username if _username is not None else '') +
247            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
248            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
249            (("/" + _database) if _database is not None else '')
250            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
251        ) if not _uri else _uri
252
253        ### Sometimes the timescaledb:// flavor can slip in.
254        if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
255            engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)
256
257    if debug:
258        dprint(
259            (
260                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
261                    if _password is not None else engine_str
262            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
263        )
264
265    _kw_copy = copy.deepcopy(kw)
266
267    ### NOTE: Order of inheritance:
268    ###       1. Defaults
269    ###       2. System configuration
270    ###       3. Connector configuration
271    ###       4. Keyword arguments
272    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
273    def _apply_create_engine_args(update):
274        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
275            _create_engine_args.update(
276                { k: v for k, v in update.items()
277                    if 'omit_create_engine' not in flavor_configs[self.flavor]
278                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
279                }
280            )
281    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
282    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
283    _apply_create_engine_args(_kw_copy)
284
285    try:
286        engine = sqlalchemy.create_engine(
287            engine_str,
288            ### I know this looks confusing, and maybe it's bad code,
289            ### but it's simple. It dynamically parses the config string
290            ### and splits it to separate the class name (QueuePool)
291            ### from the module name (sqlalchemy.pool).
292            poolclass    = getattr(
293                attempt_import(
294                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
295                ),
296                self._sys_config['poolclass'].split('.')[-1]
297            ),
298            echo         = debug,
299            **_create_engine_args
300        )
301    except Exception as e:
302        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
303        engine = None
304
305    if include_uri:
306        return engine, engine_str
307    return engine

Create a sqlalchemy engine by building the engine string.

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
 26def read(
 27    self,
 28    query_or_table: Union[str, sqlalchemy.Query],
 29    params: Union[Dict[str, Any], List[str], None] = None,
 30    dtype: Optional[Dict[str, Any]] = None,
 31    coerce_float: bool = True,
 32    chunksize: Optional[int] = -1,
 33    workers: Optional[int] = None,
 34    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
 35    as_hook_results: bool = False,
 36    chunks: Optional[int] = None,
 37    schema: Optional[str] = None,
 38    as_chunks: bool = False,
 39    as_iterator: bool = False,
 40    as_dask: bool = False,
 41    index_col: Optional[str] = None,
 42    silent: bool = False,
 43    debug: bool = False,
 44    **kw: Any
 45) -> Union[
 46    pandas.DataFrame,
 47    dask.DataFrame,
 48    List[pandas.DataFrame],
 49    List[Any],
 50    None,
 51]:
 52    """
 53    Read a SQL query or table into a pandas dataframe.
 54
 55    Parameters
 56    ----------
 57    query_or_table: Union[str, sqlalchemy.Query]
 58        The SQL query (sqlalchemy Query or string) or name of the table from which to select.
 59
 60    params: Optional[Dict[str, Any]], default None
 61        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
 62        See the pandas documentation for more information:
 63        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
 64
 65    dtype: Optional[Dict[str, Any]], default None
 66        A dictionary of data types to pass to `pandas.read_sql()`.
 67        See the pandas documentation for more information:
 68        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
 69
 70    chunksize: Optional[int], default -1
 71        How many chunks to read at a time. `None` will read everything in one large chunk.
 72        Defaults to system configuration.
 73
 74        **NOTE:** DuckDB does not allow for chunking.
 75
 76    workers: Optional[int], default None
 77        How many threads to use when consuming the generator.
 78        Only applies if `chunk_hook` is provided.
 79
 80    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
 81        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
 82        See `--sync-chunks` for an example.
 83        **NOTE:** `as_iterator` MUST be False (default).
 84
 85    as_hook_results: bool, default False
 86        If `True`, return a `List` of the outputs of the hook function.
 87        Only applicable if `chunk_hook` is not None.
 88
 89        **NOTE:** `as_iterator` MUST be `False` (default).
 90
 91    chunks: Optional[int], default None
 92        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
 93        return into a single dataframe.
 94        For example, to limit the returned dataframe to 100,000 rows,
 95        you could specify a `chunksize` of `1000` and `chunks` of `100`.
 96
 97    schema: Optional[str], default None
 98        If just a table name is provided, optionally specify the table schema.
 99        Defaults to `SQLConnector.schema`.
100
101    as_chunks: bool, default False
102        If `True`, return a list of DataFrames.
103        Otherwise return a single DataFrame.
104
105    as_iterator: bool, default False
106        If `True`, return the pandas DataFrame iterator.
107        `chunksize` must not be `None` (falls back to 1000 if so),
108        and hooks are not called in this case.
109
110    index_col: Optional[str], default None
111        If using Dask, use this column as the index column.
112        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
113
114    silent: bool, default False
115        If `True`, don't raise warnings in case of errors.
116        Defaults to `False`.
117
118    Returns
119    -------
120    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
121    or `None` if something breaks.
122
123    """
124    if chunks is not None and chunks <= 0:
125        return []
126    from meerschaum.utils.sql import sql_item_name, truncate_item_name
127    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS
128    from meerschaum.utils.packages import attempt_import, import_pandas
129    from meerschaum.utils.pool import get_pool
130    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
131    import warnings
132    import traceback
133    from decimal import Decimal
134    pd = import_pandas()
135    dd = None
136    is_dask = 'dask' in pd.__name__
137    pd = attempt_import('pandas')
138    is_dask = dd is not None
139    npartitions = chunksize_to_npartitions(chunksize)
140    if is_dask:
141        chunksize = None
142    schema = schema or self.schema
143
144    pool = get_pool(workers=workers)
145    sqlalchemy = attempt_import("sqlalchemy")
146    default_chunksize = self._sys_config.get('chunksize', None)
147    chunksize = chunksize if chunksize != -1 else default_chunksize
148    if chunksize is None and as_iterator:
149        if not silent and self.flavor not in _disallow_chunks_flavors:
150            warn(
151                "An iterator may only be generated if chunksize is not None.\n"
152                + "Falling back to a chunksize of 1000.", stacklevel=3,
153            )
154        chunksize = 1000
155    if chunksize is not None and self.flavor in _max_chunks_flavors:
156        if chunksize > _max_chunks_flavors[self.flavor]:
157            if chunksize != default_chunksize:
158                warn(
159                    f"The specified chunksize of {chunksize} exceeds the maximum of "
160                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
161                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
162                    stacklevel=3,
163                )
164            chunksize = _max_chunks_flavors[self.flavor]
165
166    ### NOTE: A bug in duckdb_engine does not allow for chunks.
167    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
168        chunksize = None
169
170    if debug:
171        import time
172        start = time.perf_counter()
173        dprint(f"[{self}]\n{query_or_table}")
174        dprint(f"[{self}] Fetching with chunksize: {chunksize}")
175
176    ### This might be sqlalchemy object or the string of a table name.
177    ### We check for spaces and quotes to see if it might be a weird table.
178    if (
179        ' ' not in str(query_or_table)
180        or (
181            ' ' in str(query_or_table)
182            and str(query_or_table).startswith('"')
183            and str(query_or_table).endswith('"')
184        )
185    ):
186        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
187        if truncated_table_name != str(query_or_table) and not silent:
188            warn(
189                f"Table '{query_or_table}' is too long for '{self.flavor}',"
190                + f" will instead read the table '{truncated_table_name}'."
191            )
192
193        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
194        if debug:
195            dprint(f"[{self}] Reading from table {query_or_table}")
196        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
197        str_query = f"SELECT * FROM {query_or_table}"
198    else:
199        str_query = query_or_table
200
201    formatted_query = (
202        sqlalchemy.text(str_query)
203        if not is_dask and isinstance(str_query, str)
204        else format_sql_query_for_dask(str_query)
205    )
206
207    chunk_list = []
208    chunk_hook_results = []
209    def _process_chunk(_chunk, _retry_on_failure: bool = True):
210        if not as_hook_results:
211            chunk_list.append(_chunk)
212        if chunk_hook is None:
213            return None
214
215        result = None
216        try:
217            result = chunk_hook(
218                _chunk,
219                workers=workers,
220                chunksize=chunksize,
221                debug=debug,
222                **kw
223            )
224        except Exception:
225            result = False, traceback.format_exc()
226            from meerschaum.utils.formatting import get_console
227            if not silent:
228                get_console().print_exception()
229
230        ### If the chunk fails to process, try it again one more time.
231        if isinstance(result, tuple) and result[0] is False:
232            if _retry_on_failure:
233                return _process_chunk(_chunk, _retry_on_failure=False)
234
235        return result
236
237    try:
238        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
239        with warnings.catch_warnings():
240            warnings.filterwarnings('ignore', 'case sensitivity issues')
241
242            read_sql_query_kwargs = {
243                'params': params,
244                'dtype': dtype,
245                'coerce_float': coerce_float,
246                'index_col': index_col,
247            }
248            if is_dask:
249                if index_col is None:
250                    dd = None
251                    pd = attempt_import('pandas')
252                    read_sql_query_kwargs.update({
253                        'chunksize': chunksize,
254                    })
255            else:
256                read_sql_query_kwargs.update({
257                    'chunksize': chunksize,
258                })
259
260            if is_dask and dd is not None:
261                ddf = dd.read_sql_query(
262                    formatted_query,
263                    self.URI,
264                    **read_sql_query_kwargs
265                )
266            else:
267
268                def get_chunk_generator(connectable):
269                    chunk_generator = pd.read_sql_query(
270                        formatted_query,
271                        self.engine,
272                        **read_sql_query_kwargs
273                    )
274                    to_return = (
275                        chunk_generator
276                        if as_iterator or chunksize is None
277                        else (
278                            list(pool.imap(_process_chunk, chunk_generator))
279                            if as_hook_results
280                            else None
281                        )
282                    )
283                    return chunk_generator, to_return
284
285                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
286                    chunk_generator, to_return = get_chunk_generator(self.engine)
287                else:
288                    with self.engine.begin() as transaction:
289                        with transaction.execution_options(stream_results=stream_results) as connection:
290                            chunk_generator, to_return = get_chunk_generator(connection)
291
292                if to_return is not None:
293                    return to_return
294
295    except Exception as e:
296        if debug:
297            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
298        if not silent:
299            warn(str(e), stacklevel=3)
300        from meerschaum.utils.formatting import get_console
301        if not silent:
302            get_console().print_exception()
303
304        return None
305
306    if is_dask and dd is not None:
307        ddf = ddf.reset_index()
308        return ddf
309
310    chunk_list = []
311    read_chunks = 0
312    chunk_hook_results = []
313    if chunksize is None:
314        chunk_list.append(chunk_generator)
315    elif as_iterator:
316        return chunk_generator
317    else:
318        try:
319            for chunk in chunk_generator:
320                if chunk_hook is not None:
321                    chunk_hook_results.append(
322                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
323                    )
324                chunk_list.append(chunk)
325                read_chunks += 1
326                if chunks is not None and read_chunks >= chunks:
327                    break
328        except Exception as e:
329            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
330            from meerschaum.utils.formatting import get_console
331            if not silent:
332                get_console().print_exception()
333
334    read_chunks = 0
335    try:
336        for chunk in chunk_generator:
337            if chunk_hook is not None:
338                chunk_hook_results.append(
339                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
340                )
341            chunk_list.append(chunk)
342            read_chunks += 1
343            if chunks is not None and read_chunks >= chunks:
344                break
345    except Exception as e:
346        warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
347        from meerschaum.utils.formatting import get_console
348        if not silent:
349            get_console().print_exception()
350
351        return None
352
353    ### If no chunks returned, read without chunks
354    ### to get columns
355    if len(chunk_list) == 0:
356        with warnings.catch_warnings():
357            warnings.filterwarnings('ignore', 'case sensitivity issues')
358            _ = read_sql_query_kwargs.pop('chunksize', None)
359            with self.engine.begin() as connection:
360                chunk_list.append(
361                    pd.read_sql_query(
362                        formatted_query,
363                        connection,
364                        **read_sql_query_kwargs
365                    )
366                )
367
368    ### call the hook on any missed chunks.
369    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
370        for c in chunk_list[len(chunk_hook_results):]:
371            chunk_hook_results.append(
372                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
373            )
374
375    ### chunksize is not None so must iterate
376    if debug:
377        end = time.perf_counter()
378        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")
379
380    if as_hook_results:
381        return chunk_hook_results
382    
383    ### Skip `pd.concat()` if `as_chunks` is specified.
384    if as_chunks:
385        for c in chunk_list:
386            c.reset_index(drop=True, inplace=True)
387            for col in get_numeric_cols(c):
388                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
389        return chunk_list
390
391    df = pd.concat(chunk_list).reset_index(drop=True)
392    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
393    for col in get_numeric_cols(df):
394        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
395
396    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators,
  • or None if something breaks.
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.

    """
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        ### Best-effort: an empty result set (or a failed read) yields `None`.
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    ### Pop `close` / `commit` out of `kw` so they are not passed twice
    ### to `self.exec()` below (a duplicate keyword raises `TypeError`).
    _close = kw.pop('close', True)
    _commit = kw.pop('commit', (self.flavor != 'mssql'))

    try:
        ### Keep the connection open so the cursor can be consumed before closing.
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(str(e), stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    **kw: Any
) -> Union[
        sqlalchemy.engine.result.resultProxy,
        sqlalchemy.engine.cursor.LegacyCursorResult,
        Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
        None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
    
    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        If `True`, print the query and any failures via `dprint`.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.

    """
    ### Lists and tuples of queries are delegated wholesale to `exec_queries()`.
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent=silent,
            debug=debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    ### Flavor-dependent defaults: MSSQL keeps connections open and skips
    ### committing plain SELECT statements.
    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.get_connection()

    ### Begin a transaction only when committing; a stale pooled connection
    ### raises `InvalidRequestError`, in which case rebuild it and retry.
    try:
        transaction = connection.begin() if _commit else None
    except sqlalchemy.exc.InvalidRequestError:
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin()

    ### A begun-but-inactive transaction also signals a bad connection; rebuild once more.
    if transaction is not None and not transaction.is_active:
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin() if _commit else None

    result = None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        ### On failure, roll back and rebuild the connection so later calls start clean.
        if _commit:
            transaction.rollback()
            connection = self.get_connection(rebuild=True)
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple with the connection if with_connection is provided.
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.resultProxy]':
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    Convenience alias for `meerschaum.connectors.sql.SQLConnector.exec`.

    All positional and keyword arguments are forwarded unchanged.
    """
    result = self.exec(*args, **kw)
    return result
def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) -> Union[bool, Tuple[bool, str]]:
686def to_sql(
687    self,
688    df: pandas.DataFrame,
689    name: str = None,
690    index: bool = False,
691    if_exists: str = 'replace',
692    method: str = "",
693    chunksize: Optional[int] = -1,
694    schema: Optional[str] = None,
695    silent: bool = False,
696    debug: bool = False,
697    as_tuple: bool = False,
698    as_dict: bool = False,
699    **kw
700) -> Union[bool, SuccessTuple]:
701    """
702    Upload a DataFrame's contents to the SQL server.
703
704    Parameters
705    ----------
706    df: pd.DataFrame
707        The DataFrame to be uploaded.
708
709    name: str
710        The name of the table to be created.
711
712    index: bool, default False
713        If True, creates the DataFrame's indices as columns.
714
715    if_exists: str, default 'replace'
716        Drop and create the table ('replace') or append if it exists
717        ('append') or raise Exception ('fail').
718        Options are ['replace', 'append', 'fail'].
719
720    method: str, default ''
721        None or multi. Details on pandas.to_sql.
722
723    chunksize: Optional[int], default -1
724        How many rows to insert at a time.
725
726    schema: Optional[str], default None
727        Optionally override the schema for the table.
728        Defaults to `SQLConnector.schema`.
729
730    as_tuple: bool, default False
731        If `True`, return a (success_bool, message) tuple instead of a `bool`.
732        Defaults to `False`.
733
734    as_dict: bool, default False
735        If `True`, return a dictionary of transaction information.
736        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
737        `method`, and `target`.
738
739    kw: Any
740        Additional arguments will be passed to the DataFrame's `to_sql` function
741
742    Returns
743    -------
744    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
745    """
746    import time
747    import json
748    import decimal
749    from decimal import Decimal, Context
750    from meerschaum.utils.warnings import error, warn
751    import warnings
752    import functools
753    if name is None:
754        error(f"Name must not be `None` to insert data into {self}.")
755
756    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
757    kw.pop('name', None)
758
759    schema = schema or self.schema
760
761    from meerschaum.utils.sql import (
762        sql_item_name,
763        table_exists,
764        json_flavors,
765        truncate_item_name,
766        DROP_IF_EXISTS_FLAVORS,
767    )
768    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols
769    from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal
770    from meerschaum.utils.dtypes.sql import (
771        NUMERIC_PRECISION_FLAVORS,
772        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
773    )
774    from meerschaum.connectors.sql._create_engine import flavor_configs
775    from meerschaum.utils.packages import attempt_import, import_pandas
776    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
777    pd = import_pandas()
778    is_dask = 'dask' in df.__module__
779
780    stats = {'target': name, }
781    ### resort to defaults if None
782    if method == "":
783        if self.flavor in _bulk_flavors:
784            method = functools.partial(psql_insert_copy, schema=self.schema)
785        else:
786            ### Should resolve to 'multi' or `None`.
787            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
788    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
789
790    default_chunksize = self._sys_config.get('chunksize', None)
791    chunksize = chunksize if chunksize != -1 else default_chunksize
792    if chunksize is not None and self.flavor in _max_chunks_flavors:
793        if chunksize > _max_chunks_flavors[self.flavor]:
794            if chunksize != default_chunksize:
795                warn(
796                    f"The specified chunksize of {chunksize} exceeds the maximum of "
797                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
798                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
799                    stacklevel = 3,
800                )
801            chunksize = _max_chunks_flavors[self.flavor]
802    stats['chunksize'] = chunksize
803
804    success, msg = False, "Default to_sql message"
805    start = time.perf_counter()
806    if debug:
807        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
808        print(msg, end="", flush=True)
809    stats['num_rows'] = len(df)
810
811    ### Check if the name is too long.
812    truncated_name = truncate_item_name(name, self.flavor)
813    if name != truncated_name:
814        warn(
815            f"Table '{name}' is too long for '{self.flavor}',"
816            + f" will instead create the table '{truncated_name}'."
817        )
818
819    ### filter out non-pandas args
820    import inspect
821    to_sql_params = inspect.signature(df.to_sql).parameters
822    to_sql_kw = {}
823    for k, v in kw.items():
824        if k in to_sql_params:
825            to_sql_kw[k] = v
826
827    to_sql_kw.update({
828        'name': truncated_name,
829        'schema': schema,
830        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
831        'index': index,
832        'if_exists': if_exists,
833        'method': method,
834        'chunksize': chunksize,
835    })
836    if is_dask:
837        to_sql_kw.update({
838            'parallel': True,
839        })
840
841    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
842    if self.flavor == 'oracle':
843        ### For some reason 'replace' doesn't work properly in pandas,
844        ### so try dropping first.
845        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
846            success = self.exec(
847                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
848            ) is not None
849            if not success:
850                warn(f"Unable to drop {name}")
851
852
853        ### Enforce NVARCHAR(2000) as text instead of CLOB.
854        dtype = to_sql_kw.get('dtype', {})
855        for col, typ in df.dtypes.items():
856            if are_dtypes_equal(str(typ), 'object'):
857                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
858            elif are_dtypes_equal(str(typ), 'int'):
859                dtype[col] = sqlalchemy.types.INTEGER
860        to_sql_kw['dtype'] = dtype
861    elif self.flavor == 'mssql':
862        dtype = to_sql_kw.get('dtype', {})
863        for col, typ in df.dtypes.items():
864            if are_dtypes_equal(str(typ), 'bool'):
865                dtype[col] = sqlalchemy.types.INTEGER
866        to_sql_kw['dtype'] = dtype
867
868    ### Check for JSON columns.
869    if self.flavor not in json_flavors:
870        json_cols = get_json_cols(df)
871        if json_cols:
872            for col in json_cols:
873                df[col] = df[col].apply(
874                    (
875                        lambda x: json.dumps(x, default=str, sort_keys=True)
876                        if not isinstance(x, Hashable)
877                        else x
878                    )
879                )
880
881    ### Check for numeric columns.
882    numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None))
883    if numeric_precision is not None and numeric_scale is not None:
884        numeric_cols = get_numeric_cols(df)
885        for col in numeric_cols:
886            df[col] = df[col].apply(
887                lambda x: (
888                    quantize_decimal(x, numeric_scale, numeric_precision)
889                    if isinstance(x, Decimal)
890                    else x
891                )
892            )
893
894    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
895        uuid_cols = get_uuid_cols(df)
896        for col in uuid_cols:
897            df[col] = df[col].astype(str)
898
899    try:
900        with warnings.catch_warnings():
901            warnings.filterwarnings('ignore', 'case sensitivity issues')
902            df.to_sql(**to_sql_kw)
903        success = True
904    except Exception as e:
905        if not silent:
906            warn(str(e))
907        success, msg = False, str(e)
908
909    end = time.perf_counter()
910    if success:
911        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
912    stats['start'] = start
913    stats['end'] = end
914    stats['duration'] = end - start
915
916    if debug:
917        print(f" done.", flush=True)
918        dprint(msg)
919
920    stats['success'] = success
921    stats['msg'] = msg
922    if as_tuple:
923        return success, msg
924    if as_dict:
925        return stats
926    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be uploaded.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): The insertion method for pandas.DataFrame.to_sql (None or 'multi'); see the pandas to_sql documentation for details.
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[sqlalchemy.engine.cursor.LegacyCursorResult]':
def exec_queries(
    self,
    queries: List[
        Union[
            str,
            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
        ]
    ],
    break_on_error: bool = False,
    rollback: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]
        The queries in the transaction to be executed.
        If a query is a tuple, the second item of the tuple
        will be considered a callable hook that accepts the session and returns
        a list of queries to be executed before the next item in the list.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of SQLAlchemy results (`None` entries for failed queries; a
    `(result, hook_results)` tuple when a hook produced follow-up queries).
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm')
    session = sqlalchemy_orm.Session(self.engine)

    result = None
    results = []
    ### `session.begin()` commits on clean exit and rolls back on exception.
    with session.begin():
        for query in queries:
            hook = None
            result = None

            if isinstance(query, tuple):
                query, hook = query
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            if debug:
                dprint(f"[{self}]\n" + str(query))

            try:
                result = session.execute(query)
                ### Flush so later queries (and hooks) see this query's effects.
                session.flush()
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            if result is None and break_on_error:
                if rollback:
                    session.rollback()
                break
            elif result is not None and hook is not None:
                ### Hooks may generate follow-up queries, executed recursively.
                hook_queries = hook(session)
                if hook_queries:
                    hook_results = self.exec_queries(
                        hook_queries,
                        break_on_error=break_on_error,
                        rollback=rollback,
                        silent=silent,
                        debug=debug,
                    )
                    result = (result, hook_results)

            results.append(result)

    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
def get_connection(self, rebuild: bool = False) -> "'sqlalchemy.engine.base.Connection'":
def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
    """
    Return the current alive connection.

    Parameters
    ----------
    rebuild: bool, default False
        If `True`, close the previous connection and open a new one.

    Returns
    -------
    A `sqlalchemy.engine.base.Connection` object.
    """
    import threading

    ### Lazily create the per-thread connection registry.
    if '_thread_connections' not in self.__dict__:
        self.__dict__['_thread_connections'] = {}

    self._cleanup_connections()

    tid = threading.get_ident()
    registry = self.__dict__.get('_thread_connections', {})
    conn = registry.get(tid, None)

    ### A rebuild discards (and best-effort closes) this thread's cached connection.
    if rebuild and conn is not None:
        try:
            conn.close()
        except Exception:
            pass
        registry.pop(tid, None)
        conn = None

    ### Open a fresh connection when none is cached or the cached one is closed.
    if conn is None or conn.closed:
        conn = self.engine.connect()
        registry[tid] = conn

    return conn

Return the current alive connection.

Parameters
  • rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
  • A sqlalchemy.engine.base.Connection object.
def test_connection(self, **kw: Any) -> Optional[bool]:
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.
    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    ### Single attempt by default; caller-supplied kwargs take precedence.
    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    _default_kw.update(kw)
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**_default_kw)
        except Exception:
            ### Connection errors are expected here; report failure as `False`.
            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw (Any): Keyword arguments passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunk_hook: "Optional[Callable[['pd.DataFrame'], Any]]" = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
 17def fetch(
 18    self,
 19    pipe: mrsm.Pipe,
 20    begin: Union[datetime, int, str, None] = '',
 21    end: Union[datetime, int, str, None] = None,
 22    check_existing: bool = True,
 23    chunk_hook: Optional[Callable[['pd.DataFrame'], Any]] = None,
 24    chunksize: Optional[int] = -1,
 25    workers: Optional[int] = None,
 26    debug: bool = False,
 27    **kw: Any
 28) -> Union['pd.DataFrame', List[Any], None]:
 29    """Execute the SQL definition and return a Pandas DataFrame.
 30
 31    Parameters
 32    ----------
 33    pipe: mrsm.Pipe
 34        The pipe object which contains the `fetch` metadata.
 35
 36        - pipe.columns['datetime']: str
 37            - Name of the datetime column for the remote table.
 38        - pipe.parameters['fetch']: Dict[str, Any]
 39            - Parameters necessary to execute a query.
 40        - pipe.parameters['fetch']['definition']: str
 41            - Raw SQL query to execute to generate the pandas DataFrame.
 42        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
 43            - How many minutes before `begin` to search for data (*optional*).
 44
 45    begin: Union[datetime, int, str, None], default None
 46        Most recent datatime to search for data.
 47        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
 48
 49    end: Union[datetime, int, str, None], default None
 50        The latest datetime to search for data.
 51        If `end` is `None`, do not bound 
 52
 53    check_existing: bool, defult True
 54        If `False`, use a backtrack interval of 0 minutes.
 55
 56    chunk_hook: Callable[[pd.DataFrame], Any], default None
 57        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
 58
 59    chunksize: Optional[int], default -1
 60        How many rows to load into memory at once (when `chunk_hook` is provided).
 61        Otherwise the entire result set is loaded into memory.
 62
 63    workers: Optional[int], default None
 64        How many threads to use when consuming the generator (when `chunk_hook is provided).
 65        Defaults to the number of cores.
 66
 67    debug: bool, default False
 68        Verbosity toggle.
 69
 70    Returns
 71    -------
 72    A pandas DataFrame or `None`.
 73    If `chunk_hook` is not None, return a list of the hook function's results.
 74    """
 75    meta_def = self.get_pipe_metadef(
 76        pipe,
 77        begin=begin,
 78        end=end,
 79        check_existing=check_existing,
 80        debug=debug,
 81        **kw
 82    )
 83    as_hook_results = chunk_hook is not None
 84    chunks = self.read(
 85        meta_def,
 86        chunk_hook=chunk_hook,
 87        as_hook_results=as_hook_results,
 88        chunksize=chunksize,
 89        workers=workers,
 90        debug=debug,
 91    )
 92    ### if sqlite, parse for datetimes
 93    if not as_hook_results and self.flavor == 'sqlite':
 94        from meerschaum.utils.misc import parse_df_datetimes
 95        ignore_cols = [
 96            col
 97            for col, dtype in pipe.dtypes.items()
 98            if 'datetime' not in str(dtype)
 99        ]
100        return (
101            parse_df_datetimes(
102                chunk,
103                ignore_cols=ignore_cols,
104                debug=debug,
105            )
106            for chunk in chunks
107        )
108    return chunks

Execute the SQL definition and return a Pandas DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunk_hook (Callable[[pd.DataFrame], Any], default None): A function to pass to SQLConnector.read() that accepts a Pandas DataFrame.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once (when chunk_hook is provided). Otherwise the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator (when chunk_hook is provided). Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame or None.
  • If chunk_hook is not None, return a list of the hook function's results.
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
def get_pipe_metadef(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, str, None] = None,
    check_existing: bool = True,
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose fetch definition should be wrapped into a bounded query.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    ### NOTE(review): `definition` is unused below; retained because
    ### `get_pipe_query` may warn or normalize the pipe's parameters — confirm.
    definition = get_pipe_query(pipe)

    ### Resolve the datetime column, guessing one if none is configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    if begin not in (None, '') or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack=False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack=False
                )

    ### Only backtrack when resuming from the existing sync time (begin == '').
    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    ### An inverted window would produce an empty (or invalid) range; drop the lower bound.
    if begin and end and begin >= end:
        begin = None

    if dt_name:
        begin_da = (
            dateadd_str(
                flavor=self.flavor,
                datepart='minute',
                number=((-1 * btm) if apply_backtrack else 0),
                begin=begin,
            )
            if begin
            else None
        )
        end_da = (
            dateadd_str(
                flavor=self.flavor,
                datepart='minute',
                number=0,
                begin=end,
            )
            if end
            else None
        )

    meta_def = (
        _simple_fetch_query(pipe, self.flavor) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
    )

    ### Detect whether the wrapped definition already contributes a WHERE clause.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def

Return a pipe's meta definition fetch query.

params: Optional[Dict[str, Any]], default None Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.

begin: Union[datetime, int, str, None], default '' Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.

end: Union[datetime, int, str, None], default None The latest datetime to search for data. If end is None, do not bound

check_existing: bool, default True If True, apply the backtrack interval.

debug: bool, default False Verbosity toggle.

Returns
  • A pipe's meta definition fetch query string.
def cli(self, debug: bool = False) -> Tuple[bool, str]:
def cli(
    self,
    debug: bool = False,
) -> SuccessTuple:
    """
    Launch a subprocess for an interactive CLI.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the CLI subprocess exited cleanly.
    """
    from meerschaum.utils.venv import venv_exec
    env = copy.deepcopy(dict(os.environ))
    ### Expose this connector's attributes to the subprocess via the environment
    ### so `mrsm.get_connector()` can rebuild it there.
    env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta)
    cli_code = (
        "import sys\n"
        "import meerschaum as mrsm\n"
        f"conn = mrsm.get_connector('sql:{self.label}')\n"
        "success, msg = conn._cli_exit()\n"
        "mrsm.pprint((success, msg))\n"
        "if not success:\n"
        "    raise Exception(msg)"
    )
    try:
        ### Pass `env` explicitly: previously it was built but never forwarded,
        ### so the subprocess never received the connector's environment variable.
        _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False)
    except Exception as e:
        return False, f"[{self}] Failed to start CLI:\n{e}"
    return True, "Success"

Launch a subprocess for an interactive CLI.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    tags: Optional[List[str]], default None
        List of tags to search by.
        Comma-separated tags within one string must all match;
        separate strings are combined with OR.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import error
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    import json
    from copy import deepcopy
    sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions')
    coalesce = sqlalchemy_sql_functions.coalesce

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        ### Normalize the various spellings of a NULL location key.
        location_keys = [
            (
                lk
                if lk not in ('[None]', 'None', 'null')
                else 'None'
            )
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ###   (separated for convenience of arguments).
    cols = {
        'connector_keys': [str(ck) for ck in connector_keys],
        'metric_key': [str(mk) for mk in metric_keys],
        'location_key': [str(lk) for lk in location_keys],
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        if vals not in [[], ['*']]:
            parameters[col] = vals

    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with the negation prefix, negate it instead:
    ### strip the prefix and exclude rows matching the remaining value.
    ### (`coalesce` keeps NULL columns comparable, mirroring the equality branch.)
    _where = [
        (
            (coalesce(pipes_tbl.c[key], 'None') == val)
            if not str(val).startswith(negation_prefix)
            else (coalesce(pipes_tbl.c[key], 'None') != str(val)[len(negation_prefix):])
        ) for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    select_cols = (
        [
            pipes_tbl.c.connector_keys,
            pipes_tbl.c.metric_key,
            pipes_tbl.c.location_key,
        ]
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    ### Comma-separated tags in one string are ANDed; separate strings are ORed.
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    ors, nands = [], []
    for _in_tags, _ex_tags in in_ex_tag_groups:
        sub_ands = []
        for nt in _in_tags:
            sub_ands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).like(f'%"tags":%"{nt}"%')
            )
        if sub_ands:
            ors.append(sqlalchemy.and_(*sub_ands))

        for xt in _ex_tags:
            nands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).not_like(f'%"tags":%"{xt}"%')
            )

    q = q.where(sqlalchemy.and_(*nands)) if nands else q
    q = q.where(sqlalchemy.or_(*ors)) if ors else q
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (row['connector_keys'], row['metric_key'], row['location_key'])
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        ### `error` raises, so `rows` is always bound past this point.
        error(str(e))

    return [(row[0], row[1], row[2]) for row in rows]

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
def create_indices( self, pipe: meerschaum.Pipe, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
def create_indices(
    self,
    pipe: mrsm.Pipe,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Create a pipe's indices.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose indices should be created.

    indices: Optional[List[str]], default None
        If provided, only create indices for these index keys.
        Otherwise create all of the pipe's indices.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if every index query succeeded, else `False`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")
    if not pipe.columns:
        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
        return True

    ### Restrict to the requested indices (all of them when `indices` is None).
    ix_queries = {
        ix: queries
        for ix, queries in self.get_create_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
        success = success and ix_success
        if not ix_success:
            warn(f"Failed to create index on column: {ix}")

    return success

Create a pipe's indices.

def drop_indices( self, pipe: meerschaum.Pipe, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
def drop_indices(
    self,
    pipe: mrsm.Pipe,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Drop a pipe's indices.

    Returns a bool indicating whether every drop query succeeded
    (`False` immediately if the pipe has no columns).
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")

    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False

    success = True
    for ix_key, queries in self.get_drop_index_queries(pipe, debug=debug).items():
        ### Honor the optional filter of index keys.
        if indices is not None and ix_key not in indices:
            continue
        if all(self.exec_queries(queries, debug=debug, silent=True)):
            continue
        success = False
        if debug:
            dprint(f"Failed to drop index on column: {ix_key}")
    return success

Drop a pipe's indices.

def get_create_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
def get_create_index_queries(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of index names mapping to lists of queries.
    """
    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
    if self.flavor == 'duckdb':
        return {}
    from meerschaum.utils.sql import (
        sql_item_name,
        get_distinct_col_count,
        update_queries,
        get_null_replacement,
        COALESCE_UNIQUE_INDEX_FLAVORS,
    )
    from meerschaum.config import get_config
    index_queries = {}

    ### Upsert pipes additionally need a unique index / constraint (built at the bottom).
    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
    index_names = pipe.get_indices()
    indices = pipe.indices

    _datetime = pipe.get_columns('datetime', error=False)
    ### NOTE(review): `_datetime_type` is unused below — confirm before removing
    ### (pipe.dtypes is a property; keeping the read to preserve behavior).
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = (
        sql_item_name(_datetime, self.flavor, None)
        if _datetime is not None else None
    )
    _datetime_index_name = (
        sql_item_name(index_names['datetime'], self.flavor, None)
        if index_names.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = (
        sql_item_name(_id, self.flavor, None)
        if _id is not None
        else None
    )

    _id_index_name = (
        sql_item_name(index_names['id'], self.flavor, None)
        if index_names.get('id', None)
        else None
    )
    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
            ### Only space-partition when the experimental flag is on and an `id` column exists.
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            dt_query = (
                f"SELECT public.create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + 'if_not_exists => true, '
                + "migrate_data => true);"
            )
        elif self.flavor == 'mssql':
            dt_query = (
                f"CREATE CLUSTERED INDEX {_datetime_index_name} "
                f"ON {_pipe_name} ({_datetime_name})"
            )
        else: ### postgresql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]

    ### Create indices for other labels in `pipe.columns`.
    other_index_names = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in index_names.items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_index_names.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
        cols = indices[ix_key]
        if not isinstance(cols, (list, tuple)):
            cols = [cols]
        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
        if not cols_names:
            continue
        cols_names_str = ", ".join(cols_names)
        index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"]

    existing_cols_types = pipe.get_columns_types(debug=debug)
    indices_cols_str = ', '.join(
        [
            sql_item_name(ix, self.flavor)
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    ### Some flavors need NULLs replaced with sentinels for the unique index to behave.
    coalesce_indices_cols_str = ', '.join(
        [
            (
                "COALESCE("
                + sql_item_name(ix, self.flavor)
                + ", "
                + get_null_replacement(existing_cols_types[ix], self.flavor)
                + ") "
            ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor))
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor)
    constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor)
    add_constraint_query = (
        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
    )
    unique_index_cols_str = (
        indices_cols_str
        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS
        else coalesce_indices_cols_str
    )
    create_unique_index_query = (
        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
    )
    constraint_queries = [create_unique_index_query]
    if self.flavor != 'sqlite':
        constraint_queries.append(add_constraint_query)
    if upsert and indices_cols_str:
        index_queries[unique_index_name] = constraint_queries
    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of index names mapping to lists of queries.
def get_drop_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
def get_drop_index_queries(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
    if self.flavor == 'duckdb':
        return {}
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        hypertable_queries,
        DROP_IF_EXISTS_FLAVORS,
    )
    queries_by_key: Dict[str, List[str]] = {}
    pipe_schema = self.get_pipe_schema(pipe)
    prefix = (pipe_schema + '_') if pipe_schema else ''
    index_names = {
        col: prefix + ix
        for col, ix in pipe.get_indices().items()
    }
    quoted_target = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    quoted_target_no_schema = sql_item_name(pipe.target, self.flavor, None)

    ### Detect whether the table is a TimescaleDB hypertable (flavor permitting).
    is_hypertable = False
    if self.flavor in hypertable_queries:
        check_query = hypertable_queries[self.flavor].format(table_name=quoted_target)
        is_hypertable = self.value(check_query, silent=True, debug=debug) is not None

    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    if is_hypertable:
        ### Hypertables can't simply drop indices: migrate data into a plain table instead.
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))

        migration_queries = []
        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
            migration_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}")
        migration_queries += [
            f"SELECT * INTO {temp_table_name} FROM {quoted_target}",
            f"DROP TABLE {if_exists_str} {quoted_target}",
            f"ALTER TABLE {temp_table_name} RENAME TO {quoted_target_no_schema}",
        ]
        ### Attach the migration to the first of these keys that exists, exactly once.
        for nuke_key in ('datetime', 'id'):
            if nuke_key in index_names:
                queries_by_key[nuke_key] = migration_queries
                break

    ### Remaining indices get plain DROP INDEX statements.
    for ix_key, ix_unquoted in index_names.items():
        if ix_key in queries_by_key:
            continue
        queries_by_key[ix_key] = [
            "DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)
        ]
    return queries_by_key

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
def get_add_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', _is_db_types: bool = False, debug: bool = False) -> List[str]:
def get_add_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    _is_db_types: bool = False,
    debug: bool = False,
) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    _is_db_types: bool, default False
        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    ### NOTE(review): `_is_db_types` is documented but not referenced in this body — confirm.
    if not pipe.exists(debug=debug):
        return []

    from decimal import Decimal
    import copy
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
        get_table_cols_types,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    ### Dask DataFrames are sampled via the first partition to inspect dtypes locally.
    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
    if is_dask:
        df = df.partitions[0].compute()
    ### Normalize `df` into a mapping of column name -> Pandas dtype string.
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else copy.deepcopy(df)
    )
    ### For `object` columns, sniff the first row's value to choose a more specific dtype.
    if not isinstance(df, dict) and len(df.index) > 0:
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = df.iloc[0][col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, Decimal):
                df_cols_types[col] = 'numeric'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'
    ### Read the table's current columns, preferring the reflected SQLAlchemy table object
    ### and falling back to querying the table's column types directly.
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    } if table_obj is not None else {
        col: get_pd_type_from_db_type(typ)
        for col, typ in get_table_cols_types(
            pipe.target,
            self,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        ).items()
    }
    ### Columns present in the dataframe but missing from the table.
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type_from_pd_type(
            df_cols_types[col],
            self.flavor
        ) for col in new_cols
    }

    alter_table_query = "ALTER TABLE " + sql_item_name(
        pipe.target, self.flavor, self.get_pipe_schema(pipe)
    )
    queries = []
    for col, typ in new_cols_types.items():
        ### Each clause ends with a trailing comma; the final comma is sliced off via `[:-1]`.
        add_col_query = (
            "\nADD "
            + sql_item_name(col, self.flavor, None)
            + " " + typ + ","
        )

        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            queries.append(alter_table_query + add_col_query[:-1])
        else:
            alter_table_query += add_col_query

    ### For most flavors, only one query is required.
    ### This covers SQLite which requires one query per column.
    if not queries:
        queries.append(alter_table_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries

Add new null columns of the correct type to a table from a dataframe.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
  • _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
def get_alter_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
2535def get_alter_columns_queries(
2536    self,
2537    pipe: mrsm.Pipe,
2538    df: Union[pd.DataFrame, Dict[str, str]],
2539    debug: bool = False,
2540) -> List[str]:
2541    """
2542    If we encounter a column of a different type, set the entire column to text.
2543    If the altered columns are numeric, alter to numeric instead.
2544
2545    Parameters
2546    ----------
2547    pipe: mrsm.Pipe
2548        The pipe to be altered.
2549
2550    df: Union[pd.DataFrame, Dict[str, str]]
2551        The pandas DataFrame which may contain altered columns.
2552        If a dict is provided, assume it maps columns to Pandas data types.
2553
2554    Returns
2555    -------
2556    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
2557    """
2558    if not pipe.exists(debug=debug):
2559        return []
2560    from meerschaum.utils.sql import sql_item_name, DROP_IF_EXISTS_FLAVORS, get_table_cols_types
2561    from meerschaum.utils.dataframe import get_numeric_cols
2562    from meerschaum.utils.dtypes import are_dtypes_equal
2563    from meerschaum.utils.dtypes.sql import (
2564        get_pd_type_from_db_type,
2565        get_db_type_from_pd_type,
2566    )
2567    from meerschaum.utils.misc import flatten_list, generate_password, items_str
2568    table_obj = self.get_pipe_table(pipe, debug=debug)
2569    target = pipe.target
2570    session_id = generate_password(3)
2571    numeric_cols = (
2572        get_numeric_cols(df)
2573        if not isinstance(df, dict)
2574        else [
2575            col
2576            for col, typ in df.items()
2577            if typ == 'numeric'
2578        ]
2579    )
2580    df_cols_types = (
2581        {
2582            col: str(typ)
2583            for col, typ in df.dtypes.items()
2584        }
2585        if not isinstance(df, dict)
2586        else df
2587    )
2588    db_cols_types = {
2589        col: get_pd_type_from_db_type(str(typ.type))
2590        for col, typ in table_obj.columns.items()
2591    } if table_obj is not None else {
2592        col: get_pd_type_from_db_type(typ)
2593        for col, typ in get_table_cols_types(
2594            pipe.target,
2595            self,
2596            schema=self.get_pipe_schema(pipe),
2597            debug=debug,
2598        ).items()
2599    }
2600    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
2601    pd_db_df_aliases = {
2602        'int': 'bool',
2603        'float': 'bool',
2604        'numeric': 'bool',
2605        'guid': 'object',
2606    }
2607    if self.flavor == 'oracle':
2608        pd_db_df_aliases['int'] = 'numeric'
2609
2610    altered_cols = {
2611        col: (db_cols_types.get(col, 'object'), typ)
2612        for col, typ in df_cols_types.items()
2613        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
2614        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
2615    }
2616
2617    ### NOTE: Sometimes bools are coerced into ints or floats.
2618    altered_cols_to_ignore = set()
2619    for col, (db_typ, df_typ) in altered_cols.items():
2620        for db_alias, df_alias in pd_db_df_aliases.items():
2621            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
2622                altered_cols_to_ignore.add(col)
2623
2624    ### Oracle's bool handling sometimes mixes NUMBER and INT.
2625    for bool_col in pipe_bool_cols:
2626        if bool_col not in altered_cols:
2627            continue
2628        db_is_bool_compatible = (
2629            are_dtypes_equal('int', altered_cols[bool_col][0])
2630            or are_dtypes_equal('float', altered_cols[bool_col][0])
2631            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
2632            or are_dtypes_equal('bool', altered_cols[bool_col][0])
2633        )
2634        df_is_bool_compatible = (
2635            are_dtypes_equal('int', altered_cols[bool_col][1])
2636            or are_dtypes_equal('float', altered_cols[bool_col][1])
2637            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
2638            or are_dtypes_equal('bool', altered_cols[bool_col][1])
2639        )
2640        if db_is_bool_compatible and df_is_bool_compatible:
2641            altered_cols_to_ignore.add(bool_col)
2642
2643    for col in altered_cols_to_ignore:
2644        _ = altered_cols.pop(col, None)
2645    if not altered_cols:
2646        return []
2647
2648    if numeric_cols:
2649        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
2650        edit_success, edit_msg = pipe.edit(debug=debug)
2651        if not edit_success:
2652            warn(
2653                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
2654                + f"{edit_msg}"
2655            )
2656    else:
2657        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric'])
2658
2659    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
2660    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
2661    altered_cols_types = {
2662        col: (
2663            numeric_type
2664            if col in numeric_cols
2665            else text_type
2666        )
2667        for col, (db_typ, typ) in altered_cols.items()
2668    }
2669
2670    if self.flavor == 'sqlite':
2671        temp_table_name = '-' + session_id + '_' + target
2672        rename_query = (
2673            "ALTER TABLE "
2674            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2675            + " RENAME TO "
2676            + sql_item_name(temp_table_name, self.flavor, None)
2677        )
2678        create_query = (
2679            "CREATE TABLE "
2680            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2681            + " (\n"
2682        )
2683        for col_name, col_obj in table_obj.columns.items():
2684            create_query += (
2685                sql_item_name(col_name, self.flavor, None)
2686                + " "
2687                + (
2688                    str(col_obj.type)
2689                    if col_name not in altered_cols
2690                    else altered_cols_types[col_name]
2691                )
2692                + ",\n"
2693            )
2694        create_query = create_query[:-2] + "\n)"
2695
2696        insert_query = (
2697            "INSERT INTO "
2698            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2699            + ' ('
2700            + ', '.join([
2701                sql_item_name(col_name, self.flavor, None)
2702                for col_name, _ in table_obj.columns.items()
2703            ])
2704            + ')'
2705            + "\nSELECT\n"
2706        )
2707        for col_name, col_obj in table_obj.columns.items():
2708            new_col_str = (
2709                sql_item_name(col_name, self.flavor, None)
2710                if col_name not in altered_cols
2711                else (
2712                    "CAST("
2713                    + sql_item_name(col_name, self.flavor, None)
2714                    + " AS "
2715                    + altered_cols_types[col_name]
2716                    + ")"
2717                )
2718            )
2719            insert_query += new_col_str + ",\n"
2720        insert_query = insert_query[:-2] + (
2721            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
2722        )
2723
2724        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
2725
2726        drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name(
2727            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
2728        )
2729        return [
2730            rename_query,
2731            create_query,
2732            insert_query,
2733            drop_query,
2734        ]
2735
2736    queries = []
2737    if self.flavor == 'oracle':
2738        for col, typ in altered_cols_types.items():
2739            add_query = (
2740                "ALTER TABLE "
2741                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2742                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
2743                + " " + typ
2744            )
2745            queries.append(add_query)
2746
2747        for col, typ in altered_cols_types.items():
2748            populate_temp_query = (
2749                "UPDATE "
2750                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2751                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
2752                + ' = ' + sql_item_name(col, self.flavor, None)
2753            )
2754            queries.append(populate_temp_query)
2755
2756        for col, typ in altered_cols_types.items():
2757            set_old_cols_to_null_query = (
2758                "UPDATE "
2759                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2760                + "\nSET " + sql_item_name(col, self.flavor, None)
2761                + ' = NULL'
2762            )
2763            queries.append(set_old_cols_to_null_query)
2764
2765        for col, typ in altered_cols_types.items():
2766            alter_type_query = (
2767                "ALTER TABLE "
2768                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2769                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
2770                + typ
2771            )
2772            queries.append(alter_type_query)
2773
2774        for col, typ in altered_cols_types.items():
2775            set_old_to_temp_query = (
2776                "UPDATE "
2777                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2778                + "\nSET " + sql_item_name(col, self.flavor, None)
2779                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
2780            )
2781            queries.append(set_old_to_temp_query)
2782
2783        for col, typ in altered_cols_types.items():
2784            drop_temp_query = (
2785                "ALTER TABLE "
2786                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2787                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
2788            )
2789            queries.append(drop_temp_query)
2790
2791        return queries
2792
2793
2794    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
2795    for col, typ in altered_cols_types.items():
2796        alter_col_prefix = (
2797            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
2798            else 'MODIFY'
2799        )
2800        type_prefix = (
2801            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
2802            else 'TYPE '
2803        )
2804        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
2805        query += (
2806            f"\n{alter_col_prefix} {column_str} "
2807            + sql_item_name(col, self.flavor, None)
2808            + " " + type_prefix + typ + ","
2809        )
2810
2811    query = query[:-1]
2812    queries.append(query)
2813    if self.flavor != 'duckdb':
2814        return queries
2815
2816    drop_index_queries = list(flatten_list(
2817        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
2818    ))
2819    create_index_queries = list(flatten_list(
2820        [q for